diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1f93169218..aa215a44de 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -83,9 +83,10 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name); +static STrans *doCreateTrans1(SMnode *pMnode, const char *name, const char* pDbName); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo); +static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode); static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); @@ -1962,13 +1963,10 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_ // todo extract method: traverse stream tasks // build trans to update the epset -static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgroupChangeInfo *pInfo) { - STrans* pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); - if (pTrans == NULL) { - return terrno; - } +static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans* pTrans) { + mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); - taosWLockLatch(&pStream->lock); + taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); for (int32_t j = 0; j < numOfLevels; ++j) { @@ -1988,31 +1986,13 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pBuf); taosWUnLockLatch(&pStream->lock); - mndTransDrop(pTrans); return -1; } } } taosWUnLockLatch(&pStream->lock); - - int32_t code = mndPersistTransLog(pStream, pTrans); - if (code != TSDB_CODE_SUCCESS) { - sdbRelease(pMnode->pSdb, pStream); - return -1; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - - return TSDB_CODE_ACTION_IN_PROGRESS; + return 0; } static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { @@ -2102,28 +2082,63 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; void *pIter = NULL; + + STrans *pTrans = NULL; + while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { break; } + if (pTrans == NULL) { + pTrans = doCreateTrans(pMnode, pStream, "stream-task-update"); + if (pTrans == NULL) { + sdbRelease(pSdb, pStream); + sdbCancelFetch(pSdb, pIter); + return terrno; + } + } + void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p == NULL && p1 == NULL) { - mDebug("stream:0x%"PRIx64" %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); - mndReleaseStream(pMnode, pStream); + mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); + sdbRelease(pSdb, pStream); continue; } mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name); - int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); + int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans); + + // todo: not continue, drop all and retry again + if (code != TSDB_CODE_SUCCESS) { + mError("stream:0x%" PRIx64 " build nodeUpdate trans failed, ignore and continue, code:%s", pStream->uid, + tstrerror(code)); + sdbRelease(pSdb, pStream); + continue; + } + + code = mndPersistTransLog(pStream, pTrans); + sdbRelease(pSdb, pStream); + if (code != TSDB_CODE_SUCCESS) { sdbCancelFetch(pSdb, pIter); - return code; + return -1; } } + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + // return TSDB_CODE_ACTION_IN_PROGRESS; + return 0; } @@ -2400,7 +2415,7 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -static STrans* doCreateTrans(SMnode* pMnode, SStreamObj* pStream, const char* name) { +STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, name); if (pTrans == NULL) { mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 37af1ce64f..57103e5a96 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -612,7 +612,7 @@ int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { p->latestUpdateTs = taosGetTimestampMs(); p->updateCount += 1; - stDebug("s-task:%s update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.idStr, + stDebug("s-task:0x%x update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.taskId, numOfNodes, p->updateCount, prevTs); for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {