diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 5ef2464f8e..c235b693b3 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -162,7 +162,6 @@ typedef struct SGroupCacheLogicNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; - bool enableCache; SNodeList* pGroupCols; } SGroupCacheLogicNode; @@ -446,7 +445,6 @@ typedef struct SGroupCachePhysiNode { bool grpColsMayBeNull; bool grpByUid; bool globalGrp; - bool enableCache; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 2b5176eba6..37b689a419 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -131,7 +131,6 @@ typedef struct SGroupCacheOperatorInfo { SGroupColsInfo groupColsInfo; bool globalGrp; bool grpByUid; - bool enableCache; SGcDownstreamCtx* pDownstreams; SGcBlkCacheInfo blkCache; SHashObj* pGrpHash; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index a739fab23d..8458b3e881 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -157,7 +157,7 @@ static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** pp taosWUnLockLatch(&pCtx->blkLock); return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock); } - *ppRes = taosArrayPop(pCtx->pFreeBlock); + *ppRes = *(SSDataBlock**)taosArrayPop(pCtx->pFreeBlock); taosWUnLockLatch(&pCtx->blkLock); return TSDB_CODE_SUCCESS; @@ -270,11 +270,9 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p SOperatorParam* pDownstreamParam = NULL; SSDataBlock* pBlock = NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info; - if (pGCache->enableCache) { - code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); - if (code) { - return code; - } + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); + if (code) { + return code; } if (pDownstreamParam) { @@ -283,7 +281,7 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]); } - if (pBlock && pGCache->enableCache) { + if (pBlock) { pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock); @@ -389,7 +387,6 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; int32_t uidNum = 0; - SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGcVgroupCtx* pVgCtx = NULL; int32_t iter = 0; while (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) { @@ -420,17 +417,11 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator if (NULL == *ppRes) { code = handleDownstreamFetchDone(pOperator, pSession); break; - } else if (pGCache->enableCache) { - code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch); } else { - continueFetch = false; + code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch); } } - if (!pGCache->enableCache) { - return code; - } - if (!continueFetch) { SGcSessionCtx** ppWaitCtx = taosHashIterate(pCtx->pWaitSessions, NULL); if (ppWaitCtx) { @@ -514,22 +505,20 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; while (true) { - if (pGCache->enableCache) { - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; - goto _return; - } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - goto _return; - } else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { - *ppRes = NULL; + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; goto _return; } + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + goto _return; + } else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { + *ppRes = NULL; + goto _return; } if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId) @@ -686,7 +675,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock if (TSDB_CODE_SUCCESS != code) { return code; } - } else if (pGCache->enableCache) { + } else { SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (ppBlock) { releaseBaseBlockToList(pCtx, *ppBlock); @@ -788,7 +777,6 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->maxCacheSize = -1; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; - pInfo->enableCache = pPhyciNode->enableCache; if (!pInfo->grpByUid) { qError("only group cache by uid is supported now"); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 68a7b06814..9463be6755 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -540,7 +540,6 @@ static int32_t logicGroupCacheCopy(const SGroupCacheLogicNode* pSrc, SGroupCache COPY_SCALAR_FIELD(grpColsMayBeNull); COPY_SCALAR_FIELD(grpByUid); COPY_SCALAR_FIELD(globalGrp); - COPY_SCALAR_FIELD(enableCache); CLONE_NODE_LIST_FIELD(pGroupCols); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b48745ab43..ef3181f412 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1185,7 +1185,6 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull"; static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid"; static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup"; -static const char* jkGroupCacheLogicPlanEnableCache = "EnableCache"; static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols"; static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -1201,9 +1200,6 @@ static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGlobalGroup, pNode->globalGrp); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanEnableCache, pNode->enableCache); - } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols); } @@ -1224,9 +1220,6 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGlobalGroup, &pNode->globalGrp); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanEnableCache, &pNode->enableCache); - } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols); } @@ -2936,7 +2929,6 @@ static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns"; static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull"; static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid"; static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup"; -static const char* jkGroupCachePhysiPlanEnableCache = "EnableCache"; static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -2952,9 +2944,6 @@ static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanEnableCache, pNode->enableCache); - } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols); } @@ -2974,9 +2963,6 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanEnableCache, &pNode->enableCache); - } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 34a0ace0ef..6648186b4d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3530,7 +3530,6 @@ enum { PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, PHY_GROUP_CACHE_CODE_GROUP_BY_UID, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, - PHY_GROUP_CACHE_CODE_ENABLE_CACHE, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS }; @@ -3550,9 +3549,6 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp); } - if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_ENABLE_CACHE, pNode->enableCache); - } return code; } @@ -3579,9 +3575,6 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP: code = tlvDecodeBool(pTlv, &pNode->globalGrp); break; - case PHY_GROUP_CACHE_CODE_ENABLE_CACHE: - code = tlvDecodeBool(pTlv, &pNode->enableCache); - break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index da7f3d2737..f808c644d9 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3262,8 +3262,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** } pScan->node.pParent = (SLogicNode*)pGrpCache; } - pGrpCache->globalGrp = !hasCond; - pGrpCache->enableCache = pGrpCache->globalGrp; + pGrpCache->globalGrp = false; if (TSDB_CODE_SUCCESS == code) { *ppLogic = (SLogicNode*)pGrpCache; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 61843f91c8..ff0f90498f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -988,7 +988,6 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull; pGrpCache->grpByUid = pLogicNode->grpByUid; pGrpCache->globalGrp = pLogicNode->globalGrp; - pGrpCache->enableCache = pLogicNode->enableCache; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) {