fix(stream): compare vg replica according to different db.

This commit is contained in:
Haojun Liao 2024-07-31 19:24:41 +08:00
parent 6539760c64
commit e8f6454d17
1 changed files with 74 additions and 40 deletions

View File

@ -88,18 +88,48 @@ void destroyStreamTaskIter(SStreamTaskIter* pIter) {
taosMemoryFree(pIter); taosMemoryFree(pIter);
} }
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) { static bool checkStatusForEachReplica(SVgObj *pVgroup) {
SSdb *pSdb = pMnode->pSdb; for (int32_t i = 0; i < pVgroup->replica; ++i) {
void *pIter = NULL; if (!pVgroup->vnodeGid[i].syncRestore) {
SVgObj *pVgroup = NULL; mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
int32_t replica = -1; // do the replica check return false;
int32_t code = 0; }
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
state == TAOS_SYNC_STATE_CANDIDATE) {
mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups", pVgroup->vgId,
state);
return false;
}
}
return true;
}
int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SVgObj *pVgroup = NULL;
int32_t code = 0;
SArray *pVgroupList = NULL;
SHashObj *pHash = NULL;
pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
if (pVgroupList == NULL) {
mError("failed to prepare arraylist during take vgroup snapshot, code:%s", tstrerror(terrno));
code = terrno;
goto _err;
}
pHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pHash == NULL) {
mError("failed to prepare hashmap during take vgroup snapshot, code:%s", tstrerror(terrno));
code = terrno;
goto _err;
}
*allReady = true; *allReady = true;
SArray *pVgroupList = taosArrayInit(4, sizeof(SNodeEntry));
if (pVgroupList == NULL) {
return terrno;
}
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
@ -110,44 +140,37 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime}; SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
entry.epset = mndGetVgroupEpset(pMnode, pVgroup); entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (replica == -1) { int8_t *pReplica = taosHashGet(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid));
replica = pVgroup->replica; if (pReplica == NULL) { // not exist, add it into hash map
} else { code = taosHashPut(pHash, &pVgroup->dbUid, sizeof(pVgroup->dbUid), &pVgroup->replica, sizeof(pVgroup->replica));
if (replica != pVgroup->replica) { if (code) {
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations", mError("failed to put info into hashmap during task vgroup snapshot, code:%s", tstrerror(code));
pVgroup->vgId, pVgroup->replica, replica);
*allReady = false;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
break; goto _err; // take snapshot failed, and not all ready
}
} else {
if (*pReplica != pVgroup->replica) {
mInfo("vgId:%d replica:%d inconsistent with other vgroups replica:%d, not ready for stream operations",
pVgroup->vgId, pVgroup->replica, *pReplica);
*allReady = false; // task snap success, but not all ready
} }
} }
// if not all ready till now, no need to check the remaining vgroups. // if not all ready till now, no need to check the remaining vgroups.
// but still we need to put the info of the existed vgroups into the snapshot list
if (*allReady) { if (*allReady) {
for (int32_t i = 0; i < pVgroup->replica; ++i) { *allReady = checkStatusForEachReplica(pVgroup);
if (!pVgroup->vnodeGid[i].syncRestore) {
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
*allReady = false;
break;
}
ESyncState state = pVgroup->vnodeGid[i].syncState;
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR || state == TAOS_SYNC_STATE_LEARNER ||
state == TAOS_SYNC_STATE_CANDIDATE) {
mInfo("vgId:%d state:%d , not ready for checkpoint or other operations, not check other vgroups",
pVgroup->vgId, state);
*allReady = false;
break;
}
}
} }
char buf[256] = {0}; char buf[256] = {0};
(void) epsetToStr(&entry.epset, buf, tListLen(buf)); (void)epsetToStr(&entry.epset, buf, tListLen(buf));
void* p = taosArrayPush(pVgroupList, &entry); void *p = taosArrayPush(pVgroupList, &entry);
if (p == NULL) { if (p == NULL) {
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId);
code = terrno;
sdbRelease(pSdb, pVgroup);
goto _err;
} else { } else {
mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf); mDebug("take node snapshot, nodeId:%d %s", entry.nodeId, buf);
} }
@ -166,15 +189,19 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port); code = addEpIntoEpSet(&entry.epset, pObj->pDnode->fqdn, pObj->pDnode->port);
if (code) { if (code) {
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
continue; mError("failed to extract epset for fqdn:%s during task vgroup snapshot", pObj->pDnode->fqdn);
goto _err;
} }
char buf[256] = {0}; char buf[256] = {0};
(void) epsetToStr(&entry.epset, buf, tListLen(buf)); (void)epsetToStr(&entry.epset, buf, tListLen(buf));
void* p = taosArrayPush(pVgroupList, &entry); void *p = taosArrayPush(pVgroupList, &entry);
if (p == NULL) { if (p == NULL) {
mError("failed to put entry in vgroup list, nodeId:%d code:out of memory", entry.nodeId); code = terrno;
sdbRelease(pSdb, pObj);
mError("failed to put entry in vgroup list, nodeId:%d code:%s", entry.nodeId, tstrerror(code));
goto _err;
} else { } else {
mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf); mDebug("take snode snapshot, nodeId:%d %s", entry.nodeId, buf);
} }
@ -184,6 +211,13 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray** pList) {
*pList = pVgroupList; *pList = pVgroupList;
return code; return code;
_err:
*allReady = false;
taosArrayDestroy(pVgroupList);
taosHashCleanup(pHash);
return code;
} }
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) { int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {