refactor(stream): async stop tasks, and do some internal refactor.
This commit is contained in:
parent
7ce545bcd4
commit
b175a4b7b3
|
@ -30,7 +30,6 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored);
|
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored);
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||||
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||||
int32_t startStreamTasks(SStreamMeta* pMeta);
|
|
||||||
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
|
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
|
||||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
||||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -53,6 +53,7 @@ extern "C" {
|
||||||
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
|
||||||
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
|
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
|
||||||
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
|
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
|
||||||
|
#define STREAM_EXEC_STOP_ALL_TASKS_ID (-4)
|
||||||
|
|
||||||
typedef struct SStreamTask SStreamTask;
|
typedef struct SStreamTask SStreamTask;
|
||||||
typedef struct SStreamQueue SStreamQueue;
|
typedef struct SStreamQueue SStreamQueue;
|
||||||
|
@ -785,8 +786,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
|
||||||
int64_t* oldStage);
|
int64_t* oldStage);
|
||||||
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||||
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
|
||||||
|
|
||||||
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
bool streamTaskAllUpstreamClosed(SStreamTask* pTask);
|
||||||
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
bool streamTaskSetSchedStatusWait(SStreamTask* pTask);
|
||||||
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask);
|
||||||
|
@ -821,6 +820,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
|
||||||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||||
|
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
@ -851,10 +851,8 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
void streamMetaClear(SStreamMeta* pMeta);
|
void streamMetaClear(SStreamMeta* pMeta);
|
||||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
|
||||||
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
|
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||||
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
|
||||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
|
@ -864,6 +862,11 @@ void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWLock(SStreamMeta* pMeta);
|
void streamMetaWLock(SStreamMeta* pMeta);
|
||||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
|
||||||
|
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
|
||||||
|
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||||
|
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||||
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -3116,7 +3116,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
if (allReady || snodeChanged) {
|
if (allReady || snodeChanged) {
|
||||||
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
|
||||||
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
|
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
|
||||||
mndResetStatusFromCheckpoint(pMnode, streamId, activeCheckpointId);
|
mndResetStatusFromCheckpoint(pMnode, streamId, transId);
|
||||||
} else {
|
} else {
|
||||||
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
|
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
|
||||||
}
|
}
|
||||||
|
|
|
@ -151,7 +151,7 @@ FAIL:
|
||||||
|
|
||||||
int32_t sndInit(SSnode * pSnode) {
|
int32_t sndInit(SSnode * pSnode) {
|
||||||
tqStreamTaskResetStatus(pSnode->pMeta);
|
tqStreamTaskResetStatus(pSnode->pMeta);
|
||||||
startStreamTasks(pSnode->pMeta);
|
streamMetaStartAllTasks(pSnode->pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -152,9 +152,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
|
||||||
char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
|
|
||||||
// tqStream
|
|
||||||
int32_t tqStopStreamTasks(STQ* pTq);
|
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
|
||||||
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -232,6 +232,7 @@ int tqPushMsg(STQ*, tmsg_t msgType);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||||
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||||
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
||||||
|
int32_t tqStopStreamTasksAsync(STQ* pTq);
|
||||||
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
||||||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -1076,6 +1076,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
if(code == 0 && taskId > 0){
|
if(code == 0 && taskId > 0){
|
||||||
tqScanWalAsync(pTq, false);
|
tqScanWalAsync(pTq, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -109,8 +109,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks,
|
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
||||||
alreadyRestored);
|
numOfTasks, alreadyRestored);
|
||||||
|
|
||||||
pRunReq->head.vgId = vgId;
|
pRunReq->head.vgId = vgId;
|
||||||
pRunReq->streamId = 0;
|
pRunReq->streamId = 0;
|
||||||
|
@ -123,38 +123,25 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: createMsg to invoke this function in stream threads, to avoid blocking the syn thread
|
int32_t tqStopStreamTasksAsync(STQ* pTq) {
|
||||||
int32_t tqStopStreamTasks(STQ* pTq) {
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
streamMetaRLock(pMeta);
|
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
|
||||||
|
if (pRunReq == NULL) {
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqDebug("vgId:%d stop all %d stream task(s)", vgId, num);
|
tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr());
|
||||||
if (num == 0) {
|
return -1;
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// send hb msg to mnode before closing all tasks.
|
tqDebug("vgId:%d create msg to stop tasks", vgId);
|
||||||
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
pRunReq->head.vgId = vgId;
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
pRunReq->streamId = 0;
|
||||||
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
|
pRunReq->taskId = STREAM_EXEC_STOP_ALL_TASKS_ID;
|
||||||
if (pTask == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamTaskStop(pTask);
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pTaskList);
|
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,29 +36,7 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
|
|
||||||
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
||||||
SSyncState state = syncGetState(pTq->pVnode->sync);
|
SSyncState state = syncGetState(pTq->pVnode->sync);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
|
||||||
int64_t stage = pMeta->stage;
|
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
|
|
||||||
pMeta->stage = state.term;
|
|
||||||
|
|
||||||
// mark the sign to send msg before close all tasks
|
|
||||||
if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) {
|
|
||||||
pMeta->sendMsgBeforeClosing = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
|
|
||||||
if (isLeader) {
|
|
||||||
tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
|
||||||
state.term, stage, isLeader);
|
|
||||||
streamMetaStartHb(pMeta);
|
|
||||||
} else {
|
|
||||||
tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
|
||||||
state.term, stage, isLeader, pMeta->sendMsgBeforeClosing);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
|
|
|
@ -598,66 +598,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startStreamTasks(SStreamMeta* pMeta) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
|
||||||
if (numOfTasks == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pTaskList = NULL;
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
|
||||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
|
||||||
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
|
||||||
pMeta->startInfo.startTs = taosGetTimestampMs();
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
|
|
||||||
// broadcast the check downstream tasks msg
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0,
|
|
||||||
taosGetTimestampMs(), false);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill-history task can only be launched by related stream tasks.
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->status.downstreamReady == 1) {
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
|
||||||
pTask->id.idStr);
|
|
||||||
streamLaunchFillHistoryTask(pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
|
||||||
pTask->execInfo.start, true);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
|
||||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
code = ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pTaskList);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
|
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
@ -723,7 +663,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId);
|
tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId);
|
||||||
|
|
||||||
startStreamTasks(pMeta);
|
streamMetaStartAllTasks(pMeta);
|
||||||
} else {
|
} else {
|
||||||
streamMetaResetStartInfo(&pMeta->startInfo);
|
streamMetaResetStartInfo(&pMeta->startInfo);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
@ -741,11 +681,14 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
|
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
|
||||||
startStreamTasks(pMeta);
|
streamMetaStartAllTasks(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
|
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
|
||||||
restartStreamTasks(pMeta, isLeader);
|
restartStreamTasks(pMeta, isLeader);
|
||||||
return 0;
|
return 0;
|
||||||
|
} else if (taskId == STREAM_EXEC_STOP_ALL_TASKS_ID) {
|
||||||
|
streamMetaStopAllTasks(pMeta);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
|
||||||
|
|
|
@ -594,7 +594,7 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
|
||||||
if (pVnode->pTq) {
|
if (pVnode->pTq) {
|
||||||
tqUpdateNodeStage(pVnode->pTq, false);
|
tqUpdateNodeStage(pVnode->pTq, false);
|
||||||
tqStopStreamTasks(pVnode->pTq);
|
tqStopStreamTasksAsync(pVnode->pTq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1367,3 +1367,119 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
|
||||||
pMeta->sendMsgBeforeClosing = false;
|
pMeta->sendMsgBeforeClosing = false;
|
||||||
return pTaskList;
|
return pTaskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) {
|
||||||
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
|
int64_t prevStage = pMeta->stage;
|
||||||
|
pMeta->stage = stage;
|
||||||
|
|
||||||
|
// mark the sign to send msg before close all tasks
|
||||||
|
if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) {
|
||||||
|
pMeta->sendMsgBeforeClosing = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER;
|
||||||
|
|
||||||
|
if (isLeader) {
|
||||||
|
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
|
||||||
|
prevStage, stage, isLeader);
|
||||||
|
streamMetaStartHb(pMeta);
|
||||||
|
} else {
|
||||||
|
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
||||||
|
prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
|
||||||
|
if (num == 0) {
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// send hb msg to mnode before closing all tasks.
|
||||||
|
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
|
||||||
|
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamTaskStop(pTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pTaskList);
|
||||||
|
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||||
|
if (numOfTasks == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTaskList = NULL;
|
||||||
|
streamMetaWLock(pMeta);
|
||||||
|
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
|
||||||
|
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
||||||
|
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
||||||
|
pMeta->startInfo.startTs = taosGetTimestampMs();
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
// broadcast the check downstream tasks msg
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0,
|
||||||
|
taosGetTimestampMs(), false);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fill-history task can only be launched by related stream tasks.
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->status.downstreamReady == 1) {
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
||||||
|
pTask->id.idStr);
|
||||||
|
streamLaunchFillHistoryTask(pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
|
||||||
|
pTask->execInfo.start, true);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||||
|
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
code = ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pTaskList);
|
||||||
|
return code;
|
||||||
|
}
|
Loading…
Reference in New Issue