From d230082a0804b619783e231546eb7f2af3b380de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 10 Dec 2024 22:52:20 +0800 Subject: [PATCH] refactor(stream): check mnode when issue the nodeEp update trans. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 160 ++++++++++++++------ 1 file changed, 111 insertions(+), 49 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index bb666eb6dd..00b36977b6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -108,30 +108,94 @@ static bool checkStatusForEachReplica(SVgObj *pVgroup) { return true; } -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { +static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) { + SSnodeObj *pObj = NULL; + void *pIter = NULL; + int32_t code = 0; + + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + SNodeEntry entry = {.nodeId = SNODE_HANDLE}; + code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); + if (code) { + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); + return code; + } + + char buf[256] = {0}; + code = epsetToStr(&entry.epset, buf, tListLen(buf)); + if (code != 0) { // print error and continue + mError("failed to convert epset to str, code:%s", tstrerror(code)); + } + + void *p = taosArrayPush(pVgroupList, &entry); + if (p == NULL) { + code = terrno; + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); + return code; + } else { + mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); + } + + sdbRelease(pMnode->pSdb, pObj); + } + + return code; +} + +static int32_t mndCheckMnodeStatus(SMnode* pMnode) { + int32_t code = 0; + ESdbStatus objStatus; + void *pIter = NULL; + SMnodeObj *pObj = NULL; + + while (1) { + pIter = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, true); + if (pIter == NULL) { + break; + } + + if (pObj->syncState != TAOS_SYNC_STATE_LEADER && pObj->syncState != TAOS_SYNC_STATE_FOLLOWER) { + mDebug("mnode sync state:%d not leader/follower", pObj->syncState); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_FAILED; + } + + if (objStatus != SDB_STATUS_READY) { + mWarn("mnode status:%d not ready", objStatus); + sdbRelease(pMnode->pSdb, pObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return TSDB_CODE_FAILED; + } + + sdbRelease(pMnode->pSdb, pObj); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; int32_t code = 0; - SArray *pVgroupList = NULL; SHashObj *pHash = NULL; - pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); - if (pVgroupList == NULL) { - mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); - code = terrno; - goto _err; - } - pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); if (pHash == NULL) { mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno)); - code = terrno; - goto _err; + return terrno; } - *allReady = true; - while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) { @@ -148,7 +212,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); - goto _err; // take snapshot failed, and not all ready + goto _end; // take snapshot failed, and not all ready } } else { if (*pReplica != pVgroup->replica) { @@ -158,7 +222,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { } } - // if not all ready till now, no need to check the remaining vgroups. + // if not all ready till now, no need to check the remaining vgroups, // but still we need to put the info of the existed vgroups into the snapshot list if (*allReady) { *allReady = checkStatusForEachReplica(pVgroup); @@ -176,7 +240,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { code = terrno; sdbRelease(pSdb, pVgroup); sdbCancelFetch(pSdb, pIter); - goto _err; + goto _end; } else { mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -184,51 +248,49 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { sdbRelease(pSdb, pVgroup); } - SSnodeObj *pObj = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); - if (pIter == NULL) { - break; - } +_end: + taosHashCleanup(pHash); + return code; +} - SNodeEntry entry = {.nodeId = SNODE_HANDLE}; - code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); - if (code) { - sdbRelease(pSdb, pObj); - sdbCancelFetch(pSdb, pIter); - mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); - goto _err; - } +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { + int32_t code = 0; + SArray *pVgroupList = NULL; - char buf[256] = {0}; - code = epsetToStr(&entry.epset, buf, tListLen(buf)); - if (code != 0) { // print error and continue - mError("failed to convert epset to str, code:%s", tstrerror(code)); - } + *pList = NULL; + *allReady = true; - void *p = taosArrayPush(pVgroupList, &entry); - if (p == NULL) { - code = terrno; - sdbRelease(pSdb, pObj); - sdbCancelFetch(pSdb, pIter); - mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); - goto _err; - } else { - mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); - } + pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pVgroupList == NULL) { + mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } - sdbRelease(pSdb, pObj); + // 1. check for all vnodes status + code = mndCheckAndAddVgroupsInfo(pMnode, pVgroupList, allReady); + if (code) { + goto _err; + } + + // 2. add snode info + code = mndAddSnodeInfo(pMnode, pVgroupList); + if (code) { + goto _err; + } + + // 3. check for mnode status + code = mndCheckMnodeStatus(pMnode); + if (code != TSDB_CODE_SUCCESS) { + *allReady = false; } *pList = pVgroupList; - taosHashCleanup(pHash); return code; _err: *allReady = false; taosArrayDestroy(pVgroupList); - taosHashCleanup(pHash); - return code; }