fix(stream): send msg by using epset extract from mnode, instead of cached epset in streamObj.

This commit is contained in:
Haojun Liao 2023-11-14 15:48:00 +08:00
parent cfa7c19dd5
commit b390ee859c
1 changed files with 35 additions and 17 deletions

View File

@ -678,7 +678,7 @@ _OVER:
return -1;
}
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
static int32_t mndPersistTaskDropReq(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -690,7 +690,12 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &pTask->info.epSet, 0);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
@ -706,7 +711,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPersistTaskDropReq(pTrans, pTask) < 0) {
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
return -1;
}
}
@ -1085,9 +1090,10 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
STransAction action = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
@ -1283,12 +1289,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (pStream == NULL) {
if (dropReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
mError("stream:%s not exist failed to drop", dropReq.name);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1660,7 +1667,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
static int32_t mndPauseStreamTask(SMnode* pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
@ -1673,8 +1680,12 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &pTask->info.epSet, 0);
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
@ -1682,7 +1693,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t mndPauseAllStreamTasks(SMnode* pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
@ -1691,7 +1702,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPauseStreamTask(pTrans, pTask) < 0) {
if (mndPauseStreamTask(pMnode, pTrans, pTask) < 0) {
return -1;
}
@ -1768,7 +1779,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
// pause all tasks
if (mndPauseAllStreamTasks(pTrans, pStream) < 0) {
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1795,7 +1806,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode* pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1806,8 +1817,12 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &pTask->info.epSet, 0);
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
@ -1815,14 +1830,14 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
return 0;
}
int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUntreated) {
int32_t mndResumeAllStreamTasks(STrans *pTrans, SMnode* pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (mndResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
@ -1831,7 +1846,6 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
}
}
}
// pStream->pHTasksList is null
return 0;
}
@ -1884,7 +1898,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
// resume all tasks
if (mndResumeAllStreamTasks(pTrans, pStream, pauseReq.igUntreated) < 0) {
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -2534,8 +2548,12 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &pTask->info.epSet, 0);
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);