diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c235b693b3..5ef2464f8e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -162,6 +162,7 @@ typedef struct SGroupCacheLogicNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; + bool enableCache; SNodeList* pGroupCols; } SGroupCacheLogicNode; @@ -445,6 +446,7 @@ typedef struct SGroupCachePhysiNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; + bool enableCache; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f14547d1bc..1c1dab1df7 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -108,7 +108,6 @@ typedef struct SExchangeOpStopInfo { typedef struct SGcOperatorParam { int64_t sessionId; int32_t downstreamIdx; - bool needCache; int32_t vgId; int64_t tbUid; } SGcOperatorParam; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index a0274ff001..2b5176eba6 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -45,7 +45,6 @@ typedef struct SGroupCacheData { TdThreadMutex mutex; SArray* waitQueue; bool fetchDone; - bool needCache; SSDataBlock* pBlock; SGcVgroupCtx* pVgCtx; int32_t downstreamIdx; @@ -89,17 +88,19 @@ typedef struct SGcDownstreamCtx { SSDataBlock* pBaseBlock; SArray* pFreeBlock; int64_t lastBlkUid; + SHashObj* pSessions; + SHashObj* pWaitSessions; } SGcDownstreamCtx; typedef struct SGcSessionCtx { int32_t downstreamIdx; - bool needCache; SGcOperatorParam* pParam; SGroupCacheData* pGroupData; int64_t lastBlkId; int64_t nextOffset; bool semInit; tsem_t waitSem; + bool newFetch; } SGcSessionCtx; typedef struct SGcExecInfo { @@ -113,11 +114,6 @@ typedef struct SGcCacheFile { int64_t fileSize; } SGcCacheFile; -typedef struct SGcReadBlkInfo { - SSDataBlock* pBlock; - int64_t nextOffset; -} SGcReadBlkInfo; 
- typedef struct SGcBlkCacheInfo { SRWLatch dirtyLock; SSHashObj* pCacheFile; @@ -132,10 +128,10 @@ typedef struct SGroupCacheOperatorInfo { TdThreadMutex sessionMutex; int64_t maxCacheSize; int64_t currentBlkId; - SHashObj* pSessionHash; SGroupColsInfo groupColsInfo; bool globalGrp; bool grpByUid; + bool enableCache; SGcDownstreamCtx* pDownstreams; SGcBlkCacheInfo blkCache; SHashObj* pGrpHash; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 8bd80c438f..825f2cbd90 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -66,6 +66,7 @@ typedef struct SOperatorInfo { int16_t resultDataBlockId; bool blocking; // block operator or not bool transparent; + bool dynamicTask; uint8_t status; // denote if current operator is completed char* name; // name, for debug purpose void* info; // extension attribution diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index c8f7ece8fa..a6ab5f5b49 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -36,7 +36,7 @@ static void destroyDynQueryCtrlOperator(void* param) { taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) { +static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -56,7 +56,6 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); pGc->downstreamIdx = downstreamIdx; - pGc->needCache = needCache; pGc->vgId = vgId; pGc->tbUid = tbUid; @@ -146,10 +145,10 @@ static int32_t 
buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, *leftVg, *leftUid, pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pExcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, *rightVg, *rightUid, pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, *rightVg, *rightUid, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 137c3e6116..a739fab23d 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -80,7 +80,6 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); - taosHashCleanup(pGrpCacheOperator->pSessionHash); taosHashCleanup(pGrpCacheOperator->pGrpHash); taosMemoryFreeClear(param); @@ -164,12 +163,10 @@ static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** pp return TSDB_CODE_SUCCESS; } -static int32_t releaseBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { +static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { taosWLockLatch(&pCtx->blkLock); taosArrayPush(pCtx->pFreeBlock, &pBlock); taosWUnLockLatch(&pCtx->blkLock); - - return TSDB_CODE_SUCCESS; } @@ -182,15 +179,9 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3 return blockDataFromBuf(*ppRes, pBufInfo->pBuf); } -static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { +static int32_t 
retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGcBlkCacheInfo* pCache = &pGCache->blkCache; - SGcReadBlkInfo* pReadBlk = taosHashAcquire(pCache->pReadBlk, &blkId, sizeof(blkId)); - if (pReadBlk) { - *ppRes = pReadBlk->pBlock; - *nextOffset = pReadBlk->nextOffset; - return TSDB_CODE_SUCCESS; - } taosRLockLatch(&pCache->dirtyLock); SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId)); @@ -202,8 +193,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC } *nextOffset = pBufInfo->offset + pBufInfo->bufSize; - SGcReadBlkInfo readBlk = {.pBlock = *ppRes, .nextOffset = *nextOffset}; - taosHashPut(pCache->pReadBlk, &blkId, sizeof(blkId), &readBlk, sizeof(readBlk)); + + taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); return TSDB_CODE_SUCCESS; } taosRUnLockLatch(&pCache->dirtyLock); @@ -279,9 +270,11 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p SOperatorParam* pDownstreamParam = NULL; SSDataBlock* pBlock = NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info; - code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); - if (code) { - return code; + if (pGCache->enableCache) { + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); + if (code) { + return code; + } } if (pDownstreamParam) { @@ -290,7 +283,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]); } - if (pBlock) { + if (pBlock && pGCache->enableCache) { pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, 
pBlock); @@ -359,41 +352,36 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; - if (pGCache->grpByUid) { - SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); - if (NULL == pGroup) { - qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.uid); - return TSDB_CODE_INVALID_PARA; - } - - handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.uid); - - pGroup->pBlock = pBlock; - - if (pGroup->needCache) { - SGcBlkBufInfo newBlkBuf; - code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); - if (code) { - return code; - } - - if (pGroup->endBlkId > 0) { - pGroup->endBlkId = newBlkBuf.blkId; - } else { - pGroup->startBlkId = newBlkBuf.blkId; - pGroup->endBlkId = newBlkBuf.blkId; - } - } + SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId)); + if (NULL == pGroup) { + qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId); + return TSDB_CODE_INVALID_PARA; + } + + handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); - notifyWaitingSessions(pGroup->waitQueue); - if (pGroup == pSession->pGroupData) { - *continueFetch = false; - } - - return TSDB_CODE_SUCCESS; + SGcBlkBufInfo newBlkBuf; + code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); + if (code) { + return code; + } + + if (pGroup->endBlkId > 0) { + pGroup->endBlkId = newBlkBuf.blkId; + } else { + pGroup->startBlkId = newBlkBuf.blkId; + pGroup->endBlkId = newBlkBuf.blkId; } - return TSDB_CODE_INVALID_PARA; + notifyWaitingSessions(pGroup->waitQueue); + if (pGroup == pSession->pGroupData) { + pSession->lastBlkId = newBlkBuf.blkId; + pSession->nextOffset = newBlkBuf.offset + newBlkBuf.bufSize; + + *continueFetch = false; + } + + return 
TSDB_CODE_SUCCESS; } static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) { @@ -413,13 +401,16 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes taosArrayClear(pVgCtx->pTbList); } + taosHashClear(pCtx->pWaitSessions); + return TSDB_CODE_SUCCESS; } -static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) { +static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { bool continueFetch = true; int32_t code = TSDB_CODE_SUCCESS; - + SGroupCacheOperatorInfo* pGCache = pOperator->info; + while (continueFetch && TSDB_CODE_SUCCESS == code) { - int32_t code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); + code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); /* assign the function-level code: redeclaring it here shadows it, so the loop condition and the returns after the loop would always see TSDB_CODE_SUCCESS */ if (TSDB_CODE_SUCCESS != code) { @@ -429,15 +420,46 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator if (NULL == *ppRes) { code = handleDownstreamFetchDone(pOperator, pSession); break; - } else { + } else if (pGCache->enableCache) { code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch); + } else { + continueFetch = false; } } + if (!pGCache->enableCache) { + return code; + } + + if (!continueFetch) { + SGcSessionCtx** ppWaitCtx = taosHashIterate(pCtx->pWaitSessions, NULL); + if (ppWaitCtx) { + taosHashCancelIterate(pCtx->pWaitSessions, ppWaitCtx); + int64_t* pSessionId = taosHashGetKey(ppWaitCtx, NULL); + if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, *pSessionId)) { + qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + SGcSessionCtx* pWaitCtx = *ppWaitCtx; + pWaitCtx->newFetch = true; + taosHashRemove(pCtx->pWaitSessions, pSessionId, sizeof(*pSessionId)); + tsem_post(&pWaitCtx->waitSem); + +
return code; + } + } + + if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) { + qError("wrong fetch sessionId: %" PRIu64 " expected: %" PRIu64 , pCtx->fetchSessionId, sessionId); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + return code; } -static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) { +static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGroupCacheData* pGroup = pSession->pGroupData; int32_t code = TSDB_CODE_SUCCESS; if (NULL == pGroup->waitQueue) { pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES); @@ -456,77 +478,73 @@ static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCac taosThreadMutexUnlock(&pSession->pGroupData->mutex); + taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES); + tsem_wait(&pSession->waitSem); - if (pSession->pGroupData->needCache) { - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; - return code; - } else if (pGroup->fetchDone) { - *ppRes = NULL; - return TSDB_CODE_SUCCESS; - } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - return code; - } else if (pGroup->fetchDone) { - *ppRes = NULL; - return TSDB_CODE_SUCCESS; - } - } else { - *ppRes = pSession->pGroupData->pBlock; - return TSDB_CODE_SUCCESS; + if (pSession->newFetch) { + pSession->newFetch = false; + return 
getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); } - qError("no block retrieved from downstream and waked up"); - return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); + + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; + } else if (pGroup->fetchDone) { + *ppRes = NULL; + } + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + } else if (pGroup->fetchDone) { + *ppRes = NULL; + } + + return code; } static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; bool locked = false; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; while (true) { - if (pSession->pGroupData->needCache) { + if (pGCache->enableCache) { if (pSession->lastBlkId < 0) { int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); pSession->lastBlkId = startBlkId; goto _return; } } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, 
sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); pSession->lastBlkId++; goto _return; } else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { *ppRes = NULL; goto _return; } - } else if (pSession->pGroupData->pBlock || atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { - *ppRes = pSession->pGroupData->pBlock; - pSession->pGroupData->pBlock = NULL; - goto _return; } - - if ((atomic_load_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId) == sessionId) - || (-1 == atomic_val_compare_exchange_64(&pGCache->pDownstreams[pSession->downstreamIdx].fetchSessionId, -1, sessionId))) { + + if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId) + || (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) { if (locked) { taosThreadMutexUnlock(&pSession->pGroupData->mutex); locked = false; } - code = getCacheBlkFromDownstreamOperator(pOperator, pSession, ppRes); + code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); goto _return; } if (locked) { - code = groupCacheSessionWait(pGCache, pSession->pGroupData, pSession, ppRes); + code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes); locked = false; if (TSDB_CODE_SUCCESS != code) { goto _return; @@ -570,7 +588,6 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcOperatorParam* pGcParam) { taosThreadMutexInit(&pGroup->mutex, NULL); - pGroup->needCache = pGcParam->needCache; pGroup->downstreamIdx = pGcParam->downstreamIdx; pGroup->vgId = pGcParam->vgId; pGroup->fileId = -1; @@ -606,7 +623,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* newGroup.pGroup = *ppGrp; newGroup.vgId = pGcParam->vgId; newGroup.uid = pGcParam->tbUid; - newGroup.pParam = taosArrayGet(pParam->pChildren, 0); + newGroup.pParam = taosArrayGetP(pParam->pChildren, 0); 
taosWLockLatch(&pCtx->grpLock); if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) { @@ -622,6 +639,14 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* return TSDB_CODE_SUCCESS; } +static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) { + pSession->pParam = pGcParam; + pSession->downstreamIdx = pGcParam->downstreamIdx; + pSession->pGroupData = pGroup; + pSession->lastBlkId = -1; + pSession->nextOffset = -1; +} + static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) { int32_t code = TSDB_CODE_SUCCESS; SGcSessionCtx ctx = {0}; @@ -631,51 +656,50 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); - if (pGroup) { - ctx.pGroupData = pGroup; - } else { - code = addNewGroupData(pOperator, pParam, &ctx.pGroupData); + if (NULL == pGroup) { + code = addNewGroupData(pOperator, pParam, &pGroup); if (TSDB_CODE_SUCCESS != code) { return code; } } - - ctx.pParam = pGcParam; - code = taosHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); + initGroupCacheSessionCtx(&ctx, pGcParam, pGroup); + + code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); if (TSDB_CODE_SUCCESS != code) { return code; } - *ppSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); return TSDB_CODE_SUCCESS; } static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) { + int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = 
pOperator->info; SGcOperatorParam* pGcParam = pParam->value; - SGcSessionCtx* pSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; + SGcSessionCtx* pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (NULL == pSession) { int32_t code = initGroupCacheSession(pOperator, pParam, &pSession); if (TSDB_CODE_SUCCESS != code) { return code; } + } else if (pGCache->enableCache) { + SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + if (ppBlock) { + releaseBaseBlockToList(pCtx, *ppBlock); + taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + } } - return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes); -} + code = getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes); + if (NULL == *ppRes) { /* test the returned block, not the out-parameter: ppRes itself was already dereferenced by the callee, so comparing it to NULL never removes the session */ + taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + } -static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) { - if (NULL == *ppCurrent) { - return; - } - if (taosHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) { - qError("remove session %" PRIx64 " failed", *pCurrentId); - } - - *ppCurrent = NULL; - *pCurrentId = 0; + return code; } static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { @@ -690,7 +714,7 @@ static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pInfo = pOperator->info; - pInfo->pDownstreams = taosMemoryMalloc(pOperator->numOfDownstream * sizeof(*pInfo->pDownstreams)); + pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams)); if (NULL == pInfo->pDownstreams) {
return TSDB_CODE_OUT_OF_MEMORY; } @@ -713,11 +737,21 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { return TSDB_CODE_OUT_OF_MEMORY; } } - + + pInfo->pDownstreams[i].pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pInfo->pDownstreams[i].pSessions == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES); if (NULL == pInfo->pDownstreams[i].pFreeBlock) { return TSDB_CODE_OUT_OF_MEMORY; } + + pInfo->pDownstreams[i].pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pInfo->pDownstreams[i].pWaitSessions == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; @@ -754,6 +788,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->maxCacheSize = -1; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; + pInfo->enableCache = pPhyciNode->enableCache; if (!pInfo->grpByUid) { qError("only group cache by uid is supported now"); @@ -781,12 +816,6 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t } } - pInfo->pSessionHash = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pInfo->pSessionHash == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; - } - code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (TSDB_CODE_SUCCESS != code) { goto _error; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index b243c225f9..9eeb51a21c 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -642,6 +642,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t return TSDB_CODE_SUCCESS; } +static void setMergeJoinDone(SOperatorInfo* pOperator) { + 
pOperator->status = OP_EXEC_DONE; + pOperator->pDownstreamParams[0] = NULL; + pOperator->pDownstreamParams[1] = NULL; +} + static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -651,7 +657,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->leftPos = 0; if (pJoinInfo->pLeft == NULL) { qDebug("merge join left got empty block"); - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + setMergeJoinDone(pOperator); return false; } else { qDebug("merge join left got block"); @@ -664,7 +670,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->rightPos = 0; if (pJoinInfo->pRight == NULL) { qDebug("merge join right got empty block"); - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + setMergeJoinDone(pOperator); return false; } else { qDebug("merge join right got block"); @@ -729,6 +735,9 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoinInfo = pOperator->info; + if (pOperator->status == OP_EXEC_DONE && (NULL == pOperator->pDownstreamParams[0] || NULL == pOperator->pDownstreamParams[1])) { + return NULL; + } SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes); @@ -746,6 +755,9 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; } + if (pOperator->status == OP_EXEC_DONE) { + break; + } } return (pRes->info.rows > 0) ? 
pRes : NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 44fa04d922..46db8b6197 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -799,8 +799,7 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { qDebug("add total %d dynamic tables to scan", num); - taosArrayClear(pListInfo->pTableList); - taosHashClear(pListInfo->map); + pListInfo->oneTableForEachGroup = true; for (int32_t i = 0; i < num; ++i) { uint64_t* pUid = taosArrayGet(pParam->pUidList, i); @@ -838,6 +837,10 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { + if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); + result->info.id.groupId = pKeyInfo->uid; + } return result; } @@ -851,14 +854,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SStorageAPI* pAPI = &pTaskInfo->storageAPI; if (pOperator->pOperatorParam) { + pOperator->dynamicTask = true; int32_t code = createTableListInfoFromParam(pOperator); pOperator->pOperatorParam = NULL; if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - - if (pInfo->currentGroupId != -1) { + if (pOperator->status == OP_EXEC_DONE) { pInfo->currentGroupId = 0; return startNextGroupScan(pOperator); } @@ -922,11 +925,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doGroupedTableScan(pOperator); if (result != NULL) { + if (pInfo->base.pTableListInfo->oneTableForEachGroup) { + STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId); + result->info.id.groupId = pKeyInfo->uid; + } return result; } if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { 
setOperatorCompleted(pOperator); + if (pOperator->dynamicTask) { + taosArrayClear(pInfo->base.pTableListInfo->pTableList); + taosHashClear(pInfo->base.pTableListInfo->map); + } return NULL; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index e16f16d954..68a7b06814 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -537,6 +537,10 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCacheLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + COPY_SCALAR_FIELD(grpColsMayBeNull); + COPY_SCALAR_FIELD(grpByUid); + COPY_SCALAR_FIELD(globalGrp); + COPY_SCALAR_FIELD(enableCache); CLONE_NODE_LIST_FIELD(pGroupCols); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index ef3181f412..b48745ab43 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1185,6 +1185,7 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull"; static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid"; static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup"; +static const char* jkGroupCacheLogicPlanEnableCache = "EnableCache"; static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols"; static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -1200,6 +1201,9 @@ static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGlobalGroup, pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanEnableCache, pNode->enableCache); + } if (TSDB_CODE_SUCCESS == code) { code = 
nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols); } @@ -1220,6 +1224,9 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGlobalGroup, &pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanEnableCache, &pNode->enableCache); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols); } @@ -2929,6 +2936,7 @@ static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns"; static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull"; static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid"; static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup"; +static const char* jkGroupCachePhysiPlanEnableCache = "EnableCache"; static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -2944,6 +2952,9 @@ static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanEnableCache, pNode->enableCache); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols); } @@ -2963,6 +2974,9 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanEnableCache, &pNode->enableCache); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c 
b/source/libs/nodes/src/nodesMsgFuncs.c index 6648186b4d..34a0ace0ef 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3530,6 +3530,7 @@ enum { PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, PHY_GROUP_CACHE_CODE_GROUP_BY_UID, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, + PHY_GROUP_CACHE_CODE_ENABLE_CACHE, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS }; @@ -3549,6 +3550,9 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_ENABLE_CACHE, pNode->enableCache); + } return code; } @@ -3575,6 +3579,9 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP: code = tlvDecodeBool(pTlv, &pNode->globalGrp); break; + case PHY_GROUP_CACHE_CODE_ENABLE_CACHE: + code = tlvDecodeBool(pTlv, &pNode->enableCache); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index a259ceb98f..da7f3d2737 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3263,7 +3263,8 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** pScan->node.pParent = (SLogicNode*)pGrpCache; } pGrpCache->globalGrp = !hasCond; - + pGrpCache->enableCache = pGrpCache->globalGrp; + if (TSDB_CODE_SUCCESS == code) { *ppLogic = (SLogicNode*)pGrpCache; } else { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ff0f90498f..61843f91c8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -988,6 +988,7 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull; 
pGrpCache->grpByUid = pLogicNode->grpByUid; pGrpCache->globalGrp = pLogicNode->globalGrp; + pGrpCache->enableCache = pLogicNode->enableCache; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) {