Merge pull request #20527 from taosdata/fix/TD-23200
fix:scan of fill history ended prematurely
This commit is contained in:
commit
7d1aa634fb
|
@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
qDebug("===stream===%s: Block is Null or Empty", flag);
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
|
|
|
@ -1803,6 +1803,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
printDataBlock(pInfo->pUpdateRes, "recover update");
|
||||
return pInfo->pUpdateRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||
|
@ -1813,7 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
// printDataBlock(pSDB, "stream scan update");
|
||||
printDataBlock(pSDB, "scan recover update");
|
||||
calBlockTbName(pInfo, pSDB);
|
||||
return pSDB;
|
||||
}
|
||||
|
@ -1838,6 +1839,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
|
||||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
|
||||
|
|
|
@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
if (batchCnt >= batchSz) break;
|
||||
}
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
taosArrayDestroy(pRes);
|
||||
break;
|
||||
if (finished) {
|
||||
taosArrayDestroy(pRes);
|
||||
qDebug("task %d finish recover exec task ", pTask->taskId);
|
||||
break;
|
||||
} else {
|
||||
qDebug("task %d continue recover exec task ", pTask->taskId);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
if (qRes == NULL) {
|
||||
|
|
Loading…
Reference in New Issue