diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index c375b46627..b5a612f058 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -88,18 +88,48 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) { taosMemoryFree(pIter); } -int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - int32_t replica = -1; // do the replica check - int32_t code = 0; +static bool checkStatusForEachReplica(SVgObj *pVgroup) { + for (int32_t i = 0; i < pVgroup->replica; ++i) { + if (!pVgroup->vnodeGid[i].syncRestore) { + mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); + return false; + } + + ESyncState state = pVgroup->vnodeGid[i].syncState; + if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || + state == TAOS_SYNC_STATE_CANDIDATE) { + mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId, + state); + return false; + } + } + + return true; +} + +int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + int32_t code = 0; + SArray *pVgroupList = NULL; + SHashObj *pHash = NULL; + + pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); + if (pVgroupList == NULL) { + mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } + + pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (pHash == NULL) { + mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno)); + code = terrno; + goto _err; + } *allReady = true; - SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry)); - if (pVgroupList == NULL) { - return terrno; - } while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -110,44 +140,37 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; entry.epset = mndGetVgroupEpset(pMnode, pVgroup); - if (replica == -1) { - replica = pVgroup->replica; - } else { - if (replica != pVgroup->replica) { - mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", - pVgroup->vgId, pVgroup->replica, replica); - *allReady = false; + int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid)); + if (pReplica == NULL) { // not exist, add it into hash map + code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica)); + if (code) { + mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); sdbRelease(pSdb, pVgroup); - break; + goto _err; // take snapshot failed, and not all ready + } + } else { + if (*pReplica != pVgroup->replica) { + mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", + pVgroup->vgId, pVgroup->replica, *pReplica); + *allReady = false; // task snap success, but not all ready } } // if not all ready till now, no need to check the remaining vgroups. + // but still we need to put the info of the existed vgroups into the snapshot list if (*allReady) { - for (int32_t i = 0; i < pVgroup->replica; ++i) { - if (!pVgroup->vnodeGid[i].syncRestore) { - mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId); - *allReady = false; - break; - } - - ESyncState state = pVgroup->vnodeGid[i].syncState; - if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER || - state == TAOS_SYNC_STATE_CANDIDATE) { - mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", - pVgroup->vgId, state); - *allReady = false; - break; - } - } + *allReady = checkStatusForEachReplica(pVgroup); } char buf[256] = {0}; - (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + (void)epsetToStr(&entry.epset, buf, tListLen(buf)); - void* p = taosArrayPush(pVgroupList, &entry); + void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + code = terrno; + sdbRelease(pSdb, pVgroup); + goto _err; } else { mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -166,15 +189,19 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); if (code) { sdbRelease(pSdb, pObj); - continue; + mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn); + goto _err; } char buf[256] = {0}; - (void) epsetToStr(&entry.epset, buf, tListLen(buf)); + (void)epsetToStr(&entry.epset, buf, tListLen(buf)); - void* p = taosArrayPush(pVgroupList, &entry); + void *p = taosArrayPush(pVgroupList, &entry); if (p == NULL) { - mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); + code = terrno; + sdbRelease(pSdb, pObj); + mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code)); + goto _err; } else { mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); } @@ -184,6 +211,13 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { *pList = pVgroupList; return code; + +_err: + *allReady = false; + taosArrayDestroy(pVgroupList); + taosHashCleanup(pHash); + + return code; } int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {