在EMQ官网上拿张图哈^_^;; 本来就是在做物联网项目嘛,MQTT协议肯定是必须要的嘛,但之前不是我来负责这一块的,就没有对MQTT以及EMQ有更多的理解,只是会用能用罢了,要是让我说个1 2 3 ,肯定是不行的呀,最近有一个项目刚好我来对接开发,而且是MQTT协议的,由于受测试环境的限制,只能在本地笔记本上window上搭建一套EMQ环境来试试啦; 一、EMQX安装; 1、下载EMQ X Broker压缩包;这里放了官网最新版的https://www.emqx.io/downloads/broker/v4.1.0/emqx-windows-v4.1.0.zip;下面文章中的使用的是 “emqx-windows10-v3.2.0.zip”; 2、在笔记本本地目录直接解压就行啦; 3、进入解压后的目录;C:OldData_Win7emqx-windows10-v3.2.0emqxbin; 4、进入EMQX启动命令目录;按下shift键+鼠标右键,选择 ‘open PowerShell window here’; 5、启动EMQX服务;输入./emqx console;会弹出erlang的后台界面; 输入./emqx stop;即可停止服务啦; 输入./emqx start;即可轻松启动服务啦;(推荐) 6、启动成功后,EMQX自带的dashboard既可以访问啦;https://localhost:18083/#/login 默认u/p:admin/public 7、到这里我们的EMQ X Broker就已经可以使用了; 二、java连接EMQX,并发起订阅发布的操作; 1、因为客户端和服务端是都可以发布和订阅的;所以我们就以发布和订阅来区分吧; 订阅端sub:(启动main方法就可以连接到我们本地的EMQX了,哦,我这里使用了共享订阅的,random策略) 2、发布端pub; 我们的订阅端也已经收到这条pub的数据了。。。 3、我们也可以在 dashboard来观察到这两个会话; 三、总结; 1、共享订阅 包含一个主题过滤器和订阅选项,唯一的区别在于共享订阅的主题过滤器格式必须是 $share/{ShareName}/{filter} 这种形式。这几个的字段的含义分别是: $share 前缀表明这将是一个共享订阅 共享订阅使得订阅端能够负载均衡地消费消息,但 MQTT 协议并没有规定 Server 应当使用什么负载均衡策略。作为参考,EMQ X 提供了 random, round_robin, sticky, hash 四种策略供用户自行选择。 2、connetcLost方法 我们最好要在这里做一些事情,抛出异常或者打印日志,这样你才能知道到底什么时候连接丢失了的 3、 消息发布服务质量 如何选择QoS: 4、CleanSession参数;若是共享订阅模式下,需要将此字段配置为true,保证在订阅端有一个掉线的情况下,可以清除掉session信息,这样就不会收到订阅的pub信息了,避免了数据丢失的问题。
package com.daopin.project.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SubMsg0 { private static final Logger logger = LoggerFactory.getLogger(SubMsg0.class); //private static String topic = "$share/group/test1"; //private static String topic = "$queue/test1"; //private static String topic = "test1"; private static int qos = 0; private static String broker = "tcp://127.0.0.1:1883"; private static String userName = "COAP"; private static String passWord = "coap"; private static String clientId = "nokia-mqtt-cluster-0"; /** * 有三种消息发布服务质量: * <p> * “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。 * <p> * “至少一次”,确保消息到达,但消息重复可能会发生。 * <p> * “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 */ private static MqttClient connect(String clientId) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); //String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.206:1883"}; // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录, // 这里设置为true表示每次连接到服务器都以新的身份连接(客户端再次上线时,将不再关心之前所有的订阅关系以及离线消息) // 这里设置为false表示客户端再次上线时,还需要处理之前的离线消息,而之前的订阅关系也会持续生效 connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(passWord.toCharArray()); connOpts.setConnectionTimeout(10); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 connOpts.setKeepAliveInterval(20); connOpts.setAutomaticReconnect(true); connOpts.setMaxInflight(10); //connOpts.setServerURIs(uris); //setWill方法(遗嘱),如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 //connOpts.setWill(topic, "close".getBytes(), 2, true); // MemoryPersistence设置clientid的保存形式,默认为以内存保存 MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.setCallback(new MqttCallbackExtended() { @Override public void connectionLost(Throwable throwable) { //在断开连接时调用 连接丢失后,一般在这里面进行重连 logger.warn("connectionLost ... ; We will do something ..."); } @Override public void messageArrived(String topic, MqttMessage mqttMessage) { //接收已经预订的发布 logger.info("topic - > " + topic + ", mqttMessage - > " + mqttMessage); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用 logger.warn("deliveryComplete ... ; We will do something ..."); } @Override public void connectComplete(boolean reconnect, String serverURI) { /*subscribe();*/ //连接成功,需要上传客户端所有的订阅关系 logger.warn("connectComplete ... ; We will do something ..."); } }); mqttClient.connect(connOpts); return mqttClient; } public static void sub(MqttClient mqttClient, String topic) throws MqttException { int[] Qos = {qos}; String[] topics = {topic}; mqttClient.subscribe(topics, Qos); logger.info("sub >> " + topic); } private static void runsub(String clientId, String topic) throws MqttException { MqttClient mqttClient = connect(clientId); if (mqttClient != null) { sub(mqttClient, topic); } } public static void main(String[] args) throws MqttException { runsub(clientId, "$queue/qdq02mzl6kvs/coap-server/uplinkMsg"); } }
package com.daopin.project.mqtt; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Date; import java.util.concurrent.*; public class PubMsg { private static final Logger logger = LoggerFactory.getLogger(PubMsg.class); private static int qos = 0; private static String broker = "tcp://127.0.0.1:1883"; private static String userName = "COAP"; private static String passWord = "coap"; public static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(); public static ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy()); private static MqttClient connect(String clientId, String userName, String password) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(password.toCharArray()); connOpts.setConnectionTimeout(10); connOpts.setKeepAliveInterval(20); //String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"}; //connOpts.setServerURIs(uris); //起到负载均衡和高可用的作用 MqttClient mqttClient = new MqttClient(broker, clientId, persistence); mqttClient.setCallback(new PushCallback("test")); mqttClient.connect(connOpts); return mqttClient; } private static void pub(MqttClient sampleClient, String msg, String topic) throws Exception { while (true){ MqttMessage message = new MqttMessage((msg+" "+new Date()).getBytes()); message.setQos(qos); message.setRetained(false); sampleClient.publish(topic, message); logger.info("pub-->" + message); Thread.sleep(3000L); } } private static void publish(String str, String clientId, String topic) { try { MqttClient mqttClient = connect(clientId, userName, passWord); if (mqttClient != null) { pub(mqttClient, str, topic); } if (mqttClient != null) { mqttClient.disconnect(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws MqttException { //singleThreadPool.execute( ()-> publish("message content", "868333030030008", "sl6o3lbk94xg/868333030030008/uplinkMsg/0/data")); publish("AAAA0000", "nokia-mqtt-server", "qdq02mzl6kvs/coap-server/uplinkMsg"); } } class PushCallback implements MqttCallback { private static final Logger logger = LoggerFactory.getLogger(PushCallback.class); private String threadId; public PushCallback(String threadId) { this.threadId = threadId; } @Override public void connectionLost(Throwable cause) { //在断开连接时调用 连接丢失后,一般在这里面进行重连 logger.warn("connectionLost ... ; We will do something ..."); } @Override public void deliveryComplete(IMqttDeliveryToken token) { //System.out.println("deliveryComplete---------" + token.isComplete()); //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用 logger.warn("deliveryComplete ... ; We will do something ..."); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String msg = new String(message.getPayload()); System.out.println(threadId + " " + msg); } }
{ShareName} 是一个不包含 “/”, “+” 以及 “#” 的字符串。订阅会话通过使用相同的 {ShareName} 表示共享同一个订阅,匹配该订阅的消息每次只会发布给其中一个会话
{filter} 即非共享订阅中的主题过滤器
QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别,比如在同一个子网内部的服务间的消息交互往往选用 QoS 0;而通过互联网的实时消息通信往往选用 QoS 1;QoS 2 使用的场景相对少一些,适合一些支付请求之类的要求较高的场景。
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算