fix(stream): set the processVer for streamTask when initializing the task object.
This commit is contained in:
parent
e62dd2ff5c
commit
a3b04c3c58
|
@ -85,8 +85,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
|
|||
|
||||
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
|
||||
// checkpoint ver is the kept version, handled data should be the next version.
|
||||
if (pTask->chkInfo.checkpointId != 0) {
|
||||
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
|
||||
if (pChkInfo->checkpointId != 0) {
|
||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
|
||||
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||
}
|
||||
|
|
|
@ -835,6 +835,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
|
|||
// checkpoint ver is the kept version, handled data should be the next version.
|
||||
if (pChkInfo->checkpointId != 0) {
|
||||
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
|
||||
pChkInfo->processedVer = pChkInfo->checkpointVer;
|
||||
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
|
||||
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue