From 6b53003f94aad0bd5d315fc15b01cfdf701c0fea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 12:18:38 +0800 Subject: [PATCH 01/10] refactor: add some logs. --- source/libs/executor/src/exchangeoperator.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c858536bb1..3c27c8064a 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,6 +75,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { + qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); + tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -360,7 +362,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); if (pExchangeInfo == NULL) { - qWarn("failed to acquire exchange operator, since it may have been released"); + qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo); taosMemoryFree(pMsg->pData); return TSDB_CODE_SUCCESS; } @@ -368,6 +370,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); + int32_t v = 0; + sem_getvalue(&pExchangeInfo->ready, &v); + if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; @@ -379,19 +384,20 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, v, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code)); + qDebug("%s fetch rsp received, index:%d, error:%s, sem:%d, %p", pSourceDataInfo->taskId, index, tstrerror(code), v, pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; - tsem_post(&pExchangeInfo->ready); - taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + code = tsem_post(&pExchangeInfo->ready); + ASSERT(code == TSDB_CODE_SUCCESS); + taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); return TSDB_CODE_SUCCESS; } @@ -557,6 +563,9 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; + int32_t value = 0; + sem_getvalue(&pExchangeInfo->ready, &value); + tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); From f92c1d4271293f5a1ec2e05b75699f62434aeb55 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 13:48:15 +0800 Subject: [PATCH 02/10] refactor: add some logs. --- source/libs/executor/src/exchangeoperator.c | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 3c27c8064a..cff0c50644 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,7 +75,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { - qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); + int32_t v = 0; + sem_getvalue(&pExchangeInfo->ready, &v); + qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { @@ -370,7 +372,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); - int32_t v = 0; + int32_t v = 0, v1 = 0; sem_getvalue(&pExchangeInfo->ready, &v); if (code == TSDB_CODE_SUCCESS) { @@ -384,12 +386,13 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, v, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, v, pExchangeInfo->ready.__align, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, sem:%d, %p", pSourceDataInfo->taskId, index, tstrerror(code), v, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, tstrerror(code), v, + pExchangeInfo->ready.__align, pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; @@ -429,9 +432,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %p, %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, - pSource->execId, sourceIndex, totalSources); + pSource->execId, pExchangeInfo, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->sId = htobe64(pSource->schedId); From c623336b6ed2f758e6be80703eacb7f9bdbe4ea8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 13:59:38 +0800 Subject: [PATCH 03/10] refactor: enable continue query when the buffer of sink node is lower or empty. --- source/libs/executor/src/dataDispatcher.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d9757c96f..d4248fc420 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -138,7 +138,9 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); + + int32_t status = updateStatus(pDispatcher); + *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY); return TSDB_CODE_SUCCESS; } From 05f84c184e0db320bfff1c946b732a016ad27b3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 15:36:38 +0800 Subject: [PATCH 04/10] fix(query): fix syntax error. --- source/libs/executor/src/exchangeoperator.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index cff0c50644..04caccbf8f 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -77,7 +77,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { int32_t v = 0; sem_getvalue(&pExchangeInfo->ready, &v); - qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); + qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, (void*)pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { @@ -387,12 +387,12 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { ASSERT(pRsp != NULL); qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, v, pExchangeInfo->ready.__align, pExchangeInfo); + pRsp->numOfRows, v, (void*)pExchangeInfo->ready.__align, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; qDebug("%s fetch rsp received, index:%d, error:%s, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, tstrerror(code), v, - pExchangeInfo->ready.__align, pExchangeInfo); + (void*) pExchangeInfo->ready.__align, pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; From 0a8fc50024eca59f7d01f80b4c84b208308bd0e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 15:47:12 +0800 Subject: [PATCH 05/10] test:update test cases. --- source/client/test/clientTests.cpp | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 85814305bd..007e0bd931 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -692,7 +692,6 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } -#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -752,9 +751,6 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } - -#if 0 - TEST(testCase, tsbs_perf_test) { TdThread qid[20] = {0}; @@ -764,15 +760,16 @@ TEST(testCase, tsbs_perf_test) { getchar(); } +#endif TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use test"); taos_free_result(pRes); - pRes = taos_query(pConn, "select ts from st1"); + pRes = taos_query(pConn, "select * from meters"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -793,6 +790,7 @@ TEST(testCase, projection_query_stables) { taos_close(pConn); } +#if 0 TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); From 4d75bd69b255977f9cd46beb8c466125035f314d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 15:49:59 +0800 Subject: [PATCH 06/10] test: disable print result. --- source/client/test/clientTests.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 007e0bd931..78a14d3292 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -782,8 +782,8 @@ TEST(testCase, projection_query_stables) { char str[512] = {0}; while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); } taos_free_result(pRes); From ca1dcf4ea1a2f1d4934b670e7bbec4b09d8b63a7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 15:54:00 +0800 Subject: [PATCH 07/10] test: limit the output --- source/client/test/clientTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 78a14d3292..dcacaf3471 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -769,7 +769,7 @@ TEST(testCase, projection_query_stables) { TAOS_RES* pRes = taos_query(pConn, "use test"); taos_free_result(pRes); - pRes = taos_query(pConn, "select * from meters"); + pRes = taos_query(pConn, "select * from meters limit 30000000"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); From f9e338ba2d25bdb93b4a03b3f1320553804944ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 15:54:30 +0800 Subject: [PATCH 08/10] test: limit the output. --- source/client/test/clientTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index dcacaf3471..82202b8820 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -769,7 +769,7 @@ TEST(testCase, projection_query_stables) { TAOS_RES* pRes = taos_query(pConn, "use test"); taos_free_result(pRes); - pRes = taos_query(pConn, "select * from meters limit 30000000"); + pRes = taos_query(pConn, "select * from meters limit 50000000"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); From b27faba7efb06f9789c240be1285e15d4453af8a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 23 Nov 2022 17:07:45 +0800 Subject: [PATCH 09/10] fix:initialize maxts --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 15 +++-- tests/script/tsim/stream/state0.sim | 60 +++++++++++++++++++ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index de2a7b9dac..8db450ad50 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1849,12 +1849,12 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; printDataBlock(pDelBlock, "stream scan delete data"); if (pInfo->tqReader) { blockDataDestroy(pDelBlock); } if (pInfo->pDeleteDataRes->info.rows > 0) { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; return pInfo->pDeleteDataRes; } else { goto FETCH_NEXT_BLOCK; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index dd02ce9cd4..7fd2fcb5b8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3364,22 +3364,23 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num } } -void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t waterMark, uint16_t type, - int32_t tsColIndex) { +void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, + STimeWindowAggSupp* pTwSup) { if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initDownStream(downstream->pDownstream[0], pAggSup, waterMark, type, tsColIndex); + initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup); return; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); + pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } + pScanInfo->twAggSup = *pTwSup; } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, @@ -4102,8 +4103,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, NULL); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, - pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); } return pOperator; @@ -4606,8 +4606,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); - initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, - pInfo->primaryTsIndex); + initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index 87d7d4b7fc..7c922658c9 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -731,5 +731,65 @@ if $data32 != 1 then goto loop9 endi +sql drop stream if exists streams5; +sql drop database if exists test5; +sql create database test5; +sql use test5; +sql create table tb (ts timestamp, a int); +sql insert into tb values (now + 1m , 1 ); +sql create table b (c timestamp, d int, e int , f int, g double); +sql create stream streams0 trigger at_once into streamt as select _wstart c1, count(*) c2, max(a) c3 from tb state_window(a); +sql insert into b values(1648791213000,NULL,NULL,NULL,NULL); +sql select * from streamt order by c1, c2, c3; + +print data00:$data00 +print data01:$data01 + +sql insert into b values(1648791213000,NULL,NULL,NULL,NULL); +sql select * from streamt order by c1, c2, c3; + +print data00:$data00 +print data01:$data01 + +sql insert into b values(1648791213001,1,2,2,2.0); +sql insert into b values(1648791213002,1,3,3,3.0); +sql insert into tb values(1648791213003,1); + +sql select * from streamt; +print data00:$data00 +print data01:$data01 + +sql delete from b where c >= 1648791213001 and c <= 1648791213002; +sql insert into b values(1648791223003,2,2,3,1.0); insert into b values(1648791223002,2,3,3,3.0); +sql insert into tb values (now + 1m , 1 ); + +sql select * from streamt; +print data00:$data00 +print data01:$data01 + +sql insert into b(c,d) values (now + 6m , 6 ); +sql delete from b where c >= 1648791213001 and c <= 1648791233005;; + +$loop_count = 0 +loop10: + +sleep 200 + +sql select c2 from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop10 +endi + +if $data00 != 2 then + print =====data00=$data00 + goto loop10 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT From b460974f43d248edceee368e371d043136bbbdca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Nov 2022 10:32:31 +0800 Subject: [PATCH 10/10] fix(query): fix syntax error. --- source/libs/executor/src/exchangeoperator.c | 27 ++++++++------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 04caccbf8f..2c79b77e16 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,11 +75,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { - int32_t v = 0; - sem_getvalue(&pExchangeInfo->ready, &v); - qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, (void*)pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); - + qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -372,9 +370,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); - int32_t v = 0, v1 = 0; - sem_getvalue(&pExchangeInfo->ready, &v); - if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; @@ -386,22 +381,23 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, v, (void*)pExchangeInfo->ready.__align, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, tstrerror(code), v, - (void*) pExchangeInfo->ready.__align, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; - code = tsem_post(&pExchangeInfo->ready); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + code = TAOS_SYSTEM_ERROR(code); + qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); + } taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); - return TSDB_CODE_SUCCESS; + return code; } int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { @@ -566,9 +562,6 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; - int32_t value = 0; - sem_getvalue(&pExchangeInfo->ready, &value); - tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);