fix: group cache block cache issue

This commit is contained in:
dapan1121 2023-07-20 10:21:19 +08:00
parent b9b8fa1ba2
commit 58c5d5e70e
2 changed files with 42 additions and 27 deletions

View File

@ -46,6 +46,11 @@ typedef struct SGroupSeqBlkList {
int64_t endBlkId;
} SGroupSeqBlkList;
typedef struct SGroupBatchBlkList {
SRWLatch lock;
SArray* pBlkList;
} SGroupBatchBlkList;
typedef struct SGroupCacheData {
TdThreadMutex mutex;
SArray* waitQueue;
@ -55,8 +60,8 @@ typedef struct SGroupCacheData {
int32_t downstreamIdx;
int32_t vgId;
union {
SGroupSeqBlkList blkList;
SArray* pBlkList;
SGroupSeqBlkList seqList;
SGroupBatchBlkList batchList;
};
uint32_t fileId;
int64_t startOffset;

View File

@ -219,6 +219,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
//TODO READ FROM FILE
code = TSDB_CODE_INVALID_PARA;
qError("block %" PRId64 "not found in dirty block list", blkId);
return code;
}
@ -369,10 +371,10 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat
pGroup->vgId = vgId;
pGroup->fileId = -1;
if (batchFetch) {
pGroup->pBlkList = taosArrayInit(10, POINTER_BYTES);
pGroup->batchList.pBlkList = taosArrayInit(10, POINTER_BYTES);
} else {
pGroup->blkList.startBlkId = -1;
pGroup->blkList.endBlkId = -1;
pGroup->seqList.startBlkId = -1;
pGroup->seqList.endBlkId = -1;
}
pGroup->startOffset = -1;
@ -422,19 +424,21 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) {
if (batchFetch) {
taosArrayPush(pGroup->pBlkList, &pNewBlk->blkId);
qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->pBlkList));
taosWLockLatch(&pGroup->batchList.lock);
taosArrayPush(pGroup->batchList.pBlkList, &pNewBlk->blkId);
qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pBlkList));
taosWUnLockLatch(&pGroup->batchList.lock);
return TSDB_CODE_SUCCESS;
}
if (pGroup->blkList.endBlkId > 0) {
pGroup->blkList.endBlkId = pNewBlk->blkId;
if (pGroup->seqList.endBlkId > 0) {
atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId);
} else {
pGroup->blkList.startBlkId = pNewBlk->blkId;
pGroup->blkList.endBlkId = pNewBlk->blkId;
atomic_store_64(&pGroup->seqList.startBlkId, pNewBlk->blkId);
atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId);
}
qError("block added to group cache, total block num:%" PRId64, pGroup->blkList.endBlkId - pGroup->blkList.startBlkId + 1);
qError("block added to group cache, total block num:%" PRId64, pGroup->seqList.endBlkId - pGroup->seqList.startBlkId + 1);
return TSDB_CODE_SUCCESS;
}
@ -568,33 +572,39 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
*got = true;
if (pGCache->batchFetch) {
SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList;
taosRLockLatch(&pBatchList->lock);
int64_t blkNum = taosArrayGetSize(pBatchList->pBlkList);
if (pSession->lastBlkId < 0) {
if (taosArrayGetSize(pSession->pGroupData->pBlkList) > 0) {
int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, 0);
if (blkNum > 0) {
int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, 0);
taosRUnLockLatch(&pBatchList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes);
pSession->lastBlkId = *pIdx;
return code;
}
} else if (pSession->lastBlkId < taosArrayGetSize(pSession->pGroupData->pBlkList)) {
int64_t* pIdx = taosArrayGet(pSession->pGroupData->pBlkList, pSession->lastBlkId + 1);
} else if (pSession->lastBlkId < blkNum) {
int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, pSession->lastBlkId + 1);
taosRUnLockLatch(&pBatchList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
}
}
taosRUnLockLatch(&pBatchList->lock);
} else {
if (pSession->lastBlkId < 0) {
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->blkList.startBlkId);
int64_t startBlkId = atomic_load_64(&pSession->pGroupData->seqList.startBlkId);
if (startBlkId > 0) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = startBlkId;
return code;
}
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->blkList.endBlkId)) {
} else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->seqList.endBlkId)) {
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes);
pSession->lastBlkId++;
return code;
}
}
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL;