diff --git a/CMakeLists.txt b/CMakeLists.txt index 946ceb95ab..588526c286 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,8 @@ ENDIF () SET(TD_ACCOUNT FALSE) SET(TD_ADMIN FALSE) SET(TD_GRANT FALSE) +SET(TD_SYNC TRUE) +SET(TD_MQTT TRUE) SET(TD_TSDB_PLUGINS FALSE) SET(TD_COVER FALSE) diff --git a/cmake/define.inc b/cmake/define.inc index 6dd604057c..28770be254 100755 --- a/cmake/define.inc +++ b/cmake/define.inc @@ -13,6 +13,14 @@ IF (TD_GRANT) ADD_DEFINITIONS(-D_GRANT) ENDIF () +IF (TD_SYNC) + ADD_DEFINITIONS(-D_SYNC) +ENDIF () + +IF (TD_MQTT) + ADD_DEFINITIONS(-D_MQTT) +ENDIF () + IF (TD_TSDB_PLUGINS) ADD_DEFINITIONS(-D_TSDB_PLUGINS) ENDIF () diff --git a/cmake/input.inc b/cmake/input.inc index f90b10a087..1ef2045f57 100755 --- a/cmake/input.inc +++ b/cmake/input.inc @@ -42,6 +42,16 @@ IF (${MEM_CHECK} MATCHES "true") MESSAGE(STATUS "build with memory check") ENDIF () +IF (${MQTT} MATCHES "false") + SET(TD_MQTT FALSE) + MESSAGE(STATUS "build without mqtt module") +ENDIF () + +IF (${SYNC} MATCHES "false") + SET(TD_SYNC FALSE) + MESSAGE(STATUS "build without sync module") +ENDIF () + IF (${RANDOM_FILE_FAIL} MATCHES "true") SET(TD_RANDOM_FILE_FAIL TRUE) MESSAGE(STATUS "build with random-file-fail enabled") diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 64930b29d2..8c6c73c440 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -10,6 +10,6 @@ ADD_SUBDIRECTORY(cJson) ADD_SUBDIRECTORY(wepoll) ADD_SUBDIRECTORY(MsvcLibX) -IF (TD_LINUX) +IF (TD_LINUX AND TD_MQTT) ADD_SUBDIRECTORY(MQTT-C) ENDIF () \ No newline at end of file diff --git a/deps/MQTT-C/CMakeLists.txt b/deps/MQTT-C/CMakeLists.txt index ea5d4238d5..15b3552521 100644 --- a/deps/MQTT-C/CMakeLists.txt +++ b/deps/MQTT-C/CMakeLists.txt @@ -1,5 +1,4 @@ -cmake_minimum_required(VERSION 3.5) -project(MQTT-C VERSION 1.1.2 LANGUAGES C) +CMAKE_MINIMUM_REQUIRED(VERSION 2.8) # MQTT-C build options option(MQTT_C_OpenSSL_SUPPORT "Build MQTT-C with OpenSSL support?" OFF) diff --git a/packaging/cfg/taos.cfg b/packaging/cfg/taos.cfg index a91dde8796..7be6e72962 100644 --- a/packaging/cfg/taos.cfg +++ b/packaging/cfg/taos.cfg @@ -122,11 +122,14 @@ # number of replications, for cluster only # replica 1 -# mqtt uri -# mqttBrokerAddress mqtt://username:password@hostname:1883/taos/ +# mqtt hostname +# mqttHostName test.mosquitto.org -# mqtt client name -# mqttBrokerClientId taos_mqtt +# mqtt port +# mqttPort 1883 + +# mqtt topic +# mqttTopic /weather/loop # the compressed rpc message, option: # -1 (no compression) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ede66d95bb..898b7cb032 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -10,7 +10,9 @@ ADD_SUBDIRECTORY(client) ADD_SUBDIRECTORY(query) ADD_SUBDIRECTORY(kit) ADD_SUBDIRECTORY(plugins) -ADD_SUBDIRECTORY(sync) +IF (TD_SYNC) + ADD_SUBDIRECTORY(sync) +ENDIF () ADD_SUBDIRECTORY(balance) ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(vnode) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index ef0713c415..fedafe5b02 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -104,8 +104,12 @@ extern int32_t tsTelegrafUseFieldNum; // mqtt extern int32_t tsEnableMqttModule; -extern char tsMqttBrokerAddress[]; -extern char tsMqttBrokerClientId[]; +extern char tsMqttHostName[]; +extern char tsMqttPort[]; +extern char tsMqttUser[]; +extern char tsMqttPass[]; +extern char tsMqttClientId[]; +extern char tsMqttTopic[]; // monitor extern int32_t tsEnableMonitorModule; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 795585e5c9..c7763a257a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -137,8 +137,12 @@ int32_t tsTelegrafUseFieldNum = 0; // mqtt int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default -char tsMqttBrokerAddress[128] = {0}; -char tsMqttBrokerClientId[128] = {0}; +char tsMqttHostName[TSDB_MQTT_HOSTNAME_LEN] = "test.mosquitto.org"; +char tsMqttPort[TSDB_MQTT_PORT_LEN] = "1883"; +char tsMqttUser[TSDB_MQTT_USER_LEN] = {0}; +char tsMqttPass[TSDB_MQTT_PASS_LEN] = {0}; +char tsMqttClientId[TSDB_MQTT_CLIENT_ID_LEN] = "TDengineMqttSubscriber"; +char tsMqttTopic[TSDB_MQTT_TOPIC_LEN] = "/test"; // # // monitor int32_t tsEnableMonitorModule = 1; @@ -247,8 +251,11 @@ bool taosCfgDynamicOptions(char *msg) { for (int32_t i = 0; i < tsGlobalConfigNum; ++i) { SGlobalCfg *cfg = tsGlobalConfig + i; - if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; + //if (!(cfg->cfgType & TSDB_CFG_CTYPE_B_LOG)) continue; if (cfg->valType != TAOS_CFG_VTYPE_INT32) continue; + + int32_t cfgLen = strlen(cfg->option); + if (cfgLen != olen) continue; if (strncasecmp(option, cfg->option, olen) != 0) continue; *((int32_t *)cfg->ptr) = vint; @@ -767,26 +774,36 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "mqttBrokerAddress"; - cfg.ptr = tsMqttBrokerAddress; + cfg.option = "mqttHostName"; + cfg.ptr = tsMqttHostName; cfg.valType = TAOS_CFG_VTYPE_STRING; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = 126; + cfg.ptrLength = TSDB_MQTT_HOSTNAME_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - cfg.option = "mqttBrokerClientId"; - cfg.ptr = tsMqttBrokerClientId; + cfg.option = "mqttPort"; + cfg.ptr = tsMqttPort; cfg.valType = TAOS_CFG_VTYPE_STRING; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLIENT | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; cfg.minValue = 0; cfg.maxValue = 0; - cfg.ptrLength = 126; + cfg.ptrLength = TSDB_MQTT_PORT_LEN; cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); - + + cfg.option = "mqttTopic"; + cfg.ptr = tsMqttTopic; + cfg.valType = TAOS_CFG_VTYPE_STRING; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_NOT_PRINT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = TSDB_MQTT_TOPIC_LEN; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + cfg.option = "compressMsgSize"; cfg.ptr = &tsCompressMsgSize; cfg.valType = TAOS_CFG_VTYPE_INT32; @@ -1270,6 +1287,9 @@ void taosInitGlobalCfg() { } bool taosCheckGlobalCfg() { + char fqdn[TSDB_FQDN_LEN]; + uint16_t port; + if (debugFlag & DEBUG_TRACE || debugFlag & DEBUG_DEBUG || debugFlag & DEBUG_DUMP) { taosSetAllDebugFlag(); } @@ -1283,12 +1303,18 @@ bool taosCheckGlobalCfg() { if (tsFirst[0] == 0) { strcpy(tsFirst, tsLocalEp); + } else { + taosGetFqdnPortFromEp(tsFirst, fqdn, &port); + snprintf(tsFirst, sizeof(tsFirst), "%s:%d", fqdn, port); } if (tsSecond[0] == 0) { strcpy(tsSecond, tsLocalEp); + } else { + taosGetFqdnPortFromEp(tsSecond, fqdn, &port); + snprintf(tsSecond, sizeof(tsSecond), "%s:%d", fqdn, port); } - + taosGetSystemInfo(); tsSetLocale(); diff --git a/src/dnode/CMakeLists.txt b/src/dnode/CMakeLists.txt index 24a109dd29..dea90811ad 100644 --- a/src/dnode/CMakeLists.txt +++ b/src/dnode/CMakeLists.txt @@ -11,10 +11,12 @@ AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) ADD_EXECUTABLE(taosd ${SRC}) + TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4) + IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4 balance sync) + TARGET_LINK_LIBRARIES(taosd taos_static) ELSE () - TARGET_LINK_LIBRARIES(taosd mnode taos monitor http mqtt tsdb twal vnode cJson lz4 balance sync) + TARGET_LINK_LIBRARIES(taosd taos) ENDIF () IF (TD_ACCOUNT) @@ -25,6 +27,14 @@ IF (TD_LINUX) TARGET_LINK_LIBRARIES(taosd grant) ENDIF () + IF (TD_MQTT) + TARGET_LINK_LIBRARIES(taosd mqtt) + ENDIF () + + IF (TD_SYNC) + TARGET_LINK_LIBRARIES(taosd balance sync) + ENDIF () + SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_TARGET "prepare_env_target") ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD} diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index 1f41bc23eb..c968246a68 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -611,7 +611,7 @@ static bool dnodeReadMnodeInfos() { } for (int i = 0; i < size; ++i) { - cJSON* nodeInfo = cJSON_GetArrayItem(nodeInfos, i); + cJSON *nodeInfo = cJSON_GetArrayItem(nodeInfos, i); if (nodeInfo == NULL) continue; cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId"); @@ -627,7 +627,7 @@ static bool dnodeReadMnodeInfos() { goto PARSE_OVER; } strncpy(tsDMnodeInfos.nodeInfos[i].nodeEp, nodeEp->valuestring, TSDB_EP_LEN); - } + } ret = true; diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index f8acb871c5..e1d298089c 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -62,6 +62,7 @@ static void dnodeAllocModules() { dnodeSetModuleStatus(TSDB_MOD_HTTP); } +#ifdef _MQTT tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1); tsModule[TSDB_MOD_MQTT].name = "mqtt"; tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem; @@ -71,6 +72,7 @@ static void dnodeAllocModules() { if (tsEnableMqttModule) { dnodeSetModuleStatus(TSDB_MOD_MQTT); } +#endif tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1); tsModule[TSDB_MOD_MONITOR].name = "monitor"; diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index cd25ddcc55..1a40f3b56d 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -272,6 +272,13 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_SHOW_SQL_LEN 64 #define TSDB_SLOW_QUERY_SQL_LEN 512 +#define TSDB_MQTT_HOSTNAME_LEN 64 +#define TSDB_MQTT_PORT_LEN 8 +#define TSDB_MQTT_USER_LEN 24 +#define TSDB_MQTT_PASS_LEN 24 +#define TSDB_MQTT_TOPIC_LEN 64 +#define TSDB_MQTT_CLIENT_ID_LEN 32 + #define TSDB_METER_STATE_OFFLINE 0 #define TSDB_METER_STATE_ONLLINE 1 diff --git a/src/inc/tmqtt.h b/src/inc/tmqtt.h index 401aac16c6..256e61fbae 100644 --- a/src/inc/tmqtt.h +++ b/src/inc/tmqtt.h @@ -19,11 +19,11 @@ #ifdef __cplusplus extern "C" { #endif -#include + int32_t mqttInitSystem(); int32_t mqttStartSystem(); -void mqttStopSystem(); -void mqttCleanUpSystem(); +void mqttStopSystem(); +void mqttCleanUpSystem(); #ifdef __cplusplus } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index cdea9eda60..8187cfd265 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -310,6 +310,13 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) { return TSDB_CODE_MND_INVALID_DB_OPTION; } +#ifndef _SYNC + if (pCfg->replications != 1) { + mError("invalid db option replications:%d can only be 1 in this version", pCfg->replications); + return TSDB_CODE_MND_INVALID_DB_OPTION; + } +#endif + return TSDB_CODE_SUCCESS; } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index d5b9d18e37..aaad2462ea 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -78,6 +78,9 @@ static int32_t mnodeDnodeActionInsert(SSdbOper *pOper) { static int32_t mnodeDnodeActionDelete(SSdbOper *pOper) { SDnodeObj *pDnode = pOper->pObj; +#ifndef _SYNC + mnodeDropAllDnodeVgroups(pDnode); +#endif mnodeDropMnodeLocal(pDnode->dnodeId); balanceAsyncNotify(); @@ -585,7 +588,11 @@ static int32_t mnodeDropDnodeByEp(char *ep, SMnodeMsg *pMsg) { mInfo("dnode:%d, start to drop it", pDnode->dnodeId); +#ifndef _SYNC + int32_t code = mnodeDropDnode(pDnode, pMsg); +#else int32_t code = balanceDropDnode(pDnode); +#endif mnodeDecDnodeRef(pDnode); return code; } @@ -1043,3 +1050,59 @@ static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) { } } +#ifndef _SYNC + +int32_t balanceInit() { return TSDB_CODE_SUCCESS; } +void balanceCleanUp() {} +void balanceAsyncNotify() {} +void balanceSyncNotify() {} +void balanceReset() {} +int32_t balanceAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId) { return TSDB_CODE_SYN_NOT_ENABLED; } + +char* syncRole[] = { + "offline", + "unsynced", + "syncing", + "slave", + "master" +}; + +int32_t balanceAllocVnodes(SVgObj *pVgroup) { + void * pIter = NULL; + SDnodeObj *pDnode = NULL; + SDnodeObj *pSelDnode = NULL; + float vnodeUsage = 1000.0; + + while (1) { + pIter = mnodeGetNextDnode(pIter, &pDnode); + if (pDnode == NULL) break; + + if (pDnode->numOfCores > 0 && pDnode->openVnodes < TSDB_MAX_VNODES) { + float openVnodes = pDnode->openVnodes; + if (pDnode->isMgmt) openVnodes += tsMnodeEqualVnodeNum; + + float usage = openVnodes / pDnode->numOfCores; + if (usage <= vnodeUsage) { + pSelDnode = pDnode; + vnodeUsage = usage; + } + } + mnodeDecDnodeRef(pDnode); + } + + sdbFreeIter(pIter); + + if (pSelDnode == NULL) { + mError("failed to alloc vnode to vgroup"); + return TSDB_CODE_MND_NO_ENOUGH_DNODES; + } + + pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId; + pVgroup->vnodeGid[0].pDnode = pSelDnode; + + mDebug("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes); + return TSDB_CODE_SUCCESS; +} + +#endif + diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 06f992c26a..26f38dde03 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -98,8 +98,10 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) { .connId = connId, .stime = taosGetTimestampMs() }; + tstrncpy(connObj.user, user, sizeof(connObj.user)); - + connObj.lastAccess = connObj.stime; + SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME * 1000); mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port); @@ -244,6 +246,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pConnObj->lastAccess < pConnObj->stime) pConnObj->lastAccess = pConnObj->stime; *(int64_t *)pWrite = pConnObj->lastAccess; cols++; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index 03b1399ea7..4400927e9b 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -1711,14 +1711,20 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { mnodeDestroyChildTable(pTable); return TSDB_CODE_MND_INVALID_TABLE_NAME; } - + pTable->suid = pMsg->pSTable->uid; - pTable->uid = (((uint64_t)pTable->vgId) << 40) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 16) + - (sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); pTable->superTable = pMsg->pSTable; } else { - pTable->uid = (((uint64_t)pTable->vgId) << 40) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 16) + - (sdbGetVersion() & ((1ul << 16) - 1ul)); + if (pTable->info.type == TSDB_SUPER_TABLE) { + int64_t us = taosGetTimestampUs(); + pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); + } else { + pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->sid) & ((1ul << 24) - 1ul)) << 24) + + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul)); + } + pTable->sversion = 0; pTable->numOfColumns = htons(pCreate->numOfColumns); pTable->sqlLen = htons(pCreate->sqlLen); diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index f4cb1a9ef3..c03ff688d2 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -581,7 +581,7 @@ void mnodeDropAllUsers(SAcctObj *pAcct) { int32_t mnodeRetriveAuth(char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (!sdbIsMaster()) { *secret = 0; - mDebug("user:%s, failed to auth user, reason:%s", user, tstrerror(TSDB_CODE_APP_NOT_READY)); + mDebug("user:%s, failed to auth user, mnode is not master", user); return TSDB_CODE_APP_NOT_READY; } diff --git a/src/plugins/CMakeLists.txt b/src/plugins/CMakeLists.txt index 2bc6bf54bf..742235894a 100644 --- a/src/plugins/CMakeLists.txt +++ b/src/plugins/CMakeLists.txt @@ -3,4 +3,6 @@ PROJECT(TDengine) ADD_SUBDIRECTORY(monitor) ADD_SUBDIRECTORY(http) -ADD_SUBDIRECTORY(mqtt) +IF (TD_MQTT) + ADD_SUBDIRECTORY(mqtt) +ENDIF () \ No newline at end of file diff --git a/src/plugins/http/CMakeLists.txt b/src/plugins/http/CMakeLists.txt index 2c3cbf636f..5d8b52986a 100644 --- a/src/plugins/http/CMakeLists.txt +++ b/src/plugins/http/CMakeLists.txt @@ -11,11 +11,12 @@ AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) ADD_LIBRARY(http ${SRC}) + TARGET_LINK_LIBRARIES(http z) IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(http taos_static z) + TARGET_LINK_LIBRARIES(http taos_static) ELSE () - TARGET_LINK_LIBRARIES(http taos z) + TARGET_LINK_LIBRARIES(http taos) ENDIF () IF (TD_ADMIN) diff --git a/src/plugins/monitor/CMakeLists.txt b/src/plugins/monitor/CMakeLists.txt index 26a7775e9c..edea2187ea 100644 --- a/src/plugins/monitor/CMakeLists.txt +++ b/src/plugins/monitor/CMakeLists.txt @@ -2,11 +2,11 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) AUX_SOURCE_DIRECTORY(./src SRC) - + IF (TD_LINUX) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) ADD_LIBRARY(monitor ${SRC}) IF (TD_SOMODE_STATIC) diff --git a/src/plugins/monitor/src/monitorMain.c b/src/plugins/monitor/src/monitorMain.c index d76bb4bd82..9c94bdc1fa 100644 --- a/src/plugins/monitor/src/monitorMain.c +++ b/src/plugins/monitor/src/monitorMain.c @@ -25,6 +25,7 @@ #include "tsclient.h" #include "dnode.h" #include "monitor.h" +#include "taoserror.h" #define monitorFatal(...) { if (monitorDebugFlag & DEBUG_FATAL) { taosPrintLog("MON FATAL ", 255, __VA_ARGS__); }} #define monitorError(...) { if (monitorDebugFlag & DEBUG_ERROR) { taosPrintLog("MON ERROR ", 255, __VA_ARGS__); }} @@ -33,129 +34,159 @@ #define monitorDebug(...) { if (monitorDebugFlag & DEBUG_DEBUG) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} #define monitorTrace(...) { if (monitorDebugFlag & DEBUG_TRACE) { taosPrintLog("MON ", monitorDebugFlag, __VA_ARGS__); }} -#define SQL_LENGTH 1024 +#define SQL_LENGTH 1030 #define LOG_LEN_STR 100 #define IP_LEN_STR TSDB_EP_LEN #define CHECK_INTERVAL 1000 typedef enum { - MONITOR_CMD_CREATE_DB, - MONITOR_CMD_CREATE_TB_LOG, - MONITOR_CMD_CREATE_MT_DN, - MONITOR_CMD_CREATE_MT_ACCT, - MONITOR_CMD_CREATE_TB_DN, - MONITOR_CMD_CREATE_TB_ACCT_ROOT, - MONITOR_CMD_CREATE_TB_SLOWQUERY, - MONITOR_CMD_MAX + MON_CMD_CREATE_DB, + MON_CMD_CREATE_TB_LOG, + MON_CMD_CREATE_MT_DN, + MON_CMD_CREATE_MT_ACCT, + MON_CMD_CREATE_TB_DN, + MON_CMD_CREATE_TB_ACCT_ROOT, + MON_CMD_CREATE_TB_SLOWQUERY, + MON_CMD_MAX } EMonitorCommand; typedef enum { - MONITOR_STATE_UN_INIT, - MONITOR_STATE_INITIALIZING, - MONITOR_STATE_INITIALIZED, - MONITOR_STATE_STOPPED + MON_STATE_NOT_INIT, + MON_STATE_INITED } EMonitorState; typedef struct { - void * conn; - void * timer; - char ep[TSDB_EP_LEN]; - int8_t cmdIndex; - int8_t state; - char sql[SQL_LENGTH + 1]; - void * initTimer; - void * diskTimer; + pthread_t thread; + void * conn; + char ep[TSDB_EP_LEN]; + int8_t cmdIndex; + int8_t state; + int8_t start; // enable/disable by mnode + int8_t quiting; // taosd is quiting + char sql[SQL_LENGTH + 1]; } SMonitorConn; -static SMonitorConn tsMonitorConn; -static void monitorInitConn(void *para, void *unused); -static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code); -static void monitorInitDatabase(); -static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code); -static void monitorStartTimer(); -static void monitorSaveSystemInfo(); +static SMonitorConn tsMonitor = {0}; +static void monitorSaveSystemInfo(); +static void *monitorThreadFunc(void *param); +static void monitorBuildMonitorSql(char *sql, int32_t cmd); extern int32_t (*monitorStartSystemFp)(); -extern void (*monitorStopSystemFp)(); -extern void (*monitorExecuteSQLFp)(char *sql); - -static void monitorCheckDiskUsage(void *para, void *unused) { - taosGetDisk(); - taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer); -} +extern void (*monitorStopSystemFp)(); +extern void (*monitorExecuteSQLFp)(char *sql); int32_t monitorInitSystem() { - taos_init(); - taosTmrReset(monitorCheckDiskUsage, CHECK_INTERVAL, NULL, tscTmr, &tsMonitorConn.diskTimer); + if (tsMonitor.ep[0] == 0) { + strcpy(tsMonitor.ep, tsLocalEp); + } + + int len = strlen(tsMonitor.ep); + for (int i = 0; i < len; ++i) { + if (tsMonitor.ep[i] == ':' || tsMonitor.ep[i] == '-' || tsMonitor.ep[i] == '.') { + tsMonitor.ep[i] = '_'; + } + } + + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&tsMonitor.thread, &thAttr, monitorThreadFunc, NULL)) { + monitorError("failed to create thread to for monitor module, reason:%s", strerror(errno)); + return -1; + } + + pthread_attr_destroy(&thAttr); + monitorDebug("monitor thread is launched"); + monitorStartSystemFp = monitorStartSystem; monitorStopSystemFp = monitorStopSystem; return 0; } int32_t monitorStartSystem() { - monitorInfo("start monitor module"); - monitorInitSystem(); - taosTmrReset(monitorInitConn, 10, NULL, tscTmr, &tsMonitorConn.initTimer); + taos_init(); + tsMonitor.start = 1; + monitorExecuteSQLFp = monitorExecuteSQL; + monitorInfo("monitor module start"); return 0; } -static void monitorStartSystemRetry() { - if (tsMonitorConn.initTimer != NULL) { - taosTmrReset(monitorInitConn, 3000, NULL, tscTmr, &tsMonitorConn.initTimer); - } -} +static void *monitorThreadFunc(void *param) { + monitorDebug("starting to initialize monitor module ..."); -static void monitorInitConn(void *para, void *unused) { - if (dnodeGetDnodeId() <= 0) { - monitorStartSystemRetry(); - return; - } - - monitorInfo("starting to initialize monitor service .."); - tsMonitorConn.state = MONITOR_STATE_INITIALIZING; + while (1) { + if (tsMonitor.quiting) { + tsMonitor.state = MON_STATE_NOT_INIT; + monitorInfo("monitor thread will quit, for taosd is quiting"); + break; + } else { + taosGetDisk(); + } - if (tsMonitorConn.ep[0] == 0) - strcpy(tsMonitorConn.ep, tsLocalEp); + if (tsMonitor.start == 0) { + continue; + } - int len = strlen(tsMonitorConn.ep); - for (int i = 0; i < len; ++i) { - if (tsMonitorConn.ep[i] == ':' || tsMonitorConn.ep[i] == '-') { - tsMonitorConn.ep[i] = '_'; + static int32_t accessTimes = 0; + accessTimes++; + taosMsleep(1000); + + if (dnodeGetDnodeId() <= 0) { + monitorDebug("dnode not initialized, waiting for 3000 ms to start monitor module"); + continue; + } + + if (tsMonitor.conn == NULL) { + tsMonitor.state = MON_STATE_NOT_INIT; + tsMonitor.conn = taos_connect(NULL, "monitor", tsInternalPass, "", 0); + if (tsMonitor.conn == NULL) { + monitorError("failed to connect to database, reason:%s", tstrerror(terrno)); + continue; + } else { + monitorDebug("connect to database success"); + } + } + + if (tsMonitor.state == MON_STATE_NOT_INIT) { + for (; tsMonitor.cmdIndex < MON_CMD_MAX; ++tsMonitor.cmdIndex) { + monitorBuildMonitorSql(tsMonitor.sql, tsMonitor.cmdIndex); + void *res = taos_query(tsMonitor.conn, tsMonitor.sql); + int code = taos_errno(res); + taos_free_result(res); + + if (code != 0) { + monitorError("failed to exec sql:%s, reason:%s", tsMonitor.sql, tstrerror(code)); + break; + } else { + monitorDebug("successfully to exec sql:%s", tsMonitor.sql); + } + } + + if (tsMonitor.start) { + tsMonitor.state = MON_STATE_INITED; + } + } + + if (tsMonitor.state == MON_STATE_INITED) { + if (accessTimes % tsMonitorInterval == 0) { + monitorSaveSystemInfo(); + } } } - if (tsMonitorConn.conn == NULL) { - taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, monitorInitConnCb, &tsMonitorConn, &(tsMonitorConn.conn)); - } else { - monitorInitDatabase(); - } + monitorInfo("monitor thread is stopped"); + return NULL; } -static void monitorInitConnCb(void *param, TAOS_RES *result, int32_t code) { - // free it firstly in any cases. - taos_free_result(result); - - if (code != TSDB_CODE_SUCCESS) { - monitorError("monitor:%p, connect to database failed, reason:%s", tsMonitorConn.conn, tstrerror(code)); - taos_close(tsMonitorConn.conn); - tsMonitorConn.conn = NULL; - tsMonitorConn.state = MONITOR_STATE_UN_INIT; - monitorStartSystemRetry(); - return; - } - - monitorDebug("monitor:%p, connect to database success, reason:%s", tsMonitorConn.conn, tstrerror(code)); - monitorInitDatabase(); -} - -static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { +static void monitorBuildMonitorSql(char *sql, int32_t cmd) { memset(sql, 0, SQL_LENGTH); - if (cmd == MONITOR_CMD_CREATE_DB) { + if (cmd == MON_CMD_CREATE_DB) { snprintf(sql, SQL_LENGTH, "create database if not exists %s replica 1 days 10 keep 30 cache %d " - "blocks %d maxtables 16 precision 'us'", + "blocks %d precision 'us'", tsMonitorDbName, TSDB_MIN_CACHE_BLOCK_SIZE, TSDB_MIN_TOTAL_BLOCKS); - } else if (cmd == MONITOR_CMD_CREATE_MT_DN) { + } else if (cmd == MON_CMD_CREATE_MT_DN) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn(ts timestamp" ", cpu_taosd float, cpu_system float, cpu_cores int" @@ -166,10 +197,10 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ", req_http int, req_select int, req_insert int" ") tags (dnodeid int, fqdn binary(%d))", tsMonitorDbName, TSDB_FQDN_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_DN) { + } else if (cmd == MON_CMD_CREATE_TB_DN) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.dn%d using %s.dn tags(%d, '%s')", tsMonitorDbName, dnodeGetDnodeId(), tsMonitorDbName, dnodeGetDnodeId(), tsLocalEp); - } else if (cmd == MONITOR_CMD_CREATE_MT_ACCT) { + } else if (cmd == MON_CMD_CREATE_MT_ACCT) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct(ts timestamp " ", currentPointsPerSecond bigint, maxPointsPerSecond bigint" @@ -185,15 +216,15 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { ", accessState smallint" ") tags (acctId binary(%d))", tsMonitorDbName, TSDB_USER_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_ACCT_ROOT) { + } else if (cmd == MON_CMD_CREATE_TB_ACCT_ROOT) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.acct_%s using %s.acct tags('%s')", tsMonitorDbName, TSDB_DEFAULT_USER, tsMonitorDbName, TSDB_DEFAULT_USER); - } else if (cmd == MONITOR_CMD_CREATE_TB_SLOWQUERY) { + } else if (cmd == MON_CMD_CREATE_TB_SLOWQUERY) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.slowquery(ts timestamp, username " "binary(%d), created_time timestamp, time bigint, sql binary(%d))", tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN); - } else if (cmd == MONITOR_CMD_CREATE_TB_LOG) { + } else if (cmd == MON_CMD_CREATE_TB_LOG) { snprintf(sql, SQL_LENGTH, "create table if not exists %s.log(ts timestamp, level tinyint, " "content binary(%d), ipaddr binary(%d))", @@ -203,75 +234,22 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) { sql[SQL_LENGTH] = 0; } -static void monitorInitDatabase() { - if (tsMonitorConn.cmdIndex < MONITOR_CMD_MAX) { - dnodeBuildMonitorSql(tsMonitorConn.sql, tsMonitorConn.cmdIndex); - taos_query_a(tsMonitorConn.conn, tsMonitorConn.sql, monitorInitDatabaseCb, NULL); - } else { - tsMonitorConn.state = MONITOR_STATE_INITIALIZED; - monitorExecuteSQLFp = monitorExecuteSQL; - monitorInfo("monitor service init success"); - - monitorStartTimer(); - } -} - -static void monitorInitDatabaseCb(void *param, TAOS_RES *result, int32_t code) { - if (code == TSDB_CODE_MND_TABLE_ALREADY_EXIST || code == TSDB_CODE_MND_DB_ALREADY_EXIST || code >= 0) { - monitorDebug("monitor:%p, sql success, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql); - if (tsMonitorConn.cmdIndex == MONITOR_CMD_CREATE_TB_LOG) { - monitorInfo("dnode:%s is started", tsLocalEp); - } - tsMonitorConn.cmdIndex++; - monitorInitDatabase(); - } else { - monitorError("monitor:%p, sql failed, reason:%s, %s", tsMonitorConn.conn, tstrerror(code), tsMonitorConn.sql); - tsMonitorConn.state = MONITOR_STATE_UN_INIT; - monitorStartSystemRetry(); - } - - taos_free_result(result); -} - void monitorStopSystem() { - if (tsMonitorConn.state == MONITOR_STATE_STOPPED) return; - tsMonitorConn.state = MONITOR_STATE_STOPPED; + tsMonitor.start = 0; + tsMonitor.state = MON_STATE_NOT_INIT; monitorExecuteSQLFp = NULL; - - monitorInfo("monitor module is stopped"); - - if (tsMonitorConn.initTimer != NULL) { - taosTmrStopA(&(tsMonitorConn.initTimer)); - } - if (tsMonitorConn.timer != NULL) { - taosTmrStopA(&(tsMonitorConn.timer)); - } - if (tsMonitorConn.conn != NULL) { - taos_close(tsMonitorConn.conn); - tsMonitorConn.conn = NULL; - } + monitorInfo("monitor module stopped"); } void monitorCleanUpSystem() { + tsMonitor.quiting = 1; monitorStopSystem(); - monitorInfo("monitor module cleanup"); -} - -static void monitorStartTimer() { - taosTmrReset(monitorSaveSystemInfo, tsMonitorInterval * 1000, NULL, tscTmr, &tsMonitorConn.timer); -} - -static void dnodeMontiorLogCallback(void *param, TAOS_RES *result, int32_t code) { - int32_t c = taos_errno(result); - - if (c != TSDB_CODE_SUCCESS) { - monitorError("monitor:%p, save %s failed, reason:%s", tsMonitorConn.conn, (char *)param, tstrerror(c)); - } else { - int32_t rows = taos_affected_rows(result); - monitorDebug("monitor:%p, save %s succ, rows:%d", tsMonitorConn.conn, (char *)param, rows); + pthread_join(tsMonitor.thread, NULL); + if (tsMonitor.conn != NULL) { + taos_close(tsMonitor.conn); + tsMonitor.conn = NULL; } - - taos_free_result(result); + monitorInfo("monitor module is cleaned up"); } // unit is MB @@ -279,13 +257,13 @@ static int32_t monitorBuildMemorySql(char *sql) { float sysMemoryUsedMB = 0; bool suc = taosGetSysMemory(&sysMemoryUsedMB); if (!suc) { - monitorError("monitor:%p, get sys memory info failed.", tsMonitorConn.conn); + monitorDebug("failed to get sys memory info"); } float procMemoryUsedMB = 0; suc = taosGetProcMemory(&procMemoryUsedMB); if (!suc) { - monitorError("monitor:%p, get proc memory info failed.", tsMonitorConn.conn); + monitorDebug("failed to get proc memory info"); } return sprintf(sql, ", %f, %f, %d", procMemoryUsedMB, sysMemoryUsedMB, tsTotalMemoryMB); @@ -296,11 +274,11 @@ static int32_t monitorBuildCpuSql(char *sql) { float sysCpuUsage = 0, procCpuUsage = 0; bool suc = taosGetCpuUsage(&sysCpuUsage, &procCpuUsage); if (!suc) { - monitorError("monitor:%p, get cpu usage failed.", tsMonitorConn.conn); + monitorDebug("failed to get cpu usage"); } if (sysCpuUsage <= procCpuUsage) { - sysCpuUsage = procCpuUsage + (float)0.1; + sysCpuUsage = procCpuUsage + 0.1f; } return sprintf(sql, ", %f, %f, %d", procCpuUsage, sysCpuUsage, tsNumOfCores); @@ -316,14 +294,14 @@ static int32_t monitorBuildBandSql(char *sql) { float bandSpeedKb = 0; bool suc = taosGetBandSpeed(&bandSpeedKb); if (!suc) { - monitorError("monitor:%p, get bandwidth speed failed.", tsMonitorConn.conn); + monitorDebug("failed to get bandwidth speed"); } return sprintf(sql, ", %f", bandSpeedKb); } static int32_t monitorBuildReqSql(char *sql) { - SDnodeStatisInfo info = dnodeGetStatisInfo(); + SDnodeStatisInfo info = dnodeGetStatisInfo(); return sprintf(sql, ", %d, %d, %d)", info.httpReqNum, info.queryReqNum, info.submitReqNum); } @@ -331,20 +309,15 @@ static int32_t monitorBuildIoSql(char *sql) { float readKB = 0, writeKB = 0; bool suc = taosGetProcIO(&readKB, &writeKB); if (!suc) { - monitorError("monitor:%p, get io info failed.", tsMonitorConn.conn); + monitorDebug("failed to get io info"); } return sprintf(sql, ", %f, %f", readKB, writeKB); } static void monitorSaveSystemInfo() { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) { - monitorStartTimer(); - return; - } - int64_t ts = taosGetTimestampUs(); - char * sql = tsMonitorConn.sql; + char * sql = tsMonitor.sql; int32_t pos = snprintf(sql, SQL_LENGTH, "insert into %s.dn%d values(%" PRId64, tsMonitorDbName, dnodeGetDnodeId(), ts); pos += monitorBuildCpuSql(sql + pos); @@ -354,16 +327,31 @@ static void monitorSaveSystemInfo() { pos += monitorBuildIoSql(sql + pos); pos += monitorBuildReqSql(sql + pos); - monitorDebug("monitor:%p, save system info, sql:%s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sys"); + void *res = taos_query(tsMonitor.conn, tsMonitor.sql); + int code = taos_errno(res); + taos_free_result(res); - if (tsMonitorConn.timer != NULL && tsMonitorConn.state != MONITOR_STATE_STOPPED) { - monitorStartTimer(); + if (code != 0) { + monitorError("failed to save system info, reason:%s, sql:%s", tstrerror(code), tsMonitor.sql); + } else { + monitorDebug("successfully to save system info, sql:%s", tsMonitor.sql); } } +static void montiorExecSqlCb(void *param, TAOS_RES *result, int32_t code) { + int32_t c = taos_errno(result); + if (c != TSDB_CODE_SUCCESS) { + monitorError("save %s failed, reason:%s", (char *)param, tstrerror(c)); + } else { + int32_t rows = taos_affected_rows(result); + monitorDebug("save %s succ, rows:%d", (char *)param, rows); + } + + taos_free_result(result); +} + void monitorSaveAcctLog(SAcctMonitorObj *pMon) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; + if (tsMonitor.state != MON_STATE_INITED) return; char sql[1024] = {0}; sprintf(sql, @@ -392,19 +380,16 @@ void monitorSaveAcctLog(SAcctMonitorObj *pMon) { pMon->totalConns, pMon->maxConns, pMon->accessState); - monitorDebug("monitor:%p, save account info, sql %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "account"); + monitorDebug("save account info, sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "account info"); } void monitorSaveLog(int32_t level, const char *const format, ...) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; + if (tsMonitor.state != MON_STATE_INITED) return; va_list argpointer; char sql[SQL_LENGTH] = {0}; int32_t max_length = SQL_LENGTH - 30; - - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; - int32_t len = snprintf(sql, (size_t)max_length, "insert into %s.log values(%" PRId64 ", %d,'", tsMonitorDbName, taosGetTimestampUs(), level); @@ -416,12 +401,13 @@ void monitorSaveLog(int32_t level, const char *const format, ...) { len += sprintf(sql + len, "', '%s')", tsLocalEp); sql[len++] = 0; - monitorDebug("monitor:%p, save log, sql: %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "log"); + monitorDebug("save log, sql: %s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "log"); } void monitorExecuteSQL(char *sql) { - if (tsMonitorConn.state != MONITOR_STATE_INITIALIZED) return; - monitorDebug("monitor:%p, execute sql: %s", tsMonitorConn.conn, sql); - taos_query_a(tsMonitorConn.conn, sql, dnodeMontiorLogCallback, "sql"); + if (tsMonitor.state != MON_STATE_INITED) return; + + monitorDebug("execute sql:%s", sql); + taos_query_a(tsMonitor.conn, sql, montiorExecSqlCb, "sql"); } diff --git a/src/plugins/mqtt/CMakeLists.txt b/src/plugins/mqtt/CMakeLists.txt index 2467af588c..cf5729d608 100644 --- a/src/plugins/mqtt/CMakeLists.txt +++ b/src/plugins/mqtt/CMakeLists.txt @@ -2,21 +2,19 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8) PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include) +INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates) AUX_SOURCE_DIRECTORY(src SRC) IF (TD_LINUX) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/include) - INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MQTT-C/examples/templates) ADD_LIBRARY(mqtt ${SRC}) + TARGET_LINK_LIBRARIES(mqtt cJson mqttc) IF (TD_SOMODE_STATIC) - TARGET_LINK_LIBRARIES(mqtt taos_static cJson mqttc) + TARGET_LINK_LIBRARIES(mqtt taos_static) ELSE () - TARGET_LINK_LIBRARIES(mqtt taos cJson mqttc) - ENDIF () - IF (TD_ADMIN) - TARGET_LINK_LIBRARIES(mqtt admin cJson) + TARGET_LINK_LIBRARIES(mqtt taos) ENDIF () ENDIF () diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index 5dbd62789b..81a9a39a2c 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -23,11 +23,12 @@ extern "C" { * @file * A simple subscriber program that performs automatic reconnections. */ -#include -#include -#include #include "mqtt.h" -#include "taos.h" + +#define QOS 1 +#define TIMEOUT 10000L +#define MQTT_SEND_BUF_SIZE 102400 +#define MQTT_RECV_BUF_SIZE 102400 /** * @brief A structure that I will use to keep track of some data needed @@ -36,18 +37,12 @@ extern "C" { * An instance of this struct will be created in my \c main(). Then, whenever * \ref mqttReconnectClient is called, this instance will be passed. */ -struct reconnect_state_t { - char* hostname; - char* port; - char* topic; - char* client_id; - char* user_name; - char* password; +typedef struct SMqttReconnectState { uint8_t* sendbuf; size_t sendbufsz; uint8_t* recvbuf; size_t recvbufsz; -}; +} SMqttReconnectState; /** * @brief My reconnect callback. It will reestablish the connection whenever @@ -58,7 +53,7 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr /** * @brief The function will be called whenever a PUBLISH message is received. */ -void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published); +void mqttPublishCallback(void** unused, struct mqtt_response_publish* published); /** * @brief The client's refresher. This function triggers back-end routines to @@ -73,12 +68,7 @@ void* mqttClientRefresher(void* client); /** * @brief Safelty closes the \p sockfd and cancels the \p client_daemon before \c exit. */ - -void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); -#define QOS 1 -#define TIMEOUT 10000L +void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon); #ifdef __cplusplus } diff --git a/src/plugins/mqtt/inc/mqttPayload.h b/src/plugins/mqtt/inc/mqttPayload.h index b7e7abbd96..12a714afac 100644 --- a/src/plugins/mqtt/inc/mqttPayload.h +++ b/src/plugins/mqtt/inc/mqttPayload.h @@ -15,11 +15,13 @@ #ifndef TDENGINE_MQTT_PLYLOAD_H #define TDENGINE_MQTT_PLYLOAD_H + #ifdef __cplusplus extern "C" { #endif -char split(char str[], char delims[], char** p_p_cmd_part, int max); -char* converJsonToSql(char* json, char* _dbname, char* _tablename); + +char* mqttConverJsonToSql(char* json, int maxSize); + #ifdef __cplusplus } #endif diff --git a/src/plugins/mqtt/inc/mqttSystem.h b/src/plugins/mqtt/inc/mqttSystem.h deleted file mode 100644 index a79fac33b5..0000000000 --- a/src/plugins/mqtt/inc/mqttSystem.h +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MQTT_SYSTEM_H -#define TDENGINE_MQTT_SYSTEM_H -#ifdef __cplusplus -extern "C" { -#endif -#include -int32_t mqttInitSystem(); -int32_t mqttStartSystem(); -void mqttStopSystem(); -void mqttCleanUpSystem(); -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c index 96c8e71edd..1af8b02fad 100644 --- a/src/plugins/mqtt/src/mqttPayload.c +++ b/src/plugins/mqtt/src/mqttPayload.c @@ -14,52 +14,146 @@ */ #define _DEFAULT_SOURCE -#include "mqttPayload.h" -#include "cJSON.h" -#include "string.h" -#include "taos.h" -#include "mqttLog.h" #include "os.h" -char split(char str[], char delims[], char** p_p_cmd_part, int max) { - char* token = strtok(str, delims); - char part_index = 0; - char** tmp_part = p_p_cmd_part; - while (token) { - *tmp_part++ = token; - token = strtok(NULL, delims); - part_index++; - if (part_index >= max) break; - } - return part_index; -} +#include "cJSON.h" +#include "mqttLog.h" +#include "mqttPayload.h" -char* converJsonToSql(char* json, char* _dbname, char* _tablename) { - cJSON* jPlayload = cJSON_Parse(json); - char _names[102400] = {0}; - char _values[102400] = {0}; - int i = 0; - int count = cJSON_GetArraySize(jPlayload); - for (; i < count; i++) - { - cJSON* item = cJSON_GetArrayItem(jPlayload, i); - if (cJSON_Object == item->type) { - mqttInfo("The item '%s' is not supported", item->string); - } else { - strcat(_names, item->string); - if (i < count - 1) { - strcat(_names, ","); - } - char* __value_json = cJSON_Print(item); - strcat(_values, __value_json); - free(__value_json); - if (i < count - 1) { - strcat(_values, ","); - } +// subscribe message like this + +/* +/test { + "timestamp": 1599121290, + "gateway": { + "name": "AcuLink 810 Gateway", + "model": "AcuLink810-868", + "serial": "S8P20200207" + }, + "device": { + "name": "Acuvim L V3 .221", + "model": "Acuvim-L-V3", + "serial": "221", + "online": true, + "readings": [ + { + "param": "Freq_Hz", + "value": "59.977539", + "unit": "Hz" + }, + { + "param": "Va_V", + "value": "122.002907", + "unit": "V" + }, + { + "param": "DI4", + "value": "5.000000", + "unit": "" + } + ] } +} +*/ + +// send msg cmd +// mosquitto_pub -h test.mosquitto.org -t "/test" -m '{"timestamp": 1599121290,"gateway": {"name": "AcuLink 810 Gateway","model": "AcuLink810-868","serial": "S8P20200207"},"device": {"name": "Acuvim L V3 .221","model": "Acuvim-L-V3","serial": "221","online": true,"readings": [{"param": "Freq_Hz","value": "59.977539","unit": "Hz"},{"param": "Va_V","value": "122.002907","unit": "V"},{"param": "DI4","value": "5.000000","unit": ""}]}}' + +/* + * This is an example, this function needs to be implemented in order to parse the json file into a sql statement + * Note that you need to create a super table and database before writing data + * In this case: + * create database mqttdb; + * create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); + */ + +char* mqttConverJsonToSql(char* json, int maxSize) { + // const int32_t maxSize = 10240; + maxSize *= 5; + char* sql = malloc(maxSize); + + cJSON* root = cJSON_Parse(json); + if (root == NULL) { + mqttError("failed to parse msg, invalid json format"); + goto MQTT_PARSE_OVER; } - cJSON_free(jPlayload); - int sqllen = strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024; - char* _sql = calloc(1, sqllen); - sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); - return _sql; + + cJSON* timestamp = cJSON_GetObjectItem(root, "timestamp"); + if (!timestamp || timestamp->type != cJSON_Number) { + mqttError("failed to parse msg, timestamp not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* device = cJSON_GetObjectItem(root, "device"); + if (!device) { + mqttError("failed to parse msg, device not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* name = cJSON_GetObjectItem(device, "name"); + if (!name || name->type != cJSON_String) { + mqttError("failed to parse msg, name not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* model = cJSON_GetObjectItem(device, "model"); + if (!model || model->type != cJSON_String) { + mqttError("failed to parse msg, model not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* serial = cJSON_GetObjectItem(device, "serial"); + if (!serial || serial->type != cJSON_String) { + mqttError("failed to parse msg, serial not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* readings = cJSON_GetObjectItem(device, "readings"); + if (!readings || readings->type != cJSON_Array) { + mqttError("failed to parse msg, readings not found"); + goto MQTT_PARSE_OVER; + } + + int count = cJSON_GetArraySize(readings); + if (count <= 0) { + mqttError("failed to parse msg, readings size smaller than 0"); + goto MQTT_PARSE_OVER; + } + + int len = snprintf(sql, maxSize, "insert into"); + + for (int i = 0; i < count; ++i) { + cJSON* reading = cJSON_GetArrayItem(readings, i); + if (reading == NULL) continue; + + cJSON* param = cJSON_GetObjectItem(reading, "param"); + if (!param || param->type != cJSON_String) { + mqttError("failed to parse msg, param not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* value = cJSON_GetObjectItem(reading, "value"); + if (!value || value->type != cJSON_String) { + mqttError("failed to parse msg, value not found"); + goto MQTT_PARSE_OVER; + } + + cJSON* unit = cJSON_GetObjectItem(reading, "unit"); + if (!unit || unit->type != cJSON_String) { + mqttError("failed to parse msg, unit not found"); + goto MQTT_PARSE_OVER; + } + + len += snprintf(sql + len, maxSize - len, + " mqttdb.serial_%s_%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", + serial->valuestring, param->valuestring, name->valuestring, model->valuestring, serial->valuestring, + param->valuestring, unit->valuestring, timestamp->valueint * 1000, value->valuestring); + } + + cJSON_free(root); + return sql; + +MQTT_PARSE_OVER: + cJSON_free(root); + free(sql); + return NULL; } \ No newline at end of file diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 69810e2785..eacc3b1f74 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -14,244 +14,131 @@ */ #define _DEFAULT_SOURCE - -#include "cJSON.h" +#include "os.h" #include "mqtt.h" #include "mqttInit.h" #include "mqttLog.h" #include "mqttPayload.h" -#include "os.h" +#include "tmqtt.h" #include "posix_sockets.h" -#include "string.h" #include "taos.h" #include "tglobal.h" -#include "tmqtt.h" -#include "tsclient.h" -#include "tsocket.h" -#include "ttimer.h" -#include "mqttSystem.h" -struct mqtt_client mqttClient = {0}; -pthread_t clientDaemonThread = {0}; -void* mqttConnect=NULL; -struct reconnect_state_t recntStatus = {0}; -char* topicPath=NULL; -int mttIsRuning = 1; +#include "taoserror.h" -int32_t mqttInitSystem() { - int rc = 0; -#if 0 - uint8_t sendbuf[2048]; - uint8_t recvbuf[1024]; - recntStatus.sendbuf = sendbuf; - recntStatus.sendbufsz = sizeof(sendbuf); - recntStatus.recvbuf = recvbuf; - recntStatus.recvbufsz = sizeof(recvbuf); - char* url = tsMqttBrokerAddress; - recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL; - - char * passStr = strstr(url, recntStatus.user_name); - if (passStr != NULL) { - recntStatus.password = strstr(url, "@") != NULL ? strbetween(passStr, ":", "@") : NULL; - } +struct SMqttReconnectState tsMqttStatus = {0}; +struct mqtt_client tsMqttClient = {0}; +static pthread_t tsMqttClientDaemonThread = {0}; +static void* tsMqttConnect = NULL; +static bool tsMqttIsRuning = false; - if (strlen(url) == 0) { - mqttDebug("mqtt module not init, url is null"); - return rc; - } - - if (strstr(url, "@") != NULL) { - recntStatus.hostname = strbetween(url, "@", ":"); - } else if (strstr(strstr(url, "://") + 3, ":") != NULL) { - recntStatus.hostname = strbetween(url, "//", ":"); - - } else { - recntStatus.hostname = strbetween(url, "//", "/"); - } - - char* _begin_hostname = strstr(url, recntStatus.hostname); - if (_begin_hostname != NULL && strstr(_begin_hostname, ":") != NULL) { - recntStatus.port = strbetween(_begin_hostname, ":", "/"); - } else { - recntStatus.port = strbetween("'1883'", "'", "'"); - } - - char* portStr = recntStatus.hostname; - if (_begin_hostname != NULL) { - char* colonStr = strstr(_begin_hostname, ":"); - if (colonStr != NULL) { - portStr = recntStatus.port; - } - } - - char* topicStr = strstr(url, portStr); - if (topicStr != NULL) { - topicPath = strbetween(topicStr, "/", "/"); - char* _topic = "+/+/+/"; - int _tpsize = strlen(topicPath) + strlen(_topic) + 1; - recntStatus.topic = calloc(1, _tpsize); - sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic); - recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt"; - mqttConnect = NULL; - } else { - topicPath = NULL; - } - -#endif - return rc; -} +int32_t mqttInitSystem() { return 0; } int32_t mqttStartSystem() { - int rc = 0; -#if 0 - if (recntStatus.user_name != NULL && recntStatus.password != NULL) { - mqttInfo("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password, - recntStatus.hostname, recntStatus.port, topicPath); - } else if (recntStatus.user_name != NULL && recntStatus.password == NULL) { - mqttInfo("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name, recntStatus.hostname, recntStatus.port, - topicPath); + tsMqttStatus.sendbufsz = MQTT_SEND_BUF_SIZE; + tsMqttStatus.recvbufsz = MQTT_RECV_BUF_SIZE; + tsMqttStatus.sendbuf = malloc(MQTT_SEND_BUF_SIZE); + tsMqttStatus.recvbuf = malloc(MQTT_RECV_BUF_SIZE); + tsMqttIsRuning = true; + + mqtt_init_reconnect(&tsMqttClient, mqttReconnectClient, &tsMqttStatus, mqttPublishCallback); + if (pthread_create(&tsMqttClientDaemonThread, NULL, mqttClientRefresher, &tsMqttClient)) { + mqttError("mqtt failed to start daemon."); + mqttCleanupRes(EXIT_FAILURE, -1, NULL); + return -1; } - mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback); - if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) { - mqttError("Failed to start client daemon."); - mqttCleanup(EXIT_FAILURE, -1, NULL); - rc = -1; - } else { - mqttInfo("listening for '%s' messages.", recntStatus.topic); - } -#endif - return rc; + mqttInfo("mqtt listening for topic:%s messages", tsMqttTopic); + return 0; } void mqttStopSystem() { -#if 0 - mqttClient.error = MQTT_ERROR_SOCKET_ERROR; - mttIsRuning = 0; - usleep(300000U); - mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread); - mqttInfo("mqtt is stoped"); -#endif + if (tsMqttIsRuning) { + tsMqttIsRuning = false; + tsMqttClient.error = MQTT_ERROR_SOCKET_ERROR; + + taosMsleep(300); + mqttCleanupRes(EXIT_SUCCESS, tsMqttClient.socketfd, &tsMqttClientDaemonThread); + + mqttInfo("mqtt is stopped"); + } } void mqttCleanUpSystem() { -#if 0 - mqttInfo("starting to cleanup mqtt"); - free(recntStatus.user_name); - free(recntStatus.password); - free(recntStatus.hostname); - free(recntStatus.port); - free(recntStatus.topic); - free(topicPath); + mqttStopSystem(); mqttInfo("mqtt is cleaned up"); -#endif } -void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { - /* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */ - char* topic_name = (char*)malloc(published->topic_name_size + 1); - memcpy(topic_name, published->topic_name, published->topic_name_size); - topic_name[published->topic_name_size] = '\0'; - mqttInfo("received publish('%s'): %s", topic_name, (const char*)published->application_message); - char _token[128] = {0}; - char _dbname[128] = {0}; - char _tablename[128] = {0}; - if (mqttConnect == NULL) { - mqttInfo("connect database"); - taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect); - } - if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) { - char* p_p_cmd_part[5] = {0}; - char copystr[1024] = {0}; - strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); - char part_index = split(copystr, "/", p_p_cmd_part, 10); - if (part_index < 4) { - mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name); - } else { - strncpy(_token, p_p_cmd_part[1], 127); - strncpy(_dbname, p_p_cmd_part[2], 127); - strncpy(_tablename, p_p_cmd_part[3], 127); - mqttInfo("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, - _tablename); +void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) { + const char* content = published->application_message; + mqttDebug("receive mqtt message, size:%d", (int)published->application_message_size); - if (mqttConnect != NULL) { - char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename); - mqttInfo("query:%s", _sql); - taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient); - mqttInfo("free sql:%s", _sql); - free(_sql); - } + if (tsMqttConnect == NULL) { + tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); + if (tsMqttConnect == NULL) { + mqttError("failed to connect to tdengine, reason:%s", tstrerror(terrno)); + return; + } else { + mqttInfo("successfully connected to the tdengine"); } } - free(topic_name); + + mqttTrace("receive mqtt message, content:%s", content); + + char* sql = mqttConverJsonToSql((char*)content, (int)published->application_message_size); + if (sql != NULL) { + void* res = taos_query(tsMqttConnect, sql); + int code = taos_errno(res); + if (code != 0) { + mqttError("failed to exec sql, reason:%s sql:%s", tstrerror(code), sql); + } else { + mqttTrace("successfully to exec sql:%s", sql); + } + taos_free_result(res); + } else { + mqttError("failed to parse mqtt message"); + } } void* mqttClientRefresher(void* client) { - while (mttIsRuning) { + while (tsMqttIsRuning) { mqtt_sync((struct mqtt_client*)client); taosMsleep(100); } - mqttDebug("quit refresher"); + + mqttDebug("mqtt quit refresher"); return NULL; } -void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) { -#if 0 +void mqttCleanupRes(int status, int sockfd, pthread_t* client_daemon) { mqttInfo("clean up mqtt module"); - if (sockfd != -1) close(sockfd); - if (client_daemon != NULL) pthread_cancel(*client_daemon); -#endif -} - -void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code)); - taos_close(mqttConnect); - mqttConnect = NULL; - return; + if (sockfd != -1) { + close(sockfd); } - mqttDebug("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code)); -} -void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { - if (code < 0) { - mqttError("mqtt:%d, save data failed, code:%s", code, tstrerror(code)); - } else if (code == 0) { - mqttError("mqtt:%d, save data failed, affect rows:%d", code, code); - } else { - mqttInfo("mqtt:%d, save data success, code:%s", code, tstrerror(code)); + if (client_daemon != NULL) { + pthread_cancel(*client_daemon); } } -void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) { - mqttInfo("reconnect client"); - struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr); +void mqttReconnectClient(struct mqtt_client* client, void** unused) { + mqttInfo("mqtt tries to connect to the mqtt server"); - /* Close the clients socket if this isn't the initial reconnect call */ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { close(client->socketfd); } - /* Perform error handling here. */ if (client->error != MQTT_ERROR_INITIAL_RECONNECT) { - mqttError("mqttReconnectClient: called while client was in error state \"%s\"", mqtt_error_str(client->error)); + mqttError("mqtt client was in error state %s", mqtt_error_str(client->error)); } - /* Open a new socket. */ - int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port); - if (sockfd == -1) { - mqttError("failed to open socket: "); - mqttCleanup(EXIT_FAILURE, sockfd, NULL); + int sockfd = open_nb_socket(tsMqttHostName, tsMqttPort); + if (sockfd < 0) { + mqttError("mqtt client failed to open socket %s:%s", tsMqttHostName, tsMqttPort); + //mqttCleanupRes(EXIT_FAILURE, sockfd, NULL); + return; } - /* Reinitialize the client. */ - mqtt_reinit(client, sockfd, reconnect_state->sendbuf, reconnect_state->sendbufsz, reconnect_state->recvbuf, - reconnect_state->recvbufsz); - - /* Ensure we have a clean session */ - uint8_t connect_flags = MQTT_CONNECT_CLEAN_SESSION; - /* Send connection request to the broker. */ - mqtt_connect(client, reconnect_state->client_id, NULL, NULL, 0, reconnect_state->user_name, reconnect_state->password,connect_flags, 400); - - /* Subscribe to the topic. */ - mqtt_subscribe(client, reconnect_state->topic, 0); + mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); + mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); + mqtt_subscribe(client, tsMqttTopic, 0); } \ No newline at end of file diff --git a/src/query/CMakeLists.txt b/src/query/CMakeLists.txt index e2bee4285f..c1024f0080 100644 --- a/src/query/CMakeLists.txt +++ b/src/query/CMakeLists.txt @@ -8,10 +8,9 @@ INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(query ${SRC}) SET_SOURCE_FILES_PROPERTIES(src/sql.c PROPERTIES COMPILE_FLAGS -w) +TARGET_LINK_LIBRARIES(query tsdb tutil) IF (TD_LINUX) - TARGET_LINK_LIBRARIES(query tsdb tutil m rt) + TARGET_LINK_LIBRARIES(query m rt) ADD_SUBDIRECTORY(tests) -ELSEIF (TD_WINDOWS) - TARGET_LINK_LIBRARIES(query tsdb tutil) ENDIF () diff --git a/src/tsdb/CMakeLists.txt b/src/tsdb/CMakeLists.txt index cef1d0bba7..d86b104f23 100644 --- a/src/tsdb/CMakeLists.txt +++ b/src/tsdb/CMakeLists.txt @@ -3,13 +3,10 @@ PROJECT(TDengine) INCLUDE_DIRECTORIES(inc) AUX_SOURCE_DIRECTORY(src SRC) +ADD_LIBRARY(tsdb ${SRC}) +TARGET_LINK_LIBRARIES(tsdb common tutil) IF (TD_LINUX) - ADD_LIBRARY(tsdb ${SRC}) - TARGET_LINK_LIBRARIES(tsdb common tutil) # Someone has no gtest directory, so comment it # ADD_SUBDIRECTORY(tests) -ELSEIF (TD_WINDOWS) - ADD_LIBRARY(tsdb ${SRC}) - TARGET_LINK_LIBRARIES(tsdb common tutil) ENDIF () diff --git a/src/util/CMakeLists.txt b/src/util/CMakeLists.txt index b2778b69d9..e63a085cc8 100644 --- a/src/util/CMakeLists.txt +++ b/src/util/CMakeLists.txt @@ -3,9 +3,10 @@ PROJECT(TDengine) AUX_SOURCE_DIRECTORY(src SRC) ADD_LIBRARY(tutil ${SRC}) +TARGET_LINK_LIBRARIES(tutil pthread osdetail lz4) IF (TD_LINUX) - TARGET_LINK_LIBRARIES(tutil pthread osdetail m rt lz4) + TARGET_LINK_LIBRARIES(tutil m rt) ADD_SUBDIRECTORY(tests) FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/) @@ -24,7 +25,7 @@ IF (TD_LINUX) ENDIF () ELSEIF (TD_WINDOWS) - TARGET_LINK_LIBRARIES(tutil iconv regex pthread osdetail winmm IPHLPAPI ws2_32 lz4 wepoll) + TARGET_LINK_LIBRARIES(tutil iconv regex winmm IPHLPAPI ws2_32 wepoll) ELSEIF(TD_DARWIN) - TARGET_LINK_LIBRARIES(tutil iconv pthread osdetail lz4) + TARGET_LINK_LIBRARIES(tutil iconv) ENDIF() diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index c294e86839..5a778156ff 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -47,6 +47,15 @@ static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeCtrlFlow(void *handle, int32_t mseconds); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); +#ifndef _SYNC +tsync_h syncStart(const SSyncInfo *info) { return NULL; } +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } +void syncStop(tsync_h shandle) {} +int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } +int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } +void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} +#endif + int32_t vnodeInitResources() { vnodeInitWriteFp(); vnodeInitReadFp(); @@ -289,12 +298,16 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyFileSynced = vnodeNotifyFileSynced; pVnode->sync = syncStart(&syncInfo); +#ifndef _SYNC + pVnode->role = TAOS_SYNC_ROLE_MASTER; +#else if (pVnode->sync == NULL) { vError("vgId:%d, failed to open sync module, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica, tstrerror(terrno)); vnodeCleanUp(pVnode); return terrno; } +#endif pVnode->qMgmt = qOpenQueryMgmt(pVnode->vgId); if (pVnode->qMgmt == NULL) { diff --git a/src/wal/CMakeLists.txt b/src/wal/CMakeLists.txt index e7d5ae1510..359e09287a 100644 --- a/src/wal/CMakeLists.txt +++ b/src/wal/CMakeLists.txt @@ -7,6 +7,5 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR}/src SRC) IF (TD_LINUX) ADD_LIBRARY(twal ${SRC}) TARGET_LINK_LIBRARIES(twal tutil common) - ADD_SUBDIRECTORY(test) ENDIF () diff --git a/tests/script/general/connection/mqtt.sim b/tests/script/general/connection/mqtt.sim new file mode 100644 index 0000000000..4b291f91ea --- /dev/null +++ b/tests/script/general/connection/mqtt.sim @@ -0,0 +1,19 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c walLevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4 +system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 100000 +system sh/cfg.sh -n dnode1 -c http -v 1 +system sh/cfg.sh -n dnode1 -c mqtt -v 1 + +system sh/exec.sh -n dnode1 -s start + +sleep 3000 +sql connect +sql create database mqttdb; +sql create table mqttdb.devices(ts timestamp, value double) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); + +sleep 1000 +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/general/db/delete_reusevnode.sim b/tests/script/general/db/delete_reusevnode.sim index 79783d6dda..5cfe7729ed 100644 --- a/tests/script/general/db/delete_reusevnode.sim +++ b/tests/script/general/db/delete_reusevnode.sim @@ -50,7 +50,7 @@ $tbPrefix = t $i = 0 while $i < 10 $db = db . $i - sql create database $db maxTables 4 + sql create database $db sql use $db sql create table st (ts timestamp, i int) tags(j int); diff --git a/tests/script/general/db/delete_reusevnode2.sim b/tests/script/general/db/delete_reusevnode2.sim index e54266e312..9fa1969425 100644 --- a/tests/script/general/db/delete_reusevnode2.sim +++ b/tests/script/general/db/delete_reusevnode2.sim @@ -8,7 +8,7 @@ sql connect print ======== step1 -sql create database db maxTables 4; +sql create database db; sql use db $tbPrefix = t diff --git a/tests/script/general/db/dropdnodes.sim b/tests/script/general/db/dropdnodes.sim index be160910c5..884a88490e 100644 --- a/tests/script/general/db/dropdnodes.sim +++ b/tests/script/general/db/dropdnodes.sim @@ -15,7 +15,7 @@ sql connect sql create dnode $hostname2 sleep 2000 -sql create database db maxTables 4 +sql create database db sql use db print ========== step1 diff --git a/tests/script/general/parser/first_last.sim b/tests/script/general/parser/first_last.sim index fa2d7675d2..4598a4735c 100644 --- a/tests/script/general/parser/first_last.sim +++ b/tests/script/general/parser/first_last.sim @@ -22,7 +22,7 @@ $stb = $stbPrefix . $i sql drop database $db -x step1 step1: -sql create database $db maxrows 400 cache 1 maxTables 4 +sql create database $db maxrows 400 cache 1 sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim index 70edf3535b..1bd56c44f1 100644 --- a/tests/script/general/parser/groupby.sim +++ b/tests/script/general/parser/groupby.sim @@ -28,7 +28,7 @@ $tstart = 100000 sql drop database if exits $db -x step1 step1: -sql create database if not exists $db maxTables 4 keep 36500 +sql create database if not exists $db keep 36500 sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) diff --git a/tests/script/general/parser/join.sim b/tests/script/general/parser/join.sim index f17e28c1da..7b47c4ff3c 100644 --- a/tests/script/general/parser/join.sim +++ b/tests/script/general/parser/join.sim @@ -24,7 +24,7 @@ $tstart = 100000 sql drop database if exits $db -x step1 step1: -sql create database if not exists $db maxTables 4 keep 36500 +sql create database if not exists $db keep 36500 sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) diff --git a/tests/script/general/parser/join_multivnode.sim b/tests/script/general/parser/join_multivnode.sim index 4cf1d36672..21ed2e1ab1 100644 --- a/tests/script/general/parser/join_multivnode.sim +++ b/tests/script/general/parser/join_multivnode.sim @@ -22,7 +22,7 @@ $tstart = 100000 sql drop database if exits $db -x step1 step1: -sql create database if not exists $db maxTables 4 keep 36500 +sql create database if not exists $db keep 36500 sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) diff --git a/tests/script/general/parser/lastrow.sim b/tests/script/general/parser/lastrow.sim index 48f6e65a4f..3eb3ecb24a 100644 --- a/tests/script/general/parser/lastrow.sim +++ b/tests/script/general/parser/lastrow.sim @@ -21,7 +21,7 @@ $stb = $stbPrefix . $i sql drop database $db -x step1 step1: -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int) diff --git a/tests/script/general/parser/mixed_blocks.sim b/tests/script/general/parser/mixed_blocks.sim index 569decaa14..b229cd9c89 100644 --- a/tests/script/general/parser/mixed_blocks.sim +++ b/tests/script/general/parser/mixed_blocks.sim @@ -22,7 +22,7 @@ sql drop database if exists $db $paramRows = 200 $rowNum = $paramRows * 4 $rowNum = $rowNum / 5 -sql create database $db maxrows $paramRows maxTables 4 +sql create database $db maxrows $paramRows print ====== create tables sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 bool, c6 binary(10), c7 nchar(10)) tags(t1 int) diff --git a/tests/script/general/parser/projection_limit_offset.sim b/tests/script/general/parser/projection_limit_offset.sim index 2b89946ef8..79ab1800d4 100644 --- a/tests/script/general/parser/projection_limit_offset.sim +++ b/tests/script/general/parser/projection_limit_offset.sim @@ -22,7 +22,7 @@ $tstart = 100000 sql drop database if exits $db -x step1 step1: -sql create database if not exists $db maxTables 4 keep 36500 +sql create database if not exists $db keep 36500 sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) diff --git a/tests/script/general/parser/select_with_tags.sim b/tests/script/general/parser/select_with_tags.sim index 9f944a586b..b46a902997 100644 --- a/tests/script/general/parser/select_with_tags.sim +++ b/tests/script/general/parser/select_with_tags.sim @@ -23,7 +23,7 @@ $tstart = 100000 sql drop database if exists $db -x step1 step1: -sql create database if not exists $db maxTables 4 keep 36500 +sql create database if not exists $db keep 36500 sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12)) diff --git a/tests/script/general/parser/single_row_in_tb.sim b/tests/script/general/parser/single_row_in_tb.sim index 2313a98b41..8f5c6db553 100644 --- a/tests/script/general/parser/single_row_in_tb.sim +++ b/tests/script/general/parser/single_row_in_tb.sim @@ -15,7 +15,7 @@ $db = $dbPrefix $stb = $stbPrefix sql drop database if exists $db -sql create database $db maxrows 200 maxTables 4 +sql create database $db maxrows 200 print ====== create tables sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 bool, c6 binary(10), c7 nchar(10)) tags(t1 int) diff --git a/tests/script/general/parser/slimit.sim b/tests/script/general/parser/slimit.sim index 2ce3d0c2ac..fedf3f8da8 100644 --- a/tests/script/general/parser/slimit.sim +++ b/tests/script/general/parser/slimit.sim @@ -21,7 +21,7 @@ $db = $dbPrefix . $i $stb = $stbPrefix . $i sql drop database if exists $db -sql create database $db maxrows 200 cache 16 maxTables 4 +sql create database $db maxrows 200 cache 16 print ====== create tables sql use $db sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 binary(15), t2 int, t3 bigint, t4 nchar(10), t5 double, t6 bool) diff --git a/tests/script/general/parser/timestamp.sim b/tests/script/general/parser/timestamp.sim index 28bbc9df0e..a47372bf9d 100644 --- a/tests/script/general/parser/timestamp.sim +++ b/tests/script/general/parser/timestamp.sim @@ -20,7 +20,7 @@ $db = $dbPrefix . $i $stb = $stbPrefix . $i sql drop database if exists $db -sql create database $db maxrows 200 maxTables 4 +sql create database $db maxrows 200 print ====== create tables sql use $db sql create table $stb (ts timestamp, c1 timestamp, c2 int) tags(t1 binary(20)) diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index 710156a4ff..dd3b11c2dc 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -21,7 +21,7 @@ $mt = $mtPrefix . $i sql drop database if exits $db -x step1 step1: -sql create database if not exists $db maxTables 4 +sql create database if not exists $db sql use $db sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int) diff --git a/tests/script/general/stable/dnode3.sim b/tests/script/general/stable/dnode3.sim index 76652229d2..2859f644bb 100644 --- a/tests/script/general/stable/dnode3.sim +++ b/tests/script/general/stable/dnode3.sim @@ -54,7 +54,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 40598332d9..88a844333b 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -293,6 +293,7 @@ cd ../../../debug; make ./test.sh -f unique/stable/replica3_dnode6.sim ./test.sh -f unique/stable/replica3_vnode3.sim +./test.sh -f unique/mnode/mgmt20.sim ./test.sh -f unique/mnode/mgmt21.sim ./test.sh -f unique/mnode/mgmt22.sim ./test.sh -f unique/mnode/mgmt23.sim diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 498f536726..0d444a5a6e 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -119,7 +119,7 @@ echo "tsdbDebugFlag 135" >> $TAOS_CFG echo "cDebugFlag 135" >> $TAOS_CFG echo "jnidebugFlag 135" >> $TAOS_CFG echo "odbcdebugFlag 135" >> $TAOS_CFG -echo "httpDebugFlag 143" >> $TAOS_CFG +echo "httpDebugFlag 135" >> $TAOS_CFG echo "monitorDebugFlag 135" >> $TAOS_CFG echo "mqttDebugFlag 135" >> $TAOS_CFG echo "qdebugFlag 135" >> $TAOS_CFG diff --git a/tests/script/unique/arbitrator/dn3_mn1_vnode_change.sim b/tests/script/unique/arbitrator/dn3_mn1_vnode_change.sim index b18edf0670..4f80c2389e 100644 --- a/tests/script/unique/arbitrator/dn3_mn1_vnode_change.sim +++ b/tests/script/unique/arbitrator/dn3_mn1_vnode_change.sim @@ -61,7 +61,7 @@ $totalTableNum = 10 $sleepTimer = 3000 $db = db -sql create database $db replica 2 maxTables $totalTableNum +sql create database $db replica 2 sql use $db # create table , insert data diff --git a/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim b/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim index 4f737b9c40..fb0650b78a 100644 --- a/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim +++ b/tests/script/unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim @@ -61,7 +61,7 @@ $totalTableNum = 4 $sleepTimer = 3000 $db = db -sql create database $db cache 1 replica 2 maxTables $totalTableNum +sql create database $db cache 1 replica 2 sql use $db # create table , insert data diff --git a/tests/script/unique/arbitrator/sync_replica3_createTable.sim b/tests/script/unique/arbitrator/sync_replica3_createTable.sim index 3bf274a3ea..d593577bee 100644 --- a/tests/script/unique/arbitrator/sync_replica3_createTable.sim +++ b/tests/script/unique/arbitrator/sync_replica3_createTable.sim @@ -61,8 +61,8 @@ $totalTableNum = 20 $sleepTimer = 3000 $db = db -print create database $db replica 3 maxTables $totalTableNum -sql create database $db replica 3 maxTables $totalTableNum +print create database $db replica 3 +sql create database $db replica 3 sql use $db # create table , insert data diff --git a/tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim b/tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim index c8fe96008b..1ef499534f 100644 --- a/tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim +++ b/tests/script/unique/arbitrator/sync_replica3_dnodeChang_DropAddAlterTableDropDb.sim @@ -62,8 +62,8 @@ $sleepTimer = 3000 $maxTables = $totalTableNum * 2 $db = db -print create database $db replica 3 maxTables $maxTables -sql create database $db replica 3 maxTables $maxTables +print create database $db replica 3 +sql create database $db replica 3 sql use $db # create table , insert data diff --git a/tests/script/unique/big/maxvnodes.sim b/tests/script/unique/big/maxvnodes.sim index eb6e0b3b53..662d391e47 100644 --- a/tests/script/unique/big/maxvnodes.sim +++ b/tests/script/unique/big/maxvnodes.sim @@ -18,7 +18,7 @@ print ========== prepare data system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database db blocks 3 cache 1 maxTables $maxTables +sql create database db blocks 3 cache 1 sql use db print ========== step1 diff --git a/tests/script/unique/big/restartSpeed.sim b/tests/script/unique/big/restartSpeed.sim index 9d490fcb8b..ea4edefda8 100644 --- a/tests/script/unique/big/restartSpeed.sim +++ b/tests/script/unique/big/restartSpeed.sim @@ -16,7 +16,7 @@ print ========== prepare data system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database db blocks 3 cache 1 maxTables $maxTables +sql create database db blocks 3 cache 1 sql use db print ========== step1 diff --git a/tests/script/unique/cluster/balance1.sim b/tests/script/unique/cluster/balance1.sim index 44f44e3976..728ead25fe 100644 --- a/tests/script/unique/cluster/balance1.sim +++ b/tests/script/unique/cluster/balance1.sim @@ -40,7 +40,7 @@ print ========= start dnode1 system sh/exec.sh -n dnode1 -s start sql connect -sql create database c_b1_d1 maxTables 4 +sql create database c_b1_d1 sql use c_b1_d1 sql create table c_b1_t1 (t timestamp, i int) @@ -50,7 +50,7 @@ sql insert into c_b1_t1 values(1520000022013, 13) sql insert into c_b1_t1 values(1520000023012, 12) sql insert into c_b1_t1 values(1520000024011, 11) -sql create database c_b1_d2 maxTables 4 +sql create database c_b1_d2 sql use c_b1_d2 sql create table c_b1_t2 (t timestamp, i int) sql insert into c_b1_t2 values(1520000020025, 25) @@ -107,7 +107,7 @@ print dnode2 ==> $dnode2Role print ============================== step3 print ========= add db3 -sql create database c_b1_d3 maxTables 4 +sql create database c_b1_d3 sql use c_b1_d3 sql create table c_b1_t3 (t timestamp, i int) sql insert into c_b1_t3 values(1520000020035, 35) @@ -280,7 +280,7 @@ if $dnode4Role != slave then endi print ============================== step10 -sql create database c_b1_d4 maxTables 4 +sql create database c_b1_d4 sql use c_b1_d4 sql create table c_b1_t4 (t timestamp, i int) sql insert into c_b1_t4 values(1520000020045, 45) @@ -318,7 +318,7 @@ sql use c_b1_d2 sql insert into c_b1_t2 values(1520000025026, 26) print ============================== step12 -sql create database c_b1_d5 maxTables 4 +sql create database c_b1_d5 sql use c_b1_d5 sql_error create table c_b1_t5 (t timestamp, i int) -x error3 @@ -343,7 +343,7 @@ sql insert into c_b1_t5 values(1520000022053, 53) sql insert into c_b1_t5 values(1520000023052, 52) sql insert into c_b1_t5 values(1520000024051, 51) -sql create database c_b1_d6 maxTables 4 +sql create database c_b1_d6 sql use c_b1_d6 sql create table c_b1_t6 (t timestamp, i int) sql insert into c_b1_t6 values(1520000020065, 65) @@ -375,7 +375,7 @@ sql create dnode $hostname6 system sh/exec.sh -n dnode6 -s start sleep 15000 -sql create database c_b1_d7 maxTables 4 +sql create database c_b1_d7 sql use c_b1_d7 sql create table c_b1_t7 (t timestamp, i int) sql insert into c_b1_t7 values(1520000020075, 75) @@ -384,7 +384,7 @@ sql insert into c_b1_t7 values(1520000022073, 73) sql insert into c_b1_t7 values(1520000023072, 72) sql insert into c_b1_t7 values(1520000024071, 71) -sql create database c_b1_d8 maxTables 4 +sql create database c_b1_d8 sql use c_b1_d8 sql create table c_b1_t8 (t timestamp, i int) sql insert into c_b1_t8 values(1520000020085, 85) diff --git a/tests/script/unique/cluster/balance2.sim b/tests/script/unique/cluster/balance2.sim index 0c880d36ff..6d2c4bdad7 100644 --- a/tests/script/unique/cluster/balance2.sim +++ b/tests/script/unique/cluster/balance2.sim @@ -47,7 +47,7 @@ system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s start sleep 4001 -sql create database c_b2_d1 replica 2 maxTables 4 +sql create database c_b2_d1 replica 2 sql use c_b2_d1 sql create table c_b2_t1 (t timestamp, i int) sql insert into c_b2_t1 values(1520000020015, 15) @@ -56,7 +56,7 @@ sql insert into c_b2_t1 values(1520000022013, 13) sql insert into c_b2_t1 values(1520000023012, 12) sql insert into c_b2_t1 values(1520000024011, 11) -sql create database c_b2_d2 replica 2 maxTables 4 +sql create database c_b2_d2 replica 2 sql use c_b2_d2 sql create table c_b2_t2 (t timestamp, i int) sql insert into c_b2_t2 values(1520000020025, 25) @@ -65,7 +65,7 @@ sql insert into c_b2_t2 values(1520000022023, 23) sql insert into c_b2_t2 values(1520000023022, 22) sql insert into c_b2_t2 values(1520000024021, 21) -sql create database c_b2_d3 replica 2 maxTables 4 +sql create database c_b2_d3 replica 2 sql use c_b2_d3 sql create table c_b2_t3 (t timestamp, i int) sql insert into c_b2_t3 values(1520000020035, 35) diff --git a/tests/script/unique/cluster/cluster_main.sim b/tests/script/unique/cluster/cluster_main.sim index 7d4a35c7f2..f0a9b1a214 100644 --- a/tests/script/unique/cluster/cluster_main.sim +++ b/tests/script/unique/cluster/cluster_main.sim @@ -57,7 +57,7 @@ sleep 3000 print ============== step2: create db1 with replica 3 $db = db1 print create database $db replica 3 -#sql create database $db replica 3 maxTables $totalTableNum +#sql create database $db replica 3 sql create database $db replica 3 sql use $db diff --git a/tests/script/unique/cluster/cluster_main0.sim b/tests/script/unique/cluster/cluster_main0.sim index ef76f8823f..9f775c0cef 100644 --- a/tests/script/unique/cluster/cluster_main0.sim +++ b/tests/script/unique/cluster/cluster_main0.sim @@ -57,7 +57,7 @@ sleep 3000 print ============== step2: create db1 with replica 3 $db = db1 print create database $db replica 3 -#sql create database $db replica 3 maxTables $totalTableNum +#sql create database $db replica 3 sql create database $db replica 3 sql use $db diff --git a/tests/script/unique/cluster/cluster_main1.sim b/tests/script/unique/cluster/cluster_main1.sim index 4537b09091..7796f1ea00 100644 --- a/tests/script/unique/cluster/cluster_main1.sim +++ b/tests/script/unique/cluster/cluster_main1.sim @@ -58,7 +58,7 @@ print ============== step2: create db1 with replica 3 $replica = 3 $db = db1 print create database $db replica $replica -#sql create database $db replica 3 maxTables $totalTableNum +#sql create database $db replica 3 sql create database $db replica $replica sql use $db diff --git a/tests/script/unique/cluster/cluster_main2.sim b/tests/script/unique/cluster/cluster_main2.sim index 84ea3871ef..4866154681 100644 --- a/tests/script/unique/cluster/cluster_main2.sim +++ b/tests/script/unique/cluster/cluster_main2.sim @@ -58,7 +58,7 @@ print ============== step2: create db1 with replica 3 $replica = 3 $db = db1 print create database $db replica $replica -#sql create database $db replica 3 maxTables $totalTableNum +#sql create database $db replica 3 sql create database $db replica $replica sql use $db diff --git a/tests/script/unique/dnode/alternativeRole.sim b/tests/script/unique/dnode/alternativeRole.sim index fb9d344d20..ab37c1603a 100644 --- a/tests/script/unique/dnode/alternativeRole.sim +++ b/tests/script/unique/dnode/alternativeRole.sim @@ -56,7 +56,7 @@ if $data2_3 != slave then endi print ========== step2 -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (ts timestamp, i int) sql create table d1.t2 (ts timestamp, i int) sql create table d1.t3 (ts timestamp, i int) diff --git a/tests/script/unique/dnode/balance1.sim b/tests/script/unique/dnode/balance1.sim index 5b1615dc28..b246197742 100644 --- a/tests/script/unique/dnode/balance1.sim +++ b/tests/script/unique/dnode/balance1.sim @@ -30,7 +30,7 @@ system sh/exec.sh -n dnode1 -s start sql connect sleep 3000 -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -68,7 +68,7 @@ if $data2_2 != 1 then endi print ========== step3 -sql create database d2 maxTables 4 +sql create database d2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) sql insert into d2.t2 values(now+2s, 24) @@ -139,7 +139,7 @@ if $data2_3 != 2 then endi print ========== step6 -sql create database d3 maxTables 4 +sql create database d3 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) @@ -193,7 +193,7 @@ if $data2_4 != 1 then endi print ========== step8 -sql create database d4 maxTables 4 +sql create database d4 sql create table d4.t4 (t timestamp, i int) sql insert into d4.t4 values(now+1s, 45) sql insert into d4.t4 values(now+2s, 44) diff --git a/tests/script/unique/dnode/balance2.sim b/tests/script/unique/dnode/balance2.sim index e23562d8b4..1a80e890a0 100644 --- a/tests/script/unique/dnode/balance2.sim +++ b/tests/script/unique/dnode/balance2.sim @@ -28,7 +28,7 @@ system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode3 -s start sleep 3000 -sql create database d1 replica 2 maxTables 4 +sql create database d1 replica 2 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -36,7 +36,7 @@ sql insert into d1.t1 values(now+3s, 13) sql insert into d1.t1 values(now+4s, 12) sql insert into d1.t1 values(now+5s, 11) -sql create database d2 replica 2 maxTables 4 +sql create database d2 replica 2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) sql insert into d2.t2 values(now+2s, 24) @@ -117,7 +117,7 @@ if $data2_4 != 2 then endi print ========== step4 -sql create database d3 replica 2 maxTables 4 +sql create database d3 replica 2 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) diff --git a/tests/script/unique/dnode/balance3.sim b/tests/script/unique/dnode/balance3.sim index b2adb24dfa..1f81fde968 100644 --- a/tests/script/unique/dnode/balance3.sim +++ b/tests/script/unique/dnode/balance3.sim @@ -33,7 +33,7 @@ system sh/exec.sh -n dnode3 -s start system sh/exec.sh -n dnode4 -s start sleep 3000 -sql create database d1 replica 3 maxTables 4 +sql create database d1 replica 3 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -41,7 +41,7 @@ sql insert into d1.t1 values(now+3s, 13) sql insert into d1.t1 values(now+4s, 12) sql insert into d1.t1 values(now+5s, 11) -sql create database d2 replica 3 maxTables 4 +sql create database d2 replica 3 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) sql insert into d2.t2 values(now+2s, 24) @@ -136,7 +136,7 @@ if $data2_5 != 2 then endi print ========== step4 -sql create database d3 replica 3 maxTables 4 +sql create database d3 replica 3 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) diff --git a/tests/script/unique/dnode/balancex.sim b/tests/script/unique/dnode/balancex.sim index 0d5da5bbf6..6f3b7dfb74 100644 --- a/tests/script/unique/dnode/balancex.sim +++ b/tests/script/unique/dnode/balancex.sim @@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start sql connect sleep 3000 -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -28,7 +28,7 @@ sql insert into d1.t1 values(now+3s, 13) sql insert into d1.t1 values(now+4s, 12) sql insert into d1.t1 values(now+5s, 11) -sql create database d2 maxTables 4 +sql create database d2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) sql insert into d2.t2 values(now+2s, 24) @@ -65,7 +65,7 @@ if $data2_2 != 2 then endi print ========== step3 -sql create database d3 replica 2 maxTables 4 +sql create database d3 replica 2 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) diff --git a/tests/script/unique/dnode/offline2.sim b/tests/script/unique/dnode/offline2.sim index 6db6ca4013..2d1467ad59 100644 --- a/tests/script/unique/dnode/offline2.sim +++ b/tests/script/unique/dnode/offline2.sim @@ -27,7 +27,7 @@ sql create dnode $hostname2 system sh/exec.sh -n dnode2 -s start sleep 3000 -sql create database d1 replica 2 maxTables 4 +sql create database d1 replica 2 sql create table d1.t1(ts timestamp, i int) sql insert into d1.t1 values(1588262400001, 1) diff --git a/tests/script/unique/dnode/remove1.sim b/tests/script/unique/dnode/remove1.sim index 246808c56c..7786b9f9d1 100644 --- a/tests/script/unique/dnode/remove1.sim +++ b/tests/script/unique/dnode/remove1.sim @@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -28,7 +28,7 @@ sql insert into d1.t1 values(now+3s, 13) sql insert into d1.t1 values(now+4s, 12) sql insert into d1.t1 values(now+5s, 11) -sql create database d2 maxTables 4 +sql create database d2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) sql insert into d2.t2 values(now+2s, 24) @@ -47,7 +47,7 @@ sql create dnode $hostname2 system sh/exec.sh -n dnode2 -s start sleep 9000 -sql create database d3 replica 2 maxTables 4 +sql create database d3 replica 2 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) diff --git a/tests/script/unique/dnode/remove2.sim b/tests/script/unique/dnode/remove2.sim index cf9954c767..cd0331235a 100644 --- a/tests/script/unique/dnode/remove2.sim +++ b/tests/script/unique/dnode/remove2.sim @@ -20,7 +20,7 @@ system sh/exec.sh -n dnode1 -s start sleep 3000 sql connect -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(1588262400001, 15) sql insert into d1.t1 values(1588262400002, 14) @@ -28,7 +28,7 @@ sql insert into d1.t1 values(1588262400003, 13) sql insert into d1.t1 values(1588262400004, 12) sql insert into d1.t1 values(1588262400005, 11) -sql create database d2 maxTables 4 +sql create database d2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(1588262400001, 25) sql insert into d2.t2 values(1588262400002, 24) @@ -47,7 +47,7 @@ sql create dnode $hostname2 system sh/exec.sh -n dnode2 -s start sleep 9000 -sql create database d3 replica 2 maxTables 4 +sql create database d3 replica 2 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(1588262400001, 35) sql insert into d3.t3 values(1588262400002, 34) diff --git a/tests/script/unique/dnode/vnode_clean.sim b/tests/script/unique/dnode/vnode_clean.sim index e1ee1da2aa..6df4bf78e8 100644 --- a/tests/script/unique/dnode/vnode_clean.sim +++ b/tests/script/unique/dnode/vnode_clean.sim @@ -19,7 +19,7 @@ print ========== step1 system sh/exec.sh -n dnode1 -s start sql connect -sql create database d1 maxTables 4 +sql create database d1 sql create table d1.t1 (t timestamp, i int) sql insert into d1.t1 values(now+1s, 15) sql insert into d1.t1 values(now+2s, 14) @@ -55,7 +55,7 @@ if $data2_2 != 1 then endi print ========== step3 -sql create database d2 maxTables 4 +sql create database d2 sql create table d2.t2 (t timestamp, i int) sql insert into d2.t2 values(now+1s, 25) @@ -123,7 +123,7 @@ if $data2_3 != 2 then endi print ========== step6 -sql create database d3 maxTables 4 +sql create database d3 sql create table d3.t3 (t timestamp, i int) sql insert into d3.t3 values(now+1s, 35) sql insert into d3.t3 values(now+2s, 34) diff --git a/tests/script/unique/mnode/mgmt20.sim b/tests/script/unique/mnode/mgmt20.sim new file mode 100644 index 0000000000..e51d429925 --- /dev/null +++ b/tests/script/unique/mnode/mgmt20.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 2 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 2 + +system sh/cfg.sh -n dnode1 -c monitor -v 1 +system sh/cfg.sh -n dnode2 -c monitor -v 1 + +print ============== step1 +system sh/exec.sh -n dnode1 -s start +system sh/exec.sh -n dnode2 -s start +sleep 3000 +sql connect + +print ============== step2 +sql create dnode $hostname2 + +$x = 0 +show2: + $x = $x + 1 + sleep 2000 + if $x == 10 then + return -1 + endi + +sql show mnodes +print dnode1 ==> $data2_1 +print dnode2 ==> $data2_2 +if $data2_1 != master then + goto show2 +endi +if $data2_2 != slave then + goto show2 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT + +print ============== step3 +system sh/exec.sh -n dnode2 -s start +sleep 10000 + +system sh/exec.sh -n dnode1 -s start +sql connect + +print =============== step4 +sql select * from log.dn1 +$d1_first = $rows +sql select * from log.dn2 +$d2_first = $rows + +sleep 3000 +sql select * from log.dn1 +$d1_second = $rows +sql select * from log.dn2 +$d2_second = $rows + +print dnode1 $d1_first $d1_second +print dnode2 $d2_first $d2_second +if $d1_first >= $d1_second then + return -1 +endi + +if $d2_first >= $d2_second then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/unique/stable/balance_replica1.sim b/tests/script/unique/stable/balance_replica1.sim index 362f8ccf7f..3ea158eb39 100644 --- a/tests/script/unique/stable/balance_replica1.sim +++ b/tests/script/unique/stable/balance_replica1.sim @@ -29,7 +29,7 @@ $db = $dbPrefix $mt = $mtPrefix $st = $stPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int) diff --git a/tests/script/unique/stable/dnode2.sim b/tests/script/unique/stable/dnode2.sim index 441323ff3c..5c227f8cec 100644 --- a/tests/script/unique/stable/dnode2.sim +++ b/tests/script/unique/stable/dnode2.sim @@ -38,7 +38,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/dnode2_stop.sim b/tests/script/unique/stable/dnode2_stop.sim index d25c68cbba..19c6de33b3 100644 --- a/tests/script/unique/stable/dnode2_stop.sim +++ b/tests/script/unique/stable/dnode2_stop.sim @@ -43,7 +43,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/dnode3.sim b/tests/script/unique/stable/dnode3.sim index 712ec04b8c..5fe37faa71 100644 --- a/tests/script/unique/stable/dnode3.sim +++ b/tests/script/unique/stable/dnode3.sim @@ -47,7 +47,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/replica2_dnode4.sim b/tests/script/unique/stable/replica2_dnode4.sim index c7a9767fcc..4f8211d5d4 100644 --- a/tests/script/unique/stable/replica2_dnode4.sim +++ b/tests/script/unique/stable/replica2_dnode4.sim @@ -54,7 +54,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db replica 2 maxTables 4 +sql create database $db replica 2 sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/replica2_vnode3.sim b/tests/script/unique/stable/replica2_vnode3.sim index 84af380106..47d45c3d3d 100644 --- a/tests/script/unique/stable/replica2_vnode3.sim +++ b/tests/script/unique/stable/replica2_vnode3.sim @@ -37,7 +37,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db replica 2 maxTables 4 +sql create database $db replica 2 sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/replica3_dnode6.sim b/tests/script/unique/stable/replica3_dnode6.sim index 83ef334301..eeffb86cdb 100644 --- a/tests/script/unique/stable/replica3_dnode6.sim +++ b/tests/script/unique/stable/replica3_dnode6.sim @@ -78,7 +78,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db replica 3 maxTables 4 +sql create database $db replica 3 sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stable/replica3_vnode3.sim b/tests/script/unique/stable/replica3_vnode3.sim index 3cd7b92477..bc700b7dda 100644 --- a/tests/script/unique/stable/replica3_vnode3.sim +++ b/tests/script/unique/stable/replica3_vnode3.sim @@ -54,7 +54,7 @@ $i = 0 $db = $dbPrefix . $i $mt = $mtPrefix . $i -sql create database $db replica 3 maxTables 4 +sql create database $db replica 3 sql use $db sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int) diff --git a/tests/script/unique/stream/metrics_balance.sim b/tests/script/unique/stream/metrics_balance.sim index c043044499..36086fe4b8 100644 --- a/tests/script/unique/stream/metrics_balance.sim +++ b/tests/script/unique/stream/metrics_balance.sim @@ -32,7 +32,7 @@ sql connect print ============== step1 $db = $dbPrefix -sql create database $db maxTables 4 +sql create database $db sql use $db $i = 0 diff --git a/tests/script/unique/stream/table_balance.sim b/tests/script/unique/stream/table_balance.sim index 6c8958aa48..facb7df459 100644 --- a/tests/script/unique/stream/table_balance.sim +++ b/tests/script/unique/stream/table_balance.sim @@ -34,7 +34,7 @@ $db = $dbPrefix $mt = $mtPrefix $st = $stPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int) diff --git a/tests/script/unique/stream/table_move.sim b/tests/script/unique/stream/table_move.sim index d3ea375e1f..d2437e4920 100644 --- a/tests/script/unique/stream/table_move.sim +++ b/tests/script/unique/stream/table_move.sim @@ -65,7 +65,7 @@ $db = $dbPrefix . $i $mt = $mtPrefix . $i $st = $stPrefix . $i -sql create database $db maxTables 4 +sql create database $db sql use $db sql create table $mt (ts timestamp, tbcol int, tbcol2 float) TAGS(tgcol int)