fix(stream): set correct start version if no checkpoint exist.

This commit is contained in:
Haojun Liao 2024-04-19 19:12:23 +08:00
parent b903872a4e
commit ea527b19b8
4 changed files with 19 additions and 21 deletions

View File

@ -41,5 +41,6 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
void tqSetRestoreVersionInfo(SStreamTask* pTask);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -58,17 +58,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
streamSetupScheduleTrigger(pTask);
SCheckpointInfo *pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64,
pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask)->name;
if (pTask->info.fillHistory) {

View File

@ -760,16 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqSetRestoreVersionInfo(pTask);
char* p = streamTaskGetStatus(pTask)->name;
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);

View File

@ -86,6 +86,22 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
return TSDB_CODE_SUCCESS;
}
void tqSetRestoreVersionInfo(SStreamTask* pTask) {
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
// checkpoint ver is the kept version, handled data should be the next version.
if (pChkInfo->checkpointId != 0) {
pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1;
pChkInfo->processedVer = pChkInfo->checkpointVer;
pTask->execInfo.startCheckpointId = pChkInfo->checkpointId;
tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer;
}
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);