From 11c9c7d93634341bba5d8c002ec79a696371d97e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jan 2024 09:17:02 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/mnode/impl/inc/mndStream.h | 80 ++++++++++---------- source/dnode/mnode/impl/src/mndStream.c | 42 +++++----- source/dnode/mnode/impl/src/mndStreamTrans.c | 16 ++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 68 ++++++++--------- 4 files changed, 101 insertions(+), 105 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 2778d0481e..372612274f 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -26,9 +26,17 @@ extern "C" { #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_VER_NUMBER 4 +#define MND_STREAM_CREATE_NAME "stream-create" +#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" +#define MND_STREAM_PAUSE_NAME "stream-pause" +#define MND_STREAM_RESUME_NAME "stream-resume" +#define MND_STREAM_DROP_NAME "stream-drop" +#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" +#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" + typedef struct SStreamTransInfo { int64_t startTime; - int64_t streamUid; + int64_t streamId; const char *name; int32_t transId; } SStreamTransInfo; @@ -41,7 +49,7 @@ typedef struct SVgroupChangeInfo { // time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard // to avoid too many checkpoints for a taskk in the waiting list typedef struct SCheckpointCandEntry { - char * pName; + char *pName; int64_t streamId; int64_t checkpointTs; int64_t checkpointId; @@ -62,6 +70,9 @@ typedef struct SStreamExecInfo { SHashObj *pTransferStateStreams; } SStreamExecInfo; +extern SStreamExecInfo execInfo; +typedef struct SStreamTaskIter SStreamTaskIter; + typedef struct SNodeEntry { int32_t nodeId; bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. @@ -69,15 +80,11 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -#define MND_STREAM_CREATE_NAME "stream-create" -#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" -#define MND_STREAM_PAUSE_NAME "stream-pause" -#define MND_STREAM_RESUME_NAME "stream-resume" -#define MND_STREAM_DROP_NAME "stream-drop" -#define MND_STREAM_TASK_RESET_NAME "stream-task-reset" -#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update" - -extern SStreamExecInfo execInfo; +typedef struct SOrphanTask { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SOrphanTask; int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); @@ -85,49 +92,38 @@ SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndPersistStream(STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pTransName, int64_t streamId); +int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); +bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock); +int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId); -int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid); -int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId); -bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); -int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); - -typedef struct SOrphanTask { - int64_t streamId; - int32_t taskId; - int32_t nodeId; -} SOrphanTask; - -// for sma -// TODO refactor -int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); -int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); -SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); -void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); -int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); -STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); -int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); -SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); -void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); -int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); +int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); +int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); +SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); +void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); +int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode); +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); +int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); +SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t initStreamNodeList(SMnode *pMnode); -int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated); int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -typedef struct SStreamTaskIter SStreamTaskIter; - -SStreamTaskIter *createTaskIter(SStreamObj *pStream); -bool taskIterNextTask(SStreamTaskIter *pIter); -SStreamTask *taskIterGetCurrent(SStreamTaskIter *pIter); -void destroyTaskIter(SStreamTaskIter *pIter); +SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream); +void destroyStreamTaskIter(SStreamTaskIter *pIter); +bool streamTaskIterNextTask(SStreamTaskIter *pIter); +SStreamTask *streamTaskIterGetCurrent(SStreamTaskIter *pIter); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c78b80d79a..48281fc010 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -476,16 +476,16 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { } int32_t mndPersistStreamTasks(STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return -1; } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); // persistent stream task for already stored ts data if (pStream->conf.fillHistory) { @@ -1506,9 +1506,9 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } // add row for each task - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); int32_t code = setTaskAttrInResBlock(pStream, pTask, pBlock, numOfRows); if (code == TSDB_CODE_SUCCESS) { @@ -1516,7 +1516,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); taosRUnLockLatch(&pStream->lock); sdbRelease(pSdb, pStream); @@ -1859,16 +1859,16 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) { taosWLockLatch(&pStream->lock); - SStreamTaskIter *pTaskIter = createTaskIter(pStream); - while (taskIterNextTask(pTaskIter)) { - SStreamTask *pTask = taskIterGetCurrent(pTaskIter); + SStreamTaskIter *pTaskIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pTaskIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pTaskIter); SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; epsetAssign(&entry.epset, &pTask->info.epSet); taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); } - destroyTaskIter(pTaskIter); + destroyStreamTaskIter(pTaskIter); taosWUnLockLatch(&pStream->lock); sdbRelease(pSdb, pStream); @@ -2056,9 +2056,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { } void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); @@ -2073,13 +2073,13 @@ void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); } void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); @@ -2099,7 +2099,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index f8497e14f7..5bfd3933b5 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -23,10 +23,10 @@ typedef struct SKeyInfo { static int32_t clearFinishedTrans(SMnode* pMnode); -int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) { +int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamId) { SStreamTransInfo info = { - .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid}; - taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo)); + .transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamId = streamId}; + taosHashPut(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId), &info, sizeof(SStreamTransInfo)); return 0; } @@ -65,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) { return 0; } -bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) { +bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* pTransName, bool lock) { if (lock) { taosThreadMutexLock(&execInfo.lock); } @@ -80,7 +80,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* clearFinishedTrans(pMnode); - SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid)); + SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; @@ -90,7 +90,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) { - mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); terrno = TSDB_CODE_MND_TRANS_CONFLICT; return true; @@ -99,13 +99,13 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char* } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { - mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); terrno = TSDB_CODE_MND_TRANS_CONFLICT; return true; } } else { - mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamUid); + mDebug("stream:0x%"PRIx64" no conflict trans existed, continue create trans", streamId); } if (lock) { diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index c28ba43f65..235c604b27 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -26,7 +26,7 @@ struct SStreamTaskIter { SStreamTask *pTask; }; -SStreamTaskIter* createTaskIter(SStreamObj* pStream) { +SStreamTaskIter* createStreamTaskIter(SStreamObj* pStream) { SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); if (pIter == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -42,7 +42,7 @@ SStreamTaskIter* createTaskIter(SStreamObj* pStream) { return pIter; } -bool taskIterNextTask(SStreamTaskIter* pIter) { +bool streamTaskIterNextTask(SStreamTaskIter* pIter) { if (pIter->level >= pIter->totalLevel) { pIter->pTask = NULL; return false; @@ -70,11 +70,11 @@ bool taskIterNextTask(SStreamTaskIter* pIter) { return false; } -SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) { +SStreamTask* streamTaskIterGetCurrent(SStreamTaskIter* pIter) { return pIter->pTask; } -void destroyTaskIter(SStreamTaskIter* pIter) { +void destroyStreamTaskIter(SStreamTaskIter* pIter) { taosMemoryFree(pIter); } @@ -235,16 +235,16 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT } SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (pTask->id.taskId == pId->taskId) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return pTask; } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return NULL; } @@ -259,12 +259,12 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { } int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { - SStreamTaskIter *pIter = createTaskIter(pStream); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return -1; } @@ -272,7 +272,7 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt atomic_store_8(&pTask->status.taskStatus, pTask->status.statusBackup); } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return 0; } @@ -308,12 +308,12 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa } int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createTaskIter(pStream); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return -1; } @@ -323,7 +323,7 @@ int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return 0; } @@ -357,16 +357,16 @@ static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTas } int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SStreamTaskIter *pIter = createTaskIter(pStream); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); - while(taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + while(streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); if (doSetDropAction(pMnode, pTrans, pTask) < 0) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return -1; } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); return 0; } @@ -480,18 +480,18 @@ int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pI mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); taosWLockLatch(&pStream->lock); - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); int32_t code = doSetUpdateTaskAction(pTrans, pTask, pInfo); if (code != TSDB_CODE_SUCCESS) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); return -1; } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); return 0; } @@ -528,18 +528,18 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { taosWLockLatch(&pStream->lock); - SStreamTaskIter *pIter = createTaskIter(pStream); - while (taskIterNextTask(pIter)) { - SStreamTask *pTask = taskIterGetCurrent(pIter); + SStreamTaskIter *pIter = createStreamTaskIter(pStream); + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = streamTaskIterGetCurrent(pIter); int32_t code = doSetResetAction(pMnode, pTrans, pTask); if (code != TSDB_CODE_SUCCESS) { - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); return -1; } } - destroyTaskIter(pIter); + destroyStreamTaskIter(pIter); taosWUnLockLatch(&pStream->lock); return 0; }