From 2312f10372324acbe4f22b24113196a475350fb1 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 14 Jul 2023 13:30:13 +0800 Subject: [PATCH] enh: support concurrent fetch from group cache --- include/libs/nodes/plannodes.h | 4 +- source/libs/executor/inc/executorInt.h | 9 +- source/libs/executor/inc/groupcache.h | 79 +++- .../libs/executor/src/dynqueryctrloperator.c | 23 +- source/libs/executor/src/exchangeoperator.c | 39 +- source/libs/executor/src/groupcacheoperator.c | 408 ++++++++++++++---- source/libs/executor/src/operator.c | 41 +- source/libs/nodes/src/nodesCodeFuncs.c | 35 ++ source/libs/nodes/src/nodesMsgFuncs.c | 22 + source/libs/planner/src/planOptimizer.c | 6 + source/libs/planner/src/planPhysiCreater.c | 2 + 11 files changed, 519 insertions(+), 149 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c9278a1961..c235b693b3 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -160,6 +160,8 @@ typedef struct SInterpFuncLogicNode { typedef struct SGroupCacheLogicNode { SLogicNode node; bool grpColsMayBeNull; + bool grpByUid; + bool globalGrp; SNodeList* pGroupCols; } SGroupCacheLogicNode; @@ -441,8 +443,8 @@ typedef struct SHashJoinPhysiNode { typedef struct SGroupCachePhysiNode { SPhysiNode node; bool grpColsMayBeNull; - SArray* pDownstreamKey; bool grpByUid; + bool globalGrp; SNodeList* pGroupCols; } SGroupCachePhysiNode; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 9d256bfb49..f14547d1bc 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -109,8 +109,8 @@ typedef struct SGcOperatorParam { int64_t sessionId; int32_t downstreamIdx; bool needCache; - void* pGroupValue; - int32_t groupValueSize; + int32_t vgId; + int64_t tbUid; } SGcOperatorParam; typedef struct SExprSupp { @@ -158,7 +158,7 @@ typedef struct SExchangeOperatorBasicParam { typedef struct SExchangeOperatorBatchParam { bool multiParams; - SArray* pBatchs; // SArray + SSHashObj* pBatchs; // SExchangeOperatorBasicParam } SExchangeOperatorBatchParam; typedef struct SExchangeOperatorParam { @@ -717,6 +717,9 @@ uint64_t calcGroupId(char* pData, int32_t len); void streamOpReleaseState(struct SOperatorInfo* pOperator); void streamOpReloadState(struct SOperatorInfo* pOperator); +void destroyOperatorParamValue(void* pValues); +int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 3eba399949..a0274ff001 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -19,21 +19,27 @@ extern "C" { #endif -#define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 +#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 #pragma pack(push, 1) typedef struct SGcBlkBufInfo { + void* prev; void* next; - uint16_t pageId; - int32_t offset; + int64_t blkId; + int64_t offset; + int64_t bufSize; + void* pBuf; + uint32_t fileId; } SGcBlkBufInfo; #pragma pack(pop) -typedef struct SGcBufPageInfo { - int32_t pageSize; - int32_t offset; - char* data; -} SGcBufPageInfo; + +typedef struct SGcVgroupCtx { + SArray* pTbList; + uint64_t lastUid; + int64_t fileSize; + uint32_t fileId; +} SGcVgroupCtx; typedef struct SGroupCacheData { TdThreadMutex mutex; @@ -41,8 +47,13 @@ typedef struct SGroupCacheData { bool fetchDone; bool needCache; SSDataBlock* pBlock; - SGcBlkBufInfo* pFirstBlk; - SGcBlkBufInfo* pLastBlk; + SGcVgroupCtx* pVgCtx; + int32_t downstreamIdx; + int32_t vgId; + uint32_t fileId; + int64_t startBlkId; + int64_t endBlkId; + int64_t startOffset; } SGroupCacheData; typedef struct SGroupColInfo { @@ -62,15 +73,22 @@ typedef struct SGroupColsInfo { } SGroupColsInfo; typedef struct SGcNewGroupInfo { - int64_t uid; - SOperatorParam* pParam; + int32_t vgId; + int64_t uid; + SGroupCacheData* pGroup; + SOperatorParam* pParam; } SGcNewGroupInfo; typedef struct SGcDownstreamCtx { - SRWLatch lock; + SRWLatch grpLock; int64_t fetchSessionId; SArray* pNewGrpList; // SArray - SArray* pGrpUidList; + SSHashObj* pVgTbHash; + SHashObj* pGrpHash; + SRWLatch blkLock; + SSDataBlock* pBaseBlock; + SArray* pFreeBlock; + int64_t lastBlkUid; } SGcDownstreamCtx; typedef struct SGcSessionCtx { @@ -78,7 +96,8 @@ typedef struct SGcSessionCtx { bool needCache; SGcOperatorParam* pParam; SGroupCacheData* pGroupData; - SGcBlkBufInfo* pLastBlk; + int64_t lastBlkId; + int64_t nextOffset; bool semInit; tsem_t waitSem; } SGcSessionCtx; @@ -88,14 +107,38 @@ typedef struct SGcExecInfo { int64_t* pDownstreamBlkNum; } SGcExecInfo; +typedef struct SGcCacheFile { + uint32_t grpNum; + uint32_t grpDone; + int64_t fileSize; +} SGcCacheFile; + +typedef struct SGcReadBlkInfo { + SSDataBlock* pBlock; + int64_t nextOffset; +} SGcReadBlkInfo; + +typedef struct SGcBlkCacheInfo { + SRWLatch dirtyLock; + SSHashObj* pCacheFile; + SHashObj* pDirtyBlk; + SGcBlkBufInfo* pDirtyHead; + SGcBlkBufInfo* pDirtyTail; + SHashObj* pReadBlk; + int64_t blkCacheSize; +} SGcBlkCacheInfo; + typedef struct SGroupCacheOperatorInfo { TdThreadMutex sessionMutex; - SSHashObj* pSessionHash; + int64_t maxCacheSize; + int64_t currentBlkId; + SHashObj* pSessionHash; SGroupColsInfo groupColsInfo; + bool globalGrp; bool grpByUid; SGcDownstreamCtx* pDownstreams; - SArray* pBlkBufs; - SHashObj* pBlkHash; + SGcBlkCacheInfo blkCache; + SHashObj* pGrpHash; SGcExecInfo execInfo; } SGroupCacheOperatorInfo; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 1980f77fe0..c8f7ece8fa 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -36,7 +36,7 @@ static void destroyDynQueryCtrlOperator(void* param) { taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, void* pGrpValue, int32_t grpValSize, SOperatorParam* pChild) { +static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, int32_t vgId, int64_t tbUid, SOperatorParam* pChild) { *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); if (NULL == *ppRes) { return TSDB_CODE_OUT_OF_MEMORY; @@ -57,8 +57,8 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); pGc->downstreamIdx = downstreamIdx; pGc->needCache = needCache; - pGc->pGroupValue = pGrpValue; - pGc->groupValueSize = grpValSize; + pGc->vgId = vgId; + pGc->tbUid = tbUid; (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; (*ppRes)->downstreamIdx = downstreamIdx; @@ -78,15 +78,16 @@ static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, i if (NULL == pExc) { return TSDB_CODE_OUT_OF_MEMORY; } - - pExc->vgId = *pVgId; - pExc->srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; - pExc->uidList = taosArrayInit(1, sizeof(int64_t)); - if (NULL == pExc->uidList) { + + pExc->multiParams = false; + pExc->basic.vgId = *pVgId; + pExc->basic.srcOpType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; + pExc->basic.uidList = taosArrayInit(1, sizeof(int64_t)); + if (NULL == pExc->basic.uidList) { taosMemoryFree(pExc); return TSDB_CODE_OUT_OF_MEMORY; } - taosArrayPush(pExc->uidList, pUid); + taosArrayPush(pExc->basic.uidList, pUid); (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; (*ppRes)->downstreamIdx = downstreamIdx; @@ -145,10 +146,10 @@ static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJ code = buildExchangeOperatorParam(&pExcParam1, 1, rightVg, rightUid, NULL); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, leftUid, pUid0->info.bytes, pExcParam0); + code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, *leftVg, *leftUid, pExcParam0); } if (TSDB_CODE_SUCCESS == code) { - code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, rightUid, pUid1->info.bytes, pExcParam1); + code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, *rightVg, *rightUid, pExcParam1); } if (TSDB_CODE_SUCCESS == code) { code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 7e6fa6830c..b6edfd5820 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -745,24 +745,45 @@ _error: return code; } -int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { +int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) { SExchangeInfo* pExchangeInfo = pOperator->info; - SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value; - int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId)); + int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); if (NULL == pIdx) { - qError("No exchange source for vgId: %d", pParam->vgId); - pOperator->pTaskInfo->code = TSDB_CODE_INVALID_PARA; - T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + qError("No exchange source for vgId: %d", pBasicParam->vgId); + return TSDB_CODE_INVALID_PARA; } - + SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); dataInfo.index = *pIdx; - dataInfo.pSrcUidList = taosArrayDup(pParam->uidList, NULL); - dataInfo.srcOpType = pParam->srcOpType; + dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); + dataInfo.srcOpType = pBasicParam->srcOpType; taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); + return TSDB_CODE_SUCCESS; +} + + +int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { + SExchangeInfo* pExchangeInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; + SExchangeOperatorBasicParam* pBasicParam = NULL; + SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam->value; + if (pParam->multiParams) { + SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorParam->value; + int32_t iter = 0; + while (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter)) { + code = addSingleExchangeSource(pOperator, pBasicParam); + if (code) { + return code; + } + } + } else { + pBasicParam = &pParam->basic; + code = addSingleExchangeSource(pOperator, pBasicParam); + } + pOperator->pOperatorParam = NULL; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 9491564024..137c3e6116 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -61,11 +61,6 @@ static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, S return TSDB_CODE_SUCCESS; } -static void freeGroupCacheBufPage(void* param) { - SGcBufPageInfo* pInfo = (SGcBufPageInfo*)param; - taosMemoryFree(pInfo->data); -} - static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) { char* buf = taosMemoryMalloc(pGrpCacheOperator->execInfo.downstreamNum * 32 + 100); if (NULL == buf) { @@ -85,33 +80,155 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo); taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf); - taosArrayDestroyEx(pGrpCacheOperator->pBlkBufs, freeGroupCacheBufPage); - tSimpleHashCleanup(pGrpCacheOperator->pSessionHash); - taosHashCleanup(pGrpCacheOperator->pBlkHash); + taosHashCleanup(pGrpCacheOperator->pSessionHash); + taosHashCleanup(pGrpCacheOperator->pGrpHash); taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t addPageToGroupCacheBuf(SArray* pBlkBufs) { - SGcBufPageInfo page; - page.pageSize = GROUP_CACHE_DEFAULT_PAGE_SIZE; - page.offset = 0; - page.data = taosMemoryMalloc(page.pageSize); - if (NULL == page.data) { +static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { + if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) { return TSDB_CODE_OUT_OF_MEMORY; } + pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId)); + + taosWLockLatch(&pCache->dirtyLock); + if (NULL == pCache->pDirtyHead) { + pCache->pDirtyHead = pBufInfo; + } else { + pBufInfo->prev = pCache->pDirtyTail; + pCache->pDirtyTail->next = pBufInfo; + } + pCache->pDirtyTail = pBufInfo; + taosWUnLockLatch(&pCache->dirtyLock); + + int64_t blkCacheSize = atomic_add_fetch_64(&pCache->blkCacheSize, pBufInfo->bufSize); + qDebug("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), blkCacheSize); + + if (pGCache->maxCacheSize > 0 && blkCacheSize > pGCache->maxCacheSize) { + //TODO + } - taosArrayPush(pBlkBufs, &page); return TSDB_CODE_SUCCESS; } -static void addBlkToBlkBufs(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcBlkBufInfo** ppBuf) { - *ppRes = pBlock; +static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) { + SGroupCacheOperatorInfo* pGCache = pOperator->info; + int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t); + pBufInfo->pBuf = taosMemoryMalloc(bufSize); + if (NULL == pBufInfo->pBuf) { + qError("group cache add block to cache failed, size:%" PRId64, bufSize); + return TSDB_CODE_OUT_OF_MEMORY; + } + blockDataToBuf(pBufInfo->pBuf, pBlock); + + pBufInfo->prev = NULL; + pBufInfo->next = NULL; + pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); + pBufInfo->fileId = pGroup->fileId; + pBufInfo->offset = pGroup->pVgCtx->fileSize; + pBufInfo->bufSize = bufSize; + + pGroup->pVgCtx->fileSize += bufSize; + + int32_t code = addBlkToDirtyBufList(pGCache, &pGCache->blkCache, pBufInfo); + + return code; } -static FORCE_INLINE char* retrieveBlkFromBlkBufs(SArray* pBlkBufs, SGcBlkBufInfo* pBlkInfo) { - SGcBufPageInfo *pPage = taosArrayGet(pBlkBufs, pBlkInfo->pageId); - return pPage->data + pBlkInfo->offset; + +static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) { + *ppDst = taosMemoryMalloc(sizeof(*pSrc)); + if (NULL == *ppDst) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppDst)->pBlockAgg = NULL; + (*ppDst)->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL); + if (NULL == (*ppDst)->pDataBlock) { + taosMemoryFree(*ppDst); + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(&(*ppDst)->info, &pSrc->info, sizeof(pSrc->info)); + return TSDB_CODE_SUCCESS; +} + +static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) { + taosWLockLatch(&pCtx->blkLock); + if (taosArrayGetSize(pCtx->pFreeBlock) <= 0) { + taosWUnLockLatch(&pCtx->blkLock); + return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock); + } + *ppRes = taosArrayPop(pCtx->pFreeBlock); + taosWUnLockLatch(&pCtx->blkLock); + + return TSDB_CODE_SUCCESS; +} + +static int32_t releaseBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) { + taosWLockLatch(&pCtx->blkLock); + taosArrayPush(pCtx->pFreeBlock, &pBlock); + taosWUnLockLatch(&pCtx->blkLock); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, SGcBlkBufInfo* pBufInfo, SSDataBlock** ppRes) { + int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes); + if (code) { + return code; + } + //TODO OPTIMIZE PERF + return blockDataFromBuf(*ppRes, pBufInfo->pBuf); +} + +static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + SGcBlkCacheInfo* pCache = &pGCache->blkCache; + SGcReadBlkInfo* pReadBlk = taosHashAcquire(pCache->pReadBlk, &blkId, sizeof(blkId)); + if (pReadBlk) { + *ppRes = pReadBlk->pBlock; + *nextOffset = pReadBlk->nextOffset; + return TSDB_CODE_SUCCESS; + } + + taosRLockLatch(&pCache->dirtyLock); + SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId)); + if (pBufInfo) { + code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBufInfo, ppRes); + taosRUnLockLatch(&pCache->dirtyLock); + if (code) { + return code; + } + + *nextOffset = pBufInfo->offset + pBufInfo->bufSize; + SGcReadBlkInfo readBlk = {.pBlock = *ppRes, .nextOffset = *nextOffset}; + taosHashPut(pCache->pReadBlk, &blkId, sizeof(blkId), &readBlk, sizeof(readBlk)); + return TSDB_CODE_SUCCESS; + } + taosRUnLockLatch(&pCache->dirtyLock); + + //TODO READ FROM FILE + code = TSDB_CODE_INVALID_PARA; + return code; +} + +static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { + SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx; + if (NULL == pVgCtx) { + SArray* pList = taosArrayInit(10, sizeof(*pNew)); + if (NULL == pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pList, pNew); + SGcVgroupCtx vgCtx = {.pTbList = pList, .lastUid = 0, .fileSize = 0, .fileId = 0}; + tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); + pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); + return TSDB_CODE_SUCCESS; + } + + taosArrayPush(pVgCtx->pTbList, pNew); + return TSDB_CODE_SUCCESS; } static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) { @@ -120,7 +237,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx]; SOperatorParam* pDst = NULL; - taosWLockLatch(&pCtx->lock); + taosWLockLatch(&pCtx->grpLock); int32_t num = taosArrayGetSize(pCtx->pNewGrpList); if (num <= 0) { goto _return; @@ -128,10 +245,11 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp for (int32_t i = 0; i < num; ++i) { SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); - if (NULL == taosArrayPush(pCtx->pGrpUidList, &pNew->uid)) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew); + if (code) { goto _return; } + if (num > 1) { if (0 == i) { pDst = pNew->pParam; @@ -150,7 +268,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp _return: - taosWUnLockLatch(&pCtx->lock); + taosWUnLockLatch(&pCtx->grpLock); *ppParam = pDst; return code; @@ -174,11 +292,18 @@ static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* p if (pBlock) { pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++; + if (NULL == pGCache->pDownstreams[downstreamIdx].pBaseBlock) { + code = buildGroupCacheBaseBlock(&pGCache->pDownstreams[downstreamIdx].pBaseBlock, pBlock); + if (code) { + return code; + } + taosArrayPush(pGCache->pDownstreams[downstreamIdx].pFreeBlock, &pGCache->pDownstreams[downstreamIdx].pBaseBlock); + } } - + *ppRes = pBlock; - return TSDB_CODE_SUCCESS; + return code; } static void notifyWaitingSessions(SArray* pWaitQueue) { @@ -193,26 +318,70 @@ static void notifyWaitingSessions(SArray* pWaitQueue) { } } -int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) { +static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { + pGroup->pBlock = NULL; + pGroup->fetchDone = true; + + taosThreadMutexLock(&pGroup->mutex); + notifyWaitingSessions(pGroup->waitQueue); + taosArrayClear(pGroup->waitQueue); + taosThreadMutexUnlock(&pGroup->mutex); +} + +static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { + if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { + return; + } + pCtx->lastBlkUid = uid; + pGroup->pVgCtx->lastUid = uid; + + int32_t i = 0; + while (true) { + SGcNewGroupInfo* pNew = taosArrayGet(pGroup->pVgCtx->pTbList, i++); + if (NULL == pNew || pNew->uid == uid) { + break; + } + handleGroupFetchDone(pNew->pGroup); + } + + if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { + pGroup->pVgCtx->fileId++; + pGroup->pVgCtx->fileSize = 0; + } + + pGroup->fileId = pGroup->pVgCtx->fileId; + pGroup->startOffset = pGroup->pVgCtx->fileSize; +} + +static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) { int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + if (pGCache->grpByUid) { - SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); + SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); + if (NULL == pGroup) { + qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.uid); + return TSDB_CODE_INVALID_PARA; + } + + handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.uid); + pGroup->pBlock = pBlock; if (pGroup->needCache) { - SGcBlkBufInfo* pNewBlk = NULL; - code = addBlkToBlkBufs(pOperator, pBlock, &pNewBlk); + SGcBlkBufInfo newBlkBuf; + code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); if (code) { return code; } - if (pGroup->pLastBlk) { - pGroup->pLastBlk->next = pNewBlk; - pGroup->pLastBlk = pNewBlk; + if (pGroup->endBlkId > 0) { + pGroup->endBlkId = newBlkBuf.blkId; } else { - pGroup->pFirstBlk = pNewBlk; - pGroup->pLastBlk = pNewBlk; + pGroup->startBlkId = newBlkBuf.blkId; + pGroup->endBlkId = newBlkBuf.blkId; } } @@ -231,17 +400,19 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; - int32_t uidNum = taosArrayGetSize(pCtx->pGrpUidList); - - for (int32_t i = 0; i < uidNum; ++i) { - int64_t* pUid = taosArrayGet(pCtx->pGrpUidList, i); - SGroupCacheData* pGroup = taosHashGet(pGCache->pBlkHash, pUid, sizeof(*pUid)); - pGroup->pBlock = NULL; - pGroup->fetchDone = true; - notifyWaitingSessions(pGroup->waitQueue); + int32_t uidNum = 0; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + SGcVgroupCtx* pVgCtx = NULL; + int32_t iter = 0; + while (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) { + uidNum = taosArrayGetSize(pVgCtx->pTbList); + for (int32_t i = 0; i < uidNum; ++i) { + SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i); + handleGroupFetchDone(pNew->pGroup); + } + taosArrayClear(pVgCtx->pTbList); } - taosArrayClear(pCtx->pGrpUidList); - + return TSDB_CODE_SUCCESS; } @@ -267,6 +438,7 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator } static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGroup, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; if (NULL == pGroup->waitQueue) { pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES); if (NULL == pGroup->waitQueue) { @@ -287,18 +459,22 @@ static int32_t groupCacheSessionWait(SGroupCacheOperatorInfo* pGCache, SGroupCac tsem_wait(&pSession->waitSem); if (pSession->pGroupData->needCache) { - if (NULL == pSession->pLastBlk) { - if (pSession->pGroupData->pFirstBlk) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); - pSession->pLastBlk = pSession->pGroupData->pFirstBlk; - return TSDB_CODE_SUCCESS; + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; + return code; } else if (pGroup->fetchDone) { *ppRes = NULL; return TSDB_CODE_SUCCESS; } - } else if (pSession->pLastBlk->next) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next); - pSession->pLastBlk = pSession->pLastBlk->next; + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + return code; + } else if (pGroup->fetchDone) { + *ppRes = NULL; return TSDB_CODE_SUCCESS; } } else { @@ -317,18 +493,22 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 while (true) { if (pSession->pGroupData->needCache) { - if (NULL == pSession->pLastBlk) { - if (pSession->pGroupData->pFirstBlk) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pGroupData->pFirstBlk); - pSession->pLastBlk = pSession->pGroupData->pFirstBlk; + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; goto _return; } - } else if (pSession->pLastBlk->next) { - *ppRes = retrieveBlkFromBlkBufs(pGCache->pBlkBufs, pSession->pLastBlk->next); - pSession->pLastBlk = pSession->pLastBlk->next; + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + goto _return; + } else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { + *ppRes = NULL; goto _return; } - } else if (pSession->pGroupData->pBlock || pSession->pGroupData->fetchDone) { + } else if (pSession->pGroupData->pBlock || atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { *ppRes = pSession->pGroupData->pBlock; pSession->pGroupData->pBlock = NULL; goto _return; @@ -370,25 +550,48 @@ _return: } -static int32_t initGroupCacheBufPages(SGroupCacheOperatorInfo* pInfo) { - pInfo->pBlkBufs = taosArrayInit(32, sizeof(SGcBufPageInfo)); - if (NULL == pInfo->pBlkBufs) { +static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { + SGcBlkCacheInfo* pCache = &pInfo->blkCache; + pCache->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pCache->pCacheFile) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pCache->pDirtyBlk) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pCache->pReadBlk) { return TSDB_CODE_OUT_OF_MEMORY; } - return addPageToGroupCacheBuf(pInfo->pBlkBufs); + return TSDB_CODE_SUCCESS; } -static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) { +static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcOperatorParam* pGcParam) { + taosThreadMutexInit(&pGroup->mutex, NULL); + pGroup->needCache = pGcParam->needCache; + pGroup->downstreamIdx = pGcParam->downstreamIdx; + pGroup->vgId = pGcParam->vgId; + pGroup->fileId = -1; + pGroup->startBlkId = -1; + pGroup->endBlkId = -1; + pGroup->startOffset = -1; + pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); +} + +static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcOperatorParam* pGcParam = pParam->value; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData grpData = {0}; - grpData.needCache = pGcParam->needCache; + initNewGroupData(pCtx, &grpData, pGcParam); while (true) { - if (0 != taosHashPut(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize, &grpData, sizeof(grpData))) { + if (0 != taosHashPut(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid), &grpData, sizeof(grpData))) { if (terrno == TSDB_CODE_DUP_KEY) { - *ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); + *ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (*ppGrp) { break; } @@ -397,18 +600,20 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperato } } - *ppGrp = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); + *ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (*ppGrp) { SGcNewGroupInfo newGroup; - newGroup.uid = *(int64_t*)pGcParam->pGroupValue; + newGroup.pGroup = *ppGrp; + newGroup.vgId = pGcParam->vgId; + newGroup.uid = pGcParam->tbUid; newGroup.pParam = taosArrayGet(pParam->pChildren, 0); - taosWLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); - if (NULL == taosArrayPush(pGCache->pDownstreams[pParam->downstreamIdx].pNewGrpList, &newGroup)) { - taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + taosWLockLatch(&pCtx->grpLock); + if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) { + taosWUnLockLatch(&pCtx->grpLock); return TSDB_CODE_OUT_OF_MEMORY; } - taosWUnLockLatch(&pGCache->pDownstreams[pParam->downstreamIdx].lock); + taosWUnLockLatch(&pCtx->grpLock); break; } @@ -418,15 +623,18 @@ static int32_t initGroupCacheGroupData(struct SOperatorInfo* pOperator, SOperato } static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) { + int32_t code = TSDB_CODE_SUCCESS; SGcSessionCtx ctx = {0}; - int32_t code = 0; SGcOperatorParam* pGcParam = pParam->value; SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGroupCacheData* pGroup = taosHashAcquire(pGCache->pBlkHash, pGcParam->pGroupValue, pGcParam->groupValueSize); + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + + SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (pGroup) { ctx.pGroupData = pGroup; } else { - code = initGroupCacheGroupData(pOperator, pParam, &ctx.pGroupData); + code = addNewGroupData(pOperator, pParam, &ctx.pGroupData); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -434,12 +642,12 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP ctx.pParam = pGcParam; - int32_t code = tSimpleHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); + code = taosHashPut(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &ctx, sizeof(ctx)); if (TSDB_CODE_SUCCESS != code) { return code; } - *ppSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + *ppSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); return TSDB_CODE_SUCCESS; } @@ -447,7 +655,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcOperatorParam* pGcParam = pParam->value; - SGcSessionCtx* pSession = tSimpleHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); + SGcSessionCtx* pSession = taosHashGet(pGCache->pSessionHash, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); if (NULL == pSession) { int32_t code = initGroupCacheSession(pOperator, pParam, &pSession); if (TSDB_CODE_SUCCESS != code) { @@ -462,7 +670,7 @@ static FORCE_INLINE void destroyCurrentGroupCacheSession(SGroupCacheOperatorInfo if (NULL == *ppCurrent) { return; } - if (tSimpleHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) { + if (taosHashRemove(pGCache->pSessionHash, pCurrentId, sizeof(*pCurrentId))) { qError("remove session %" PRIx64 " failed", *pCurrentId); } @@ -489,8 +697,9 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { pInfo->pDownstreams[i].fetchSessionId = -1; - pInfo->pDownstreams[i].pGrpUidList = taosArrayInit(10, sizeof(int64_t)); - if (NULL == pInfo->pDownstreams[i].pGrpUidList) { + pInfo->pDownstreams[i].lastBlkUid = 0; + pInfo->pDownstreams[i].pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pInfo->pDownstreams[i].pVgTbHash) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -498,6 +707,17 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (NULL == pInfo->pDownstreams[i].pNewGrpList) { return TSDB_CODE_OUT_OF_MEMORY; } + if (!pInfo->globalGrp) { + pInfo->pDownstreams[i].pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (pInfo->pDownstreams[i].pGrpHash == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES); + if (NULL == pInfo->pDownstreams[i].pFreeBlock) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; @@ -531,10 +751,14 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); + pInfo->maxCacheSize = -1; pInfo->grpByUid = pPhyciNode->grpByUid; + pInfo->globalGrp = pPhyciNode->globalGrp; + if (!pInfo->grpByUid) { qError("only group cache by uid is supported now"); - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto _error; } if (pPhyciNode->pGroupCols) { @@ -544,18 +768,20 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t } } - code = initGroupCacheBufPages(pInfo); + code = initGroupCacheBlockCache(pInfo); if (code) { goto _error; } - pInfo->pBlkHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (pInfo->pBlkHash == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + if (pInfo->globalGrp) { + pInfo->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (pInfo->pGrpHash == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } } - pInfo->pSessionHash = tSimpleHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + pInfo->pSessionHash = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (pInfo->pSessionHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _error; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index f3ca619a25..54f581a660 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -616,28 +616,37 @@ int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc) { SExchangeOperatorParam* pDExc = pDst->value; SExchangeOperatorParam* pSExc = pSrc->value; if (!pDExc->multiParams) { - SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); - if (NULL == pBatch) { - return TSDB_CODE_OUT_OF_MEMORY; + if (pSExc->basic.vgId != pDExc->basic.vgId) { + SExchangeOperatorBatchParam* pBatch = taosMemoryMalloc(sizeof(SExchangeOperatorBatchParam)); + if (NULL == pBatch) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pBatch->multiParams = true; + pBatch->pBatchs = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pBatch->pBatchs) { + taosMemoryFree(pBatch); + return TSDB_CODE_OUT_OF_MEMORY; + } + tSimpleHashPut(pBatch->pBatchs, &pDExc->basic.vgId, sizeof(pDExc->basic.vgId), &pDExc->basic, sizeof(pDExc->basic)); + tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic)); + destroyOperatorParamValue(pDst->value); + pDst->value = pBatch; + } else { + taosArrayAddAll(pDExc->basic.uidList, pSExc->basic.uidList); } - pBatch->multiParams = true; - pBatch->pBatchs = taosArrayInit(4, sizeof(SExchangeOperatorBasicParam)); - if (NULL == pBatch->pBatchs) { - taosMemoryFree(pBatch); - return TSDB_CODE_OUT_OF_MEMORY; - } - taosArrayPush(pBatch->pBatchs, &pDExc->basic); - taosArrayPush(pBatch->pBatchs, &pSExc->basic); - destroyOperatorParamValue(pDst->value); - pDst->value = pBatch; } else { SExchangeOperatorBatchParam* pBatch = pDst->value; - taosArrayPush(pBatch->pBatchs, &pSExc->basic); + SExchangeOperatorBasicParam* pBasic = tSimpleHashGet(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId)); + if (pBasic) { + taosArrayAddAll(pBasic->uidList, pSExc->basic.uidList); + } else { + tSimpleHashPut(pBatch->pBatchs, &pSExc->basic.vgId, sizeof(pSExc->basic.vgId), &pSExc->basic, sizeof(pSExc->basic)); + } } break; } default: - qError("invalid optype %d for merge operator params", (*ppDst)->opType); + qError("invalid optype %d for merge operator params", pDst->opType); return TSDB_CODE_INVALID_PARA; } @@ -678,7 +687,7 @@ int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pPara for (int32_t i = 0; i < childrenNum; ++i) { SOperatorParam* pChild = *(SOperatorParam**)taosArrayGet(pOperator->pOperatorParam->pChildren, i); if (pOperator->pDownstreamParams[pChild->downstreamIdx]) { - int32_t code = mergeOperatorParams(&pOperator->pDownstreamParams[pChild->downstreamIdx], pChild); + int32_t code = mergeOperatorParams(pOperator->pDownstreamParams[pChild->downstreamIdx], pChild); if (code) { return code; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0e6b51b142..ef3181f412 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1183,6 +1183,8 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { } static const char* jkGroupCacheLogicPlanGrpColsMayBeNull = "GroupColsMayBeNull"; +static const char* jkGroupCacheLogicPlanGroupByUid = "GroupByUid"; +static const char* jkGroupCacheLogicPlanGlobalGroup = "GlobalGroup"; static const char* jkGroupCacheLogicPlanGroupCols = "GroupCols"; static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { @@ -1192,6 +1194,12 @@ static int32_t logicGroupCacheNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, pNode->grpColsMayBeNull); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGroupByUid, pNode->grpByUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCacheLogicPlanGlobalGroup, pNode->globalGrp); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCacheLogicPlanGroupCols, pNode->pGroupCols); } @@ -1206,6 +1214,12 @@ static int32_t jsonToLogicGroupCacheNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGroupByUid, &pNode->grpByUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCacheLogicPlanGlobalGroup, &pNode->globalGrp); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCacheLogicPlanGroupCols, &pNode->pGroupCols); } @@ -2912,12 +2926,24 @@ static int32_t jsonToPhysiDeleteNode(const SJson* pJson, void* pObj) { } static const char* jkGroupCachePhysiPlanGroupCols = "GroupColumns"; +static const char* jkGroupCachePhysiPlanGrpColsMayBeNull = "GroupColumnsMayBeNull"; +static const char* jkGroupCachePhysiPlanGroupByUid = "GroupByUid"; +static const char* jkGroupCachePhysiPlanGlobalGroup = "GlobalGroup"; static int32_t physiGroupCacheNodeToJson(const void* pObj, SJson* pJson) { const SGroupCachePhysiNode* pNode = (const SGroupCachePhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGrpColsMayBeNull, pNode->grpColsMayBeNull); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGroupByUid, pNode->grpByUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkGroupCachePhysiPlanGlobalGroup, pNode->globalGrp); + } if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkGroupCachePhysiPlanGroupCols, pNode->pGroupCols); } @@ -2928,6 +2954,15 @@ static int32_t jsonToPhysiGroupCacheNode(const SJson* pJson, void* pObj) { SGroupCachePhysiNode* pNode = (SGroupCachePhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGrpColsMayBeNull, &pNode->grpColsMayBeNull); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGroupByUid, &pNode->grpByUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkGroupCachePhysiPlanGlobalGroup, &pNode->globalGrp); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkGroupCachePhysiPlanGroupCols, &pNode->pGroupCols); } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 0a418958e7..6648186b4d 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -3527,6 +3527,9 @@ static int32_t msgToPhysiDeleteNode(STlvDecoder* pDecoder, void* pObj) { enum { PHY_GROUP_CACHE_CODE_BASE_NODE = 1, + PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, + PHY_GROUP_CACHE_CODE_GROUP_BY_UID, + PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS }; @@ -3537,6 +3540,16 @@ static int32_t physiGroupCacheNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_COLUMNS, nodeListToMsg, pNode->pGroupCols); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL, pNode->grpColsMayBeNull); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GROUP_BY_UID, pNode->grpByUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_GROUP_CACHE_CODE_GLOBAL_GROUP, pNode->globalGrp); + } + return code; } @@ -3553,6 +3566,15 @@ static int32_t msgToPhysiGroupCacheNode(STlvDecoder* pDecoder, void* pObj) { case PHY_GROUP_CACHE_CODE_GROUP_COLUMNS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pGroupCols); break; + case PHY_GROUP_CACHE_CODE_GROUP_COLS_MAY_BE_NULL: + code = tlvDecodeBool(pTlv, &pNode->grpColsMayBeNull); + break; + case PHY_GROUP_CACHE_CODE_GROUP_BY_UID: + code = tlvDecodeBool(pTlv, &pNode->grpByUid); + break; + case PHY_GROUP_CACHE_CODE_GLOBAL_GROUP: + code = tlvDecodeBool(pTlv, &pNode->globalGrp); + break; default: break; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e1eea88d90..a259ceb98f 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3231,6 +3231,7 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** pGrpCache->node.dynamicOp = true; pGrpCache->grpColsMayBeNull = false; + pGrpCache->grpByUid = true; pGrpCache->node.pChildren = pChildren; pGrpCache->node.pTargets = nodesMakeList(); if (NULL == pGrpCache->node.pTargets) { @@ -3252,11 +3253,16 @@ static int32_t stbJoinOptCreateGroupCacheNode(SNodeList* pChildren, SLogicNode** } } + bool hasCond = false; SNode* pNode = NULL; FOREACH(pNode, pChildren) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (pScan->node.pConditions) { + hasCond = true; + } pScan->node.pParent = (SLogicNode*)pGrpCache; } + pGrpCache->globalGrp = !hasCond; if (TSDB_CODE_SUCCESS == code) { *ppLogic = (SLogicNode*)pGrpCache; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index c3c989c2ff..ff0f90498f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -986,6 +986,8 @@ static int32_t createGroupCachePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh } pGrpCache->grpColsMayBeNull = pLogicNode->grpColsMayBeNull; + pGrpCache->grpByUid = pLogicNode->grpByUid; + pGrpCache->globalGrp = pLogicNode->globalGrp; SDataBlockDescNode* pChildDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc; int32_t code = TSDB_CODE_SUCCESS; if (TSDB_CODE_SUCCESS == code) {