diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 62b06f62a5..c38c083b19 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -35,9 +35,14 @@ typedef struct SGcBufPageInfo { char* data; } SGcBufPageInfo; -typedef struct SGroupData { - SGcBlkBufInfo* blks; -} SGroupData; +typedef struct SGroupCacheData { + TdThreadMutex mutex; + SSHashObj* waitQueue; + bool fetchDone; + int64_t fetchSessionId; + SGcBlkBufInfo* pFirstBlk; + SGcBlkBufInfo* pLastBlk; +} SGroupCacheData; typedef struct SGroupColInfo { int32_t slot; @@ -56,10 +61,10 @@ typedef struct SGroupColsInfo { } SGroupColsInfo; typedef struct SGcSessionCtx { - int32_t downstreamIdx; - bool cacheHit; - bool needCache; - SGcBlkBufInfo* pLastBlk; + int32_t downstreamIdx; + bool needCache; + SGroupCacheData* pGroupData; + SGcBlkBufInfo* pLastBlk; } SGcSessionCtx; typedef struct SGcExecInfo { @@ -71,9 +76,7 @@ typedef struct SGroupCacheOperatorInfo { SSHashObj* pSessionHash; SGroupColsInfo groupColsInfo; SArray* pBlkBufs; - SSHashObj* pBlkHash; - int64_t pCurrentId; - SGcSessionCtx* pCurrent; + SHashObj* pBlkHash; SGcExecInfo execInfo; } SGroupCacheOperatorInfo; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 939b516d73..c664d2d7fd 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -87,7 +87,7 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); tSimpleHashCleanup(pGrpCacheOperator->pSessionHash); - tSimpleHashCleanup(pGrpCacheOperator->pBlkHash); + taosHashCleanup(pGrpCacheOperator->pBlkHash); taosMemoryFreeClear(param); } @@ -110,10 +110,21 @@ static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo return pPage->data + pBlkInfo->offset; } -static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo** ppLastBlk) { - if (NULL == *ppLastBlk) { - return NULL; +static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + if (NULL == pSession->pLastBlk) { + if (pSession->pGroupData->pFirstBlk) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); + pSession->pLastBlk = pSession->pGroupData->pFirstBlk; + return TSDB_CODE_SUCCESS; + } + + if (atomic_load_64(&pSession->pGroupData->fetchSessionId) == sessionId) { + getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); + } } + SGcBlkBufInfo* pCurr = (*ppLastBlk)->next; *ppLastBlk = pCurr; if (pCurr) { @@ -134,51 +145,70 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { return addPageToGroupCacheBuf(pInfo->pBlkBufs); } -static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { - SGcSessionCtx ctx = {0}; - SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); - if (pGroup) { - ctx.cacheHit = true; - ctx.pLastBlk = pGroup->blks; - } else { - ctx.downstreamIdx = pParam->downstreamIdx; - ctx.needCache = pParam->needCache; +static int32_t initGroupCacheGroupData(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) { + SGroupCacheData grpData = {0}; + grpData.fetchSessionId = pParam->sessionId; + while (true) { + if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) { + if (terrno == TSDB_CODE_DUP_KEY) { + *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + if (*ppGrp) { + break; + } + } else { + return terrno; + } + } + + *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + if (*ppGrp) { + break; + } } - int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); - if (TSDB_CODE_SUCCESS == code) { - *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + return TSDB_CODE_SUCCESS; +} + +static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { + SGcSessionCtx ctx = {0}; + int32_t code = 0; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + if (pGroup) { + ctx.pGroupData = pGroup; + } else { + code = initGroupCacheGroupData(pGCache, pParam, &ctx.pGroupData); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } + ctx.downstreamIdx = pParam->downstreamIdx; + ctx.needCache = pParam->needCache; + + int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + return TSDB_CODE_SUCCESS; } -static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { +static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); if (NULL == pCtx) { int32_t code = initGroupCacheSession(pOperator, pParam, ppSession); if (TSDB_CODE_SUCCESS != code) { - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + return code; } - if ((*ppSession)->pLastBlk) { - *ppRes = (SSDataBlock*)retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk); - } else { - *ppRes = NULL; - } - return; + } else { + *ppSession = pCtx; } - - *ppSession = pCtx; - if (pCtx->cacheHit) { - *ppRes = (SSDataBlock*)moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk); - return; - } - - *ppRes = NULL; + return getBlkFromSessionCacheImpl(pOperator, pParam->sessionId, *ppSession, ppRes); } static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) { @@ -195,7 +225,7 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pGCache = pOperator->info; - destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId); + //destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId); } static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { @@ -208,11 +238,11 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam->value; SGcSessionCtx* pSession = NULL; SSDataBlock* pRes = NULL; - int32_t code = TSDB_CODE_SUCCESS; - - getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); - pGCache->pCurrent = pSession; - pGCache->pCurrentId = pParam->sessionId; + int32_t code = getBlkFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); + if (TSDB_CODE_SUCCESS != code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } if (pRes) { return pRes; @@ -227,7 +257,7 @@ SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { pGCache->execInfo.pDownstreamBlkNum[pSession->downstreamIdx]++; - if (pGCache->pCurrent->needCache) { + if (pSession->needCache) { addBlkToGroupCache(pOperator, pBlock, &pRes); } else { pRes = pBlock; @@ -273,7 +303,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - pInfo->pBlkHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + pInfo->pBlkHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pInfo->pBlkHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _error;