一个基于Mqtt的小项目,服务器采用mosquitto,客户端有Python,C,Android三种,涉及SSL加密,传输内容:文字图片。
应用消息 Application Message MQTT协议通过网络传输应用数据。应用消息通过MQTT传输时,它们有关联的服务质量(QoS)和主题(Topic)。
客户端 Client
使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以
- 发布应用消息给其它相关的客户端。
- 订阅以请求接受相关的应用消息。
- 取消订阅以移除接受应用消息的请求。
- 从服务端断开连接。
服务端 Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端
- 接受来自客户端的网络连接。
- 接受客户端发布的应用消息。
- 处理客户端的订阅和取消订阅请求。
- 转发应用消息给符合条件的已订阅客户端。
data:image/s3,"s3://crabby-images/96aee/96aee8f2d76f1a45d2598657d3077cbad96c9c2b" alt=""
工作流:
服务器先启动,然后客户端订阅相关的Topic。Client A 和C发布主题为:Question的What's the temperature?。Client B因为订阅了Question这个Topic,所以可以收到信息,Client B收到信息做判断后发布答案Topic: Temperture出去,订阅了相关Topic的Client A 和Client C能接收到37°。
C的客户端:
//mqttclient.c
#include
#include
#include
#include
#include "MQTTClient.h"
#include
#include
#define NUM_THREADS 2
#define ADDRESS "tcp://xx.xxx.xx.xxx:1883"
#define CLIENTID "ExampleClient_pub"
#define SUB_CLIENTID "ExampleClient_sub" //更改此处客户端ID
#define TOPICPUB "Question" //更改发送的话题
#define TOPICSUB "temperature"
#define QOS 1
#define TIMEOUT 10000L
#define DISCONNECT "out"
int CONNECT = 1;
volatile MQTTClient_deliveryToken deliverytoken;
long PAYLOADLEN;
char* PAYLOAD;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliverytoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
int i;
char* payloadptr;
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: \n");
payloadptr = message->payload;
if (strcmp(payloadptr, DISCONNECT) == 0) {
printf("\n out!!");
CONNECT = 0;
}
for (i = 0; i < message->payloadlen; i++) {
putchar(*payloadptr++);
}
printf("\n");
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
void *pubClient(void *threadid) {
long tid;
tid = (long)threadid;
int count = 0;
printf("Hello World! It's me, thread #%ld!\n", tid);
//声明一个MQTTClient
MQTTClient client;
//初始化MQTT Client选项
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
//#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
MQTTClient_message pubmsg = MQTTClient_message_initializer;
//声明消息token
MQTTClient_deliveryToken token;
int rc;
//使用参数创建一个client,并将其赋值给之前声明的client
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
//使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
PAYLOAD = "What's the temperature";
// printf("%s\n", PAYLOAD);
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 0;
//循环发布
while (CONNECT) {
MQTTClient_publishMessage(client, TOPICPUB, &pubmsg, &token);
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPICPUB, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
// thread sleep
usleep(2000000L);
}
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
}
void *subClient(void *threadid) {
long tid;
tid = (long)threadid;
printf("Hello World! It's me, thread #%ld!\n", tid);
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
int ch;
MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
//设置回调函数
MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q
MQTTClient_subscribe(client, TOPICSUB, QOS);
do
{
ch = getchar();
} while (ch != 'Q' && ch != 'q');
//quit
MQTTClient_unsubscribe(client, TOPICSUB);
MQTTClient_disconnect(client, 10000);
MQTTClient_destroy(&client);
pthread_exit(NULL);
}
int main(int argc, char* argv[])
{
pthread_t threads[NUM_THREADS];
pthread_create(&threads[0], NULL, subClient, (void *)0);
pthread_create(&threads[1], NULL, pubClient, (void *)1);
pthread_exit(NULL);
}
data:image/s3,"s3://crabby-images/f2327/f23276381203fecd74fad41701aa9b859fff41df" alt=""