enh: sdb snapshot
This commit is contained in:
parent
f6c676a174
commit
b5a1131c25
|
@ -81,7 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
|||
* @param pMsg The request msg.
|
||||
* @return int32_t 0 for success, -1 for failure.
|
||||
*/
|
||||
int32_t mndProcessMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
|
||||
|
||||
/**
|
||||
|
|
|
@ -40,7 +40,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
break;
|
||||
default:
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
code = mndProcessMsg(pMsg);
|
||||
code = mndProcessRpcMsg(pMsg);
|
||||
}
|
||||
|
||||
if (IsReq(pMsg) && pMsg->info.handle != NULL && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
|
|
|
@ -93,7 +93,8 @@ typedef struct SMnode {
|
|||
int64_t clusterId;
|
||||
TdThread thread;
|
||||
TdThreadRwlock lock;
|
||||
int32_t refCount;
|
||||
int32_t rpcRef;
|
||||
int32_t syncRef;
|
||||
bool stopped;
|
||||
bool restored;
|
||||
bool deploy;
|
||||
|
@ -119,6 +120,14 @@ typedef struct SMnode {
|
|||
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
|
||||
int64_t mndGenerateUid(char *name, int32_t len);
|
||||
|
||||
int32_t mndAcquireRpcRef(SMnode *pMnode);
|
||||
void mndReleaseRpcRef(SMnode *pMnode);
|
||||
void mndSetRestore(SMnode *pMnode, bool restored);
|
||||
void mndSetStop(SMnode *pMnode);
|
||||
bool mndGetStop(SMnode *pMnode);
|
||||
int32_t mndAcquireSyncRef(SMnode *pMnode);
|
||||
void mndReleaseSyncRef(SMnode *pMnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -85,7 +85,7 @@ static void *mndThreadFp(void *param) {
|
|||
while (1) {
|
||||
lastTime++;
|
||||
taosMsleep(100);
|
||||
if (pMnode->stopped) break;
|
||||
if (mndGetStop(pMnode)) break;
|
||||
|
||||
if (lastTime % (tsTransPullupInterval * 10) == 0) {
|
||||
mndPullupTrans(pMnode);
|
||||
|
@ -118,7 +118,6 @@ static int32_t mndInitTimer(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
static void mndCleanupTimer(SMnode *pMnode) {
|
||||
pMnode->stopped = true;
|
||||
if (taosCheckPthreadValid(pMnode->thread)) {
|
||||
taosThreadJoin(pMnode->thread, NULL);
|
||||
taosThreadClear(&pMnode->thread);
|
||||
|
@ -335,15 +334,19 @@ void mndClose(SMnode *pMnode) {
|
|||
int32_t mndStart(SMnode *pMnode) {
|
||||
mndSyncStart(pMnode);
|
||||
if (pMnode->deploy) {
|
||||
if (sdbDeploy(pMnode->pSdb) != 0) return -1;
|
||||
pMnode->restored = true;
|
||||
if (sdbDeploy(pMnode->pSdb) != 0) {
|
||||
mError("failed to deploy sdb while start mnode");
|
||||
return -1;
|
||||
}
|
||||
mndSetRestore(pMnode, true);
|
||||
}
|
||||
return mndInitTimer(pMnode);
|
||||
}
|
||||
|
||||
void mndStop(SMnode *pMnode) {
|
||||
mndSetStop(pMnode);
|
||||
mndSyncStop(pMnode);
|
||||
return mndCleanupTimer(pMnode);
|
||||
mndCleanupTimer(pMnode);
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||
|
@ -362,6 +365,11 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
}
|
||||
|
||||
if (mndAcquireSyncRef(pMnode) != 0) {
|
||||
mError("failed to process sync msg:%p type:%s since %s", pMsg, TMSG_INFO(pMsg->msgType), terrstr());
|
||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
}
|
||||
|
||||
char logBuf[512];
|
||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||
|
@ -405,59 +413,45 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
|||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||
}
|
||||
|
||||
mndReleaseSyncRef(pMnode);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndCheckMnodeMaster(SRpcMsg *pMsg) {
|
||||
if (!IsReq(pMsg)) return 0;
|
||||
if (mndIsMaster(pMsg->info.node)) return 0;
|
||||
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||
|
||||
if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
|
||||
pMsg->msgType == TDMT_MND_TRANS_TIMER) {
|
||||
return -1;
|
||||
}
|
||||
mError("msg:%p, failed to check master since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
|
||||
pMsg->msgType != TDMT_MND_TRANS_TIMER) {
|
||||
mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
|
||||
SEpSet epSet = {0};
|
||||
mndGetMnodeEpSet(pMsg->info.node, &epSet);
|
||||
SEpSet epSet = {0};
|
||||
mndGetMnodeEpSet(pMsg->info.node, &epSet);
|
||||
|
||||
#if 0
|
||||
mTrace("msg:%p, is redirected, num:%d use:%d", pMsg, epSet.numOfEps, epSet.inUse);
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
mTrace("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
|
||||
if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) {
|
||||
epSet.inUse = (i + 1) % epSet.numOfEps;
|
||||
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
||||
pMsg->info.rsp = rpcMallocCont(contLen);
|
||||
if (pMsg->info.rsp != NULL) {
|
||||
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
||||
pMsg->info.rspLen = contLen;
|
||||
terrno = TSDB_CODE_RPC_REDIRECT;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
|
||||
pMsg->info.rsp = rpcMallocCont(contLen);
|
||||
if (pMsg->info.rsp != NULL) {
|
||||
tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
|
||||
pMsg->info.rspLen = contLen;
|
||||
terrno = TSDB_CODE_RPC_REDIRECT;
|
||||
} else {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t mndCheckRequestValid(SRpcMsg *pMsg) {
|
||||
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
||||
if (!IsReq(pMsg)) return 0;
|
||||
if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0;
|
||||
|
||||
mError("msg:%p, failed to valid request, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_INVALID_MSG_LEN;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||
if (mndCheckMnodeMaster(pMsg) != 0) return -1;
|
||||
if (mndCheckRequestValid(pMsg) != 0) return -1;
|
||||
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||
if (fp == NULL) {
|
||||
|
@ -466,8 +460,13 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
mTrace("msg:%p, will be processed in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
if (mndCheckMsgContent(pMsg) != 0) return -1;
|
||||
if (mndCheckMnodeState(pMsg) != 0) return -1;
|
||||
|
||||
mTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
int32_t code = (*fp)(pMsg);
|
||||
mndReleaseRpcRef(pMnode);
|
||||
|
||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mTrace("msg:%p, won't response immediately since in progress", pMsg);
|
||||
} else if (code == 0) {
|
||||
|
@ -476,6 +475,7 @@ int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
|||
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
|
||||
TMSG_INFO(pMsg->msgType));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -502,7 +502,7 @@ int64_t mndGenerateUid(char *name, int32_t len) {
|
|||
|
||||
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
|
||||
SMonGrantInfo *pGrantInfo) {
|
||||
if (!mndIsMaster(pMnode)) return -1;
|
||||
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int64_t ms = taosGetTimestampMs();
|
||||
|
@ -511,6 +511,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
|
||||
pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
|
||||
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL) {
|
||||
mndReleaseRpcRef(pMnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -605,6 +606,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
|||
pGrantInfo->timeseries_total = INT32_MAX;
|
||||
}
|
||||
|
||||
mndReleaseRpcRef(pMnode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -612,3 +614,76 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
|||
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndAcquireRpcRef(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
if (pMnode->stopped) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
code = -1;
|
||||
} else if (!mndIsMaster(pMnode)) {
|
||||
code = -1;
|
||||
} else {
|
||||
int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1);
|
||||
mTrace("mnode rpc is acquired, ref:%d", ref);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
void mndReleaseRpcRef(SMnode *pMnode) {
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1);
|
||||
mTrace("mnode rpc is released, ref:%d", ref);
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
}
|
||||
|
||||
void mndSetRestore(SMnode *pMnode, bool restored) {
|
||||
if (restored) {
|
||||
taosThreadRwlockWrlock(&pMnode->lock);
|
||||
pMnode->restored = true;
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
mTrace("mnode set restored:%d", restored);
|
||||
} else {
|
||||
taosThreadRwlockWrlock(&pMnode->lock);
|
||||
pMnode->restored = false;
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
mTrace("mnode set restored:%d", restored);
|
||||
while (1) {
|
||||
if (pMnode->rpcRef <= 0) break;
|
||||
taosMsleep(3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool mndGetRestored(SMnode *pMnode) { return pMnode->restored; }
|
||||
|
||||
void mndSetStop(SMnode *pMnode) {
|
||||
taosThreadRwlockWrlock(&pMnode->lock);
|
||||
pMnode->stopped = true;
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
mTrace("mnode set stopped");
|
||||
}
|
||||
|
||||
bool mndGetStop(SMnode *pMnode) { return pMnode->stopped; }
|
||||
|
||||
int32_t mndAcquireSyncRef(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
if (pMnode->stopped) {
|
||||
terrno = TSDB_CODE_APP_NOT_READY;
|
||||
code = -1;
|
||||
} else {
|
||||
int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1);
|
||||
mTrace("mnode sync is acquired, ref:%d", ref);
|
||||
}
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
return code;
|
||||
}
|
||||
|
||||
void mndReleaseSyncRef(SMnode *pMnode) {
|
||||
taosThreadRwlockRdlock(&pMnode->lock);
|
||||
int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1);
|
||||
mTrace("mnode sync is released, ref:%d", ref);
|
||||
taosThreadRwlockUnlock(&pMnode->lock);
|
||||
}
|
|
@ -63,7 +63,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
|
|||
if (!pMnode->deploy) {
|
||||
mInfo("mnode sync restore finished");
|
||||
mndTransPullup(pMnode);
|
||||
pMnode->restored = true;
|
||||
mndSetRestore(pMnode, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -85,7 +85,7 @@ int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void
|
|||
|
||||
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
pMnode->restored = false;
|
||||
mndSetRestore(pMnode, false);
|
||||
mInfo("start to apply snapshot to sdb, len:%d", len);
|
||||
|
||||
int32_t code = sdbApplySnapshot(pMnode->pSdb, pBuf, len);
|
||||
|
@ -93,7 +93,7 @@ int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char
|
|||
mError("failed to apply snapshot to sdb, len:%d", len);
|
||||
} else {
|
||||
mInfo("successfully to apply snapshot to sdb, len:%d", len);
|
||||
pMnode->restored = true;
|
||||
mndSetRestore(pMnode, true);
|
||||
}
|
||||
|
||||
// taosMemoryFree(pBuf);
|
||||
|
|
Loading…
Reference in New Issue