From 758427b3c41ba71ac559e2be9ba364dafa853d40 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 24 Apr 2023 15:48:20 +0800 Subject: [PATCH 1/3] fix: use slimit to indicate group for tag scan --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/scanoperator.c | 93 ++++++++++++++----------- source/libs/planner/src/planOptimizer.c | 29 ++++++++ 3 files changed, 82 insertions(+), 41 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 85424fd7de..43ee54f65d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -366,6 +366,7 @@ typedef struct STagScanInfo { int32_t curPos; SReadHandle readHandle; STableListInfo* pTableListInfo; + SLimitNode* pSlimit; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2389c7252e..b7315db204 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2512,6 +2512,53 @@ _error: return NULL; } +static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTaskInfo, STagScanInfo* pInfo, + const SExprInfo* pExprInfo, const SSDataBlock* pRes, int32_t size, const char* str, + int32_t* count, SMetaReader* mr) { + STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); + int32_t code = metaGetTableEntryByUid(mr, item->uid); + tDecoderClear(&(*mr).coder); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), + GET_TASKID(pTaskInfo)); + metaReaderClear(mr); + T_LONG_JMP(pTaskInfo->env, terrno); + } + + for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); + + // refactor later + if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { + STR_TO_VARSTR(str, (*mr).me.name); + colDataSetVal(pDst, (*count), str, false); + } else { // it is a tag value + STagVal val = {0}; + val.cid = pExprInfo[j].base.pParam[0].pCol->colId; + const char* p = metaGetTableTagVal((*mr).me.ctbEntry.pTags, pDst->info.type, &val); + + char* data = NULL; + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { + data = tTagValToData((const STagVal*)p, false); + } else { + data = (char*)p; + } + colDataSetVal(pDst, (*count), data, + (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); + + if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && + data != NULL) { + taosMemoryFree(data); + } + } + } + + (*count) += 1; + if (++pInfo->curPos >= size) { + setOperatorCompleted(pOperator); + } +} + static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -2536,47 +2583,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); - int32_t code = metaGetTableEntryByUid(&mr, item->uid); - tDecoderClear(&mr.coder); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), - GET_TASKID(pTaskInfo)); - metaReaderClear(&mr); - T_LONG_JMP(pTaskInfo->env, terrno); - } - - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); - - // refactor later - if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { - STR_TO_VARSTR(str, mr.me.name); - colDataSetVal(pDst, count, str, false); - } else { // it is a tag value - STagVal val = {0}; - val.cid = pExprInfo[j].base.pParam[0].pCol->colId; - const char* p = metaGetTableTagVal(mr.me.ctbEntry.pTags, pDst->info.type, &val); - - char* data = NULL; - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) { - data = tTagValToData((const STagVal*)p, false); - } else { - data = (char*)p; - } - colDataSetVal(pDst, count, data, - (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); - - if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && - data != NULL) { - taosMemoryFree(data); - } - } - } - - count += 1; - if (++pInfo->curPos >= size) { - setOperatorCompleted(pOperator); + doTagScanOneTable(pOperator, pTaskInfo, pInfo, pExprInfo, pRes, size, str, &count, &mr); + if (pInfo->pSlimit != NULL) { + pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name)); + break; } } @@ -2628,6 +2638,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; + pInfo->pSlimit = (SLimitNode*)pPhyNode->node.pSlimit; //TODO: slimit now only indicate group setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 07ea110d7e..4ece9be304 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2418,6 +2418,34 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) { return true; } +static SLogicNode* tagScanOptFindAncestorWithSlimit(SLogicNode* pTableScanNode) { + SLogicNode* pNode = pTableScanNode->pParent; + while (NULL != pNode) { + if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) || + QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) { + return NULL; + } + if (NULL != pNode->pSlimit) { + return pNode; + } + pNode = pNode->pParent; + } + return NULL; +} + +static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { + if (NULL != pTableScanNode->pSlimit) { + return; + } + + SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode); + if (NULL != pNode) { + //TODO: only set the slimit now. push down slimit later + pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit); + } + return; +} + static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SScanLogicNode* pScanNode = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tagScanOptShouldBeOptimized); if (NULL == pScanNode) { @@ -2458,6 +2486,7 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp NODES_CLEAR_LIST(pAgg->pChildren); } nodesDestroyNode((SNode*)pAgg); + tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode); pCxt->optimized = true; return TSDB_CODE_SUCCESS; } From 93ab81437f57733e536e108d8e34ad51115f94ff Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 25 Apr 2023 11:08:58 +0800 Subject: [PATCH 2/3] enhance: code refactor and operator full implement slimit --- source/libs/executor/src/scanoperator.c | 31 ++++++++++++++++--------- source/libs/planner/src/planOptimizer.c | 2 ++ 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b7315db204..6d62d55024 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2512,9 +2512,11 @@ _error: return NULL; } -static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTaskInfo, STagScanInfo* pInfo, - const SExprInfo* pExprInfo, const SSDataBlock* pRes, int32_t size, const char* str, - int32_t* count, SMetaReader* mr) { +static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes, int32_t count, SMetaReader* mr) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STagScanInfo* pInfo = pOperator->info; + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; + STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); int32_t code = metaGetTableEntryByUid(mr, item->uid); tDecoderClear(&(*mr).coder); @@ -2525,13 +2527,14 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa T_LONG_JMP(pTaskInfo->env, terrno); } + char str[512]; for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); // refactor later if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) { STR_TO_VARSTR(str, (*mr).me.name); - colDataSetVal(pDst, (*count), str, false); + colDataSetVal(pDst, (count), str, false); } else { // it is a tag value STagVal val = {0}; val.cid = pExprInfo[j].base.pParam[0].pCol->colId; @@ -2543,7 +2546,7 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa } else { data = (char*)p; } - colDataSetVal(pDst, (*count), data, + colDataSetVal(pDst, (count), data, (data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data))); if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) && @@ -2552,11 +2555,6 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SExecTaskInfo* pTa } } } - - (*count) += 1; - if (++pInfo->curPos >= size) { - setOperatorCompleted(pOperator); - } } static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { @@ -2583,9 +2581,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - doTagScanOneTable(pOperator, pTaskInfo, pInfo, pExprInfo, pRes, size, str, &count, &mr); + doTagScanOneTable(pOperator, pRes, count, &mr); + ++count; + if (++pInfo->curPos >= size) { + setOperatorCompleted(pOperator); + } + // each table with tbname is a group, hence its own block, but only group when slimit exists for performance reason. if (pInfo->pSlimit != NULL) { + if (pInfo->curPos < pInfo->pSlimit->offset) { + continue; + } pInfo->pRes->info.id.groupId = calcGroupId(mr.me.name, strlen(mr.me.name)); + if (pInfo->curPos >= (pInfo->pSlimit->offset + pInfo->pSlimit->limit) - 1) { + setOperatorCompleted(pOperator); + } break; } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 4ece9be304..4f8b57de5f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2442,6 +2442,8 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { if (NULL != pNode) { //TODO: only set the slimit now. push down slimit later pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit); + ((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset; + ((SLimitNode*)pTableScanNode->pSlimit)->offset = 0; } return; } From ae773fee8b17102feca3bfbe7a4ce2c6112b8dc3 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 25 Apr 2023 14:43:22 +0800 Subject: [PATCH 3/3] feature: add test case --- tests/parallel_test/cases.task | 1 + tests/script/tsim/query/tag_scan.sim | 48 ++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 tests/script/tsim/query/tag_scan.sim diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index dda4ec3e84..9f9679ae35 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -887,6 +887,7 @@ ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim +,,y,script,./test.sh -f tsim/query/tag_scan.sim ,,y,script,./test.sh -f tsim/query/nullColSma.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim diff --git a/tests/script/tsim/query/tag_scan.sim b/tests/script/tsim/query/tag_scan.sim new file mode 100644 index 0000000000..03e3a20632 --- /dev/null +++ b/tests/script/tsim/query/tag_scan.sim @@ -0,0 +1,48 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect +sql drop database if exists test +sql create database test; +sql use test; + +sql create table st(ts timestamp, f int) tags (t int); +sql insert into ct1 using st tags(1) values(now, 1); +sql insert into ct2 using st tags(2) values(now, 2); +sql insert into ct3 using st tags(3) values(now, 3); +sql insert into ct4 using st tags(4) values(now, 4); + +sql create table st2(ts timestamp, f int) tags (t int); +sql insert into ct21 using st2 tags(1) values(now, 1); +sql insert into ct22 using st2 tags(2) values(now, 2); +sql insert into ct23 using st2 tags(3) values(now, 3); +sql insert into ct24 using st2 tags(4) values(now, 4); + +sql select tbname, 1 from st group by tbname order by tbname; +print $rows $data00 $data10 $data20 +if $rows != 4 then + return -1 +endi +if $data00 != @ct1@ then + return -1 +endi +if $data10 != @ct2@ then + return -1 +endi +sql select tbname, 1 from st group by tbname slimit 0, 1; +print $rows +if $rows != 1 then + return -1 +endi +sql select tbname, 1 from st group by tbname slimit 2, 2; +print $rows $data00 $data10 +if $rows != 2 then + return -1 +endi +sql select tbname, 1 from st group by tbname order by tbname slimit 0, 1; +print $rows $data00 $data10 $data20 +if $rows != 4 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT