[td-225] merge with develop branch
This commit is contained in:
commit
29f4cea279
|
@ -41,3 +41,27 @@ pysim/
|
||||||
|
|
||||||
# Doxygen Generated files
|
# Doxygen Generated files
|
||||||
html/
|
html/
|
||||||
|
/.vs
|
||||||
|
/CMakeFiles/3.10.2
|
||||||
|
/CMakeCache.txt
|
||||||
|
/Makefile
|
||||||
|
/*.cmake
|
||||||
|
/deps
|
||||||
|
/src/cq/test/CMakeFiles/cqtest.dir/*.cmake
|
||||||
|
*.cmake
|
||||||
|
/src/cq/test/CMakeFiles/cqtest.dir/*.make
|
||||||
|
*.make
|
||||||
|
link.txt
|
||||||
|
*.internal
|
||||||
|
*.includecache
|
||||||
|
*.marks
|
||||||
|
Makefile
|
||||||
|
CMakeError.log
|
||||||
|
*.log
|
||||||
|
/CMakeFiles/CMakeRuleHashes.txt
|
||||||
|
/CMakeFiles/Makefile2
|
||||||
|
/CMakeFiles/TargetDirectories.txt
|
||||||
|
/CMakeFiles/cmake.check_cache
|
||||||
|
/out/isenseconfig/WSL-Clang-Debug
|
||||||
|
/out/isenseconfig/WSL-GCC-Debug
|
||||||
|
/test/cfg
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
{
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "WSL-GCC-Debug",
|
||||||
|
"generator": "Unix Makefiles",
|
||||||
|
"configurationType": "Debug",
|
||||||
|
"buildRoot": "${projectDir}\\build\\",
|
||||||
|
"installRoot": "${projectDir}\\out\\install\\${name}",
|
||||||
|
"cmakeExecutable": "/usr/bin/cmake",
|
||||||
|
"cmakeCommandArgs": "",
|
||||||
|
"buildCommandArgs": "",
|
||||||
|
"ctestCommandArgs": "",
|
||||||
|
"inheritEnvironments": [ "linux_x64" ],
|
||||||
|
"wslPath": "${defaultWSLPath}",
|
||||||
|
"addressSanitizerRuntimeFlags": "detect_leaks=0",
|
||||||
|
"variables": [
|
||||||
|
{
|
||||||
|
"name": "CMAKE_INSTALL_PREFIX",
|
||||||
|
"value": "/mnt/d/TDengine/TDengine/build",
|
||||||
|
"type": "PATH"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
|
@ -103,6 +103,7 @@ extern int32_t tsOfflineThreshold;
|
||||||
extern int32_t tsMgmtEqualVnodeNum;
|
extern int32_t tsMgmtEqualVnodeNum;
|
||||||
|
|
||||||
extern int32_t tsEnableHttpModule;
|
extern int32_t tsEnableHttpModule;
|
||||||
|
extern int32_t tsEnableMqttModule;
|
||||||
extern int32_t tsEnableMonitorModule;
|
extern int32_t tsEnableMonitorModule;
|
||||||
|
|
||||||
extern int32_t tsRestRowLimit;
|
extern int32_t tsRestRowLimit;
|
||||||
|
@ -147,6 +148,7 @@ extern int32_t jniDebugFlag;
|
||||||
extern int32_t tmrDebugFlag;
|
extern int32_t tmrDebugFlag;
|
||||||
extern int32_t sdbDebugFlag;
|
extern int32_t sdbDebugFlag;
|
||||||
extern int32_t httpDebugFlag;
|
extern int32_t httpDebugFlag;
|
||||||
|
extern int32_t mqttDebugFlag;
|
||||||
extern int32_t monitorDebugFlag;
|
extern int32_t monitorDebugFlag;
|
||||||
extern int32_t uDebugFlag;
|
extern int32_t uDebugFlag;
|
||||||
extern int32_t rpcDebugFlag;
|
extern int32_t rpcDebugFlag;
|
||||||
|
|
|
@ -120,6 +120,7 @@ int32_t tsOfflineThreshold = 86400*100; // seconds 10days
|
||||||
int32_t tsMgmtEqualVnodeNum = 4;
|
int32_t tsMgmtEqualVnodeNum = 4;
|
||||||
|
|
||||||
int32_t tsEnableHttpModule = 1;
|
int32_t tsEnableHttpModule = 1;
|
||||||
|
int32_t tsEnableMqttModule = 0; // not finished yet, not started it by default
|
||||||
int32_t tsEnableMonitorModule = 0;
|
int32_t tsEnableMonitorModule = 0;
|
||||||
|
|
||||||
int32_t tsRestRowLimit = 10240;
|
int32_t tsRestRowLimit = 10240;
|
||||||
|
@ -134,6 +135,7 @@ int32_t cDebugFlag = 135;
|
||||||
int32_t jniDebugFlag = 131;
|
int32_t jniDebugFlag = 131;
|
||||||
int32_t odbcDebugFlag = 131;
|
int32_t odbcDebugFlag = 131;
|
||||||
int32_t httpDebugFlag = 131;
|
int32_t httpDebugFlag = 131;
|
||||||
|
int32_t mqttDebugFlag = 131;
|
||||||
int32_t monitorDebugFlag = 131;
|
int32_t monitorDebugFlag = 131;
|
||||||
int32_t qDebugFlag = 131;
|
int32_t qDebugFlag = 131;
|
||||||
int32_t rpcDebugFlag = 135;
|
int32_t rpcDebugFlag = 135;
|
||||||
|
@ -212,6 +214,7 @@ void taosSetAllDebugFlag() {
|
||||||
jniDebugFlag = debugFlag;
|
jniDebugFlag = debugFlag;
|
||||||
odbcDebugFlag = debugFlag;
|
odbcDebugFlag = debugFlag;
|
||||||
httpDebugFlag = debugFlag;
|
httpDebugFlag = debugFlag;
|
||||||
|
mqttDebugFlag = debugFlag;
|
||||||
monitorDebugFlag = debugFlag;
|
monitorDebugFlag = debugFlag;
|
||||||
rpcDebugFlag = debugFlag;
|
rpcDebugFlag = debugFlag;
|
||||||
uDebugFlag = debugFlag;
|
uDebugFlag = debugFlag;
|
||||||
|
@ -890,6 +893,17 @@ static void doInitGlobalConfig() {
|
||||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
taosInitConfigOption(cfg);
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
|
|
||||||
|
cfg.option = "mqtt";
|
||||||
|
cfg.ptr = &tsEnableMqttModule;
|
||||||
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||||
|
cfg.minValue = 0;
|
||||||
|
cfg.maxValue = 1;
|
||||||
|
cfg.ptrLength = 1;
|
||||||
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
cfg.option = "monitor";
|
cfg.option = "monitor";
|
||||||
cfg.ptr = &tsEnableMonitorModule;
|
cfg.ptr = &tsEnableMonitorModule;
|
||||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
@ -1112,6 +1126,17 @@ static void doInitGlobalConfig() {
|
||||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
taosInitConfigOption(cfg);
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
|
cfg.option = "mqttDebugFlag";
|
||||||
|
cfg.ptr = &mqttDebugFlag;
|
||||||
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG;
|
||||||
|
cfg.minValue = 0;
|
||||||
|
cfg.maxValue = 255;
|
||||||
|
cfg.ptrLength = 0;
|
||||||
|
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||||
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
|
|
||||||
cfg.option = "monitorDebugFlag";
|
cfg.option = "monitorDebugFlag";
|
||||||
cfg.ptr = &monitorDebugFlag;
|
cfg.ptr = &monitorDebugFlag;
|
||||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
|
|
|
@ -29,6 +29,7 @@ void extractTableName(const char* tableId, char* name) {
|
||||||
size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]);
|
size_t s2 = strcspn(&tableId[s1 + 1], &TS_PATH_DELIMITER[0]);
|
||||||
|
|
||||||
strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN);
|
strncpy(name, &tableId[s1 + s2 + 2], TSDB_TABLE_NAME_LEN);
|
||||||
|
name[TSDB_TABLE_NAME_LEN] = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* extractDBName(const char* tableId, char* name) {
|
char* extractDBName(const char* tableId, char* name) {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <errno.h>
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -64,7 +65,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
|
||||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
|
|
||||||
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
|
||||||
if (pContext == NULL) return NULL;
|
if (pContext == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
strcpy(pContext->user, pCfg->user);
|
strcpy(pContext->user, pCfg->user);
|
||||||
strcpy(pContext->pass, pCfg->pass);
|
strcpy(pContext->pass, pCfg->pass);
|
||||||
|
@ -82,6 +86,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
|
|
||||||
void cqClose(void *handle) {
|
void cqClose(void *handle) {
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
|
if (handle == NULL) return;
|
||||||
|
|
||||||
// stop all CQs
|
// stop all CQs
|
||||||
cqStop(pContext);
|
cqStop(pContext);
|
||||||
|
@ -106,9 +111,9 @@ void cqClose(void *handle) {
|
||||||
|
|
||||||
void cqStart(void *handle) {
|
void cqStart(void *handle) {
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
|
||||||
if (pContext->dbConn || pContext->master) return;
|
if (pContext->dbConn || pContext->master) return;
|
||||||
|
|
||||||
|
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
pContext->master = 1;
|
pContext->master = 1;
|
||||||
|
|
|
@ -16,7 +16,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
ADD_EXECUTABLE(taosd ${SRC})
|
ADD_EXECUTABLE(taosd ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal vnode cJson lz4)
|
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http mqtt tsdb twal vnode cJson lz4)
|
||||||
|
|
||||||
IF (TD_ACCOUNT)
|
IF (TD_ACCOUNT)
|
||||||
TARGET_LINK_LIBRARIES(taosd account)
|
TARGET_LINK_LIBRARIES(taosd account)
|
||||||
|
|
|
@ -37,6 +37,7 @@ static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED;
|
||||||
int32_t dnodeInitSystem() {
|
int32_t dnodeInitSystem() {
|
||||||
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
|
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE);
|
||||||
tscEmbedded = 1;
|
tscEmbedded = 1;
|
||||||
|
taosBlockSIGPIPE();
|
||||||
taosResolveCRC();
|
taosResolveCRC();
|
||||||
taosInitGlobalCfg();
|
taosInitGlobalCfg();
|
||||||
taosReadGlobalLogCfg();
|
taosReadGlobalLogCfg();
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
|
#include "mqtt.h"
|
||||||
#include "monitor.h"
|
#include "monitor.h"
|
||||||
#include "dnodeInt.h"
|
#include "dnodeInt.h"
|
||||||
#include "dnodeModule.h"
|
#include "dnodeModule.h"
|
||||||
|
@ -62,6 +63,16 @@ static void dnodeAllocModules() {
|
||||||
dnodeSetModuleStatus(TSDB_MOD_HTTP);
|
dnodeSetModuleStatus(TSDB_MOD_HTTP);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsModule[TSDB_MOD_MQTT].enable = (tsEnableMqttModule == 1);
|
||||||
|
tsModule[TSDB_MOD_MQTT].name = "mqtt";
|
||||||
|
tsModule[TSDB_MOD_MQTT].initFp = mqttInitSystem;
|
||||||
|
tsModule[TSDB_MOD_MQTT].cleanUpFp = mqttCleanUpSystem;
|
||||||
|
tsModule[TSDB_MOD_MQTT].startFp = mqttStartSystem;
|
||||||
|
tsModule[TSDB_MOD_MQTT].stopFp = mqttStopSystem;
|
||||||
|
if (tsEnableMqttModule) {
|
||||||
|
dnodeSetModuleStatus(TSDB_MOD_MQTT);
|
||||||
|
}
|
||||||
|
|
||||||
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
|
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
|
||||||
tsModule[TSDB_MOD_MONITOR].name = "monitor";
|
tsModule[TSDB_MOD_MONITOR].name = "monitor";
|
||||||
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
|
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_MQTT_H
|
||||||
|
#define TDENGINE_MQTT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
int32_t mqttGetReqCount();
|
||||||
|
int32_t mqttInitSystem();
|
||||||
|
int32_t mqttStartSystem();
|
||||||
|
void mqttStopSystem();
|
||||||
|
void mqttCleanUpSystem();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -375,6 +375,7 @@ typedef enum {
|
||||||
TSDB_MOD_MGMT,
|
TSDB_MOD_MGMT,
|
||||||
TSDB_MOD_HTTP,
|
TSDB_MOD_HTTP,
|
||||||
TSDB_MOD_MONITOR,
|
TSDB_MOD_MONITOR,
|
||||||
|
TSDB_MOD_MQTT,
|
||||||
TSDB_MOD_MAX
|
TSDB_MOD_MAX
|
||||||
} EModuleType;
|
} EModuleType;
|
||||||
|
|
||||||
|
|
|
@ -94,8 +94,8 @@ typedef void* tsync_h;
|
||||||
|
|
||||||
tsync_h syncStart(const SSyncInfo *);
|
tsync_h syncStart(const SSyncInfo *);
|
||||||
void syncStop(tsync_h shandle);
|
void syncStop(tsync_h shandle);
|
||||||
int syncReconfig(tsync_h shandle, const SSyncCfg *);
|
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
|
||||||
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
|
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
|
||||||
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
|
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
|
||||||
void syncRecover(tsync_h shandle); // recover from other nodes:
|
void syncRecover(tsync_h shandle); // recover from other nodes:
|
||||||
int syncGetNodesRole(tsync_h shandle, SNodesRole *);
|
int syncGetNodesRole(tsync_h shandle, SNodesRole *);
|
||||||
|
|
|
@ -362,6 +362,26 @@ int main(int argc, char *argv[]) {
|
||||||
|
|
||||||
time_t tTime = time(NULL);
|
time_t tTime = time(NULL);
|
||||||
struct tm tm = *localtime(&tTime);
|
struct tm tm = *localtime(&tTime);
|
||||||
|
printf("###################################################################\n");
|
||||||
|
printf("# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port);
|
||||||
|
printf("# User: %s\n", user);
|
||||||
|
printf("# Password: %s\n", pass);
|
||||||
|
printf("# Use metric: %s\n", use_metric ? "true" : "false");
|
||||||
|
printf("# Datatype of Columns: %s\n", dataString);
|
||||||
|
printf("# Binary Length(If applicable): %d\n",
|
||||||
|
(strcasestr(dataString, "BINARY") != NULL) ? len_of_binary : -1);
|
||||||
|
printf("# Number of Columns per record: %d\n", ncols_per_record);
|
||||||
|
printf("# Number of Connections: %d\n", nconnections);
|
||||||
|
printf("# Number of Tables: %d\n", ntables);
|
||||||
|
printf("# Number of Data per Table: %d\n", nrecords_per_table);
|
||||||
|
printf("# Records/Request: %d\n", nrecords_per_request);
|
||||||
|
printf("# Database name: %s\n", db_name);
|
||||||
|
printf("# Table prefix: %s\n", tb_prefix);
|
||||||
|
printf("# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
|
||||||
|
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
|
||||||
|
printf("###################################################################\n\n");
|
||||||
|
printf("Press enter key to continue");
|
||||||
|
getchar();
|
||||||
|
|
||||||
fprintf(fp, "###################################################################\n");
|
fprintf(fp, "###################################################################\n");
|
||||||
fprintf(fp, "# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port);
|
fprintf(fp, "# Server IP: %s:%hu\n", ip_addr == NULL ? "localhost" : ip_addr, port);
|
||||||
|
@ -858,15 +878,16 @@ void generateData(char *res, char **data_type, int num_of_cols, int64_t timestam
|
||||||
pstr += sprintf(pstr, ")");
|
pstr += sprintf(pstr, ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890";
|
||||||
void rand_string(char *str, int size) {
|
void rand_string(char *str, int size) {
|
||||||
memset(str, 0, size);
|
str[0] = 0;
|
||||||
const char charset[] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJK1234567890";
|
if (size > 0) {
|
||||||
char *sptr = str;
|
|
||||||
if (size) {
|
|
||||||
--size;
|
--size;
|
||||||
for (size_t n = 0; n < size; n++) {
|
int n;
|
||||||
|
for (n = 0; n < size; n++) {
|
||||||
int key = rand() % (int)(sizeof charset - 1);
|
int key = rand() % (int)(sizeof charset - 1);
|
||||||
sptr += sprintf(sptr, "%c", charset[key]);
|
str[n] = charset[key];
|
||||||
}
|
}
|
||||||
|
str[n] = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -533,7 +533,7 @@ int taosDumpOut(SDumpArguments *arguments) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(result);
|
// taos_free_result(result);
|
||||||
|
|
||||||
if (count == 0) {
|
if (count == 0) {
|
||||||
fprintf(stderr, "No databases valid to dump\n");
|
fprintf(stderr, "No databases valid to dump\n");
|
||||||
|
@ -722,6 +722,57 @@ void taosDumpCreateMTableClause(STableDef *tableDes, char *metric, int numOfCols
|
||||||
count_temp = counter;
|
count_temp = counter;
|
||||||
|
|
||||||
for (; counter < numOfCols; counter++) {
|
for (; counter < numOfCols; counter++) {
|
||||||
|
TAOS_ROW row = NULL;
|
||||||
|
|
||||||
|
sprintf(command, "select %s from %s limit 1", tableDes->cols[counter].field, tableDes->name);
|
||||||
|
if (taos_query(taos, command) != 0) {
|
||||||
|
fprintf(stderr, "failed to run command %s\n", command);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
result = taos_use_result(taos);
|
||||||
|
if (result == NULL) {
|
||||||
|
fprintf(stderr, "failed to use result\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||||
|
|
||||||
|
row = taos_fetch_row(result);
|
||||||
|
switch (fields[0].type) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%d", ((((int)(*((char *)row[0]))) == 1) ? 1 : 0));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%d", (int)(*((char *)row[0])));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%d", (int)(*((short *)row[0])));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%d", *((int *)row[0]));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%" PRId64 "", *((int64_t *)row[0]));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%f", GET_FLOAT_VAL(row[0]));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%f", GET_DOUBLE_VAL(row[0]));
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
|
sprintf(tableDes->cols[counter].note, "%" PRId64 "", *(int64_t *)row[0]);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
default:
|
||||||
|
strncpy(tableDes->cols[counter].note, (char *)row[0], fields[0].bytes);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(result);
|
||||||
|
|
||||||
if (counter != count_temp) {
|
if (counter != count_temp) {
|
||||||
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
|
if (strcasecmp(tableDes->cols[counter].type, "binary") == 0 ||
|
||||||
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
|
strcasecmp(tableDes->cols[counter].type, "nchar") == 0) {
|
||||||
|
|
|
@ -63,7 +63,6 @@ typedef struct SMnodeObj {
|
||||||
int8_t updateEnd[1];
|
int8_t updateEnd[1];
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
int8_t role;
|
int8_t role;
|
||||||
SDnodeObj *pDnode;
|
|
||||||
} SMnodeObj;
|
} SMnodeObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -65,7 +65,6 @@ static int32_t mgmtMnodeActionInsert(SSdbOper *pOper) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
|
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
|
|
||||||
pMnode->pDnode = pDnode;
|
|
||||||
pDnode->isMgmt = true;
|
pDnode->isMgmt = true;
|
||||||
mgmtDecDnodeRef(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
|
@ -210,6 +209,9 @@ void mgmtUpdateMnodeIpSet() {
|
||||||
|
|
||||||
mgmtMnodeWrLock();
|
mgmtMnodeWrLock();
|
||||||
|
|
||||||
|
memset(ipSet, 0, sizeof(tsMnodeRpcIpSet));
|
||||||
|
memset(mnodes, 0, sizeof(SDMMnodeInfos));
|
||||||
|
|
||||||
int32_t index = 0;
|
int32_t index = 0;
|
||||||
void * pIter = NULL;
|
void * pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -217,22 +219,27 @@ void mgmtUpdateMnodeIpSet() {
|
||||||
pIter = mgmtGetNextMnode(pIter, &pMnode);
|
pIter = mgmtGetNextMnode(pIter, &pMnode);
|
||||||
if (pMnode == NULL) break;
|
if (pMnode == NULL) break;
|
||||||
|
|
||||||
strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn);
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
ipSet->port[ipSet->numOfIps] = htons(pMnode->pDnode->dnodePort);
|
if (pDnode != NULL) {
|
||||||
|
strcpy(ipSet->fqdn[ipSet->numOfIps], pDnode->dnodeFqdn);
|
||||||
|
ipSet->port[ipSet->numOfIps] = htons(pDnode->dnodePort);
|
||||||
|
|
||||||
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
|
mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId);
|
||||||
strcpy(mnodes->nodeInfos[index].nodeEp, pMnode->pDnode->dnodeEp);
|
strcpy(mnodes->nodeInfos[index].nodeEp, pDnode->dnodeEp);
|
||||||
|
|
||||||
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
|
if (pMnode->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
ipSet->inUse = ipSet->numOfIps;
|
ipSet->inUse = ipSet->numOfIps;
|
||||||
mnodes->inUse = index;
|
mnodes->inUse = index;
|
||||||
|
}
|
||||||
|
|
||||||
|
mPrint("mnode:%d, ep:%s %s", index, pDnode->dnodeEp,
|
||||||
|
pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : "");
|
||||||
|
|
||||||
|
ipSet->numOfIps++;
|
||||||
|
index++;
|
||||||
}
|
}
|
||||||
|
|
||||||
mPrint("mnode:%d, ep:%s %s", index, pMnode->pDnode->dnodeEp, pMnode->role == TAOS_SYNC_ROLE_MASTER ? "master" : "");
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
ipSet->numOfIps++;
|
|
||||||
index++;
|
|
||||||
|
|
||||||
mgmtDecMnodeRef(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,7 +389,15 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pMnode->pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE);
|
|
||||||
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
|
if (pDnode != NULL) {
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->dnodeEp, pShow->bytes[cols] - VARSTR_HEADER_SIZE);
|
||||||
|
} else {
|
||||||
|
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols] - VARSTR_HEADER_SIZE);
|
||||||
|
}
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
#include "mgmtInt.h"
|
#include "mgmtInt.h"
|
||||||
#include "mgmtMnode.h"
|
#include "mgmtMnode.h"
|
||||||
|
#include "mgmtDnode.h"
|
||||||
#include "mgmtSdb.h"
|
#include "mgmtSdb.h"
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -259,10 +260,15 @@ void sdbUpdateSync() {
|
||||||
if (pMnode == NULL) break;
|
if (pMnode == NULL) break;
|
||||||
|
|
||||||
syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;
|
syncCfg.nodeInfo[index].nodeId = pMnode->mnodeId;
|
||||||
syncCfg.nodeInfo[index].nodePort = pMnode->pDnode->dnodePort + TSDB_PORT_SYNC;
|
|
||||||
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pMnode->pDnode->dnodeEp);
|
|
||||||
index++;
|
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
|
if (pDnode != NULL) {
|
||||||
|
syncCfg.nodeInfo[index].nodePort = pDnode->dnodePort + TSDB_PORT_SYNC;
|
||||||
|
strcpy(syncCfg.nodeInfo[index].nodeFqdn, pDnode->dnodeEp);
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
mgmtDecMnodeRef(pMnode);
|
mgmtDecMnodeRef(pMnode);
|
||||||
}
|
}
|
||||||
sdbFreeIter(pIter);
|
sdbFreeIter(pIter);
|
||||||
|
|
|
@ -1139,7 +1139,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
|
||||||
prefixLen = strlen(prefix);
|
prefixLen = strlen(prefix);
|
||||||
|
|
||||||
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
|
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
|
||||||
char stableName[TSDB_TABLE_NAME_LEN] = {0};
|
char stableName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = mgmtGetNextSuperTable(pShow->pIter, &pTable);
|
pShow->pIter = mgmtGetNextSuperTable(pShow->pIter, &pTable);
|
||||||
|
@ -2024,7 +2024,7 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
|
||||||
SCMMultiTableInfoMsg *pInfo = pMsg->pCont;
|
SCMMultiTableInfoMsg *pInfo = pMsg->pCont;
|
||||||
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
pInfo->numOfTables = htonl(pInfo->numOfTables);
|
||||||
|
|
||||||
int32_t totalMallocLen = 4*1024*1024; // first malloc 4 MB, subsequent reallocation as twice
|
int32_t totalMallocLen = 4 * 1024 * 1024; // first malloc 4 MB, subsequent reallocation as twice
|
||||||
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
|
SMultiTableMeta *pMultiMeta = rpcMallocCont(totalMallocLen);
|
||||||
if (pMultiMeta == NULL) {
|
if (pMultiMeta == NULL) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||||
|
@ -2034,26 +2034,30 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
|
||||||
pMultiMeta->contLen = sizeof(SMultiTableMeta);
|
pMultiMeta->contLen = sizeof(SMultiTableMeta);
|
||||||
pMultiMeta->numOfTables = 0;
|
pMultiMeta->numOfTables = 0;
|
||||||
|
|
||||||
for (int t = 0; t < pInfo->numOfTables; ++t) {
|
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
|
||||||
char *tableId = (char*)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN);
|
char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN + 1);
|
||||||
SChildTableObj *pTable = mgmtGetChildTable(tableId);
|
SChildTableObj *pTable = mgmtGetChildTable(tableId);
|
||||||
if (pTable == NULL) continue;
|
if (pTable == NULL) continue;
|
||||||
|
|
||||||
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(tableId);
|
if (pMsg->pDb == NULL) pMsg->pDb = mgmtGetDbByTableId(tableId);
|
||||||
if (pMsg->pDb == NULL) continue;
|
if (pMsg->pDb == NULL) {
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int availLen = totalMallocLen - pMultiMeta->contLen;
|
int availLen = totalMallocLen - pMultiMeta->contLen;
|
||||||
if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
|
if (availLen <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
|
||||||
//TODO realloc
|
totalMallocLen *= 2;
|
||||||
//totalMallocLen *= 2;
|
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
||||||
//pMultiMeta = rpcReMalloc(pMultiMeta, totalMallocLen);
|
if (pMultiMeta == NULL) {
|
||||||
//if (pMultiMeta == NULL) {
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||||
/// rpcSendResponse(ahandle, TSDB_CODE_SERV_OUT_OF_MEMORY, NULL, 0);
|
mgmtDecTableRef(pTable);
|
||||||
// return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return;
|
||||||
//} else {
|
} else {
|
||||||
// t--;
|
t--;
|
||||||
// continue;
|
mgmtDecTableRef(pTable);
|
||||||
//}
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen);
|
STableMetaMsg *pMeta = (STableMetaMsg *)(pMultiMeta->metas + pMultiMeta->contLen);
|
||||||
|
@ -2062,6 +2066,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
|
||||||
pMultiMeta->numOfTables ++;
|
pMultiMeta->numOfTables ++;
|
||||||
pMultiMeta->contLen += pMeta->contLen;
|
pMultiMeta->contLen += pMeta->contLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {0};
|
SRpcMsg rpcRsp = {0};
|
||||||
|
@ -2148,7 +2154,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char tableName[TSDB_TABLE_NAME_LEN] = {0};
|
char tableName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
||||||
|
|
||||||
// pattern compare for table name
|
// pattern compare for table name
|
||||||
mgmtExtractTableName(pTable->info.tableId, tableName);
|
mgmtExtractTableName(pTable->info.tableId, tableName);
|
||||||
|
|
|
@ -3,3 +3,4 @@ PROJECT(TDengine)
|
||||||
|
|
||||||
ADD_SUBDIRECTORY(monitor)
|
ADD_SUBDIRECTORY(monitor)
|
||||||
ADD_SUBDIRECTORY(http)
|
ADD_SUBDIRECTORY(http)
|
||||||
|
ADD_SUBDIRECTORY(mqtt)
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||||
|
PROJECT(TDengine)
|
||||||
|
|
||||||
|
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/common/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/zlib-1.2.11/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
|
||||||
|
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||||
|
INCLUDE_DIRECTORIES(inc)
|
||||||
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
ADD_LIBRARY(mqtt ${SRC})
|
||||||
|
TARGET_LINK_LIBRARIES(mqtt taos_static z)
|
||||||
|
|
||||||
|
IF (TD_ADMIN)
|
||||||
|
TARGET_LINK_LIBRARIES(mqtt admin)
|
||||||
|
ENDIF ()
|
||||||
|
ENDIF ()
|
|
@ -0,0 +1,42 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_MQTT_LOG_H
|
||||||
|
#define TDENGINE_MQTT_LOG_H
|
||||||
|
|
||||||
|
#include "tlog.h"
|
||||||
|
|
||||||
|
extern int32_t mqttDebugFlag;
|
||||||
|
|
||||||
|
#define mqttError(...) \
|
||||||
|
if (mqttDebugFlag & DEBUG_ERROR) { \
|
||||||
|
taosPrintLog("ERROR MQT ", 255, __VA_ARGS__); \
|
||||||
|
}
|
||||||
|
#define mqttWarn(...) \
|
||||||
|
if ( mqttDebugFlag & DEBUG_WARN) { \
|
||||||
|
taosPrintLog("WARN MQT ", mqttDebugFlag, __VA_ARGS__); \
|
||||||
|
}
|
||||||
|
#define mqttTrace(...) \
|
||||||
|
if ( mqttDebugFlag & DEBUG_TRACE) { \
|
||||||
|
taosPrintLog("MQT ", mqttDebugFlag, __VA_ARGS__); \
|
||||||
|
}
|
||||||
|
#define mqttDump(...) \
|
||||||
|
if ( mqttDebugFlag & DEBUG_TRACE) { \
|
||||||
|
taosPrintLongString("MQT ", mqttDebugFlag, __VA_ARGS__); \
|
||||||
|
}
|
||||||
|
#define mqttPrint(...) \
|
||||||
|
{ taosPrintLog("MQT ", 255, __VA_ARGS__); }
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_MQTT_SYSTEM_H
|
||||||
|
#define TDENGINE_MQTT_SYSTEM_H
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
|
||||||
|
int32_t mqttGetReqCount();
|
||||||
|
int32_t mqttInitSystem();
|
||||||
|
int32_t mqttStartSystem();
|
||||||
|
void mqttStopSystem();
|
||||||
|
void mqttCleanUpSystem();
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,43 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "mqttSystem.h"
|
||||||
|
#include "mqtt.h"
|
||||||
|
#include "mqttLog.h"
|
||||||
|
#include "os.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tsocket.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
int32_t mqttGetReqCount() { return 0; }
|
||||||
|
int mqttInitSystem() {
|
||||||
|
mqttPrint("mqttInitSystem");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int mqttStartSystem() {
|
||||||
|
mqttPrint("mqttStartSystem");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mqttStopSystem() {
|
||||||
|
mqttPrint("mqttStopSystem");
|
||||||
|
}
|
||||||
|
|
||||||
|
void mqttCleanUpSystem() {
|
||||||
|
mqttPrint("mqttCleanUpSystem");
|
||||||
|
}
|
|
@ -867,9 +867,8 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
// underlying UDP layer does not know it is server or client
|
// underlying UDP layer does not know it is server or client
|
||||||
pRecv->connType = pRecv->connType | pRpc->connType;
|
pRecv->connType = pRecv->connType | pRpc->connType;
|
||||||
|
|
||||||
if (pRecv->ip==0 && pConn) {
|
if (pRecv->ip == 0 && pConn) {
|
||||||
rpcProcessBrokenLink(pConn);
|
rpcProcessBrokenLink(pConn);
|
||||||
rpcFreeMsg(pRecv->msg);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -215,7 +215,6 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
tTrace("%s TCP connection from ip:%s:%hu", pServerObj->label, inet_ntoa(caddr.sin_addr), caddr.sin_port);
|
|
||||||
taosKeepTcpAlive(connFd);
|
taosKeepTcpAlive(connFd);
|
||||||
|
|
||||||
// pick up the thread to handle this connection
|
// pick up the thread to handle this connection
|
||||||
|
@ -229,7 +228,8 @@ static void* taosAcceptTcpConnection(void *arg) {
|
||||||
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
inet_ntoa(caddr.sin_addr), pFdObj->port, pFdObj, pThreadObj->numOfFds);
|
||||||
} else {
|
} else {
|
||||||
close(connFd);
|
close(connFd);
|
||||||
tError("%s failed to malloc FdObj(%s)", pServerObj->label, strerror(errno));
|
tError("%s failed to malloc FdObj(%s) for connection from:%s:%hu", pServerObj->label, strerror(errno),
|
||||||
|
inet_ntoa(caddr.sin_addr), caddr.sin_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pick up next thread for next connection
|
// pick up next thread for next connection
|
||||||
|
@ -341,7 +341,9 @@ static void taosReportBrokenLink(SFdObj *pFdObj) {
|
||||||
recvInfo.chandle = NULL;
|
recvInfo.chandle = NULL;
|
||||||
recvInfo.connType = RPC_CONN_TCP;
|
recvInfo.connType = RPC_CONN_TCP;
|
||||||
(*(pThreadObj->processData))(&recvInfo);
|
(*(pThreadObj->processData))(&recvInfo);
|
||||||
}
|
} else {
|
||||||
|
taosFreeFdObj(pFdObj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#define maxEvents 10
|
#define maxEvents 10
|
||||||
|
@ -352,7 +354,7 @@ static void *taosProcessTcpData(void *param) {
|
||||||
struct epoll_event events[maxEvents];
|
struct epoll_event events[maxEvents];
|
||||||
SRecvInfo recvInfo;
|
SRecvInfo recvInfo;
|
||||||
SRpcHead rpcHead;
|
SRpcHead rpcHead;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
int fdNum = epoll_wait(pThreadObj->pollFd, events, maxEvents, -1);
|
||||||
if (pThreadObj->stop) {
|
if (pThreadObj->stop) {
|
||||||
|
@ -466,7 +468,7 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
|
||||||
|
|
||||||
pFdObj->signature = NULL;
|
pFdObj->signature = NULL;
|
||||||
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_DEL, pFdObj->fd, NULL);
|
||||||
close(pFdObj->fd);
|
taosCloseSocket(pFdObj->fd);
|
||||||
|
|
||||||
pThreadObj->numOfFds--;
|
pThreadObj->numOfFds--;
|
||||||
|
|
||||||
|
|
|
@ -127,6 +127,8 @@ int main(int argc, char *argv[]) {
|
||||||
SRpcInit rpcInit;
|
SRpcInit rpcInit;
|
||||||
char dataName[20] = "server.data";
|
char dataName[20] = "server.data";
|
||||||
|
|
||||||
|
taosBlockSIGPIPE();
|
||||||
|
|
||||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||||
rpcInit.localPort = 7000;
|
rpcInit.localPort = 7000;
|
||||||
rpcInit.label = "SER";
|
rpcInit.label = "SER";
|
||||||
|
|
|
@ -31,7 +31,6 @@ int taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
|
||||||
int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
int taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
|
||||||
int taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
int taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
|
||||||
int taosKeepTcpAlive(int sockFd);
|
int taosKeepTcpAlive(int sockFd);
|
||||||
void taosCloseTcpSocket(int sockFd);
|
|
||||||
|
|
||||||
int taosGetFqdn(char *);
|
int taosGetFqdn(char *);
|
||||||
uint32_t taosGetIpFromFqdn(const char *);
|
uint32_t taosGetIpFromFqdn(const char *);
|
||||||
|
|
|
@ -65,6 +65,7 @@ taos_queue taosOpenQueue() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseQueue(taos_queue param) {
|
void taosCloseQueue(taos_queue param) {
|
||||||
|
if (param == NULL) return;
|
||||||
STaosQueue *queue = (STaosQueue *)param;
|
STaosQueue *queue = (STaosQueue *)param;
|
||||||
STaosQnode *pTemp;
|
STaosQnode *pTemp;
|
||||||
STaosQnode *pNode = queue->head;
|
STaosQnode *pNode = queue->head;
|
||||||
|
@ -224,6 +225,7 @@ taos_qset taosOpenQset() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseQset(taos_qset param) {
|
void taosCloseQset(taos_qset param) {
|
||||||
|
if (param == NULL) return;
|
||||||
STaosQset *qset = (STaosQset *)param;
|
STaosQset *qset = (STaosQset *)param;
|
||||||
pthread_mutex_destroy(&qset->mutex);
|
pthread_mutex_destroy(&qset->mutex);
|
||||||
tsem_destroy(&qset->sem);
|
tsem_destroy(&qset->sem);
|
||||||
|
|
|
@ -305,20 +305,11 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
|
||||||
sockFd = -1;
|
sockFd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// taosKeepTcpAlive(sockFd);
|
||||||
|
|
||||||
return sockFd;
|
return sockFd;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCloseTcpSocket(int sockFd) {
|
|
||||||
struct linger linger;
|
|
||||||
linger.l_onoff = 1;
|
|
||||||
linger.l_linger = 0;
|
|
||||||
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
|
|
||||||
uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
|
|
||||||
}
|
|
||||||
|
|
||||||
taosCloseSocket(sockFd);
|
|
||||||
}
|
|
||||||
|
|
||||||
int taosKeepTcpAlive(int sockFd) {
|
int taosKeepTcpAlive(int sockFd) {
|
||||||
int alive = 1;
|
int alive = 1;
|
||||||
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
|
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
|
||||||
|
@ -355,6 +346,15 @@ int taosKeepTcpAlive(int sockFd) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct linger linger = {0};
|
||||||
|
linger.l_onoff = 1;
|
||||||
|
//linger.l_linger = 0;
|
||||||
|
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
|
||||||
|
uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
|
||||||
|
close(sockFd);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -405,19 +405,19 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) {
|
||||||
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);
|
||||||
return reusable;
|
return reusable;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state != TIMER_STATE_EXPIRED) {
|
if (state != TIMER_STATE_EXPIRED) {
|
||||||
// timer already stopped or cancelled, has nothing to do in this case
|
// timer already stopped or cancelled, has nothing to do in this case
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timer->executedBy == taosGetPthreadId()) {
|
if (timer->executedBy == taosGetPthreadId()) {
|
||||||
// taosTmrReset is called in the timer callback, should do nothing in this
|
// taosTmrReset is called in the timer callback, should do nothing in this
|
||||||
// case to avoid dead lock. note taosTmrReset must be the last statement
|
// case to avoid dead lock. note taosTmrReset must be the last statement
|
||||||
// of the callback funtion, will be a bug otherwise.
|
// of the callback funtion, will be a bug otherwise.
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// timer callback is executing in another thread, we SHOULD wait it to stop,
|
// timer callback is executing in another thread, we SHOULD wait it to stop,
|
||||||
// BUT this may result in dead lock if current thread are holding a lock which
|
// BUT this may result in dead lock if current thread are holding a lock which
|
||||||
// the timer callback need to acquire. so, we HAVE TO return directly.
|
// the timer callback need to acquire. so, we HAVE TO return directly.
|
||||||
|
@ -501,6 +501,7 @@ static void taosTmrModuleInit(void) {
|
||||||
tmr_ctrl_t* ctrl = tmrCtrls + i;
|
tmr_ctrl_t* ctrl = tmrCtrls + i;
|
||||||
ctrl->next = ctrl + 1;
|
ctrl->next = ctrl + 1;
|
||||||
}
|
}
|
||||||
|
(tmrCtrls + taosMaxTmrCtrl - 1)->next = NULL;
|
||||||
unusedTmrCtrl = tmrCtrls;
|
unusedTmrCtrl = tmrCtrls;
|
||||||
|
|
||||||
pthread_mutex_init(&tmrCtrlMutex, NULL);
|
pthread_mutex_init(&tmrCtrlMutex, NULL);
|
||||||
|
@ -574,12 +575,12 @@ void taosTmrCleanUp(void* handle) {
|
||||||
|
|
||||||
if (numOfTmrCtrl <=0) {
|
if (numOfTmrCtrl <=0) {
|
||||||
taosUninitTimer();
|
taosUninitTimer();
|
||||||
|
|
||||||
taosCleanUpScheduler(tmrQhandle);
|
taosCleanUpScheduler(tmrQhandle);
|
||||||
|
|
||||||
for (int i = 0; i < tListLen(wheels); i++) {
|
for (int i = 0; i < tListLen(wheels); i++) {
|
||||||
time_wheel_t* wheel = wheels + i;
|
time_wheel_t* wheel = wheels + i;
|
||||||
pthread_mutex_destroy(&wheel->mutex);
|
pthread_mutex_destroy(&wheel->mutex);
|
||||||
free(wheel->slots);
|
free(wheel->slots);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
||||||
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
|
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
|
||||||
static bool vnodeReadVersion(SVnodeObj *pVnode);
|
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
|
||||||
static int vnodeProcessTsdbStatus(void *arg, int status);
|
static int vnodeProcessTsdbStatus(void *arg, int status);
|
||||||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
|
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
|
||||||
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
||||||
|
@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
#ifndef _SYNC
|
#ifndef _SYNC
|
||||||
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
||||||
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
|
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
|
||||||
void syncStop(tsync_h shandle) {}
|
void syncStop(tsync_h shandle) {}
|
||||||
int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
|
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
|
||||||
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
|
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
|
||||||
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
|
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
|
||||||
#endif
|
#endif
|
||||||
|
@ -148,35 +148,20 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
pVnode->status = TAOS_VN_STATUS_UPDATING;
|
pVnode->status = TAOS_VN_STATUS_UPDATING;
|
||||||
|
|
||||||
int32_t code = vnodeSaveCfg(pVnodeCfg);
|
int32_t code = vnodeSaveCfg(pVnodeCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
vError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = vnodeReadCfg(pVnode);
|
code = vnodeReadCfg(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
vError("vgId:%d, failed to read cfg file", pVnode->vgId);
|
|
||||||
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
|
code = syncReconfig(pVnode->sync, &pVnode->syncCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
vTrace("vgId:%d, failed to alter vnode, canot reconfig sync, result:%s", pVnode->vgId,
|
|
||||||
tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
vTrace("vgId:%d, failed to alter vnode, canot reconfig tsdb, result:%s", pVnode->vgId,
|
|
||||||
tstrerror(code));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
|
|
||||||
vTrace("vgId:%d, vnode is altered", pVnode->vgId);
|
vTrace("vgId:%d, vnode is altered", pVnode->vgId);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,26 +170,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pthread_once(&vnodeModuleInit, vnodeInit);
|
pthread_once(&vnodeModuleInit, vnodeInit);
|
||||||
|
|
||||||
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
|
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, failed to open vnode since no enough memory", vnode);
|
||||||
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&tsOpennedVnodes, 1);
|
||||||
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
|
||||||
pVnode->vgId = vnode;
|
pVnode->vgId = vnode;
|
||||||
pVnode->status = TAOS_VN_STATUS_INIT;
|
pVnode->status = TAOS_VN_STATUS_INIT;
|
||||||
pVnode->refCount = 1;
|
|
||||||
pVnode->version = 0;
|
pVnode->version = 0;
|
||||||
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
||||||
pVnode->rootDir = strdup(rootDir);
|
pVnode->rootDir = strdup(rootDir);
|
||||||
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
|
||||||
|
|
||||||
int32_t code = vnodeReadCfg(pVnode);
|
int32_t code = vnodeReadCfg(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, failed to read cfg file", pVnode->vgId);
|
vnodeCleanUp(pVnode);
|
||||||
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = vnodeReadVersion(pVnode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodeReadVersion(pVnode);
|
|
||||||
pVnode->fversion = pVnode->version;
|
pVnode->fversion = pVnode->version;
|
||||||
|
|
||||||
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SCqCfg cqCfg = {0};
|
SCqCfg cqCfg = {0};
|
||||||
sprintf(cqCfg.user, "root");
|
sprintf(cqCfg.user, "root");
|
||||||
|
@ -212,22 +211,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
cqCfg.vgId = vnode;
|
cqCfg.vgId = vnode;
|
||||||
cqCfg.cqWrite = vnodeWriteToQueue;
|
cqCfg.cqWrite = vnodeWriteToQueue;
|
||||||
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||||
|
if (pVnode->cq == NULL) {
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
STsdbAppH appH = {0};
|
STsdbAppH appH = {0};
|
||||||
appH.appH = (void *)pVnode;
|
appH.appH = (void *)pVnode;
|
||||||
appH.notifyStatus = vnodeProcessTsdbStatus;
|
appH.notifyStatus = vnodeProcessTsdbStatus;
|
||||||
appH.cqH = pVnode->cq;
|
appH.cqH = pVnode->cq;
|
||||||
|
|
||||||
sprintf(temp, "%s/tsdb", rootDir);
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno));
|
vnodeCleanUp(pVnode);
|
||||||
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
sprintf(temp, "%s/wal", rootDir);
|
sprintf(temp, "%s/wal", rootDir);
|
||||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||||
|
if (pVnode->wal == NULL) {
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||||
|
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
|
@ -246,6 +252,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
|
|
||||||
#ifndef _SYNC
|
#ifndef _SYNC
|
||||||
pVnode->role = TAOS_SYNC_ROLE_MASTER;
|
pVnode->role = TAOS_SYNC_ROLE_MASTER;
|
||||||
|
#else
|
||||||
|
if (pVnode->sync == NULL) {
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// start continuous query
|
// start continuous query
|
||||||
|
@ -253,11 +264,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
cqStart(pVnode->cq);
|
cqStart(pVnode->cq);
|
||||||
|
|
||||||
pVnode->events = NULL;
|
pVnode->events = NULL;
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
|
||||||
|
|
||||||
atomic_add_fetch_32(&tsOpennedVnodes, 1);
|
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,13 +297,6 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pVnode->rootDir);
|
tfree(pVnode->rootDir);
|
||||||
// remove read queue
|
|
||||||
dnodeFreeRqueue(pVnode->rqueue);
|
|
||||||
pVnode->rqueue = NULL;
|
|
||||||
|
|
||||||
// remove write queue
|
|
||||||
dnodeFreeWqueue(pVnode->wqueue);
|
|
||||||
pVnode->wqueue = NULL;
|
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
||||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
@ -387,15 +391,26 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
pVnode->sync = NULL;
|
pVnode->sync = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
cqClose(pVnode->cq);
|
if (pVnode->wal)
|
||||||
pVnode->cq = NULL;
|
walClose(pVnode->wal);
|
||||||
|
|
||||||
tsdbCloseRepo(pVnode->tsdb, 1);
|
|
||||||
pVnode->tsdb = NULL;
|
|
||||||
|
|
||||||
walClose(pVnode->wal);
|
|
||||||
pVnode->wal = NULL;
|
pVnode->wal = NULL;
|
||||||
|
|
||||||
|
if (pVnode->tsdb)
|
||||||
|
tsdbCloseRepo(pVnode->tsdb, 1);
|
||||||
|
pVnode->tsdb = NULL;
|
||||||
|
|
||||||
|
if (pVnode->cq)
|
||||||
|
cqClose(pVnode->cq);
|
||||||
|
pVnode->cq = NULL;
|
||||||
|
|
||||||
|
if (pVnode->wqueue)
|
||||||
|
dnodeFreeWqueue(pVnode->wqueue);
|
||||||
|
pVnode->wqueue = NULL;
|
||||||
|
|
||||||
|
if (pVnode->rqueue)
|
||||||
|
dnodeFreeRqueue(pVnode->rqueue);
|
||||||
|
pVnode->rqueue = NULL;
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -462,7 +477,8 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
|
vError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
return errno;
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -512,27 +528,30 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
cJSON *root = NULL;
|
||||||
|
char *content = NULL;
|
||||||
|
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
|
||||||
|
int maxLen = 1000;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_OTHERS;
|
||||||
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
|
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
|
||||||
FILE *fp = fopen(cfgFile, "r");
|
FILE *fp = fopen(cfgFile, "r");
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId,
|
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId,
|
||||||
cfgFile, strerror(errno));
|
cfgFile, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto PARSE_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
content = calloc(1, maxLen + 1);
|
||||||
|
if (content == NULL) goto PARSE_OVER;
|
||||||
|
int len = fread(content, 1, maxLen, fp);
|
||||||
|
if (len <= 0) {
|
||||||
|
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
|
||||||
return errno;
|
return errno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ret = TSDB_CODE_OTHERS;
|
root = cJSON_Parse(content);
|
||||||
int maxLen = 1000;
|
|
||||||
char *content = calloc(1, maxLen + 1);
|
|
||||||
int len = fread(content, 1, maxLen, fp);
|
|
||||||
if (len <= 0) {
|
|
||||||
free(content);
|
|
||||||
fclose(fp);
|
|
||||||
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *root = cJSON_Parse(content);
|
|
||||||
if (root == NULL) {
|
if (root == NULL) {
|
||||||
vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
|
vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
|
@ -691,19 +710,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
|
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = 0;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
|
vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
|
||||||
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
|
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
|
||||||
vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
|
vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
|
||||||
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
|
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
PARSE_OVER:
|
PARSE_OVER:
|
||||||
free(content);
|
tfree(content);
|
||||||
cJSON_Delete(root);
|
cJSON_Delete(root);
|
||||||
fclose(fp);
|
if (fp) fclose(fp);
|
||||||
return ret;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
|
@ -713,7 +732,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
|
vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
|
||||||
versionFile, strerror(errno));
|
versionFile, strerror(errno));
|
||||||
return errno;
|
return TAOS_SYSTEM_ERROR(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
@ -733,29 +752,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool vnodeReadVersion(SVnodeObj *pVnode) {
|
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
|
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
|
||||||
|
char *content = NULL;
|
||||||
|
cJSON *root = NULL;
|
||||||
|
int maxLen = 100;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_OTHERS;
|
||||||
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
|
||||||
FILE *fp = fopen(versionFile, "r");
|
FILE *fp = fopen(versionFile, "r");
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
if (errno != ENOENT) {
|
if (errno != ENOENT) {
|
||||||
vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno));
|
vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
return false;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ret = false;
|
content = calloc(1, maxLen + 1);
|
||||||
int maxLen = 100;
|
|
||||||
char *content = calloc(1, maxLen + 1);
|
|
||||||
int len = fread(content, 1, maxLen, fp);
|
int len = fread(content, 1, maxLen, fp);
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
free(content);
|
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
||||||
fclose(fp);
|
goto PARSE_OVER;
|
||||||
vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *root = cJSON_Parse(content);
|
root = cJSON_Parse(content);
|
||||||
if (root == NULL) {
|
if (root == NULL) {
|
||||||
vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
|
vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
|
@ -768,13 +791,12 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
pVnode->version = version->valueint;
|
pVnode->version = version->valueint;
|
||||||
|
|
||||||
ret = true;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version);
|
||||||
vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version);
|
|
||||||
|
|
||||||
PARSE_OVER:
|
PARSE_OVER:
|
||||||
free(content);
|
tfree(content);
|
||||||
cJSON_Delete(root);
|
cJSON_Delete(root);
|
||||||
fclose(fp);
|
if(fp) fclose(fp);
|
||||||
return ret;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
|
||||||
|
@ -56,7 +57,10 @@ static int walRemoveWalFiles(const char *path);
|
||||||
|
|
||||||
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||||
if (pWal == NULL) return NULL;
|
if (pWal == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pWal->fd = -1;
|
pWal->fd = -1;
|
||||||
pWal->max = pCfg->wals;
|
pWal->max = pCfg->wals;
|
||||||
|
@ -75,6 +79,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
walRenew(pWal);
|
walRenew(pWal);
|
||||||
|
|
||||||
if (pWal->fd <0) {
|
if (pWal->fd <0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("wal:%s, failed to open", path);
|
wError("wal:%s, failed to open", path);
|
||||||
pthread_mutex_destroy(&pWal->mutex);
|
pthread_mutex_destroy(&pWal->mutex);
|
||||||
free(pWal);
|
free(pWal);
|
||||||
|
@ -112,9 +117,10 @@ void walClose(void *handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int walRenew(void *handle) {
|
int walRenew(void *handle) {
|
||||||
|
if (handle == NULL) return 0;
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
|
||||||
if (pWal->fd >=0) {
|
if (pWal->fd >=0) {
|
||||||
|
@ -156,6 +162,7 @@ int walRenew(void *handle) {
|
||||||
int walWrite(void *handle, SWalHead *pHead) {
|
int walWrite(void *handle, SWalHead *pHead) {
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
if (pWal == NULL) return -1;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
if (pWal->level == TAOS_WAL_NOLOG) return 0;
|
||||||
|
@ -178,6 +185,7 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
void walFsync(void *handle) {
|
void walFsync(void *handle) {
|
||||||
|
|
||||||
SWal *pWal = handle;
|
SWal *pWal = handle;
|
||||||
|
if (pWal == NULL) return;
|
||||||
|
|
||||||
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
|
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
|
||||||
if (fsync(pWal->fd) < 0) {
|
if (fsync(pWal->fd) < 0) {
|
||||||
|
|
|
@ -13,6 +13,7 @@ python3 ./test.py -f insert/nchar.py
|
||||||
python3 ./test.py -f insert/nchar-boundary.py
|
python3 ./test.py -f insert/nchar-boundary.py
|
||||||
python3 ./test.py -f insert/nchar-unicode.py
|
python3 ./test.py -f insert/nchar-unicode.py
|
||||||
python3 ./test.py -f insert/multi.py
|
python3 ./test.py -f insert/multi.py
|
||||||
|
python3 ./test.py -f insert/randomNullCommit.py
|
||||||
|
|
||||||
python3 ./test.py -f table/column_name.py
|
python3 ./test.py -f table/column_name.py
|
||||||
python3 ./test.py -f table/column_num.py
|
python3 ./test.py -f table/column_num.py
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import random
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.dnodes import *
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
|
||||||
|
tdLog.info("=============== step1")
|
||||||
|
tdSql.execute('create table tb (ts timestamp, speed int, temp float, note binary(5), flag bool)')
|
||||||
|
|
||||||
|
numOfRecords = 0
|
||||||
|
randomList = [10, 50, 100, 500, 1000, 5000]
|
||||||
|
for i in range(0, 10):
|
||||||
|
num = random.choice(randomList)
|
||||||
|
tdLog.info("will insert %d records" % num)
|
||||||
|
for x in range(0, num):
|
||||||
|
tdLog.info(
|
||||||
|
'insert into tb values (now + %da, NULL, NULL, NULL, TRUE)' % x)
|
||||||
|
tdSql.execute(
|
||||||
|
'insert into tb values (now + %da, NULL, NULL, NULL, TRUE)' % x)
|
||||||
|
|
||||||
|
numOfRecords = numOfRecords + num
|
||||||
|
|
||||||
|
tdSql.query("select * from tb")
|
||||||
|
tdSql.checkRows(numOfRecords)
|
||||||
|
tdSql.checkData(numOfRecords-num, 1, None)
|
||||||
|
tdSql.checkData(numOfRecords-1, 2, None)
|
||||||
|
|
||||||
|
tdLog.info("stop dnode to commit data to disk")
|
||||||
|
tdDnodes.stop(1)
|
||||||
|
tdDnodes.start(1)
|
||||||
|
tdLog.sleep(5)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -10,7 +10,7 @@ from util.sql import *
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
def init(self, conn):
|
def init(self, conn, logSql):
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
tdSql.init(conn.cursor(), logSql)
|
tdSql.init(conn.cursor(), logSql)
|
||||||
|
|
||||||
|
@ -95,17 +95,42 @@ class TDTestCase:
|
||||||
maxTableNameLen = self.getLimitFromSourceCode('TSDB_TABLE_NAME_LEN')
|
maxTableNameLen = self.getLimitFromSourceCode('TSDB_TABLE_NAME_LEN')
|
||||||
tdLog.notice("table name max length is %d" % maxTableNameLen)
|
tdLog.notice("table name max length is %d" % maxTableNameLen)
|
||||||
|
|
||||||
name = self.generateString(maxTableNameLen - 1)
|
# create a super table with name exceed max length
|
||||||
tdLog.info("table name is '%s'" % name)
|
sname = self.generateString(maxTableNameLen + 1)
|
||||||
|
tdLog.info("create a super table with length %d" % len(sname))
|
||||||
|
tdSql.error("create table %s (ts timestamp, value int) tags(id int)" % sname)
|
||||||
|
|
||||||
tdSql.execute("create table %s (ts timestamp, value int)" % name)
|
# create a super table with name of max length
|
||||||
tdSql.execute("insert into %s values(now, 0)" % name)
|
sname = self.generateString(maxTableNameLen)
|
||||||
|
tdLog.info("create a super table with length %d" % len(sname))
|
||||||
|
tdSql.execute("create table %s (ts timestamp, value int) tags(id int)" % sname)
|
||||||
|
tdLog.info("check table count, should be one")
|
||||||
|
tdSql.query('show stables')
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
|
# create a child table with name exceed max length
|
||||||
|
name = self.generateString(maxTableNameLen + 1)
|
||||||
|
tdLog.info("create a child table with length %d" % len(name))
|
||||||
|
tdSql.error("create table %s using %s tags(0)" % (name, sname))
|
||||||
|
|
||||||
|
# create a child table with name of max length
|
||||||
|
name = self.generateString(maxTableNameLen)
|
||||||
|
tdLog.info("create a child table with length %d" % len(name))
|
||||||
|
tdSql.execute("create table %s using %s tags(0)" % (name, sname))
|
||||||
tdSql.query('show tables')
|
tdSql.query('show tables')
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
tdSql.query('select * from %s' % name)
|
# insert one row
|
||||||
|
tdLog.info("insert one row of data")
|
||||||
|
tdSql.execute("insert into %s values(now, 0)" % name)
|
||||||
|
tdSql.query("select * from " + name)
|
||||||
tdSql.checkRows(1)
|
tdSql.checkRows(1)
|
||||||
|
tdSql.query("select * from " + sname)
|
||||||
|
tdSql.checkRows(1)
|
||||||
|
|
||||||
|
name = name[:len(name) - 1]
|
||||||
|
tdSql.error("select * from " + name)
|
||||||
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
def checkRowBoundaries(self):
|
def checkRowBoundaries(self):
|
||||||
tdLog.debug("checking row boundaries")
|
tdLog.debug("checking row boundaries")
|
||||||
|
|
|
@ -58,6 +58,9 @@ class TDSql:
|
||||||
"%s failed: sql:%s, expect error not occured" %
|
"%s failed: sql:%s, expect error not occured" %
|
||||||
(callerFilename, sql))
|
(callerFilename, sql))
|
||||||
else:
|
else:
|
||||||
|
self.queryRows = 0
|
||||||
|
self.queryCols = 0
|
||||||
|
self.queryResult = None
|
||||||
tdLog.info("sql:%s, expect error occured" % (sql))
|
tdLog.info("sql:%s, expect error occured" % (sql))
|
||||||
|
|
||||||
def query(self, sql):
|
def query(self, sql):
|
||||||
|
|
|
@ -3,15 +3,64 @@ cd ../../debug; make
|
||||||
cd ../../../debug; cmake ..
|
cd ../../../debug; cmake ..
|
||||||
cd ../../../debug; make
|
cd ../../../debug; make
|
||||||
|
|
||||||
|
./test.sh -u -f unique/account/account_create.sim
|
||||||
|
./test.sh -u -f unique/account/account_delete.sim
|
||||||
|
./test.sh -u -f unique/account/account_len.sim
|
||||||
|
./test.sh -u -f unique/account/authority.sim
|
||||||
|
./test.sh -u -f unique/account/basic.sim
|
||||||
|
./test.sh -u -f unique/account/paras.sim
|
||||||
|
./test.sh -u -f unique/account/pass_alter.sim
|
||||||
|
./test.sh -u -f unique/account/pass_len.sim
|
||||||
|
./test.sh -u -f unique/account/usage.sim
|
||||||
|
./test.sh -u -f unique/account/user_create.sim
|
||||||
|
./test.sh -u -f unique/account/user_len.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/big/balance.sim
|
||||||
|
./test.sh -u -f unique/big/maxvnodes.sim
|
||||||
|
./test.sh -u -f unique/big/tcp.sim
|
||||||
|
|
||||||
./test.sh -u -f unique/cluster/balance1.sim
|
./test.sh -u -f unique/cluster/balance1.sim
|
||||||
./test.sh -u -f unique/cluster/balance2.sim
|
./test.sh -u -f unique/cluster/balance2.sim
|
||||||
./test.sh -u -f unique/cluster/balance3.sim
|
./test.sh -u -f unique/cluster/balance3.sim
|
||||||
./test.sh -u -f unique/cluster/cache.sim
|
./test.sh -u -f unique/cluster/cache.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/column/replica3.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/db/commit.sim
|
||||||
|
./test.sh -u -f unique/db/delete.sim
|
||||||
|
./test.sh -u -f unique/db/delete_part.sim
|
||||||
|
./test.sh -u -f unique/db/replica_add12.sim
|
||||||
|
./test.sh -u -f unique/db/replica_add13.sim
|
||||||
|
./test.sh -u -f unique/db/replica_add23.sim
|
||||||
|
./test.sh -u -f unique/db/replica_reduce21.sim
|
||||||
|
./test.sh -u -f unique/db/replica_reduce32.sim
|
||||||
|
./test.sh -u -f unique/db/replica_reduce31.sim
|
||||||
|
./test.sh -u -f unique/db/replica_part.sim
|
||||||
|
|
||||||
./test.sh -u -f unique/dnode/balance1.sim
|
./test.sh -u -f unique/dnode/balance1.sim
|
||||||
./test.sh -u -f unique/dnode/balance2.sim
|
./test.sh -u -f unique/dnode/balance2.sim
|
||||||
./test.sh -u -f unique/dnode/balance3.sim
|
./test.sh -u -f unique/dnode/balance3.sim
|
||||||
./test.sh -u -f unique/dnode/balancex.sim
|
./test.sh -u -f unique/dnode/balancex.sim
|
||||||
|
./test.sh -u -f unique/dnode/offline1.sim
|
||||||
|
./test.sh -u -f unique/dnode/offline2.sim
|
||||||
|
./test.sh -u -f unique/dnode/remove1.sim
|
||||||
|
./test.sh -u -f unique/dnode/remove2.sim
|
||||||
|
./test.sh -u -f unique/dnode/vnode_clean.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/http/admin.sim
|
||||||
|
./test.sh -u -f unique/http/opentsdb.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/import/replica2.sim
|
||||||
|
./test.sh -u -f unique/import/replica3.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/stable/balance_replica1.sim
|
||||||
|
./test.sh -u -f unique/stable/dnode2_stop.sim
|
||||||
|
./test.sh -u -f unique/stable/dnode2.sim
|
||||||
|
./test.sh -u -f unique/stable/dnode3.sim
|
||||||
|
./test.sh -u -f unique/stable/replica2_dnode4.sim
|
||||||
|
./test.sh -u -f unique/stable/replica2_vnode3.sim
|
||||||
|
./test.sh -u -f unique/stable/replica3_dnode6.sim
|
||||||
|
./test.sh -u -f unique/stable/replica3_vnode3.sim
|
||||||
|
|
||||||
./test.sh -u -f unique/mnode/mgmt22.sim
|
./test.sh -u -f unique/mnode/mgmt22.sim
|
||||||
./test.sh -u -f unique/mnode/mgmt23.sim
|
./test.sh -u -f unique/mnode/mgmt23.sim
|
||||||
|
@ -21,3 +70,10 @@ cd ../../../debug; make
|
||||||
./test.sh -u -f unique/mnode/mgmt33.sim
|
./test.sh -u -f unique/mnode/mgmt33.sim
|
||||||
./test.sh -u -f unique/mnode/mgmt34.sim
|
./test.sh -u -f unique/mnode/mgmt34.sim
|
||||||
./test.sh -u -f unique/mnode/mgmtr2.sim
|
./test.sh -u -f unique/mnode/mgmtr2.sim
|
||||||
|
|
||||||
|
./test.sh -u -f unique/vnode/many.sim
|
||||||
|
./test.sh -u -f unique/vnode/replica2_basic2.sim
|
||||||
|
./test.sh -u -f unique/vnode/replica2_repeat.sim
|
||||||
|
./test.sh -u -f unique/vnode/replica3_basic.sim
|
||||||
|
./test.sh -u -f unique/vnode/replica3_repeat.sim
|
||||||
|
./test.sh -u -f unique/vnode/replica3_vgroup.sim
|
||||||
|
|
|
@ -23,4 +23,8 @@ system sh/cfg.sh -n dnode4 -c mgmtEqualVnodeNum -v 4
|
||||||
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
system sh/cfg.sh -n dnode1 -c numOfTotalVnodes -v 4
|
||||||
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
system sh/cfg.sh -n dnode2 -c numOfTotalVnodes -v 4
|
||||||
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
system sh/cfg.sh -n dnode3 -c numOfTotalVnodes -v 4
|
||||||
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
system sh/cfg.sh -n dnode4 -c numOfTotalVnodes -v 4
|
||||||
|
|
||||||
|
system sh/cfg.sh -n dnode1 -c http -v 1
|
||||||
|
system sh/cfg.sh -n dnode2 -c http -v 1
|
||||||
|
system sh/cfg.sh -n dnode3 -c http -v 1
|
|
@ -40,7 +40,6 @@ fi
|
||||||
totalPySuccess=`grep 'successfully executed' pytest-out.txt | wc -l`
|
totalPySuccess=`grep 'successfully executed' pytest-out.txt | wc -l`
|
||||||
|
|
||||||
if [ "$totalPySuccess" -gt "0" ]; then
|
if [ "$totalPySuccess" -gt "0" ]; then
|
||||||
grep 'successfully executed' pytest-out.txt
|
|
||||||
echo -e "${GREEN} ### Total $totalPySuccess python case(s) succeed! ### ${NC}"
|
echo -e "${GREEN} ### Total $totalPySuccess python case(s) succeed! ### ${NC}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue