refactor: add some logs.

This commit is contained in:
Haojun Liao 2022-11-23 12:18:38 +08:00
parent ee371ba6c3
commit 6b53003f94
1 changed files with 15 additions and 6 deletions

View File

@ -75,6 +75,8 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
}
while (1) {
qDebug("prepare wait for ready, %p, %s", pExchangeInfo, GET_TASKID(pTaskInfo));
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
@ -360,7 +362,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
if (pExchangeInfo == NULL) {
qWarn("failed to acquire exchange operator, since it may have been released");
qWarn("failed to acquire exchange operator, since it may have been released, %p", pExchangeInfo);
taosMemoryFree(pMsg->pData);
return TSDB_CODE_SUCCESS;
}
@ -368,6 +370,9 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
int32_t index = pWrapper->sourceIndex;
SSourceDataInfo* pSourceDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, index);
int32_t v = 0;
sem_getvalue(&pExchangeInfo->ready, &v);
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
@ -379,19 +384,20 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
ASSERT(pRsp != NULL);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
pRsp->numOfRows);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d, sem:%d, %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks,
pRsp->numOfRows, v, pExchangeInfo);
} else {
taosMemoryFree(pMsg->pData);
pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%s", pSourceDataInfo->taskId, index, tstrerror(code));
qDebug("%s fetch rsp received, index:%d, error:%s, sem:%d, %p", pSourceDataInfo->taskId, index, tstrerror(code), v, pExchangeInfo);
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pExchangeInfo->ready);
taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
code = tsem_post(&pExchangeInfo->ready);
ASSERT(code == TSDB_CODE_SUCCESS);
taosReleaseRef(exchangeObjRefPool, pWrapper->exchangeId);
return TSDB_CODE_SUCCESS;
}
@ -557,6 +563,9 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
int32_t value = 0;
sem_getvalue(&pExchangeInfo->ready, &value);
tsem_wait(&pExchangeInfo->ready);
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);