fix(stream): restart tasks in stream threads, instead of write thread.
This commit is contained in:
parent
73d75aac25
commit
2674698b36
|
@ -43,9 +43,9 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct STqOffsetStore STqOffsetStore;
|
typedef struct STqOffsetStore STqOffsetStore;
|
||||||
|
|
||||||
// tqPush
|
|
||||||
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
||||||
#define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
|
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
|
||||||
|
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
|
||||||
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
|
@ -156,9 +156,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
|
|
||||||
// tqStream
|
// tqStream
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
|
||||||
int32_t tqScanWal(STQ* pTq);
|
|
||||||
int32_t tqStartStreamTask(STQ* pTq);
|
|
||||||
int32_t tqResetStreamTaskStatus(STQ* pTq);
|
int32_t tqResetStreamTaskStatus(STQ* pTq);
|
||||||
int32_t tqStopStreamTasks(STQ* pTq);
|
int32_t tqStopStreamTasks(STQ* pTq);
|
||||||
|
|
||||||
|
|
|
@ -231,7 +231,12 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqLaunchStreamTaskAsync(STQ* pTq);
|
|
||||||
|
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
|
||||||
|
int32_t tqRestartStreamTasks(STQ* pTq);
|
||||||
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
|
int32_t tqScanWal(STQ* pTq);
|
||||||
|
int32_t tqStartStreamTasks(STQ* pTq);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
|
|
|
@ -1317,14 +1317,15 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t taskId = pReq->taskId;
|
int32_t taskId = pReq->taskId;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
|
|
||||||
tqStartStreamTask(pTq);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
|
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
|
||||||
tqScanWal(pTq);
|
tqScanWal(pTq);
|
||||||
return 0;
|
return 0;
|
||||||
|
} else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
|
||||||
|
tqStartStreamTasks(pTq);
|
||||||
|
return 0;
|
||||||
|
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
|
||||||
|
tqRestartStreamTasks(pTq);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
|
||||||
|
@ -1911,7 +1912,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
|
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
|
||||||
|
|
||||||
pMeta->startInfo.startAllTasksFlag = 1;
|
pMeta->startInfo.tasksWillRestart = 1;
|
||||||
|
|
||||||
if (updateTasks < numOfTasks) {
|
if (updateTasks < numOfTasks) {
|
||||||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||||
|
@ -1920,45 +1921,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
} else {
|
} else {
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||||
pMeta->startInfo.startAllTasksFlag = 0;
|
pMeta->startInfo.tasksWillRestart = 0;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d tasks are all updated and stopped, restart them", vgId);
|
|
||||||
terrno = 0;
|
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
|
||||||
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
|
||||||
taosMsleep(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
|
|
||||||
int32_t code = streamMetaReopen(pMeta);
|
|
||||||
if (code != 0) {
|
|
||||||
tqError("vgId:%d failed to reopen stream meta", vgId);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
|
|
||||||
tqError("vgId:%d failed to load stream tasks", vgId);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
|
||||||
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
|
||||||
tqResetStreamTaskStatus(pTq);
|
|
||||||
tqLaunchStreamTaskAsync(pTq);
|
|
||||||
} else {
|
|
||||||
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
tqStartStreamTaskAsync(pTq, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStartStreamTask(STQ* pTq) {
|
int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
@ -125,7 +125,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskRestarting, 0, 1);
|
||||||
if (startVal == 0) {
|
if (startVal == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
|
||||||
|
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
|
|
||||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
|
tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.);
|
||||||
|
|
||||||
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
|
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -168,15 +168,12 @@ int32_t tqRestartStreamTasks(STQ* pTq) {
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
||||||
tqResetStreamTaskStatus(pTq);
|
tqResetStreamTaskStatus(pTq);
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
} else {
|
} else {
|
||||||
streamMetaResetStartInfo(&pMeta->startInfo);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
code = terrno;
|
code = terrno;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -198,10 +195,10 @@ int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks);
|
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
pRunReq->streamId = 0;
|
pRunReq->streamId = 0;
|
||||||
pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID;
|
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
||||||
|
|
|
@ -557,7 +557,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
if (pMeta->startInfo.startAllTasksFlag) {
|
if (pMeta->startInfo.tasksWillRestart) {
|
||||||
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return;
|
return;
|
||||||
|
@ -570,7 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
|
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
|
||||||
tqResetStreamTaskStatus(pVnode->pTq);
|
tqResetStreamTaskStatus(pVnode->pTq);
|
||||||
tqLaunchStreamTaskAsync(pVnode->pTq);
|
tqStartStreamTaskAsync(pVnode->pTq, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||||
|
|
Loading…
Reference in New Issue