diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index da9dcf1c04..da2325f006 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -30,7 +30,6 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); -int32_t startStreamTasks(SStreamMeta* pMeta); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e7ac0e99c5..2d2db5c1dc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -53,6 +53,7 @@ extern "C" { #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1) #define STREAM_EXEC_START_ALL_TASKS_ID (-2) #define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3) +#define STREAM_EXEC_STOP_ALL_TASKS_ID (-4) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; @@ -785,8 +786,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ int64_t* oldStage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); -SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); - bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); @@ -821,6 +820,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); @@ -851,10 +851,8 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); void streamMetaStartHb(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, @@ -864,6 +862,11 @@ void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaResetStartInfo(STaskStartInfo* pMeta); +SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); +void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); +int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ba6280f59..25f51b85df 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -3116,7 +3116,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId); - mndResetStatusFromCheckpoint(pMnode, streamId, activeCheckpointId); + mndResetStatusFromCheckpoint(pMnode, streamId, transId); } else { mInfo("not all vgroups are ready, wait for next HB from stream tasks"); } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f0fe662025..2225a07d3a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -151,7 +151,7 @@ FAIL: int32_t sndInit(SSnode * pSnode) { tqStreamTaskResetStatus(pSnode->pMeta); - startStreamTasks(pSnode->pMeta); + streamMetaStartAllTasks(pSnode->pMeta); return 0; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cf57623a43..0ef29fcb3a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -152,9 +152,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data); char* tqOffsetBuildFName(const char* path, int32_t fVer); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); -// tqStream -int32_t tqStopStreamTasks(STQ* pTq); - // tq util int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index f3b495675d..84f51827ba 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -232,6 +232,7 @@ int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqScanWalAsync(STQ* pTq, bool ckPause); +int32_t tqStopStreamTasksAsync(STQ* pTq); int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 02c9b61432..e8e88e52f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1076,6 +1076,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if(code == 0 && taskId > 0){ tqScanWalAsync(pTq, false); } + return code; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 63e71f8ee8..2bb2609ae9 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -109,8 +109,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return -1; } - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, restored:%d", vgId, numOfTasks, - alreadyRestored); + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, + numOfTasks, alreadyRestored); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; @@ -123,38 +123,25 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { return 0; } -// todo: createMsg to invoke this function in stream threads, to avoid blocking the syn thread -int32_t tqStopStreamTasks(STQ* pTq) { +int32_t tqStopStreamTasksAsync(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; - int32_t vgId = TD_VID(pTq->pVnode); + int32_t vgId = pMeta->vgId; - streamMetaRLock(pMeta); - - int32_t num = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d stop all %d stream task(s)", vgId, num); - if (num == 0) { - streamMetaRUnLock(pMeta); - return TSDB_CODE_SUCCESS; + SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); + if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr()); + return -1; } - // send hb msg to mnode before closing all tasks. - SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); - int32_t numOfTasks = taosArrayGetSize(pTaskList); + tqDebug("vgId:%d create msg to stop tasks", vgId); - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - continue; - } + pRunReq->head.vgId = vgId; + pRunReq->streamId = 0; + pRunReq->taskId = STREAM_EXEC_STOP_ALL_TASKS_ID; - streamTaskStop(pTask); - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - - streamMetaRUnLock(pMeta); + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; + tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 64c733d63d..d18455d221 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -35,30 +35,8 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { } void tqUpdateNodeStage(STQ* pTq, bool isLeader) { - SSyncState state = syncGetState(pTq->pVnode->sync); - SStreamMeta* pMeta = pTq->pStreamMeta; - int64_t stage = pMeta->stage; - - streamMetaWLock(pMeta); - - pMeta->stage = state.term; - - // mark the sign to send msg before close all tasks - if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) { - pMeta->sendMsgBeforeClosing = true; - } - - pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; - if (isLeader) { - tqInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, - state.term, stage, isLeader); - streamMetaStartHb(pMeta); - } else { - tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, - state.term, stage, isLeader, pMeta->sendMsgBeforeClosing); - } - - streamMetaWUnLock(pMeta); + SSyncState state = syncGetState(pTq->pVnode->sync); + streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 0864afdf2c..af9363e363 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -598,66 +598,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen return 0; } -int32_t startStreamTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; - - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); - if (numOfTasks == 0) { - return TSDB_CODE_SUCCESS; - } - - SArray* pTaskList = NULL; - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - taosHashClear(pMeta->startInfo.pReadyTaskSet); - taosHashClear(pMeta->startInfo.pFailedTaskSet); - pMeta->startInfo.startTs = taosGetTimestampMs(); - streamMetaWUnLock(pMeta); - - // broadcast the check downstream tasks msg - for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); - if (pTask == NULL) { - streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, - taosGetTimestampMs(), false); - continue; - } - - // fill-history task can only be launched by related stream tasks. - if (pTask->info.fillHistory == 1) { - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - if (pTask->status.downstreamReady == 1) { - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", - pTask->id.idStr); - streamLaunchFillHistoryTask(pTask); - } - - streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, - pTask->execInfo.start, true); - streamMetaReleaseTask(pMeta, pTask); - continue; - } - - EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); - if (ret != TSDB_CODE_SUCCESS) { - code = ret; - } - - streamMetaReleaseTask(pMeta, pTask); - } - - taosArrayDestroy(pTaskList); - return code; -} - int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); @@ -723,7 +663,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaWUnLock(pMeta); tqInfo("vgId:%d start all stream tasks after reload tasks from disk", vgId); - startStreamTasks(pMeta); + streamMetaStartAllTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); streamMetaWUnLock(pMeta); @@ -741,11 +681,14 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t vgId = pMeta->vgId; if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) { - startStreamTasks(pMeta); + streamMetaStartAllTasks(pMeta); return 0; } else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) { restartStreamTasks(pMeta, isLeader); return 0; + } else if (taskId == STREAM_EXEC_STOP_ALL_TASKS_ID) { + streamMetaStopAllTasks(pMeta); + return 0; } SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 48811b2925..048092131d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -594,7 +594,7 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) { if (pVnode->pTq) { tqUpdateNodeStage(pVnode->pTq, false); - tqStopStreamTasks(pVnode->pTq); + tqStopStreamTasksAsync(pVnode->pTq); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 665288cf7b..0702a908ab 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1367,3 +1367,119 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { pMeta->sendMsgBeforeClosing = false; return pTaskList; } + +void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) { + streamMetaWLock(pMeta); + + int64_t prevStage = pMeta->stage; + pMeta->stage = stage; + + // mark the sign to send msg before close all tasks + if ((!isLeader) && (pMeta->role == NODE_ROLE_LEADER)) { + pMeta->sendMsgBeforeClosing = true; + } + + pMeta->role = (isLeader)? NODE_ROLE_LEADER:NODE_ROLE_FOLLOWER; + + if (isLeader) { + stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId, + prevStage, stage, isLeader); + streamMetaStartHb(pMeta); + } else { + stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, + prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing); + } + + streamMetaWUnLock(pMeta); +} + +int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { + streamMetaRLock(pMeta); + + int32_t num = taosArrayGetSize(pMeta->pTaskList); + stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); + if (num == 0) { + streamMetaRUnLock(pMeta); + return TSDB_CODE_SUCCESS; + } + + // send hb msg to mnode before closing all tasks. + SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); + int32_t numOfTasks = taosArrayGetSize(pTaskList); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + continue; + } + + streamTaskStop(pTask); + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + + streamMetaRUnLock(pMeta); + return 0; +} + +int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; + + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + SArray* pTaskList = NULL; + streamMetaWLock(pMeta); + pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + taosHashClear(pMeta->startInfo.pReadyTaskSet); + taosHashClear(pMeta->startInfo.pFailedTaskSet); + pMeta->startInfo.startTs = taosGetTimestampMs(); + streamMetaWUnLock(pMeta); + + // broadcast the check downstream tasks msg + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); + if (pTask == NULL) { + streamMetaUpdateTaskDownstreamStatus(pMeta, pTaskId->streamId, pTaskId->taskId, 0, + taosGetTimestampMs(), false); + continue; + } + + // fill-history task can only be launched by related stream tasks. + if (pTask->info.fillHistory == 1) { + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + if (pTask->status.downstreamReady == 1) { + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task", + pTask->id.idStr); + streamLaunchFillHistoryTask(pTask); + } + + streamMetaUpdateTaskDownstreamStatus(pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init, + pTask->execInfo.start, true); + streamMetaReleaseTask(pMeta, pTask); + continue; + } + + EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + if (ret != TSDB_CODE_SUCCESS) { + code = ret; + } + + streamMetaReleaseTask(pMeta, pTask); + } + + taosArrayDestroy(pTaskList); + return code; +} \ No newline at end of file