diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 13a0312f70..45e0d01be6 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -41,6 +41,11 @@ typedef struct SGcVgroupCtx { uint32_t fileId; } SGcVgroupCtx; +typedef struct SGroupSeqBlkList { + int64_t startBlkId; + int64_t endBlkId; +} SGroupSeqBlkList; + typedef struct SGroupCacheData { TdThreadMutex mutex; SArray* waitQueue; @@ -49,9 +54,11 @@ typedef struct SGroupCacheData { SGcVgroupCtx* pVgCtx; int32_t downstreamIdx; int32_t vgId; + union { + SGroupSeqBlkList blkList; + SArray* pBlkList; + }; uint32_t fileId; - int64_t startBlkId; - int64_t endBlkId; int64_t startOffset; } SGroupCacheData; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 2961a380eb..b184281f38 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -924,18 +924,30 @@ static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTa return pResBlock; } -static void doSetQualifiedUid(SArray* pUidList, const SArray* pUidTagList, bool* pResultList) { +static int32_t doSetQualifiedUid(STableListInfo* pListInfo, SArray* pUidList, const SArray* pUidTagList, bool* pResultList, bool addUid) { taosArrayClear(pUidList); + STableKeyInfo info = {.uid = 0, .groupId = 0}; int32_t numOfTables = taosArrayGetSize(pUidTagList); for (int32_t i = 0; i < numOfTables; ++i) { - uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; - qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]); - if (pResultList[i]) { - taosArrayPush(pUidList, &uid); + uint64_t uid = ((STUidTagInfo*)taosArrayGet(pUidTagList, i))->uid; + qDebug("tagfilter get uid:%" PRId64 ", res:%d", uid, pResultList[i]); + + info.uid = uid; + void* p = taosArrayPush(pListInfo->pTableList, &info); + if (p == NULL) { + taosArrayDestroy(pUidList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (addUid) { + taosArrayPush(pUidList, &uid); + } } } + + return TSDB_CODE_SUCCESS; } static void copyExistedUids(SArray* pUidTagList, const SArray* pUidList) { @@ -952,7 +964,7 @@ static void copyExistedUids(SArray* pUidTagList, const SArray* pUidList) { } static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode, - SIdxFltStatus status, SStorageAPI* pAPI) { + SIdxFltStatus status, SStorageAPI* pAPI, bool addUid) { if (pTagCond == NULL) { return TSDB_CODE_SUCCESS; } @@ -1041,7 +1053,11 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN goto end; } - doSetQualifiedUid(pUidList, pUidTagList, (bool*)output.columnData->pData); + code = doSetQualifiedUid(pListInfo, pUidList, pUidTagList, (bool*)output.columnData->pData, addUid); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto end; + } end: taosHashCleanup(ctx.colHash); @@ -1059,6 +1075,7 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI) { int32_t code = TSDB_CODE_SUCCESS; size_t numOfTables = 0; + bool listAdded = false; pListInfo->idInfo.suid = pScanNode->suid; pListInfo->idInfo.tableType = pScanNode->tableType; @@ -1071,10 +1088,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S if (pStorageAPI->metaFn.isTableExisted(pVnode, pScanNode->uid)) { taosArrayPush(pUidList, &pScanNode->uid); } - code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI); + code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, false); if (code != TSDB_CODE_SUCCESS) { goto _end; } + listAdded = true; } else { T_MD5_CTX context = {0}; @@ -1115,10 +1133,11 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S } } - code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI); + code = doFilterByTagCond(pListInfo, pUidList, pTagCond, pVnode, status, pStorageAPI, tsTagFilterCache); if (code != TSDB_CODE_SUCCESS) { goto _end; } + listAdded = true; // let's add the filter results into meta-cache numOfTables = taosArrayGetSize(pUidList); @@ -1139,17 +1158,19 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S } _end: - numOfTables = taosArrayGetSize(pUidList); - for (int i = 0; i < numOfTables; i++) { - STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0}; + if (!listAdded) { + numOfTables = taosArrayGetSize(pUidList); + for (int i = 0; i < numOfTables; i++) { + STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(pUidList, i), .groupId = 0}; - void* p = taosArrayPush(pListInfo->pTableList, &info); - if (p == NULL) { - taosArrayDestroy(pUidList); - return TSDB_CODE_OUT_OF_MEMORY; + void* p = taosArrayPush(pListInfo->pTableList, &info); + if (p == NULL) { + taosArrayDestroy(pUidList); + return TSDB_CODE_OUT_OF_MEMORY; + } + + qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr); } - - qTrace("tagfilter get uid:%" PRIu64 ", %s", info.uid, idstr); } taosArrayDestroy(pUidList); diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index cc09fc4860..6b2dba25fa 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -363,13 +363,18 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* } -static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId) { +static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch) { taosThreadMutexInit(&pGroup->mutex, NULL); pGroup->downstreamIdx = downstreamIdx; pGroup->vgId = vgId; pGroup->fileId = -1; - pGroup->startBlkId = -1; - pGroup->endBlkId = -1; + if (batchFetch) { + pGroup->pBlkList = taosArrayInit(10, POINTER_BYTES); + } else { + pGroup->blkList.startBlkId = -1; + pGroup->blkList.endBlkId = -1; + + } pGroup->startOffset = -1; pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); } @@ -379,7 +384,7 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pParam->downstreamIdx]; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData grpData = {0}; - initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId); + initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch); while (true) { if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) { @@ -415,6 +420,24 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* return TSDB_CODE_SUCCESS; } +static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) { + if (batchFetch) { + taosArrayPush(pGroup->pBlkList, &pNewBlk->blkId); + qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->pBlkList)); + return TSDB_CODE_SUCCESS; + } + + if (pGroup->blkList.endBlkId > 0) { + pGroup->blkList.endBlkId = pNewBlk->blkId; + } else { + pGroup->blkList.startBlkId = pNewBlk->blkId; + pGroup->blkList.endBlkId = pNewBlk->blkId; + } + + qError("block added to group cache, total block num:%" PRId64, pGroup->blkList.endBlkId - pGroup->blkList.startBlkId + 1); + + return TSDB_CODE_SUCCESS; +} static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) { int32_t code = TSDB_CODE_SUCCESS; @@ -446,12 +469,10 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD if (code) { return code; } - - if (pGroup->endBlkId > 0) { - pGroup->endBlkId = newBlkBuf.blkId; - } else { - pGroup->startBlkId = newBlkBuf.blkId; - pGroup->endBlkId = newBlkBuf.blkId; + + code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf); + if (code) { + return code; } notifyWaitingSessions(pGroup->waitQueue); @@ -541,6 +562,50 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator return code; } +static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) { + int32_t code = TSDB_CODE_SUCCESS; + SGroupCacheOperatorInfo* pGCache = pOperator->info; + *got = true; + + if (pGCache->batchFetch) { + if (pSession->lastBlkId < 0) { + if (taosArrayGetSize(pSession->pGroupData->pBlkList) > 0) { + int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, 0); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); + pSession->lastBlkId = *pIdx; + return code; + } + } else if (pSession->lastBlkId < taosArrayGetSize(pSession->pGroupData->pBlkList)) { + int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, pSession->lastBlkId + 1); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + return code; + } + } + + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->blkList.startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; + return code; + } + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->blkList.endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; + return code; + } + + if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { + *ppRes = NULL; + return code; + } + + *got = false; + return code; +} + + static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheData* pGroup = pSession->pGroupData; @@ -573,44 +638,21 @@ static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstr taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId)); - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; - } else if (pGroup->fetchDone) { - *ppRes = NULL; - } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - } else if (pGroup->fetchDone) { - *ppRes = NULL; - } - - return code; + bool got = false; + return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got); } -static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + +static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGroupCacheOperatorInfo* pGCache = pOperator->info; bool locked = false; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; while (true) { - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; - goto _return; - } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - goto _return; - } else if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { - *ppRes = NULL; + bool got = false; + code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got); + if (TSDB_CODE_SUCCESS != code || got) { goto _return; } @@ -723,7 +765,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock } } - code = getBlkFromSessionCacheImpl(pOperator, pGcParam->sessionId, pSession, ppRes); + code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); if (NULL == ppRes) { taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId)); } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index fd1e2fe463..7bdea2a9c6 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -447,6 +447,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator while (endPos == dataBlock->info.rows) { SOperatorInfo* ds = pOperator->pDownstream[whichChild]; dataBlock = getNextBlockFromDownstream(pOperator, whichChild); + qError("merge join %s got block for same ts, rows:%" PRId64, whichChild == 0 ? "left" : "right", dataBlock ? dataBlock->info.rows : 0); if (whichChild == 0) { pJoinInfo->leftPos = 0; pJoinInfo->pLeft = dataBlock; @@ -656,16 +657,15 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); pJoinInfo->leftPos = 0; + qError("merge join left got block, rows:%" PRId64, pJoinInfo->pLeft ? pJoinInfo->pLeft->info.rows : 0); + if (pJoinInfo->pLeft == NULL) { - qError("merge join left got empty block"); if (pOperator->pOperatorParam && ((SSortMergeJoinOperatorParam*)pOperator->pOperatorParam->value)->initParam) { leftEmpty = true; } else { setMergeJoinDone(pOperator); return false; } - } else { - qError("merge join left got block"); } } @@ -673,12 +673,12 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); pJoinInfo->rightPos = 0; + qError("merge join right got block, rows:%" PRId64, pJoinInfo->pRight ? pJoinInfo->pRight->info.rows : 0); + if (pJoinInfo->pRight == NULL) { - qError("merge join right got empty block"); setMergeJoinDone(pOperator); return false; } else { - qError("merge join right got block"); if (leftEmpty) { setMergeJoinDone(pOperator); return false;