diff --git a/source/dnode/mgmt/vm/src/vmWorker.c b/source/dnode/mgmt/vm/src/vmWorker.c index 9d62624756..4be6311cf8 100644 --- a/source/dnode/mgmt/vm/src/vmWorker.c +++ b/source/dnode/mgmt/vm/src/vmWorker.c @@ -187,7 +187,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr()); - return -1; + return terrno; } int32_t code = 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a92813812a..287cb094a6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -362,6 +362,7 @@ typedef struct SSourceDataInfo { int32_t index; SRetrieveTableRsp *pRsp; uint64_t totalRows; + int32_t code; EX_SOURCE_STATUS status; } SSourceDataInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index de0974774f..54a75d9c7e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5004,12 +5004,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param; - pSourceDataInfo->pRsp = pMsg->pData; + if (code == TSDB_CODE_SUCCESS) { + pSourceDataInfo->pRsp = pMsg->pData; - SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; - pRsp->numOfRows = htonl(pRsp->numOfRows); - pRsp->useconds = htobe64(pRsp->useconds); - pRsp->compLen = htonl(pRsp->compLen); + SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; + pRsp->numOfRows = htonl(pRsp->numOfRows); + pRsp->compLen = htonl(pRsp->compLen); + pRsp->useconds = htobe64(pRsp->useconds); + } else { + pSourceDataInfo->code = code; + } pSourceDataInfo->status = EX_SOURCE_DATA_READY; tsem_post(&pSourceDataInfo->pEx->ready); @@ -5265,7 +5269,6 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) { totalSources, endTs - startTs); tsem_wait(&pExchangeInfo->ready); - pOperator->status = OP_RES_TO_RETURN; return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo); } @@ -5309,18 +5312,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { } doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); - tsem_wait(&pExchangeInfo->ready); - SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + if (pDataInfo->code != TSDB_CODE_SUCCESS) { + qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s", + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, tstrerror(pDataInfo->code)); + pOperator->pTaskInfo->code = pDataInfo->code; + return NULL; + } + SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; - if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - " try next", + qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows);