Merge pull request #18426 from taosdata/fix/liao_cov

fix(query): fix error in seq fetch data block.
This commit is contained in:
Shengliang Guan 2022-11-24 13:44:07 +08:00 committed by GitHub
commit 631dc756b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 16 deletions

View File

@ -63,6 +63,7 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator);
static int32_t seqLoadRemoteData(SOperatorInfo* pOperator); static int32_t seqLoadRemoteData(SOperatorInfo* pOperator);
static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator); static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator);
static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf); static int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
static int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo);
static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo, static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeInfo* pExchangeInfo,
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
@ -112,20 +113,12 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
break; break;
} }
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
int32_t index = 0; if (code != TSDB_CODE_SUCCESS) {
char* pStart = pRetrieveRsp->data;
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
goto _error; goto _error;
} }
taosArrayPush(pExchangeInfo->pResultBlockList, &pb); SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
}
updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator); updateLoadRemoteInfo(pLoadInfo, pRetrieveRsp->numOfRows, pRetrieveRsp->compLen, pDataInfo->startTime, pOperator);
pDataInfo->totalRows += pRetrieveRsp->numOfRows; pDataInfo->totalRows += pRetrieveRsp->numOfRows;
@ -586,10 +579,32 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doExtractResultBlocks(SExchangeInfo* pExchangeInfo, SSourceDataInfo* pDataInfo) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data;
int32_t index = 0;
int32_t code = 0;
while (index++ < pRetrieveRsp->numOfBlocks) {
SSDataBlock* pb = createOneDataBlock(pExchangeInfo->pDummyBlock, false);
code = extractDataBlockFromFetchRsp(pb, pStart, NULL, &pStart);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
return code;
}
taosArrayPush(pExchangeInfo->pResultBlockList, &pb);
}
return code;
}
int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = 0;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
@ -629,11 +644,12 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
continue; continue;
} }
code = doExtractResultBlocks(pExchangeInfo, pDataInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
char* pStart = pRetrieveRsp->data;
int32_t code = extractDataBlockFromFetchRsp(NULL, pStart, NULL, &pStart);
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
@ -656,6 +672,10 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
taosMemoryFreeClear(pDataInfo->pRsp); taosMemoryFreeClear(pDataInfo->pRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
_error:
pTaskInfo->code = code;
return code;
} }
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {