fix(stream): refactor the tasks restart mechanism.
This commit is contained in:
parent
41fe441dad
commit
b8101afbe2
|
@ -390,6 +390,7 @@ typedef struct SStreamMeta {
|
|||
tmr_h hbTmr;
|
||||
SMgmtInfo mgmtInfo;
|
||||
|
||||
int32_t closedTask;
|
||||
int32_t chkptNotReadyTasks;
|
||||
|
||||
int64_t chkpId;
|
||||
|
|
|
@ -1855,13 +1855,12 @@ int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
bool restartTasks = false;
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||
|
||||
SStreamTaskNodeUpdateMsg req = {0};
|
||||
|
||||
|
@ -1884,34 +1883,52 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||
streamTaskStop(pTask);
|
||||
|
||||
SStreamTask* pHistoryTask = NULL;
|
||||
if (pTask->historyTaskId.taskId != 0) {
|
||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
|
||||
if (pHistoryTask != NULL) {
|
||||
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
|
||||
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
||||
streamTaskRestart(pHistoryTask, NULL, startTask);
|
||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||
} else {
|
||||
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
|
||||
pMeta->vgId, pTask->historyTaskId.taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
}
|
||||
}
|
||||
int32_t numOfCount = streamMetaGetNumOfTasks(pMeta);
|
||||
pMeta->closedTask += 1;
|
||||
|
||||
streamTaskRestart(pTask, NULL, startTask);
|
||||
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
int32_t code = rsp.code;
|
||||
// all tasks are closed, now let's restart the stream meta
|
||||
if (pMeta->closedTask == numOfCount) {
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
restartTasks = true;
|
||||
pMeta->closedTask = 0; // reset value
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
|
||||
_end:
|
||||
tDecoderClear(&decoder);
|
||||
tmsgSendRsp(&rsp);
|
||||
|
||||
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
|
||||
return code;
|
||||
if (restartTasks) {
|
||||
tqDebug("vgId:%d all tasks are stopped, restart them", vgId);
|
||||
|
||||
streamMetaClose(pTask->pMeta);
|
||||
|
||||
pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, pTq->pVnode->config.vgId, -1);
|
||||
if (pTq->pStreamMeta == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (streamLoadTasks(pTq->pStreamMeta) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||
vInfo("vgId:%d, restart to all stream tasks", vgId);
|
||||
tqCheckStreamStatus(pTq);
|
||||
}
|
||||
}
|
||||
|
||||
return rsp.code;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
|
Loading…
Reference in New Issue