diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index d884227249..2778d0481e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -120,6 +120,14 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); + +typedef struct SStreamTaskIter SStreamTaskIter; + +SStreamTaskIter *createTaskIter(SStreamObj *pStream); +bool taskIterNextTask(SStreamTaskIter *pIter); +SStreamTask *taskIterGetCurrent(SStreamTaskIter *pIter); +void destroyTaskIter(SStreamTaskIter *pIter); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 46c7a06079..c78b80d79a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -476,22 +476,20 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { } int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) { - int32_t level = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < level; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfTasks; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { - return -1; - } + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { + destroyTaskIter(pIter); + return -1; } } + destroyTaskIter(pIter); + // persistent stream task for already stored ts data if (pStream->conf.fillHistory) { - level = taosArrayGetSize(pStream->pHTasksList); + int32_t level = taosArrayGetSize(pStream->pHTasksList); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i); @@ -856,6 +854,32 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int return 0; } +static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId, + int8_t mndTrigger) { + void *buf; + int32_t tlen; + if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, + pTask->id.taskId, pTrans->id, mndTrigger) < 0) { + taosMemoryFree(buf); + return -1; + } + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(buf); + return -1; + } + + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); + if (code != 0) { + taosMemoryFree(buf); + } + + return code; +} + static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { int32_t code = -1; @@ -865,6 +889,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } + bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); if (conflict) { mndAddtoCheckpointWaitingList(pStream, checkpointId); @@ -887,8 +912,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre pStream->currentTick = 1; // 1. redo action: broadcast checkpoint source msg for all source vg - int32_t totLevel = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < totLevel; i++) { + int32_t totalLevel = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < totalLevel; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); SStreamTask *p = taosArrayGetP(pLevel, 0); @@ -896,28 +921,9 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t sz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); + code = doSetCheckpointAction(pMnode, pTrans, pTask, checkpointId, mndTrigger); - void *buf; - int32_t tlen; - if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger) < 0) { - taosWUnLockLatch(&pStream->lock); - goto _ERR; - } - - SEpSet epset = {0}; - bool hasEpset = false; - code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(buf); - taosWUnLockLatch(&pStream->lock); - goto _ERR; - } - - code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, - TSDB_CODE_SYN_PROPOSE_NOT_READY); - if (code != 0) { - taosMemoryFree(buf); + if (code != TSDB_CODE_SUCCESS) { taosWUnLockLatch(&pStream->lock); goto _ERR; } @@ -1500,21 +1506,19 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } // add row for each task - for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); - int32_t numOfLevels = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfLevels; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); - if (code == TSDB_CODE_SUCCESS) { - numOfRows++; - } + int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); + if (code == TSDB_CODE_SUCCESS) { + numOfRows++; } } - // unlock + destroyTaskIter(pIter); taosRUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); } @@ -1854,22 +1858,19 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { } taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); + SStreamTaskIter *pTaskIter = createTaskIter(pStream); + while (taskIterNextTask(pTaskIter)) { + SStreamTask *pTask = taskIterGetCurrent(pTaskIter); - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; - epsetAssign(&entry.epset, &pTask->info.epSet); - taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); - } + SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; + epsetAssign(&entry.epset, &pTask->info.epSet); + taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); } + destroyTaskIter(pTaskIter); taosWUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); } @@ -2055,58 +2056,50 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { } void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - int32_t level = taosArrayGetSize(pStream->tasks); + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); - for (int32_t i = 0; i < level; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + if (p == NULL) { + STaskStatusEntry entry = {0}; + streamTaskStatusInit(&entry, pTask); - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfTasks; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); - if (p == NULL) { - STaskStatusEntry entry = {0}; - streamTaskStatusInit(&entry, pTask); - - taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); - taosArrayPush(pExecNode->pTaskList, &id); - mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, - (int32_t)taosArrayGetSize(pExecNode->pTaskList)); - } + taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); + taosArrayPush(pExecNode->pTaskList, &id); + mInfo("s-task:0x%x add into task buffer, total:%d", (int32_t)entry.id.taskId, + (int32_t)taosArrayGetSize(pExecNode->pTaskList)); } } + + destroyTaskIter(pIter); } void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - int32_t level = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < level; i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfTasks; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); + if (p != NULL) { + taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); - if (p != NULL) { - taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id)); + for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { + STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); + if (pId->taskId == id.taskId && pId->streamId == id.streamId) { + taosArrayRemove(pExecNode->pTaskList, k); - for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) { - STaskId *pId = taosArrayGet(pExecNode->pTaskList, k); - if (pId->taskId == id.taskId && pId->streamId == id.streamId) { - taosArrayRemove(pExecNode->pTaskList, k); - - int32_t num = taosArrayGetSize(pExecNode->pTaskList); - mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); - break; - } + int32_t num = taosArrayGetSize(pExecNode->pTaskList); + mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId, num); + break; } } } } + destroyTaskIter(pIter); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 5a6faadebb..19c4f22280 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -65,61 +65,24 @@ static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pI taosArrayPush(pList, pInfo); } -static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { +int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_TASK_RESET_NAME, " reset from failed checkpoint"); if (pTrans == NULL) { return terrno; } /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); - - taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - // todo extract method, with pause stream task - SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - taosWUnLockLatch(&pStream->lock); - return terrno; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - SEpSet epset = {0}; - bool hasEpset = false; - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { - taosMemoryFree(pReq); - continue; - } - - code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); - if (code != 0) { - taosMemoryFree(pReq); - taosWUnLockLatch(&pStream->lock); - mndTransDrop(pTrans); - return terrno; - } - } + int32_t code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream); + if (code != 0) { + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return code; } - taosWUnLockLatch(&pStream->lock); - - int32_t code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); + code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); return -1; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 0a7397827e..f8497e14f7 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -301,86 +301,4 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { mDebug("complete clear checkpoints in Dbs"); } -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, - int32_t transId) { - pMsg->streamId = pId->streamId; - pMsg->taskId = pId->taskId; - pMsg->transId = transId; - pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); - taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); -} -static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId *pId, int32_t transId) { - SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo, pId, transId); - - int32_t code = 0; - int32_t blen; - - tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - tEncodeStreamTaskUpdateMsg(&encoder, &req); - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - - taosArrayDestroy(req.pNodeList); - return TSDB_CODE_SUCCESS; -} - -// todo extract method: traverse stream tasks -// build trans to update the epset -int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { - mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); - - taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - void *pBuf = NULL; - int32_t len = 0; - streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); - - int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(pBuf); - taosWUnLockLatch(&pStream->lock); - return -1; - } - } - } - - taosWUnLockLatch(&pStream->lock); - return 0; -} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 5d6e34856b..c28ba43f65 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -18,13 +18,13 @@ #include "tmisce.h" #include "mndVgroup.h" -typedef struct SStreamTaskIter { +struct SStreamTaskIter { SStreamObj *pStream; int32_t level; int32_t ordinalIndex; int32_t totalLevel; SStreamTask *pTask; -} SStreamTaskIter; +}; SStreamTaskIter* createTaskIter(SStreamObj* pStream) { SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); @@ -235,18 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT } SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { - for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) { - SArray *pLevel = taosArrayGetP(pStream->tasks, i); - - int32_t numOfLevels = taosArrayGetSize(pLevel); - for (int32_t j = 0; j < numOfLevels; j++) { - SStreamTask *pTask = taosArrayGetP(pLevel, j); - if (pTask->id.taskId == pId->taskId) { - return pTask; - } + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (pTask->id.taskId == pId->taskId) { + destroyTaskIter(pIter); + return pTask; } } + destroyTaskIter(pIter); return NULL; } @@ -261,21 +259,20 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { } int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { - int32_t size = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { - return -1; - } + SStreamTaskIter *pIter = createTaskIter(pStream); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); - } + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { + destroyTaskIter(pIter); + return -1; + } + + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) { + atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); } } + destroyTaskIter(pIter); return 0; } @@ -410,4 +407,139 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p doSetDropActionFromId(pMnode, pTrans, pTask); } return 0; -} \ No newline at end of file +} + +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, + int32_t transId) { + pMsg->streamId = pId->streamId; + pMsg->taskId = pId->taskId; + pMsg->transId = transId; + pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); + taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); +} + +static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, + SStreamTaskId *pId, int32_t transId) { + SStreamTaskNodeUpdateMsg req = {0}; + initNodeUpdateMsg(&req, pInfo, pId, transId); + + int32_t code = 0; + int32_t blen; + + tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + + void *buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeStreamTaskUpdateMsg(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)buf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(nodeId); + + tEncoderClear(&encoder); + + *pBuf = buf; + *pLen = tlen; + + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; +} + +static int32_t doSetUpdateTaskAction(STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { + void *pBuf = NULL; + int32_t len = 0; + streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + + doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + + int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); + } + + return code; +} + +// build trans to update the epset +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { + mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); + taosWLockLatch(&pStream->lock); + + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo); + if (code != TSDB_CODE_SUCCESS) { + destroyTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + + destroyTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +} + +static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return terrno; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(pReq); + return code; + } + + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pReq); + } + + return code; +} + +int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + taosWLockLatch(&pStream->lock); + + SStreamTaskIter *pIter = createTaskIter(pStream); + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + int32_t code = doSetResetAction(pMnode, pTrans, pTask); + if (code != TSDB_CODE_SUCCESS) { + destroyTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + + destroyTaskIter(pIter); + taosWUnLockLatch(&pStream->lock); + return 0; +}