From 6b53003f94aad0bd5d315fc15b01cfdf701c0fea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Nov 2022 12:18:38 +0800 Subject: [PATCH] refactor: add some logs. --- source/libs/executor/src/exchangeoperator.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c858536bb1..3c27c8064a 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -75,6 +75,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn } while (1) { + qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo)); + tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -360,7 +362,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); if (pExchangeInfo == NULL) { - qWarn("failed to acquire exchange operator, since it may have been released"); + qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo); taosMemoryFree(pMsg->pData); return TSDB_CODE_SUCCESS; } @@ -368,6 +370,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t index = pWrapper->sourceIndex; SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index); + int32_t v = 0; + sem_getvalue(&pExchangeInfo->ready, &v); + if (code == TSDB_CODE_SUCCESS) { pSourceDataInfo->pRsp = pMsg->pData; @@ -379,19 +384,20 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); ASSERT(pRsp != NULL); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, + pRsp->numOfRows, v, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code)); + qDebug("%s fetch rsp received, index:%d, error:%s, sem:%d, %p", pSourceDataInfo->taskId, index, tstrerror(code), v, pExchangeInfo); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; - tsem_post(&pExchangeInfo->ready); - taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); + code = tsem_post(&pExchangeInfo->ready); + ASSERT(code == TSDB_CODE_SUCCESS); + taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId); return TSDB_CODE_SUCCESS; } @@ -557,6 +563,9 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; + int32_t value = 0; + sem_getvalue(&pExchangeInfo->ready, &value); + tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);