消息处理器IoHandlerAdapter实现类: 启动时连接失败自动重连: 断线重连: 断线重连功能参考:https://blog.csdn.net/qq_34928194/article/details/105204583 这里采用过滤器的方式: 使用Mina的KeepAliveFilter实现心跳:(了解Mina的KeepAliveFilter心跳机制) (1) 新建类实现KeepAliveMessageFactory (2) 会话管理中加入KeepAliveFilter 断开连接:Mina2.0+版本:调用connector的dispose()方法 加入断线重连后如何彻底断开客户端连接: 经过尝试 (1) 如果使用过滤器方式的断线重连,断开连接(即使删除了重连过滤器)后依然会进行重连(当然重连失败,一直重连),代码如下: (2) 如果使用监听器方式的断线重连,删除监听器后断开连接就完全断开连接了,代码如下: 本文使用的Mina自带的基于文本的编解码器TextLineCodecFactory,根据回车换行(windows下是rn,linux下是r)来断点传输数据,且在解码器中解决了半包粘包问题。 适用场景:报文为文本字符串类型的,且以换行符为数据分割符(当然可以利用TextLineCodecFactory的另外两个构造方法来自定义数据分割符)。 完整项目代码联系获取 1.引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.mina/mina-core --> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> <version>2.0.7</version> </dependency>
2.创建连接客户端
NioSocketConnector connector = new NioSocketConnector(); // 创建连接客户端 connector.setConnectTimeoutMillis(30000); // 设置连接超时 TextLineCodecFactory factory = new TextLineCodecFactory(Charset.forName("UTF-8")); factory.setDecoderMaxLineLength(Integer.MAX_VALUE); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(factory)); connector.getSessionConfig().setReceiveBufferSize(Integer.MAX_VALUE); // 设置接收缓冲区的大小 connector.getSessionConfig().setSendBufferSize(Integer.MAX_VALUE);// 设置输出缓冲区的大小 connector.setDefaultRemoteAddress(new InetSocketAddress(IP, Port));// 设置默认访问地址 connector.getSessionConfig().setTcpNoDelay(true); connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); connector.setHandler(new DockHandler()); ConnectFuture future = connector.connect(); future.awaitUninterruptibly(); // 等待连接创建成功 IoSession session = future.getSession(); // 获取会话
public class DockHandler extends IoHandlerAdapter { @Override public void messageReceived(IoSession session, Object message) throws Exception { super.messageReceived(session, message); String msg = (String)message; log.info("收到消息"); // 消息处理... } @Override public void sessionCreated(IoSession session) throws Exception { super.sessionCreated(session); log.info("创建连接"); } @Override public void sessionOpened(IoSession session) throws Exception { super.sessionOpened(session); log.info("建立连接"); } @Override public void sessionClosed(IoSession session) throws Exception { super.sessionClosed(session); log.info("连接关闭"); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { super.sessionIdle(session, status); log.info("重新连接"); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { super.exceptionCaught(session, cause); log.info("会话异常!"); if (session != null) { session.closeNow(); } } @Override public void messageSent(IoSession session, Object message) throws Exception { super.messageSent(session, message); } @Override public void inputClosed(IoSession session) throws Exception { super.inputClosed(session); } }
3.连接失败自动重连&断线重连功能
for (;;) { try { ConnectFuture future = connector.connect(); future.awaitUninterruptibly(); // 等待连接创建成功 IoSession session = future.getSession(); // 获取会话 log.info("连接服务端[成功]"); break; } catch (RuntimeIoException e) { log.error("连接服务端[失败],5S后重新连接"); Thread.sleep(5000); } }
connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() { @Override public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception { for (;;) { try { Thread.sleep(3000); ConnectFuture future = connector.connect(); future.awaitUninterruptibly();// 等待连接创建成功 IoSession session = future.getSession();// 获取会话 if (session.isConnected()) { log.info("断线重连成功"); break; } } catch (Exception ex) { log.info("断线重连失败,3s再次连接"); } } } });
4.心跳设置
public class MyKeepAliveMessageFactory implements KeepAliveMessageFactory { @Override public boolean isRequest(IoSession session, Object message) { return false; } @Override public boolean isResponse(IoSession session, Object message) { return false; } @Override public Object getRequest(IoSession session) { return "#";// 心跳内容为# } @Override public Object getResponse(IoSession session, Object request) { return null; } }
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 设置会话属性读写通道10秒无操作则视为空闲状态 MyKeepAliveMessageFactory heartBeat = new MyKeepAliveMessageFactory(); KeepAliveFilter keepAliveFilter = new KeepAliveFilter(heartBeat, IdleStatus.READER_IDLE, KeepAliveRequestTimeoutHandler.NOOP);// 无心跳响应时不采取任何操作 keepAliveFilter.setForwardEvent(false); keepAliveFilter.setRequestInterval(10);// 心跳间隔10s keepAliveFilter.setRequestTimeout(1);// 超时时间1s connector.getFilterChain().addLast("heart", keepAliveFilter);
5.Mina客户端完全断开连接的方法&加入断线重连功能后如何完全断开连接
connector.getFilterChain().remove("reconnection"); connector.dispose(); // 尝试无效
connector.removeListener(ioListener);// ioListener为创建的断线重连的监听器对象 connector.dispose();
6.粘包半包问题的解决
对于自定义报文(多为16进制报文),解决Mina数据接收的半包粘包问题需要我们自定义编解码器,在解码器中解决这一问题。
一个来自未来博文的url:Mina自定义解码器,解决半包粘包问题
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算