|
|
|
An Easy RPC framework based on Netty, ZooKeeper and Spring. GitHub: https://github.com/linkedkeeper/easy-rpc RPC,即 Remote Procedure Call(远程过程调用),调用远程计算机上的服务,就像调用本地服务一样。RPC 可以很好地解耦系统,如 WebService 就是一种基于 HTTP 协议的 RPC。 EasyRpc 框架使用的一些技术所解决的问题:
服务端发布服务 一个服务接口 public interface HelloService {
String hello(String name);
String hello(Person person);
}一个服务实现 public class HelloServiceImpl implements HelloService {
/**
 * Builds the greeting for a plain name.
 *
 * @param name the name to greet; a {@code null} value is rendered as the text "null"
 * @return the text {@code "Hello! "} followed by {@code name}
 */
public String hello(String name) {
    final StringBuilder greeting = new StringBuilder("Hello! ");
    greeting.append(name);
    return greeting.toString();
}
/**
 * Builds the greeting for a person from their first and last name.
 *
 * @param person the person to greet; must not be {@code null}
 * @return {@code "Hello! "} followed by the first name, a space and the last name
 */
public String hello(Person person) {
    final String firstName = person.getFirstName();
    final String lastName = person.getLastName();
    return "Hello! " + firstName + " " + lastName;
}
}spring-server.xml 配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.linkedkeeper.com/schema/easyrpc http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd"> <bean id="helloService" class="com.linkedkeeper.easyrpc.test.server.HelloServiceImpl"/> <easyrpc:provider id="HelloProvider" interface="com.linkedkeeper.easyrpc.test.client.HelloService" alias="1.0" ref="helloService"/> <easyrpc:server id="rpcServer" protocol="easyrpc" port="18868"/> </beans> 服务在启动的时候通过 Spring 加载自定义 Bean public class EasyRpcNamespaceHandler extends NamespaceHandlerSupport {
/**
 * Registers one bean-definition parser for each custom element name:
 * "provider", "consumer" and "server".
 */
@Override
public void init() {
    final EasyRpcBeanDefinitionParser providerParser =
            new EasyRpcBeanDefinitionParser(ProviderBean.class, true);
    registerBeanDefinitionParser("provider", providerParser);

    final EasyRpcBeanDefinitionParser consumerParser =
            new EasyRpcBeanDefinitionParser(ConsumerBean.class, true);
    registerBeanDefinitionParser("consumer", consumerParser);

    final EasyRpcBeanDefinitionParser serverParser =
            new EasyRpcBeanDefinitionParser(ServerBean.class, true);
    registerBeanDefinitionParser("server", serverParser);
}
}通过实现 export 启动 Netty Server,并注册服务到 Server /**
* Using implements InitializingBean
*/
@Override
public void afterPropertiesSet() throws Exception {
// Initialise the bean's properties first, then publish it through export().
propertiesInit();
export();
}
/**
 * Publishes the service: starts every configured server and registers this
 * object with it as a processor. Runs at most once; later calls are no-ops
 * because of the {@code exported} flag.
 *
 * NOTE(review): every exception raised while starting a server or registering
 * with it is caught and logged inside the loop, and {@code exported} is set to
 * true afterwards regardless, so a failed export is neither propagated nor
 * retried by this method. Confirm that this best-effort behaviour is intended.
 *
 * @throws Exception the init error exception
 */
protected void export() throws Exception {
if (!exported) {
for (ServerConfig serverConfig : serverConfigs) {
try {
serverConfig.start();
// Register this object with the server that was just started.
RpcServer server = serverConfig.getServer();
server.registerProcessor(this);
} catch (Exception e) {
// Log and continue with the next server configuration.
logger.error("Catch exception server.", e);
}
}
exported = true;
}
}客户端调用服务 Junit Test @RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-client.xml")
public class HelloServiceTest {
@Autowired
private RpcClient rpcClient = null;
/** Calls {@code hello(String)} synchronously through a proxy created by the client. */
@Test
public void helloTest1() {
    final String greeting = rpcClient.create(HelloService.class).hello("World");
    System.out.println(greeting);
    Assert.assertEquals("Hello! World", greeting);
}
/** Calls {@code hello(Person)} synchronously through a proxy created by the client. */
@Test
public void helloTest2() {
    HelloService helloService = rpcClient.create(HelloService.class);
    Person person = new Person("Yong", "Huang");
    String result = helloService.hello(person);
    // Print the String directly: calling toString() on it was redundant and would
    // throw a NullPointerException on a null result before the assertion below
    // could report the mismatch.
    System.out.println(result);
    Assert.assertEquals("Hello! Yong Huang", result);
}
/** Invokes "hello" with a String argument through the asynchronous proxy and waits for the result. */
@Test
public void helloFutureTest1() throws ExecutionException, InterruptedException {
    final RpcFuture pending =
            rpcClient.createAsync(HelloService.class).call("hello", "World");
    Assert.assertEquals("Hello! World", pending.get());
}
/** Invokes "hello" with a Person argument through the asynchronous proxy and waits for the result. */
@Test
public void helloFutureTest2() throws ExecutionException, InterruptedException {
    final RpcFuture pending =
            rpcClient.createAsync(HelloService.class).call("hello", new Person("Yong", "Huang"));
    Assert.assertEquals("Hello! Yong Huang", pending.get());
}
/** Stops the injected client after each test, if one was injected. */
@After
public void setTear() {
    if (rpcClient == null) {
        return;
    }
    rpcClient.stop();
}
}spring-client.xml 配置文件 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:easyrpc="http://www.linkedkeeper.com/schema/easyrpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.linkedkeeper.com/schema/easyrpc http://www.linkedkeeper.com/schema/easyrpc/easyrpc.xsd"> <easyrpc:consumer id="rpcClient" url="127.0.0.1:18868" interface="com.linkedkeeper.easyrpc.client.RpcClient" alias="1.0" timeout="3"/> </beans> 客户端调用使用代理模式调用服务 public <T> T create(Class<T> interfaceClass) {
// Reuse the proxy already cached for this interface, if there is one.
// NOTE(review): containsKey/get/put is a check-then-act sequence; if
// proxyInstances is shared between threads, two proxies could be built for
// the same interface. Confirm whether callers can race here.
if (proxyInstances.containsKey(interfaceClass)) {
return (T) proxyInstances.get(interfaceClass);
} else {
// Build a dynamic proxy that implements only interfaceClass and delegates
// every call to an ObjectProxy configured with this client's timeout.
Object proxy = Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new ObjectProxy(interfaceClass, timeout)
);
// Cache the proxy so later calls for the same interface return this instance.
proxyInstances.put(interfaceClass, proxy);
return (T) proxy;
}
}代理类,为 RpcFuture 设置超时,否则会出现卡死现象 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Methods declared on java.lang.Object are answered locally and never sent
// to a remote handler.
if (Object.class == method.getDeclaringClass()) {
String name = method.getName();
if ("equals".equals(name)) {
// Identity comparison against the single argument of equals(Object).
return proxy == args[0];
} else if ("hashCode".equals(name)) {
return System.identityHashCode(proxy);
} else if ("toString".equals(name)) {
return proxy.getClass().getName() + "@" +
Integer.toHexString(System.identityHashCode(proxy)) +
", with InvocationHandler " + this;
} else {
// Any other Object method reaching this handler is rejected.
throw new IllegalStateException(String.valueOf(method));
}
}
// Describe the call: a random UUID as request id, the declaring class name,
// the method name, the parameter types and the actual arguments.
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// Pick a handler from the connection manager and send the request through it.
RpcClientHandler handler = ConnectManager.getInstance().chooseHandler();
RpcFuture rpcFuture = handler.sendRequest(request);
// Wait for the result with a bound of `timeout` seconds instead of waiting
// indefinitely. What happens when the bound is exceeded depends on
// RpcFuture.get, which is not shown here.
return rpcFuture.get(timeout, TimeUnit.SECONDS);
}通过 ConsumerFactoryBean 注入 Spring Bean public class ConsumerFactoryBean<T>
extends ConsumerConfig<T> implements FactoryBean<T> {
private transient T bean = null;
private transient Class<?> objectType = null;
/**
 * Returns the consumer proxy obtained from {@code refer()} and remembers it in {@code bean}.
 *
 * @return the object produced by {@code refer()}
 * @throws Exception declared by the {@code FactoryBean} contract
 */
@Override
public T getObject() throws Exception {
    this.bean = refer();
    return this.bean;
}
/**
 * Reports the type of object this factory produces.
 *
 * @return the class returned by {@code getProxyClass()}, or {@code null} when
 *         resolving it throws an exception
 */
@Override
public Class<?> getObjectType() {
    Class<?> resolved;
    try {
        resolved = getProxyClass();
    } catch (Exception e) {
        // The type cannot be determined; report "unknown" as null.
        resolved = null;
    }
    objectType = resolved;
    return objectType;
}
// Always reports the produced object as a singleton.
@Override
public boolean isSingleton() {
return true;
}
}通过 refer 启动 Client Server protected T refer() {
// Return the instance created by an earlier call, if any.
if (proxyIns != null)
return proxyIns;
try {
// Instantiate the configured class through its no-argument constructor,
// then open its connections.
// NOTE(review): proxyIns is assigned before initConnections() runs. If
// initConnections() throws, proxyIns stays non-null and the next call
// returns it without initialising connections again. Confirm whether
// that is acceptable.
proxyIns = (T) getProxyClass().newInstance();
initConnections();
} catch (Exception e) {
// Any failure is rethrown unchecked with the original exception as cause.
throw new RuntimeException("Build consumer proxy error!", e);
}
return proxyIns;
}
/** Casts the created proxy instance to {@code RpcClient} and initialises it with the configured url and connect timeout. */
private void initConnections() {
    ((RpcClient) proxyIns).initClient(url, connectTimeout);
}
// Resolves the class named by interfaceClass and caches it in proxyClass.
// Throws IllegalStateException when interfaceClass is blank or the class
// cannot be loaded.
protected Class<?> getProxyClass() {
// Already resolved by an earlier call.
if (proxyClass != null) {
return proxyClass;
}
try {
if (StringUtils.isNotBlank(interfaceClass)) {
this.proxyClass = Class.forName(interfaceClass);
} else {
// A blank class name is reported through the same catch block below.
throw new Exception("consumer.interfaceId, null, interfaceId must be not null");
}
} catch (Exception e) {
// Wrap both failures (blank name, class not found) in an unchecked
// exception that keeps the original message and cause.
throw new IllegalStateException(e.getMessage(), e);
}
return proxyClass;
}通过对 connectFuture.channel().closeFuture() 进行监听,实现断线自动重连 private void connect(final Bootstrap b, final InetSocketAddress remotePeer) {
// Start the connection attempt to the remote peer.
final ChannelFuture connectFuture = b.connect(remotePeer);
// When this channel's close future completes, schedule a task on the channel's
// event loop, 3 seconds later, that clears the connected-server state and calls
// connect() again with the same bootstrap and address.
// NOTE(review): the listener is registered unconditionally and nothing here
// checks for an intentional shutdown, so reconnect attempts would presumably
// continue after the client is stopped. Confirm against the stop logic.
connectFuture.channel().closeFuture().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
future.channel().eventLoop().schedule(new Runnable() {
public void run() {
LOGGER.warn("Attempting to reconnect.");
clearConnectedServer();
connect(b, remotePeer);
}
}, 3, TimeUnit.SECONDS);
}
});
// When the connection attempt completes: on success, look up the
// RpcClientHandler in the channel pipeline and pass it to addHandler();
// on failure, only log the cause.
connectFuture.addListener(new ChannelFutureListener() {
public void operationComplete(final ChannelFuture future) throws Exception {
if (future.isSuccess()) {
RpcClientHandler handler
= future.channel().pipeline().get(RpcClientHandler.class);
addHandler(handler);
} else {
LOGGER.error("Failed to connect.", future.cause());
}
}
});
}基于 Protostuff 实现序列化和反序列化工具(注意:下面贴出的示例代码实际使用的是 JDK 原生的 ObjectOutputStream/ObjectInputStream 序列化,而非 Protostuff) public abstract class SerializationUtils {
// No-argument constructor with an empty body.
// NOTE(review): the enclosing class is declared abstract and the methods shown
// in it are static, so this public constructor looks unnecessary. Confirm
// before narrowing its visibility.
public SerializationUtils() {
}
/**
 * Serializes an object to a byte array using Java object serialization.
 *
 * @param object the object to serialize; may be {@code null}
 * @return the serialized bytes, or {@code null} when {@code object} is {@code null}
 * @throws IllegalArgumentException if writing the object fails with an {@code IOException}
 */
public static byte[] serialize(Object object) {
    if (object == null) {
        return null;
    }
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream(1024);
    try {
        final ObjectOutputStream out = new ObjectOutputStream(buffer);
        out.writeObject(object);
        // Push any data still buffered in the object stream into the byte buffer.
        out.flush();
    } catch (IOException ioe) {
        throw new IllegalArgumentException("Failed", ioe);
    }
    return buffer.toByteArray();
}
/**
 * Reconstructs an object from bytes produced by Java object serialization.
 *
 * NOTE(review): this uses Java native deserialization. If the bytes can come
 * from an untrusted peer, this is a known attack surface; confirm where the
 * input originates before relying on it.
 *
 * @param bytes the serialized form; may be {@code null}
 * @return the deserialized object, or {@code null} when {@code bytes} is {@code null}
 * @throws IllegalArgumentException if reading the stream fails with an {@code IOException}
 * @throws IllegalStateException if a class named in the stream cannot be found
 */
public static Object deserialize(byte[] bytes) {
    if (bytes == null) {
        return null;
    }
    try {
        final ByteArrayInputStream source = new ByteArrayInputStream(bytes);
        return new ObjectInputStream(source).readObject();
    } catch (IOException ioe) {
        throw new IllegalArgumentException("Failed", ioe);
    } catch (ClassNotFoundException cnfe) {
        throw new IllegalStateException("Failed", cnfe);
    }
}
}由于处理的是 TCP 消息,需要在 pipeline 中加入处理 TCP 粘包的 Handler: channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,0)) 消息编解码时,开头的 4 个字节表示消息的长度,也就是消息编码的时候,先写消息的长度,再写消息。 本文受原创保护,未经作者授权,禁止转载。 linkedkeeper.com (文/张松然) ©著作权归作者所有 |