From 5971395dcf97bdcd48739c232d3b0fb773e21748 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 3 Jul 2023 13:42:06 +0800 Subject: [PATCH] feat: add group cache operator --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/groupcache.h | 36 ++-- source/libs/executor/inc/operator.h | 15 +- source/libs/executor/src/groupcacheoperator.c | 159 ++++++++++++++---- source/libs/executor/src/operator.c | 12 +- 5 files changed, 178 insertions(+), 45 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 79add06cfd..700b4e7be8 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -439,6 +439,7 @@ typedef struct SHashJoinPhysiNode { typedef struct SGroupCachePhysiNode { SPhysiNode node; bool grpColsMayBeNull; + SArray* pDownstreamKey; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index b1d45cc0f5..9f74ec4e02 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -21,15 +21,20 @@ extern "C" { #define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 -typedef struct SGcSessionRes { - -} SGcSessionRes; +typedef struct SGcSessionCtx { + SOperatorInfo* pDownstream; + bool cacheHit; + bool needCache; + SGcBlkBufInfo* pLastBlk; +} SGcSessionCtx; typedef struct SGcOperatorParam { - int64_t sessionId; - bool newFetch; - void* pGroupValue; - int32_t groupValueSize; + SOperatorBasicParam basic; + int64_t sessionId; + int32_t downstreamKey; + bool needCache; + void* pGroupValue; + int32_t groupValueSize; } SGcOperatorParam; #pragma pack(push, 1) @@ -66,13 +71,20 @@ typedef struct SGroupColsInfo { char* pData; } SGroupColsInfo; -typedef struct SGroupCacheOperatorInfo { - SSHashObj* pSessionHash; - SGroupColsInfo groupColsInfo; - SArray* pBlkBufs; - SSHashObj* pBlkHash; +typedef struct SGcDownstreamInfo { + SSHashObj* pKey2Idx; SOperatorInfo** ppDownStream; int32_t downStreamNum; +} SGcDownstreamInfo; + +typedef struct SGroupCacheOperatorInfo { + SSHashObj* pSessionHash; + SGroupColsInfo groupColsInfo; + SArray* pBlkBufs; + SSHashObj* pBlkHash; + SGcDownstreamInfo downstreamInfo; + int64_t pCurrentId; + SGcSessionCtx* pCurrent; } SGroupCacheOperatorInfo; #ifdef __cplusplus diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 8281f6484f..5c42323061 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,9 +27,19 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; -typedef struct SOperatorParam { +typedef struct SOperatorBasicParam { + bool newExec; +} SOperatorBasicParam; + +typedef struct SOperatorSpecParam { int32_t opType; void* value; +} SOperatorSpecParam; + +typedef struct SOperatorParam { + SOperatorBasicParam basic; + int32_t opNum; + SOperatorSpecParam* pOpParams; } SOperatorParam; typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); @@ -150,6 +160,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo); + // clang-format on SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 400416dc20..de94a55606 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -72,7 +72,10 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); - taosMemoryFree(pGrpCacheOperator->ppDownStream); + tSimpleHashCleanup(pGrpCacheOperator->pSessionHash); + tSimpleHashCleanup(pGrpCacheOperator->pBlkHash); + tSimpleHashCleanup(pGrpCacheOperator->downstreamInfo.pKey2Idx); + taosMemoryFree(pGrpCacheOperator->downstreamInfo.ppDownStream); taosMemoryFreeClear(param); } @@ -90,6 +93,26 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { return TSDB_CODE_SUCCESS; } +static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) { + SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId); + return pPage->data + pBlkInfo->offset; +} + +static FORCE_INLINE char* moveRetrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo** ppLastBlk) { + if (NULL == *ppLastBlk) { + return NULL; + } + SGcBlkBufInfo* pCurr = (*ppLastBlk)->next; + *ppLastBlk = pCurr; + if (pCurr) { + SBufPageInfo *pPage = taosArrayGet(pBlkBufs, pCurr->pageId); + return pPage->data + pCurr->offset; + } + + return NULL; +} + + static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { pInfo->pBlkBufs = taosArrayInit(32, sizeof(SBufPageInfo)); if (NULL == pInfo->pBlkBufs) { @@ -99,52 +122,132 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { return addPageToGroupCacheBuf(pInfo->pBlkBufs); } -static int32_t getFromSessionCache(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes) { +static int32_t initGroupCacheDownstreamInfo(SGroupCachePhysiNode* pPhyciNode, SOperatorInfo** pDownstream, int32_t numOfDownstream, SGcDownstreamInfo* pInfo) { + pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES); + if (NULL == pInfo->ppDownStream) { + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES); + pInfo->pKey2Idx = tSimpleHashInit(numOfDownstream, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pInfo->pKey2Idx) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < numOfDownstream; ++i) { + int32_t keyValue = taosArrayGet(pPhyciNode, i); + tSimpleHashPut(pInfo->pKey2Idx, &keyValue, sizeof(keyValue), &i, sizeof(i)); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t initGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { + SGcSessionCtx ctx = {0}; + SGroupData* pGroup = tSimpleHashGet(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + if (pGroup) { + ctx.cacheHit = true; + ctx.pLastBlk = pGroup->blks; + } else { + int32_t* pIdx = tSimpleHashGet(pGCache->downstreamInfo.pKey2Idx, &pParam->downstreamKey, sizeof(pParam->downstreamKey)); + if (NULL == pIdx) { + qError("Invalid downstream key value: %d", pParam->downstreamKey); + return TSDB_CODE_INVALID_PARA; + } + ctx.pDownstream = pGCache->downstreamInfo.ppDownStream[*pIdx]; + ctx.needCache = pParam->needCache; + } + + return TSDB_CODE_SUCCESS; +} + +static void getFromSessionCache(SExecTaskInfo* pTaskInfo, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { + if (pParam->basic.newExec) { + int32_t code = initGroupCacheSession(pGCache, pParam, ppSession); + if (TSDB_CODE_SUCCESS != code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + if ((*ppSession)->pLastBlk) { + *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, (*ppSession)->pLastBlk); + } else { + *ppRes = NULL; + } + return; + } + + SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + if (NULL == pCtx) { + qError("session %" PRIx64 " not exists", pParam->sessionId); + pTaskInfo->code = TSDB_CODE_INVALID_PARA; + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + *ppSession = pCtx; + + if (pCtx->cacheHit) { + *ppRes = moveRetrieveBlkFromBlkBufs(pGCache->pBlkBufs, &pCtx->pLastBlk); + return; + } + + *ppRes = NULL; +} + +static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo* pGCache, SGcSessionCtx** ppCurrent, int64_t* pCurrentId) { + if (NULL == *ppCurrent) { + return; + } + if (tSimpleHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) { + qError("remove session %" PRIx64 " failed", *pCurrentId); + } + + *ppCurrent = NULL; + *pCurrentId = 0; +} + +static void setCurrentGroupCacheDone(struct SOperatorInfo* pOperator) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + destroyCurrentGroupCacheSession(pGCache, &pGCache->pCurrent, &pGCache->pCurrentId); +} + +static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { + *ppRes = pBlock; } static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t code = TSDB_CODE_SUCCESS; + SGcSessionCtx* pSession = NULL; SSDataBlock* pRes = NULL; - SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param) - if (NULL == pParam || pOperator->status == OP_EXEC_DONE) { + int32_t code = TSDB_CODE_SUCCESS; + SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param); + if (NULL == pParam) { return NULL; } - code = getFromSessionCache(pGCache, pParam, &pRes, ); + getFromSessionCache(pTaskInfo, pGCache, pParam, &pRes, &pSession); + pGCache->pCurrent = pSession; + pGCache->pCurrentId = pParam->sessionId; + + if (pRes) { + return pRes; + } while (true) { - SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); + SSDataBlock* pBlock = pSession->pDownstream->fpSet.getNextExtFn(pSession->pDownstream, param); if (NULL == pBlock) { - setHJoinDone(pOperator); + setCurrentGroupCacheDone(pOperator); break; } - - code = launchBlockHashJoin(pOperator, pBlock); - if (code) { - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); - } - if (pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - - if (pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - } - if (pRes->info.rows > 0) { - break; + if (pGCache->pCurrent->needCache) { + addBlkToGroupCache(pOperator, pBlock, &pRes); } + break; } - return (pRes->info.rows > 0) ? pRes : NULL; + return pRes; } - SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SGroupCacheOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo)); @@ -180,12 +283,10 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - pInfo->ppDownStream = taosMemoryMalloc(numOfDownstream * POINTER_BYTES); - if (NULL == pInfo->ppDownStream) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = initGroupCacheDownstreamInfo(pPhyciNode, pDownstream, numOfDownstream, &pInfo->downstreamInfo); + if (code) { goto _error; } - memcpy(pInfo->ppDownStream, pDownstream, numOfDownstream * POINTER_BYTES); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL); diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 0ffddbd3f6..32035db0a4 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -523,7 +523,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR } else if (QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE == type) { pOptr = createGroupCacheOperatorInfo(ops, size, (SGroupCachePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL == type) { - pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); + //pOptr = createDynQueryCtrlOperatorInfo(ops, size, (SDynQueryCtrlPhysiNode*)pPhyNode, pTaskInfo); } else { terrno = TSDB_CODE_INVALID_PARA; pTaskInfo->code = terrno; @@ -593,9 +593,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf } void *getOperatorParam(int32_t opType, SOperatorParam* param) { - if (NULL == param || opType != param->opType) { + if (NULL == param) { return NULL; } - return param->value; + for (int32_t i = 0; i < param->opNum; ++i) { + if (opType == param->pOpParams[i].opType) { + memcpy(¶m->pOpParams[i], param, sizeof(param->basic)); + return ¶m->pOpParams[i]; + } + } + return NULL; }