refactor: do some internal refactor.
This commit is contained in:
parent
6170bb5c1c
commit
39495ec935
|
@ -133,7 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
|
|
||||||
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
|
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
|
||||||
bool mndStreamNodeIsUpdated(SMnode *pMnode);
|
bool mndStreamNodeIsUpdated(SMnode *pMnode);
|
||||||
bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
|
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
|
||||||
|
|
||||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||||
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||||
|
|
|
@ -813,10 +813,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool snodeCheckSucc = mndCheckForSnode(pMnode, pSourceDb);
|
code = mndCheckForSnode(pMnode, pSourceDb);
|
||||||
mndReleaseDb(pMnode, pSourceDb);
|
mndReleaseDb(pMnode, pSourceDb);
|
||||||
if (!snodeCheckSucc) {
|
if (code != 0) {
|
||||||
code = TSDB_CODE_SNODE_NOT_DEPLOYED;
|
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1497,13 +1497,13 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) {
|
||||||
return updated;
|
return updated;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SSnodeObj *pObj = NULL;
|
SSnodeObj *pObj = NULL;
|
||||||
|
|
||||||
if (pSrcDb->cfg.replications == 1) {
|
if (pSrcDb->cfg.replications == 1) {
|
||||||
return true;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||||
|
@ -1513,11 +1513,11 @@ bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
||||||
|
|
||||||
sdbRelease(pSdb, pObj);
|
sdbRelease(pSdb, pObj);
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return true;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
mError("snode not existed when trying to create stream in db with multiple replica");
|
mError("snode not existed when trying to create stream in db with multiple replica");
|
||||||
return false;
|
return TSDB_CODE_SNODE_NOT_DEPLOYED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue