refactor: do some internal refactor.
This commit is contained in:
parent
1237120730
commit
3cb462cdf9
|
@ -56,7 +56,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int
|
||||||
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
|
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
|
||||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||||
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
|
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList);
|
||||||
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
|
||||||
|
@ -1092,7 +1092,7 @@ _ERR:
|
||||||
|
|
||||||
int32_t extractStreamNodeList(SMnode *pMnode) {
|
int32_t extractStreamNodeList(SMnode *pMnode) {
|
||||||
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
|
if (taosArrayGetSize(execInfo.pNodeList) == 0) {
|
||||||
int32_t code = extractNodeListFromStream(pMnode, execInfo.pNodeList);
|
int32_t code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("Failed to extract node list from stream, code:%s", tstrerror(code));
|
mError("Failed to extract node list from stream, code:%s", tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -2437,7 +2437,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList) {
|
static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeList) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -2565,7 +2565,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
// keep the new vnode snapshot if success
|
// keep the new vnode snapshot if success
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
code = extractNodeListFromStream(pMnode, execInfo.pNodeList);
|
code = refreshNodeListFromExistedStreams(pMnode, execInfo.pNodeList);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("failed to extract node list from stream, code:%s", tstrerror(code));
|
mError("failed to extract node list from stream, code:%s", tstrerror(code));
|
||||||
goto _end;
|
goto _end;
|
||||||
|
|
Loading…
Reference in New Issue