From adb6249aaee87aa57f239636dea67585eee452b3 Mon Sep 17 00:00:00 2001 From: MysticBoy Date: Tue, 26 May 2020 14:42:07 +0800 Subject: [PATCH] Parse Uri , format is mqtt://username:password@hostname:1883/taos/ --- .gitignore | 1 + src/common/inc/tglobal.h | 3 ++ src/common/src/tglobal.c | 12 +++++- src/plugins/mqtt/inc/mqttInit.h | 20 +++++----- src/plugins/mqtt/src/mqttSystem.c | 66 +++++++++++++++++++++---------- src/util/inc/tutil.h | 2 + src/util/src/tutil.c | 16 +++++++- 7 files changed, 89 insertions(+), 31 deletions(-) diff --git a/.gitignore b/.gitignore index 178450e124..bb9a70e9b2 100644 --- a/.gitignore +++ b/.gitignore @@ -64,3 +64,4 @@ CMakeError.log /out/isenseconfig/WSL-Clang-Debug /out/isenseconfig/WSL-GCC-Debug /test/cfg +/src/.vs diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index a8838a2525..163fea24a7 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -94,7 +94,10 @@ extern int32_t tsMaxTables; extern char tsDefaultDB[]; extern char tsDefaultUser[]; extern char tsDefaultPass[]; + extern char tsMqttBrokerAddress[]; +extern char tsMqttBrokerClientId[]; + extern int32_t tsMaxMeterConnections; extern int32_t tsMaxVnodeConnections; extern int32_t tsMaxMgmtConnections; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 2fe518e3e4..65e564833a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -202,7 +202,7 @@ char tsTimezone[64] = {0}; char tsLocale[TSDB_LOCALE_LEN] = {0}; char tsCharset[TSDB_LOCALE_LEN] = {0}; // default encode string char tsMqttBrokerAddress[128] = {0}; - +char tsMqttBrokerClientId[128] = {0}; int32_t tsMaxBinaryDisplayWidth = 30; @@ -742,6 +742,16 @@ static void doInitGlobalConfig() { cfg.ptrLength = 126; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + + cfg.option = "mqttBrokerClientId"; + cfg.ptr = tsMqttBrokerClientId; + cfg.valType = TAOS_CFG_VTYPE_STRING; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = 126; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); // socket type; udp by default cfg.option = "sockettype"; diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index 828ff343e4..61f7ef9a45 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -19,7 +19,6 @@ extern "C" { #endif - /** * @file * A simple subscriber program that performs automatic reconnections. @@ -38,13 +37,16 @@ extern "C" { * \ref mqttReconnectClient is called, this instance will be passed. */ struct reconnect_state_t { - const char* hostname; - const char* port; - const char* topic; - uint8_t* sendbuf; - size_t sendbufsz; - uint8_t* recvbuf; - size_t recvbufsz; + char* hostname; + char* port; + char* topic; + char* client_id; + char* user_name; + char* password; + uint8_t* sendbuf; + size_t sendbufsz; + uint8_t* recvbuf; + size_t recvbufsz; }; /** @@ -76,7 +78,7 @@ void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); #define CLIENTID "taos" -#define TOPIC "/taos/+/+/+/" // taos//// +#define TOPIC "+/+/+/" // path///
/ #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 10ca73dc7c..38741a70ca 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -33,32 +33,54 @@ struct mqtt_client client; pthread_t client_daemon; void* mqtt_conn; -struct reconnect_state_t reconnect_state; - -int32_t mqttInitSystem() { - int rc = 0; - const char* addr; - const char* port; - addr = tsMqttBrokerAddress; - port = "1883"; - reconnect_state.hostname = addr; - reconnect_state.port = port; - reconnect_state.topic = TOPIC; +struct reconnect_state_t recnt_status; +char* topicPath; +int32_t mqttInitSystem() { + int rc = 0; uint8_t sendbuf[2048]; uint8_t recvbuf[1024]; - reconnect_state.sendbuf = sendbuf; - reconnect_state.sendbufsz = sizeof(sendbuf); - reconnect_state.recvbuf = recvbuf; - reconnect_state.recvbufsz = sizeof(recvbuf); + recnt_status.sendbuf = sendbuf; + recnt_status.sendbufsz = sizeof(sendbuf); + recnt_status.recvbuf = recvbuf; + recnt_status.recvbufsz = sizeof(recvbuf); + char* url = tsMqttBrokerAddress; + recnt_status.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL; + recnt_status.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recnt_status.user_name), ":", "@") : NULL; + + if (strstr(url, "@") != NULL) { + recnt_status.hostname = strbetween(url, "@", ":"); + } else if (strstr(strstr(url, "://") + 3, ":") != NULL) { + recnt_status.hostname = strbetween(url, "//", ":"); + + } else { + recnt_status.hostname = strbetween(url, "//", "/"); + } + + char* _begin_hostname = strstr(url, recnt_status.hostname); + if (strstr(_begin_hostname, ":") != NULL) { + recnt_status.port = strbetween(_begin_hostname, ":", "/"); + } else { + recnt_status.port = strbetween("'1883'", "'", "'"); + } + + topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recnt_status.port : recnt_status.hostname), + "/", "/"); + int _tpsize = strlen(topicPath) + strlen(TOPIC) + 1; + recnt_status.topic = calloc(1, _tpsize); + snprintf(recnt_status.topic, _tpsize-1, "/%s/" TOPIC, topicPath); + recnt_status.client_id = tsMqttBrokerClientId==NULL || strlen(tsMqttBrokerClientId)<3? tsMqttBrokerClientId:"taos_mqtt"; + + taos_init(); - mqttPrint("mqttInitSystem %s", tsMqttBrokerAddress); + mqttPrint("mqttInitSystem mqtt://%s:%s@%s:%s/%s/", recnt_status.user_name, recnt_status.password, + recnt_status.hostname, recnt_status.port, topicPath); return rc; } int32_t mqttStartSystem() { int rc = 0; mqtt_conn = NULL; - mqtt_init_reconnect(&client, mqttReconnectClient, &reconnect_state, mqtt_PublishCallback); + mqtt_init_reconnect(&client, mqttReconnectClient, &recnt_status, mqtt_PublishCallback); if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) { mqttError("Failed to start client daemon."); mqttCleanup(EXIT_FAILURE, -1, NULL); @@ -77,6 +99,12 @@ void mqttCleanUpSystem() { mqttPrint("mqttCleanUpSystem"); mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon); taos_cleanup(mqtt_conn); + free(recnt_status.user_name); + free(recnt_status.password); + free(recnt_status.hostname); + free(recnt_status.port); + free(recnt_status.topic); + free(topicPath); } void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { @@ -178,12 +206,10 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, reconnect_state->recvbufsz); - /* Create an anonymous session */ - const char* client_id = NULL; /* Ensure we have a clean session */ uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; /* Send connection request to the broker. */ - mqtt_connect(client, client_id, NULL, NULL, 0, NULL, NULL, connect_flags, 400); + mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400); /* Subscribe to the topic. */ mqtt_subscribe(client, reconnect_state->topic, 0); diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index 3a0a1920af..a9f162a5ed 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -124,6 +124,8 @@ int64_t strnatoi(char *num, int32_t len); char* strreplace(const char* str, const char* pattern, const char* rep); +char *strbetween(char *string, char *begin, char *end); + char *paGetToken(char *src, char **token, int32_t *tokenLen); void taosMsleep(int32_t mseconds); diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index 989273e051..facef219c4 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -331,6 +331,20 @@ char *strreplace(const char *str, const char *pattern, const char *rep) { return dest; } +char *strbetween(char *string, char *begin, char *end) { + char *result = NULL; + char *_begin = strstr(string, begin); + if (_begin != NULL) { + char *_end = strstr(_begin + strlen(begin), end); + int size = _end - _begin; + if (_end != NULL && size > 0) { + result = (char *)calloc(1, size); + memcpy(result, _begin + strlen(begin), size - +strlen(begin)); + } + } + return result; +} + int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]) { int32_t i; char hexval[16] = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'}; @@ -691,4 +705,4 @@ void taosRemoveDir(char *rootDir) { rmdir(rootDir); uPrint("dir:%s is removed", rootDir); -} +} \ No newline at end of file