From 6763c6f98c5889b955301e5f73dc7738a73468d1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jul 2023 11:29:54 +0800 Subject: [PATCH] enh: support parallel group cache fetch --- source/libs/executor/inc/executorInt.h | 13 +- source/libs/executor/inc/groupcache.h | 5 - source/libs/executor/src/executor.c | 5 +- source/libs/executor/src/groupcacheoperator.c | 154 +++++++++++------- source/libs/executor/src/operator.c | 49 +++++- 5 files changed, 157 insertions(+), 69 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 2483b73678..9d256bfb49 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -106,7 +106,6 @@ typedef struct SExchangeOpStopInfo { } SExchangeOpStopInfo; typedef struct SGcOperatorParam { - SOperatorParam* pChild; int64_t sessionId; int32_t downstreamIdx; bool needCache; @@ -151,10 +150,20 @@ typedef struct SLimitInfo { typedef struct SSortMergeJoinOperatorParam { } SSortMergeJoinOperatorParam; -typedef struct SExchangeOperatorParam { +typedef struct SExchangeOperatorBasicParam { int32_t vgId; int32_t srcOpType; SArray* uidList; +} SExchangeOperatorBasicParam; + +typedef struct SExchangeOperatorBatchParam { + bool multiParams; + SArray* pBatchs; // SArray +} SExchangeOperatorBatchParam; + +typedef struct SExchangeOperatorParam { + bool multiParams; + SExchangeOperatorBasicParam basic; } SExchangeOperatorParam; typedef struct SExchangeInfo { diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 4b3f3d3a3a..3eba399949 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -88,13 +88,8 @@ typedef struct SGcExecInfo { int64_t* pDownstreamBlkNum; } SGcExecInfo; -typedef struct SGcNewGroupInfo { - int64_t uid; -} SGcNewGroupInfo; - typedef struct SGroupCacheOperatorInfo { TdThreadMutex sessionMutex; - SGcNewGroupInfo newGroup; SSHashObj* pSessionHash; SGroupColsInfo groupColsInfo; bool grpByUid; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index dd4bbdfcad..4a2b00fab4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -516,6 +516,10 @@ bool qIsDynamicExecTask(qTaskInfo_t tinfo) { return ((SExecTaskInfo*)tinfo)->dynamicTask; } +void destroyOperatorParamValue(void* pValues) { + +} + void destroyOperatorParam(SOperatorParam* pParam) { if (NULL == pParam) { return; @@ -524,7 +528,6 @@ void destroyOperatorParam(SOperatorParam* pParam) { //TODO } - void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) { destroyOperatorParam(((SExecTaskInfo*)tinfo)->pOpParam); ((SExecTaskInfo*)tinfo)->pOpParam = pParam; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index ec352562f4..9491564024 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -105,38 +105,73 @@ static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { return TSDB_CODE_SUCCESS; } +static void addBlkToBlkBufs(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcBlkBufInfo** ppBuf) { + *ppRes = pBlock; +} + static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) { SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId); return pPage->data + pBlkInfo->offset; } -static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx) { +static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) { + int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx]; - - taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); - - if (NULL == taosArrayPush(pCtx->pGrpUidList, &pGCache->newGroup.uid)) { - return TSDB_CODE_OUT_OF_MEMORY; + SOperatorParam* pDst = NULL; + + taosWLockLatch(&pCtx->lock); + int32_t num = taosArrayGetSize(pCtx->pNewGrpList); + if (num <= 0) { + goto _return; } + + for (int32_t i = 0; i < num; ++i) { + SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); + if (NULL == taosArrayPush(pCtx->pGrpUidList, &pNew->uid)) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; + } + if (num > 1) { + if (0 == i) { + pDst = pNew->pParam; + } else { + code = mergeOperatorParams(pDst, pNew->pParam); + if (code) { + goto _return; + } + } + } else { + pDst = pNew->pParam; + } + } + + taosArrayClear(pCtx->pNewGrpList); - pGCache->newGroup.uid = 0; - taosThreadMutexUnlock(&pGCache->sessionMutex); +_return: + + taosWUnLockLatch(&pCtx->lock); + *ppParam = pDst; - return TSDB_CODE_SUCCESS; + return code; } static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; + SOperatorParam* pDownstreamParam = NULL; + SSDataBlock* pBlock = NULL; SGroupCacheOperatorInfo* pGCache = pOperator->info; - if (pGCache->pDownstreams[downstreamIdx].pNewGrpList) { - code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pGCache->newGroup.uid); - if (code) { - return code; - } + code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam); + if (code) { + return code; } - - SSDataBlock* pBlock = getNextBlockFromDownstreamOnce(pOperator, downstreamIdx); + + if (pDownstreamParam) { + pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextExtFn(pOperator->pDownstream[downstreamIdx], pDownstreamParam); + } else { + pBlock = pOperator->pDownstream[downstreamIdx]->fpSet.getNextFn(pOperator->pDownstream[downstreamIdx]); + } + if (pBlock) { pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; } @@ -146,10 +181,6 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p return TSDB_CODE_SUCCESS; } -static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SSDataBlock** ppRes) { - *ppRes = pBlock; -} - static void notifyWaitingSessions(SArray* pWaitQueue) { if (NULL == pWaitQueue || taosArrayGetSize(pWaitQueue) <= 0) { return; @@ -171,7 +202,7 @@ int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBloc if (pGroup->needCache) { SGcBlkBufInfo* pNewBlk = NULL; - code = addBlkToGroupCache(pOperator, pBlock, &pNewBlk); + code = addBlkToBlkBufs(pOperator, pBlock, &pNewBlk); if (code) { return code; } @@ -197,7 +228,21 @@ int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBloc } static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) { - notifyWaitingSessions(); + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; + int32_t uidNum = taosArrayGetSize(pCtx->pGrpUidList); + + for (int32_t i = 0; i < uidNum; ++i) { + int64_t* pUid = taosArrayGet(pCtx->pGrpUidList, i); + SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, pUid, sizeof(*pUid)); + pGroup->pBlock = NULL; + pGroup->fetchDone = true; + notifyWaitingSessions(pGroup->waitQueue); + } + taosArrayClear(pCtx->pGrpUidList); + + return TSDB_CODE_SUCCESS; } static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession, SSDataBlock** ppRes) { @@ -334,15 +379,16 @@ static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { return addPageToGroupCacheBuf(pInfo->pBlkBufs); } -static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGroupCacheData** ppGrp) { +static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) { SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcOperatorParam* pGcParam = pParam->value; SGroupCacheData grpData = {0}; - grpData.needCache = pParam->needCache; + grpData.needCache = pGcParam->needCache; while (true) { - if (0 != taosHashPut(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize, &grpData, sizeof(grpData))) { + if (0 != taosHashPut(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize, &grpData, sizeof(grpData))) { if (terrno == TSDB_CODE_DUP_KEY) { - *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + *ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); if (*ppGrp) { break; } @@ -351,11 +397,12 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOpera } } - *ppGrp = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + *ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); if (*ppGrp) { SGcNewGroupInfo newGroup; - newGroup.uid = *(int64_t*)pParam->pGroupValue; - newGroup.pParam = pOperator->pDownstreamParams[pParam->downstreamIdx]; + newGroup.uid = *(int64_t*)pGcParam->pGroupValue; + newGroup.pParam = taosArrayGet(pParam->pChildren, 0); + taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) { taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); @@ -370,11 +417,12 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SGcOpera return TSDB_CODE_SUCCESS; } -static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperatorParam* pParam, SGcSessionCtx** ppSession) { +static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) { SGcSessionCtx ctx = {0}; int32_t code = 0; + SGcOperatorParam* pGcParam = pParam->value; SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pParam->pGroupValue, pParam->groupValueSize); + SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); if (pGroup) { ctx.pGroupData = pGroup; } else { @@ -383,32 +431,28 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato return code; } } - - taosThreadMutexUnlock(&pGCache->sessionMutex); - ctx.pParam = pParam; + ctx.pParam = pGcParam; - int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId), &ctx, sizeof(ctx)); + int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); if (TSDB_CODE_SUCCESS != code) { return code; } - *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); return TSDB_CODE_SUCCESS; } -static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes) { +static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) { SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGcOperatorParam* pGcParam = pOperator->pOperatorParam->value; + SGcOperatorParam* pGcParam = pParam->value; SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (NULL == pSession) { - int32_t code = initGroupCacheSession(pOperator, pGcParam, &pSession); + int32_t code = initGroupCacheSession(pOperator, pParam, &pSession); if (TSDB_CODE_SUCCESS != code) { return code; } - } else { - taosThreadMutexUnlock(&pGCache->sessionMutex); } return getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes); @@ -426,19 +470,6 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo *pCurrentId = 0; } -SSDataBlock* getBlkFromGroupCache(struct SOperatorInfo* pOperator) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* pRes = NULL; - - int32_t code = getBlkFromSessionCache(pOperator, &pRes); - if (TSDB_CODE_SUCCESS != code) { - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); - } - - return pRes; -} - static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pInfo = pOperator->info; pInfo->execInfo.downstreamNum = pOperator->numOfDownstream; @@ -462,23 +493,26 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (NULL == pInfo->pDownstreams[i].pGrpUidList) { return TSDB_CODE_OUT_OF_MEMORY; } + + pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); + if (NULL == pInfo->pDownstreams[i].pNewGrpList) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; } SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { - SGroupCacheOperatorInfo* pGCache = pOperator->info; - - taosThreadMutexLock(&pGCache->sessionMutex); + SSDataBlock* pBlock = NULL; - int32_t code = setOperatorParams(pOperator, pParam); + int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam); if (TSDB_CODE_SUCCESS != code) { pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); } - return getBlkFromGroupCache(pOperator); + return pBlock; } @@ -542,7 +576,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlkFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL); return pOperator; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 6521583053..f3ca619a25 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -605,6 +605,46 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf return TSDB_CODE_SUCCESS; } +int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) { + if (pDst->opType != pSrc->opType) { + qError("different optype %d:%d for merge operator params", pDst->opType, pSrc->opType); + return TSDB_CODE_INVALID_PARA; + } + + switch (pDst->opType) { + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: { + SExchangeOperatorParam* pDExc = pDst->value; + SExchangeOperatorParam* pSExc = pSrc->value; + if (!pDExc->multiParams) { + SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); + if (NULL == pBatch) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pBatch->multiParams = true; + pBatch->pBatchs = taosArrayInit(4, sizeof(SExchangeOperatorBasicParam)); + if (NULL == pBatch->pBatchs) { + taosMemoryFree(pBatch); + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pBatch->pBatchs, &pDExc->basic); + taosArrayPush(pBatch->pBatchs, &pSExc->basic); + destroyOperatorParamValue(pDst->value); + pDst->value = pBatch; + } else { + SExchangeOperatorBatchParam* pBatch = pDst->value; + taosArrayPush(pBatch->pBatchs, &pSExc->basic); + } + break; + } + default: + qError("invalid optype %d for merge operator params", (*ppDst)->opType); + return TSDB_CODE_INVALID_PARA; + } + + return TSDB_CODE_SUCCESS; +} + + int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { if (NULL == pParam) { pOperator->pOperatorParam = NULL; @@ -637,7 +677,14 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara for (int32_t i = 0; i < childrenNum; ++i) { SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i); - pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; + if (pOperator->pDownstreamParams[pChild->downstreamIdx]) { + int32_t code = mergeOperatorParams(&pOperator->pDownstreamParams[pChild->downstreamIdx], pChild); + if (code) { + return code; + } + } else { + pOperator->pDownstreamParams[pChild->downstreamIdx] = pChild; + } } taosArrayClear(pOperator->pOperatorParam->pChildren);