diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 45e0d01be6..1278fa5553 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -46,6 +46,11 @@ typedef struct SGroupSeqBlkList { int64_t endBlkId; } SGroupSeqBlkList; +typedef struct SGroupBatchBlkList { + SRWLatch lock; + SArray* pBlkList; +} SGroupBatchBlkList; + typedef struct SGroupCacheData { TdThreadMutex mutex; SArray* waitQueue; @@ -55,8 +60,8 @@ typedef struct SGroupCacheData { int32_t downstreamIdx; int32_t vgId; union { - SGroupSeqBlkList blkList; - SArray* pBlkList; + SGroupSeqBlkList seqList; + SGroupBatchBlkList batchList; }; uint32_t fileId; int64_t startOffset; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 6b2dba25fa..117763b40a 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -219,6 +219,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC //TODO READ FROM FILE code = TSDB_CODE_INVALID_PARA; + qError("block %" PRId64 "not found in dirty block list", blkId); + return code; } @@ -369,10 +371,10 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat pGroup->vgId = vgId; pGroup->fileId = -1; if (batchFetch) { - pGroup->pBlkList = taosArrayInit(10, POINTER_BYTES); + pGroup->batchList.pBlkList = taosArrayInit(10, POINTER_BYTES); } else { - pGroup->blkList.startBlkId = -1; - pGroup->blkList.endBlkId = -1; + pGroup->seqList.startBlkId = -1; + pGroup->seqList.endBlkId = -1; } pGroup->startOffset = -1; @@ -422,19 +424,21 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) { if (batchFetch) { - taosArrayPush(pGroup->pBlkList, &pNewBlk->blkId); - qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->pBlkList)); + taosWLockLatch(&pGroup->batchList.lock); + taosArrayPush(pGroup->batchList.pBlkList, &pNewBlk->blkId); + qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pBlkList)); + taosWUnLockLatch(&pGroup->batchList.lock); return TSDB_CODE_SUCCESS; } - if (pGroup->blkList.endBlkId > 0) { - pGroup->blkList.endBlkId = pNewBlk->blkId; + if (pGroup->seqList.endBlkId > 0) { + atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); } else { - pGroup->blkList.startBlkId = pNewBlk->blkId; - pGroup->blkList.endBlkId = pNewBlk->blkId; + atomic_store_64(&pGroup->seqList.startBlkId, pNewBlk->blkId); + atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); } - qError("block added to group cache, total block num:%" PRId64, pGroup->blkList.endBlkId - pGroup->blkList.startBlkId + 1); + qError("block added to group cache, total block num:%" PRId64, pGroup->seqList.endBlkId - pGroup->seqList.startBlkId + 1); return TSDB_CODE_SUCCESS; } @@ -568,32 +572,38 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 *got = true; if (pGCache->batchFetch) { + SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; + taosRLockLatch(&pBatchList->lock); + int64_t blkNum = taosArrayGetSize(pBatchList->pBlkList); if (pSession->lastBlkId < 0) { - if (taosArrayGetSize(pSession->pGroupData->pBlkList) > 0) { - int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, 0); + if (blkNum > 0) { + int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, 0); + taosRUnLockLatch(&pBatchList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); pSession->lastBlkId = *pIdx; return code; } - } else if (pSession->lastBlkId < taosArrayGetSize(pSession->pGroupData->pBlkList)) { - int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, pSession->lastBlkId + 1); + } else if (pSession->lastBlkId < blkNum) { + int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, pSession->lastBlkId + 1); + taosRUnLockLatch(&pBatchList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); pSession->lastBlkId++; return code; } - } - - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->blkList.startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; + taosRUnLockLatch(&pBatchList->lock); + } else { + if (pSession->lastBlkId < 0) { + int64_t startBlkId = atomic_load_64(&pSession->pGroupData->seqList.startBlkId); + if (startBlkId > 0) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); + pSession->lastBlkId = startBlkId; + return code; + } + } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->seqList.endBlkId)) { + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); + pSession->lastBlkId++; return code; } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->blkList.endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - return code; } if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {