Merge pull request #23685 from taosdata/fix/drop_stream
fix(stream): send msg by using epset extract from mnode
This commit is contained in:
commit
94abc0de38
|
@ -678,7 +678,7 @@ _OVER:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0);
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPersistTaskDropReq(pTrans, pTask) < 0) {
|
||||
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -1085,9 +1090,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
|
|||
|
||||
STransAction action = {0};
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
|
||||
TSDB_CODE_SYN_PROPOSE_NOT_READY);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(buf);
|
||||
|
@ -1283,12 +1289,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
|
||||
if (pStream == NULL) {
|
||||
if (dropReq.igNotExists) {
|
||||
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
|
||||
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
tFreeSMDropStreamReq(&dropReq);
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||
mError("stream:%s not exist failed to drop", dropReq.name);
|
||||
tFreeSMDropStreamReq(&dropReq);
|
||||
return -1;
|
||||
}
|
||||
|
@ -1660,7 +1667,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
|
||||
|
@ -1673,8 +1680,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -1682,7 +1693,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
SArray *tasks = pStream->tasks;
|
||||
|
||||
int32_t size = taosArrayGetSize(tasks);
|
||||
|
@ -1691,7 +1702,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
|
|||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndPauseStreamTask(pTrans, pTask) < 0) {
|
||||
if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1768,7 +1779,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// pause all tasks
|
||||
if (mndPauseAllStreamTasks(pTrans, pStream) < 0) {
|
||||
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -1795,7 +1806,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
|
||||
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) {
|
||||
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1806,8 +1817,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
|
|||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->igUntreated = igUntreated;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
@ -1815,14 +1830,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) {
|
||||
int32_t size = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
int32_t sz = taosArrayGetSize(pTasks);
|
||||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pTasks, j);
|
||||
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
|
||||
if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1831,7 +1846,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
|
|||
}
|
||||
}
|
||||
}
|
||||
// pStream->pHTasksList is null
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1884,7 +1898,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
// resume all tasks
|
||||
if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) {
|
||||
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -2534,8 +2548,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0);
|
||||
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
|
Loading…
Reference in New Issue