From e83a46b88f9e2c9c5ce040d4278af01cd29ecf2c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 26 Jul 2023 19:17:59 +0800 Subject: [PATCH] enh: cache necessary file for seq mode --- include/libs/nodes/plannodes.h | 4 +- source/libs/executor/inc/dynqueryctrl.h | 12 +- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/inc/groupcache.h | 1 + .../libs/executor/src/dynqueryctrloperator.c | 358 ++++++++++-------- source/libs/executor/src/groupcacheoperator.c | 77 ++-- source/libs/nodes/src/nodesCloneFuncs.c | 2 +- source/libs/nodes/src/nodesCodeFuncs.c | 12 +- source/libs/nodes/src/nodesMsgFuncs.c | 8 +- source/libs/planner/src/planOptimizer.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 2 +- 11 files changed, 271 insertions(+), 208 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index a3d8af2263..3d2ad5c572 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -169,7 +169,7 @@ typedef struct SGroupCacheLogicNode { } SGroupCacheLogicNode; typedef struct SDynQueryCtrlStbJoin { - bool batchJoin; + bool batchFetch; SNodeList* pVgList; SNodeList* pUidList; } SDynQueryCtrlStbJoin; @@ -460,7 +460,7 @@ typedef struct SGroupCachePhysiNode { } SGroupCachePhysiNode; typedef struct SStbJoinDynCtrlBasic { - bool batchJoin; + bool batchFetch; int32_t vgSlot[2]; int32_t uidSlot[2]; } SStbJoinDynCtrlBasic; diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index 7d8c0f4b59..a95ec94f11 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -37,17 +37,19 @@ typedef struct SStbJoinTableList { } SStbJoinTableList; typedef struct SStbJoinPrevJoinCtx { - SSDataBlock* pLastBlk; - int32_t lastRow; bool joinBuild; - SSHashObj* leftVg; - SSHashObj* rightVg; + SSHashObj* leftHash; + SSHashObj* rightHash; + SSHashObj* tableTimes; + SSHashObj* onceTable; int64_t tableNum; SStbJoinTableList* pListHead; } SStbJoinPrevJoinCtx; typedef struct SStbJoinPostJoinCtx { - bool isStarted; + bool isStarted; + int64_t rightCurrUid; + int64_t rightNextUid; } SStbJoinPostJoinCtx; typedef struct SStbJoinDynCtrlCtx { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 69f3e934e4..b6b3930711 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -110,6 +110,7 @@ typedef struct SGcOperatorParam { int32_t downstreamIdx; int32_t vgId; int64_t tbUid; + bool needCache; } SGcOperatorParam; typedef struct SExprSupp { diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 0523b19308..edc2382ef0 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -75,6 +75,7 @@ typedef struct SGroupCacheData { TdThreadMutex mutex; SArray* waitQueue; bool fetchDone; + bool needCache; SSDataBlock* pBlock; SGcVgroupCtx* pVgCtx; int32_t downstreamIdx; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 49cada2549..eadf41a35f 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -39,19 +39,28 @@ static void destroyDynQueryCtrlOperator(void* param) { qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); - if (pDyn->stbJoin.ctx.prev.leftVg) { - tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftVg, freeVgTableList); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg); + if (pDyn->stbJoin.basic.batchFetch) { + if (pDyn->stbJoin.ctx.prev.leftHash) { + tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftHash, freeVgTableList); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftHash); + } + if (pDyn->stbJoin.ctx.prev.rightHash) { + tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash); + } } - if (pDyn->stbJoin.ctx.prev.rightVg) { - tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightVg, freeVgTableList); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg); + + if (pDyn->stbJoin.ctx.prev.tableTimes) { + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.tableTimes); + } + if (pDyn->stbJoin.ctx.prev.onceTable) { + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable); } taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) { +static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, bool needCache, SOperatorParam* pChild) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -77,6 +86,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, pGc->downstreamIdx = downstreamIdx; pGc->vgId = vgId; pGc->tbUid = tbUid; + pGc->needCache = needCache; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; (*ppRes)->downstreamIdx = downstreamIdx; @@ -85,7 +95,7 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) { +static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -189,41 +199,21 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_SUCCESS; } - -static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { - int32_t rowIdx = pPrev->lastRow + 1; - SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]); - SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]); - SColumnInfoData* pUid0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[0]); - SColumnInfoData* pUid1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[1]); - SOperatorParam* pExcParam0 = NULL; - SOperatorParam* pExcParam1 = NULL; - SOperatorParam* pGcParam0 = NULL; - SOperatorParam* pGcParam1 = NULL; - int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * rowIdx); - int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * rowIdx); - int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * rowIdx); - int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * rowIdx); - - qError("start stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, *leftVg, *leftUid, *rightVg, *rightUid); +static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) { + if (batchFetch) { + return true; + } - int32_t code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid, NULL); - if (TSDB_CODE_SUCCESS == code) { - code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL); + if (rightTable) { + return pPost->rightCurrUid == pPost->rightNextUid; } - if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0); - } - if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); - } - if (TSDB_CODE_SUCCESS == code) { - code = buildMergeJoinOperatorParam(ppParam, false, pGcParam0, pGcParam1); - } - return code; + + uint32_t* num = tSimpleHashGet(pPrev->tableTimes, &uid, sizeof(uid)); + + return (NULL == num) ? false : true; } -static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { +static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) { int64_t rowIdx = pPrev->pListHead->readIdx; SOperatorParam* pExcParam0 = NULL; SOperatorParam* pExcParam1 = NULL; @@ -238,24 +228,31 @@ static int32_t buildSeqBatchStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInf qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid); - if (pPrev->leftVg) { - code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftVg); - if (TSDB_CODE_SUCCESS == code) { - code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightVg); + if (pInfo->stbJoin.basic.batchFetch) { + if (pPrev->leftHash) { + code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash); + if (TSDB_CODE_SUCCESS == code) { + code = buildBatchExchangeOperatorParam(&pExcParam1, 1, pPrev->rightHash); + } + if (TSDB_CODE_SUCCESS == code) { + tSimpleHashCleanup(pPrev->leftHash); + tSimpleHashCleanup(pPrev->rightHash); + pPrev->leftHash = NULL; + pPrev->rightHash = NULL; + } } + } else { + code = buildExchangeOperatorParam(&pExcParam0, 0, leftVg, leftUid); if (TSDB_CODE_SUCCESS == code) { - tSimpleHashCleanup(pPrev->leftVg); - tSimpleHashCleanup(pPrev->rightVg); - pPrev->leftVg = NULL; - pPrev->rightVg = NULL; + code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid); } } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, tableNeedCache(*leftUid, pPrev, pPost, false, pInfo->stbJoin.basic.batchFetch), pExcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, tableNeedCache(*rightUid, pPrev, pPost, true, pInfo->stbJoin.basic.batchFetch), pExcParam1); } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1); @@ -270,13 +267,7 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; SOperatorParam* pParam = NULL; - int32_t code = TSDB_CODE_SUCCESS; - - if (pInfo->stbJoin.basic.batchJoin) { - code = buildSeqBatchStbJoinOperatorParam(pInfo, pPrev, &pParam); - } else { - code = buildSeqStbJoinOperatorParam(pInfo, pPrev, &pParam); - } + int32_t code = buildSeqStbJoinOperatorParam(pInfo, pPrev, pPost, &pParam); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); @@ -294,86 +285,27 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) } } -static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { - SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; - SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; - - while (true) { - if ((pPrev->lastRow + 1) >= pPrev->pLastBlk->info.rows) { - *ppRes = NULL; - pPrev->pLastBlk = NULL; - return; - } - - seqJoinLaunchPostJoin(pOperator, ppRes); - pPrev->lastRow++; - if (*ppRes) { - break; - } - } -} - static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; - SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; - SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post; - if (pPost->isStarted) { - qDebug("%s dynQueryCtrl retrieve block from post op", GET_TASKID(pOperator->pTaskInfo)); - *ppRes = getNextBlockFromDownstream(pOperator, 1); - if (NULL == *ppRes) { - pPost->isStarted = false; - } else { - pInfo->execInfo.postBlkNum++; - pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; - return; - } + if (!pPost->isStarted) { + return; } - if (pStbJoin->ctx.prev.pLastBlk) { - seqJoinLaunchRetrieve(pOperator, ppRes); + qDebug("%s dynQueryCtrl continue to retrieve block from post op", GET_TASKID(pOperator->pTaskInfo)); + + *ppRes = getNextBlockFromDownstream(pOperator, 1); + if (NULL == *ppRes) { + pPost->isStarted = false; + } else { + pInfo->execInfo.postBlkNum++; + pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; + return; } } -SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { - SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; - SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - SSDataBlock* pRes = NULL; - - if (pOperator->status == OP_EXEC_DONE) { - return pRes; - } - - seqJoinContinueRetrieve(pOperator, &pRes); - if (pRes) { - return pRes; - } - - while (true) { - SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); - if (NULL == pBlock) { - pOperator->status = OP_EXEC_DONE; - break; - } - - pInfo->execInfo.prevBlkNum++; - pInfo->execInfo.prevBlkRows += pBlock->info.rows; - - pStbJoin->ctx.prev.pLastBlk = pBlock; - pStbJoin->ctx.prev.lastRow = -1; - - seqJoinLaunchRetrieve(pOperator, &pRes); - if (pRes) { - break; - } - } - - return pRes; -} - -static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) { +static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, int32_t keySize, void* pVal, int32_t valSize) { SArray** ppArray = tSimpleHashGet(pHash, pKey, keySize); if (NULL == ppArray) { SArray* pArray = taosArrayInit(10, valSize); @@ -396,6 +328,37 @@ static FORCE_INLINE int32_t addToJoinHash(SSHashObj* pHash, void* pKey, int32_t return TSDB_CODE_SUCCESS; } +static FORCE_INLINE int32_t addToJoinTableHash(SSHashObj* pHash, SSHashObj* pOnceHash, void* pKey, int32_t keySize) { + uint32_t* pNum = tSimpleHashGet(pHash, pKey, keySize); + if (NULL == pNum) { + uint32_t n = 1; + if (tSimpleHashPut(pHash, pKey, keySize, &n, sizeof(n))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (tSimpleHashPut(pOnceHash, pKey, keySize, NULL, 0)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; + } + + switch (*pNum) { + case 0: + break; + case UINT32_MAX: + *pNum = 0; + break; + default: + if (1 == (*pNum)) { + tSimpleHashRemove(pOnceHash, pKey, keySize); + } + (*pNum)++; + break; + } + + return TSDB_CODE_SUCCESS; +} + + static void freeStbJoinTableList(SStbJoinTableList* pList) { if (NULL == pList) { return; @@ -440,7 +403,7 @@ static int32_t appendStbJoinTableList(SStbJoinTableList** ppHead, int64_t rows, return TSDB_CODE_SUCCESS; } -static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) { +static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; @@ -449,17 +412,28 @@ static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SColumnInfoData* pUid0 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[0]); SColumnInfoData* pUid1 = taosArrayGet(pBlock->pDataBlock, pStbJoin->basic.uidSlot[1]); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i); - int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); - int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i); - int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i); + if (pStbJoin->basic.batchFetch) { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int32_t* leftVg = (int32_t*)(pVg0->pData + pVg0->info.bytes * i); + int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); + int32_t* rightVg = (int32_t*)(pVg1->pData + pVg1->info.bytes * i); + int64_t* rightUid = (int64_t*)(pUid1->pData + pUid1->info.bytes * i); - code = addToJoinHash(pStbJoin->ctx.prev.leftVg, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid)); - if (TSDB_CODE_SUCCESS != code) { - break; + code = addToJoinVgroupHash(pStbJoin->ctx.prev.leftHash, leftVg, sizeof(*leftVg), leftUid, sizeof(*leftUid)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + code = addToJoinVgroupHash(pStbJoin->ctx.prev.rightHash, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid)); + if (TSDB_CODE_SUCCESS != code) { + break; + } } - code = addToJoinHash(pStbJoin->ctx.prev.rightVg, rightVg, sizeof(*rightVg), rightUid, sizeof(*rightUid)); + } + + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); + + code = addToJoinTableHash(pStbJoin->ctx.prev.tableTimes, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid)); if (TSDB_CODE_SUCCESS != code) { break; } @@ -478,7 +452,64 @@ static void doBuildStbJoinHash(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } } -static void buildStbJoinVgList(SOperatorInfo* pOperator) { +static void updatePostJoinRightTableUid(SStbJoinDynCtrlInfo* pStbJoin) { + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + SStbJoinTableList* pNode = pPrev->pListHead; + int64_t* rightUid = pNode->pRightUid; + int64_t readIdx = pNode->readIdx + 1; + + if (pPost->rightNextUid) { + pPost->rightCurrUid = pPost->rightNextUid; + } else { + pPost->rightCurrUid = *rightUid; + } + + while (true) { + if (readIdx < pNode->uidNum) { + pPost->rightNextUid = *(rightUid + readIdx); + return; + } + + pNode = pNode->pNext; + if (NULL == pNode) { + pPost->rightNextUid = 0; + return; + } + + rightUid = pNode->pRightUid; + readIdx = 0; + } +} + +static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + + updatePostJoinRightTableUid(pStbJoin); + + if (tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) { + tSimpleHashClear(pStbJoin->ctx.prev.tableTimes); + return; + } + + uint64_t* pUid = NULL; + int32_t iter = 0; + while (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter)) { + tSimpleHashRemove(pStbJoin->ctx.prev.tableTimes, pUid, sizeof(*pUid)); + } + + qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes)); + + // debug only + iter = 0; + uint32_t* num = NULL; + while (num = tSimpleHashIterate(pStbJoin->ctx.prev.tableTimes, num, &iter)) { + ASSERT(*num > 1); + } +} + +static void buildStbJoinTableList(SOperatorInfo* pOperator) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; @@ -491,13 +522,15 @@ static void buildStbJoinVgList(SOperatorInfo* pOperator) { pInfo->execInfo.prevBlkNum++; pInfo->execInfo.prevBlkRows += pBlock->info.rows; - doBuildStbJoinHash(pOperator, pBlock); + doBuildStbJoinTableHash(pOperator, pBlock); } + postProcessStbJoinTableHash(pOperator); + pStbJoin->ctx.prev.joinBuild = true; } -static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -523,8 +556,7 @@ static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** p return; } - -SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) { +SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SSDataBlock* pRes = NULL; @@ -534,8 +566,8 @@ SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) { } if (!pStbJoin->ctx.prev.joinBuild) { - buildStbJoinVgList(pOperator); - if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftVg) <= 0 || tSimpleHashGetSize(pStbJoin->ctx.prev.rightVg) <= 0) { + buildStbJoinTableList(pOperator); + if (pInfo->execInfo.prevBlkRows <= 0) { pOperator->status = OP_EXEC_DONE; return NULL; } @@ -546,18 +578,28 @@ SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) { return pRes; } - seqBatchJoinLaunchRetrieve(pOperator, &pRes); + seqJoinLaunchRetrieve(pOperator, &pRes); return pRes; } -int32_t initBatchStbJoinVgHash(SStbJoinPrevJoinCtx* pPrev) { - pPrev->leftVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pPrev->leftVg) { +int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { + if (batchFetch) { + pPrev->leftHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pPrev->leftHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pPrev->rightHash = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pPrev->rightHash) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + pPrev->tableTimes = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pPrev->tableTimes) { return TSDB_CODE_OUT_OF_MEMORY; } - - pPrev->rightVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pPrev->rightVg) { + pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pPrev->onceTable) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -586,15 +628,11 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); - if (pInfo->stbJoin.basic.batchJoin) { - code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev); - if (TSDB_CODE_SUCCESS != code) { - goto _error; - } - nextFp = seqBatchStableJoin; - } else { - nextFp = seqStableJoin; + code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch); + if (TSDB_CODE_SUCCESS != code) { + goto _error; } + nextFp = seqStableJoin; break; default: qError("unsupported dynamic query ctrl type: %d", pInfo->qType); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 5535fe0216..3101dcc32e 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -556,22 +556,25 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat } -static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch) { +static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch, bool needCache) { taosThreadMutexInit(&pGroup->mutex, NULL); pGroup->downstreamIdx = downstreamIdx; pGroup->vgId = vgId; pGroup->fileId = -1; pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic)); pGroup->startOffset = -1; + pGroup->needCache = needCache; pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); } static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; + SGcOperatorParam* pGcParam = pParam->value; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData grpData = {0}; - initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch); + + initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); while (true) { if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { @@ -623,12 +626,16 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + int64_t newBlkIdx = 0; SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId)); if (NULL == pGroup) { if (pGCache->batchFetch) { + SGcOperatorParam fakeGcParam = {0}; SOperatorParam fakeParam = {0}; + fakeGcParam.needCache = true; fakeParam.downstreamIdx = pSession->downstreamIdx; + fakeParam.value = &fakeGcParam; code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId); if (TSDB_CODE_SUCCESS != code) { return code; @@ -645,22 +652,31 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD return code; } } - - SGcBlkBufInfo newBlkBuf; - code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); - if (code) { - return code; - } - int64_t blkIdx = 0; - code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &blkIdx); - if (code) { - return code; + if (pGroup->needCache) { + qError("add block to group cache"); + + SGcBlkBufInfo newBlkBuf; + code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); + if (code) { + return code; + } + + code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &newBlkIdx); + if (code) { + return code; + } + } else { + qError("no need to add block to group cache"); + + pGroup->pBlock = pBlock; } notifyWaitingSessions(pGroup->waitQueue); if (pGroup == pSession->pGroupData) { - pSession->lastBlkId = blkIdx; + if (pGroup->needCache) { + pSession->lastBlkId = newBlkIdx; + } *continueFetch = false; } @@ -746,26 +762,31 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; *got = true; - - SGcBlkList* pBlkList = &pSession->pGroupData->blkList; - taosRLockLatch(&pBlkList->lock); - int64_t blkNum = taosArrayGetSize(pBlkList->pList); - if (pSession->lastBlkId < 0) { - if (blkNum > 0) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0); + + if (pSession->pGroupData->needCache) { + SGcBlkList* pBlkList = &pSession->pGroupData->blkList; + taosRLockLatch(&pBlkList->lock); + int64_t blkNum = taosArrayGetSize(pBlkList->pList); + if (pSession->lastBlkId < 0) { + if (blkNum > 0) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0); + taosRUnLockLatch(&pBlkList->lock); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); + pSession->lastBlkId = 0; + return code; + } + } else if ((pSession->lastBlkId + 1) < blkNum) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBlkList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); - pSession->lastBlkId = 0; + pSession->lastBlkId++; return code; } - } else if ((pSession->lastBlkId + 1) < blkNum) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBlkList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); - pSession->lastBlkId++; - return code; + } else if (pSession->pGroupData->pBlock) { + *ppRes = pSession->pGroupData->pBlock; + pSession->pGroupData->pBlock = NULL; } - taosRUnLockLatch(&pBlkList->lock); if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { *ppRes = NULL; @@ -929,7 +950,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock if (TSDB_CODE_SUCCESS != code) { return code; } - } else { + } else if (pSession->pGroupData->needCache) { SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (ppBlock) { releaseBaseBlockToList(pCtx, *ppBlock); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 33967942f5..800b747e68 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -550,7 +550,7 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache static int32_t logicDynQueryCtrlCopy(const SDynQueryCtrlLogicNode* pSrc, SDynQueryCtrlLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(qType); - COPY_SCALAR_FIELD(stbJoin.batchJoin); + COPY_SCALAR_FIELD(stbJoin.batchFetch); CLONE_NODE_LIST_FIELD(stbJoin.pVgList); CLONE_NODE_LIST_FIELD(stbJoin.pUidList); return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ede042005a..970bd6fcc8 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1228,7 +1228,7 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { } static const char* jkDynQueryCtrlLogicPlanQueryType = "QueryType"; -static const char* jkDynQueryCtrlLogicPlanStbJoinBatchJoin = "BatchJoin"; +static const char* jkDynQueryCtrlLogicPlanStbJoinBatchFetch = "BatchFetch"; static const char* jkDynQueryCtrlLogicPlanStbJoinVgList = "VgroupList"; static const char* jkDynQueryCtrlLogicPlanStbJoinUidList = "UidList"; @@ -1240,7 +1240,7 @@ static int32_t logicDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, pNode->stbJoin.batchJoin); + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchFetch, pNode->stbJoin.batchFetch); } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, pNode->stbJoin.pVgList); @@ -1260,7 +1260,7 @@ static int32_t jsonToLogicDynQueryCtrlNode(const SJson* pJson, void* pObj) { tjsonGetNumberValue(pJson, jkDynQueryCtrlLogicPlanQueryType, pNode->qType, code); } if (TSDB_CODE_SUCCESS == code) { - tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchJoin, &pNode->stbJoin.batchJoin); + tjsonGetBoolValue(pJson, jkDynQueryCtrlLogicPlanStbJoinBatchFetch, &pNode->stbJoin.batchFetch); } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkDynQueryCtrlLogicPlanStbJoinVgList, &pNode->stbJoin.pVgList); @@ -3005,7 +3005,7 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { } static const char* jkDynQueryCtrlPhysiPlanQueryType = "QueryType"; -static const char* jkDynQueryCtrlPhysiPlanBatchJoin = "BatchJoin"; +static const char* jkDynQueryCtrlPhysiPlanBatchFetch = "BatchFetch"; static const char* jkDynQueryCtrlPhysiPlanVgSlot0 = "VgSlot[0]"; static const char* jkDynQueryCtrlPhysiPlanVgSlot1 = "VgSlot[1]"; static const char* jkDynQueryCtrlPhysiPlanUidSlot0 = "UidSlot[0]"; @@ -3021,7 +3021,7 @@ static int32_t physiDynQueryCtrlNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { switch (pNode->qType) { case DYN_QTYPE_STB_HASH: { - code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, pNode->stbJoin.batchJoin); + code = tjsonAddBoolToObject(pJson, jkDynQueryCtrlPhysiPlanBatchFetch, pNode->stbJoin.batchFetch); if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0]); } @@ -3055,7 +3055,7 @@ static int32_t jsonToPhysiDynQueryCtrlNode(const SJson* pJson, void* pObj) { case DYN_QTYPE_STB_HASH: { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanQueryType, pNode->qType, code); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchJoin, &pNode->stbJoin.batchJoin); + code = tjsonGetBoolValue(pJson, jkDynQueryCtrlPhysiPlanBatchFetch, &pNode->stbJoin.batchFetch); } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkDynQueryCtrlPhysiPlanVgSlot0, pNode->stbJoin.vgSlot[0], code); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index c9fe192e73..b869216b36 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3608,7 +3608,7 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_DYN_QUERY_CTRL_CODE_BASE_NODE = 1, PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE, - PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, + PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT1, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_UID_SLOT0, @@ -3625,7 +3625,7 @@ static int32_t physiDynQueryCtrlNodeToMsg(const void* pObj, STlvEncoder* pEncode if (TSDB_CODE_SUCCESS == code) { switch (pNode->qType) { case DYN_QTYPE_STB_HASH: { - code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN, pNode->stbJoin.batchJoin); + code = tlvEncodeBool(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH, pNode->stbJoin.batchFetch); if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeEnum(pEncoder, PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0, pNode->stbJoin.vgSlot[0]); } @@ -3660,8 +3660,8 @@ static int32_t msgToPhysiDynQueryCtrlNode(STlvDecoder* pDecoder, void* pObj) { case PHY_DYN_QUERY_CTRL_CODE_QUERY_TYPE: code = tlvDecodeEnum(pTlv, &pNode->qType, sizeof(pNode->qType)); break; - case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_JOIN: - code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchJoin); + case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_BATCH_FETCH: + code = tlvDecodeBool(pTlv, &pNode->stbJoin.batchFetch); break; case PHY_DYN_QUERY_CTRL_CODE_STB_JOIN_VG_SLOT0: code = tlvDecodeEnum(pTlv, &pNode->stbJoin.vgSlot[0], sizeof(pNode->stbJoin.vgSlot[0])); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bfaecaf4b0..8b533b7f8c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3352,7 +3352,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p } pDynCtrl->qType = DYN_QTYPE_STB_HASH; - pDynCtrl->stbJoin.batchJoin = true; + pDynCtrl->stbJoin.batchFetch = true; if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList(); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ae182a309d..2948cbce7c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1026,7 +1026,7 @@ static int32_t updateDynQueryCtrlStbJoinInfo(SPhysiPlanContext* pCxt, SNodeList* pDynCtrl->stbJoin.uidSlot[i] = ((SColumnNode*)pNode)->slotId; ++i; } - pDynCtrl->stbJoin.batchJoin = pLogicNode->stbJoin.batchJoin; + pDynCtrl->stbJoin.batchFetch = pLogicNode->stbJoin.batchFetch; } return code;