diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c99daa7250..245596ed1c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -265,6 +265,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 23d21b24a3..5029f0aec4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -202,6 +202,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index f0cfd6d86a..e48ac2ca20 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -76,6 +76,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0e3dcd27df..4dd2226d15 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -745,6 +745,7 @@ SArray *vmGetMsgHandles() { // if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 05adc17d64..47898caf27 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -33,6 +33,8 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans); + // for sma // TODO refactor int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 387e9b5c1c..8253e6caeb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -91,6 +91,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_STOP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); @@ -1819,6 +1820,7 @@ void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_ pAction->msgType = msgType; } +// todo extract method: traverse stream tasks // build trans to update the epset static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo* pInfo) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-task-update"); @@ -2088,6 +2090,76 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } + + +int32_t mndBuildUpdateTaskStatusTrans(SStreamObj* pStream, STrans* pTrans) { + pStream->status = STREAM_STATUS__STOP; + + int32_t size = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < size; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); + if (pReq == NULL) { + mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + STransAction action = {0}; + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_STOP, &pTask->info.epSet); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__STOP) { + atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); + } + } + } + return 0; +} + +int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId); + + const char *p = strdup(pVgroup->dbName); + mndReleaseVgroup(pMnode, pVgroup); + + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + if (strcmp(pStream->targetDb, p) == 0 || strcmp(pStream->sourceDb, p) == 0) { + int32_t code = mndBuildUpdateTaskStatusTrans(pStream, pTrans); + // mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid); + if (code != TSDB_CODE_SUCCESS) { + // todo + } + } + + mndReleaseStream(pMnode, pStream); + } + + return 0; +} + // todo: this process should be executed by the write queue worker of the mnode //int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index f3dded9c76..05325dcce5 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1680,6 +1680,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, pNew1->memUsed += vgMem; } +// mndStopInvolvedStreamTasks(pMnode, pVgroup->vgId, pTrans); + if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER; if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eb0dfd30ba..c33492bfa1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1031,7 +1031,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 - " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", + " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); @@ -1889,3 +1889,47 @@ _end: tmsgSendRsp(&rsp); return code; } + +int32_t tqProcessTaskStopReq(STQ* pTq, SRpcMsg* pMsg) { + int32_t vgId = TD_VID(pTq->pVnode); + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + + SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; + + SStreamMeta* pMeta = pTq->pStreamMeta; + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process stop req, failed to acquire task:0x%x, it may have been dropped already", vgId, + pReq->taskId); + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s receive stop msg from mnode", pTask->id.idStr); + streamTaskStop(pTask); + + SStreamTask* pHistoryTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId); + if (pHistoryTask == NULL) { + tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", + pMeta->vgId, pTask->historyTaskId.taskId); + streamMetaReleaseTask(pMeta, pTask); + + // since task is in [STOP|DROPPING] state, it is safe to assume the pause is active + return TSDB_CODE_SUCCESS; + } + + tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr); + + streamTaskStop(pHistoryTask); + streamMetaReleaseTask(pMeta, pHistoryTask); + } + + streamMetaReleaseTask(pMeta, pTask); + +// tDecoderClear(&decoder); + tmsgSendRsp(&rsp); + return 0; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index fb5348a396..fb3d21454b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,7 +20,6 @@ #define MIN_STREAM_EXEC_BATCH_NUM 4 #define MAX_STREAM_RESULT_DUMP_THRESHOLD 100 -static int32_t updateCheckPointInfo(SStreamTask* pTask); static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) {