netty怎么设置网络通信,netty怎么设置自定义协议

首页 > 实用技巧 > 作者:YD166 2024-02-21 13:46:07

@Slf4j public class NettyClient implements IClient { private final Channel channel; private final ServerAddress ServerAddress; public NettyClient(Channel channel, ServerAddress serverAddress) { this.channel = channel; this.serverAddress = serverAddress; } @Override public boolean asyncReq(Request request) { channel.writeAndFlush(request); return true; } @Override public Response syncReq(Request request) throws ApiException { return syncReq(request,DEFAULT_TIMEOUT,DEFAULT_TIMEOUT_UNIT); } @Override public Response syncReq(Request request, long timeout, TimeUnit timeUnit) throws ApiException { log.info("request:[{}]",request); SyncWriteFuture syncWriteFuture = new SyncWriteFuture(request.getRequestId()); SyncWriteCache.put(request.getRequestId(),syncWriteFuture); channel.writeAndFlush(request); Response response; try { response = syncWriteFuture.get(timeout,timeUnit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new ApiException("请求失败"); } catch (ExecutionException e) { throw new ApiException("请求失败"); } catch (TimeoutException e) { throw new ApiException("请求超时"); } finally { SyncWriteCache.clear(request.getRequestId()); } return response; } @Override public ServerAddress getServerAddress() { return this.serverAddress; } @Override public void close() { if(null != channel && channel.isActive()) { channel.close(); } } @Override public boolean isConnected() { return channel.isActive(); } @Override public String getClientId() { return channel.id().asLongText(); } }

②.客户端工厂

客户端工厂负责简化客户端的获取方式。

netty怎么设置网络通信,netty怎么设置自定义协议(13)

@Slf4j public abstract class AbstractClientFactory implements IClientFactory { public static final Cache<ServerAddress, IClient> LONG_CONNECT_CACHE = CacheBuilder.newBuilder() // 单机长连接上限,超过上限采用LRU淘汰 .maximumSize(65535) .expireAfterAccess(360, TimeUnit.DAYS) // 设置缓存移除监听器 .removalListener((RemovalListener<ServerAddress, IClient>) notic -> { log.debug("移除client[{}][{}][{}]", notic.getKey(), notic.getValue(), notic.getCause()); }).build(); protected Abstract IClient createClient(ServerAddress address) throws ApiException; @Override public IClient get(ServerAddress address) throws ApiException { IClient client; if (address.isLongConnection()) { try { client = LONG_CONNECT_CACHE.get(address, () -> createClient(address)); } catch (ExecutionException e) { log.error(e.getMessage(), e); throw new ApiException("连接失败"); } } else { client = createClient(address); } return client; } @Override public List<IClient> getCachedClients() { List<IClient> clientList = new ArrayList<>(); clientList.addAll(LONG_CONNECT_CACHE.asMap().values()); return clientList; } @Override public void remove(ServerAddress address) { LONG_CONNECT_CACHE.invalidate(address); } }

@Slf4j public class NettyClientFactory extends AbstractClientFactory{ private static final String byte_BUF_POOL_NAME = "bytebuf.pool"; private static final String IO_RATIO_NAME = "ioratio"; public static final NioEventLoopGroup WORKER_GROUP = new NioEventLoopGroup(); public static final ByteBufAllocator BYTE_BUF_ALLOCATOR; static { WORKER_GROUP.setIoRatio(SystemPropertyUtil.getInt(IO_RATIO_NAME, 100)); if (SystemPropertyUtil.getBoolean(BYTE_BUF_POOL_NAME, false)) { BYTE_BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT; } else { BYTE_BUF_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT; } } @Override protected IClient createClient(ServerAddress address) throws ApiException { //启动引导类 final bootstrap bootstrap = new Bootstrap(); //绑定工作线程组 bootstrap.group(NettyClientFactory.WORKER_GROUP); //设置低延迟 bootstrap.option(ChannelOption.TCP_NODELAY, true); //设置让关闭的的端口尽早可以使用 bootstrap.option(ChannelOption.SO_REUSEADDR, true); //若是长连接,开启SOCKET默认的心跳机制,短连接则不不开启 bootstrap.option(ChannelOption.SO_KEEPALIVE, address.isLongConnection()); //使用内存池 bootstrap.option(ChannelOption.ALLOCATOR, NettyClientFactory.BYTE_BUF_ALLOCATOR); //设置IO类型 bootstrap.channel(NioSocketChannel.class); //设置初始化连接配置 bootstrap.handler(InitializerFactory.get(address)); //若是设置的连接超时时间 bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, address.getConnectTimeout()); String targetIp = address.getIp(); int targetPort = address.getPort(); ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIp, targetPort)); if (future.awaitUninterruptibly(address.getConnectTimeout()) && future.isSuccess() && future.channel().isActive()) { log.info("[{}]连接成功,服务器端地址[{}:{}]",targetIp,targetPort); Channel channel = future.channel(); return new NettyClient(channel,address); } else { future.cancel(true); future.channel().close(); StringBuilder errorMsg = new StringBuilder(); errorMsg.append("服务器[").append(targetIp).append(":").append(targetPort).append("]连接失败"); throw new ApiException(errorMsg.toString()); } } }

③.客户端处理器

客户端处理器主要负责完成对接收事件的处理工作。

netty怎么设置网络通信,netty怎么设置自定义协议(14)

netty怎么设置网络通信,netty怎么设置自定义协议(15)

④.通道初始化器

客户端通道初始化器,主要负责完成Channel 的初始化配置工作,主要包括编解码器设置、消息处理器设置等。

netty怎么设置网络通信,netty怎么设置自定义协议(16)

上一页12345下一页

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.