一个轻量级分布式 RPC 框架 — EasyRpc
您目前处于:技术核心竞争力  2017-06-17

An Easy RPC framework based on Netty, ZooKeeper and Spring.

GitHub: https://github.com/linkedkeeper/easy-rpc


RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC 可以很好的解耦系统,如 WebService 就是一种基于 Http 协议的 RPC。

EasyRpc 框架使用的一些技术所解决的问题:

  • 通信:使用Netty作为通信框架。

  • Spring:使用Spring配置服务,加载Bean。

  • 动态代理:客户端使用代理模式透明化服务调用。

  • 消息编解码:使用Protostuff序列化和反序列化消息。


服务端发布服务

一个服务接口

public interface HelloService {

    String hello(String name);

    String hello(Person person);
}

一个服务实现

public class HelloServiceImpl implements HelloService {

    public String hello(String name) {
        return "Hello! " + name;
    }

    public String hello(Person person) {
        return "Hello! " + person.getFirstName() + " " + person.getLastName();
    }
}

spring-server.xml 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.linkedkeeper.com/schema/easyrpc
           http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd">

    <bean id="helloService" 
        class="com.linkedkeeper.easyrpc.test.server.HelloServiceImpl"/>

    <easyrpc:provider id="HelloProvider" 
                      interface="com.linkedkeeper.easyrpc.test.client.HelloService"
                      alias="1.0" ref="helloService"/>

    <easyrpc:server id="rpcServer" protocol="easyrpc" port="18868"/>

</beans>

服务在启动的时候通过 Spring 加载自定义 Bean

public class EasyRpcNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        registerBeanDefinitionParser("provider", 
                    new EasyRpcBeanDefinitionParser(ProviderBean.class, true));
        registerBeanDefinitionParser("consumer", 
                    new EasyRpcBeanDefinitionParser(ConsumerBean.class, true));
        registerBeanDefinitionParser("server", 
                    new EasyRpcBeanDefinitionParser(ServerBean.class, true));
    }
}

通过实现 export 启动 Netty Server,并注册服务到 Server

/**
 * Using implements InitializingBean
 */
@Override
public void afterPropertiesSet() throws Exception {
    propertiesInit();
    export();
}

/**
 * 发布服务
 *
 * @throws Exception the init error exception
 */
protected void export() throws Exception {
    if (!exported) {
        for (ServerConfig serverConfig : serverConfigs) {
            try {
                serverConfig.start();
                // 注册接口
                RpcServer server = serverConfig.getServer();
                server.registerProcessor(this);
            } catch (Exception e) {
                logger.error("Catch exception server.", e);
            }
        }
        exported = true;
    }
}

客户端调用服务

Junit Test

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-client.xml")
public class HelloServiceTest {

    @Autowired
    private RpcClient rpcClient = null;

    @Test
    public void helloTest1() {
        HelloService helloService = rpcClient.create(HelloService.class);
        String result = helloService.hello("World");
        System.out.println(result);
        Assert.assertEquals("Hello! World", result);
    }

    @Test
    public void helloTest2() {
        HelloService helloService = rpcClient.create(HelloService.class);
        Person person = new Person("Yong", "Huang");
        String result = helloService.hello(person);
        System.out.println(result.toString());
        Assert.assertEquals("Hello! Yong Huang", result);
    }

    @Test
    public void helloFutureTest1() throws ExecutionException, InterruptedException {
        IAsyncObjectProxy helloService = rpcClient.createAsync(HelloService.class);
        RpcFuture result = helloService.call("hello", "World");
        Assert.assertEquals("Hello! World", result.get());
    }

    @Test
    public void helloFutureTest2() throws ExecutionException, InterruptedException {
        IAsyncObjectProxy helloService = rpcClient.createAsync(HelloService.class);
        Person person = new Person("Yong", "Huang");
        RpcFuture result = helloService.call("hello", person);
        Assert.assertEquals("Hello! Yong Huang", result.get());
    }

    @After
    public void setTear() {
        if (rpcClient != null) {
            rpcClient.stop();
        }
    }
}

spring-client.xml 配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.linkedkeeper.com/schema/easyrpc
            http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd">

    <easyrpc:consumer id="rpcClient" url="127.0.0.1:18868"
                      interface="com.linkedkeeper.easyrpc.client.RpcClient"
                      alias="1.0" timeout="3"/>

</beans>

客户端调用使用代理模式调用服务

public <T> T create(Class<T> interfaceClass) {
    if (proxyInstances.containsKey(interfaceClass)) {
        return (T) proxyInstances.get(interfaceClass);
    } else {
        Object proxy = Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new ObjectProxy(interfaceClass, timeout)
        );
        proxyInstances.put(interfaceClass, proxy);
        return (T) proxy;
    }
}

代理类,为 RpcFuture 设置超时,否则会出现卡死现象

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (Object.class == method.getDeclaringClass()) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        } else if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        } else if ("toString".equals(name)) {
            return proxy.getClass().getName() + "@" +
                    Integer.toHexString(System.identityHashCode(proxy)) +
                    ", with InvocationHandler " + this;
        } else {
            throw new IllegalStateException(String.valueOf(method));
        }
    }

    RpcRequest request = new RpcRequest();
    request.setRequestId(UUID.randomUUID().toString());
    request.setClassName(method.getDeclaringClass().getName());
    request.setMethodName(method.getName());
    request.setParameterTypes(method.getParameterTypes());
    request.setParameters(args);

    RpcClientHandler handler = ConnectManager.getInstance().chooseHandler();
    RpcFuture rpcFuture = handler.sendRequest(request);
    return rpcFuture.get(timeout, TimeUnit.SECONDS);
}

通过 ConsumerFactoryBean 注入 Spring Bean

public class ConsumerFactoryBean<T> 
    extends ConsumerConfig<T> implements FactoryBean<T> {

    private transient T bean = null;
    private transient Class<?> objectType = null;

    @Override
    public T getObject() throws Exception {
        bean = refer();
        return bean;
    }

    @Override
    public Class<?> getObjectType() {
        try {
            objectType = getProxyClass();
        } catch (Exception e) {
            objectType = null;
        }
        return objectType;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}

通过 refer 启动 Client Server

protected T refer() {
    if (proxyIns != null)
        return proxyIns;
    try {
        proxyIns = (T) getProxyClass().newInstance();
        initConnections();
    } catch (Exception e) {
        throw new RuntimeException("Build consumer proxy error!", e);
    }
    return proxyIns;
}

private void initConnections() {
    RpcClient client = (RpcClient) proxyIns;
    client.initClient(url, connectTimeout);
}

protected Class<?> getProxyClass() {
    if (proxyClass != null) {
        return proxyClass;
    }
    try {
        if (StringUtils.isNotBlank(interfaceClass)) {
            this.proxyClass = Class.forName(interfaceClass);
        } else {
            throw new Exception("consumer.interfaceId, null, interfaceId must be not null");
        }
    } catch (Exception e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    return proxyClass;
}

通过对 connectFuture.channel().closeFuture 听见监听,实现断线自动重练

private void connect(final Bootstrap b, final InetSocketAddress remotePeer) {
    final ChannelFuture connectFuture = b.connect(remotePeer);
    connectFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            future.channel().eventLoop().schedule(new Runnable() {
                public void run() {
                    LOGGER.warn("Attempting to reconnect.");
                    clearConnectedServer();
                    connect(b, remotePeer);
                }
            }, 3, TimeUnit.SECONDS);
        }
    });
    connectFuture.addListener(new ChannelFutureListener() {
        public void operationComplete(final ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                RpcClientHandler handler 
                    = future.channel().pipeline().get(RpcClientHandler.class);
                addHandler(handler);
            } else {
                LOGGER.error("Failed to connect.", future.cause());
            }
        }
    });
}

基于 Protostuff 实现序列化和反序列化工具

public abstract class SerializationUtils {
    public SerializationUtils() {
    }

    public static byte[] serialize(Object object) {
        if(object == null) {
            return null;
        } else {
            ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);

            try {
                ObjectOutputStream ex = new ObjectOutputStream(baos);
                ex.writeObject(object);
                ex.flush();
            } catch (IOException var3) {
                throw new IllegalArgumentException("Failed", var3);
            }

            return baos.toByteArray();
        }
    }

    public static Object deserialize(byte[] bytes) {
        if(bytes == null) {
            return null;
        } else {
            try {
                ObjectInputStream ex = 
                        new ObjectInputStream(new ByteArrayInputStream(bytes));
                return ex.readObject();
            } catch (IOException var2) {
                throw new IllegalArgumentException("Failed", var2);
            } catch (ClassNotFoundException var3) {
                throw new IllegalStateException("Failed", var3);
            }
        }
    }
}

由于处理的是 TCP 消息,TCP 的粘包处理 Handler。

channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0))

消息编解码时开始4个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。



本文受原创保护,未经作者授权,禁止转载。 linkedkeeper.com (文/张松然)  ©著作权归作者所有