From b16853983e17d57b8fe65484562960c768054257 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Nov 2022 19:27:35 +0800 Subject: [PATCH] fix(query): count the correct number of completed sources. --- source/libs/executor/src/executorimpl.c | 54 ++++++++++++++++++++----- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0e2f2189bd..a1890fd6f2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1854,17 +1854,22 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); while (1) { +// printf("1\n"); tsem_wait(&pExchangeInfo->ready); +// printf("2\n"); - int32_t completed = 0; +// int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; +// printf("========:%d is completed\n", i); +// completed += 1; continue; } +// printf("index:%d --------3\n", i); if (pDataInfo->status != EX_SOURCE_DATA_READY) { +// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status); continue; } @@ -1879,15 +1884,28 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn // todo SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + + int32_t completed = 0; + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - completed += 1; taosMemoryFreeClear(pDataInfo->pRsp); - break; -// continue; + + if (completed == totalSources) { + setAllSourcesCompleted(pOperator, startTs); + return; + } else { + break; + } } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; @@ -1906,15 +1924,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); + int32_t completed = 0; if (pRsp->completed == 1) { + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - completed + 1, i + 1, totalSources); - completed += 1; - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + completed, i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", @@ -1940,12 +1966,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn return; } + int32_t completed = 0; + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + if (completed == totalSources) { setAllSourcesCompleted(pOperator, startTs); return; } - -// sched_yield(); } _error: