diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 41bfe34561..2b962333d5 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -394,7 +394,7 @@ static int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start TSKEY k = *(TSKEY *)start; - if (k == 0) { + if (k == INT64_MIN) { if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) { return -1; } else if (pDataBlocks->tsSource == -1) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 73a324f521..45ca470ce6 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -520,7 +520,7 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups); pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qhandle); } else { int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables); assert(vgIndex >= 0 && vgIndex < numOfVgroups); @@ -528,12 +528,12 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) { SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex); pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId); - tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d", pSql, pTableIdList->vgInfo.vgId, vgIndex); + tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qhandle:%" PRIX64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qhandle); } } else { STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId); - tscDebug("%p build fetch msg from only one vgroup, vgId:%d", pSql, pTableMeta->vgId); + tscDebug("%p build fetch msg from only one vgroup, vgId:%d, qhandle:%" PRIX64, pSql, pTableMeta->vgId, pSql->res.qhandle); } pSql->cmd.payloadLen = sizeof(SRetrieveTableMsg); diff --git a/src/common/src/texpr.c b/src/common/src/texpr.c index f50b829baa..1008c4cf8f 100644 --- a/src/common/src/texpr.c +++ b/src/common/src/texpr.c @@ -41,41 +41,46 @@ static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, co static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) { switch(type) { - case TSDB_DATA_TYPE_TINYINT: { + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_UTINYINT:{ int8_t* p = (int8_t*) dest; int8_t* pSrc = (int8_t*) src; for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } - case TSDB_DATA_TYPE_SMALLINT: { + + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT:{ int16_t* p = (int16_t*) dest; int16_t* pSrc = (int16_t*) src; for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } - case TSDB_DATA_TYPE_INT: { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: { int32_t* p = (int32_t*) dest; int32_t* pSrc = (int32_t*) src; for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } - case TSDB_DATA_TYPE_BIGINT: { + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: { int64_t* p = (int64_t*) dest; int64_t* pSrc = (int64_t*) src; for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } case TSDB_DATA_TYPE_FLOAT: { float* p = (float*) dest; @@ -84,7 +89,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } case TSDB_DATA_TYPE_DOUBLE: { double* p = (double*) dest; @@ -93,7 +98,7 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf for(int32_t i = 0; i < numOfRows; ++i) { p[i] = pSrc[numOfRows - i - 1]; } - break; + return; } default: assert(0); } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index a3ff459396..87b31e4604 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); } else { if (qtype == TAOS_QTYPE_FWD) { - vnodeConfirmForward(pVnode, pWrite->pHead.version, 0); + vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); } if (pWrite->rspRet.rsp) { rpcFreeCont(pWrite->rspRet.rsp); diff --git a/src/inc/query.h b/src/inc/query.h index 7342221cb9..77a12ebfc5 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -28,7 +28,7 @@ typedef void* qinfo_t; * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId); /** @@ -88,9 +88,10 @@ void* qOpenQueryMgmt(int32_t vgId); void qQueryMgmtNotifyClosed(void* pExecutor); void qQueryMgmtReOpen(void *pExecutor); void qCleanupQueryMgmt(void* pExecutor); -void** qRegisterQInfo(void* pMgmt, uint64_t qInfo); +void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo); void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); +bool checkQIdEqual(void *qHandle, uint64_t qId); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 379c877b26..99dfd3a6a3 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -79,6 +79,9 @@ typedef void (*FStopSyncFile)(int32_t vgId, uint64_t fversion); // get file version typedef int32_t (*FGetVersion)(int32_t vgId, uint64_t *fver, uint64_t *vver); +// reset version +typedef int32_t (*FResetVersion)(int32_t vgId, uint64_t fver); + typedef int32_t (*FSendFile)(void *tsdb, SOCKET socketFd); typedef int32_t (*FRecvFile)(void *tsdb, SOCKET socketFd); @@ -96,6 +99,7 @@ typedef struct { FStartSyncFile startSyncFileFp; FStopSyncFile stopSyncFileFp; FGetVersion getVersionFp; + FResetVersion resetVersionFp; FSendFile sendFileFp; FRecvFile recvFileFp; } SSyncInfo; @@ -108,8 +112,8 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo *); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg *); -int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype); -void syncConfirmForward(int64_t rid, uint64_t version, int32_t code); +int32_t syncForwardToPeer(int64_t rid, void *pHead, void *mhandle, int32_t qtype, bool force); +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force); void syncRecover(int64_t rid); // recover from other nodes: int32_t syncGetNodesRole(int64_t rid, SNodesRole *); diff --git a/src/inc/twal.h b/src/inc/twal.h index 1645de77aa..bce398d6f9 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -65,6 +65,7 @@ void walFsync(twalh, bool forceFsync); int32_t walRestore(twalh, void *pVnode, FWalWrite writeFp); int32_t walGetWalFile(twalh, char *fileName, int64_t *fileId); uint64_t walGetVersion(twalh); +void walResetVersion(twalh, uint64_t newVer); #ifdef __cplusplus } diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 7c9ebd8a0b..dddec83da8 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -78,7 +78,7 @@ void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); // vnodeSync -void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force); // vnodeRead int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); diff --git a/src/kit/shell/CMakeLists.txt b/src/kit/shell/CMakeLists.txt index b6babc5bc5..d36c1e3fcc 100644 --- a/src/kit/shell/CMakeLists.txt +++ b/src/kit/shell/CMakeLists.txt @@ -36,6 +36,7 @@ ELSEIF (TD_DARWIN) LIST(APPEND SRC ./src/shellDarwin.c) LIST(APPEND SRC ./src/shellCommand.c) LIST(APPEND SRC ./src/shellImport.c) + LIST(APPEND SRC ./src/shellCheck.c) ADD_EXECUTABLE(shell ${SRC}) # linking with dylib TARGET_LINK_LIBRARIES(shell taos) diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index 50bceb1a71..019f3e5d92 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -51,6 +51,7 @@ typedef struct SShellArguments { char file[TSDB_FILENAME_LEN]; char dir[TSDB_FILENAME_LEN]; int threadNum; + int check; char* commands; int abort; int port; @@ -71,6 +72,7 @@ void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); void source_dir(TAOS* con, SShellArguments* args); +void shellCheck(TAOS* con, SShellArguments* args); void get_history_path(char* history); void cleanup_handler(void* arg); void exitShell(); diff --git a/src/kit/shell/src/shellCheck.c b/src/kit/shell/src/shellCheck.c new file mode 100644 index 0000000000..b88244ea01 --- /dev/null +++ b/src/kit/shell/src/shellCheck.c @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _GNU_SOURCE +#define _XOPEN_SOURCE +#define _DEFAULT_SOURCE + +#include "os.h" +#include "shell.h" +#include "shellCommand.h" +#include "tglobal.h" +#include "tutil.h" + +#define SHELL_SQL_LEN 1024 +static int32_t tbNum = 0; +static int32_t tbMallocNum = 0; +static char ** tbNames = NULL; +static int32_t checkedNum = 0; +static int32_t errorNum = 0; + +typedef struct { + pthread_t threadID; + int threadIndex; + int totalThreads; + void * taos; + char * db; +} ShellThreadObj; + +static int32_t shellUseDb(TAOS *con, char *db) { + if (db == NULL) { + fprintf(stdout, "no dbname input\n"); + return -1; + } + + char sql[SHELL_SQL_LEN] = {0}; + snprintf(sql, SHELL_SQL_LEN, "use %s", db); + + TAOS_RES *pSql = taos_query(con, sql); + int32_t code = taos_errno(pSql); + if (code != 0) { + fprintf(stdout, "failed to execute sql:%s since %s", sql, taos_errstr(pSql)); + } + + taos_free_result(pSql); + return code; +} + +static int32_t shellShowTables(TAOS *con, char *db) { + char sql[SHELL_SQL_LEN] = {0}; + snprintf(sql, SHELL_SQL_LEN, "show %s.tables", db); + + TAOS_RES *pSql = taos_query(con, sql); + int32_t code = taos_errno(pSql); + + if (code != 0) { + fprintf(stdout, "failed to execute sql:%s since %s\n", sql, taos_errstr(pSql)); + } else { + TAOS_ROW row; + while ((row = taos_fetch_row(pSql))) { + int32_t tbIndex = tbNum++; + if (tbMallocNum < tbNum) { + tbMallocNum = (tbMallocNum * 2 + 1); + tbNames = realloc(tbNames, tbMallocNum * sizeof(char *)); + if (tbNames == NULL) { + fprintf(stdout, "failed to malloc tablenames, num:%d\n", tbMallocNum); + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + break; + } + } + + tbNames[tbIndex] = malloc(TSDB_TABLE_NAME_LEN); + strncpy(tbNames[tbIndex], (const char *)row[0], TSDB_TABLE_NAME_LEN); + if (tbIndex % 100000 == 0 && tbIndex != 0) { + fprintf(stdout, "%d tablenames fetched\n", tbIndex); + } + } + } + + taos_free_result(pSql); + + fprintf(stdout, "total %d tablenames fetched, over\n", tbNum); + return code; +} + +static void shellFreeTbnames() { + for (int32_t i = 0; i < tbNum; ++i) { + free(tbNames[i]); + } + free(tbNames); +} + +static void *shellCheckThreadFp(void *arg) { + ShellThreadObj *pThread = (ShellThreadObj *)arg; + + int32_t interval = tbNum / pThread->totalThreads + 1; + int32_t start = pThread->threadIndex * interval; + int32_t end = (pThread->threadIndex + 1) * interval; + + if (end > tbNum) end = tbNum + 1; + + char file[32] = {0}; + snprintf(file, 32, "tb%d.txt", pThread->threadIndex); + + FILE *fp = fopen(file, "w"); + if (!fp) { + fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno)); + return NULL; + } + + char sql[SHELL_SQL_LEN]; + for (int32_t t = start; t < end; ++t) { + char *tbname = tbNames[t]; + if (tbname == NULL) break; + + snprintf(sql, SHELL_SQL_LEN, "select * from %s limit 1", tbname); + + TAOS_RES *pSql = taos_query(pThread->taos, sql); + int32_t code = taos_errno(pSql); + if (code != 0) { + int32_t len = snprintf(sql, SHELL_SQL_LEN, "drop table %s.%s;\n", pThread->db, tbname); + fwrite(sql, 1, len, fp); + atomic_add_fetch_32(&errorNum, 1); + } + + int32_t cnum = atomic_add_fetch_32(&checkedNum, 1); + if (cnum % 5000 == 0 && cnum != 0) { + fprintf(stdout, "%d tables checked\n", cnum); + } + + taos_free_result(pSql); + } + + fsync(fileno(fp)); + fclose(fp); + + return NULL; +} + +static void shellRunCheckThreads(TAOS *con, SShellArguments *args) { + pthread_attr_t thattr; + ShellThreadObj *threadObj = (ShellThreadObj *)calloc(args->threadNum, sizeof(ShellThreadObj)); + for (int t = 0; t < args->threadNum; ++t) { + ShellThreadObj *pThread = threadObj + t; + pThread->threadIndex = t; + pThread->totalThreads = args->threadNum; + pThread->taos = con; + pThread->db = args->database; + + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&(pThread->threadID), &thattr, shellCheckThreadFp, (void *)pThread) != 0) { + fprintf(stderr, "ERROR: thread:%d failed to start\n", pThread->threadIndex); + exit(0); + } + } + + for (int t = 0; t < args->threadNum; ++t) { + pthread_join(threadObj[t].threadID, NULL); + } + + for (int t = 0; t < args->threadNum; ++t) { + taos_close(threadObj[t].taos); + } + free(threadObj); +} + +void shellCheck(TAOS *con, SShellArguments *args) { + int64_t start = taosGetTimestampMs(); + + if (shellUseDb(con, args->database) != 0) { + shellFreeTbnames(); + return; + } + + if (shellShowTables(con, args->database) != 0) { + shellFreeTbnames(); + return; + } + + fprintf(stdout, "total %d tables will be checked by %d threads\n", tbNum, args->threadNum); + shellRunCheckThreads(con, args); + + int64_t end = taosGetTimestampMs(); + fprintf(stdout, "total %d tables checked, failed:%d, time spent %.2f seconds\n", checkedNum, errorNum, + (end - start) / 1000.0); +} diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index a6f5869936..b9529aac8e 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -121,6 +121,12 @@ TAOS *shellInit(SShellArguments *args) { taos_close(con); exit(EXIT_SUCCESS); } + + if (args->check != 0) { + shellCheck(con, args); + taos_close(con); + exit(EXIT_SUCCESS); + } #endif return con; @@ -412,7 +418,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { #ifdef WINDOWS if (tt < 0) tt = 0; #endif - if (tt < 0 && ms != 0) { + if (tt <= 0 && ms < 0) { tt--; if (precision == TSDB_TIME_PRECISION_MICRO) { ms += 1000000; diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 07b21531a7..3f6b3da9bf 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -45,6 +45,7 @@ static struct argp_option options[] = { {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, + {"check", 'k', "CHECK", 0, "Check tables."}, {"database", 'd', "DATABASE", 0, "Database to use when connecting to the server."}, {"timezone", 't', "TIMEZONE", 0, "Time zone of the shell, default is local."}, {"netrole", 'n', "NETROLE", 0, "Net role when network connectivity test, default is startup, options: client|server|rpc|startup|sync."}, @@ -130,6 +131,9 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { return -1; } break; + case 'k': + arguments->check = atoi(arg); + break; case 'd': arguments->database = arg; break; diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index 208b5fcb74..e6b1895043 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -41,8 +41,7 @@ "insert_mode": "taosc", "insert_rows": 100000, "multi_thread_write_one_tbl": "no", - "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, + "rows_per_tbl": 0, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index ce88ba5bef..b03f47bfaf 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -23,7 +23,6 @@ #ifdef LINUX #include - #include #include #ifndef _ALPINE #include @@ -39,11 +38,11 @@ #include #include #else - #include #include #include #endif +#include #include #include "cJSON.h" @@ -221,7 +220,6 @@ typedef struct SSuperTable_S { int childTblOffset; int multiThreadWriteOneTbl; // 0: no, 1: yes - int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl int rowsPerTbl; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision @@ -396,6 +394,8 @@ typedef struct SThreadInfo_S { uint64_t et; int64_t lastTs; + // sample data + int samplePos; // statistics int64_t totalInsertRows; int64_t totalAffectedRows; @@ -1126,8 +1126,6 @@ static int printfInsertMeta() { }else { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } - printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); printf(" disorderRange: \033[33m%d\033[0m\n", @@ -1287,7 +1285,6 @@ static void printfInsertMetaToFile(FILE* fp) { }else { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } - fprintf(fp, " numberOfTblInOneSql: %d\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); @@ -2335,7 +2332,8 @@ static int createDatabases() { " fsync %d", g_Dbs.db[i].dbCfg.fsync); } if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) - || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", strlen("us")))) { + || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, + "us", strlen("us")))) { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, " precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } @@ -2351,14 +2349,17 @@ static int createDatabases() { debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists - sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); + sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, + g_Dbs.db[i].superTbls[j].sTblName); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; - ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); + ret = createSuperTable(taos, g_Dbs.db[i].dbName, + &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); } else { g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS; - ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j]); + ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, + &g_Dbs.db[i].superTbls[j]); } if (0 != ret) { @@ -2715,6 +2716,8 @@ static int readSampleFromCsvFileToMem( continue; } + verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows); + memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); getRows++; @@ -3426,24 +3429,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); goto PARSE_OVER; } - - cJSON* numberOfTblInOneSql = cJSON_GetObjectItem(stbInfo, "number_of_tbl_in_one_sql"); - if (numberOfTblInOneSql && numberOfTblInOneSql->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = numberOfTblInOneSql->valueint; - } else if (!numberOfTblInOneSql) { - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0; - } else { - printf("ERROR: failed to read json, numberOfTblInOneSql not found\n"); - goto PARSE_OVER; - } - cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl"); if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint; } else if (!rowsPerTbl) { - g_Dbs.db[i].superTbls[j].rowsPerTbl = 1; + g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { - printf("ERROR: failed to read json, rowsPerTbl not found\n"); + fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n"); goto PARSE_OVER; } @@ -3901,7 +3893,7 @@ PARSE_OVER: return ret; } -void prepareSampleData() { +static void prepareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { @@ -3915,7 +3907,7 @@ void prepareSampleData() { } } -void postFreeResource() { +static void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { @@ -3942,16 +3934,18 @@ void postFreeResource() { static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - int ret = readSampleFromCsvFileToMem(superTblInfo); +/* int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { tmfree(superTblInfo->sampleDataBuf); superTblInfo->sampleDataBuf = NULL; return -1; } +*/ *sampleUsePos = 0; } int dataLen = 0; + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, @@ -3967,12 +3961,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper int dataLen = 0; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) + || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { - printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); + printf("binary or nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); return (-1); } - + char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); if (NULL == buf) { printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen); @@ -3981,15 +3977,24 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper rand_string(buf, stbInfo->columns[i].dataLen); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf); tmfree(buf); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "int", 3)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_int()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bigint", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "float", 5)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_float()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "double", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_double()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "smallint", 8)) { + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "int", 3)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%d, ", rand_int()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "bigint", 6)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "float", 5)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%f, ", rand_float()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "double", 6)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%f, ", rand_double()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "smallint", 8)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint()); @@ -4009,255 +4014,6 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper return dataLen; } -static void syncWriteForNumberOfTblInOneSql( - threadInfo *winfo, char* sampleDataBuf) { - SSuperTable* superTblInfo = winfo->superTblInfo; - - int samplePos = 0; - - //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); - int64_t lastPrintTime = taosGetTimestampMs(); - - char* buffer = calloc(superTblInfo->maxSqlLen+1, 1); - if (NULL == buffer) { - printf("========calloc size[ %d ] fail!\n", superTblInfo->maxSqlLen); - return; - } - - int32_t numberOfTblInOneSql = superTblInfo->numberOfTblInOneSql; - int32_t tbls = winfo->end_table_id - winfo->start_table_id + 1; - if (numberOfTblInOneSql > tbls) { - numberOfTblInOneSql = tbls; - } - - uint64_t time_counter = winfo->start_time; - int sampleUsePos; - - int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - int64_t st = 0; - int64_t et = 0xffffffff; - - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - for (int i = 0; i < insertRows;) { - int32_t tbl_id = 0; - for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) { - int64_t start_time = 0; - int inserted = i; - - for (int k = 0; k < g_args.num_of_RPR;) { - int len = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - char *pstr = buffer; - - int32_t end_tbl_id = tableSeq + numberOfTblInOneSql; - if (end_tbl_id > winfo->end_table_id) { - end_tbl_id = winfo->end_table_id+1; - } - - for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) { - sampleUsePos = samplePos; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { - tagsValBuf = getTagValueFromTagSample( - superTblInfo, tbl_id % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - goto free_and_statistics; - } - - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s%d using %s.%s tags %s values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - } - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s values ", - winfo->db_name, - superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s values ", - winfo->db_name, - superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); - } - } else { // pre-create child table - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s%d values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id); - } - } - - start_time = time_counter; - for (int j = 0; j < superTblInfo->rowsPerTbl;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", strlen("sample"))) { - retLen = getRowDataFromSample(pstr + len, - superTblInfo->maxSqlLen - len, - start_time += superTblInfo->timeStampStep, - superTblInfo, - &sampleUsePos); - if (retLen < 0) { - goto free_and_statistics; - } - } else if (0 == strncasecmp( - superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio - && rand_num < superTblInfo->disorderRatio) { - int64_t d = start_time - taosRandom() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - d, - superTblInfo); - } else { - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - start_time += superTblInfo->timeStampStep, - superTblInfo); - } - if (retLen < 0) { - goto free_and_statistics; - } - } - len += retLen; - //inserted++; - j++; - winfo->totalInsertRows++; - - if (inserted >= superTblInfo->insertRows || - (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { - tableSeq = tbl_id + 1; - printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", - superTblInfo->lenOfOneRow); - goto send_to_server; - } - } - } - - tableSeq = tbl_id; - inserted += superTblInfo->rowsPerTbl; - -send_to_server: - if (insert_interval) { - st = taosGetTimestampUs(); - - if (insert_interval > ((et - st)/1000)) { - int sleep_time = insert_interval - (et -st); - printf("sleep: %d ms insert interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - } - - if (0 == strncasecmp(superTblInfo->insertMode, - "taosc", - strlen("taosc"))) { - //printf("multi table===== sql: %s \n\n", buffer); - //int64_t t1 = taosGetTimestampMs(); - int64_t startTs; - int64_t endTs; - startTs = taosGetTimestampUs(); - - debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer); - int affectedRows = queryDbExec( - winfo->taos, buffer, INSERT_TYPE); - - if (0 < affectedRows) { - endTs = taosGetTimestampUs(); - int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) winfo->maxDelay = delay; - if (delay < winfo->minDelay) winfo->minDelay = delay; - winfo->cntDelay++; - winfo->totalDelay += delay; - winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; - winfo->totalAffectedRows += affectedRows; - } else { - fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows); - goto free_and_statistics; - } - - int64_t currentPrintTime = taosGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", - winfo->threadID, - winfo->totalInsertRows, - winfo->totalAffectedRows); - lastPrintTime = currentPrintTime; - } - //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); - } else { - //int64_t t1 = taosGetTimestampMs(); - int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - //int64_t t2 = taosGetTimestampMs(); - //printf("http insert sql return, Spent %ld ms \n", t2 - t1); - - if (0 != retCode) { - printf("========restful return fail, threadID[%d]\n", winfo->threadID); - goto free_and_statistics; - } - } - if (insert_interval) { - et = taosGetTimestampUs(); - } - - break; - } - - if (tableSeq > winfo->end_table_id) { - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - samplePos = sampleUsePos; - } - i = inserted; - time_counter = start_time; - } - } - - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); - } - -free_and_statistics: - tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", - winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); - return; -} - int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int lenOfBinary) { memset(res, 0, MAX_DATA_SIZE); @@ -4319,26 +4075,23 @@ int32_t generateData(char *res, char **data_type, static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { char* sampleDataBuf = NULL; - // each thread read sample data from csv file - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", - strlen("sample"))) { - sampleDataBuf = calloc( + sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); - if (sampleDataBuf == NULL) { + if (sampleDataBuf == NULL) { fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); return -1; - } + } - superTblInfo->sampleDataBuf = sampleDataBuf; - int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { + superTblInfo->sampleDataBuf = sampleDataBuf; + int ret = readSampleFromCsvFileToMem(superTblInfo); + + if (0 != ret) { + fprintf(stderr, "read sample from csv file failed.\n"); tmfree(sampleDataBuf); superTblInfo->sampleDataBuf = NULL; return -1; - } } return 0; @@ -4392,7 +4145,6 @@ static int generateDataBuffer(int32_t tableSeq, assert(buffer != NULL); char *pChildTblName; - int childTblCount; pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1); if (NULL == pChildTblName) { @@ -4400,14 +4152,12 @@ static int generateDataBuffer(int32_t tableSeq, return -1; } - if (superTblInfo && (superTblInfo->childTblOffset > 0)) { - // select tbname from stb limit 1 offset tableSeq - getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, - pThreadInfo->db_name, superTblInfo->sTblName, - &pChildTblName, &childTblCount, - 1, tableSeq); + if (superTblInfo && (superTblInfo->childTblOffset >= 0) + && (superTblInfo->childTblLimit > 0)) { + snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s", + superTblInfo->childTblName + (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); } else { - snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", + snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); } @@ -4467,7 +4217,7 @@ static int generateDataBuffer(int32_t tableSeq, verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); for (k = 0; k < g_args.num_of_RPR;) { if (superTblInfo) { - int retLen = 0; + int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { @@ -4488,27 +4238,26 @@ static int generateDataBuffer(int32_t tableSeq, superTblInfo->maxSqlLen - len, d, superTblInfo); - //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); - } else { - retLen = generateRowData( + } else { + retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, superTblInfo); - } + } + } - if (retLen < 0) { - free(pChildTblName); - return -1; - } + if (retLen < 0) { + free(pChildTblName); + return -1; + } - len += retLen; + len += retLen; - if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite - k++; - break; - } - } + if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite + k++; + break; + } } else { int rand_num = taosRandom() % 100; char data[MAX_DATA_SIZE]; @@ -4529,14 +4278,13 @@ static int generateDataBuffer(int32_t tableSeq, } pstr += sprintf(pstr, " %s", data); - //assert(len + pstr - buffer < BUFFER_SIZE); if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long k++; break; } } - verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%p\n", __func__, __LINE__, len, k, buffer); + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); k++; startFrom ++; @@ -4571,21 +4319,7 @@ static void* syncWrite(void *sarg) { strerror(errno)); return NULL; } - - if (superTblInfo) { - if (0 != prepareSampleDataForSTable(superTblInfo)) - return NULL; - - if (superTblInfo->numberOfTblInOneSql > 0) { - syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return NULL; - } - } - int samplePos = 0; - int64_t lastPrintTime = taosGetTimestampMs(); int64_t startTs = taosGetTimestampUs(); int64_t endTs; @@ -4598,7 +4332,7 @@ static void* syncWrite(void *sarg) { winfo->totalInsertRows = 0; winfo->totalAffectedRows = 0; - int sampleUsePos; + winfo->samplePos = 0; for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; tableSeq ++) { @@ -4612,10 +4346,8 @@ static void* syncWrite(void *sarg) { st = taosGetTimestampUs(); } - sampleUsePos = samplePos; - int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, - i, start_time, &sampleUsePos); + i, start_time, &(winfo->samplePos)); if (generated > 0) i += generated; else @@ -4662,16 +4394,12 @@ static void* syncWrite(void *sarg) { if ((tableSeq == winfo->end_table_id) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { - samplePos = sampleUsePos; + printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos); } } // tableSeq free_and_statistics_2: tmfree(buffer); - if (superTblInfo) { - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - } printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, @@ -4842,6 +4570,45 @@ static void startMultiThreadInsertData(int threads, char* db_name, else last = 0; + // read sample data from file first + if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample")))) { + if (0 != prepareSampleDataForSTable(superTblInfo)) { + fprintf(stderr, "prepare sample data for stable failed!\n"); + exit(-1); + } + } + + if (superTblInfo && (superTblInfo->childTblOffset >= 0) + && (superTblInfo->childTblLimit > 0)) { + + TAOS* taos = taos_connect( + g_Dbs.host, g_Dbs.user, + g_Dbs.password, db_name, g_Dbs.port); + if (NULL == taos) { + fprintf(stderr, "connect to server fail , reason: %s\n", + taos_errstr(NULL)); + exit(-1); + } + + superTblInfo->childTblName = (char*)calloc(1, + superTblInfo->childTblLimit * TSDB_TABLE_NAME_LEN); + if (superTblInfo->childTblName == NULL) { + fprintf(stderr, "alloc memory failed!"); + taos_close(taos); + exit(-1); + } + int childTblCount; + + getChildNameOfSuperTableWithLimitAndOffset( + taos, + db_name, superTblInfo->sTblName, + &superTblInfo->childTblName, &childTblCount, + superTblInfo->childTblLimit, + superTblInfo->childTblOffset); + taos_close(taos); + } + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index fe1f70cb50..319b16e62a 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -680,7 +680,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * if (pRow != NULL) { // forward to peers pRow->processedCount = 0; - int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); + int32_t syncCode = syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); if (syncCode <= 0) pRow->processedCount = 1; if (syncCode < 0) { @@ -700,7 +700,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync - syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC); + syncForwardToPeer(tsSdbMgmt.sync, pHead, pRow, TAOS_QTYPE_RPC, false); // from wal or forward msg, row not created, should add into hash if (action == SDB_ACTION_INSERT) { @@ -1119,7 +1119,7 @@ static void *sdbWorkerFp(void *pWorker) { sdbConfirmForward(1, pRow, pRow->code); } else { if (qtype == TAOS_QTYPE_FWD) { - syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code); + syncConfirmForward(tsSdbMgmt.sync, pRow->pHead.version, pRow->code, false); } sdbFreeFromQueue(pRow); } diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 3615c32a90..5ff574ec67 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -308,6 +308,7 @@ enum { typedef struct SQInfo { void* signature; + uint64_t qId; int32_t code; // error code to returned to client int64_t owner; // if it is in execution @@ -429,7 +430,7 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t nu SSqlGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pColIndex, int32_t *code); SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, - SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql); + SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, char* sql, uint64_t *qId); int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQInfo *pQInfo, SQueryParam* param, bool isSTable); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index b2d97e2d69..54bc72b307 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -98,6 +98,9 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) + +uint64_t queryHandleId = 0; + int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } @@ -4173,8 +4176,10 @@ static SSDataBlock* doTableScan(void* param) { assert(ret); } - pResultRowInfo->curIndex = 0; - pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; + if (pResultRowInfo->size > 0) { + pResultRowInfo->curIndex = 0; + pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; + } qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey); @@ -6109,9 +6114,13 @@ void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo) { pResultInfo->total = 0; } +FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { + return ((SQInfo *)qHandle)->qId == qId; +} + SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr, SExprInfo* pExprs, SExprInfo* pSecExprs, STableGroupInfo* pTableGroupInfo, SColumnInfo* pTagCols, bool stableQuery, - char* sql) { + char* sql, uint64_t *qId) { int16_t numOfCols = pQueryMsg->numOfCols; int16_t numOfOutput = pQueryMsg->numOfOutput; @@ -6252,7 +6261,9 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr // todo refactor pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); - qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo); + pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1); + *qId = pQInfo->qId; + qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); return pQInfo; _cleanup_qinfo: diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index d35d4a5ff8..7eb5cc2600 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -68,7 +68,7 @@ void freeParam(SQueryParam *param) { tfree(param->prevResult); } -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo) { +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t *qId) { assert(pQueryMsg != NULL && tsdb != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -158,7 +158,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi goto _over; } - (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql); + (*pQInfo) = createQInfoImpl(pQueryMsg, param.pGroupbyExpr, param.pExprs, param.pSecExprs, &tableGroupInfo, param.pTagColumnInfo, isSTableQuery, param.sql, qId); param.sql = NULL; param.pExprs = NULL; @@ -472,7 +472,7 @@ void qCleanupQueryMgmt(void* pQMgmt) { qDebug("vgId:%d, queryMgmt cleanup completed", vgId); } -void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { +void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo) { if (pMgmt == NULL) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; @@ -492,8 +492,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; return NULL; } else { - TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo; - void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(TSDB_CACHE_PTR_TYPE), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), + void** handle = taosCachePut(pQueryMgmt->qinfoPool, &qId, sizeof(qId), &qInfo, sizeof(TSDB_CACHE_PTR_TYPE), (getMaximumIdleDurationSec()*1000)); pthread_mutex_unlock(&pQueryMgmt->lock); diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index 91613ae351..2b87938474 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -117,6 +117,7 @@ typedef struct SSyncNode { FStartSyncFile startSyncFileFp; FStopSyncFile stopSyncFileFp; FGetVersion getVersionFp; + FResetVersion resetVersionFp; FSendFile sendFileFp; FRecvFile recvFileFp; pthread_mutex_t mutex; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 956ccdc073..264bbf6b92 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -56,7 +56,7 @@ static void syncMonitorNodeRole(void *param, void *tmrId); static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code); static int32_t syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle); static void syncRestartPeer(SSyncPeer *pPeer); -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtyp); +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force); static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo); static void syncStartCheckPeerConn(SSyncPeer *pPeer); @@ -182,6 +182,7 @@ int64_t syncStart(const SSyncInfo *pInfo) { pNode->startSyncFileFp = pInfo->startSyncFileFp; pNode->stopSyncFileFp = pInfo->stopSyncFileFp; pNode->getVersionFp = pInfo->getVersionFp; + pNode->resetVersionFp = pInfo->resetVersionFp; pNode->sendFileFp = pInfo->sendFileFp; pNode->recvFileFp = pInfo->recvFileFp; @@ -377,24 +378,24 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype) { +int32_t syncForwardToPeer(int64_t rid, void *data, void *mhandle, int32_t qtype, bool force) { if (rid <= 0) return 0; SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return 0; - int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype); + int32_t code = syncForwardToPeerImpl(pNode, data, mhandle, qtype, force); syncReleaseNode(pNode); return code; } -void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { +void syncConfirmForward(int64_t rid, uint64_t version, int32_t code, bool force) { SSyncNode *pNode = syncAcquireNode(rid); if (pNode == NULL) return; SSyncPeer *pPeer = pNode->pMaster; - if (pPeer && pNode->quorum > 1) { + if (pPeer && (pNode->quorum > 1 || force)) { SFwdRsp rsp; syncBuildSyncFwdRsp(&rsp, pNode->vgId, version, code); @@ -1413,7 +1414,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) { syncReleaseNode(pNode); } -static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype) { +static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle, int32_t qtype, bool force) { SSyncPeer *pPeer; SSyncHead *pSyncHead; SWalHead * pWalHead = data; @@ -1457,7 +1458,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle if (pPeer == NULL || pPeer->peerFd < 0) continue; if (pPeer->role != TAOS_SYNC_ROLE_SLAVE && pPeer->sstatus != TAOS_SYNC_STATUS_CACHE) continue; - if (pNode->quorum > 1 && code == 0) { + if ((pNode->quorum > 1 || force) && code == 0) { code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle); if (code >= 0) code = 1; } diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index c0d66316cd..22d0a27581 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -238,6 +238,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) { (*pNode->stopSyncFileFp)(pNode->vgId, fversion); nodeVersion = fversion; + if (pNode->resetVersionFp) (*pNode->resetVersionFp)(pNode->vgId, fversion); sInfo("%s, start to restore wal, fver:%" PRIu64, pPeer->id, nodeVersion); uint64_t wver = 0; diff --git a/src/vnode/inc/vnodeSync.h b/src/vnode/inc/vnodeSync.h index c9ac25c227..75d7ffbabd 100644 --- a/src/vnode/inc/vnodeSync.h +++ b/src/vnode/inc/vnodeSync.h @@ -30,8 +30,9 @@ void vnodeStopSyncFile(int32_t vgId, uint64_t fversion); void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); +int32_t vnodeResetVersion(int32_t vgId, uint64_t fver); -void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code); +void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code, bool force); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index ac9536d243..c380b60f88 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -305,6 +305,7 @@ int32_t vnodeOpen(int32_t vgId) { syncInfo.startSyncFileFp = vnodeStartSyncFile; syncInfo.stopSyncFileFp = vnodeStopSyncFile; syncInfo.getVersionFp = vnodeGetVersion; + syncInfo.resetVersionFp = vnodeResetVersion; syncInfo.sendFileFp = tsdbSyncSend; syncInfo.recvFileFp = tsdbSyncRecv; syncInfo.pTsdb = pVnode->tsdb; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 8b26f4a8e2..ef68499b88 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -247,7 +247,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (contLen != 0) { qinfo_t pQInfo = NULL; - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo); + uint64_t qId = 0; + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; @@ -259,22 +260,22 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { - handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo); + handle = qRegisterQInfo(pVnode->qMgmt, qId, (uint64_t)pQInfo); if (handle == NULL) { // failed to register qhandle pRsp->code = terrno; terrno = 0; - vError("vgId:%d, QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, + vError("vgId:%d, QInfo:%"PRIu64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo, tstrerror(pRsp->code)); qDestroyQueryInfo(pQInfo); // destroy it directly return pRsp->code; } else { assert(*handle == pQInfo); - pRsp->qhandle = htobe64((uint64_t)pQInfo); + pRsp->qhandle = htobe64(qId); } if (handle != NULL && vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, + vError("vgId:%d, QInfo:%"PRIu64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle, pRead->rpcHandle); pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -285,7 +286,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { } if (handle != NULL) { - vTrace("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); + vTrace("vgId:%d, QInfo:%"PRIu64 "-%p, dnode query msg disposed, create qhandle and returns to app", vgId, qId, *handle); code = vnodePutItemIntoReadQueue(pVnode, handle, pRead->rpcHandle); if (code != TSDB_CODE_SUCCESS) { pRsp->code = code; @@ -349,7 +350,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { pRetrieve->free = htons(pRetrieve->free); pRetrieve->qhandle = htobe64(pRetrieve->qhandle); - vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle, + vTrace("vgId:%d, QInfo:%" PRIu64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qhandle, pRetrieve->free, pRead->rpcHandle); memset(pRet, 0, sizeof(SRspRet)); @@ -360,19 +361,19 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (handle == NULL) { code = terrno; terrno = TSDB_CODE_SUCCESS; - } else if ((*handle) != (void *)pRetrieve->qhandle) { + } else if (!checkQIdEqual(*handle, pRetrieve->qhandle)) { code = TSDB_CODE_QRY_INVALID_QHANDLE; } if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%p", pVnode->vgId, tstrerror(code), (void *)pRetrieve->qhandle); + vError("vgId:%d, invalid handle in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), pRetrieve->qhandle); vnodeBuildNoResultQueryRsp(pRet); return code; } // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle); + vWarn("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qhandle, *handle); qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -383,7 +384,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pRead->rpcHandle); + vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle); code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -441,4 +442,4 @@ void vnodeWaitReadCompleted(SVnodeObj *pVnode) { vTrace("vgId:%d, queued rmsg num:%d", pVnode->vgId, pVnode->queuedRMsg); taosMsleep(10); } -} \ No newline at end of file +} diff --git a/src/vnode/src/vnodeSync.c b/src/vnode/src/vnodeSync.c index 627783c391..a45eae9b16 100644 --- a/src/vnode/src/vnodeSync.c +++ b/src/vnode/src/vnodeSync.c @@ -158,7 +158,23 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { return code; } -void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) { - SVnodeObj *pVnode = vparam; - syncConfirmForward(pVnode->sync, version, code); +int32_t vnodeResetVersion(int32_t vgId, uint64_t fver) { + SVnodeObj *pVnode = vnodeAcquire(vgId); + if (pVnode == NULL) { + vError("vgId:%d, vnode not found while reset version", vgId); + return -1; + } + + pVnode->fversion = fver; + pVnode->version = fver; + walResetVersion(pVnode->wal, fver); + vDebug("vgId:%d, version reset to %" PRIu64, vgId, fver); + + vnodeRelease(pVnode); + return 0; } + +void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code, bool force) { + SVnodeObj *pVnode = vparam; + syncConfirmForward(pVnode->sync, version, code, force); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index a1d4f50010..4b2616c7aa 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -89,7 +89,8 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara // forward to peers, even it is WAL/FWD, it shall be called to update version in sync int32_t syncCode = 0; - syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype); + bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); + syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force); if (syncCode < 0) return syncCode; // write into WAL diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index aeb4983029..51b770d346 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -446,3 +446,16 @@ uint64_t walGetVersion(twalh param) { return pWal->version; } + +// Wal version in slave (dnode1) must be reset. +// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin. +// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent, +// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur + +void walResetVersion(twalh param, uint64_t newVer) { + SWal *pWal = param; + if (pWal == 0) return; + wDebug("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->version, newVer); + + pWal->version = newVer; +} \ No newline at end of file diff --git a/tests/pytest/tools/insert-tblimit-tboffset.json b/tests/pytest/tools/insert-tblimit-tboffset.json index 5f63660347..f3d3e864ba 100644 --- a/tests/pytest/tools/insert-tblimit-tboffset.json +++ b/tests/pytest/tools/insert-tblimit-tboffset.json @@ -44,7 +44,6 @@ "childtable_offset": 33, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, diff --git a/tests/pytest/tools/sampledata.csv b/tests/pytest/tools/sampledata.csv new file mode 100644 index 0000000000..01e79c32a8 --- /dev/null +++ b/tests/pytest/tools/sampledata.csv @@ -0,0 +1,3 @@ +1 +2 +3 diff --git a/tests/pytest/tools/taosdemo-sampledata.json b/tests/pytest/tools/taosdemo-sampledata.json new file mode 100644 index 0000000000..473c977773 --- /dev/null +++ b/tests/pytest/tools/taosdemo-sampledata.json @@ -0,0 +1,39 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 10, + "confirm_parameter_prompt": "no", + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes" + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 20, + "childtable_limit": 10, + "childtable_offset": 0, + "childtable_prefix": "t_", + "auto_create_table": "no", + "data_source": "sample", + "insert_mode": "taosc", + "insert_rate": 0, + "insert_rows": 20, + "multi_thread_write_one_tbl": "no", + "number_of_tbl_in_one_sql": 0, + "max_sql_len": 1048000, + "timestamp_step": 1000, + "start_timestamp": "2020-1-1 00:00:00", + "sample_format": "csv", + "sample_file": "./tools/sampledata.csv", + "columns": [{"type": "INT"}], + "tags": [{"type": "INT", "count":1}] + }] + }] + +} diff --git a/tests/pytest/tools/taosdemoTestLimitOffset.py b/tests/pytest/tools/taosdemoTestLimitOffset.py index 6dbe5a7028..69a81b166b 100644 --- a/tests/pytest/tools/taosdemoTestLimitOffset.py +++ b/tests/pytest/tools/taosdemoTestLimitOffset.py @@ -59,6 +59,15 @@ class TDTestCase: tdSql.query("select count(*) from db.stb") tdSql.checkData(0, 0, 33000) + os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath) + + tdSql.execute("reset query cache") + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 100) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 20000) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/tools/taosdemoTestSampleData.py b/tests/pytest/tools/taosdemoTestSampleData.py new file mode 100644 index 0000000000..893c53984d --- /dev/null +++ b/tests/pytest/tools/taosdemoTestSampleData.py @@ -0,0 +1,68 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 10000 + self.numberOfRecords = 100 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath+ "/build/bin/" + os.system("%staosdemo -f tools/taosdemo-sampledata.json" % binPath) + + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 20) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 200) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index c2ce5df12e..f05474d158 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -1,112 +1,58 @@ run general/parser/alter.sim -sleep 100 run general/parser/alter1.sim -sleep 100 run general/parser/alter_stable.sim -sleep 100 run general/parser/auto_create_tb.sim -sleep 100 run general/parser/auto_create_tb_drop_tb.sim -sleep 100 run general/parser/col_arithmetic_operation.sim -sleep 100 run general/parser/columnValue.sim -sleep 100 run general/parser/commit.sim -sleep 100 run general/parser/create_db.sim -sleep 100 run general/parser/create_mt.sim -sleep 100 run general/parser/create_tb.sim -sleep 100 run general/parser/dbtbnameValidate.sim -sleep 100 run general/parser/fill.sim -sleep 100 run general/parser/fill_stb.sim -sleep 100 #run general/parser/fill_us.sim # -sleep 100 run general/parser/first_last.sim -sleep 100 run general/parser/import_commit1.sim -sleep 100 run general/parser/import_commit2.sim -sleep 100 run general/parser/import_commit3.sim -sleep 100 #run general/parser/import_file.sim -sleep 100 run general/parser/insert_tb.sim -sleep 100 run general/parser/tags_dynamically_specifiy.sim -sleep 100 run general/parser/interp.sim -sleep 100 run general/parser/lastrow.sim -sleep 100 run general/parser/limit.sim -sleep 100 run general/parser/limit1.sim -sleep 100 run general/parser/limit1_tblocks100.sim -sleep 100 run general/parser/limit2.sim -sleep 100 run general/parser/mixed_blocks.sim -sleep 100 run general/parser/nchar.sim -sleep 100 run general/parser/null_char.sim -sleep 100 run general/parser/selectResNum.sim -sleep 100 run general/parser/select_across_vnodes.sim -sleep 100 run general/parser/select_from_cache_disk.sim -sleep 100 run general/parser/set_tag_vals.sim -sleep 100 run general/parser/single_row_in_tb.sim -sleep 100 run general/parser/slimit.sim -sleep 100 run general/parser/slimit1.sim -sleep 100 run general/parser/slimit_alter_tags.sim -sleep 100 run general/parser/tbnameIn.sim -sleep 100 run general/parser/slimit_alter_tags.sim # persistent failed -sleep 100 run general/parser/join.sim -sleep 100 run general/parser/join_multivnode.sim -sleep 100 +run general/parser/join_manyblocks.sim run general/parser/projection_limit_offset.sim -sleep 100 run general/parser/select_with_tags.sim -sleep 100 run general/parser/groupby.sim -sleep 100 run general/parser/tags_filter.sim -sleep 100 run general/parser/topbot.sim -sleep 100 run general/parser/union.sim -sleep 100 run general/parser/constCol.sim -sleep 100 run general/parser/where.sim -sleep 100 run general/parser/timestamp.sim -sleep 100 run general/parser/sliding.sim -sleep 100 run general/parser/function.sim -sleep 100 run general/parser/stableOp.sim -sleep 100 run general/parser/slimit_alter_tags.sim diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index ace1ca6f5b..157c41ce58 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -350,5 +350,13 @@ if $rows != 0 then return -1 endi +print ==========================>td-3318 +sql create table tu(ts timestamp, k int, b binary(12)) +sql insert into tu values(now, 1, 'abc') +sql select stddev(k) from tu where b <>'abc' interval(1s) +if $rows != 0 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/issue/TD-3300.sim b/tests/script/issue/TD-3300.sim new file mode 100644 index 0000000000..0745ceb849 --- /dev/null +++ b/tests/script/issue/TD-3300.sim @@ -0,0 +1,556 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 +system sh/deploy.sh -n dnode4 -i 4 + +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1 +system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1 + +system sh/cfg.sh -n dnode1 -c role -v 1 +system sh/cfg.sh -n dnode2 -c role -v 2 +system sh/cfg.sh -n dnode3 -c role -v 2 +system sh/cfg.sh -n dnode4 -c role -v 2 + +system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator +system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator + +print ============== step0: start tarbitrator +system sh/exec_tarbitrator.sh -s start + +print ============== step1: start dnode1, only deploy mnode +system sh/exec.sh -n dnode1 -s start +sql connect + +print ============== step2: start dnode2/dnode3 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start +sql create dnode $hostname2 +sql create dnode $hostname3 + +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step2 +endi +if $data4_2 != ready then + goto step2 +endi +if $data4_3 != ready then + goto step2 +endi + +sleep 1000 + +print ============== step3 +sql create database db replica 2 +sql use db + +sql create table stb (ts timestamp, c1 int, c2 int) tags(t1 int) +sql create table t1 using stb tags(1) +sql insert into t1 values(1577980800000, 1, 5) +sql insert into t1 values(1577980800001, 2, 4) +sql insert into t1 values(1577980800002, 3, 3) +sql insert into t1 values(1577980800003, 4, 2) +sql insert into t1 values(1577980800004, 5, 1) + +sql show db.vgroups +if $data04 != 3 then + return -1 +endi +if $data06 != 2 then + return -1 +endi +if $data05 != master then + return -1 +endi +if $data07 != slave then + return -1 +endi + +sql select * from t1 +if $rows != 5 then + return -1 +endi + +system sh/exec.sh -n dnode2 -s stop -x SIGKILL +system sh/exec.sh -n dnode3 -s stop -x SIGKILL + +print ============== step4 +system sh/exec.sh -n dnode2 -s start +system sh/exec.sh -n dnode3 -s start + +$x = 0 +step4: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step4 +endi +if $data4_2 != ready then + goto step4 +endi +if $data4_3 != ready then + goto step4 +endi + +sql show db.vgroups +if $data04 != 3 then + goto step4 +endi +if $data06 != 2 then + goto step4 +endi +if $data05 != master then + goto step4 +endi +if $data07 != slave then + goto step4 +endi + +sql create table t2 using stb tags(1) +sql insert into t2 values(1577980800000, 1, 5) +sql insert into t2 values(1577980800001, 2, 4) +sql insert into t2 values(1577980800002, 3, 3) +sql insert into t2 values(1577980800003, 4, 2) +sql insert into t2 values(1577980800004, 5, 1) + +sql select * from t2 +if $rows != 5 then + return -1 +endi + +print ============== step5 +system sh/exec.sh -n dnode3 -s stop -x SIGKILL + +$x = 0 +step5: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step5 +endi +if $data4_2 != ready then + goto step5 +endi +if $data4_3 != offline then + goto step5 +endi + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi + +sql show db.vgroups +if $data04 != 3 then + goto step5 +endi +if $data06 != 2 then + goto step5 +endi +if $data05 != offline then + goto step5 +endi +if $data07 != master then + goto step5 +endi + +print ============== step6 +sql create table t3 using stb tags(1) +sql insert into t3 values(1577980800000, 1, 5) +sql insert into t3 values(1577980800001, 2, 4) +sql insert into t3 values(1577980800002, 3, 3) +sql insert into t3 values(1577980800003, 4, 2) +sql insert into t3 values(1577980800004, 5, 1) +sql insert into t3 values(1577980800010, 11, 5) +sql insert into t3 values(1577980800011, 12, 4) +sql insert into t3 values(1577980800012, 13, 3) +sql insert into t3 values(1577980800013, 14, 2) +sql insert into t3 values(1577980800014, 15, 1) + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi + +system sh/exec.sh -n dnode3 -s start + +$x = 0 +step6: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step6 +endi +if $data4_2 != ready then + goto step6 +endi +if $data4_3 != ready then + goto step6 +endi + +sql show db.vgroups +if $data04 != 3 then + goto step6 +endi +if $data06 != 2 then + goto step6 +endi +if $data05 != slave then + goto step6 +endi +if $data07 != master then + goto step6 +endi + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi + +print ============== step7 +sql create table t4 using stb tags(1) +sql insert into t4 values(1577980800000, 1, 5) +sql insert into t4 values(1577980800001, 2, 4) +sql insert into t4 values(1577980800002, 3, 3) +sql insert into t4 values(1577980800003, 4, 2) +sql insert into t4 values(1577980800004, 5, 1) +sql insert into t4 values(1577980800010, 11, 5) +sql insert into t4 values(1577980800011, 12, 4) +sql insert into t4 values(1577980800012, 13, 3) +sql insert into t4 values(1577980800013, 14, 2) +sql insert into t4 values(1577980800014, 15, 1) +sql insert into t4 values(1577980800020, 21, 5) +sql insert into t4 values(1577980800021, 22, 4) +sql insert into t4 values(1577980800022, 23, 3) +sql insert into t4 values(1577980800023, 24, 2) +sql insert into t4 values(1577980800024, 25, 1) + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi + +system sh/exec.sh -n dnode2 -s stop -x SIGKILL +$x = 0 +step7: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step7 +endi +if $data4_2 != offline then + goto step7 +endi +if $data4_3 != ready then + goto step7 +endi + +sql show db.vgroups +if $data04 != 3 then + goto step7 +endi +if $data06 != 2 then + goto step7 +endi +if $data05 != master then + goto step7 +endi +if $data07 != offline then + goto step7 +endi + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi + +print ============== step8 +sql create table t5 using stb tags(1) +sql insert into t5 values(1577980800000, 1, 5) +sql insert into t5 values(1577980800001, 2, 4) +sql insert into t5 values(1577980800002, 3, 3) +sql insert into t5 values(1577980800003, 4, 2) +sql insert into t5 values(1577980800004, 5, 1) +sql insert into t5 values(1577980800010, 11, 5) + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi +sql select * from t5 +if $rows != 6 then + return -1 +endi + +system sh/exec.sh -n dnode2 -s start +$x = 0 +step8: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step8 +endi +if $data4_2 != ready then + goto step8 +endi +if $data4_3 != ready then + goto step8 +endi + +sql show db.vgroups +if $data04 != 3 then + goto step8 +endi +if $data06 != 2 then + goto step8 +endi +if $data05 != master then + goto step8 +endi +if $data07 != slave then + goto step8 +endi + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi +sql select * from t5 +if $rows != 6 then + return -1 +endi + +print ============== step9 +sql create table t6 using stb tags(1) +sql insert into t6 values(1577980800000, 1, 5) +sql insert into t6 values(1577980800001, 2, 4) +sql insert into t6 values(1577980800002, 3, 3) +sql insert into t6 values(1577980800003, 4, 2) +sql insert into t6 values(1577980800004, 5, 1) +sql insert into t6 values(1577980800010, 11, 5) +sql insert into t6 values(1577980800011, 12, 4) + +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi +sql select * from t5 +if $rows != 6 then + return -1 +endi +sql select * from t6 +if $rows != 7 then + return -1 +endi + +system sh/exec.sh -n dnode3 -s stop -x SIGKILL +$x = 0 +step9: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +print dnode1 $data4_1 +print dnode2 $data4_2 +print dnode3 $data4_3 + +if $data4_1 != ready then + goto step9 +endi +if $data4_2 != ready then + goto step9 +endi +if $data4_3 != offline then + goto step9 +endi + +print ============== 2 +sql show db.vgroups + +if $data04 != 3 then + goto step7 +endi +if $data06 != 2 then + goto step7 +endi +if $data05 != offline then + goto step7 +endi +if $data07 != master then + goto step7 +endi + +print ============== 3 +sql select * from t1 +if $rows != 5 then + return -1 +endi +sql select * from t2 +if $rows != 5 then + return -1 +endi +sql select * from t3 +if $rows != 10 then + return -1 +endi +sql select * from t4 +if $rows != 15 then + return -1 +endi +sql select * from t5 +if $rows != 6 then + return -1 +endi +sql select * from t6 +if $rows != 7 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop +system sh/exec.sh -n dnode2 -s stop +system sh/exec.sh -n dnode3 -s stop