enh: end table scan

This commit is contained in:
dapan1121 2023-08-07 19:35:33 +08:00
parent 0546bdceb2
commit a95295774a
6 changed files with 23 additions and 15 deletions

View File

@ -47,6 +47,7 @@ typedef struct SStbJoinPrevJoinCtx {
SSHashObj* onceTable; SSHashObj* onceTable;
int64_t tableNum; int64_t tableNum;
SStbJoinTableList* pListHead; SStbJoinTableList* pListHead;
SStbJoinTableList* pListTail;
} SStbJoinPrevJoinCtx; } SStbJoinPrevJoinCtx;
typedef struct SStbJoinPostJoinCtx { typedef struct SStbJoinPostJoinCtx {

View File

@ -21,6 +21,7 @@ extern "C" {
#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 #define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600
#define GROUP_CACHE_MAX_FILE_FDS 10 #define GROUP_CACHE_MAX_FILE_FDS 10
#define GROUP_CACHE_DEFAULT_VGID 0
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct SGcBlkBufBasic { typedef struct SGcBlkBufBasic {

View File

@ -284,7 +284,7 @@ static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin)
while (true) { while (true) {
if (readIdx < pNode->uidNum) { if (readIdx < pNode->uidNum) {
pPost->rightNextUid = *(rightUid + readIdx); pPost->rightNextUid = *(pNode->pRightUid + readIdx);
break; break;
} }
@ -518,7 +518,7 @@ static void freeStbJoinTableList(SStbJoinTableList* pList) {
taosMemoryFree(pList); taosMemoryFree(pList);
} }
static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { static int32_t appendStbJoinTableList(SStbJoinPrevJoinCtx* pCtx, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) {
SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList)); SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList));
if (NULL == pNew) { if (NULL == pNew) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -539,15 +539,16 @@ static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows,
pNew->readIdx = 0; pNew->readIdx = 0;
pNew->uidNum = rows; pNew->uidNum = rows;
pNew->pNext = NULL;
if (*ppHead) { if (pCtx->pListTail) {
pNew->pNext = *ppHead; pCtx->pListTail->pNext = pNew;
pCtx->pListTail = pNew;
} else { } else {
pNew->pNext = NULL; pCtx->pListHead = pNew;
pCtx->pListTail= pNew;
} }
*ppHead = pNew;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -588,7 +589,7 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = appendStbJoinTableList(&pStbJoin->ctx.prev.pListHead, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData); code = appendStbJoinTableList(&pStbJoin->ctx.prev, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pStbJoin->ctx.prev.tableNum += pBlock->info.rows; pStbJoin->ctx.prev.tableNum += pBlock->info.rows;
} }

View File

@ -213,7 +213,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
releaseFdToFileCtx(pFd); releaseFdToFileCtx(pFd);
qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64,
pCtx->id, pGroup->vgId, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset); pCtx->id, pGroup ? pGroup->vgId : GROUP_CACHE_DEFAULT_VGID, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset);
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
@ -741,7 +741,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
fakeGcParam.needCache = true; fakeGcParam.needCache = true;
fakeParam.downstreamIdx = pSession->downstreamIdx; fakeParam.downstreamIdx = pSession->downstreamIdx;
fakeParam.value = &fakeGcParam; fakeParam.value = &fakeGcParam;
code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId); code = addNewGroupData(pOperator, &fakeParam, &pGroup, GROUP_CACHE_DEFAULT_VGID, pBlock->info.id.groupId);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
@ -1026,7 +1026,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
if (NULL == pGroup) { if (NULL == pGroup) {
code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid); code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? GROUP_CACHE_DEFAULT_VGID : pGcParam->vgId, pGcParam->tbUid);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }

View File

@ -794,11 +794,16 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
qError("vgId:%d add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64, qError("vgId:%d add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64 ", operator status:%d",
pTaskInfo->id.vgId, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList)); pTaskInfo->id.vgId, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList), pOperator->status);
if (pParam->tableSeq) { if (pParam->tableSeq) {
pListInfo->oneTableForEachGroup = true; pListInfo->oneTableForEachGroup = true;
if (taosArrayGetSize(pListInfo->pTableList) > 0) {
taosHashClear(pListInfo->map);
taosArrayClear(pListInfo->pTableList);
pOperator->status = OP_EXEC_DONE;
}
} else { } else {
pListInfo->oneTableForEachGroup = false; pListInfo->oneTableForEachGroup = false;
pListInfo->numOfOuputGroups = 1; pListInfo->numOfOuputGroups = 1;

View File

@ -3262,7 +3262,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode**
pGrpCache->node.dynamicOp = true; pGrpCache->node.dynamicOp = true;
pGrpCache->grpColsMayBeNull = false; pGrpCache->grpColsMayBeNull = false;
pGrpCache->grpByUid = true; pGrpCache->grpByUid = true;
pGrpCache->batchFetch = false; pGrpCache->batchFetch = true;
pGrpCache->node.pChildren = pChildren; pGrpCache->node.pChildren = pChildren;
pGrpCache->node.pTargets = nodesMakeList(); pGrpCache->node.pTargets = nodesMakeList();
if (NULL == pGrpCache->node.pTargets) { if (NULL == pGrpCache->node.pTargets) {
@ -3368,7 +3368,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p
} }
pDynCtrl->qType = DYN_QTYPE_STB_HASH; pDynCtrl->qType = DYN_QTYPE_STB_HASH;
pDynCtrl->stbJoin.batchFetch = false; pDynCtrl->stbJoin.batchFetch = true;
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pDynCtrl->node.pChildren = nodesMakeList(); pDynCtrl->node.pChildren = nodesMakeList();