From 294376de0bf33f3cde8ac0f27448f93cec83765c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Jul 2023 14:44:53 +0800 Subject: [PATCH] enh: add batch table scan --- include/common/tmsg.h | 1 + include/libs/nodes/plannodes.h | 1 + source/common/src/tmsg.c | 2 + source/libs/executor/inc/dynqueryctrl.h | 19 +- source/libs/executor/inc/executorInt.h | 1 + .../libs/executor/src/dynqueryctrloperator.c | 339 +++++++++++++++++- source/libs/executor/src/exchangeoperator.c | 8 +- source/libs/executor/src/scanoperator.c | 13 +- 8 files changed, 361 insertions(+), 23 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 88b7e3ba9a..c91370a625 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2044,6 +2044,7 @@ typedef struct SOperatorParam { } SOperatorParam; typedef struct STableScanOperatorParam { + bool tableSeq; SArray* pUidList; } STableScanOperatorParam; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c235b693b3..e0d681805c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -451,6 +451,7 @@ typedef struct SGroupCachePhysiNode { typedef struct SStbJoinDynCtrlBasic { int32_t vgSlot[2]; int32_t uidSlot[2]; + bool batchJoin; } SStbJoinDynCtrlBasic; typedef struct SDynQueryCtrlPhysiNode { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8e57a7ea7e..bbaccd3968 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5470,6 +5470,7 @@ int32_t tSerializeSOperatorParam(SEncoder* pEncoder, SOperatorParam* pOpParam) { switch (pOpParam->opType) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanOperatorParam* pScan = (STableScanOperatorParam*)pOpParam->value; + if (tEncodeI8(pEncoder, pScan->tableSeq) < 0) return -1; int32_t uidNum = taosArrayGetSize(pScan->pUidList); if (tEncodeI32(pEncoder, uidNum) < 0) return -1; for (int32_t m = 0; m < uidNum; ++m) { @@ -5499,6 +5500,7 @@ int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam* pOpParam) case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanOperatorParam* pScan = taosMemoryMalloc(sizeof(STableScanOperatorParam)); if (NULL == pScan) return -1; + if (tDecodeI8(pDecoder, (int8_t*)&pScan->tableSeq) < 0) return -1; int32_t uidNum = 0; int64_t uid = 0; if (tDecodeI32(pDecoder, &uidNum) < 0) return -1; diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index 003adf3437..7d8c0f4b59 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -26,9 +26,24 @@ typedef struct SDynQueryCtrlExecInfo { int64_t postBlkRows; } SDynQueryCtrlExecInfo; +typedef struct SStbJoinTableList { + void *pNext; + int64_t uidNum; + int64_t readIdx; + int32_t *pLeftVg; + int64_t *pLeftUid; + int32_t *pRightVg; + int64_t *pRightUid; +} SStbJoinTableList; + typedef struct SStbJoinPrevJoinCtx { - SSDataBlock* pLastBlk; - int32_t lastRow; + SSDataBlock* pLastBlk; + int32_t lastRow; + bool joinBuild; + SSHashObj* leftVg; + SSHashObj* rightVg; + int64_t tableNum; + SStbJoinTableList* pListHead; } SStbJoinPrevJoinCtx; typedef struct SStbJoinPostJoinCtx { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 1c1dab1df7..4c70697bb8 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -152,6 +152,7 @@ typedef struct SSortMergeJoinOperatorParam { typedef struct SExchangeOperatorBasicParam { int32_t vgId; int32_t srcOpType; + bool tableSeq; SArray* uidList; } SExchangeOperatorBasicParam; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index a67aed6685..53311a451e 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -33,6 +33,12 @@ static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); + + tSimpleHashClear(pDyn->stbJoin.ctx.prev.leftVg); + tSimpleHashClear(pDyn->stbJoin.ctx.prev.rightVg); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg); + taosMemoryFreeClear(param); } @@ -41,12 +47,14 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; } - (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); - if (NULL == *ppRes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { - return TSDB_CODE_OUT_OF_MEMORY; + if (pChild) { + (*ppRes)->pChildren = taosArrayInit(1, POINTER_BYTES); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); @@ -80,6 +88,7 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i pExc->multiParams = false; pExc->basic.vgId = *pVgId; + pExc->basic.tableSeq = true; pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t)); if (NULL == pExc->basic.uidList) { @@ -95,6 +104,49 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i return TSDB_CODE_SUCCESS; } + +static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, SSHashObj* pVg) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = NULL; + + SExchangeOperatorBatchParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); + if (NULL == pExc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pExc->multiParams = true; + pExc->pBatchs = tSimpleHashInit(tSimpleHashGetSize(pVg), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pExc->pBatchs) { + taosMemoryFree(pExc); + return TSDB_CODE_OUT_OF_MEMORY; + } + + SExchangeOperatorBasicParam basic; + basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + + int32_t iter = 0; + void* p = NULL; + while (p = tSimpleHashIterate(pVg, p, &iter)) { + int32_t* pVgId = tSimpleHashGetKey(p, NULL); + SArray* pUidList = *(SArray**)p; + basic.vgId = *pVgId; + basic.uidList = pUidList; + basic.tableSeq = false; + + tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)); + } + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pExc; + + return TSDB_CODE_SUCCESS; +} + + static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { @@ -123,7 +175,7 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, } -static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { +static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { int32_t rowIdx = pPrev->lastRow + 1; SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]); SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]); @@ -156,14 +208,60 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ return code; } +static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { + int64_t rowIdx = pPrev->pListHead->readIdx; + SOperatorParam* pExcParam0 = NULL; + SOperatorParam* pExcParam1 = NULL; + SOperatorParam* pGcParam0 = NULL; + SOperatorParam* pGcParam1 = NULL; + int32_t* leftVg = pPrev->pListHead->pLeftVg + rowIdx; + int64_t* leftUid = pPrev->pListHead->pLeftUid + rowIdx; + int32_t* rightVg = pPrev->pListHead->pRightVg + rowIdx; + int64_t* rightUid = pPrev->pListHead->pRightUid + rowIdx; + int32_t code = TSDB_CODE_SUCCESS; + + qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, + rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid); + + if (pPrev->leftVg) { + code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftVg); + if (TSDB_CODE_SUCCESS == code) { + code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightVg); + } + if (TSDB_CODE_SUCCESS == code) { + tSimpleHashCleanup(pPrev->leftVg); + tSimpleHashCleanup(pPrev->rightVg); + pPrev->leftVg = NULL; + pPrev->rightVg = NULL; + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); + } + return code; +} + + static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; SOperatorParam* pParam = NULL; - - int32_t code = buildStbJoinOperatorParam(pInfo, pPrev, &pParam); + int32_t code = TSDB_CODE_SUCCESS; + + if (pInfo->stbJoin.basic.batchJoin) { + code = buildSeqBatchStbJoinOperatorParam(pInfo, pPrev, &pParam); + } else { + code = buildSeqStbJoinOperatorParam(pInfo, pPrev, &pParam); + } if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); @@ -181,7 +279,7 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) } } -static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -201,7 +299,7 @@ static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes } } -static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -220,15 +318,19 @@ static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRe } if (pStbJoin->ctx.prev.pLastBlk) { - seqJoinWithSeqRetrieve(pOperator, ppRes); + seqJoinLaunchRetrieve(pOperator, ppRes); } } -SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { +SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SSDataBlock* pRes = NULL; + if (pOperator->status == OP_EXEC_DONE) { + return pRes; + } + seqJoinContinueRetrieve(pOperator, &pRes); if (pRes) { return pRes; @@ -237,6 +339,7 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { while (true) { SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (NULL == pBlock) { + pOperator->status = OP_EXEC_DONE; break; } @@ -246,7 +349,7 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { pStbJoin->ctx.prev.pLastBlk = pBlock; pStbJoin->ctx.prev.lastRow = -1; - seqJoinWithSeqRetrieve(pOperator, &pRes); + seqJoinLaunchRetrieve(pOperator, &pRes); if (pRes) { break; } @@ -255,6 +358,203 @@ SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { return pRes; } +static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) { + SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize); + if (NULL == ppArray) { + SArray* pArray = taosArrayInit(10, valSize); + if (NULL == pArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL == taosArrayPush(pArray, pVal)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; + } + + if (NULL == taosArrayPush(*ppArray, pVal)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} + +static void freeStbJoinTableList(SStbJoinTableList* pList) { + if (NULL == pList) { + return; + } + taosMemoryFree(pList->pLeftVg); + taosMemoryFree(pList->pLeftUid); + taosMemoryFree(pList->pRightVg); + taosMemoryFree(pList->pRightUid); + taosMemoryFree(pList); +} + +static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, int32_t* pLeftVg, int64_t* pLeftUid, int32_t* pRightVg, int64_t* pRightUid) { + SStbJoinTableList* pNew = taosMemoryMalloc(sizeof(SStbJoinTableList)); + if (NULL == pNew) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pNew->pLeftVg = taosMemoryMalloc(rows * sizeof(*pLeftVg)); + pNew->pLeftUid = taosMemoryMalloc(rows * sizeof(*pLeftUid)); + pNew->pRightVg = taosMemoryMalloc(rows * sizeof(*pRightVg)); + pNew->pRightUid = taosMemoryMalloc(rows * sizeof(*pRightUid)); + if (NULL == pNew->pLeftVg || NULL == pNew->pLeftUid || NULL == pNew->pRightVg || NULL == pNew->pRightUid) { + freeStbJoinTableList(pNew); + return TSDB_CODE_OUT_OF_MEMORY; + } + + memcpy(pNew->pLeftVg, pLeftVg, rows * sizeof(*pLeftVg)); + memcpy(pNew->pLeftUid, pLeftUid, rows * sizeof(*pLeftUid)); + memcpy(pNew->pRightVg, pRightVg, rows * sizeof(*pRightVg)); + memcpy(pNew->pRightUid, pRightUid, rows * sizeof(*pRightUid)); + + pNew->readIdx = 0; + pNew->uidNum = rows; + + if (*ppHead) { + pNew->pNext = *ppHead; + } else { + pNew->pNext = NULL; + } + + *ppHead = pNew; + + return TSDB_CODE_SUCCESS; +} + +static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SColumnInfoData* pVg0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[0]); + SColumnInfoData* pVg1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.vgSlot[1]); + SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]); + SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]); + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i); + int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); + int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i); + int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i); + + code = addToJoinHash(pStbJoin->ctx.prev.leftVg, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + code = addToJoinHash(pStbJoin->ctx.prev.rightVg, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = appendStbJoinTableList(&pStbJoin->ctx.prev.pListHead, pBlock->info.rows, (int32_t*)pVg0->pData, (int64_t*)pUid0->pData, (int32_t*)pVg1->pData, (int64_t*)pUid1->pData); + if (TSDB_CODE_SUCCESS == code) { + pStbJoin->ctx.prev.tableNum += pBlock->info.rows; + } + } + + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } +} + +static void buildStbJoinVgList(SOperatorInfo* pOperator) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + + while (true) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); + if (NULL == pBlock) { + break; + } + + pInfo->execInfo.prevBlkNum++; + pInfo->execInfo.prevBlkRows += pBlock->info.rows; + + doBuildStbJoinHash(pOperator, pBlock); + } + + pStbJoin->ctx.prev.joinBuild = true; +} + +static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + SStbJoinTableList* pNode = pPrev->pListHead; + + while (pNode) { + if (pNode->readIdx >= pNode->uidNum) { + pPrev->pListHead = pNode->pNext; + freeStbJoinTableList(pNode); + pNode = pPrev->pListHead; + continue; + } + + seqJoinLaunchPostJoin(pOperator, ppRes); + if (*ppRes) { + return; + } + } + + *ppRes = NULL; + return; +} + + +SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SSDataBlock* pRes = NULL; + + if (pOperator->status == OP_EXEC_DONE) { + return pRes; + } + + if (!pStbJoin->ctx.prev.joinBuild) { + buildStbJoinVgList(pOperator); + if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftVg) <= 0 || tSimpleHashGetSize(pStbJoin->ctx.prev.rightVg) <= 0) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } + } + + seqJoinContinueRetrieve(pOperator, &pRes); + if (pRes) { + return pRes; + } + + seqBatchJoinLaunchRetrieve(pOperator, &pRes); + return pRes; +} + +void freeVgTableList(void* ptr) { + taosArrayDestroy(*(SArray**)ptr); +} + + +int32_t initBatchStbJoinVgHash(SStbJoinPrevJoinCtx* pPrev) { + pPrev->leftVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pPrev->leftVg) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tSimpleHashSetFreeFp(pPrev->leftVg, freeVgTableList); + + pPrev->rightVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pPrev->rightVg) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tSimpleHashSetFreeFp(pPrev->rightVg, freeVgTableList); + + return TSDB_CODE_SUCCESS; +} + + SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); @@ -276,7 +576,16 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); - nextFp = getResFromStbJoin; + pInfo->stbJoin.basic.batchJoin = false; + if (pInfo->stbJoin.basic.batchJoin) { + code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev); + if (TSDB_CODE_SUCCESS != code) { + goto _error; + } + nextFp = seqBatchStableJoin; + } else { + nextFp = seqStableJoin; + } break; default: qError("unsupported dynamic query ctrl type: %d", pInfo->qType); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index b6edfd5820..f2e2d4840b 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -42,6 +42,7 @@ typedef struct SSourceDataInfo { const char* taskId; SArray* pSrcUidList; int32_t srcOpType; + bool tableSeq; } SSourceDataInfo; static void destroyExchangeOperatorInfo(void* param); @@ -417,7 +418,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { return code; } -int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType) { +int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -432,6 +433,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in if (NULL == pScan->pUidList) { return TSDB_CODE_OUT_OF_MEMORY; } + pScan->tableSeq = tableSeq; (*ppRes)->opType = srcOpType; (*ppRes)->downstreamIdx = 0; @@ -472,7 +474,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; if (pDataInfo->pSrcUidList) { - int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType); + int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq); taosArrayDestroy(pDataInfo->pSrcUidList); pDataInfo->pSrcUidList = NULL; if (TSDB_CODE_SUCCESS != code) { @@ -759,6 +761,8 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic dataInfo.index = *pIdx; dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.srcOpType = pBasicParam->srcOpType; + dataInfo.tableSeq = pBasicParam->tableSeq; + taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a92080fbab..43b086b7c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -797,9 +797,14 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { return TSDB_CODE_INVALID_PARA; } - qError("add total %d dynamic tables to scan, exist num:%" PRId64, num, (int64_t)taosArrayGetSize(pListInfo->pTableList)); + qError("add total %d dynamic tables to scan, tableSeq:%d, exist num:%" PRId64, num, pParam->tableSeq, (int64_t)taosArrayGetSize(pListInfo->pTableList)); - pListInfo->oneTableForEachGroup = true; + if (pParam->tableSeq) { + pListInfo->oneTableForEachGroup = true; + } else { + pListInfo->oneTableForEachGroup = false; + pListInfo->numOfOuputGroups = 1; + } for (int32_t i = 0; i < num; ++i) { uint64_t* pUid = taosArrayGet(pParam->pUidList, i); @@ -846,7 +851,7 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + if (pOperator->dynamicTask) { STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); result->info.id.groupId = pKeyInfo->uid; } @@ -885,7 +890,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { - if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + if (pOperator->dynamicTask) { STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); result->info.id.groupId = pKeyInfo->uid; }