From b460974f43d248edceee368e371d043136bbbdca Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 24 Nov 2022 10:32:31 +0800 Subject: [PATCH] fix(query): fix syntax error. --- source/libs/executor/src/exchangeoperator.c | 27 ++++++++------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 04caccbf8f..2c79b77e16 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,11 +75,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { - int32_t v = 0; - sem_getvalue(&pExchangeInfo->ready, &v); - qDebug("prepare wait for ready, sem:(%d,%p), %p, %s", v, (void*)pExchangeInfo->ready.__align, pExchangeInfo, GET_TASKID(pTaskInfo)); - + qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } @@ -372,9 +370,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); - int32_t v = 0, v1 = 0; - sem_getvalue(&pExchangeInfo->ready, &v); - if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; @@ -386,22 +381,23 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, v, (void*)pExchangeInfo->ready.__align, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, sem:(%d,%p), %p", pSourceDataInfo->taskId, index, tstrerror(code), v, - (void*) pExchangeInfo->ready.__align, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; - code = tsem_post(&pExchangeInfo->ready); - ASSERT(code == TSDB_CODE_SUCCESS); + if (code != TSDB_CODE_SUCCESS) { + code = TAOS_SYSTEM_ERROR(code); + qError("failed to invoke post when fetch rsp is ready, code:%s, %p", tstrerror(code), pExchangeInfo); + } taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); - return TSDB_CODE_SUCCESS; + return code; } int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { @@ -566,9 +562,6 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; - int32_t value = 0; - sem_getvalue(&pExchangeInfo->ready, &value); - tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);