Merge pull request #29106 from taosdata/fix/init_meta

refactor(stream): check mnode when issue the nodeEp update trans.
This commit is contained in:
Shengliang Guan 2024-12-16 09:23:18 +08:00 committed by GitHub
commit 5461502ed1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 111 additions and 49 deletions

View File

@ -108,30 +108,94 @@ static bool checkStatusForEachReplica(SVgObj *pVgroup) {
return true; return true;
} }
// Append one SNodeEntry to pVgroupList for every snode found in the sdb.
//
// Each entry uses SNODE_HANDLE as its nodeId and an epset built from the fqdn/port of the
// dnode referenced by the snode object.
//
// Returns TSDB_CODE_SUCCESS when all snodes were appended. On failure returns the code from
// addEpIntoEpSet, or terrno after a failed taosArrayPush; entries appended before the failure
// remain in pVgroupList and the iteration is cancelled.
static int32_t mndAddSnodeInfo(SMnode *pMnode, SArray *pVgroupList) {
  SSnodeObj *pObj = NULL;
  void      *pIter = NULL;
  int32_t    code = 0;

  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) {
      break;
    }

    SNodeEntry entry = {.nodeId = SNODE_HANDLE};
    code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
    if (code) {
      // Log first: pObj must not be dereferenced after its reference has been released.
      mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      return code;
    }

    // The string form is only used for logging, so a conversion failure is reported but kept
    // out of `code`; otherwise a failure on the last snode would be returned to the caller.
    char    buf[256] = {0};
    int32_t strCode = epsetToStr(&entry.epset, buf, tListLen(buf));
    if (strCode != 0) {  // print error and continue
      mError("failed to convert epset to str, code:%s", tstrerror(strCode));
    }

    void *p = taosArrayPush(pVgroupList, &entry);
    if (p == NULL) {
      code = terrno;
      sdbRelease(pMnode->pSdb, pObj);
      sdbCancelFetch(pMnode->pSdb, pIter);
      mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
      return code;
    } else {
      mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
    }

    sdbRelease(pMnode->pSdb, pObj);
  }

  return TSDB_CODE_SUCCESS;
}
// Walk every SDB_MNODE object and verify that it is usable.
//
// An mnode is rejected when its sync state is neither leader nor follower, or when the sdb
// status reported by sdbFetchAll is not SDB_STATUS_READY. The walk stops at the first rejected
// mnode.
//
// Returns TSDB_CODE_SUCCESS when every mnode passes (or none exist), TSDB_CODE_FAILED otherwise.
static int32_t mndCheckMnodeStatus(SMnode* pMnode) {
  SMnodeObj *pMnodeObj = NULL;
  void      *pCursor = NULL;
  ESdbStatus sdbStatus;
  int32_t    result = TSDB_CODE_SUCCESS;

  for (;;) {
    // NOTE(review): the meaning of the trailing `true` argument is defined by sdbFetchAll,
    // which is not visible here -- confirm against its declaration.
    pCursor = sdbFetchAll(pMnode->pSdb, SDB_MNODE, pCursor, (void **)&pMnodeObj, &sdbStatus, true);
    if (pCursor == NULL) {
      break;
    }

    bool inSync = (pMnodeObj->syncState == TAOS_SYNC_STATE_LEADER) ||
                  (pMnodeObj->syncState == TAOS_SYNC_STATE_FOLLOWER);
    if (!inSync) {
      mDebug("mnode sync state:%d not leader/follower", pMnodeObj->syncState);
      result = TSDB_CODE_FAILED;
    } else if (sdbStatus != SDB_STATUS_READY) {
      mWarn("mnode status:%d not ready", sdbStatus);
      result = TSDB_CODE_FAILED;
    }

    // The object is released on every path; the fetch is cancelled only when stopping early.
    sdbRelease(pMnode->pSdb, pMnodeObj);
    if (result != TSDB_CODE_SUCCESS) {
      sdbCancelFetch(pMnode->pSdb, pCursor);
      break;
    }
  }

  return result;
}
static int32_t mndCheckAndAddVgroupsInfo(SMnode *pMnode, SArray *pVgroupList, bool* allReady) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
int32_t code = 0; int32_t code = 0;
SArray *pVgroupList = NULL;
SHashObj *pHash = NULL; SHashObj *pHash = NULL;
pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
if (pVgroupList == NULL) {
mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
code = terrno;
goto _err;
}
pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pHash == NULL) { if (pHash == NULL) {
mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno)); mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
code = terrno; return terrno;
goto _err;
} }
*allReady = true;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) { if (pIter == NULL) {
@ -148,7 +212,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code)); mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
goto _err; // take snapshot failed, and not all ready goto _end; // take snapshot failed, and not all ready
} }
} else { } else {
if (*pReplica != pVgroup->replica) { if (*pReplica != pVgroup->replica) {
@ -158,7 +222,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
} }
} }
// if not all ready till now, no need to check the remaining vgroups. // if not all ready till now, no need to check the remaining vgroups,
// but still we need to put the info of the existed vgroups into the snapshot list // but still we need to put the info of the existed vgroups into the snapshot list
if (*allReady) { if (*allReady) {
*allReady = checkStatusForEachReplica(pVgroup); *allReady = checkStatusForEachReplica(pVgroup);
@ -176,7 +240,7 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
code = terrno; code = terrno;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
goto _err; goto _end;
} else { } else {
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
} }
@ -184,51 +248,49 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
SSnodeObj *pObj = NULL; _end:
while (1) { taosHashCleanup(pHash);
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); return code;
if (pIter == NULL) { }
break;
}
// Build a snapshot list of SNodeEntry items and report whether all nodes are ready.
//
// Steps, in order:
//   1. mndCheckAndAddVgroupsInfo fills the list and sets *allReady.
//   2. mndAddSnodeInfo appends the snode entries.
//   3. mndCheckMnodeStatus is consulted; a non-success result clears *allReady.
//
// On failure of the array allocation or of step 1/2, the list is destroyed, *pList stays NULL,
// *allReady is false and the error code is returned. When only step 3 fails, the list is still
// handed to the caller through *pList and the non-zero code from step 3 is returned.
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
  int32_t ret = 0;
  SArray *pEntries = NULL;

  *allReady = true;
  *pList = NULL;

  pEntries = taosArrayInit(4, sizeof(SNodeEntry));
  if (pEntries == NULL) {
    mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
    ret = terrno;
    goto _fail;
  }

  // 1. check for all vnodes status
  ret = mndCheckAndAddVgroupsInfo(pMnode, pEntries, allReady);
  if (ret != 0) {
    goto _fail;
  }

  // 2. add snode info
  ret = mndAddSnodeInfo(pMnode, pEntries);
  if (ret != 0) {
    goto _fail;
  }

  // 3. check for mnode status
  ret = mndCheckMnodeStatus(pMnode);
  if (ret != TSDB_CODE_SUCCESS) {
    *allReady = false;
  }

  *pList = pEntries;
  return ret;

_fail:
  *allReady = false;
  taosArrayDestroy(pEntries);
  return ret;
}