From a95295774a8c57b5fe7b346cad181003bcae8d07 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 7 Aug 2023 19:35:33 +0800 Subject: [PATCH] enh: end table scan --- source/libs/executor/inc/dynqueryctrl.h | 1 + source/libs/executor/inc/groupcache.h | 1 + source/libs/executor/src/dynqueryctrloperator.c | 17 +++++++++-------- source/libs/executor/src/groupcacheoperator.c | 6 +++--- source/libs/executor/src/scanoperator.c | 9 +++++++-- source/libs/planner/src/planOptimizer.c | 4 ++-- 6 files changed, 23 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index a6d0a551b8..fe1a1140af 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -47,6 +47,7 @@ typedef struct SStbJoinPrevJoinCtx { SSHashObj* onceTable; int64_t tableNum; SStbJoinTableList* pListHead; + SStbJoinTableList* pListTail; } SStbJoinPrevJoinCtx; typedef struct SStbJoinPostJoinCtx { diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 250bc70fff..d86388a0b4 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -21,6 +21,7 @@ extern "C" { #define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 #define GROUP_CACHE_MAX_FILE_FDS 10 +#define GROUP_CACHE_DEFAULT_VGID 0 #pragma pack(push, 1) typedef struct SGcBlkBufBasic { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 0072bd5ae8..150c3a73ff 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -284,7 +284,7 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) while (true) { if (readIdx < pNode->uidNum) { - pPost->rightNextUid = *(rightUid + readIdx); + pPost->rightNextUid = *(pNode->pRightUid + readIdx); break; } @@ -518,7 +518,7 @@ static void freeStbJoinTableList(SStbJoinTableList* pList) { taosMemoryFree(pList); } -static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { +static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList)); if (NULL == pNew) { return TSDB_CODE_OUT_OF_MEMORY; @@ -539,15 +539,16 @@ static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, pNew->readIdx = 0; pNew->uidNum = rows; + pNew->pNext = NULL; - if (*ppHead) { - pNew->pNext = *ppHead; + if (pCtx->pListTail) { + pCtx->pListTail->pNext = pNew; + pCtx->pListTail = pNew; } else { - pNew->pNext = NULL; + pCtx->pListHead = pNew; + pCtx->pListTail= pNew; } - *ppHead = pNew; - return TSDB_CODE_SUCCESS; } @@ -588,7 +589,7 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc } if (TSDB_CODE_SUCCESS == code) { - code = appendStbJoinTableList(&pStbJoin->ctx.prev.pListHead, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData); + code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData); if (TSDB_CODE_SUCCESS == code) { pStbJoin->ctx.prev.tableNum += pBlock->info.rows; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 321a6ac016..5b6a6abe4a 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -213,7 +213,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC releaseFdToFileCtx(pFd); qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, - pCtx->id, pGroup->vgId, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset); + pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; @@ -741,7 +741,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD fakeGcParam.needCache = true; fakeParam.downstreamIdx = pSession->downstreamIdx; fakeParam.value = &fakeGcParam; - code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId); + code = addNewGroupData(pOperator, &fakeParam, &pGroup, GROUP_CACHE_DEFAULT_VGID, pBlock->info.id.groupId); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -1026,7 +1026,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (NULL == pGroup) { - code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid); + code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid); if (TSDB_CODE_SUCCESS != code) { return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 41066f4a3a..c2eca3e7ce 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -794,11 +794,16 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return TSDB_CODE_INVALID_PARA; } - qError("vgId:%d add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64, - pTaskInfo->id.vgId, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList)); + qError("vgId:%d add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64 ", operator status:%d", + pTaskInfo->id.vgId, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList), pOperator->status); if (pParam->tableSeq) { pListInfo->oneTableForEachGroup = true; + if (taosArrayGetSize(pListInfo->pTableList) > 0) { + taosHashClear(pListInfo->map); + taosArrayClear(pListInfo->pTableList); + pOperator->status = OP_EXEC_DONE; + } } else { pListInfo->oneTableForEachGroup = false; pListInfo->numOfOuputGroups = 1; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index beb3cd8be5..45a1919f27 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3262,7 +3262,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; pGrpCache->grpByUid = true; - pGrpCache->batchFetch = false; + pGrpCache->batchFetch = true; pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { @@ -3368,7 +3368,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p } pDynCtrl->qType = DYN_QTYPE_STB_HASH; - pDynCtrl->stbJoin.batchFetch = false; + pDynCtrl->stbJoin.batchFetch = true; if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList();