TCP 网关 本文将为大家介绍一个基于 Netty + Protobuf 构建的高性能 TCP 网关开源组件。该组件已业务化部署运行 2 年以上,实现了 TCP 双向通道通信,能够维持高并发在线长连接,并优化了传输字节码等。 安装 从 GitHub 克隆这个工程,并将它作为一个依赖包添加到 Maven 项目中。 使用 1. 创建 TCP 服务 基于 Spring 配置文件:spring-tcp-server.xml 启动服务器 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd" default-autowire="byName"> <!-- tcp server config start. --> <bean id="tcpServer" class="com.linkedkeeper.tcp.connector.tcp.server.TcpServer" init-method="init" destroy-method="shutdown"> <!-- port is tcp server port --> <property name="port" value="2000"/> </bean> <bean id="tcpSessionManager" class="com.linkedkeeper.tcp.connector.tcp.TcpSessionManager"> <property name="maxInactiveInterval" value="500"/> <!-- you can add listener to listen session event, include session create, destroy and so on. --> <property name="sessionListeners"> <list> <ref bean="logSessionListener"/> </list> </property> </bean> <!-- logSessionListener is related tcpSessionManager, those listener should implements SessionListener --> <bean id="logSessionListener" class="com.linkedkeeper.tcp.connector.api.listener.LogSessionListener"/> <!-- tcp sender is a container that can send message to client from server --> <bean id="tcpSender" class="com.linkedkeeper.tcp.remoting.TcpSender"> <constructor-arg ref="tcpConnector"/> </bean> <!-- server config is combine the config, don't modify --> <bean id="serverConfig" class="com.linkedkeeper.tcp.connector.tcp.config.ServerTransportConfig"> <constructor-arg ref="tcpConnector"/> <constructor-arg ref="proxy"/> <constructor-arg ref="notify"/> </bean> <!-- tcp connector is container that manage the connection between server and client --> <bean id="tcpConnector" class="com.linkedkeeper.tcp.connector.tcp.TcpConnector" init-method="init" destroy-method="destroy"/> <!-- notify proxy is proxy that implement send notify to client --> <bean id="notify" class="com.linkedkeeper.tcp.notify.NotifyProxy"> <constructor-arg ref="tcpConnector"/> </bean> <!-- default tcp server 
config end. --> <!-- this proxy is your proxy that can receive message from client --> <bean id="proxy" class="com.linkedkeeper.tcp.server.TestSimpleProxy"/> </beans> 说明:
上面的配置都是默认的,你可以不更改,但是你可能需要换个 TCP 端口。 例子1 创建测试代理用于从客户端接收消息 import; import; import com.linkedkeeper.tcp.connector.tcp.codec.MessageBuf; import; import; import com.linkedkeeper.tcp.invoke.ApiProxy; import com.linkedkeeper.tcp.message.MessageWrapper; import com.linkedkeeper.tcp.message.SystemMessage; public class TestSimpleProxy implements ApiProxy { public MessageWrapper invoke(SystemMessage sMsg, MessageBuf.JMTransfer message) { ByteString body = message.getBody(); if (message.getCmd() == 1000) { try { Login.MessageBufPro.MessageReq messageReq = Login.MessageBufPro.MessageReq.parseFrom(body); if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.CONNECT)) { return new MessageWrapper(MessageWrapper.MessageProtocol.CONNECT, message.getToken(), null); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } else if (message.getCmd() == 1002) { try { Login.MessageBufPro.MessageReq messageReq = Login.MessageBufPro.MessageReq.parseFrom(body); if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.HEARTBEAT)) { MessageBuf.JMTransfer.Builder resp = Protocol.generateHeartbeat(); return new MessageWrapper(MessageWrapper.MessageProtocol.HEART_BEAT, message.getToken(), resp); } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } return null; } } 输入参数
例子2 发送通知到客户端

private NotifyProxy notify;

// presumably milliseconds - TODO confirm against NotifyProxy.notify
final int timeout = 10 * 1000;
// value written to the "format" field of every notify frame built below
final int NOTIFY = 3;

/**
 * Pushes a notification to the client that owns {@code sessionId}.
 *
 * @param seq       sequence number placed in the frame and handed to the notify proxy
 * @param sessionId session the notification is addressed to
 * @param cmd       command id placed in the frame
 * @param body      serialized payload
 * @return {@code true} when the notify proxy reports success, when it reports that the
 *         session is not on this machine, or when no cached session data exists for
 *         {@code sessionId}; {@code false} for any other proxy result
 * @throws Exception propagated from frame generation or from the notify proxy
 */
public boolean send(long seq, String sessionId, int cmd, ByteString body) throws Exception {
    MessageBuf.JMTransfer.Builder builder = generateNotify(sessionId, seq, cmd, body);
    if (builder == null) {
        // no session in the cache
        return true;
    }

    MessageWrapper wrapper = new MessageWrapper(MessageWrapper.MessageProtocol.NOTIFY, sessionId, builder);
    int ret = notify.notify(seq, wrapper, timeout);
    // NOTIFY_NO_SESSION means there is no session on this machine; it is treated as success too.
    return ret == Constants.NOTIFY_SUCCESS || ret == Constants.NOTIFY_NO_SESSION;
}

/**
 * session
 */
final String VERSION = "version";
final String DEVICE_ID = "deviceId";
final String PLATFORM = "platform";
final String PLATFORM_VERSION = "platformVersion";
final String TOKEN = "token";
final String APP_KEY = "appKey";
final String TIMESTAMP = "timestamp";
final String SIGN = "sign";

/**
 * need session into redis, then when you notify you can get info from redis by session
 * (left as a null placeholder in this example; generateNotify tolerates that).
 */
final Map<String, Map<String, Object>> testSessionMap = null;

/**
 * Builds the notify frame for a session from its cached attributes.
 *
 * @param sessionId key into the session cache
 * @param seq       sequence number for the frame
 * @param cmd       command id for the frame
 * @param body      serialized payload
 * @return the populated builder, or {@code null} when the session cache is absent or
 *         holds no entry for {@code sessionId} (previously this case threw a
 *         NullPointerException, so the caller's null check could never trigger)
 * @throws Exception declared for compatibility with existing callers
 */
protected MessageBuf.JMTransfer.Builder generateNotify(String sessionId, long seq, int cmd, ByteString body) throws Exception {
    if (testSessionMap == null) {
        return null;
    }
    Map<String, Object> map = testSessionMap.get(sessionId);
    if (map == null) {
        return null;
    }

    MessageBuf.JMTransfer.Builder builder = MessageBuf.JMTransfer.newBuilder();
    builder.setVersion(String.valueOf(map.get(VERSION)));
    builder.setDeviceId(String.valueOf(map.get(DEVICE_ID)));
    builder.setCmd(cmd);
    builder.setSeq(seq);
    builder.setFormat(NOTIFY);
    builder.setFlag(0);
    builder.setPlatform(String.valueOf(map.get(PLATFORM)));
    builder.setPlatformVersion(String.valueOf(map.get(PLATFORM_VERSION)));
    builder.setToken(String.valueOf(map.get(TOKEN)));
    builder.setAppKey(String.valueOf(map.get(APP_KEY)));
    builder.setTimeStamp(String.valueOf(map.get(TIMESTAMP)));
    builder.setSign(String.valueOf(map.get(SIGN)));
    builder.setBody(body);
    return builder;
}

2. 创建 TCP 客户端 支持 iOS,Android,C++ 等语言构建的客户端 3. 序列化 Protobuf Java /protobuf/protoc --proto_path=/protobuf/ --java_out=/protobuf/ /protobuf/MessageBuf.proto Objective-C protoc --plugin=/protobuf/protoc-gen-objc MessageBuf.proto --objc_out="/protobuf/" 附件: 你可以点击 下载 protobuf 编译器 本文受原创保护,未经作者授权,禁止转载。 (文/张松然) ©著作权归作者所有 |