[td-13039] fix query crash.

This commit is contained in:
Haojun Liao 2022-04-01 16:09:00 +08:00
parent 13e7310315
commit 3846250454
3 changed files with 20 additions and 12 deletions

View File

@ -187,7 +187,7 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to write msg:%p to vnode-queue since %s", pHead->vgId, pMsg, terrstr());
return -1;
return terrno;
}
int32_t code = 0;

View File

@ -362,6 +362,7 @@ typedef struct SSourceDataInfo {
int32_t index;
SRetrieveTableRsp *pRsp;
uint64_t totalRows;
int32_t code;
EX_SOURCE_STATUS status;
} SSourceDataInfo;

View File

@ -5004,12 +5004,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*)param;
pSourceDataInfo->pRsp = pMsg->pData;
if (code == TSDB_CODE_SUCCESS) {
pSourceDataInfo->pRsp = pMsg->pData;
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->useconds = htobe64(pRsp->useconds);
pRsp->compLen = htonl(pRsp->compLen);
SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp;
pRsp->numOfRows = htonl(pRsp->numOfRows);
pRsp->compLen = htonl(pRsp->compLen);
pRsp->useconds = htobe64(pRsp->useconds);
} else {
pSourceDataInfo->code = code;
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
tsem_post(&pSourceDataInfo->pEx->ready);
@ -5265,7 +5269,6 @@ static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) {
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
@ -5309,18 +5312,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
}
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
tsem_wait(&pExchangeInfo->ready);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, tstrerror(pDataInfo->code));
pOperator->pTaskInfo->code = pDataInfo->code;
return NULL;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
if (pRsp->numOfRows == 0) {
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64
" try next",
qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
pDataInfo->totalRows, pLoadInfo->totalRows);