fix(stream): restart tasks in stream threads, instead of write thread.

This commit is contained in:
Haojun Liao 2023-11-06 11:57:21 +08:00
parent 0e44950a37
commit d41fd27189
7 changed files with 91 additions and 61 deletions

View File

@ -432,7 +432,8 @@ struct SStreamTask {
typedef struct STaskStartInfo { typedef struct STaskStartInfo {
int64_t startTs; int64_t startTs;
int64_t readyTs; int64_t readyTs;
int32_t startAllTasksFlag; int32_t tasksWillRestart;
int32_t taskRestarting; // restart flag, sentinel to guard the restart procedure.
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
int32_t elapsedTime; int32_t elapsedTime;
} STaskStartInfo; } STaskStartInfo;

View File

@ -43,9 +43,9 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
// tqPush
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2) #define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
// tqExec // tqExec
typedef struct { typedef struct {
@ -155,9 +155,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq);
int32_t tqResetStreamTaskStatus(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq);

View File

@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqLaunchStreamTaskAsync(STQ* pTq);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -1307,14 +1307,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
tqStartStreamTask(pTq);
return 0;
}
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWal(pTq); tqScanWal(pTq);
return 0; return 0;
} else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
tqStartStreamTasks(pTq);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
tqRestartStreamTasks(pTq);
return 0;
} }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
@ -1900,7 +1901,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.startAllTasksFlag = 1; pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) { if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@ -1909,45 +1910,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startAllTasksFlag = 0; pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
terrno = 0;
streamMetaWUnLock(pMeta);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqLaunchStreamTaskAsync(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStartStreamTaskAsync(pTq, true);
} }
} }

View File

@ -58,7 +58,7 @@ int32_t tqScanWal(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStartStreamTask(STQ* pTq) { int32_t tqStartStreamTasks(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -115,7 +115,67 @@ int32_t tqStartStreamTask(STQ* pTq) {
return code; return code;
} }
int32_t tqLaunchStreamTaskAsync(STQ* pTq) { int32_t tqRestartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTasks(pTq);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -132,10 +192,10 @@ int32_t tqLaunchStreamTaskAsync(STQ* pTq) {
return -1; return -1;
} }
tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks); tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID; pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);

View File

@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta; SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (pMeta->startInfo.startAllTasksFlag) { if (pMeta->startInfo.tasksWillRestart) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
return; return;
@ -567,7 +567,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqResetStreamTaskStatus(pVnode->pTq); tqResetStreamTaskStatus(pVnode->pTq);
tqLaunchStreamTaskAsync(pVnode->pTq); tqStartStreamTaskAsync(pVnode->pTq, false);
} }
} else { } else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

View File

@ -228,12 +228,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
int32_t streamMetaReopen(SStreamMeta* pMeta) { int32_t streamMetaReopen(SStreamMeta* pMeta) {
// backup the restart flag
int32_t restartFlag = pMeta->startInfo.startAllTasksFlag;
streamMetaClear(pMeta); streamMetaClear(pMeta);
pMeta->startInfo.startAllTasksFlag = restartFlag;
// NOTE: role should not be changed during reopen meta // NOTE: role should not be changed during reopen meta
pMeta->streamBackendRid = -1; pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL; pMeta->streamBackend = NULL;
@ -302,7 +298,9 @@ void streamMetaClear(SStreamMeta* pMeta) {
pMeta->numOfPausedTasks = 0; pMeta->numOfPausedTasks = 0;
pMeta->chkptNotReadyTasks = 0; pMeta->chkptNotReadyTasks = 0;
streamMetaResetStartInfo(&pMeta->startInfo); // the willrestart/starting flag can NOT be cleared
taosHashClear(pMeta->startInfo.pReadyTaskSet);
pMeta->startInfo.readyTs = 0;
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
@ -1093,8 +1091,10 @@ void streamMetaInitForSnode(SStreamMeta* pMeta) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
taosHashClear(pStartInfo->pReadyTaskSet); taosHashClear(pStartInfo->pReadyTaskSet);
pStartInfo->startAllTasksFlag = 0; pStartInfo->tasksWillRestart = 0;
pStartInfo->readyTs = 0; pStartInfo->readyTs = 0;
// reset the sentinel flag value to be 0
atomic_store_32(&pStartInfo->taskRestarting, 0);
} }
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {