diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 37b689a419..13a0312f70 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -131,6 +131,8 @@ typedef struct SGroupCacheOperatorInfo { SGroupColsInfo groupColsInfo; bool globalGrp; bool grpByUid; + bool batchFetch; + bool fetchDone; SGcDownstreamCtx* pDownstreams; SGcBlkCacheInfo blkCache; SHashObj* pGrpHash; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 53311a451e..0b2d1d2747 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -29,15 +29,24 @@ int64_t gSessionId = 0; +void freeVgTableList(void* ptr) { + taosArrayDestroy(*(SArray**)ptr); +} + + static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDyn = (SDynQueryCtrlOperatorInfo*)param; qDebug("dynQueryCtrl exec info, prevBlk:%" PRId64 ", prevRows:%" PRId64 ", postBlk:%" PRId64 ", postRows:%" PRId64, pDyn->execInfo.prevBlkNum, pDyn->execInfo.prevBlkRows, pDyn->execInfo.postBlkNum, pDyn->execInfo.postBlkRows); - tSimpleHashClear(pDyn->stbJoin.ctx.prev.leftVg); - tSimpleHashClear(pDyn->stbJoin.ctx.prev.rightVg); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg); - tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg); + if (pDyn->stbJoin.ctx.prev.leftVg) { + tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.leftVg, freeVgTableList); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.leftVg); + } + if (pDyn->stbJoin.ctx.prev.rightVg) { + tSimpleHashSetFreeFp(pDyn->stbJoin.ctx.prev.rightVg, freeVgTableList); + tSimpleHashCleanup(pDyn->stbJoin.ctx.prev.rightVg); + } taosMemoryFreeClear(param); } @@ -55,6 +64,8 @@ static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, if (NULL == taosArrayPush((*ppRes)->pChildren, &pChild)) { return TSDB_CODE_OUT_OF_MEMORY; } + } else { + (*ppRes)->pChildren = NULL; } SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); @@ -497,6 +508,8 @@ static void seqBatchJoinLaunchRetrieve(SOperatorInfo* pOperator, SSDataBlock** p } seqJoinLaunchPostJoin(pOperator, ppRes); + pPrev->pListHead->readIdx++; + if (*ppRes) { return; } @@ -533,23 +546,16 @@ SSDataBlock* seqBatchStableJoin(SOperatorInfo* pOperator) { return pRes; } -void freeVgTableList(void* ptr) { - taosArrayDestroy(*(SArray**)ptr); -} - - int32_t initBatchStbJoinVgHash(SStbJoinPrevJoinCtx* pPrev) { pPrev->leftVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (NULL == pPrev->leftVg) { return TSDB_CODE_OUT_OF_MEMORY; } - tSimpleHashSetFreeFp(pPrev->leftVg, freeVgTableList); pPrev->rightVg = tSimpleHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); if (NULL == pPrev->rightVg) { return TSDB_CODE_OUT_OF_MEMORY; } - tSimpleHashSetFreeFp(pPrev->rightVg, freeVgTableList); return TSDB_CODE_SUCCESS; } @@ -576,7 +582,7 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: memcpy(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); - pInfo->stbJoin.basic.batchJoin = false; + pInfo->stbJoin.basic.batchJoin = true; if (pInfo->stbJoin.basic.batchJoin) { code = initBatchStbJoinVgHash(&pInfo->stbJoin.ctx.prev); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 8458b3e881..81a77dba27 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -236,22 +236,20 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp for (int32_t i = 0; i < num; ++i) { SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); - code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew); - if (code) { - goto _return; + if (!pGCache->batchFetch) { + code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew); + if (code) { + goto _return; + } } - if (num > 1) { - if (0 == i) { - pDst = pNew->pParam; - } else { - code = mergeOperatorParams(pDst, pNew->pParam); - if (code) { - goto _return; - } - } - } else { + if (NULL == pDst) { pDst = pNew->pParam; + } else if (pNew->pParam) { + code = mergeOperatorParams(pDst, pNew->pParam); + if (code) { + goto _return; + } } } @@ -352,12 +350,23 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId)); if (NULL == pGroup) { - qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId); - return TSDB_CODE_INVALID_PARA; + if (pGCache->batchFetch) { + SOperatorParam fakeParam = {0}; + fakeParam.downstreamIdx = pSession->downstreamIdx; + code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } else { + qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId); + return TSDB_CODE_INVALID_PARA; + } + } + + if (!pGCache->batchFetch) { + handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); } - handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); - SGcBlkBufInfo newBlkBuf; code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf); if (code) { @@ -386,16 +395,26 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; - int32_t uidNum = 0; - SGcVgroupCtx* pVgCtx = NULL; - int32_t iter = 0; - while (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) { - uidNum = taosArrayGetSize(pVgCtx->pTbList); - for (int32_t i = 0; i < uidNum; ++i) { - SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i); - handleGroupFetchDone(pNew->pGroup); + if (pGCache->batchFetch) { + atomic_store_8((int8_t*)&pGCache->fetchDone, true); + + SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; + SGroupCacheData* pGroup = NULL; + while (pGroup = taosHashIterate(pGrpHash, pGroup)) { + handleGroupFetchDone(pGroup); + } + } else { + int32_t uidNum = 0; + SGcVgroupCtx* pVgCtx = NULL; + int32_t iter = 0; + while (pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) { + uidNum = taosArrayGetSize(pVgCtx->pTbList); + for (int32_t i = 0; i < uidNum; ++i) { + SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i); + handleGroupFetchDone(pNew->pGroup); + } + taosArrayClear(pVgCtx->pTbList); } - taosArrayClear(pVgCtx->pTbList); } taosHashClear(pCtx->pWaitSessions); @@ -575,10 +594,10 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcOperatorParam* pGcParam) { +static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) { taosThreadMutexInit(&pGroup->mutex, NULL); - pGroup->downstreamIdx = pGcParam->downstreamIdx; - pGroup->vgId = pGcParam->vgId; + pGroup->downstreamIdx = downstreamIdx; + pGroup->vgId = vgId; pGroup->fileId = -1; pGroup->startBlkId = -1; pGroup->endBlkId = -1; @@ -586,18 +605,17 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); } -static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp) { +static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) { SGroupCacheOperatorInfo* pGCache = pOperator->info; - SGcOperatorParam* pGcParam = pParam->value; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData grpData = {0}; - initNewGroupData(pCtx, &grpData, pGcParam); + initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId); while (true) { - if (0 != taosHashPut(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid), &grpData, sizeof(grpData))) { + if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { if (terrno == TSDB_CODE_DUP_KEY) { - *ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); + *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); if (*ppGrp) { break; } @@ -606,12 +624,12 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* } } - *ppGrp = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); - if (*ppGrp) { + *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid)); + if (*ppGrp && pParam->pChildren) { SGcNewGroupInfo newGroup; newGroup.pGroup = *ppGrp; - newGroup.vgId = pGcParam->vgId; - newGroup.uid = pGcParam->tbUid; + newGroup.vgId = vgId; + newGroup.uid = uid; newGroup.pParam = taosArrayGetP(pParam->pChildren, 0); taosWLockLatch(&pCtx->grpLock); @@ -646,7 +664,7 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid)); if (NULL == pGroup) { - code = addNewGroupData(pOperator, pParam, &pGroup); + code = addNewGroupData(pOperator, pParam, &pGroup, pGcParam->vgId, pGcParam->tbUid); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -777,6 +795,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->maxCacheSize = -1; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; + pInfo->batchFetch = false; if (!pInfo->grpByUid) { qError("only group cache by uid is supported now");