From 9ed1417bba2c98804b81a2ae2d7f3de1df683b8d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 28 Jul 2023 18:00:33 +0800 Subject: [PATCH] enh: remove group cache --- source/libs/executor/inc/dynqueryctrl.h | 16 +- source/libs/executor/inc/executorInt.h | 11 + source/libs/executor/inc/groupcache.h | 15 +- source/libs/executor/inc/operator.h | 14 +- .../libs/executor/src/dynqueryctrloperator.c | 287 +++++++++++++----- source/libs/executor/src/exchangeoperator.c | 71 +++-- source/libs/executor/src/groupcacheoperator.c | 215 +++++++++++-- source/libs/executor/src/mergejoinoperator.c | 8 +- source/libs/executor/src/operator.c | 80 +++-- source/libs/executor/src/scanoperator.c | 6 +- source/libs/planner/src/planOptimizer.c | 4 +- source/libs/qworker/src/qworker.c | 20 +- 12 files changed, 563 insertions(+), 184 deletions(-) diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index a95ec94f11..a6d0a551b8 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -24,6 +24,8 @@ typedef struct SDynQueryCtrlExecInfo { int64_t prevBlkRows; int64_t postBlkNum; int64_t postBlkRows; + int64_t leftCacheNum; + int64_t rightCacheNum; } SDynQueryCtrlExecInfo; typedef struct SStbJoinTableList { @@ -40,7 +42,8 @@ typedef struct SStbJoinPrevJoinCtx { bool joinBuild; SSHashObj* leftHash; SSHashObj* rightHash; - SSHashObj* tableTimes; + SSHashObj* leftCache; + SSHashObj* rightCache; SSHashObj* onceTable; int64_t tableNum; SStbJoinTableList* pListHead; @@ -48,6 +51,11 @@ typedef struct SStbJoinPrevJoinCtx { typedef struct SStbJoinPostJoinCtx { bool isStarted; + bool leftNeedCache; + bool rightNeedCache; + int32_t leftVgId; + int32_t rightVgId; + int64_t leftCurrUid; int64_t rightCurrUid; int64_t rightNextUid; } SStbJoinPostJoinCtx; @@ -58,13 +66,13 @@ typedef struct SStbJoinDynCtrlCtx { } SStbJoinDynCtrlCtx; typedef struct SStbJoinDynCtrlInfo { - SStbJoinDynCtrlBasic basic; - SStbJoinDynCtrlCtx ctx; + SDynQueryCtrlExecInfo execInfo; + SStbJoinDynCtrlBasic basic; + SStbJoinDynCtrlCtx ctx; } SStbJoinDynCtrlInfo; typedef struct SDynQueryCtrlOperatorInfo { EDynQueryType qType; - SDynQueryCtrlExecInfo execInfo; union { SStbJoinDynCtrlInfo stbJoin; }; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index b6b3930711..6dcec955b9 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -113,6 +113,12 @@ typedef struct SGcOperatorParam { bool needCache; } SGcOperatorParam; +typedef struct SGcNotifyOperatorParam { + int32_t downstreamIdx; + int32_t vgId; + int64_t tbUid; +} SGcNotifyOperatorParam; + typedef struct SExprSupp { SExprInfo* pExprInfo; int32_t numOfExprs; // the number of scalar expression in group operator @@ -168,6 +174,11 @@ typedef struct SExchangeOperatorParam { SExchangeOperatorBasicParam basic; } SExchangeOperatorParam; +typedef struct SExchangeSrcIndex { + int32_t srcIdx; + int32_t inUseIdx; +} SExchangeSrcIndex; + typedef struct SExchangeInfo { SArray* pSources; SSHashObj* pHashSources; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index edc2382ef0..250bc70fff 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -24,7 +24,7 @@ extern "C" { #pragma pack(push, 1) typedef struct SGcBlkBufBasic { - uint32_t fileId; + int32_t fileId; int64_t blkId; int64_t offset; int64_t bufSize; @@ -36,9 +36,15 @@ typedef struct SGroupCacheFileFd { TdFilePtr fd; } SGroupCacheFileFd; +typedef struct SGroupCacheFileInfo { + uint32_t groupNum; + bool deleted; + SGroupCacheFileFd fd; +} SGroupCacheFileInfo; + typedef struct SGcFileCacheCtx { int64_t fileSize; - uint32_t fileId; + int32_t fileId; SHashObj* pCacheFile; int32_t baseNameLen; char baseFilename[256]; @@ -61,6 +67,7 @@ typedef struct SGcDownstreamCtx { } SGcDownstreamCtx; typedef struct SGcVgroupCtx { + int32_t id; SArray* pTbList; uint64_t lastBlkUid; SGcFileCacheCtx fileCtx; @@ -81,7 +88,7 @@ typedef struct SGroupCacheData { int32_t downstreamIdx; int32_t vgId; SGcBlkList blkList; - uint32_t fileId; + int32_t fileId; int64_t startOffset; } SGroupCacheData; @@ -124,7 +131,7 @@ typedef struct SGcBlkBufInfo { void* next; void* pBuf; SGcDownstreamCtx* pCtx; - SGroupCacheData* pGroup; + int64_t groupId; } SGcBlkBufInfo; typedef struct SGcExecInfo { diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 825f2cbd90..8b13d4d222 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -20,6 +20,11 @@ extern "C" { #endif +typedef enum SOperatorParamType{ + OP_GET_PARAM = 1, + OP_NOTIFY_PARAM +} SOperatorParamType; + typedef struct SOperatorCostInfo { double openCost; double totalCost; @@ -36,7 +41,7 @@ typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_get_ext_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); -typedef SSDataBlock* (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); +typedef int32_t (*__optr_notify_fn_t)(struct SOperatorInfo* pOptr, SOperatorParam* param); typedef void (*__optr_state_fn_t)(struct SOperatorInfo* pOptr); typedef struct SOperatorFpSet { @@ -74,8 +79,10 @@ typedef struct SOperatorInfo { SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; - SOperatorParam* pOperatorParam; - SOperatorParam** pDownstreamParams; + SOperatorParam* pOperatorGetParam; + SOperatorParam* pOperatorNotifyParam; + SOperatorParam** pDownstreamGetParams; + SOperatorParam** pDownstreamNotifyParams; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfRealDownstream; @@ -171,6 +178,7 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32 void* pInfo, SExecTaskInfo* pTaskInfo); int32_t optrDefaultBufFn(SOperatorInfo* pOperator); SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); +int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx); SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int32_t idx); int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx); diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index eadf41a35f..0072bd5ae8 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -36,8 +36,9 @@ void freeVgTableList(void* ptr) { static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; - qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, - pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); + qError("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64 ", leftCacheNum:%" PRId64 ", rightCacheNum:%" PRId64, + pDyn->stbJoin.execInfo.prevBlkNum, pDyn->stbJoin.execInfo.prevBlkRows, pDyn->stbJoin.execInfo.postBlkNum, + pDyn->stbJoin.execInfo.postBlkRows, pDyn->stbJoin.execInfo.leftCacheNum, pDyn->stbJoin.execInfo.rightCacheNum); if (pDyn->stbJoin.basic.batchFetch) { if (pDyn->stbJoin.ctx.prev.leftHash) { @@ -48,13 +49,16 @@ static void destroyDynQueryCtrlOperator(void* param) { tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightHash, freeVgTableList); tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightHash); } - } - - if (pDyn->stbJoin.ctx.prev.tableTimes) { - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.tableTimes); - } - if (pDyn->stbJoin.ctx.prev.onceTable) { - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable); + } else { + if (pDyn->stbJoin.ctx.prev.leftCache) { + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftCache); + } + if (pDyn->stbJoin.ctx.prev.rightCache) { + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightCache); + } + if (pDyn->stbJoin.ctx.prev.onceTable) { + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.onceTable); + } } taosMemoryFreeClear(param); @@ -95,6 +99,31 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_SUCCESS; } + +static FORCE_INLINE int32_t buildGroupCacheNotifyOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = NULL; + + SGcNotifyOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcNotifyOperatorParam)); + if (NULL == pGc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGc->downstreamIdx = downstreamIdx; + pGc->vgId = vgId; + pGc->tbUid = tbUid; + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + (*ppRes)->downstreamIdx = downstreamIdx; + (*ppRes)->value = pGc; + + return TSDB_CODE_SUCCESS; +} + + static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t* pVgId, int64_t* pUid) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { @@ -199,6 +228,29 @@ static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, return TSDB_CODE_SUCCESS; } + +static FORCE_INLINE int32_t buildMergeJoinNotifyOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pChildren = taosArrayInit(2, POINTER_BYTES); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (pChild0 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild0)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (pChild1 && NULL == taosArrayPush((*ppRes)->pChildren, &pChild1)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; + (*ppRes)->value = NULL; + + return TSDB_CODE_SUCCESS; +} + static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, bool rightTable, bool batchFetch) { if (batchFetch) { return true; @@ -208,11 +260,54 @@ static FORCE_INLINE bool tableNeedCache(int64_t uid, SStbJoinPrevJoinCtx* pPrev, return pPost->rightCurrUid == pPost->rightNextUid; } - uint32_t* num = tSimpleHashGet(pPrev->tableTimes, &uid, sizeof(uid)); + uint32_t* num = tSimpleHashGet(pPrev->leftCache, &uid, sizeof(uid)); return (NULL == num) ? false : true; } +static void updatePostJoinCurrTableInfo(SStbJoinDynCtrlInfo* pStbJoin) { + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + SStbJoinTableList* pNode = pPrev->pListHead; + int32_t* leftVgId = pNode->pLeftVg + pNode->readIdx; + int32_t* rightVgId = pNode->pRightVg + pNode->readIdx; + int64_t* leftUid = pNode->pLeftUid + pNode->readIdx; + int64_t* rightUid = pNode->pRightUid + pNode->readIdx; + int64_t readIdx = pNode->readIdx + 1; + int64_t rightPrevUid = pPost->rightCurrUid; + + pPost->leftCurrUid = *leftUid; + pPost->rightCurrUid = *rightUid; + + pPost->leftVgId = *leftVgId; + pPost->rightVgId = *rightVgId; + + while (true) { + if (readIdx < pNode->uidNum) { + pPost->rightNextUid = *(rightUid + readIdx); + break; + } + + pNode = pNode->pNext; + if (NULL == pNode) { + pPost->rightNextUid = 0; + break; + } + + rightUid = pNode->pRightUid; + readIdx = 0; + } + + pPost->leftNeedCache = tableNeedCache(*leftUid, pPrev, pPost, false, pStbJoin->basic.batchFetch); + pPost->rightNeedCache = tableNeedCache(*rightUid, pPrev, pPost, true, pStbJoin->basic.batchFetch); + + if (pPost->rightNeedCache && rightPrevUid != pPost->rightCurrUid) { + tSimpleHashPut(pPrev->rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid), NULL, 0); + pStbJoin->execInfo.rightCacheNum++; + } +} + + static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SStbJoinPostJoinCtx* pPost, SOperatorParam** ppParam) { int64_t rowIdx = pPrev->pListHead->readIdx; SOperatorParam* pExcParam0 = NULL; @@ -228,6 +323,8 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS qError("start %" PRId64 ":%" PRId64 "th stbJoin, left:%d,%" PRIu64 " - right:%d,%" PRIu64, rowIdx, pPrev->tableNum, *leftVg, *leftUid, *rightVg, *rightUid); + updatePostJoinCurrTableInfo(&pInfo->stbJoin); + if (pInfo->stbJoin.basic.batchFetch) { if (pPrev->leftHash) { code = buildBatchExchangeOperatorParam(&pExcParam0, 0, pPrev->leftHash); @@ -249,10 +346,10 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, tableNeedCache(*leftUid, pPrev, pPost, false, pInfo->stbJoin.basic.batchFetch), pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pExcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, tableNeedCache(*rightUid, pPrev, pPost, true, pInfo->stbJoin.basic.batchFetch), pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pPost->rightNeedCache, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pExcParam0 ? true : false, pGcParam0, pGcParam1); @@ -261,7 +358,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS } -static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static void seqJoinLaunchNewRetrieveImpl(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -277,17 +374,67 @@ static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) *ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam); if (*ppRes) { pPost->isStarted = true; - pInfo->execInfo.postBlkNum++; - pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; + pStbJoin->execInfo.postBlkNum++; + pStbJoin->execInfo.postBlkRows += (*ppRes)->info.rows; qError("join res block retrieved"); } else { qError("Empty join res block retrieved"); } } -static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + +static int32_t notifySeqJoinTableCacheEnd(SOperatorInfo* pOperator, SStbJoinPostJoinCtx* pPost, bool leftTable) { + SOperatorParam* pGcParam = NULL; + SOperatorParam* pMergeJoinParam = NULL; + int32_t downstreamId = leftTable ? 0 : 1; + int32_t vgId = leftTable ? pPost->leftVgId : pPost->rightVgId; + int64_t uid = leftTable ? pPost->leftCurrUid : pPost->rightCurrUid; + + qError("notify table %" PRIu64 " in vgId %d downstreamId %d cache end", uid, vgId, downstreamId); + + int32_t code = buildGroupCacheNotifyOperatorParam(&pGcParam, downstreamId, vgId, uid); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + code = buildMergeJoinNotifyOperatorParam(&pMergeJoinParam, pGcParam, NULL); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + return optrDefaultNotifyFn(pOperator->pDownstream[1], pMergeJoinParam); +} + +static void handleSeqJoinCurrRetrieveEnd(SOperatorInfo* pOperator, SStbJoinDynCtrlInfo* pStbJoin) { + SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + + pPost->isStarted = false; + + if (pStbJoin->basic.batchFetch) { + return; + } + + if (pPost->leftNeedCache) { + uint32_t* num = tSimpleHashGet(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); + if (--(*num) <= 0) { + tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, &pPost->leftCurrUid, sizeof(pPost->leftCurrUid)); + notifySeqJoinTableCacheEnd(pOperator, pPost, true); + } + } + + if (!pPost->rightNeedCache) { + void* v = tSimpleHashGet(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); + if (NULL != v) { + tSimpleHashRemove(pStbJoin->ctx.prev.rightCache, &pPost->rightCurrUid, sizeof(pPost->rightCurrUid)); + notifySeqJoinTableCacheEnd(pOperator, pPost, false); + } + } +} + + +static FORCE_INLINE void seqJoinContinueCurrRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinPostJoinCtx* pPost = &pInfo->stbJoin.ctx.post; + SStbJoinPrevJoinCtx* pPrev = &pInfo->stbJoin.ctx.prev; if (!pPost->isStarted) { return; @@ -297,10 +444,11 @@ static FORCE_INLINE void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDat *ppRes = getNextBlockFromDownstream(pOperator, 1); if (NULL == *ppRes) { - pPost->isStarted = false; + handleSeqJoinCurrRetrieveEnd(pOperator, &pInfo->stbJoin); + pPrev->pListHead->readIdx++; } else { - pInfo->execInfo.postBlkNum++; - pInfo->execInfo.postBlkRows += (*ppRes)->info.rows; + pInfo->stbJoin.execInfo.postBlkNum++; + pInfo->stbJoin.execInfo.postBlkRows += (*ppRes)->info.rows; return; } } @@ -428,14 +576,14 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc break; } } - } - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); - - code = addToJoinTableHash(pStbJoin->ctx.prev.tableTimes, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid)); - if (TSDB_CODE_SUCCESS != code) { - break; + } else { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* leftUid = (int64_t*)(pUid0->pData + pUid0->info.bytes * i); + + code = addToJoinTableHash(pStbJoin->ctx.prev.leftCache, pStbJoin->ctx.prev.onceTable, leftUid, sizeof(*leftUid)); + if (TSDB_CODE_SUCCESS != code) { + break; + } } } @@ -452,59 +600,33 @@ static void doBuildStbJoinTableHash(SOperatorInfo* pOperator, SSDataBlock* pBloc } } -static void updatePostJoinRightTableUid(SStbJoinDynCtrlInfo* pStbJoin) { - SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; - SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; - SStbJoinTableList* pNode = pPrev->pListHead; - int64_t* rightUid = pNode->pRightUid; - int64_t readIdx = pNode->readIdx + 1; - - if (pPost->rightNextUid) { - pPost->rightCurrUid = pPost->rightNextUid; - } else { - pPost->rightCurrUid = *rightUid; - } - - while (true) { - if (readIdx < pNode->uidNum) { - pPost->rightNextUid = *(rightUid + readIdx); - return; - } - - pNode = pNode->pNext; - if (NULL == pNode) { - pPost->rightNextUid = 0; - return; - } - - rightUid = pNode->pRightUid; - readIdx = 0; - } -} static void postProcessStbJoinTableHash(SOperatorInfo* pOperator) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; - updatePostJoinRightTableUid(pStbJoin); + if (pStbJoin->basic.batchFetch) { + return; + } - if (tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) { - tSimpleHashClear(pStbJoin->ctx.prev.tableTimes); + if (tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache) == tSimpleHashGetSize(pStbJoin->ctx.prev.onceTable)) { + tSimpleHashClear(pStbJoin->ctx.prev.leftCache); return; } uint64_t* pUid = NULL; int32_t iter = 0; while (pUid = tSimpleHashIterate(pStbJoin->ctx.prev.onceTable, pUid, &iter)) { - tSimpleHashRemove(pStbJoin->ctx.prev.tableTimes, pUid, sizeof(*pUid)); + tSimpleHashRemove(pStbJoin->ctx.prev.leftCache, pUid, sizeof(*pUid)); } - qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.tableTimes)); + pStbJoin->execInfo.leftCacheNum = tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache); + qError("more than 1 ref build table num %" PRId64, (int64_t)tSimpleHashGetSize(pStbJoin->ctx.prev.leftCache)); // debug only iter = 0; uint32_t* num = NULL; - while (num = tSimpleHashIterate(pStbJoin->ctx.prev.tableTimes, num, &iter)) { + while (num = tSimpleHashIterate(pStbJoin->ctx.prev.leftCache, num, &iter)) { ASSERT(*num > 1); } } @@ -519,8 +641,8 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) { break; } - pInfo->execInfo.prevBlkNum++; - pInfo->execInfo.prevBlkRows += pBlock->info.rows; + pStbJoin->execInfo.prevBlkNum++; + pStbJoin->execInfo.prevBlkRows += pBlock->info.rows; doBuildStbJoinTableHash(pOperator, pBlock); } @@ -530,7 +652,7 @@ static void buildStbJoinTableList(SOperatorInfo* pOperator) { pStbJoin->ctx.prev.joinBuild = true; } -static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; @@ -544,12 +666,13 @@ static void seqJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) continue; } - seqJoinLaunchPostJoin(pOperator, ppRes); - pPrev->pListHead->readIdx++; - + seqJoinLaunchNewRetrieveImpl(pOperator, ppRes); if (*ppRes) { return; } + + handleSeqJoinCurrRetrieveEnd(pOperator, pStbJoin); + pPrev->pListHead->readIdx++; } *ppRes = NULL; @@ -567,18 +690,18 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { if (!pStbJoin->ctx.prev.joinBuild) { buildStbJoinTableList(pOperator); - if (pInfo->execInfo.prevBlkRows <= 0) { + if (pStbJoin->execInfo.prevBlkRows <= 0) { pOperator->status = OP_EXEC_DONE; return NULL; } } - seqJoinContinueRetrieve(pOperator, &pRes); + seqJoinContinueCurrRetrieve(pOperator, &pRes); if (pRes) { return pRes; } - seqJoinLaunchRetrieve(pOperator, &pRes); + seqJoinLaunchNewRetrieve(pOperator, &pRes); return pRes; } @@ -592,15 +715,19 @@ int32_t initSeqStbJoinTableHash(SStbJoinPrevJoinCtx* pPrev, bool batchFetch) { if (NULL == pPrev->rightHash) { return TSDB_CODE_OUT_OF_MEMORY; } - } - - pPrev->tableTimes = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (NULL == pPrev->tableTimes) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (NULL == pPrev->onceTable) { - return TSDB_CODE_OUT_OF_MEMORY; + } else { + pPrev->leftCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pPrev->leftCache) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pPrev->rightCache = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pPrev->rightCache) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pPrev->onceTable = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (NULL == pPrev->onceTable) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index f2e2d4840b..d8756accab 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -70,6 +70,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn return; } + SSourceDataInfo* pDataInfo = NULL; + while (1) { qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); @@ -79,7 +81,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } for (int32_t i = 0; i < totalSources; ++i) { - SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); + pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { continue; } @@ -99,12 +101,21 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn // todo SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); - taosMemoryFreeClear(pDataInfo->pRsp); + if (NULL != pDataInfo->pSrcUidList) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; + code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pDataInfo->pRsp); + goto _error; + } + } else { + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", try next %d/%" PRIzu, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, + pExchangeInfo->loadInfo.totalRows, i + 1, totalSources); + taosMemoryFreeClear(pDataInfo->pRsp); + } break; } @@ -134,7 +145,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn taosMemoryFreeClear(pDataInfo->pRsp); - if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED || NULL != pDataInfo->pSrcUidList) { pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { @@ -289,7 +300,8 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* for (int32_t i = 0; i < numOfSources; ++i) { SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); taosArrayPush(pInfo->pSources, pNode); - tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &i, sizeof(i)); + SExchangeSrcIndex idx = {.srcIdx = i, .inUseIdx = -1}; + tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &idx, sizeof(idx)); } initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); @@ -749,21 +761,32 @@ _error: int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) { SExchangeInfo* pExchangeInfo = pOperator->info; - int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); + SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); if (NULL == pIdx) { qError("No exchange source for vgId: %d", pBasicParam->vgId); return TSDB_CODE_INVALID_PARA; } - - SSourceDataInfo dataInfo = {0}; - dataInfo.status = EX_SOURCE_DATA_NOT_READY; - dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); - dataInfo.index = *pIdx; - dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); - dataInfo.srcOpType = pBasicParam->srcOpType; - dataInfo.tableSeq = pBasicParam->tableSeq; - - taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); + + if (pIdx->inUseIdx < 0) { + SSourceDataInfo dataInfo = {0}; + dataInfo.status = EX_SOURCE_DATA_NOT_READY; + dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); + dataInfo.index = pIdx->srcIdx; + dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); + dataInfo.srcOpType = pBasicParam->srcOpType; + dataInfo.tableSeq = pBasicParam->tableSeq; + + taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); + pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1; + } else { + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pIdx->inUseIdx); + if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; + } + pDataInfo->pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); + pDataInfo->srcOpType = pBasicParam->srcOpType; + pDataInfo->tableSeq = pBasicParam->tableSeq; + } return TSDB_CODE_SUCCESS; } @@ -773,9 +796,9 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; SExchangeOperatorBasicParam* pBasicParam = NULL; - SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value; + SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value; if (pParam->multiParams) { - SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorParam->value; + SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value; int32_t iter = 0; while (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter)) { code = addSingleExchangeSource(pOperator, pBasicParam); @@ -788,7 +811,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { code = addSingleExchangeSource(pOperator, pBasicParam); } - pOperator->pOperatorParam = NULL; + pOperator->pOperatorGetParam = NULL; return TSDB_CODE_SUCCESS; } @@ -797,7 +820,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; int32_t code = TSDB_CODE_SUCCESS; - if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorParam)) { + if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 3101dcc32e..ae58ec80ef 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -27,6 +27,17 @@ #include "ttypes.h" #include "groupcache.h" + +static void removeGroupCacheFile(SGroupCacheFileInfo* pFileInfo) { + if (pFileInfo->fd.fd) { + taosCloseFile(&pFileInfo->fd.fd); + pFileInfo->fd.fd = NULL; + taosThreadMutexDestroy(&pFileInfo->fd.mutex); + } + pFileInfo->deleted = true; +} + + static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) { pCols->colNum = LIST_LENGTH(pList); pCols->withNull = grpColsMayBeNull; @@ -96,7 +107,7 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* return TSDB_CODE_SUCCESS; } -static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { +static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, int32_t fileId, SGroupCacheFileFd** ppFd, bool* pDeleted) { int32_t code = TSDB_CODE_SUCCESS; if (NULL == pFileCtx->pCacheFile) { pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); @@ -105,22 +116,29 @@ static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId, } } - SGroupCacheFileFd* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); + SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); + sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%d", fileId); - SGroupCacheFileFd newVgFd = {0}; - taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); + SGroupCacheFileInfo newFile = {0}; + taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); + } - code = initOpenCacheFile(pTmp, pFileCtx->baseFilename); + if (pTmp->deleted) { + *pDeleted = true; + return TSDB_CODE_SUCCESS; + } + + if (NULL == pTmp->fd.fd) { + code = initOpenCacheFile(&pTmp->fd, pFileCtx->baseFilename); if (code) { return code; } } - taosThreadMutexLock(&pTmp->mutex); - *ppFd = pTmp; + taosThreadMutexLock(&pTmp->fd.mutex); + *ppFd = &pTmp->fd; return TSDB_CODE_SUCCESS; } @@ -131,16 +149,52 @@ static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) { static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { int32_t code = TSDB_CODE_SUCCESS; - SGroupCacheFileFd *pFd; + SGroupCacheFileFd *pFd = NULL; SGcFileCacheCtx* pFileCtx = NULL; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + int64_t lastGroupId = 0; + SGroupCacheData* pGroup = NULL; while (NULL != pHead) { - pFileCtx = pGCache->batchFetch ? &pHead->pCtx->fileCtx : &pHead->pGroup->pVgCtx->fileCtx; + if (pGCache->batchFetch) { + pFileCtx = &pHead->pCtx->fileCtx; + } else { + if (pHead->groupId != lastGroupId) { + if (NULL != pGroup) { + taosHashRelease(pGrpHash, pGroup); + } + pGroup = taosHashAcquire(pGrpHash, &pHead->groupId, sizeof(pHead->groupId)); + lastGroupId = pHead->groupId; + } - code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd); + if (NULL == pGroup) { + qTrace("group %" PRIu64 " in downstream %d may already be deleted, skip write", pHead->groupId, pHead->pCtx->id); + + int64_t blkId = pHead->basic.blkId; + pHead = pHead->next; + taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + continue; + } + + pFileCtx = &pGroup->pVgCtx->fileCtx; + } + + bool deleted = false; + code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd, &deleted); if (code) { goto _return; } + + if (deleted) { + qTrace("FileId:%d-%d-%d already be deleted, skip write", + pCtx->id, pGroup->vgId, pHead->basic.fileId); + + int64_t blkId = pHead->basic.blkId; + pHead = pHead->next; + + taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + continue; + } int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); if (ret == -1) { @@ -158,8 +212,8 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC releaseFdToFileCtx(pFd); - qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, - pHead->basic.fileId, pHead->basic.blkId, pHead->basic.bufSize, pHead->basic.offset); + qTrace("FileId:%d-%d-%d blk %" PRIu64 " in group %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, + pCtx->id, pGroup->vgId, pHead->basic.fileId, pHead->basic.blkId, pHead->groupId, pHead->basic.bufSize, pHead->basic.offset); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; @@ -167,15 +221,18 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); } - _return: + if (NULL != pGroup) { + taosHashRelease(pGrpHash, pGroup); + } + atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); return code; } -static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { +static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -195,7 +252,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr } pCache->pDirtyTail = pBufInfo; - if (pGCache->maxCacheSize > 0 && pCache->blkCacheSize > pGCache->maxCacheSize) { + if (pGCache->maxCacheSize >= 0 && pCache->blkCacheSize > pGCache->maxCacheSize) { if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) { pWriteHead = pCache->pDirtyHead; SGcBlkBufInfo* pTmp = pCache->pDirtyHead; @@ -220,11 +277,20 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr } -static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx) { +static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx, int32_t downstreamIdx, int32_t vgId, bool removeCheck) { if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { return; } - + + if (removeCheck) { + SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pFileCtx->fileId, sizeof(pFileCtx->fileId)); + if (0 == pFileInfo->groupNum) { + removeGroupCacheFile(pFileInfo); + qTrace("FileId:%d-%d-%d removed", downstreamIdx, vgId, pFileCtx->fileId); + //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); + } + } + pFileCtx->fileId++; pFileCtx->fileSize = 0; } @@ -248,13 +314,13 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB pBufInfo->basic.bufSize = bufSize; pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize); pBufInfo->pCtx = pCtx; - pBufInfo->pGroup = pGroup; + pBufInfo->groupId = pBlock->info.id.groupId; if (pGCache->batchFetch) { - groupCacheSwitchNewFile(pFileCtx); + groupCacheSwitchNewFile(pFileCtx, pCtx->id, pGroup->vgId, false); } - int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo); + int32_t code = addBlkToDirtyBufList(pGCache, pCtx, &pGCache->blkCache, pBufInfo); return code; } @@ -343,10 +409,16 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3 static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) { SGroupCacheFileFd *pFileFd = NULL; SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx; - int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd); + bool deleted = false; + int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd, &deleted); if (code) { return code; } + if (deleted) { + qError("FileId:%d-%d-%d already be deleted, skip read", pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + int32_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET); if (ret == -1) { code = TAOS_SYSTEM_ERROR(errno); @@ -366,8 +438,8 @@ static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheDa goto _return; } - qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, - pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset); + qTrace("FileId:%d-%d-%d blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, + pGrp->downstreamIdx, pGrp->vgId, pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset); _return: @@ -406,7 +478,7 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { pVgCtx->pTbList = pTbList; - + pVgCtx->id = vgId; snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId); pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; @@ -491,7 +563,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p } if (pBlock) { - qError("%s group cache retrieved block with groupId: %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); + qError("%s blk retrieved from group %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId); pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { @@ -530,6 +602,31 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { taosThreadMutexUnlock(&pGroup->mutex); } +static int32_t addFileRefTableNum(SGcFileCacheCtx* pFileCtx, int32_t fileId, int32_t downstreamId, int32_t vgId) { + if (NULL == pFileCtx->pCacheFile) { + pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + if (NULL == pFileCtx->pCacheFile) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SGroupCacheFileInfo* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); + if (NULL == pTmp) { + sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); + + SGroupCacheFileInfo newFile = {0}; + newFile.groupNum = 1; + taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newFile, sizeof(newFile)); + pTmp = &newFile; + } else { + pTmp->groupNum++; + } + + qTrace("FileId:%d-%d-%d add groupNum to %u", downstreamId, vgId, fileId, pTmp->groupNum); + + return TSDB_CODE_SUCCESS; +} + static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) { return TSDB_CODE_SUCCESS; @@ -547,11 +644,17 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat handleGroupFetchDone(pNew->pGroup); } - groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx); + groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx, pGroup->downstreamIdx, pGroup->vgId, true); pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId; pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize; + qTrace("FileId:%d-%d-%d add groupNum for group %" PRIu64, pGroup->downstreamIdx, pGroup->vgId, pGroup->pVgCtx->fileCtx.fileId, uid); + + if (pGroup->needCache) { + return addFileRefTableNum(&pGroup->pVgCtx->fileCtx, pGroup->pVgCtx->fileCtx.fileId, pGroup->downstreamIdx, pGroup->vgId); + } + return TSDB_CODE_SUCCESS; } @@ -575,11 +678,13 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* SGroupCacheData grpData = {0}; initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch, pGcParam->needCache); + + qError("new group %" PRIu64 " initialized, downstreamIdx:%d, vgId:%d, needCache:%d", uid, pParam->downstreamIdx, vgId, pGcParam->needCache); while (true) { if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { if (terrno == TSDB_CODE_DUP_KEY) { - *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); + *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid)); if (*ppGrp) { break; } @@ -588,7 +693,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* } } - *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); + *ppGrp = taosHashGet(pGrpHash, &uid, sizeof(uid)); if (*ppGrp && pParam->pChildren) { SGcNewGroupInfo newGroup; newGroup.pGroup = *ppGrp; @@ -641,7 +746,7 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD return code; } } else { - qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId); + qError("group %" PRIu64 " not found in group hash", pBlock->info.id.groupId); return TSDB_CODE_INVALID_PARA; } } @@ -919,7 +1024,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; - SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); + SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (NULL == pGroup) { code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid); if (TSDB_CODE_SUCCESS != code) { @@ -980,6 +1085,32 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +static void freeRemoveGroupCacheData(void* p) { + SGroupCacheData* pGroup = p; + if (pGroup->vgId >= 0) { + SGcFileCacheCtx* pFileCtx = &pGroup->pVgCtx->fileCtx; + if (pGroup->fileId >= 0) { + SGroupCacheFileInfo* pFileInfo = taosHashGet(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); + uint32_t remainNum = atomic_sub_fetch_32(&pFileInfo->groupNum, 1); + + qTrace("FileId:%d-%d-%d sub group num to %u", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId, remainNum); + + if (0 == remainNum && pGroup->fileId != pFileCtx->fileId) { + removeGroupCacheFile(pFileInfo); + qTrace("FileId:%d-%d-%d removed", pGroup->downstreamIdx, pGroup->vgId, pFileCtx->fileId); + //taosHashRemove(pFileCtx->pCacheFile, &pGroup->fileId, sizeof(pGroup->fileId)); + } + } + } + + taosArrayDestroy(pGroup->waitQueue); + taosArrayDestroy(pGroup->blkList.pList); + + qTrace("group removed"); +} + + + static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pInfo = pOperator->info; pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams)); @@ -1012,6 +1143,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (pCtx->pGrpHash == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + taosHashSetFreeFp(pCtx->pGrpHash, freeRemoveGroupCacheData); } pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); @@ -1038,7 +1170,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { +static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { SSDataBlock* pBlock = NULL; int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam); @@ -1050,6 +1182,20 @@ SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* return pBlock; } +static int32_t groupCacheTableCacheEnd(SOperatorInfo* pOperator, SOperatorParam* pParam) { + SGcNotifyOperatorParam* pGcParam = pParam->value; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGcParam->downstreamIdx]; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + + qTrace("try to remove group %" PRIu64, pGcParam->tbUid); + if (taosHashRemove(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid))) { + qError("failed to remove group %" PRIu64 " in vgId %d downstreamIdx %d", pGcParam->tbUid, pGcParam->vgId, pGcParam->downstreamIdx); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + return TSDB_CODE_SUCCESS; +} SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { @@ -1095,6 +1241,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + taosHashSetFreeFp(pInfo->pGrpHash, freeRemoveGroupCacheData); } code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -1112,7 +1259,9 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, groupCacheTableCacheEnd); + + qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch); return pOperator; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 872ea53329..aa580895af 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -648,8 +648,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t static void setMergeJoinDone(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - pOperator->pDownstreamParams[0] = NULL; - pOperator->pDownstreamParams[1] = NULL; + pOperator->pDownstreamGetParams[0] = NULL; + pOperator->pDownstreamGetParams[1] = NULL; } static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { @@ -667,7 +667,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } if (pJoinInfo->pLeft == NULL) { - if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) { + if (pOperator->pOperatorGetParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorGetParam->value)->initParam) { leftEmpty = true; } else { setMergeJoinDone(pOperator); @@ -773,7 +773,7 @@ void resetMergeJoinOperator(struct SOperatorInfo* pOperator) { SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; if (pOperator->status == OP_EXEC_DONE) { - if (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1]) { + if (NULL == pOperator->pDownstreamGetParams[0] || NULL == pOperator->pDownstreamGetParams[1]) { qError("total merge join res rows:%" PRId64, pJoinInfo->resRows); return NULL; } else { diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 54f581a660..27d390602d 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -654,59 +654,74 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) { } -int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { - if (NULL == pParam) { - pOperator->pOperatorParam = NULL; - taosMemoryFreeClear(pOperator->pDownstreamParams); +int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pInput, SOperatorParamType type) { + SOperatorParam** ppParam = NULL; + SOperatorParam*** pppDownstramParam = NULL; + switch (type) { + case OP_GET_PARAM: + ppParam = &pOperator->pOperatorGetParam; + pppDownstramParam = &pOperator->pDownstreamGetParams; + break; + case OP_NOTIFY_PARAM: + ppParam = &pOperator->pOperatorNotifyParam; + pppDownstramParam = &pOperator->pDownstreamNotifyParams; + break; + default: + return TSDB_CODE_INVALID_PARA; + } + + if (NULL == pInput) { + *ppParam = NULL; + taosMemoryFreeClear(*pppDownstramParam); return TSDB_CODE_SUCCESS; } - pOperator->pOperatorParam = (pParam->opType == pOperator->operatorType) ? pParam : NULL; + *ppParam = (pInput->opType == pOperator->operatorType) ? pInput : NULL; - if (NULL == pOperator->pDownstreamParams) { - pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES); - if (NULL == pOperator->pDownstreamParams) { + if (NULL == *pppDownstramParam) { + *pppDownstramParam = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES); + if (NULL == *pppDownstramParam) { return TSDB_CODE_OUT_OF_MEMORY; } } - if (NULL == pOperator->pOperatorParam) { + if (NULL == *ppParam) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - pOperator->pDownstreamParams[i] = pParam; + (*pppDownstramParam)[i] = pInput; } return TSDB_CODE_SUCCESS; } - memset(pOperator->pDownstreamParams, 0, pOperator->numOfDownstream * POINTER_BYTES); + memset(*pppDownstramParam, 0, pOperator->numOfDownstream * POINTER_BYTES); - int32_t childrenNum = taosArrayGetSize(pOperator->pOperatorParam->pChildren); + int32_t childrenNum = taosArrayGetSize((*ppParam)->pChildren); if (childrenNum <= 0) { return TSDB_CODE_SUCCESS; } for (int32_t i = 0; i < childrenNum; ++i) { - SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i); - if (pOperator->pDownstreamParams[pChild->downstreamIdx]) { - int32_t code = mergeOperatorParams(pOperator->pDownstreamParams[pChild->downstreamIdx], pChild); + SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet((*ppParam)->pChildren, i); + if ((*pppDownstramParam)[pChild->downstreamIdx]) { + int32_t code = mergeOperatorParams((*pppDownstramParam)[pChild->downstreamIdx], pChild); if (code) { return code; } } else { - pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; + (*pppDownstramParam)[pChild->downstreamIdx] = pChild; } } - taosArrayClear(pOperator->pOperatorParam->pChildren); + taosArrayClear((*ppParam)->pChildren); return TSDB_CODE_SUCCESS; } SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam) { - if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) { + if (pOperator->pDownstreamGetParams && pOperator->pDownstreamGetParams[idx]) { qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pOperator->pDownstream[idx]->name); - SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); + SSDataBlock* pBlock = pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamGetParams[idx]); if (clearParam) { - pOperator->pDownstreamParams[idx] = NULL; + pOperator->pDownstreamGetParams[idx] = NULL; } return pBlock; } @@ -725,7 +740,7 @@ SSDataBlock* getNextBlockFromDownstreamOnce(struct SOperatorInfo* pOperator, int SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { - int32_t code = setOperatorParams(pOperator, pParam); + int32_t code = setOperatorParams(pOperator, pParam, OP_GET_PARAM); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); @@ -733,6 +748,29 @@ SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorP return pOperator->fpSet.getNextFn(pOperator); } +int32_t optrDefaultNotifyFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { + int32_t code = setOperatorParams(pOperator, pParam, OP_NOTIFY_PARAM); + if (TSDB_CODE_SUCCESS == code && pOperator->fpSet.notifyFn && pOperator->pOperatorNotifyParam) { + code = pOperator->fpSet.notifyFn(pOperator, pOperator->pOperatorNotifyParam); + } + if (TSDB_CODE_SUCCESS == code) { + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + if (pOperator->pDownstreamNotifyParams[i]) { + code = optrDefaultNotifyFn(pOperator->pDownstream[i], pOperator->pDownstreamNotifyParams[i]); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + } + } + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } + + return code; +} + int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t idx) { if (pOperator->transparent) { return getOperatorResultBlockId(pOperator->pDownstream[idx], 0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 99e5f59e9e..4c00df40f8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -787,7 +787,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = 0; STableListInfo* pListInfo = pInfo->base.pTableListInfo; - STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorParam->value; + STableScanOperatorParam* pParam = (STableScanOperatorParam*)pOperator->pOperatorGetParam->value; int32_t num = taosArrayGetSize(pParam->pUidList); if (num <= 0) { qError("empty table scan uid list"); @@ -916,10 +916,10 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStorageAPI* pAPI = &pTaskInfo->storageAPI; - if (pOperator->pOperatorParam) { + if (pOperator->pOperatorGetParam) { pOperator->dynamicTask = true; int32_t code = createTableListInfoFromParam(pOperator); - pOperator->pOperatorParam = NULL; + pOperator->pOperatorGetParam = NULL; if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8b533b7f8c..e83979e5b3 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3246,7 +3246,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; pGrpCache->grpByUid = true; - pGrpCache->batchFetch = true; + pGrpCache->batchFetch = false; pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { @@ -3352,7 +3352,7 @@ static int32_t stbJoinOptCreateDynQueryCtrlNode(SLogicNode* pPrev, SLogicNode* p } pDynCtrl->qType = DYN_QTYPE_STB_HASH; - pDynCtrl->stbJoin.batchFetch = true; + pDynCtrl->stbJoin.batchFetch = false; if (TSDB_CODE_SUCCESS == code) { pDynCtrl->node.pChildren = nodesMakeList(); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 328195b025..000200988d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -488,6 +488,7 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i } int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) { +#if 0 if (!atomic_val_compare_exchange_8((int8_t*)&ctx->queryExecDone, true, false)) { QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d", ctx->queryExecDone); return TSDB_CODE_ACTION_IN_PROGRESS; @@ -496,6 +497,10 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg QW_TASK_ELOG("dynamic task prev exec not finished, queryEnd:%d", ctx->queryEnd); return TSDB_CODE_ACTION_IN_PROGRESS; } +#else + ctx->queryExecDone = false; + ctx->queryEnd = false; +#endif dsReset(ctx->sinkHandle); @@ -503,11 +508,14 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); - atomic_store_8((int8_t *)&ctx->queryInQueue, 1); - - QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++); - - QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); + if (QW_QUERY_RUNNING(ctx)) { + atomic_store_8((int8_t *)&ctx->queryContinue, 1); + QW_TASK_DLOG("the %dth dynamic task exec started, continue running", ctx->dynExecId++); + } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { + atomic_store_8((int8_t *)&ctx->queryInQueue, 1); + QW_TASK_DLOG("the %dth dynamic task exec started", ctx->dynExecId++); + QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo)); + } return TSDB_CODE_SUCCESS; } @@ -959,7 +967,7 @@ _return: } if (!rsped) { qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); - QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), + QW_TASK_DLOG("fetch rsp send, msgType:%s, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } else { qwFreeFetchRsp(rsp);