|
An Easy RPC framework based on Netty, ZooKeeper and Spring. GitHub: https://github.com/linkedkeeper/easy-rpc RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC 可以很好地解耦系统,如 WebService 就是一种基于 HTTP 协议的 RPC。 EasyRpc 框架使用的一些技术所解决的问题:
服务端发布服务 一个服务接口 public interface HelloService { String hello(String name); String hello(Person person); } 一个服务实现 public class HelloServiceImpl implements HelloService { public String hello(String name) { return "Hello! " + name; } public String hello(Person person) { return "Hello! " + person.getFirstName() + " " + person.getLastName(); } } spring-server.xml 配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.linkedkeeper.com/schema/easyrpc http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd"> <bean id="helloService" class="com.linkedkeeper.easyrpc.test.server.HelloServiceImpl"/> <easyrpc:provider id="HelloProvider" interface="com.linkedkeeper.easyrpc.test.client.HelloService" alias="1.0" ref="helloService"/> <easyrpc:server id="rpcServer" protocol="easyrpc" port="18868"/> </beans> 服务在启动的时候通过 Spring 加载自定义 Bean public class EasyRpcNamespaceHandler extends NamespaceHandlerSupport { @Override public void init() { registerBeanDefinitionParser("provider", new EasyRpcBeanDefinitionParser(ProviderBean.class, true)); registerBeanDefinitionParser("consumer", new EasyRpcBeanDefinitionParser(ConsumerBean.class, true)); registerBeanDefinitionParser("server", new EasyRpcBeanDefinitionParser(ServerBean.class, true)); } } 通过实现 export 启动 Netty Server,并注册服务到 Server /** * Using implements InitializingBean */ @Override public void afterPropertiesSet() throws Exception { propertiesInit(); export(); } /** * 发布服务 * * @throws Exception the init error exception */ protected void export() throws Exception { if (!exported) { 
for (ServerConfig serverConfig : serverConfigs) { try { serverConfig.start(); // 注册接口 RpcServer server = serverConfig.getServer(); server.registerProcessor(this); } catch (Exception e) { logger.error("Catch exception server.", e); } } exported = true; } } 客户端调用服务 Junit Test @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = "classpath:spring-client.xml") public class HelloServiceTest { @Autowired private RpcClient rpcClient = null; @Test public void helloTest1() { HelloService helloService = rpcClient.create(HelloService.class); String result = helloService.hello("World"); System.out.println(result); Assert.assertEquals("Hello! World", result); } @Test public void helloTest2() { HelloService helloService = rpcClient.create(HelloService.class); Person person = new Person("Yong", "Huang"); String result = helloService.hello(person); System.out.println(result.toString()); Assert.assertEquals("Hello! Yong Huang", result); } @Test public void helloFutureTest1() throws ExecutionException, InterruptedException { IAsyncObjectProxy helloService = rpcClient.createAsync(HelloService.class); RpcFuture result = helloService.call("hello", "World"); Assert.assertEquals("Hello! World", result.get()); } @Test public void helloFutureTest2() throws ExecutionException, InterruptedException { IAsyncObjectProxy helloService = rpcClient.createAsync(HelloService.class); Person person = new Person("Yong", "Huang"); RpcFuture result = helloService.call("hello", person); Assert.assertEquals("Hello! 
Yong Huang", result.get()); } @After public void setTear() { if (rpcClient != null) { rpcClient.stop(); } } } spring-client.xml 配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.linkedkeeper.com/schema/easyrpc http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd"> <easyrpc:consumer id="rpcClient" url="127.0.0.1:18868" interface="com.linkedkeeper.easyrpc.client.RpcClient" alias="1.0" timeout="3"/> </beans> 客户端调用使用代理模式调用服务 public <T> T create(Class<T> interfaceClass) { if (proxyInstances.containsKey(interfaceClass)) { return (T) proxyInstances.get(interfaceClass); } else { Object proxy = Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new ObjectProxy(interfaceClass, timeout) ); proxyInstances.put(interfaceClass, proxy); return (T) proxy; } } 代理类,为 RpcFuture 设置超时,否则会出现卡死现象 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if (Object.class == method.getDeclaringClass()) { String name = method.getName(); if ("equals".equals(name)) { return proxy == args[0]; } else if ("hashCode".equals(name)) { return System.identityHashCode(proxy); } else if ("toString".equals(name)) { return proxy.getClass().getName() + "@" + Integer.toHexString(System.identityHashCode(proxy)) + ", with InvocationHandler " + this; } else { throw new IllegalStateException(String.valueOf(method)); } } RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setClassName(method.getDeclaringClass().getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); RpcClientHandler handler = 
ConnectManager.getInstance().chooseHandler(); RpcFuture rpcFuture = handler.sendRequest(request); return rpcFuture.get(timeout, TimeUnit.SECONDS); } 通过 ConsumerFactoryBean 注入 Spring Bean public class ConsumerFactoryBean<T> extends ConsumerConfig<T> implements FactoryBean<T> { private transient T bean = null; private transient Class<?> objectType = null; @Override public T getObject() throws Exception { bean = refer(); return bean; } @Override public Class<?> getObjectType() { try { objectType = getProxyClass(); } catch (Exception e) { objectType = null; } return objectType; } @Override public boolean isSingleton() { return true; } } 通过 refer 启动 Client Server protected T refer() { if (proxyIns != null) return proxyIns; try { proxyIns = (T) getProxyClass().newInstance(); initConnections(); } catch (Exception e) { throw new RuntimeException("Build consumer proxy error!", e); } return proxyIns; } private void initConnections() { RpcClient client = (RpcClient) proxyIns; client.initClient(url, connectTimeout); } protected Class<?> getProxyClass() { if (proxyClass != null) { return proxyClass; } try { if (StringUtils.isNotBlank(interfaceClass)) { this.proxyClass = Class.forName(interfaceClass); } else { throw new Exception("consumer.interfaceId, null, interfaceId must be not null"); } } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } return proxyClass; } 通过对 connectFuture.channel().closeFuture 进行监听,实现断线自动重连 private void connect(final Bootstrap b, final InetSocketAddress remotePeer) { final ChannelFuture connectFuture = b.connect(remotePeer); connectFuture.channel().closeFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { future.channel().eventLoop().schedule(new Runnable() { public void run() { LOGGER.warn("Attempting to reconnect."); clearConnectedServer(); connect(b, remotePeer); } }, 3, TimeUnit.SECONDS); } }); connectFuture.addListener(new ChannelFutureListener() { public 
void operationComplete(final ChannelFuture future) throws Exception { if (future.isSuccess()) { RpcClientHandler handler = future.channel().pipeline().get(RpcClientHandler.class); addHandler(handler); } else { LOGGER.error("Failed to connect.", future.cause()); } } }); } 基于 Protostuff 实现序列化和反序列化工具(注意:下面展示的示例实现使用的是 JDK 原生的 ObjectOutputStream / ObjectInputStream,而非 Protostuff) public abstract class SerializationUtils { public SerializationUtils() { } public static byte[] serialize(Object object) { if(object == null) { return null; } else { ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); try { ObjectOutputStream ex = new ObjectOutputStream(baos); ex.writeObject(object); ex.flush(); } catch (IOException var3) { throw new IllegalArgumentException("Failed", var3); } return baos.toByteArray(); } } public static Object deserialize(byte[] bytes) { if(bytes == null) { return null; } else { try { ObjectInputStream ex = new ObjectInputStream(new ByteArrayInputStream(bytes)); return ex.readObject(); } catch (IOException var2) { throw new IllegalArgumentException("Failed", var2); } catch (ClassNotFoundException var3) { throw new IllegalStateException("Failed", var3); } } } } 由于处理的是 TCP 消息,需要在 pipeline 中添加处理 TCP 粘包的 Handler: channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0)) 消息编解码时,开始的 4 个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。 本文受原创保护,未经作者授权,禁止转载。 linkedkeeper.com (文/张松然) ©著作权归作者所有 |
