fix(stream): fix the compatible issue when the fill-history exists.

This commit is contained in:
Haojun Liao 2024-02-22 19:05:30 +08:00
parent bad3730387
commit af1f61cd39
1 changed files with 10 additions and 0 deletions

View File

@ -494,6 +494,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pRange->range.maxVer = ver;
pRange->range.minVer = ver;
} else {
// the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task
// is set at the mnode.
if (pTask->info.fillHistory == 1) {
pChkInfo->checkpointVer = pRange->range.maxVer;
pChkInfo->processedVer = pRange->range.maxVer;
@ -502,6 +504,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pChkInfo->checkpointVer = pRange->range.minVer - 1;
pChkInfo->processedVer = pRange->range.minVer - 1;
pChkInfo->nextProcessVer = pRange->range.minVer;
{ // for compatible purpose, remove it later
if (pRange->range.minVer == 0) {
pChkInfo->checkpointVer = 0;
pChkInfo->processedVer = 0;
stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr);
}
}
}
}
}