diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 96199d0af5..20777e12e0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -678,7 +678,7 @@ _OVER: return -1; } -static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { pReq->streamId = pTask->id.streamId; STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPersistTaskDropReq(pTrans, pTask) < 0) { + if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) { return -1; } } @@ -1105,9 +1110,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream STransAction action = {0}; SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); - mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(buf); @@ -1303,12 +1309,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeSMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + mError("stream:%s not exist failed to drop", dropReq.name); tFreeSMDropStreamReq(&dropReq); return -1; } @@ -1688,7 +1695,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { +static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -1701,8 +1708,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1710,7 +1721,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { return 0; } -int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { +int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) { SArray *tasks = pStream->tasks; int32_t size = taosArrayGetSize(tasks); @@ -1719,7 +1730,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) { return -1; } @@ -1805,7 +1816,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); // pause all tasks - if (mndPauseAllStreamTasks(pTrans, pStream) < 0) { + if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1832,7 +1843,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { +static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1843,8 +1854,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig pReq->streamId = pTask->id.streamId; pReq->igUntreated = igUntreated; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1852,14 +1867,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig return 0; } -int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -1868,7 +1883,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn } } } - // pStream->pHTasksList is null return 0; } @@ -1931,7 +1945,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb); // resume all tasks - if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) { + if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2591,8 +2605,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0); + initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock);