fix(stream): update the checkpoint record start time.
This commit is contained in:
parent
6ef68fdf06
commit
7f265e181d
|
@ -306,7 +306,6 @@ int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: handle the case: during the checkpoint procedure, leader/follower changes happened.
|
|
||||||
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -316,9 +315,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
ASSERT(remain >= 0);
|
ASSERT(remain >= 0);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->chkInfo.startTs) / 1000.0;
|
||||||
|
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
||||||
|
|
||||||
if (remain == 0) { // all tasks are ready
|
if (remain == 0) { // all tasks are ready
|
||||||
pTask->chkInfo.startTs = 0; // clear the recorded start time
|
|
||||||
stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
stDebug("s-task:%s is ready for checkpoint", pTask->id.idStr);
|
||||||
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
streamBackendDoCheckpoint(pMeta, pTask->checkpointingId);
|
||||||
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
streamSaveAllTaskStatus(pMeta, pTask->checkpointingId);
|
||||||
|
|
Loading…
Reference in New Issue