check preversion data

This commit is contained in:
54liuyao 2024-04-08 17:23:46 +08:00
parent 3ba45df0a0
commit 858d61072e
1 changed files with 7 additions and 1 deletions

View File

@ -1633,6 +1633,9 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version);
printDataBlock(pPreRes, "pre res", taskIdStr);
blockDataCleanup(pBlock);
if (!pPreRes) {
return ;
}
int32_t code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return ;
@ -2522,6 +2525,7 @@ FETCH_NEXT_BLOCK:
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_GET_ALL:
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock;
case STREAM_RETRIEVE: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
@ -2603,6 +2607,7 @@ FETCH_NEXT_BLOCK:
pInfo->pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
} break;
@ -2724,6 +2729,7 @@ FETCH_NEXT_BLOCK:
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
if (pBlockInfo->rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
@ -2747,7 +2753,7 @@ FETCH_NEXT_BLOCK:
if (pBlock->info.type == STREAM_CHECKPOINT) {
streamScanOperatorSaveCheckpoint(pInfo);
}
// printDataBlock(pBlock, "stream scan ck");
// printDataBlock(pInfo->pCheckpointRes, "stream scan ck", GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
}