From c15c88dff74baee2c4e3b215ca93e1144327d992 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 17 Feb 2022 18:37:43 +0800 Subject: [PATCH] [td-11818] add table id for streamscan operator. --- example/src/tmq.c | 20 ++++++++++---------- include/libs/executor/executor.h | 13 ++++++++++++- source/libs/executor/src/executor.c | 22 ++++++++++++++++++++++ source/libs/executor/src/executorimpl.c | 7 +++---- 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 5858282aab..05540780d1 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -28,7 +28,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -56,14 +56,14 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); - return -1; - } - taos_free_result(pRes); +// pRes = taos_query(pConn, "create table if not exists tu6 using st1 tags(2)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); - const char* sql = "select * from tu1"; + const char* sql = "select * from st1"; pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); if (taos_errno(pRes) != 0) { printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); @@ -197,7 +197,7 @@ int main() { code = init_env(); tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - perf_loop(tmq, topic_list); - /*basic_consume_loop(tmq, topic_list);*/ +// perf_loop(tmq, topic_list); + basic_consume_loop(tmq, topic_list); /*sync_consume_loop(tmq, topic_list);*/ } diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d9e1957e5d..caa8f0bc36 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -31,6 +31,7 @@ typedef struct SReadHandle { void* reader; void* meta; } SReadHandle; + /** * Create the exec task for streaming mode * @param pMsg @@ -40,13 +41,23 @@ typedef struct SReadHandle { qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); /** - * + * Set the input data block for the stream scan. * @param tinfo * @param input * @return */ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); +/** + * Update the table id list, add or remove. + * + * @param tinfo + * @param id + * @param isAdd + * @return + */ +int32_t qUpdateTQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd); + /** * Create the exec task object according to task json * @param readHandle diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c47c83ba29..29e60c8e91 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -90,3 +90,25 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } + +int32_t qUpdateTQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo; + + // traverse to the streamscan node to add this table id + SOperatorInfo* pInfo = pTaskInfo->pRoot; + while(pInfo->operatorType != OP_StreamScan) { + pInfo = pInfo->pDownstream[0]; + } + + SStreamBlockScanInfo* pScanInfo = pInfo->info; + if (isAdd) { + int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + assert(0); + } + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 593edf257a..4d8249d12f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5441,7 +5441,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); for(int32_t i = 0; i < numOfOutput; ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); } @@ -5624,7 +5623,7 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) { return p; } -static SSDataBlock* doSortMerge(void* param, bool* newgroup) { +static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -5635,7 +5634,7 @@ static SSDataBlock* doSortMerge(void* param, bool* newgroup) { for(int32_t i = 0; i < pInfo->numOfSources; ++i) { SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup); - + // TODO set the order input sources. } return NULL; @@ -5679,7 +5678,7 @@ SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; - pOperator->exec = doSortMerge; + pOperator->exec = doSortedMerge; pOperator->cleanupFn = destroyGlobalAggOperatorInfo; appendDownstream(pOperator, downstream);