From 9932fa6bf0aab84b2ec6e0515e0ad67c2592e853 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Nov 2022 17:48:27 +0800 Subject: [PATCH 1/4] refactor: refactor to avoid spin operator in exchange operator. --- source/libs/executor/inc/executorimpl.h | 8 ++++---- source/libs/executor/src/executorimpl.c | 17 ++++++++++------- source/libs/executor/src/scanoperator.c | 6 +++++- source/util/src/tlrucache.c | 5 ++++- 4 files changed, 23 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5051bedf8d..cd9f29978d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -251,10 +251,10 @@ typedef struct SLimitInfo { } SLimitInfo; typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; // SArray, result block list, used to keep the multi-block that // passed by downstream operator SArray* pResultBlockList; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7e93232e0c..b76698b0bc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1854,6 +1854,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); while (1) { + tsem_wait(&pExchangeInfo->ready); + int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); @@ -1874,6 +1876,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); + // todo SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 @@ -1883,7 +1886,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; taosMemoryFreeClear(pDataInfo->pRsp); - continue; + break; +// continue; } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; @@ -1904,10 +1908,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d" - " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb," - " completed:%d try next %d/%" PRIzu, + " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb, completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, completed + 1, i + 1, totalSources); @@ -1915,7 +1917,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -1939,7 +1941,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn return; } - sched_yield(); +// sched_yield(); } _error: @@ -1970,6 +1972,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->cost.openCost = taosGetTimestampUs() - startTs; tsem_wait(&pExchangeInfo->ready); + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ee935837d0..6d13048123 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -956,8 +956,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->pTaskInfo = pTaskInfo; pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024*128, -1, .5); - taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); + if (pInfo->metaCache.pTableMetaEntryCache == NULL) { + code = terrno; + goto _error; + } + taosLRUCacheSetStrictCapacity(pInfo->metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, getTableScannerExecInfo); diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 0ea7258828..264883be4e 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -630,13 +630,16 @@ static int getDefaultCacheShardBits(size_t capacity) { SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { if (numShardBits >= 20) { + terrno = TSDB_CODE_INVALID_PARA; return NULL; } if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { + terrno = TSDB_CODE_INVALID_PARA; return NULL; } SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); if (!cache) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -648,7 +651,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); if (!cache->shards) { taosMemoryFree(cache); - + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } From d016fc9d47226f780f6694d297e0970f2a7bbbec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Nov 2022 18:27:26 +0800 Subject: [PATCH 2/4] fix(query): set retrieve completed flag for exchange operator. --- source/libs/executor/src/executorimpl.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b76698b0bc..0e2f2189bd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1933,6 +1933,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } + if (completed == totalSources) { + setAllSourcesCompleted(pOperator, startTs); + } + return; } From b16853983e17d57b8fe65484562960c768054257 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Nov 2022 19:27:35 +0800 Subject: [PATCH 3/4] fix(query): count the correct number of completed sources. --- source/libs/executor/src/executorimpl.c | 54 ++++++++++++++++++++----- 1 file changed, 43 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0e2f2189bd..a1890fd6f2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1854,17 +1854,22 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); while (1) { +// printf("1\n"); tsem_wait(&pExchangeInfo->ready); +// printf("2\n"); - int32_t completed = 0; +// int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; +// printf("========:%d is completed\n", i); +// completed += 1; continue; } +// printf("index:%d --------3\n", i); if (pDataInfo->status != EX_SOURCE_DATA_READY) { +// printf("-----------%d, status:%d, continue\n", i, pDataInfo->status); continue; } @@ -1879,15 +1884,28 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn // todo SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + + int32_t completed = 0; + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - completed += 1; taosMemoryFreeClear(pDataInfo->pRsp); - break; -// continue; + + if (completed == totalSources) { + setAllSourcesCompleted(pOperator, startTs); + return; + } else { + break; + } } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; @@ -1906,15 +1924,23 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); + int32_t completed = 0; if (pRsp->completed == 1) { + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", total:%.2f Kb, completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, - completed + 1, i + 1, totalSources); - completed += 1; - pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; + completed, i + 1, totalSources); } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", @@ -1940,12 +1966,18 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn return; } + int32_t completed = 0; + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + if (completed == totalSources) { setAllSourcesCompleted(pOperator, startTs); return; } - -// sched_yield(); } _error: From 1da67a569df57017bcf3987ed32c504ea9965423 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Nov 2022 00:23:40 +0800 Subject: [PATCH 4/4] fix(query): set correct status. --- source/libs/executor/src/executorimpl.c | 62 +++++++++++---------- tests/script/tsim/parser/auto_create_tb.sim | 1 + 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a1890fd6f2..b7c3eed069 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1853,21 +1853,32 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn int64_t startTs = taosGetTimestampUs(); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + int32_t completed = 0; + for (int32_t k = 0; k < totalSources; ++k) { + SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); + if (p->status == EX_SOURCE_DATA_EXHAUSTED) { + completed += 1; + } + } + + if (completed == totalSources) { + setAllSourcesCompleted(pOperator, startTs); + return; + } + while (1) { // printf("1\n"); tsem_wait(&pExchangeInfo->ready); // printf("2\n"); -// int32_t completed = 0; for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { // printf("========:%d is completed\n", i); -// completed += 1; continue; } -// printf("index:%d --------3\n", i); +// printf("index:%d - status:%d\n", i, pDataInfo->status); if (pDataInfo->status != EX_SOURCE_DATA_READY) { // printf("-----------%d, status:%d, continue\n", i, pDataInfo->status); continue; @@ -1885,33 +1896,27 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - - int32_t completed = 0; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); - if (p->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; - } - } +// printf("%d completed, try next\n", i); qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, - pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); + pExchangeInfo->loadInfo.totalRows, completed, i + 1, totalSources); taosMemoryFreeClear(pDataInfo->pRsp); - if (completed == totalSources) { - setAllSourcesCompleted(pOperator, startTs); - return; - } else { - break; - } +// if (completed == totalSources) { +// return; +// } else { +// break; +// } + break; } SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; int32_t index = 0; char* pStart = pRetrieveRsp->data; while (index++ < pRetrieveRsp->numOfBlocks) { + printf("results, numOfBLock: %d\n", pRetrieveRsp->numOfBlocks); SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false); code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart); if (code != 0) { @@ -1924,16 +1929,16 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, startTs, pOperator); - int32_t completed = 0; +// int32_t completed = 0; if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; - for (int32_t k = 0; k < totalSources; ++k) { - SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); - if (p->status == EX_SOURCE_DATA_EXHAUSTED) { - completed += 1; - } - } +// for (int32_t k = 0; k < totalSources; ++k) { +// SSourceDataInfo* p = taosArrayGet(pExchangeInfo->pSourceDataInfo, k); +// if (p->status == EX_SOURCE_DATA_EXHAUSTED) { +// completed += 1; +// } +// } qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 @@ -1959,9 +1964,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } } - if (completed == totalSources) { - setAllSourcesCompleted(pOperator, startTs); - } +// if (completed == totalSources) { +// setAllSourcesCompleted(pOperator, startTs); +// } return; } @@ -1975,7 +1980,6 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } if (completed == totalSources) { - setAllSourcesCompleted(pOperator, startTs); return; } } diff --git a/tests/script/tsim/parser/auto_create_tb.sim b/tests/script/tsim/parser/auto_create_tb.sim index 312964a1ab..7803557bdc 100644 --- a/tests/script/tsim/parser/auto_create_tb.sim +++ b/tests/script/tsim/parser/auto_create_tb.sim @@ -186,6 +186,7 @@ endi sql select t1, count(*), first(c9) from $stb partition by t1 order by t1 asc slimit 3 if $rows != 3 then + print expect 3, actual: $rows return -1 endi if $data(1)[1] != 1 then