From df130c1bcb918f379f2dd19f4fd8f9bd1c719585 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 21:56:55 +0800 Subject: [PATCH 1/3] feat: add multiple group tsdb reads to table merge scan --- source/libs/executor/src/scanoperator.c | 50 ++++++++++++++++++------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 432c70f34a..02c7c67260 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1912,9 +1912,10 @@ typedef struct STableMergeScanInfo { int32_t tableStartIndex; int32_t tableEndIndex; bool hasGroupId; + uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -2211,6 +2212,16 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + int32_t i = pInfo->tableStartIndex + 1; + for (; i < tableListSize; ++i) { + STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); + if (tableKeyInfo->groupId != pInfo->groupId) { + break; + } + } + pInfo->tableEndIndex = i - 1; + int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; @@ -2307,20 +2318,34 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } + size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; + pInfo->tableStartIndex = 0; - pInfo->tableEndIndex = taosArrayGetSize(pInfo->tableListInfo->pTableList) - 1; + pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } - SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); - - if (pBlock != NULL) { - pOperator->resultInfo.totalRows += pBlock->info.rows; - } else { - stopGroupTableMergeScan(pOperator); - doSetOperatorCompleted(pOperator); + SSDataBlock* pBlock = NULL; + while (pInfo->tableStartIndex < tableListSize) { + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + if (pBlock != NULL) { + pBlock->info.groupId = pInfo->groupId; + pOperator->resultInfo.totalRows += pBlock->info.rows; + return pBlock; + } else { + stopGroupTableMergeScan(pOperator); + if (pInfo->tableEndIndex >= tableListSize - 1) { + doSetOperatorCompleted(pOperator); + break; + } + pInfo->tableStartIndex = pInfo->tableEndIndex + 1; + pInfo->groupId = + ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + startGroupTableMergeScan(pOperator); + } } + return pBlock; } @@ -2328,7 +2353,6 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); - if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } @@ -2417,8 +2441,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN initResultSizeInfo(pOperator, 1024); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, - NULL, NULL, getTableMergeScanExplainExecInfo); + createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL, + NULL, getTableMergeScanExplainExecInfo); pOperator->cost.openCost = 0; return pOperator; From c99f7cac793867298f8d2fa11e43e487ac47f73b Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 22:00:52 +0800 Subject: [PATCH 2/3] feat: add multiple group tsdb reads to table merge scan --- source/libs/executor/src/scanoperator.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 02c7c67260..a416df4546 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2212,15 +2212,17 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); - int32_t i = pInfo->tableStartIndex + 1; - for (; i < tableListSize; ++i) { - STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); - if (tableKeyInfo->groupId != pInfo->groupId) { - break; + { + size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + int32_t i = pInfo->tableStartIndex + 1; + for (; i < tableListSize; ++i) { + STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); + if (tableKeyInfo->groupId != pInfo->groupId) { + break; + } } + pInfo->tableEndIndex = i - 1; } - pInfo->tableEndIndex = i - 1; int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; From fd79fcc481580fc0c664e3eb7eb379b22af6151c Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 22 Jun 2022 22:32:29 +0800 Subject: [PATCH 3/3] feat: add multiple group tsdb reads to table merge scan --- source/libs/executor/src/scanoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a416df4546..13a6a8855c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2323,7 +2323,11 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - + + if (tableListSize == 0) { + doSetOperatorCompleted(pOperator); + return NULL; + } pInfo->tableStartIndex = 0; pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator);