diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4dc1b9315e..a33a259ef7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -433,7 +433,7 @@ typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; int32_t tasksWillRestart; - int32_t taskRestarting; // restart flag, sentinel to guard the restart procedure. + int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; } STaskStartInfo; @@ -817,6 +817,7 @@ void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); +void streamMetaResetStartInfo(STaskStartInfo* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6e0e63b67d..d7ab76bd82 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -122,7 +122,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t st = taosGetTimestampMs(); while(1) { - int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1); + int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); if (startVal == 0) { break; } @@ -152,7 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); code = streamMetaLoadAllTasks(pTq->pStreamMeta); if (code != TSDB_CODE_SUCCESS) { @@ -165,12 +165,12 @@ int32_t tqRestartStreamTasks(STQ* pTq) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); tqResetStreamTaskStatus(pTq); - streamMetaWUnLock(pMeta); + streamMetaWUnLock(pMeta); tqStartStreamTasks(pTq); } else { + streamMetaResetStartInfo(&pMeta->startInfo); streamMetaWUnLock(pMeta); - tqInfo("vgId:%d, follower node not start stream tasks", vgId); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index c63b51d745..09d26119f1 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -132,8 +132,6 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -void streamMetaResetStartInfo(STaskStartInfo* pMeta); - SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueProcessSuccess(SStreamQueue* queue); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ee02ccf8f1..3ff7fbcf4f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1094,7 +1094,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { pStartInfo->tasksWillRestart = 0; pStartInfo->readyTs = 0; // reset the sentinel flag value to be 0 - atomic_store_32(&pStartInfo->taskRestarting, 0); + atomic_store_32(&pStartInfo->taskStarting, 0); } void streamMetaRLock(SStreamMeta* pMeta) {