401 lines
9.2 KiB
C
Raw Normal View History

2025-05-14 11:26:23 +08:00
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include "utils/lite-utils.h"
#include "utils/iot_import.h"
#include "utils/iot_export.h"
#include "iot_export_mqtt.h"
#include "mqtt_instance.h"
#define DPRINT(...) UTILS_printf(__VA_ARGS__);
#define TAG "mqtt"
#define LOGI(...) BK_LOGI(TAG, ##__VA_ARGS__)
#define LOGW(...) BK_LOGW(TAG, ##__VA_ARGS__)
#define LOGE(...) BK_LOGE(TAG, ##__VA_ARGS__)
#define LOGD(...) BK_LOGD(TAG, ##__VA_ARGS__)
static void *mqtt_client = NULL;
static void *mqtt_rbuf = NULL;
static void *mqtt_wbuf = NULL;
static int abort_request = 0;
typedef struct mqtt_instance_event_s {
void (*event_cb)(int event, void *ctx);
void *ctx;
struct mqtt_instance_event_s *next;
} mqtt_instance_event_t;
static mqtt_instance_event_t *first_event = NULL;
static void event_list_add(mqtt_instance_event_t *ev)
{
ev->next = first_event;
first_event = ev;
}
static void event_list_remove(mqtt_instance_event_t *ev)
{
mqtt_instance_event_t **ep, *e1;
/* remove connection from list */
ep = &first_event;
while ((*ep) != NULL) {
e1 = *ep;
if (e1 == ev)
*ep = ev->next;
else
ep = &e1->next;
}
}
static void event_handle(void *pcontext, void *pclient, iotx_mqtt_event_msg_pt msg)
{
int event = 0;
//LOGE("mqtt event_handle-> %d\n",msg->event_type);
switch (msg->event_type) {
case IOTX_MQTT_EVENT_RECONNECT:
event = MQTT_INSTANCE_EVENT_CONNECTED;
LOGE("IOTX_MQTT_EVENT_RECONNECT !!!\n");
break;
case IOTX_MQTT_EVENT_DISCONNECT:
event = MQTT_INSTANCE_EVENT_DISCONNECTED;
break;
case IOTX_MQTT_EVENT_PING_RESP:
LOGE("MQTT_INSTANCE_EVENT_PING_RESP !!!\n");
event = MQTT_INSTANCE_EVENT_PING_RESP;
break;
default:
return;
}
mqtt_instance_event_t *ev;
for (ev = first_event; ev; ev = ev->next) {
if (ev->event_cb)
ev->event_cb(event, ev->ctx);
}
}
void *mqtt_get_instance()
{
return mqtt_client;
}
void mqtt_remove_instance()
{
mqtt_client = NULL;
}
int mqtt_set_instance(void* mqtt_t)
{
if (mqtt_client || mqtt_t == NULL)
return FAIL_RETURN;
mqtt_client = mqtt_t;
return SUCCESS_RETURN;
}
int mqtt_init_instance_poka(char* host,int port,char *client, char *user, char *password, int maxMsgSize){
if (mqtt_client)
return 1;
// IOT_OpenLog("masterslave");
// IOT_SetLogLevel(IOT_LOG_DEBUG);
iotx_conn_info_pt pconn_info;
iotx_mqtt_param_t mqtt_params;
int ret = IOT_SetupConnInfo(user, user, user, (void **)&pconn_info);
if (ret != SUCCESS_RETURN)
return -1;
mqtt_rbuf = LITE_malloc(maxMsgSize);
if (!mqtt_rbuf)
return -1;
mqtt_wbuf = LITE_malloc(maxMsgSize);
if (!mqtt_wbuf)
goto fail;
/* Initialize MQTT parameter */
memset(&mqtt_params, 0x0, sizeof(mqtt_params));
mqtt_params.port = port;
mqtt_params.host = host;
mqtt_params.client_id = client;
mqtt_params.username = user;
mqtt_params.password = password;
mqtt_params.pub_key = NULL;
mqtt_params.request_timeout_ms = 2000;
mqtt_params.clean_session = 0;
mqtt_params.keepalive_interval_ms = 30000;
mqtt_params.pread_buf = mqtt_rbuf;
mqtt_params.read_buf_size = maxMsgSize;
mqtt_params.pwrite_buf = mqtt_wbuf;
mqtt_params.write_buf_size = maxMsgSize;
mqtt_params.handle_event.h_fp = event_handle;
mqtt_params.handle_event.pcontext = NULL;
/* Construct a MQTT client with specify parameter */
mqtt_client = IOT_MQTT_Construct(&mqtt_params);
if (!mqtt_client)
goto fail;
return 0;
fail:
mqtt_deinit_instance();
return -1;
}
/*
* 1: mqtt instance have been init
* 0: mqtt instance init success
* IOT_MQTT_Construct success, MQTT connected.
* -1: mqtt instance init fail
*/
int mqtt_init_instance(char *productKey, char *deviceName, char *deviceSecret, int maxMsgSize)
{
if (mqtt_client)
return 1;
// IOT_OpenLog("masterslave");
// IOT_SetLogLevel(IOT_LOG_DEBUG);
iotx_conn_info_pt pconn_info;
iotx_mqtt_param_t mqtt_params;
int ret = IOT_SetupConnInfo(productKey, deviceName, deviceSecret, (void **)&pconn_info);
if (ret != SUCCESS_RETURN)
return -1;
mqtt_rbuf = LITE_malloc(maxMsgSize);
if (!mqtt_rbuf)
return -1;
mqtt_wbuf = LITE_malloc(maxMsgSize);
if (!mqtt_wbuf)
goto fail;
/* Initialize MQTT parameter */
memset(&mqtt_params, 0x0, sizeof(mqtt_params));
mqtt_params.port = pconn_info->port;
mqtt_params.host = pconn_info->host_name;
mqtt_params.client_id = pconn_info->client_id;
mqtt_params.username = pconn_info->username;
mqtt_params.password = pconn_info->password;
mqtt_params.pub_key = pconn_info->pub_key;
mqtt_params.request_timeout_ms = 2000;
mqtt_params.clean_session = 0;
mqtt_params.keepalive_interval_ms = 60000;
mqtt_params.pread_buf = mqtt_rbuf;
mqtt_params.read_buf_size = maxMsgSize;
mqtt_params.pwrite_buf = mqtt_wbuf;
mqtt_params.write_buf_size = maxMsgSize;
mqtt_params.handle_event.h_fp = event_handle;
mqtt_params.handle_event.pcontext = NULL;
/* Construct a MQTT client with specify parameter */
mqtt_client = IOT_MQTT_Construct(&mqtt_params);
if (!mqtt_client)
goto fail;
return 0;
fail:
mqtt_deinit_instance();
return -1;
}
int mqtt_deinit_instance(void)
{
abort_request = 1;
if (mqtt_rbuf) {
LITE_free(mqtt_rbuf);
mqtt_rbuf = NULL;
}
if (mqtt_wbuf) {
LITE_free(mqtt_wbuf);
mqtt_wbuf = NULL;
}
if (mqtt_client) {
IOT_MQTT_Destroy(&mqtt_client);
mqtt_client = NULL;
}
mqtt_instance_event_t *ev, *ev_next;
for (ev = first_event; ev; ev = ev_next) {
ev_next = ev->next;
event_list_remove(ev);
LITE_free(ev);
}
first_event = NULL;
abort_request = 0;
return 0;
}
int mqtt_set_event_cb(void (*event_cb)(int event, void *ctx), void *ctx)
{
if (!event_cb)
return -1;
//helloyifa
//if (!mqtt_client)
//return -1;
mqtt_instance_event_t *ev = LITE_malloc(sizeof(mqtt_instance_event_t));
if (!ev)
return -1;
memset(ev, 0, sizeof(mqtt_instance_event_t));
ev->event_cb = event_cb;
ev->ctx = ctx;
event_list_add(ev);
return 0;
}
typedef struct mqtt_instance_topic_s {
char *topic;
void (*cb)(char *topic, int topic_len, void *payload, int payload_len, void *ctx);
void *ctx;
struct mqtt_instance_topic_s *next;
} mqtt_instance_topic_t;
static mqtt_instance_topic_t *first_topic = NULL;
static void topic_list_add(mqtt_instance_topic_t *t)
{
t->next = first_topic;
first_topic = t;
}
static void topic_list_remove(mqtt_instance_topic_t *t)
{
mqtt_instance_topic_t **tp, *t1;
/* remove connection from list */
tp = &first_topic;
while ((*tp) != NULL) {
t1 = *tp;
if (t1 == t)
*tp = t->next;
else
tp = &t1->next;
}
}
static void subscriber_cb(void *ctx, void *pclient, iotx_mqtt_event_msg_pt msg)
{
mqtt_instance_topic_t *t = ctx;
iotx_mqtt_topic_info_pt ptopic_info = (iotx_mqtt_topic_info_pt) msg->msg;
if (t && t->cb) {
t->cb((char *)ptopic_info->ptopic, ptopic_info->topic_len,
(void *)ptopic_info->payload , ptopic_info->payload_len, t->ctx);
}
}
int mqtt_subscribe(char *topic, void (*cb)(char *topic, int topic_len, void *payload, int len, void *ctx), void *ctx)
{
if (!topic || !cb)
return -1;
if (!mqtt_client)
return -1;
mqtt_instance_topic_t *t = LITE_malloc(sizeof(mqtt_instance_topic_t));
if (!t)
return -1;
memset(t, 0, sizeof(mqtt_instance_topic_t));
t->topic = LITE_malloc(strlen(topic) + 1);
if (!t->topic) {
LITE_free(t);
return -1;
}
strcpy(t->topic, topic);
t->cb = cb;
t->ctx = ctx;
int ret = IOT_MQTT_Subscribe(mqtt_client, t->topic, IOTX_MQTT_QOS1, subscriber_cb, t);
if (ret < 0) {
LITE_free(t->topic);
LITE_free(t);
return -1;
}
topic_list_add(t);
return 0;
}
int mqtt_unsubscribe(char *topic)
{
mqtt_instance_topic_t *t, *t_next;
if (!mqtt_client)
return -1;
for (t = first_topic; t; t = t_next) {
t_next = t->next;
if (strcmp(t->topic, topic) == 0) {
IOT_MQTT_Unsubscribe(mqtt_client, topic);
topic_list_remove(t);
LITE_free(t->topic);
LITE_free(t);
t = NULL;
}
}
return 0;
}
int mqtt_publish(char *topic, int qos, void *data, int len)
{
iotx_mqtt_topic_info_t mqtt_msg;
if (!mqtt_client)
return -1;
memset(&mqtt_msg, 0x0, sizeof(iotx_mqtt_topic_info_t));
mqtt_msg.qos = qos;
mqtt_msg.retain = 0;
mqtt_msg.dup = 0;
mqtt_msg.payload = (void *)data;
mqtt_msg.payload_len = len;
if (IOT_MQTT_Publish(mqtt_client, topic, &mqtt_msg) < 0) {
DPRINT("IOT_MQTT_Publish failed\n");
return -1;
}
return 0;
}