fix(stream):reset the starting flag for the followers.
This commit is contained in:
parent
d440da0d8c
commit
e38e526dda
|
@ -432,7 +432,8 @@ struct SStreamTask {
|
||||||
typedef struct STaskStartInfo {
|
typedef struct STaskStartInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t readyTs;
|
int64_t readyTs;
|
||||||
int32_t startAllTasksFlag;
|
int32_t tasksWillRestart;
|
||||||
|
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
||||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||||
int32_t elapsedTime;
|
int32_t elapsedTime;
|
||||||
} STaskStartInfo;
|
} STaskStartInfo;
|
||||||
|
@ -816,6 +817,7 @@ void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWLock(SStreamMeta* pMeta);
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
|
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -115,7 +115,70 @@ int32_t tqStartStreamTask(STQ* pTq) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
|
int32_t tqRestartStreamTasks(STQ* pTq) {
|
||||||
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int32_t code = 0;
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
|
while(1) {
|
||||||
|
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||||
|
if (startVal == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
|
||||||
|
taosMsleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
terrno = 0;
|
||||||
|
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
|
||||||
|
pMeta->updateInfo.transId);
|
||||||
|
|
||||||
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
|
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
|
code = streamMetaReopen(pMeta);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d failed to reopen stream meta", vgId);
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
|
|
||||||
|
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
|
||||||
|
|
||||||
|
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
|
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
||||||
|
tqResetStreamTaskStatus(pTq);
|
||||||
|
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
tqStartStreamTasks(pTq);
|
||||||
|
} else {
|
||||||
|
streamMetaResetStartInfo(&pMeta->startInfo);
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = terrno;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
|
|
|
@ -132,8 +132,6 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask);
|
||||||
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
|
||||||
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
|
||||||
|
|
||||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
|
||||||
|
|
||||||
SStreamQueue* streamQueueOpen(int64_t cap);
|
SStreamQueue* streamQueueOpen(int64_t cap);
|
||||||
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
|
||||||
void streamQueueProcessSuccess(SStreamQueue* queue);
|
void streamQueueProcessSuccess(SStreamQueue* queue);
|
||||||
|
|
|
@ -228,12 +228,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaReopen(SStreamMeta* pMeta) {
|
int32_t streamMetaReopen(SStreamMeta* pMeta) {
|
||||||
// backup the restart flag
|
|
||||||
int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
|
|
||||||
streamMetaClear(pMeta);
|
streamMetaClear(pMeta);
|
||||||
|
|
||||||
pMeta->startInfo.startAllTasksFlag = restartFlag;
|
|
||||||
|
|
||||||
// NOTE: role should not be changed during reopen meta
|
// NOTE: role should not be changed during reopen meta
|
||||||
pMeta->streamBackendRid = -1;
|
pMeta->streamBackendRid = -1;
|
||||||
pMeta->streamBackend = NULL;
|
pMeta->streamBackend = NULL;
|
||||||
|
@ -1095,6 +1091,8 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
||||||
taosHashClear(pStartInfo->pReadyTaskSet);
|
taosHashClear(pStartInfo->pReadyTaskSet);
|
||||||
pStartInfo->startAllTasksFlag = 0;
|
pStartInfo->startAllTasksFlag = 0;
|
||||||
pStartInfo->readyTs = 0;
|
pStartInfo->readyTs = 0;
|
||||||
|
// reset the sentinel flag value to be 0
|
||||||
|
atomic_store_32(&pStartInfo->taskStarting, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||||
|
|
Loading…
Reference in New Issue