refactor: sync integrate into mnode
This commit is contained in:
parent
2b668e3b27
commit
6c1d1927c9
|
@ -304,13 +304,14 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
||||||
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Update the version of sdb
|
* @brief Update the index of sdb
|
||||||
*
|
*
|
||||||
* @param pSdb The sdb object.
|
* @param pSdb The sdb object.
|
||||||
* @param val The update value of the version.
|
* @param index The update value of the apply index.
|
||||||
* @return int32_t The current version of sdb
|
* @return int32_t The current index of sdb
|
||||||
*/
|
*/
|
||||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val);
|
void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
|
||||||
|
int64_t sdbGetApplyIndex(SSdb *pSdb);
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
|
|
|
@ -128,38 +128,35 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
SMnode *pMnode = pFsm->data;
|
||||||
if (pFsm->FpGetSnapshot != NULL) {
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SSnapshot snapshot = {0};
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
SyncIndex lastApply = sdbGetApplyIndex(pSdb);
|
||||||
beginIndex = snapshot.lastApplyIndex;
|
SSdbRaw *pRaw = pMsg->pCont;
|
||||||
}
|
|
||||||
|
|
||||||
if (cbMeta.index > beginIndex) {
|
|
||||||
SMnode *pMnode = pFsm->data;
|
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
|
||||||
|
|
||||||
SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg;
|
|
||||||
pApplyMsg->info.node = pFsm->data;
|
|
||||||
mndProcessApplyMsg(pApplyMsg);
|
|
||||||
sdbUpdateVer(pMnode->pSdb, 1);
|
|
||||||
|
|
||||||
|
if (cbMeta.index > lastApply) {
|
||||||
|
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
|
||||||
|
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
|
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
|
||||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||||
tsem_post(&pMgmt->syncSem);
|
tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, lastApply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
// strict consistent, do nothing
|
// strict consistent, do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
// strict consistent, do nothing
|
// strict consistent, do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
// snapshot
|
SMnode *pMnode = pFsm->data;
|
||||||
|
pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,23 +179,25 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncInfo syncInfo = {.vgId = 1};
|
SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
|
||||||
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||||
|
syncInfo.pWal = pMgmt->pWal;
|
||||||
|
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||||
|
|
||||||
|
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||||
pCfg->replicaNum = pMnode->replica;
|
pCfg->replicaNum = pMnode->replica;
|
||||||
pCfg->myIndex = pMnode->selfIndex;
|
pCfg->myIndex = pMnode->selfIndex;
|
||||||
for (int32_t i = 0; i < pMnode->replica; ++i) {
|
for (int32_t i = 0; i < pMnode->replica; ++i) {
|
||||||
SNodeInfo *pNodeInfo = &pCfg->nodeInfo[i];
|
SNodeInfo *pNode = &pCfg->nodeInfo[i];
|
||||||
tstrncpy(pNodeInfo->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNodeInfo->nodeFqdn));
|
tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
pNodeInfo->nodePort = pMnode->replicas[i].port;
|
pNode->nodePort = pMnode->replicas[i].port;
|
||||||
}
|
}
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
|
||||||
syncInfo.pWal = pMnode->syncMgmt.pWal;
|
|
||||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
|
||||||
syncInfo.FpSendMsg = mndSyncSendMsg;
|
|
||||||
syncInfo.FpEqMsg = mndSyncEqMsg;
|
|
||||||
|
|
||||||
pMnode->syncMgmt.sync = syncOpen(&syncInfo);
|
pMgmt->sync = syncOpen(&syncInfo);
|
||||||
ASSERT(pMnode->syncMgmt.sync > 0);
|
if (pMgmt->sync <= 0) {
|
||||||
|
mError("failed to open sync since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -209,16 +208,6 @@ void mndCleanupSync(SMnode *pMnode) {
|
||||||
mndCloseWal(pMnode);
|
mndCloseWal(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
|
|
||||||
SMnode *pMnode = pData;
|
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
|
||||||
|
|
||||||
pMgmt->errCode = 0;
|
|
||||||
tsem_post(&pMgmt->syncSem);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
SWal *pWal = pMnode->syncMgmt.pWal;
|
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
|
@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
||||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
||||||
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
||||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
|
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2 );
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
|
sdbSetApplyIndex(pSdb, -1);
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
ASSERT_EQ(sdbGetApplyIndex(pSdb), -1);
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
|
|
||||||
ASSERT_EQ(mnode.insertTimes, 2);
|
ASSERT_EQ(mnode.insertTimes, 2);
|
||||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||||
|
|
||||||
|
@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
||||||
|
|
||||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3);
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 3);
|
||||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4);
|
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 4);
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), -1);
|
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, -1), -1);
|
|
||||||
ASSERT_EQ(mnode.insertTimes, 3);
|
ASSERT_EQ(mnode.insertTimes, 3);
|
||||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||||
|
|
||||||
|
@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// write version
|
// write version
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 0);
|
sdbSetApplyIndex(pSdb, 0);
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 1), 1);
|
sdbSetApplyIndex(pSdb, 1);
|
||||||
|
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
|
||||||
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
||||||
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
ASSERT_EQ(sdbWriteFile(pSdb), 0);
|
||||||
|
|
||||||
|
@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
|
||||||
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2);
|
||||||
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1);
|
||||||
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
|
ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5);
|
||||||
ASSERT_EQ(sdbUpdateVer(pSdb, 0), 1);
|
ASSERT_EQ(sdbGetApplyIndex(pSdb), 1);
|
||||||
ASSERT_EQ(mnode.insertTimes, 4);
|
ASSERT_EQ(mnode.insertTimes, 4);
|
||||||
ASSERT_EQ(mnode.deleteTimes, 0);
|
ASSERT_EQ(mnode.deleteTimes, 0);
|
||||||
|
|
||||||
|
|
|
@ -156,4 +156,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t sdbUpdateVer(SSdb *pSdb, int32_t val) { return atomic_add_fetch_64(&pSdb->curVer, val); }
|
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
||||||
|
|
||||||
|
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
||||||
|
|
Loading…
Reference in New Issue