refactor: mnode sync
This commit is contained in:
parent
e1c52582e0
commit
bf8bbfbfbb
|
@ -36,7 +36,6 @@ static const SSysDbTableSchema mnodesSchema[] = {
|
||||||
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -652,16 +652,13 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, b1, false);
|
colDataAppend(pColInfo, numOfRows, b1, false);
|
||||||
|
|
||||||
const char *roles = syncStr(pObj->role);
|
const char *roles = syncStr(syncGetMyRole(pMnode->syncMgmt.sync));
|
||||||
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
|
char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE);
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
|
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)b2, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->roleTime, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
|
||||||
|
|
||||||
|
|
|
@ -49,8 +49,10 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
|
|
||||||
void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
if (!pMnode->deploy) {
|
||||||
mndTransPullup(pMnode);
|
mndTransPullup(pMnode);
|
||||||
pMnode->syncMgmt.restored = true;
|
pMnode->syncMgmt.restored = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) {
|
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) {
|
||||||
|
|
|
@ -1080,7 +1080,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
if (!mndIsMaster(pMnode)) return false;
|
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
|
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
|
||||||
|
@ -1171,7 +1171,7 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
|
||||||
if (!mndIsMaster(pMnode)) return false;
|
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
|
||||||
|
|
||||||
bool continueExec = true;
|
bool continueExec = true;
|
||||||
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
|
||||||
|
|
|
@ -335,8 +335,9 @@ void mndClose(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t mndStart(SMnode *pMnode) {
|
int32_t mndStart(SMnode *pMnode) {
|
||||||
mndSyncStart(pMnode);
|
mndSyncStart(pMnode);
|
||||||
if (pMnode->deploy && sdbDeploy(pMnode->pSdb) != 0) {
|
if (pMnode->deploy) {
|
||||||
return -1;
|
if (sdbDeploy(pMnode->pSdb) != 0) return -1;
|
||||||
|
pMnode->syncMgmt.restored = true;
|
||||||
}
|
}
|
||||||
return mndInitTimer(pMnode);
|
return mndInitTimer(pMnode);
|
||||||
}
|
}
|
||||||
|
@ -414,8 +415,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||||
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
mTrace("msg:%p, will be processed, type:%s app:%p", pMsg, TMSG_INFO(pMsg->msgType), ahandle);
|
||||||
|
|
||||||
if (IsReq(pMsg)) {
|
if (IsReq(pMsg)) {
|
||||||
if (!mndIsMaster(pMnode) && pMsg->msgType != TDMT_MND_TRANS_TIMER && pMsg->msgType != TDMT_MND_MQ_TIMER &&
|
if (!mndIsMaster(pMnode)) {
|
||||||
pMsg->msgType != TDMT_MND_TELEM_TIMER) {
|
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -913,12 +913,12 @@ uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
|
||||||
} else {
|
} else {
|
||||||
#ifdef EAI_SYSTEM
|
#ifdef EAI_SYSTEM
|
||||||
if (ret == EAI_SYSTEM) {
|
if (ret == EAI_SYSTEM) {
|
||||||
printf("failed to get the ip address, fqdn:%s, errno:%d, since:%s", fqdn, errno, strerror(errno));
|
// printf("failed to get the ip address, fqdn:%s, errno:%d, since:%s", fqdn, errno, strerror(errno));
|
||||||
} else {
|
} else {
|
||||||
printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
|
// printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
|
// printf("failed to get the ip address, fqdn:%s, ret:%d, since:%s", fqdn, ret, gai_strerror(ret));
|
||||||
#endif
|
#endif
|
||||||
return 0xFFFFFFFF;
|
return 0xFFFFFFFF;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue