From 4cb5bedfaeaeeec84a79e7a72e1a279f4f9111f6 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Tue, 18 May 2021 13:48:45 +0800 Subject: [PATCH 1/4] [TD-4232]: fix vnode queueWMsg & queuedWMsgSize counting under flow ctrl --- src/vnode/src/vnodeWrite.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 56ea32ccc0..16089c8e91 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -340,8 +340,11 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) { if (pWrite->processedCount >= 100) { vError("vgId:%d, msg:%p, failed to process since %s, retry:%d", pVnode->vgId, pWrite, tstrerror(code), pWrite->processedCount); - pWrite->processedCount = 1; - dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code); + void *handle = pWrite->rpcMsg.handle; + taosFreeQitem(pWrite); + vnodeRelease(pVnode); + SRpcMsg rpcRsp = {.handle = handle, .code = code}; + rpcSendResponse(&rpcRsp); } else { code = vnodePerformFlowCtrl(pWrite); if (code == 0) { From 6d448dcac1e522e3496c133d7d6d2a72dc2b7d5f Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 18 May 2021 16:48:17 +0800 Subject: [PATCH 2/4] Hotfix/sangshuduo/td 4136 taosdemo records morethan 32767 (#6147) * [TD-4136]: taosdemo max records per req < 32767 * [TD-4136]: taosdemo check insert rows not more than 32767. check insert rows for progressive. * fix with answer_yes. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 48 ++++++++++++---------- tests/pytest/tools/taosdemoTestWithJson.py | 22 +++++----- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 12262f7a32..f6ac4a75ba 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -516,6 +516,8 @@ static int taosRandom() #endif // ifdef Windows +static void prompt(); +static void prompt2(); static int createDatabasesAndStables(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); @@ -1031,10 +1033,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { printf("# Print debug info: %d\n", arguments->debug_print); printf("# Print verbose info: %d\n", arguments->verbose_print); printf("###################################################################\n"); - if (!arguments->answer_yes) { - printf("Press enter key to continue\n\n"); - (void) getchar(); - } + + prompt(); } } @@ -3410,10 +3410,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_args.interlace_rows, g_args.num_of_RPR); printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n", g_args.num_of_RPR); - if (!g_args.answer_yes) { - printf(" press Enter key to continue or Ctrl-C to stop."); - (void)getchar(); - } + prompt2(); g_args.interlace_rows = g_args.num_of_RPR; } } else if (!interlaceRows) { @@ -3470,9 +3467,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_args.answer_yes = false; } } else if (!answerPrompt) { - g_args.answer_yes = false; + g_args.answer_yes = true; // default is no, mean answer_yes. } else { - printf("ERROR: failed to read json, confirm_parameter_prompt not found\n"); + errorPrint("%s", "failed to read json, confirm_parameter_prompt input mistake\n"); goto PARSE_OVER; } @@ -5342,6 +5339,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { int64_t start_time = pThreadInfo->start_time; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); for (uint64_t i = 0; i < insertRows;) { @@ -6022,6 +6020,21 @@ static void *readMetric(void *sarg) { return NULL; } +static void prompt() +{ + if (!g_args.answer_yes) { + printf("Press enter key to continue\n\n"); + (void)getchar(); + } +} + +static void prompt2() +{ + if (!g_args.answer_yes) { + printf(" press Enter key to continue or Ctrl-C to stop."); + (void)getchar(); + } +} static int insertTestProcess() { @@ -6042,10 +6055,7 @@ static int insertTestProcess() { if (g_fpOfInsertResult) printfInsertMetaToFile(g_fpOfInsertResult); - if (!g_args.answer_yes) { - printf("Press enter key to continue\n\n"); - (void)getchar(); - } + prompt(); init_rand_data(); @@ -6317,10 +6327,7 @@ static int queryTestProcess() { &g_queryInfo.superQueryInfo.childTblCount); } - if (!g_args.answer_yes) { - printf("Press enter key to continue\n\n"); - (void)getchar(); - } + prompt(); if (g_args.debug_print || g_args.verbose_print) { printfQuerySystemInfo(taos); @@ -6673,10 +6680,7 @@ static int subscribeTestProcess() { printfQueryMeta(); resetAfterAnsiEscape(); - if (!g_args.answer_yes) { - printf("Press enter key to continue\n\n"); - (void) getchar(); - } + prompt(); TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host, diff --git a/tests/pytest/tools/taosdemoTestWithJson.py b/tests/pytest/tools/taosdemoTestWithJson.py index f57af9ce5c..b2ecd54976 100644 --- a/tests/pytest/tools/taosdemoTestWithJson.py +++ b/tests/pytest/tools/taosdemoTestWithJson.py @@ -23,32 +23,32 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug("start to execute %s" % __file__) tdSql.init(conn.cursor(), logSql) - + def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) - if ("community" in selfPath): - projPath = selfPath[:selfPath.find("community")] + if "community" in selfPath: + projPath = selfPath[: selfPath.find("community")] else: - projPath = selfPath[:selfPath.find("tests")] + projPath = selfPath[: selfPath.find("tests")] for root, dirs, files in os.walk(projPath): - if ("taosd" in files): + if "taosd" in files: rootRealPath = os.path.dirname(os.path.realpath(root)) - if ("packaging" not in rootRealPath): - buildPath = root[:len(root)-len("/build/bin")] + if "packaging" not in rootRealPath: + buildPath = root[: len(root) - len("/build/bin")] break return buildPath - + def run(self): tdSql.prepare() buildPath = self.getBuildPath() - if (buildPath == ""): + if buildPath == "": tdLog.exit("taosd not found!") else: tdLog.info("taosd found in %s" % buildPath) - binPath = buildPath+ "/build/bin/" - os.system("yes | %staosdemo -f tools/insert.json" % binPath) + binPath = buildPath + "/build/bin/" + os.system("%staosdemo -f tools/insert.json -y" % binPath) tdSql.execute("use db01") tdSql.query("select count(*) from stb01") From 4412c52f27bca02a0b645f24858d94010d1d1039 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 18 May 2021 21:36:07 +0800 Subject: [PATCH 3/4] Hotfix/sangshuduo/td 4187 taosdemo keepprogress (#6149) * [TD-4187]: taosdemo keepProgress. * add consumed. * [TD-4187]: taosdemo keepProgress. add resubAfterConsume process. * print resub msg for test. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/subscribe.json | 40 +++-- src/kit/taosdemo/taosdemo.c | 281 +++++++++++++++++++++++--------- 2 files changed, 234 insertions(+), 87 deletions(-) diff --git a/src/kit/taosdemo/subscribe.json b/src/kit/taosdemo/subscribe.json index fd33a2e2e2..9faf03a03d 100644 --- a/src/kit/taosdemo/subscribe.json +++ b/src/kit/taosdemo/subscribe.json @@ -1,17 +1,37 @@ { - "filetype":"subscribe", + "filetype": "subscribe", "cfgdir": "/etc/taos", "host": "127.0.0.1", "port": 6030, "user": "root", "password": "taosdata", - "databases": "dbx", - "specified_table_query": - {"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] - }, - "super_table_query": - {"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] - } + "databases": "test", + "specified_table_query": { + "concurrent": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "resubAfterConsume": 10, + "sqls": [ + { + "sql": "select avg(col1) from meters where col1 > 1;", + "result": "./subscribe_res0.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "threads": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "sqls": [ + { + "sql": "select col1 from xxxx where col1 > 10;", + "result": "./subscribe_res1.txt" + } + ] + } } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index f6ac4a75ba..08c1142070 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -227,7 +227,7 @@ typedef struct SColumn_S { char field[TSDB_COL_NAME_LEN + 1]; char dataType[MAX_TB_NAME_SIZE]; uint32_t dataLen; - char note[128]; + char note[128]; } StrColumn; typedef struct SSuperTable_S { @@ -360,10 +360,11 @@ typedef struct SpecifiedQueryInfo_S { uint32_t asyncMode; // 0: sync, 1: async uint64_t subscribeInterval; // ms uint64_t queryTimes; - int subscribeRestart; + bool subscribeRestart; int subscribeKeepProgress; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; + int resubAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; uint64_t totalQueried; } SpecifiedQueryInfo; @@ -374,7 +375,7 @@ typedef struct SuperQueryInfo_S { uint32_t threadCnt; uint32_t asyncMode; // 0: sync, 1: async uint64_t subscribeInterval; // ms - int subscribeRestart; + bool subscribeRestart; int subscribeKeepProgress; uint64_t queryTimes; int64_t childTblCount; @@ -382,6 +383,7 @@ typedef struct SuperQueryInfo_S { uint64_t sqlCount; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; + int resubAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; @@ -399,7 +401,7 @@ typedef struct SQueryMetaInfo_S { char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest SpecifiedQueryInfo specifiedQueryInfo; - SuperQueryInfo superQueryInfo; + SuperQueryInfo superQueryInfo; uint64_t totalQueried; } SQueryMetaInfo; @@ -521,8 +523,8 @@ static void prompt2(); static int createDatabasesAndStables(); static void createChildTables(); static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet); -static int postProceSql(char *host, struct sockaddr_in *pServAddr, uint16_t port, - char* sqlstr, char *resultFile); +static int postProceSql(char *host, struct sockaddr_in *pServAddr, + uint16_t port, char* sqlstr, char *resultFile); /* ************ Global variables ************ */ @@ -1141,12 +1143,14 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { totalLen += len; } - verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", __func__, __LINE__, databuf, resultFile); + verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n", + __func__, __LINE__, databuf, resultFile); appendResultBufToFile(databuf, resultFile); free(databuf); } -static void selectAndGetResult(threadInfo *pThreadInfo, char *command, char* resultFile) +static void selectAndGetResult( + threadInfo *pThreadInfo, char *command, char* resultFile) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { TAOS_RES *res = taos_query(pThreadInfo->taos, command); @@ -1291,13 +1295,15 @@ static void init_rand_data() { static int printfInsertMeta() { SHOW_PARSE_RESULT_START(); - printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); + printf("host: \033[33m%s:%u\033[0m\n", + g_Dbs.host, g_Dbs.port); printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("configDir: \033[33m%s\033[0m\n", configDir); printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount); - printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); + printf("thread num of create table: \033[33m%d\033[0m\n", + g_Dbs.threadCountByCreateTbl); printf("top insert interval: \033[33m%"PRIu64"\033[0m\n", g_args.insert_interval); printf("number of records per req: \033[33m%"PRIu64"\033[0m\n", @@ -1309,7 +1315,8 @@ static int printfInsertMeta() { for (int i = 0; i < g_Dbs.dbCount; i++) { printf("database[\033[33m%d\033[0m]:\n", i); - printf(" database[%d] name: \033[33m%s\033[0m\n", i, g_Dbs.db[i].dbName); + printf(" database[%d] name: \033[33m%s\033[0m\n", + i, g_Dbs.db[i].dbName); if (0 == g_Dbs.db[i].drop) { printf(" drop: \033[33mno\033[0m\n"); } else { @@ -1317,40 +1324,51 @@ static int printfInsertMeta() { } if (g_Dbs.db[i].dbCfg.blocks > 0) { - printf(" blocks: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.blocks); + printf(" blocks: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.blocks); } if (g_Dbs.db[i].dbCfg.cache > 0) { - printf(" cache: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.cache); + printf(" cache: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.cache); } if (g_Dbs.db[i].dbCfg.days > 0) { - printf(" days: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.days); + printf(" days: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.days); } if (g_Dbs.db[i].dbCfg.keep > 0) { - printf(" keep: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.keep); + printf(" keep: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.keep); } if (g_Dbs.db[i].dbCfg.replica > 0) { - printf(" replica: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.replica); + printf(" replica: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.replica); } if (g_Dbs.db[i].dbCfg.update > 0) { - printf(" update: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.update); + printf(" update: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.update); } if (g_Dbs.db[i].dbCfg.minRows > 0) { - printf(" minRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.minRows); + printf(" minRows: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.minRows); } if (g_Dbs.db[i].dbCfg.maxRows > 0) { - printf(" maxRows: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.maxRows); + printf(" maxRows: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.maxRows); } if (g_Dbs.db[i].dbCfg.comp > 0) { printf(" comp: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.comp); } if (g_Dbs.db[i].dbCfg.walLevel > 0) { - printf(" walLevel: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.walLevel); + printf(" walLevel: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.walLevel); } if (g_Dbs.db[i].dbCfg.fsync > 0) { - printf(" fsync: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.fsync); + printf(" fsync: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.fsync); } if (g_Dbs.db[i].dbCfg.quorum > 0) { - printf(" quorum: \033[33m%d\033[0m\n", g_Dbs.db[i].dbCfg.quorum); + printf(" quorum: \033[33m%d\033[0m\n", + g_Dbs.db[i].dbCfg.quorum); } if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) @@ -1543,21 +1561,26 @@ static void printfInsertMetaToFile(FILE* fp) { if (g_Dbs.db[i].dbCfg.precision[0] != 0) { if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", 2)) || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", 2))) { - fprintf(fp, " precision: %s\n", g_Dbs.db[i].dbCfg.precision); + fprintf(fp, " precision: %s\n", + g_Dbs.db[i].dbCfg.precision); } else { - fprintf(fp, " precision error: %s\n", g_Dbs.db[i].dbCfg.precision); + fprintf(fp, " precision error: %s\n", + g_Dbs.db[i].dbCfg.precision); } } - fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount); + fprintf(fp, " super table count: %"PRIu64"\n", + g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { fprintf(fp, " super table[%d]:\n", j); - fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName); + fprintf(fp, " stbName: %s\n", + g_Dbs.db[i].superTbls[j].sTblName); if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { fprintf(fp, " autoCreateTable: %s\n", "no"); - } else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { + } else if (AUTO_CREATE_SUBTBL + == g_Dbs.db[i].superTbls[j].autoCreateTable) { fprintf(fp, " autoCreateTable: %s\n", "yes"); } else { fprintf(fp, " autoCreateTable: %s\n", "error"); @@ -1565,7 +1588,8 @@ static void printfInsertMetaToFile(FILE* fp) { if (TBL_NO_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { fprintf(fp, " childTblExists: %s\n", "no"); - } else if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) { + } else if (TBL_ALREADY_EXISTS + == g_Dbs.db[i].superTbls[j].childTblExists) { fprintf(fp, " childTblExists: %s\n", "yes"); } else { fprintf(fp, " childTblExists: %s\n", "error"); @@ -1596,8 +1620,10 @@ static void printfInsertMetaToFile(FILE* fp) { */ fprintf(fp, " interlaceRows: %"PRIu64"\n", g_Dbs.db[i].superTbls[j].interlaceRows); - fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); - fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); + fprintf(fp, " disorderRange: %d\n", + g_Dbs.db[i].superTbls[j].disorderRange); + fprintf(fp, " disorderRatio: %d\n", + g_Dbs.db[i].superTbls[j].disorderRatio); fprintf(fp, " maxSqlLen: %"PRIu64"\n", g_Dbs.db[i].superTbls[j].maxSqlLen); @@ -1605,23 +1631,29 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].timeStampStep); fprintf(fp, " startTimestamp: %s\n", g_Dbs.db[i].superTbls[j].startTimestamp); - fprintf(fp, " sampleFormat: %s\n", g_Dbs.db[i].superTbls[j].sampleFormat); - fprintf(fp, " sampleFile: %s\n", g_Dbs.db[i].superTbls[j].sampleFile); - fprintf(fp, " tagsFile: %s\n", g_Dbs.db[i].superTbls[j].tagsFile); + fprintf(fp, " sampleFormat: %s\n", + g_Dbs.db[i].superTbls[j].sampleFormat); + fprintf(fp, " sampleFile: %s\n", + g_Dbs.db[i].superTbls[j].sampleFile); + fprintf(fp, " tagsFile: %s\n", + g_Dbs.db[i].superTbls[j].tagsFile); - fprintf(fp, " columnCount: %d\n ", g_Dbs.db[i].superTbls[j].columnCount); + fprintf(fp, " columnCount: %d\n ", + g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); if ((0 == strncasecmp( g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", strlen("binary"))) - || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + || (0 == strncasecmp( + g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", strlen("nchar")))) { fprintf(fp, "column[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { - fprintf(fp, "column[%d]:%s ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); + fprintf(fp, "column[%d]:%s ", + k, g_Dbs.db[i].superTbls[j].columns[k].dataType); } } fprintf(fp, "\n"); @@ -1634,7 +1666,8 @@ static void printfInsertMetaToFile(FILE* fp) { "binary", strlen("binary"))) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", strlen("nchar")))) { - fprintf(fp, "tag[%d]:%s(%d) ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType, + fprintf(fp, "tag[%d]:%s(%d) ", + k, g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); } else { fprintf(fp, "tag[%d]:%s ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); @@ -4128,7 +4161,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { "query_times"); if (specifiedQueryTimes && specifiedQueryTimes->type == cJSON_Number) { if (specifiedQueryTimes->valueint <= 0) { - errorPrint("%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", + errorPrint( + "%s() LN%d, failed to read json, query_times: %"PRId64", need be a valid (>0) number\n", __func__, __LINE__, specifiedQueryTimes->valueint); goto PARSE_OVER; @@ -4145,7 +4179,8 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* concurrent = cJSON_GetObjectItem(specifiedQuery, "concurrent"); if (concurrent && concurrent->type == cJSON_Number) { if (concurrent->valueint <= 0) { - errorPrint("%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n", + errorPrint( + "%s() LN%d, query sqlCount %"PRIu64" or concurrent %"PRIu64" is not correct.\n", __func__, __LINE__, g_queryInfo.specifiedQueryInfo.sqlCount, g_queryInfo.specifiedQueryInfo.concurrent); @@ -4184,15 +4219,15 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { cJSON* restart = cJSON_GetObjectItem(specifiedQuery, "restart"); if (restart && restart->type == cJSON_String && restart->valuestring != NULL) { if (0 == strcmp("yes", restart->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; + g_queryInfo.specifiedQueryInfo.subscribeRestart = true; } else if (0 == strcmp("no", restart->valuestring)) { - g_queryInfo.specifiedQueryInfo.subscribeRestart = 0; + g_queryInfo.specifiedQueryInfo.subscribeRestart = false; } else { printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { - g_queryInfo.specifiedQueryInfo.subscribeRestart = 1; + g_queryInfo.specifiedQueryInfo.subscribeRestart = true; } cJSON* keepProgress = cJSON_GetObjectItem(specifiedQuery, "keepProgress"); @@ -4237,13 +4272,29 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { printf("ERROR: failed to read json, sql not found\n"); goto PARSE_OVER; } - tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], + sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + + cJSON* resubAfterConsume = + cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); + if (resubAfterConsume + && resubAfterConsume->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] + = resubAfterConsume->valueint; + } else if (!resubAfterConsume) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1; + } cJSON *result = cJSON_GetObjectItem(sql, "result"); - if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { - tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); + if ((NULL != result) && (result->type == cJSON_String) + && (result->valuestring != NULL)) { + tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], + result->valuestring, MAX_FILE_NAME_LEN); } else if (NULL == result) { - memset(g_queryInfo.specifiedQueryInfo.result[j], 0, MAX_FILE_NAME_LEN); + memset(g_queryInfo.specifiedQueryInfo.result[j], + 0, MAX_FILE_NAME_LEN); } else { printf("ERROR: failed to read json, super query result file not found\n"); goto PARSE_OVER; @@ -4350,27 +4401,27 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { if (subrestart && subrestart->type == cJSON_String && subrestart->valuestring != NULL) { if (0 == strcmp("yes", subrestart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = 1; + g_queryInfo.superQueryInfo.subscribeRestart = true; } else if (0 == strcmp("no", subrestart->valuestring)) { - g_queryInfo.superQueryInfo.subscribeRestart = 0; + g_queryInfo.superQueryInfo.subscribeRestart = false; } else { printf("ERROR: failed to read json, subscribe restart error\n"); goto PARSE_OVER; } } else { - g_queryInfo.superQueryInfo.subscribeRestart = 1; + g_queryInfo.superQueryInfo.subscribeRestart = true; } - cJSON* subkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); - if (subkeepProgress && - subkeepProgress->type == cJSON_String - && subkeepProgress->valuestring != NULL) { - if (0 == strcmp("yes", subkeepProgress->valuestring)) { + cJSON* superkeepProgress = cJSON_GetObjectItem(superQuery, "keepProgress"); + if (superkeepProgress && + superkeepProgress->type == cJSON_String + && superkeepProgress->valuestring != NULL) { + if (0 == strcmp("yes", superkeepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 1; - } else if (0 == strcmp("no", subkeepProgress->valuestring)) { + } else if (0 == strcmp("no", superkeepProgress->valuestring)) { g_queryInfo.superQueryInfo.subscribeKeepProgress = 0; } else { - printf("ERROR: failed to read json, subscribe keepProgress error\n"); + printf("ERROR: failed to read json, subscribe super table keepProgress error\n"); goto PARSE_OVER; } } else { @@ -4408,6 +4459,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + cJSON* superResubAfterConsume = + cJSON_GetObjectItem(sql, "resubAfterConsume"); + if (superResubAfterConsume + && superResubAfterConsume->type == cJSON_Number) { + g_queryInfo.superQueryInfo.resubAfterConsume[j] = + superResubAfterConsume->valueint; + } else if (!superResubAfterConsume) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1; + } + cJSON *result = cJSON_GetObjectItem(sql, "result"); if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ @@ -5506,7 +5569,8 @@ static void callBack(void *param, TAOS_RES *res, int code) { int rand_num = taosRandom() % 100; if (0 != pThreadInfo->superTblInfo->disorderRatio && rand_num < pThreadInfo->superTblInfo->disorderRatio) { - int64_t d = pThreadInfo->lastTs - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); + int64_t d = pThreadInfo->lastTs + - (taosRandom() % pThreadInfo->superTblInfo->disorderRange + 1); generateRowData(data, d, pThreadInfo->superTblInfo); } else { generateRowData(data, pThreadInfo->lastTs += 1000, pThreadInfo->superTblInfo); @@ -6480,17 +6544,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c } static TAOS_SUB* subscribeImpl( - TAOS *taos, char *sql, char* topic, char* resultFileName) { + TAOS *taos, char *sql, char* topic, bool restart, + char* resultFileName) { TAOS_SUB* tsub = NULL; if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { tsub = taos_subscribe(taos, - g_queryInfo.specifiedQueryInfo.subscribeRestart, + restart, topic, sql, subscribe_callback, (void*)resultFileName, g_queryInfo.specifiedQueryInfo.subscribeInterval); } else { tsub = taos_subscribe(taos, - g_queryInfo.specifiedQueryInfo.subscribeRestart, + restart, topic, sql, NULL, NULL, 0); } @@ -6551,7 +6616,9 @@ static void *superSubscribe(void *sarg) { uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); + tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + tmpFile); if (NULL == tsub[subSeq]) { taos_close(pThreadInfo->taos); return NULL; @@ -6559,17 +6626,22 @@ static void *superSubscribe(void *sarg) { } } + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) + consumed[i] = 0; + // start loop to consume result TAOS_RES* res = NULL; while(1) { - for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { + for (uint64_t i = pThreadInfo->start_table_from; + i <= pThreadInfo->end_table_to; i++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { continue; } uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taosMsleep(100); // ms + taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms res = taos_consume(tsub[subSeq]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; @@ -6579,6 +6651,28 @@ static void *superSubscribe(void *sarg) { pThreadInfo->threadID); appendResultToFile(res, tmpFile); } + consumed[j] ++; + + if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) + && (consumed[j] >= + g_queryInfo.superQueryInfo.resubAfterConsume[j])) { + printf("keepProgress:%d, resub super table query: %d\n", + g_queryInfo.superQueryInfo.subscribeKeepProgress, j); + taos_unsubscribe(tsub[subSeq], + g_queryInfo.superQueryInfo.subscribeKeepProgress); + consumed[j]= 0; + uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", + __func__, __LINE__, subSeq, subSqlstr); + tsub[subSeq] = subscribeImpl( + pThreadInfo->taos, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + tmpFile); + if (NULL == tsub[subSeq]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } } } } @@ -6589,8 +6683,7 @@ static void *superSubscribe(void *sarg) { i <= pThreadInfo->end_table_to; i++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taos_unsubscribe(tsub[subSeq], - g_queryInfo.superQueryInfo.subscribeKeepProgress); + taos_unsubscribe(tsub[subSeq], 0); } } @@ -6635,40 +6728,68 @@ static void *specifiedSubscribe(void *sarg) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], + pThreadInfo->threadID); } tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); + g_queryInfo.specifiedQueryInfo.sql[i], topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + tmpFile); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } } + // start loop to consume result TAOS_RES* res = NULL; + + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) + consumed[i] = 0; + while(1) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } - taosMsleep(1000); // ms + taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms res = taos_consume(tsub[i]); if (res) { char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); + g_queryInfo.specifiedQueryInfo.result[i], + pThreadInfo->threadID); appendResultToFile(res, tmpFile); } + consumed[i] ++; + + if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) + && (consumed[i] >= + g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { + printf("keepProgress:%d, resub specified query: %d\n", + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); + consumed[i] = 0; + taos_unsubscribe(tsub[i], + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + tsub[i] = subscribeImpl(pThreadInfo->taos, + g_queryInfo.specifiedQueryInfo.sql[i], topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + tmpFile); + if (NULL == tsub[i]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } } } } taos_free_result(res); for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - taos_unsubscribe(tsub[i], - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + taos_unsubscribe(tsub[i], 0); } taos_close(pThreadInfo->taos); @@ -6719,8 +6840,10 @@ static int subscribeTestProcess() { exit(-1); } - pids = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); - infos = malloc(g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); + pids = malloc( + g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); + infos = malloc( + g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); exit(-1); @@ -6905,17 +7028,21 @@ static void setParaFromArg(){ if (g_Dbs.db[0].superTbls[0].columnCount > g_args.num_of_CPR) { g_Dbs.db[0].superTbls[0].columnCount = g_args.num_of_CPR; } else { - for (int i = g_Dbs.db[0].superTbls[0].columnCount; i < g_args.num_of_CPR; i++) { - tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, "INT", MAX_TB_NAME_SIZE); + for (int i = g_Dbs.db[0].superTbls[0].columnCount; + i < g_args.num_of_CPR; i++) { + tstrncpy(g_Dbs.db[0].superTbls[0].columns[i].dataType, + "INT", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].columns[i].dataLen = 0; g_Dbs.db[0].superTbls[0].columnCount++; } } - tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, + "INT", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0; - tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", MAX_TB_NAME_SIZE); + tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, + "BINARY", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary; g_Dbs.db[0].superTbls[0].tagCount = 2; } else { From 8e5681f1b004cb8b5d3982fb36f0ffd4d712e2ca Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 18 May 2021 21:36:35 +0800 Subject: [PATCH 4/4] [TD-4240]: taosdemo subscribe more than max query sql count. (#6154) Co-authored-by: Shuduo Sang --- src/kit/taosdemo/taosdemo.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 08c1142070..92b5f6abff 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -6575,6 +6575,15 @@ static void *superSubscribe(void *sarg) { if (g_queryInfo.superQueryInfo.sqlCount == 0) return NULL; + if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n", + g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables, + g_queryInfo.superQueryInfo.sqlCount, + pThreadInfo->ntables, + MAX_QUERY_SQL_COUNT); + exit(-1); + } + if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; taos = taos_connect(g_queryInfo.host,