fix: group cache retrive block failed issue

This commit is contained in:
dapan1121 2024-09-18 14:17:02 +08:00
parent d6fbb3dc2a
commit 41f8f211f0
1 changed files with 17 additions and 7 deletions

View File

@ -1037,16 +1037,13 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator
SGroupCacheOperatorInfo* pGCache = pOperator->info;
while (continueFetch && TSDB_CODE_SUCCESS == code) {
int32_t code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
QRY_ERR_RET(getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes));
if (NULL == *ppRes) {
code = handleDownstreamFetchDone(pOperator, pSession);
QRY_ERR_RET(handleDownstreamFetchDone(pOperator, pSession));
break;
} else {
code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch);
QRY_ERR_RET(handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch));
}
}
@ -1129,6 +1126,11 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
// FOR NOW, IT'S ERROR TO REACH HERE
#if 1
qError("should not enter session wait");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
#else
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = pSession->pGroupData;
int32_t code = TSDB_CODE_SUCCESS;
@ -1181,6 +1183,7 @@ _return:
}
return code;
#endif
}
@ -1206,6 +1209,10 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s
code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
goto _return;
} else {
// FOR NOW, SHOULD NOT REACH HERE
qError("Invalid fetchSessionId:%" PRId64 ", currentSessionId:%" PRId64, pCtx->fetchSessionId, sessionId);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (locked) {
@ -1292,6 +1299,9 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP
qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId);
QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
qDebug("session:%" PRId64 " initialized, downstreamIdx:%d, vgId:%d, tbUid:%" PRId64 ", needCache:%d",
pGcParam->sessionId, pGcParam->downstreamIdx, pGcParam->vgId, pGcParam->tbUid, pGcParam->needCache);
return TSDB_CODE_SUCCESS;
}
@ -1323,7 +1333,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock
}
}
code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
QRY_ERR_RET(getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes));
if (NULL == *ppRes) {
qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows);
code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));