diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 47898caf27..19fd2a3fd4 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -33,7 +33,6 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans); // for sma // TODO refactor diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8253e6caeb..772db6c6b0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2090,76 +2090,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - - -int32_t mndBuildUpdateTaskStatusTrans(SStreamObj* pStream, STrans* pTrans) { - pStream->status = STREAM_STATUS__STOP; - - int32_t size = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfTasks; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - - SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); - if (pReq == NULL) { - mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_STOP, &pTask->info.epSet); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - - if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__STOP) { - atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP); - } - } - } - return 0; -} - -int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans) { - SSdb *pSdb = pMnode->pSdb; - SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId); - - const char *p = strdup(pVgroup->dbName); - mndReleaseVgroup(pMnode, pVgroup); - - SStreamObj *pStream = NULL; - void *pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - - if (strcmp(pStream->targetDb, p) == 0 || strcmp(pStream->sourceDb, p) == 0) { - int32_t code = mndBuildUpdateTaskStatusTrans(pStream, pTrans); - // mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid); - if (code != TSDB_CODE_SUCCESS) { - // todo - } - } - - mndReleaseStream(pMnode, pStream); - } - - return 0; -} - // todo: this process should be executed by the write queue worker of the mnode //int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 05325dcce5..f3dded9c76 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1680,8 +1680,6 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, pNew1->memUsed += vgMem; } -// mndStopInvolvedStreamTasks(pMnode, pVgroup->vgId, pTrans); - if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER; if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER; }