fix(stream): fix error in fill-history process.

This commit is contained in:
Haojun Liao 2023-10-09 10:08:48 +08:00
parent 64bde93f85
commit c5ee299d01
1 changed files with 5 additions and 1 deletions

View File

@ -370,8 +370,12 @@ static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32
SStreamQueueItem* pItem = NULL; SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
if (itemInFillhistory) {
numOfNewItems += 1;
}
break; break;
} }