fix(stream):reset the starting flag for the followers.

This commit is contained in:
Haojun Liao 2023-11-06 13:17:13 +08:00
parent bfd14076d7
commit 69863febe6
4 changed files with 7 additions and 8 deletions

View File

@ -433,7 +433,7 @@ typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t tasksWillRestart; int32_t tasksWillRestart;
int32_t taskRestarting; // restart flag, sentinel to guard the restart procedure. int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime; int32_t elapsedTime;
} STaskStartInfo; } STaskStartInfo;
@ -817,6 +817,7 @@ void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -122,7 +122,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while(1) { while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) { if (startVal == 0) {
break; break;
} }
@ -152,7 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pTq->pStreamMeta); code = streamMetaLoadAllTasks(pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -165,12 +165,12 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq); tqResetStreamTaskStatus(pTq);
streamMetaWUnLock(pMeta);
streamMetaWUnLock(pMeta);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqInfo("vgId:%d, follower node not start stream tasks", vgId); tqInfo("vgId:%d, follower node not start stream tasks", vgId);
} }

View File

@ -132,8 +132,6 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
void streamQueueProcessSuccess(SStreamQueue* queue); void streamQueueProcessSuccess(SStreamQueue* queue);

View File

@ -1094,7 +1094,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
pStartInfo->tasksWillRestart = 0; pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0; pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0 // reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskRestarting, 0); atomic_store_32(&pStartInfo->taskStarting, 0);
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {