- NettyClient: the Netty-based client implementation
import io.netty.channel.Channel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClient implements IClient {

    private final Channel channel;
    private final ServerAddress serverAddress;

    public NettyClient(Channel channel, ServerAddress serverAddress) {
        this.channel = channel;
        this.serverAddress = serverAddress;
    }

    // Queues the write and returns immediately; the return value does not
    // reflect whether the write actually succeeded.
    @Override
    public boolean asyncReq(Request request) {
        channel.writeAndFlush(request);
        return true;
    }

    @Override
    public Response syncReq(Request request) throws ApiException {
        return syncReq(request, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT_UNIT);
    }

    @Override
    public Response syncReq(Request request, long timeout, TimeUnit timeUnit) throws ApiException {
        log.info("request:[{}]", request);
        // Register the future under the request ID before writing, so that it is
        // already in the cache when the response comes back.
        SyncWriteFuture syncWriteFuture = new SyncWriteFuture(request.getRequestId());
        SyncWriteCache.put(request.getRequestId(), syncWriteFuture);
        channel.writeAndFlush(request);
        Response response;
        try {
            // Block the calling thread until the response arrives or the timeout elapses.
            response = syncWriteFuture.get(timeout, timeUnit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new ApiException("Request failed");
        } catch (ExecutionException e) {
            throw new ApiException("Request failed");
        } catch (TimeoutException e) {
            throw new ApiException("Request timed out");
        } finally {
            // Always drop the cache entry, including on timeout, to avoid leaking futures.
            SyncWriteCache.clear(request.getRequestId());
        }
        return response;
    }

    @Override
    public ServerAddress getServerAddress() {
        return this.serverAddress;
    }

    @Override
    public void close() {
        if (null != channel && channel.isActive()) {
            channel.close();
        }
    }

    @Override
    public boolean isConnected() {
        return channel.isActive();
    }

    @Override
    public String getClientId() {
        return channel.id().asLongText();
    }
}
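The synchronous path in `syncReq` relies on `SyncWriteFuture` and `SyncWriteCache`, which are not shown in this section. The idea can be sketched as a map of pending futures keyed by request ID. This is only a minimal sketch under that assumption: `PendingRequests` and its methods are hypothetical stand-ins built on the JDK's `CompletableFuture`, and requests and responses are reduced to plain strings.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for the SyncWriteCache + SyncWriteFuture pair. */
class PendingRequests {
    // One pending future per in-flight request ID.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Called by the sender before the request is written to the channel. */
    CompletableFuture<String> register(String requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called when a response arrives; returns false if nobody is waiting for that ID. */
    boolean complete(String requestId, String response) {
        CompletableFuture<String> future = pending.get(requestId);
        return future != null && future.complete(response);
    }

    /** Removes the entry so that timed-out requests do not accumulate. */
    void clear(String requestId) {
        pending.remove(requestId);
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return pending.size();
    }

    /** Blocks until the response arrives or the timeout elapses, then always clears the entry. */
    String await(String requestId, CompletableFuture<String> future, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        try {
            return future.get(timeout, unit);
        } finally {
            clear(requestId);
        }
    }
}
```

In this sketch the sending thread calls `register` and then `await`, while the thread that reads responses calls `complete`. Clearing in `finally` mirrors the `SyncWriteCache.clear` call in `syncReq`.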
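②. Client factory
The client factory simplifies how callers obtain a client.
- IClientFactory: the client factory interface
- AbstractClientFactory: abstract implementation of the factory interface. It provides the template method for obtaining a client; creating the client itself is left to subclasses.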
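import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public abstract class AbstractClientFactory implements IClientFactory {

    public static final Cache<ServerAddress, IClient> LONG_CONNECT_CACHE = CacheBuilder.newBuilder()
            // Upper bound on long connections held by one machine; beyond it,
            // entries that have not been used recently are evicted (approximately LRU).
            .maximumSize(65535)
            .expireAfterAccess(360, TimeUnit.DAYS)
            // Removal listener: only logs the removal, it does not close the client.
            .removalListener((RemovalListener<ServerAddress, IClient>) notification -> {
                log.debug("Removed client [{}][{}][{}]",
                        notification.getKey(), notification.getValue(), notification.getCause());
            }).build();

    // Creation hook implemented by each concrete factory.
    protected abstract IClient createClient(ServerAddress address) throws ApiException;

    @Override
    public IClient get(ServerAddress address) throws ApiException {
        IClient client;
        if (address.isLongConnection()) {
            try {
                // Long connections are cached per address and created on first use.
                client = LONG_CONNECT_CACHE.get(address, () -> createClient(address));
            } catch (ExecutionException e) {
                log.error(e.getMessage(), e);
                throw new ApiException("Connection failed");
            }
        } else {
            // Short connections are never cached: every call creates a new client.
            client = createClient(address);
        }
        return client;
    }

    @Override
    public List<IClient> getCachedClients() {
        // Copy the values so that callers do not hold a live view of the cache.
        return new ArrayList<>(LONG_CONNECT_CACHE.asMap().values());
    }

    @Override
    public void remove(ServerAddress address) {
        LONG_CONNECT_CACHE.invalidate(address);
    }
}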
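The get-or-create behavior of `get` can be tried without Guava or Netty. The following is a minimal sketch, assuming a plain `ConcurrentHashMap` in place of `LONG_CONNECT_CACHE` (so there is no size limit, expiry, or removal listener) and strings in place of `ServerAddress` and `IClient`. The names `SketchClientFactory` and `CountingClientFactory` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified factory: String addresses and String "clients". */
abstract class SketchClientFactory {
    // Stand-in for LONG_CONNECT_CACHE; unlike the Guava cache it never evicts or expires entries.
    private final Map<String, String> longConnections = new ConcurrentHashMap<>();

    /** Subclasses decide how a client is actually created. */
    protected abstract String createClient(String address);

    /** Template method: reuse the cached client for long connections, otherwise create a new one. */
    public String get(String address, boolean longConnection) {
        if (longConnection) {
            return longConnections.computeIfAbsent(address, this::createClient);
        }
        return createClient(address);
    }

    /** Drops the cached client for the address, if any. */
    public void remove(String address) {
        longConnections.remove(address);
    }
}

/** Hypothetical subclass that counts how many clients it has created. */
class CountingClientFactory extends SketchClientFactory {
    final AtomicInteger created = new AtomicInteger();

    @Override
    protected String createClient(String address) {
        return "client-" + created.incrementAndGet() + "@" + address;
    }
}
```

Two long-connection lookups for the same address return the same client and trigger a single `createClient` call, while every short-connection lookup creates a new one.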
- NettyClientFactory: the Netty implementation of the client factory
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.internal.SystemPropertyUtil;
import java.net.InetSocketAddress;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyClientFactory extends AbstractClientFactory {

    private static final String BYTE_BUF_POOL_NAME = "bytebuf.pool";
    private static final String IO_RATIO_NAME = "ioratio";

    public static final NioEventLoopGroup WORKER_GROUP = new NioEventLoopGroup();
    public static final ByteBufAllocator BYTE_BUF_ALLOCATOR;

    static {
        // Both settings are read from JVM system properties, with defaults.
        WORKER_GROUP.setIoRatio(SystemPropertyUtil.getInt(IO_RATIO_NAME, 100));
        if (SystemPropertyUtil.getBoolean(BYTE_BUF_POOL_NAME, false)) {
            BYTE_BUF_ALLOCATOR = PooledByteBufAllocator.DEFAULT;
        } else {
            BYTE_BUF_ALLOCATOR = UnpooledByteBufAllocator.DEFAULT;
        }
    }

    @Override
    protected IClient createClient(ServerAddress address) throws ApiException {
        // Bootstrap helper for client channels
        final Bootstrap bootstrap = new Bootstrap();
        // Bind the shared worker thread group
        bootstrap.group(NettyClientFactory.WORKER_GROUP);
        // Disable Nagle's algorithm for lower latency
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        // Allow a recently closed local address to be reused sooner
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        // Enable the socket's built-in keep-alive for long connections only
        bootstrap.option(ChannelOption.SO_KEEPALIVE, address.isLongConnection());
        // Use the allocator chosen above (pooled only when bytebuf.pool is true)
        bootstrap.option(ChannelOption.ALLOCATOR, NettyClientFactory.BYTE_BUF_ALLOCATOR);
        // Use NIO socket channels
        bootstrap.channel(NioSocketChannel.class);
        // Install the channel initializer for this address
        bootstrap.handler(InitializerFactory.get(address));
        // Apply the connect timeout configured on the address (milliseconds)
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, address.getConnectTimeout());

        String targetIp = address.getIp();
        int targetPort = address.getPort();
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIp, targetPort));
        // Wait for the connect attempt, bounded by the same timeout.
        if (future.awaitUninterruptibly(address.getConnectTimeout())
                && future.isSuccess() && future.channel().isActive()) {
            log.info("Connected to server [{}:{}]", targetIp, targetPort);
            Channel channel = future.channel();
            return new NettyClient(channel, address);
        } else {
            // Give up on the attempt and release the channel.
            future.cancel(true);
            future.channel().close();
            throw new ApiException("Failed to connect to server [" + targetIp + ":" + targetPort + "]");
        }
    }
}
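The static initializer above takes its two tunables from the JVM system properties `ioratio` (default 100) and `bytebuf.pool` (default false). A minimal sketch of the same lookup with JDK calls only is shown here; `ClientTuning` is a hypothetical holder class, and the JDK parsing used in it may differ in edge cases from Netty's `SystemPropertyUtil`.

```java
/** Hypothetical holder mirroring the two tunables read in the static initializer. */
final class ClientTuning {
    final int ioRatio;
    final boolean pooledBuffers;

    private ClientTuning(int ioRatio, boolean pooledBuffers) {
        this.ioRatio = ioRatio;
        this.pooledBuffers = pooledBuffers;
    }

    /** Reads -Dioratio and -Dbytebuf.pool, falling back to 100 and false. */
    static ClientTuning fromSystemProperties() {
        int ioRatio = Integer.getInteger("ioratio", 100);
        boolean pooled = Boolean.parseBoolean(System.getProperty("bytebuf.pool", "false"));
        return new ClientTuning(ioRatio, pooled);
    }
}
```

Under these assumptions, starting the JVM with `-Dioratio=70 -Dbytebuf.pool=true` would select an I/O ratio of 70 and the pooled allocator; with neither flag set, the defaults apply.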
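③. Client handlers
Client handlers process the events received on the channel.
- AbstractChannelHandler: the abstract handler layer. It defines the template method for handling received events; the concrete handling logic is implemented by subclasses.
- ClientHandler: the concrete client handler, responsible for the actual event-handling logic.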
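The split between the abstract layer and the concrete handler can be illustrated without Netty. The sketch below is an assumption about the general shape only, since the code of `AbstractChannelHandler` and `ClientHandler` is not shown in this section. The classes `SketchChannelHandler` and `SketchClientHandler` are hypothetical, and the type check in `onMessage` is just one plausible fixed step.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstract layer: fixes the order of steps, leaves the handling to subclasses. */
abstract class SketchChannelHandler<T> {
    private final Class<T> messageType;

    protected SketchChannelHandler(Class<T> messageType) {
        this.messageType = messageType;
    }

    /** Template method: ignore messages of other types, otherwise delegate to the subclass. */
    public final boolean onMessage(Object message) {
        if (!messageType.isInstance(message)) {
            return false;
        }
        handle(messageType.cast(message));
        return true;
    }

    /** The only step a concrete handler has to provide. */
    protected abstract void handle(T message);
}

/** Hypothetical concrete handler that simply records what it received. */
class SketchClientHandler extends SketchChannelHandler<String> {
    final List<String> received = new ArrayList<>();

    SketchClientHandler() {
        super(String.class);
    }

    @Override
    protected void handle(String message) {
        received.add(message);
    }
}
```

Because `onMessage` is `final`, a subclass can change what happens to a message but not the steps around it, which is the point of the template-method arrangement.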
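④. Channel initializers
The client channel initializer configures a Channel when it is created, mainly by installing the codecs and the message handlers.
- AbstractChannelInitializer: the abstract channel initializer. It defines the template method for channel initialization; which components are installed is supplied by subclasses.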