diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index f16f341c5f..17e18710f2 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1037,16 +1037,13 @@ static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator SGroupCacheOperatorInfo* pGCache = pOperator->info; while (continueFetch && TSDB_CODE_SUCCESS == code) { - int32_t code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + QRY_ERR_RET(getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes)); if (NULL == *ppRes) { - code = handleDownstreamFetchDone(pOperator, pSession); + QRY_ERR_RET(handleDownstreamFetchDone(pOperator, pSession)); break; } else { - code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch); + QRY_ERR_RET(handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch)); } } @@ -1129,6 +1126,11 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) { + // FOR NOW, IT'S ERROR TO REACH HERE +#if 1 + qError("should not enter session wait"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; +#else SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheData* pGroup = pSession->pGroupData; int32_t code = TSDB_CODE_SUCCESS; @@ -1181,6 +1183,7 @@ _return: } return code; +#endif } @@ -1206,6 +1209,10 @@ static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t s code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes); goto _return; + } else { + // FOR NOW, SHOULD NOT REACH HERE + qError("Invalid fetchSessionId:%" PRId64 ", currentSessionId:%" PRId64, pCtx->fetchSessionId, sessionId); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } if (locked) { @@ -1292,6 +1299,9 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorP qError("fail to get session %" PRId64 " from pSessions", pGcParam->sessionId); QRY_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR); } + + qDebug("session:%" PRId64 " initialized, downstreamIdx:%d, vgId:%d, tbUid:%" PRId64 ", needCache:%d", + pGcParam->sessionId, pGcParam->downstreamIdx, pGcParam->vgId, pGcParam->tbUid, pGcParam->needCache); return TSDB_CODE_SUCCESS; } @@ -1323,7 +1333,7 @@ static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock } } - code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes); + QRY_ERR_RET(getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes)); if (NULL == *ppRes) { qDebug("session %" PRId64 " in downstream %d total got %" PRId64 " rows", pGcParam->sessionId, pCtx->id, pSession->resRows); code = taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));