From 2b21a75ae04a4c7b591cb12169f1197ddea6c2c0 Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Dec 2019 18:59:18 +0800 Subject: [PATCH 1/7] #928 --- src/kit/shell/inc/shell.h | 4 + src/kit/shell/src/shellEngine.c | 10 +- src/kit/shell/src/shellImport.c | 261 ++++++++++++++++++++++++++++++++ src/kit/shell/src/shellLinux.c | 13 ++ src/kit/shell/src/shellMain.c | 14 +- 5 files changed, 300 insertions(+), 2 deletions(-) create mode 100644 src/kit/shell/src/shellImport.c diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 12024d0b86..499c93e0ec 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -58,6 +58,8 @@ struct arguments { bool is_raw_time; bool is_use_passwd; char file[TSDB_FILENAME_LEN]; + char dir[TSDB_FILENAME_LEN]; + int threadNum; char* commands; int abort; }; @@ -74,12 +76,14 @@ void shellRunCommandOnServer(TAOS* con, char command[]); void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); +void source_dir(TAOS* con, struct arguments* args); void get_history_path(char* history); void cleanup_handler(void* arg); void exitShell(); int shellDumpResult(TAOS* con, char* fname, int* error_no, bool printMode); void shellPrintNChar(char* str, int width, bool printMode); void shellGetGrantInfo(void *con); +int isCommentLine(char *line); #define max(a, b) ((int)(a) < (int)(b) ? (int)(b) : (int)(a)) /**************** Global variable declarations ****************/ diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 116209b784..c1caf6147d 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -110,6 +110,14 @@ TAOS *shellInit(struct arguments *args) { exit(EXIT_SUCCESS); } +#ifdef LINUX + if (args->dir[0] != 0) { + source_dir(con, args); + taos_close(con); + exit(EXIT_SUCCESS); + } +#endif + printf(SERVER_VERSION, taos_get_server_info(con)); return con; @@ -762,7 +770,7 @@ void taos_error(TAOS *con) { taos_free_result(pRes); } -static int isCommentLine(char *line) { +int isCommentLine(char *line) { if (line == NULL) return 1; return regex_match(line, "^\\s*#.*", REG_EXTENDED); diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c new file mode 100644 index 0000000000..e4baf07edf --- /dev/null +++ b/src/kit/shell/src/shellImport.c @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _XOPEN_SOURCE +#define _DEFAULT_SOURCE + +#include "os.h" +#include "shell.h" +#include "shellCommand.h" +#include "ttime.h" +#include "tutil.h" + +static char **shellSQLFiles = NULL; +static int32_t shellSQLFileNum = 0; +static char shellTablesSQLFile[TSDB_FILENAME_LEN] = {0}; + +typedef struct { + pthread_t threadID; + int threadIndex; + int totalThreads; + void *taos; +} ShellThreadObj; + +static int shellGetFilesNum(const char *directoryName, const char *prefix) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | wc -l ", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + if (fscanf(fp, "%d", &fileNum) != 1) { + fprintf(stderr, "ERROR: failed to execute:%s, parse result error\n", cmd); + exit(0); + } + + if (fileNum <= 0) { + fprintf(stderr, "ERROR: directory:%s is empry\n", directoryName); + exit(0); + } + + pclose(fp); + return fileNum; +} + +static void shellParseDirectory(const char *directoryName, const char *prefix, char **fileArray, int totalFiles) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/*.%s | sort", directoryName, prefix); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + int fileNum = 0; + while (fscanf(fp, "%s", fileArray[fileNum++])) { + if (strcmp(fileArray[fileNum-1], shellTablesSQLFile) == 0) { + fileNum--; + } + if (fileNum >= totalFiles) { + break; + } + } + + if (fileNum != totalFiles) { + fprintf(stderr, "ERROR: directory:%s changed while read\n", directoryName); + exit(0); + } + + pclose(fp); +} + +static void shellCheckTablesSQLFile(const char *directoryName) +{ + char cmd[1024] = { 0 }; + sprintf(cmd, "ls %s/tables.sql", directoryName); + + FILE *fp = popen(cmd, "r"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to execute:%s, error:%s\n", cmd, strerror(errno)); + exit(0); + } + + while (fscanf(fp, "%s", shellTablesSQLFile)) { + break; + } + + pclose(fp); +} + +static void shellMallocSQLFiles() +{ + shellSQLFiles = (char**)calloc(shellSQLFileNum, sizeof(char*)); + for (int i = 0; i < shellSQLFileNum; i++) { + shellSQLFiles[i] = calloc(1, TSDB_FILENAME_LEN); + } +} + +static void shellGetDirectoryFileList(char *inputDir) +{ + struct stat fileStat; + if (stat(inputDir, &fileStat) < 0) { + fprintf(stderr, "ERROR: %s not exist\n", inputDir); + exit(0); + } + + if (fileStat.st_mode & S_IFDIR) { + shellCheckTablesSQLFile(inputDir); + shellSQLFileNum = shellGetFilesNum(inputDir, "sql"); + int totalSQLFileNum = shellSQLFileNum; + if (shellTablesSQLFile[0] != 0) { + shellSQLFileNum--; + } + shellMallocSQLFiles(); + shellParseDirectory(inputDir, "sql", shellSQLFiles, shellSQLFileNum); + fprintf(stdout, "start to dispose %d files in %s\n", totalSQLFileNum, inputDir); + } + else { + fprintf(stderr, "ERROR: %s is not a directory\n", inputDir); + exit(0); + } +} + +static void shellSourceFile(TAOS *con, char *fptr) { + wordexp_t full_path; + int read_len = 0; + char * cmd = malloc(MAX_COMMAND_SIZE); + size_t cmd_len = 0; + char * line = NULL; + size_t line_len = 0; + + if (wordexp(fptr, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: illegal file name\n"); + return; + } + + char *fname = full_path.we_wordv[0]; + + if (access(fname, R_OK) == -1) { + fprintf(stderr, "ERROR: file %s is not readable\n", fptr); + wordfree(&full_path); + return; + } + + FILE *f = fopen(fname, "r"); + if (f == NULL) { + fprintf(stderr, "ERROR: failed to open file %s\n", fname); + wordfree(&full_path); + return; + } + + fprintf(stdout, "start to dispose file:%s\n", fname); + + while ((read_len = getline(&line, &line_len, f)) != -1) { + if (read_len >= MAX_COMMAND_SIZE) continue; + line[--read_len] = '\0'; + + if (read_len == 0 || isCommentLine(line)) { // line starts with # + continue; + } + + if (line[read_len - 1] == '\\') { + line[read_len - 1] = ' '; + memcpy(cmd + cmd_len, line, read_len); + cmd_len += read_len; + continue; + } + + memcpy(cmd + cmd_len, line, read_len); + if (taos_query(con, cmd)) { + taos_error(con); + } + + memset(cmd, 0, MAX_COMMAND_SIZE); + cmd_len = 0; + } + + free(cmd); + if (line) free(line); + wordfree(&full_path); + fclose(f); +} + +void* shellImportThreadFp(void *arg) +{ + ShellThreadObj *pThread = (ShellThreadObj*)arg; + for (int f = 0; f < shellSQLFileNum; ++f) { + if (f % pThread->totalThreads == pThread->threadIndex) { + char *SQLFileName = shellSQLFiles[f]; + shellSourceFile(pThread->taos, SQLFileName); + } + } + + return NULL; +} + +static void shellRunImportThreads(struct arguments* args) +{ + pthread_attr_t thattr; + ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); + for (int t = 0; t < args->threadNum; ++t) { + ShellThreadObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = args->threadNum; + pThread->taos = taos_connect(args->host, args->user, args->password, args->database, tsMgmtShellPort); + if (pThread->taos == NULL) { + fprintf(stderr, "ERROR: thread:%d failed connect to TDengine, error:%s\n", pThread->threadIndex, taos_errstr(pThread->taos)); + exit(0); + } + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, shellImportThreadFp, (void*)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } + } + + for (int t = 0; t < args->threadNum; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } + + for (int t = 0; t < args->threadNum; ++t) { + taos_close(threadObj[t].taos); + } + free(threadObj); +} + +void source_dir(TAOS* con, struct arguments* args) { + shellGetDirectoryFileList(args->dir); + int64_t start = taosGetTimestampMs(); + + if (shellTablesSQLFile[0] != 0) { + shellSourceFile(con, shellTablesSQLFile); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", shellTablesSQLFile, (end - start) / 1000.0); + } + + shellRunImportThreads(args); + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "import %s finished, time spent %.2f seconds\n", args->dir, (end - start) / 1000.0); +} diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index ad8bf6c5c3..67df2ea161 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -40,6 +40,8 @@ static struct argp_option options[] = { {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, + {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, + {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {0}}; @@ -89,6 +91,17 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { strcpy(arguments->file, full_path.we_wordv[0]); wordfree(&full_path); break; + case 'D': + if (wordexp(arg, &full_path, 0) != 0) { + fprintf(stderr, "Invalid path %s\n", arg); + return -1; + } + strcpy(arguments->dir, full_path.we_wordv[0]); + wordfree(&full_path); + break; + case 'T': + arguments->threadNum = atoi(arg); + break; case 'd': arguments->database = arg; break; diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 82333020f1..a7b7e8383b 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -62,7 +62,19 @@ int checkVersion() { } // Global configurations -struct arguments args = {NULL, NULL, NULL, NULL, NULL, false, false, "\0", NULL}; +struct arguments args = { + .host = NULL, + .password = NULL, + .user = NULL, + .database = NULL, + .timezone = NULL, + .is_raw_time = false, + .is_use_passwd = false, + .file = "\0", + .dir = "\0", + .threadNum = 5, + .commands = NULL +}; /* * Main function. From d62c5c30231d04a736d437cf428af6e12599bd9f Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Dec 2019 19:03:44 +0800 Subject: [PATCH 2/7] #928 --- src/kit/shell/src/shellImport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index e4baf07edf..b96b971ab5 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -131,7 +131,7 @@ static void shellGetDirectoryFileList(char *inputDir) } shellMallocSQLFiles(); shellParseDirectory(inputDir, "sql", shellSQLFiles, shellSQLFileNum); - fprintf(stdout, "start to dispose %d files in %s\n", totalSQLFileNum, inputDir); + fprintf(stdout, "\nstart to dispose %d files in %s\n", totalSQLFileNum, inputDir); } else { fprintf(stderr, "ERROR: %s is not a directory\n", inputDir); From b5421c14218ede1ddb62c5e2124282e51bbfa47b Mon Sep 17 00:00:00 2001 From: slguan Date: Mon, 16 Dec 2019 21:46:38 +0800 Subject: [PATCH 3/7] #928 --- src/util/src/version.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/util/src/version.c b/src/util/src/version.c index 4f88d6170c..c85289fb8a 100644 --- a/src/util/src/version.c +++ b/src/util/src/version.c @@ -1,5 +1,5 @@ -char version[64] = "1.6.4.2"; +char version[64] = "1.6.4.4"; char compatible_version[64] = "1.6.1.0"; -char gitinfo[128] = "b9a62d60dc1d4a41452a9bc94e3a0924485c3a75"; -char gitinfoOfInternal[128] = "e6445addc77e8c96dcb57221fa6ab5dcde0458f7"; -char buildinfo[512] = "Built by root at 2019-12-10 10:31"; +char gitinfo[128] = "d62c5c30231d04a736d437cf428af6e12599bd9f"; +char gitinfoOfInternal[128] = "8094a32d78dc519bd883d01ac2ba6ec49ac57a80"; +char buildinfo[512] = "Built by ubuntu at 2019-12-16 21:40"; From 6ace40b36106a177fdc08b60f55f8937584073b0 Mon Sep 17 00:00:00 2001 From: localvar Date: Wed, 18 Dec 2019 14:54:15 +0800 Subject: [PATCH 4/7] improve error message of multiple file import --- src/kit/shell/src/shellImport.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/kit/shell/src/shellImport.c b/src/kit/shell/src/shellImport.c index b96b971ab5..3292aa8e04 100644 --- a/src/kit/shell/src/shellImport.c +++ b/src/kit/shell/src/shellImport.c @@ -167,9 +167,11 @@ static void shellSourceFile(TAOS *con, char *fptr) { return; } - fprintf(stdout, "start to dispose file:%s\n", fname); + fprintf(stdout, "begin import file:%s\n", fname); + int lineNo = 0; while ((read_len = getline(&line, &line_len, f)) != -1) { + ++lineNo; if (read_len >= MAX_COMMAND_SIZE) continue; line[--read_len] = '\0'; @@ -186,7 +188,10 @@ static void shellSourceFile(TAOS *con, char *fptr) { memcpy(cmd + cmd_len, line, read_len); if (taos_query(con, cmd)) { - taos_error(con); + fprintf(stderr, "DB error: %s: %s (%d)\n", taos_errstr(con), fname, lineNo); + /* free local resouce: allocated memory/metric-meta refcnt */ + TAOS_RES *pRes = taos_use_result(con); + taos_free_result(pRes); } memset(cmd, 0, MAX_COMMAND_SIZE); From 50865d282eeeafc890b308c8553946bf814f9c51 Mon Sep 17 00:00:00 2001 From: haojun Liao Date: Thu, 19 Dec 2019 13:29:00 +0800 Subject: [PATCH 5/7] Update vnodeQueryImpl.c fix a bug referred in issue #946 --- src/system/detail/src/vnodeQueryImpl.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 9620ab38dc..544ebb9854 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1349,11 +1349,12 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * * first filter the data block according to the value filter condition, then, if the top/bottom query applied, * invoke the filter function to decide if the data block need to be accessed or not. + * TODO handle the whole data block is NULL situation * @param pQuery * @param pField * @return */ -static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *pCtx) { +static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx *pCtx, int32_t numOfTotalPoints) { if (pField == NULL) { return false; // no need to load data } @@ -1371,6 +1372,11 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * if (!vnodeSupportPrefilter(pFilterInfo->info.data.type)) { continue; } + + // all points in current column are NULL, no need to check its boundary value + if (pField[colIndex].numOfNullPoints == numOfTotalPoints) { + continue; + } if (pFilterInfo->info.data.type == TSDB_DATA_TYPE_FLOAT) { float minval = *(double *)(&pField[colIndex].min); @@ -6595,7 +6601,7 @@ int32_t LoadDatablockOnDemand(SCompBlock *pBlock, SField **pFields, uint8_t *blk * filter the data block according to the value filter condition. * no need to load the data block, continue for next block */ - if (!needToLoadDataBlock(pQuery, *pFields, pRuntimeEnv->pCtx)) { + if (!needToLoadDataBlock(pQuery, *pFields, pRuntimeEnv->pCtx, pBlock->numOfPoints)) { #if defined(_DEBUG_VIEW) dTrace("QInfo:%p fileId:%d, slot:%d, block discarded by per-filter, ", GET_QINFO_ADDR(pQuery), pQuery->fileId, pQuery->slot); From 96329ea18ce1c47da71bd12e58739fb86877c44a Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 19 Dec 2019 14:09:03 +0800 Subject: [PATCH 6/7] #949 --- src/client/src/tscServer.c | 4 ++-- src/system/detail/src/mgmtConn.c | 2 +- src/system/detail/src/mgmtVgroup.c | 5 +++-- src/system/lite/src/mgmtBalance.spec.c | 2 ++ 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d25b2dde53..1b93096b45 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1400,7 +1400,7 @@ void tscUpdateVnodeInSubmitMsg(SSqlObj *pSql, char *buf) { pShellMsg = (SShellSubmitMsg *)pMsg; pShellMsg->vnode = htons(pMeterMeta->vpeerDesc[pSql->index].vnode); - tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode)); + tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pSql->index].ip), htons(pShellMsg->vnode)); } int tscBuildSubmitMsg(SSqlObj *pSql) { @@ -1421,7 +1421,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql) { // pSql->cmd.payloadLen is set during parse sql routine, so we do not use it here pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT; - tscTrace("%p update submit msg vnode:%d", pSql, htons(pShellMsg->vnode)); + tscTrace("%p update submit msg vnode:%s:%d", pSql, taosIpStr(pMeterMeta->vpeerDesc[pMeterMeta->index].ip), htons(pShellMsg->vnode)); return msgLen; } diff --git a/src/system/detail/src/mgmtConn.c b/src/system/detail/src/mgmtConn.c index 13275300a6..b440a1042e 100644 --- a/src/system/detail/src/mgmtConn.c +++ b/src/system/detail/src/mgmtConn.c @@ -48,7 +48,7 @@ int mgmtGetConns(SShowObj *pShow, SConnObj *pConn) { pConn = pAcct->pConn; SConnInfo *pConnInfo = pConnShow->connInfo; - while (pConn) { + while (pConn && pConn->pUser) { strcpy(pConnInfo->user, pConn->pUser->user); pConnInfo->ip = pConn->ip; pConnInfo->port = pConn->port; diff --git a/src/system/detail/src/mgmtVgroup.c b/src/system/detail/src/mgmtVgroup.c index e3bed57b33..6efe9d6660 100644 --- a/src/system/detail/src/mgmtVgroup.c +++ b/src/system/detail/src/mgmtVgroup.c @@ -40,6 +40,7 @@ void *mgmtVgroupActionAfterBatchUpdate(void *row, char *str, int size, int *ssiz void *mgmtVgroupActionReset(void *row, char *str, int size, int *ssize); void *mgmtVgroupActionDestroy(void *row, char *str, int size, int *ssize); bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode); +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode); void mgmtVgroupActionInit() { mgmtVgroupActionFp[SDB_TYPE_INSERT] = mgmtVgroupActionInsert; @@ -328,8 +329,8 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn) pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; if (pVgroup->vnodeGid[i].ip != 0) { - bool ready = mgmtCheckVnodeReady(NULL, pVgroup, pVgroup->vnodeGid + i); - strcpy(pWrite, ready ? "ready" : "unsynced"); + char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i); + strcpy(pWrite, vnodeStatus); } else { strcpy(pWrite, "null"); } diff --git a/src/system/lite/src/mgmtBalance.spec.c b/src/system/lite/src/mgmtBalance.spec.c index 3d2f10dcbb..33fe4502a4 100644 --- a/src/system/lite/src/mgmtBalance.spec.c +++ b/src/system/lite/src/mgmtBalance.spec.c @@ -52,6 +52,8 @@ bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) { return tsModule[moduleType].num != 0; } +char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; } + bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; } void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbStatus) {} From ed942e8f1cc0b28538545140aa129e4932d03139 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 19 Dec 2019 17:39:56 +0800 Subject: [PATCH 7/7] #952 [TBASE-1356] --- src/client/src/tscServer.c | 3 +++ src/system/detail/src/vnodeShell.c | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 1b93096b45..64c0be9283 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -469,6 +469,9 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) { if (pCmd->command > TSDB_SQL_MGMT) { tscProcessMgmtRedirect(pSql, pMsg->content + 1); + } else if (pCmd->command == TSDB_SQL_INSERT){ + pSql->index++; + pSql->maxRetry = TSDB_VNODES_SUPPORT * 2; } else { pSql->index++; } diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 66bede89b7..99535c9aa4 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -118,6 +118,9 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) { } else if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT) { if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_MASTER) { vnodeProcessShellSubmitRequest((char *) pMsg->content, pMsg->msgLen - sizeof(SIntMsg), pObj); + } else if (vnodeList[vnode].vnodeStatus == TSDB_VN_STATUS_SLAVE) { + taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_REDIRECT); + dTrace("vid:%d sid:%d, shell submit msg is redirect since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus)); } else { taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY); dTrace("vid:%d sid:%d, shell submit msg is ignored since in status:%s", vnode, sid, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));