From 5c03dc9ae0f3f2a8efac6d4978878cd69eecd26b Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Wed, 19 May 2021 16:20:38 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 4238 taosdemo async subscribe (#6161) * [TD-4238]: taosdemo async subscribe. * [TD-4238]: taosdemo async subscribe subsribe sql command do not use aggregation functions. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/async-sub.json | 41 +++ src/kit/taosdemo/taosdemo.c | 430 +++++++++++++++++--------------- 2 files changed, 267 insertions(+), 204 deletions(-) create mode 100644 src/kit/taosdemo/async-sub.json diff --git a/src/kit/taosdemo/async-sub.json b/src/kit/taosdemo/async-sub.json new file mode 100644 index 0000000000..a30a1be45c --- /dev/null +++ b/src/kit/taosdemo/async-sub.json @@ -0,0 +1,41 @@ +{ + "filetype": "subscribe", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "databases": "test", + "specified_table_query": { + "concurrent": 1, + "mode": "async", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "resubAfterConsume": 10, + "sqls": [ + { + "sql": "select col1 from meters where col1 > 1;", + "result": "./subscribe_res0.txt" + }, + { + "sql": "select col2 from meters where col2 > 1;", + "result": "./subscribe_res2.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "threads": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "sqls": [ + { + "sql": "select col1 from xxxx where col1 > 10;", + "result": "./subscribe_res1.txt" + } + ] + } +} diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 92b5f6abff..0831555d22 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -441,8 +441,9 @@ typedef struct SThreadInfo_S { uint64_t maxDelay; uint64_t minDelay; - // query + // seq of query or subscribe uint64_t querySeq; // sequence number of sql command + } threadInfo; #ifdef WINDOWS @@ -1107,7 +1108,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile) } } - fprintf(fp, "%s", resultBuf); tmfclose(fp); } @@ -1150,7 +1150,7 @@ static void appendResultToFile(TAOS_RES *res, char* resultFile) { } static void selectAndGetResult( - threadInfo *pThreadInfo, char *command, char* resultFile) + threadInfo *pThreadInfo, char *command) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", strlen("taosc"))) { TAOS_RES *res = taos_query(pThreadInfo->taos, command); @@ -1161,8 +1161,8 @@ static void selectAndGetResult( return; } - if ((resultFile) && (strlen(resultFile))) { - appendResultToFile(res, resultFile); + if ((strlen(pThreadInfo->fp))) { + appendResultToFile(res, pThreadInfo->fp); } taos_free_result(res); @@ -1170,7 +1170,7 @@ static void selectAndGetResult( int retCode = postProceSql( g_queryInfo.host, &(g_queryInfo.serv_addr), g_queryInfo.port, command, - resultFile); + pThreadInfo->fp); if (0 != retCode) { printf("====restful return fail, threadID[%d]\n", pThreadInfo->threadID); } @@ -6230,23 +6230,22 @@ static void *specifiedTableQuery(void *sarg) { uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs(); + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); + } + while(queryTimes --) { if (g_queryInfo.specifiedQueryInfo.queryInterval && (et - st) < (int64_t)g_queryInfo.specifiedQueryInfo.queryInterval) { taosMsleep(g_queryInfo.specifiedQueryInfo.queryInterval - (et - st)); // ms } - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], - pThreadInfo->threadID); - } - st = taosGetTimestampMs(); selectAndGetResult(pThreadInfo, - g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], tmpFile); + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq]); et = taosGetTimestampMs(); printf("=thread[%"PRId64"] use %s complete one sql, Spent %10.3f s\n", @@ -6332,13 +6331,12 @@ static void *superTableQuery(void *sarg) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { memset(sqlstr,0,sizeof(sqlstr)); replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], sqlstr, i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", + sprintf(pThreadInfo->fp, "%s-%d", g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - selectAndGetResult(pThreadInfo, sqlstr, tmpFile); + selectAndGetResult(pThreadInfo, sqlstr); totalQueried++; g_queryInfo.superQueryInfo.totalQueried ++; @@ -6407,7 +6405,7 @@ static int queryTestProcess() { threadInfo *infos = NULL; //==== create sub threads for query from specify table int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; - int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; + uint64_t nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; uint64_t startTs = taosGetTimestampMs(); @@ -6421,32 +6419,33 @@ static int queryTestProcess() { ERROR_EXIT("memory allocation failed for create threads\n"); } - for (int i = 0; i < nConcurrent; i++) { - for (int j = 0; j < nSqlCount; j++) { - threadInfo *t_info = infos + i * nSqlCount + j; - t_info->threadID = i * nSqlCount + j; - t_info->querySeq = j; + for (uint64_t i = 0; i < nSqlCount; i++) { + for (int j = 0; j < nConcurrent; j++) { + uint64_t seq = i * nConcurrent + j; + threadInfo *t_info = infos + seq; + t_info->threadID = seq; + t_info->querySeq = i; - if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { + if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { - char sqlStr[MAX_TB_NAME_SIZE*2]; - sprintf(sqlStr, "use %s", g_queryInfo.dbName); - verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); - if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) { - taos_close(taos); - free(infos); - free(pids); - errorPrint( "use database %s failed!\n\n", - g_queryInfo.dbName); - return -1; - } + char sqlStr[MAX_TB_NAME_SIZE*2]; + sprintf(sqlStr, "use %s", g_queryInfo.dbName); + verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); + if (0 != queryDbExec(taos, sqlStr, NO_INSERT_TYPE, false)) { + taos_close(taos); + free(infos); + free(pids); + errorPrint( "use database %s failed!\n\n", + g_queryInfo.dbName); + return -1; + } + } + + t_info->taos = NULL;// TODO: workaround to use separate taos connection; + + pthread_create(pids + seq, NULL, specifiedTableQuery, + t_info); } - - t_info->taos = NULL;// TODO: workaround to use separate taos connection; - - pthread_create(pids + i * nSqlCount + j, NULL, specifiedTableQuery, - t_info); - } } } else { g_queryInfo.specifiedQueryInfo.concurrent = 0; @@ -6531,7 +6530,8 @@ static int queryTestProcess() { return 0; } -static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { +static void stable_sub_callback( + TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { if (res == NULL || taos_errno(res) != 0) { errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", __func__, __LINE__, code, taos_errstr(res)); @@ -6539,22 +6539,44 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c } if (param) - appendResultToFile(res, (char*)param); + appendResultToFile(res, ((threadInfo *)param)->fp); + // tao_unscribe() will free result. +} + +static void specified_sub_callback( + TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) { + if (res == NULL || taos_errno(res) != 0) { + errorPrint("%s() LN%d, failed to subscribe result, code:%d, reason:%s\n", + __func__, __LINE__, code, taos_errstr(res)); + return; + } + + if (param) + appendResultToFile(res, ((threadInfo *)param)->fp); // tao_unscribe() will free result. } static TAOS_SUB* subscribeImpl( - TAOS *taos, char *sql, char* topic, bool restart, - char* resultFileName) { + threadInfo *pThreadInfo, + char *sql, char* topic, bool restart) +{ TAOS_SUB* tsub = NULL; if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { - tsub = taos_subscribe(taos, + tsub = taos_subscribe( + pThreadInfo->taos, restart, - topic, sql, subscribe_callback, (void*)resultFileName, + topic, sql, specified_sub_callback, (void*)pThreadInfo, g_queryInfo.specifiedQueryInfo.subscribeInterval); + } else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { + tsub = taos_subscribe( + pThreadInfo->taos, + restart, + topic, sql, stable_sub_callback, (void*)pThreadInfo, + g_queryInfo.superQueryInfo.subscribeInterval); } else { - tsub = taos_subscribe(taos, + tsub = taos_subscribe( + pThreadInfo->taos, restart, topic, sql, NULL, NULL, 0); } @@ -6572,13 +6594,8 @@ static void *superSubscribe(void *sarg) { char subSqlstr[MAX_QUERY_SQL_LENGTH]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - if (g_queryInfo.superQueryInfo.sqlCount == 0) - return NULL; - - if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { - errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n", - g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables, - g_queryInfo.superQueryInfo.sqlCount, + if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { + errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", pThreadInfo->ntables, MAX_QUERY_SQL_COUNT); exit(-1); @@ -6612,88 +6629,80 @@ static void *superSubscribe(void *sarg) { char topic[32] = {0}; for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { - sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%d", i, j); + sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"", + i, pThreadInfo->querySeq); memset(subSqlstr,0,sizeof(subSqlstr)); - replaceChildTblName(g_queryInfo.superQueryInfo.sql[j], subSqlstr, i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); + replaceChildTblName( + g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq], + subSqlstr, i); + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], + pThreadInfo->threadID); } - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", - __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[subSeq]) { + debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", + __func__, __LINE__, pThreadInfo->threadID, subSqlstr); + tsub[i] = subscribeImpl( + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart); + if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } - } } - int consumed[MAX_QUERY_SQL_COUNT]; - for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) - consumed[i] = 0; - // start loop to consume result + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) { + consumed[i] = 0; + } TAOS_RES* res = NULL; + while(1) { for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { continue; } - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; taosMsleep(g_queryInfo.superQueryInfo.subscribeInterval); // ms - res = taos_consume(tsub[subSeq]); + res = taos_consume(tsub[i]); if (res) { - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.superQueryInfo.result[j][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.superQueryInfo.result[j], + if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - appendResultToFile(res, tmpFile); + appendResultToFile(res, pThreadInfo->fp); } - consumed[j] ++; + consumed[i] ++; if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) - && (consumed[j] >= - g_queryInfo.superQueryInfo.resubAfterConsume[j])) { - printf("keepProgress:%d, resub super table query: %d\n", - g_queryInfo.superQueryInfo.subscribeKeepProgress, j); - taos_unsubscribe(tsub[subSeq], + && (consumed[i] >= + g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { + printf("keepProgress:%d, resub super table query: %"PRIu64"\n", + g_queryInfo.superQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + taos_unsubscribe(tsub, g_queryInfo.superQueryInfo.subscribeKeepProgress); - consumed[j]= 0; - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", - __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl( - pThreadInfo->taos, subSqlstr, topic, - g_queryInfo.superQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[subSeq]) { + consumed[i]= 0; + tsub[i] = subscribeImpl( + pThreadInfo, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart + ); + if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } } } - } } } taos_free_result(res); for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { - for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taos_unsubscribe(tsub[subSeq], 0); - } + taos_unsubscribe(tsub[i], 0); } taos_close(pThreadInfo->taos); @@ -6702,10 +6711,7 @@ static void *superSubscribe(void *sarg) { static void *specifiedSubscribe(void *sarg) { threadInfo *pThreadInfo = (threadInfo *)sarg; - TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; - - if (g_queryInfo.specifiedQueryInfo.sqlCount == 0) - return NULL; + TAOS_SUB* tsub = NULL; if (pThreadInfo->taos == NULL) { TAOS * taos = NULL; @@ -6732,76 +6738,61 @@ static void *specifiedSubscribe(void *sarg) { } char topic[32] = {0}; - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - sprintf(topic, "taosdemo-subscribe-%d", i); - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], + sprintf(topic, "taosdemo-subscribe-%"PRIu64"", pThreadInfo->querySeq); + if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { + sprintf(pThreadInfo->fp, "%s-%d", + g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], pThreadInfo->threadID); - } - tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[i]) { - taos_close(pThreadInfo->taos); - return NULL; - } + } + tsub = subscribeImpl(pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart); + if (NULL == tsub) { + taos_close(pThreadInfo->taos); + return NULL; } // start loop to consume result TAOS_RES* res = NULL; - int consumed[MAX_QUERY_SQL_COUNT]; - for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) - consumed[i] = 0; + int consumed; while(1) { - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { continue; } taosMsleep(g_queryInfo.specifiedQueryInfo.subscribeInterval); // ms - res = taos_consume(tsub[i]); + res = taos_consume(tsub); if (res) { - char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; - if (g_queryInfo.specifiedQueryInfo.result[i][0] != 0) { - sprintf(tmpFile, "%s-%d", - g_queryInfo.specifiedQueryInfo.result[i], - pThreadInfo->threadID); - appendResultToFile(res, tmpFile); - } - consumed[i] ++; - - if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) - && (consumed[i] >= - g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { - printf("keepProgress:%d, resub specified query: %d\n", - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); - consumed[i] = 0; - taos_unsubscribe(tsub[i], - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); - tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, - g_queryInfo.specifiedQueryInfo.subscribeRestart, - tmpFile); - if (NULL == tsub[i]) { + consumed ++; + if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) + && (consumed >= + g_queryInfo.specifiedQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { + printf("keepProgress:%d, resub specified query: %"PRIu64"\n", + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, + pThreadInfo->querySeq); + consumed = 0; + taos_unsubscribe(tsub, + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + tsub = subscribeImpl( + pThreadInfo, + g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], + topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart + ); + if (NULL == tsub) { taos_close(pThreadInfo->taos); return NULL; - } - } + } + } } - } } taos_free_result(res); - - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - taos_unsubscribe(tsub[i], 0); - } - + taos_unsubscribe(tsub, 0); taos_close(pThreadInfo->taos); + return NULL; } @@ -6836,7 +6827,11 @@ static int subscribeTestProcess() { pthread_t *pids = NULL; threadInfo *infos = NULL; - //==== create sub threads for query for specified table + + pthread_t *pidsOfStable = NULL; + threadInfo *infosOfStable = NULL; + + //==== create threads for query for specified table if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", __func__, __LINE__, @@ -6850,81 +6845,108 @@ static int subscribeTestProcess() { } pids = malloc( - g_queryInfo.specifiedQueryInfo.concurrent * sizeof(pthread_t)); + g_queryInfo.specifiedQueryInfo.sqlCount * + g_queryInfo.specifiedQueryInfo.concurrent * + sizeof(pthread_t)); infos = malloc( - g_queryInfo.specifiedQueryInfo.concurrent * sizeof(threadInfo)); + g_queryInfo.specifiedQueryInfo.sqlCount * + g_queryInfo.specifiedQueryInfo.concurrent * + sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); exit(-1); } - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { - threadInfo *t_info = infos + i; - t_info->threadID = i; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pids + i, NULL, specifiedSubscribe, t_info); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { + uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; + threadInfo *t_info = infos + seq; + t_info->threadID = seq; + t_info->querySeq = i; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pids + seq, NULL, specifiedSubscribe, t_info); + } } } - //==== create sub threads for super table query - pthread_t *pidsOfSub = NULL; - threadInfo *infosOfSub = NULL; - if ((g_queryInfo.superQueryInfo.sqlCount > 0) + //==== create threads for super table query + if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) { + printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n", + __func__, __LINE__, + g_queryInfo.specifiedQueryInfo.sqlCount); + } else { + if ((g_queryInfo.superQueryInfo.sqlCount > 0) && (g_queryInfo.superQueryInfo.threadCnt > 0)) { - pidsOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * + pidsOfStable = malloc( + g_queryInfo.superQueryInfo.sqlCount * + g_queryInfo.superQueryInfo.threadCnt * sizeof(pthread_t)); - infosOfSub = malloc(g_queryInfo.superQueryInfo.threadCnt * + infosOfStable = malloc( + g_queryInfo.superQueryInfo.sqlCount * + g_queryInfo.superQueryInfo.threadCnt * sizeof(threadInfo)); - if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { - errorPrint("%s() LN%d, malloc failed for create threads\n", + if ((NULL == pidsOfStable) || (NULL == infosOfStable)) { + errorPrint("%s() LN%d, malloc failed for create threads\n", __func__, __LINE__); - // taos_close(taos); - exit(-1); - } + // taos_close(taos); + exit(-1); + } - int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; - int threads = g_queryInfo.superQueryInfo.threadCnt; + int64_t ntables = g_queryInfo.superQueryInfo.childTblCount; + int threads = g_queryInfo.superQueryInfo.threadCnt; - int64_t a = ntables / threads; - if (a < 1) { - threads = ntables; - a = 1; - } + int64_t a = ntables / threads; + if (a < 1) { + threads = ntables; + a = 1; + } - int64_t b = 0; - if (threads != 0) { - b = ntables % threads; - } + int64_t b = 0; + if (threads != 0) { + b = ntables % threads; + } - uint64_t startFrom = 0; - for (int i = 0; i < threads; i++) { - threadInfo *t_info = infosOfSub + i; - t_info->threadID = i; + uint64_t startFrom = 0; + for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + for (int j = 0; j < threads; j++) { + uint64_t seq = i * threads + j; + threadInfo *t_info = infosOfStable + seq; + t_info->threadID = seq; + t_info->querySeq = i; - t_info->start_table_from = startFrom; - t_info->ntables = iend_table_to = i < b ? startFrom + a : startFrom + a - 1; - startFrom = t_info->end_table_to + 1; - t_info->taos = NULL; // TODO: workaround to use separate taos connection; - pthread_create(pidsOfSub + i, NULL, superSubscribe, t_info); - } + t_info->start_table_from = startFrom; + t_info->ntables = jend_table_to = jend_table_to + 1; + t_info->taos = NULL; // TODO: workaround to use separate taos connection; + pthread_create(pidsOfStable + seq, + NULL, superSubscribe, t_info); + } + } - g_queryInfo.superQueryInfo.threadCnt = threads; + g_queryInfo.superQueryInfo.threadCnt = threads; - for (int i = 0; i < g_queryInfo.superQueryInfo.threadCnt; i++) { - pthread_join(pidsOfSub[i], NULL); + for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { + for (int j = 0; j < threads; j++) { + uint64_t seq = i * threads + j; + pthread_join(pidsOfStable[seq], NULL); + } + } } } - for (int i = 0; i < g_queryInfo.specifiedQueryInfo.concurrent; i++) { - pthread_join(pids[i], NULL); + for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { + for (int j = 0; j < g_queryInfo.specifiedQueryInfo.concurrent; j++) { + uint64_t seq = i * g_queryInfo.specifiedQueryInfo.concurrent + j; + pthread_join(pids[seq], NULL); + } } tmfree((char*)pids); tmfree((char*)infos); - tmfree((char*)pidsOfSub); - tmfree((char*)infosOfSub); + tmfree((char*)pidsOfStable); + tmfree((char*)infosOfStable); // taos_close(taos); return 0; }