From 492cb92348d6e76abd3319627d3bf6013e090361 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 23 Mar 2022 19:58:32 +0800 Subject: [PATCH 1/3] extract output name from ast --- example/src/tstream.c | 2 +- include/common/tmsg.h | 7 +++++ source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/src/mndDef.c | 21 +++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 35 +++++++++++++++++++++++++ 5 files changed, 65 insertions(+), 2 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 6976d9e398..51578bd27b 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -78,7 +78,7 @@ int32_t create_stream() { taos_free_result(pRes); /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - const char* sql = "select min(k), max(k), sum(k) from st1"; + const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ pRes = tmq_create_stream(pConn, "stream1", "out1", sql); if (taos_errno(pRes) != 0) { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 812025e8e4..6a7edb481f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2374,6 +2374,13 @@ typedef struct { int32_t reserved; } SStreamTaskExecRsp; +typedef struct { + SMsgHead head; + int64_t streamId; + int64_t version; + SArray* res; // SArray +} SStreamSmaSinkReq; + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 909486aaac..30b4c923d9 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -697,12 +697,12 @@ typedef struct { char* logicalPlan; char* physicalPlan; SArray* tasks; // SArray> + SArray* outputName; } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SCoder* pDecoder, SStreamObj* pObj); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index f81dead325..f0905f88d2 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -16,6 +16,7 @@ #include "mndDef.h" int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { + int32_t outputNameSz = 0; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->db) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; @@ -43,6 +44,15 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { } else { tEncodeI32(pEncoder, 0); } + + if (pObj->outputName != NULL) { + outputNameSz = taosArrayGetSize(pObj->outputName); + } + if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1; + for (int32_t i = 0; i < outputNameSz; i++) { + char *name = taosArrayGetP(pObj->outputName, i); + if (tEncodeCStr(pEncoder, name) < 0) return -1; + } return pEncoder->pos; } @@ -76,5 +86,16 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { } else { pObj->tasks = NULL; } + int32_t outputNameSz; + if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; + pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *)); + if (pObj->outputName == NULL) { + return -1; + } + for (int32_t i = 0; i < outputNameSz; i++) { + char *name; + if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; + taosArrayPush(pObj->outputName, &name); + } return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 2ca4e10f68..d9b5341ba1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -218,6 +218,28 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } +static SArray *mndExtractNamesFromAst(const SNode *pAst) { + if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL; + + SArray *names = taosArrayInit(0, sizeof(void *)); + if (names == NULL) { + return NULL; + } + SSelectStmt *pSelect = (SSelectStmt *)pAst; + SNodeList *pNodes = pSelect->pProjectionList; + SListCell *pCell = pNodes->pHead; + while (pCell != NULL) { + if (pCell->pNode->type != QUERY_NODE_FUNCTION) { + continue; + } + SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode; + char *name = strdup(pFunction->node.aliasName); + taosArrayPush(names, &name); + pCell = pCell->pNext; + } + return names; +} + static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { if (NULL == pCreate->ast) { return TSDB_CODE_SUCCESS; @@ -259,6 +281,19 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; + SNode *pAst = NULL; + if (nodesStringToNode(pCreate->ast, &pAst) < 0) { + return -1; + } + SArray *names = mndExtractNamesFromAst(pAst); + printf("|"); + for (int i = 0; i < taosArrayGetSize(names); i++) { + printf(" %15s |", (char *)taosArrayGetP(names, i)); + } + printf("\n=======================================================\n"); + + streamObj.outputName = names; + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); return -1; From 311e65daa7053d42cffda2f6805758db553218be Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Mar 2022 22:53:49 +0800 Subject: [PATCH 2/3] [td-13039] support add group by keys. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ------- source/libs/executor/src/executorimpl.c | 31 +++++++++++++++++++++++-- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6c79fe3726..680e517fd2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1403,14 +1403,6 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t continue; } - int32_t bytes = pColInfo->info.bytes; - - if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { - pData = (char*)pColInfo->pData + numOfRows * pColInfo->info.bytes; - } else { - pData = (char*)pColInfo->pData + (capacity - numOfRows - num) * pColInfo->info.bytes; - } - if (!isAllRowsNull(src) && pColInfo->info.colId == src->colId) { if (!IS_VAR_DATA_TYPE(pColInfo->info.type)) { // todo opt performance // memmove(pData, (char*)src->pData + bytes * start, bytes * num); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2e769936eb..787a8979fc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1715,10 +1715,31 @@ static int32_t generatedHashKey(void* pKey, int32_t* length, SArray* pGroupColVa return 0; } +// assign the group keys or user input constant values if required +static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { + for(int32_t i = 0; i < numOfOutput; ++i) { + if (pCtx[i].functionId == -1) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); + + SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; + if (!colDataIsNull(pColInfoData, totalRows, rowIndex, NULL)) { + char* dest = GET_ROWCELL_INTERBUF(pEntryInfo); + char* data = colDataGetData(pColInfoData, rowIndex); + + // set result exists, todo refactor + memcpy(dest, data, pColInfoData->info.bytes); + pEntryInfo->hasResult = DATA_SET_FLAG; + pEntryInfo->numOfRes = 1; + } + } + } +} + static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; SGroupbyOperatorInfo *pInfo = pOperator->info; + SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); // if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { //qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv)); @@ -1751,7 +1772,11 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pInfo->binfo.pCtx, &w, j - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + int32_t rowIndex = j - num; + doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + + // assign the group keys or user input constant values if required + doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); keepGroupKeys(pInfo, pBlock, j, numOfGroupCols); num = 1; } @@ -1764,7 +1789,9 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock *pBlock) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } - doApplyFunctions(pInfo->binfo.pCtx, &w, pBlock->info.rows - num, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + int32_t rowIndex = pBlock->info.rows - num; + doApplyFunctions(pCtx, &w, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); } } From c1f25f3ac73bb20fee8ce07c7751d7efd74ac19d Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 24 Mar 2022 10:13:32 +0800 Subject: [PATCH 3/3] [add check tmq consume result] --- tests/script/tsim/insert/null.sim | 2 -- tests/script/tsim/tmq/basic.sim | 10 ++++++--- tests/test/c/tmqDemo.c | 35 ++++++++++++++++++++++++------- 3 files changed, 34 insertions(+), 13 deletions(-) diff --git a/tests/script/tsim/insert/null.sim b/tests/script/tsim/insert/null.sim index 156a618fb6..fbaef8cc94 100644 --- a/tests/script/tsim/insert/null.sim +++ b/tests/script/tsim/insert/null.sim @@ -244,8 +244,6 @@ endi # return -1 #endi -return - #=================================================================== #=================================================================== diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim index 31836f8580..3e42c2cbd7 100644 --- a/tests/script/tsim/tmq/basic.sim +++ b/tests/script/tsim/tmq/basic.sim @@ -47,9 +47,12 @@ sql drop database useless_db # -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00] # -g showMsgFlag, default is 0 # -#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg -system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg -print result-> $system_content +print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal +system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal +print cmd result----> $system_content +if $system_content != @{consume success: 100}@ then + print not match in pos000 +endi sql show databases print ===> $rows $data00 $data01 $data02 $data03 @@ -78,4 +81,5 @@ endi if $data00 != 10000 then return -1 endi + #system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 866bcb2ea0..15022f648a 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -58,6 +58,7 @@ typedef struct { int32_t totalRowsOfPerTbl; int64_t startTimestamp; int32_t showMsgFlag; + int32_t simCase; int32_t totalRowsOfT2; } SConfInfo; @@ -66,7 +67,7 @@ static SConfInfo g_stConfInfo = { "tmqdb", "stb", "./tmqResult.txt", // output_file - "/data2/dnode/data/vnodes/vnode2/wal", + "/data2/dnode/data/vnode/vnode2/wal", 1, // threads 1, // tables 1, // vgroups @@ -77,6 +78,7 @@ static SConfInfo g_stConfInfo = { 10000, // total rows for per table 0, // 2020-01-01 00:00:00.000 0, // show consume msg switch + 0, // if run in sim case 10000, }; @@ -117,6 +119,8 @@ static void printHelp() { printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp); printf("%s%s\n", indent, "-g"); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + printf("%s%s\n", indent, "-sim"); + printf("%s%s%s%d\n", indent, indent, "simCase, default is ", g_stConfInfo.simCase); exit(EXIT_SUCCESS); } @@ -160,14 +164,17 @@ void parseArgument(int32_t argc, char *argv[]) { g_stConfInfo.startTimestamp = atol(argv[++i]); } else if (strcmp(argv[i], "-g") == 0) { g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else if (strcmp(argv[i], "-sim") == 0) { + g_stConfInfo.simCase = atol(argv[++i]); } else { - pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC); + printf("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); } } g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio; +#if 0 pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC); @@ -184,6 +191,7 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC); pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); +#endif } static int running = 1; @@ -429,15 +437,21 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog double consumeTime = (double)(endTime - startTime) / 1000000; if (batchCnt != totalMsgs) { - pPrint("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); + printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); + exit(-1); } - pPrint("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime); + if (0 == g_stConfInfo.simCase) { + printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime); + } else { + printf("{consume success: %d}", totalMsgs); + } taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt); err = tmq_consumer_close(tmq); if (err) { fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + exit(-1); } } @@ -679,12 +693,17 @@ int main(int32_t argc, char *argv[]) { walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); if (walLogSize <= 0) { - pError("vnode2/wal size incorrect!"); + printf("vnode2/wal size incorrect!"); + exit(-1); } else { - pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); + if (0 == g_stConfInfo.simCase) { + pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); + } } - - pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed); + + if (0 == g_stConfInfo.simCase) { + pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed); + } taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0)); }