MQTT

3470阅读 0评论2020-01-20 可怜的猪头
分类:LINUX

https://blog.csdn.net/jinwang3526/article/details/81537297

一个基于Mqtt的小项目,服务器采用mosquitto,客户端有Python,C,Android三种,涉及SSL加密,传输内容:文字图片。

应用消息 Application Message MQTT协议通过网络传输应用数据。应用消息通过MQTT传输时,它们有关联的服务质量(QoS)和主题(Topic)。

客户端 Client
使用MQTT的程序或设备。客户端总是通过网络连接到服务端。它可以:发布应用消息给其它相关的客户端;订阅以请求接收相关的应用消息;取消订阅以移除接收应用消息的请求;从服务端断开连接。

服务端 Server
一个程序或设备,作为发送消息的客户端和请求订阅的客户端之间的中介。服务端:接受来自客户端的网络连接;接受客户端发布的应用消息;处理客户端的订阅和取消订阅请求;转发应用消息给符合条件的已订阅客户端。


工作流:
服务器先启动,然后各客户端订阅相关的Topic。Client A 和 Client C 向主题 Question 发布消息 "What's the temperature?"。Client B 订阅了 Question 这个Topic,所以能收到该消息;Client B 收到消息并做出判断后,向主题 temperature 发布答案,订阅了该Topic的 Client A 和 Client C 就能收到 37°。
C的客户端:
//mqttclient.c
/* Header names were lost when the page was extracted; these are the ones
 * the code below actually needs. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include "MQTTClient.h"
#include <unistd.h>


/* Size of the thread array in main(): one subscriber plus one publisher. */
#define NUM_THREADS 2
/* Broker URI handed to MQTTClient_create(); replace the placeholder host. */
#define ADDRESS "tcp://xx.xxx.xx.xxx:1883"
/* Client ID used by the publishing client. */
#define CLIENTID "ExampleClient_pub"
/* Client ID used by the subscribing client; change it here if needed. */
#define SUB_CLIENTID    "ExampleClient_sub"
/* Topic the publisher sends to; change it here if needed. */
#define TOPICPUB    "Question"
/* Topic the subscriber listens on. */
#define TOPICSUB    "temperature"
/* QoS level used for both publishing and subscribing. */
#define QOS         1
/* Delivery wait timeout in milliseconds (printed as TIMEOUT/1000 seconds). */
#define TIMEOUT     10000L
/* Payload that, when received, makes msgarrvd() clear CONNECT. */
#define DISCONNECT  "out"

/*
 * Run flag for the publish loop: non-zero means "keep publishing".
 * It is cleared from the message-arrived callback and polled by the
 * publisher thread, so it is atomic to avoid a data race between them.
 */
_Atomic int CONNECT = 1;
/* Last delivery token reported to the delivered() callback. */
volatile MQTTClient_deliveryToken deliverytoken;
/* Not referenced by the code visible in this file. */
long PAYLOADLEN;
/* Message text published by pubClient(); assigned there. */
char* PAYLOAD;

/*
 * Delivery-complete callback registered through MQTTClient_setCallbacks().
 *
 * context: callback context pointer; not used here.
 * dt:      token of the message whose delivery was confirmed.
 *
 * Logs the token and records it in the global deliverytoken.
 */
void delivered(void *context, MQTTClient_deliveryToken dt)
{
  (void)context;

  printf("Message with token value %d delivery confirmed\n", dt);

  /* Remember the most recently confirmed token. */
  deliverytoken = dt;
}

/*
 * Message-arrived callback registered through MQTTClient_setCallbacks().
 *
 * context:   callback context pointer; not used here.
 * topicName: topic the message was published on; freed before returning.
 * topicLen:  not used here.
 * message:   received message; freed before returning.
 *
 * Prints the topic and payload. If the payload is exactly DISCONNECT,
 * clears the global CONNECT flag so the publish loop stops.
 *
 * Returns 1 (per the Paho callback contract this reports the message as
 * handled, which is also why the message and topic are freed here).
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
  const size_t quit_len = strlen(DISCONNECT);
  const char *payload = message->payload;
  const int payload_len = message->payloadlen;

  (void)context;
  (void)topicLen;

  printf("Message arrived\n");
  printf("    topic: %s\n", topicName);
  printf("  message: \n");

  /*
   * The payload is a length-delimited byte buffer and is not guaranteed to
   * be NUL-terminated, so compare by length + memcmp instead of strcmp.
   */
  if (payload != NULL && payload_len >= 0 &&
      (size_t)payload_len == quit_len &&
      memcmp(payload, DISCONNECT, quit_len) == 0) {
    printf("\n out!!");
    CONNECT = 0;
  }

  /* Emit exactly payloadlen bytes; never read past the buffer. */
  if (payload != NULL && payload_len > 0) {
    fwrite(payload, 1, (size_t)payload_len, stdout);
  }
  printf("\n");

  MQTTClient_freeMessage(&message);
  MQTTClient_free(topicName);
  return 1;
}

/*
 * Connection-lost callback registered through MQTTClient_setCallbacks().
 *
 * context: callback context pointer; not used here.
 * cause:   reason text for the disconnect; may be NULL.
 *
 * Prints a notice and the cause. A NULL cause is printed as "unknown"
 * because passing NULL to a %s conversion is undefined behaviour.
 */
void connlost(void *context, char *cause)
{
  (void)context;

  printf("\nConnection lost\n");
  printf("     cause: %s\n", cause != NULL ? cause : "unknown");
}

void *pubClient(void *threadid) {
  long tid;
  tid = (long)threadid;
  int count = 0;
  printf("Hello World! It's me, thread #%ld!\n", tid);
  //声明一个MQTTClient
  MQTTClient client;
  //初始化MQTT Client选项
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  //#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
  MQTTClient_message pubmsg = MQTTClient_message_initializer;
  //声明消息token
  MQTTClient_deliveryToken token;
  int rc;
  //使用参数创建一个client,并将其赋值给之前声明的client
  MQTTClient_create(&client, ADDRESS, CLIENTID,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS
  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  {
    printf("Failed to connect, return code %d\n", rc);
    exit(EXIT_FAILURE);
  }
  PAYLOAD = "What's the temperature";
  // printf("%s\n", PAYLOAD);
  pubmsg.payload = PAYLOAD;
  pubmsg.payloadlen = (int)strlen(PAYLOAD);
  pubmsg.qos = QOS;
  pubmsg.retained = 0;
  //循环发布
  while (CONNECT) {
    MQTTClient_publishMessage(client, TOPICPUB, &pubmsg, &token);
    printf("Waiting for up to %d seconds for publication of %s\n"
             "on topic %s for client with ClientID: %s\n",
             (int)(TIMEOUT/1000), PAYLOAD, TOPICPUB, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    // thread sleep
    usleep(2000000L);
  }

  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);
}

void *subClient(void *threadid) {
  long tid;
  tid = (long)threadid;
  printf("Hello World! It's me, thread #%ld!\n", tid);

  MQTTClient client;
  MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
  int rc;
  int ch;

  MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
                    MQTTCLIENT_PERSISTENCE_NONE, NULL);
  conn_opts.keepAliveInterval = 20;
  conn_opts.cleansession = 1;
  //设置回调函数
  MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

  if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
  {
    printf("Failed to connect, return code %d\n", rc);
    exit(EXIT_FAILURE);
  }
  printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
         "Press Q to quit\n\n", TOPICSUB, SUB_CLIENTID, QOS);
  MQTTClient_subscribe(client, TOPICSUB, QOS);

  do
  {
    ch = getchar();
  } while (ch != 'Q' && ch != 'q');
  //quit
  MQTTClient_unsubscribe(client, TOPICSUB);
  MQTTClient_disconnect(client, 10000);
  MQTTClient_destroy(&client);

  pthread_exit(NULL);
}

/*
 * Program entry point: starts the subscriber and publisher threads and
 * waits for both to finish.
 *
 * argc, argv: not used.
 *
 * Returns EXIT_SUCCESS once both threads have ended, or EXIT_FAILURE if a
 * thread could not be started.
 */
int main(int argc, char* argv[])
{
  pthread_t threads[NUM_THREADS];
  int rc;

  (void)argc;
  (void)argv;

  /* The pointer argument only carries a small thread number. */
  rc = pthread_create(&threads[0], NULL, subClient, (void *)0);
  if (rc != 0) {
    fprintf(stderr, "Failed to start subscriber thread: %s\n", strerror(rc));
    return EXIT_FAILURE;
  }

  rc = pthread_create(&threads[1], NULL, pubClient, (void *)1);
  if (rc != 0) {
    fprintf(stderr, "Failed to start publisher thread: %s\n", strerror(rc));
    return EXIT_FAILURE;
  }

  /* Keep the process alive until both workers are done. */
  pthread_join(threads[0], NULL);
  pthread_join(threads[1], NULL);
  return EXIT_SUCCESS;
}


上一篇:docker部署rabbitmq及mqtt
下一篇:cpu使用率计算