一、mosquitto broker篇 1.依赖环境安装: yum install gcc-c++ yum install openssl-develyum install c-ares-develyum install libuuid-devle 注:某些系统可能自带这些依赖环境,直接过滤 2.websocket支持 git clone
一、mosquitto broker篇
1.依赖环境安装:
yum install gcc-c++
yum install openssl-develyum install c-ares-develyum install libuuid-devle
安装libwebsockets需要cmake命令,需要2.8以上版本,yum安装的可能是2.6的版本,需要手动下载2.8以上的版本进行安装,请自行百度或谷歌下载,本文档使用cmake-3.2.3-Linux-x86_64
,安装后直接设置环境变量即可。
1)tar -zxvf cmake-3.2.3-Linux-x86_64.tar.gz
vi /etc/profile
注:需要卸载老版本的cmake ,yum remove cmake
3)查看版本
cmake --version
4)安装websocket
tar -zxvf libwebsockets-v1.5-stable.tar.gz
cd libwebsockets/
mkdir build
cd build/
cmake ..
make install
3.mosquitto安装
wget wget
http://mosquitt
o.org/files/
source/mosquitto-1.4.11.tar.gz
tar zxvf mosquitto-1.4.11.tar.gz
cd mosquitto-1.4.11
WITH_TLS:=no
WITH_TLS_PSK:=no
WITH_WEBSOCKETS:=yes
make
make install
cd /etc/mosquitto/
cp mosquitto.conf.example mosquitto.conf
vi mosquitto.conf
listener 81
protocol websockets
5.启动mosquitto
mosquitto -c /etc/mosquitto/mosquitto.conf
ln -s /usr/local/lib/libwebsockets
.so.5 /usr/lib64/libwebsockets
.so.5
二、java篇
javaclient端使用的 eclipse.paho 包,Pom配置如下:
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>
<!-- 另外大牛写的mqtt客户端 -->
<dependency>
<groupId>org.fuse
source.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.14</version>
</dependency>
import java.uti
l.Sc
Anner;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.Mqtt
message;
import org.eclipse.paho.client.mqttv3.MqttTopi
c;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
* [简要描述]:<br/>
* [详细描述]:<br/>
*
* @author xiaolinlin
* @version 1.0,2017年6月8日
* @since smile V100R001C00
*/
public class Mqtt
TESTClient
public static final @R_
489_10495@ng HOST = "tcp://ip
:1881";
public static final @R_
489_10495@ng TOPIC = "mqtt/topic";
private static final @R_
489_10495@ng clientid = "client124";
private MqttClient client;
private MqttConnectOptions options;
private @R_
489_10495@ng userName = "admin";
private void start() throws MqttException
try
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,
默认为以内存保存
client = new MqttClient(HOST,clientid,new MemoryPersistence()
);
// MQTT的连接设置
options = new MqttConnectOptions(
);
// 设置是否清空session,这里如果设置为
false表示服务器会保留客户端的连接记录,这里设置为true表示
每次连接到服务器都以新的身份连接
// 设置连接的密码
// 设置超时时间 单位为秒
option
s.setConnectionTimeout(10
);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个
方法并没有重连的机制
option
s.setKeepAliveInterval(
20);
// 设置回调
MqttTopic topic = client.getTopic(TOPIC
);
option
s.setWill(topic,"close".getBytes(),2,tru
E);
client.setCall
BACk(new ClientCall
BACk(client,options)
);
@R_
489_10495@ng[] topic1 =
{TOPIC};
client.subscribe(topic1,Qos
);
@R_
489_10495@ng msg = "";
if (msg.contains("colse"))
break;
}
}
}
}
}
}
public static void main(@R_
489_10495@ng[] args) throws MqttException
}
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCall
BACk;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.Mqtt
message;
import org.eclipse.paho.client.mqttv3.MqttS
ecurityException;
* [简要描述]:<br/>
* [详细描述]:<br/>
*
* @author xiaolinlin
* @version 1.0,2017年6月8日
* @since smile V100R001C00
*/
public class ClientCall
BACk implements MqttCall
BACk
private MqttClient client;
private MqttConnectOptions options;
public ClientCall
BACk(MqttClient client,MqttConnectOptions options)
{
}
@Override
public void connectionLost(Throwable caus
E)
try
}
// TODO Auto-generated catch block
}
// TODO Auto-generated catch block
}
// while(!sampleClient.isConnected())
{
// sampleClient.connect(connOpts
);
// //客户端每次上线都必须
上传自己所有涉及的
订阅关系,否则可能会导致消息接收延迟
// sampleClient.subscribe(topicFilters,qos
);
// } catch (Exception
E) {
// }
// }
}
@Override
// subscribe后得到的消息会执行到这里面
if(msg.contains("close"))
{
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token)
Sy
stem.out.println("deliveryComplete---------" + token.isComplete()
);
}
}
2:server端
import java.io.UnsupportedEncodingException;
import java.uti
l.Sc
Anner;
import org.eclipse.paho.client.mqttv3.MqttCall
BACk;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.Mqtt
message;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopi
c;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
* [简要描述]:<br/>
* [详细描述]:<br/>
*
* @author xiaolinlin
* @version 1.0,2017年6月8日
* @since smile V100R001C00
*/
public class MqttServer
public static void main(@R_
489_10495@ng[] args) throws InterruptedException,MqttException
@R_
489_10495@ng serverURI = "tcp://ip
:1881";
@R_
489_10495@ng clientId = "server001";
MqttClient client = null;
try
// MemoryPersistence设置clientid的保存形式,
默认为以内存保存
MqttClientPersistence persistence = new MemoryPersistence(
);
// 异步 待研究
// MqttAsyncClient client = new MqttAsyncClient(serverURI,
// clientId,persistence
);
client = new MqttClient(serverURI,clientId,persistenc
E);
MqttConnectOptions options = new MqttConnectOptions(
);
// 设置是否清空session,这里如果设置为
false表示服务器会保留客户端的连接记录,设置为true表示
每次连接到服务器都以新的身份连接
// option
s.setUserName("admin"
);
// 超时时间 单位S
option
s.setConnectionTimeout(5
);
// 设置会话心跳时间 单位S 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个
方法并没有重连的机制
option
s.setKeepAliveInterval(10
);
MqttCall
BACk call
BACk = new ReciveCall
BACk(client,options
);
@R_
489_10495@ng topicStr = "mqtt/topic";
MqttTopic topic = client.getTopic(topicStr
);
@R_
489_10495@ng msg = "";
if(msg.contains("close"))
{
break;
}
}
}
// TODO Auto-generated catch block
}
}
}
public static void pushMsg(MqttTopic topic,@R_
489_10495@ng msg)
MqttDeliveryToken token;
try
// 是否保留
byte[] payload = msg.getBytes("utf-8"
);
// 等待完成
token.waitForCompletion(
);
// 打印完成状态
Sy
stem.out.println(token.isComplete() + "========"
);
}
catch (MqttPersistenceException
E)
// TODO Auto-generated catch block
}
// TODO Auto-generated catch block
}
catch (UnsupportedEncodingException
E)
// TODO Auto-generated catch block
}
}
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCall
BACk;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.Mqtt
message;
import org.eclipse.paho.client.mqttv3.MqttS
ecurityException;
* [简要描述]:<br/>
* [详细描述]:<br/>
*
* @author xiaolinlin
* @version 1.0,2017年6月8日
* @since smile V100R001C00
*/
public class ReciveCall
BACk implements MqttCall
BACk
public ReciveCall
BACk(MqttClient client,MqttConnectOptions options)
{
}
private MqttClient client;
private MqttConnectOptions options;
@Override
public void connectionLost(Throwable caus
E)
try
}
// TODO Auto-generated catch block
}
// TODO Auto-generated catch block
}
}
@Override
// subscribe后得到的消息会执行到这里面
if(msg.contains("close"))
{
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token)
Sy
stem.out.println("deliveryComplete---------" + token.isComplete()
);
}
}
三、js篇
<!DOCTYPE html>
<html>
<head>
<
Meta http-equiv="X-UA-Compatible" content="IE=edge">
<
Meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<link rel="stylesheet" href="">
<script language="JavaScript" src="jquery.js"></script>
<script language="JavaScript" src="mqttws31-min.js"></script>
<script type="text/javascript">
var topic = "mqtt/topic";
var hostname = "120.76.216.235";
var port = 81;
var reconnectTimeout = 2000;
var mqtt;
var options;
var clientId = "ClientID_0022";
//var username = 'LTAItKYvBCx06Dbz';
mqtt = new Paho.MQTT.Client(hostname,port,clientId
);
timeout: 3,
// useSSL: useTLS,
cleanSession: true,
on
success: onConnect,//if true(default) the client and server persistent state is
deleted on
successful connect.
setTimeout(MQTTconnect,reconnectTimeout
);
}
};
mqtt
.onConnectionLost = onConnectionLost;
}
}
$('#status').val('Connected to ' + hostname + ':' + port
);
// Connection succeeded; subscribe to our topic
mqtt.subscribe(topic,
{qos: 0}
);
}
function onConnectionLost(respons
E) {
setTimeout(MQTTconnect,reconnectTimeout
);
$('#status').val("connection lost: " + response.error
message + ". Reconnec
Ting"
);
};
var payload =
message.payload@R_
489_10495@ng;
$('#ws').prepend('<li>' + topic + ' = ' + payload + '</li>'
);
};
function button_onclick()
{
//这里写你要执行的语句
var tp = $('#sendtopic').val(
);
var val = $('#textsend').val(
);
if(tp==''||val=='')
return;
}
$('#ws').prepend('<li>' + tp + ' = ' + val+ '</li>'
);
};
$(document).ready(function()
{
</script>
</head>
<body>
<h1>Mosquitto Websockets</h1>
<div>
<div>Subscribed to <input type='text' id='topic' disabled />
Status: <input type='text' id='status' size="80" disabled />
</br>
publish to <input type='text' id='sendtopic' /> text <input type='text' id='textsend' size="80"/>
Status: <input type='button' value="send" id='btn' onclick="javascript:button_onclick()"/></div>
<ul id='ws' style="font-family: 'Courier New',Courier,monospace;"></ul>
</div>
</body>
</html>
引用的js自行下载,其中mqttws31-min.js 也是属于eclipse.paho 中的。
附一些需要讨论的问题:
@H_
404_1180@
1.鉴权的
考虑