diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f3f9319ca8..2058324b50 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -639,7 +639,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); -int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir); +int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskStop(SStreamTask* pTask); int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 1e59dd1805..23d21b24a3 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -203,6 +203,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 884365447b..a8d9d20096 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -88,8 +88,6 @@ int32_t 
mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/ - mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); @@ -776,16 +774,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } } - pDb = mndAcquireDb(pMnode, streamObj.sourceDb); - if (pDb->cfg.replications != 1) { - mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); - terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; - mndReleaseDb(pMnode, pDb); - pDb = NULL; - goto _OVER; - } +// pDb = mndAcquireDb(pMnode, streamObj.sourceDb); +// if (pDb->cfg.replications != 1) { +// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); +// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; +// mndReleaseDb(pMnode, pDb); +// pDb = NULL; +// goto _OVER; +// } - mndReleaseDb(pMnode, pDb); +// mndReleaseDb(pMnode, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { @@ -868,7 +866,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)}; -// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 5ae690632c..402464100f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -217,7 +217,7 @@ void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* 
pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); +int tqPushMsg(STQ*, tmsg_t msgType); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9299dbc9e5..873f420802 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -14,6 +14,7 @@ */ #include "tq.h" +#include "vnd.h" typedef struct { int8_t inited; @@ -680,7 +681,6 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) { SRpcMsg rsp = {.info = pMsg->info, .pCont = buf, .contLen = len, .code = 0}; tmsgSendRsp(&rsp); - return 0; } @@ -1137,14 +1137,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); code = tDecodeStreamTask(&decoder, pTask); + tDecoderClear(&decoder); + if (code < 0) { - tDecoderClear(&decoder); taosMemoryFree(pTask); return -1; } - tDecoderClear(&decoder); - SStreamMeta* pStreamMeta = pTq->pStreamMeta; // 2.save task, use the latest commit version as the initial start version of stream task. 
@@ -1166,16 +1165,22 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if // it is added into the meta store if (added) { - tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); - SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); + // only handled in the leader node + if (vnodeIsRoleLeader(pTq->pVnode)) { + tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); - bool restored = pTq->pVnode->restored; - if (p != NULL && restored) { // reset the downstreamReady flag. - streamTaskCheckDownstreamTasks(p); - } else if (!restored) { - tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); + bool restored = pTq->pVnode->restored; + if (p != NULL && restored) { + streamTaskCheckDownstreamTasks(p); + } else if (!restored) { + tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); + } + + if (p != NULL) { + streamMetaReleaseTask(pStreamMeta, p); + } } - streamMetaReleaseTask(pStreamMeta, p); } else { tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); tFreeStreamTask(pTask); @@ -1663,6 +1668,7 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } +// todo refactor. 
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { STQ* pTq = pVnode->pTq; SMsgHead* msgStr = pMsg->pCont; @@ -1830,26 +1836,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) { - code = TSDB_CODE_MSG_DECODE_ERROR; - tDecoderClear(&decoder); - tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(code)); - return code; + rsp.code = TSDB_CODE_MSG_DECODE_ERROR; + tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code)); + goto _end; } - tDecoderClear(&decoder); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, req.taskId); // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; + rsp.code = TSDB_CODE_SUCCESS; + goto _end; } tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); @@ -1858,27 +1864,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pHistoryTask = NULL; if (pTask->historyTaskId.taskId != 0) { pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); - if (pHistoryTask == NULL) { + if (pHistoryTask != NULL) { + tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); + streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); + + 
streamTaskRestart(pHistoryTask, NULL, startTask); + streamMetaReleaseTask(pMeta, pHistoryTask); + } else { tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped", pMeta->vgId, pTask->historyTaskId.taskId); - streamMetaReleaseTask(pMeta, pTask); - - // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active - return TSDB_CODE_SUCCESS; } - - tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr); - streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList); } - if (pHistoryTask != NULL) { - streamTaskRestart(pHistoryTask, NULL); - streamMetaReleaseTask(pMeta, pHistoryTask); - } - - streamTaskRestart(pTask, NULL); + streamTaskRestart(pTask, NULL, startTask); streamMetaReleaseTask(pMeta, pTask); - return TSDB_CODE_SUCCESS; + int32_t code = rsp.code; + +_end: + tDecoderClear(&decoder); + tmsgSendRsp(&rsp); + return code; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a236b98614..1ab4ef523a 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -30,7 +30,7 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { return 0; } -int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { +int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) { if (msgType == TDMT_VND_SUBMIT) { tqProcessSubmitReqForSubscribe(pTq); } @@ -39,20 +39,14 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); taosRUnLockLatch(&pTq->pStreamMeta->lock); - tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks); + tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks); // push data for stream processing: // 1. the vnode has already been restored. // 2. the vnode should be the leader. // 3. the stream is not suspended yet. 
- if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) { - if (numOfTasks == 0) { - return 0; - } - - if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) { - tqStartStreamTasks(pTq); - } + if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { + tqStartStreamTasks(pTq); } return 0; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c1f9c7a0f9..da88ae34b8 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -13,6 +13,7 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ +#include "vnd.h" #include "tq.h" static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); @@ -130,6 +131,11 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + // for a follower, or a vnode that has not been restored yet, do not launch the stream tasks. + if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) { + return TSDB_CODE_SUCCESS; + } + taosWLockLatch(&pMeta->lock); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 91aa009632..d3753e63f0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -524,7 +524,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg walApplyVer(pVnode->pWal, ver); - if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) { + if (tqPushMsg(pVnode->pTq, pMsg->msgType) < 0) { vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a6715dbcff..adb587fdf8 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -262,9
+262,13 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) { int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId) { taosWLockLatch(&pMeta->lock); + int64_t keys[2]; for (int32_t i = 0; i < taosArrayGetSize(pMeta->pTaskList); ++i) { - uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId)); + SStreamId* pId = taosArrayGet(pMeta->pTaskList, i); + keys[0] = pId->streamId; + keys[1] = pId->taskId; + + SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); int8_t prev = p->status.taskStatus; ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7a77b98db6..4bdc38b742 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -465,7 +465,7 @@ int32_t streamTaskStop(SStreamTask* pTask) { return 0; } -int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { +int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; @@ -485,6 +485,12 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { pTask->status.stage += 1; streamSetStatusNormal(pTask); + + taosWLockLatch(&pTask->pMeta->lock); + streamMetaSaveTask(pTask->pMeta, pTask); + streamMetaCommit(pTask->pMeta); + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s reset downstream status and inc stage to be:%d, status:%s, start to check downstream", id, pTask->status.stage, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -493,36 +499,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) { return 0; } -// todo remove it -//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) { -// int32_t numOfLevels = taosArrayGetSize(pTaskList); -// -// for (int32_t 
j = 0; j < numOfLevels; ++j) { -// SArray *pLevel = taosArrayGetP(pTaskList, j); -// -// int32_t numOfTasks = taosArrayGetSize(pLevel); -// for (int32_t k = 0; k < numOfTasks; ++k) { -// SStreamTask *pTask = taosArrayGetP(pLevel, k); -// if (pTask->info.nodeId == nodeId) { -// pTask->info.epSet = *pEpSet; -// continue; -// } -// -// // check for the dispath info and the upstream task info -// int32_t level = pTask->info.taskLevel; -// if (level == TASK_LEVEL__SOURCE) { -// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); -// } else if (level == TASK_LEVEL__AGG) { -// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); -// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); -// } else { // TASK_LEVEL__SINK -// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); -// } -// } -// } -// return 0; -//} - int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) { if (pTask->info.nodeId == nodeId) { // execution task should be moved away epsetAssign(&pTask->info.epSet, pEpSet);