enh(sync) sync/mnode integration
This commit is contained in:
parent
d06253934b
commit
249aecacda
|
@ -158,6 +158,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "mnode-drop-index", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
|
||||||
|
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -90,6 +90,9 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
||||||
*/
|
*/
|
||||||
int32_t mndProcessMsg(SRpcMsg *pMsg);
|
int32_t mndProcessMsg(SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
|
||||||
|
int32_t mndProcessApplyMsg(SRpcMsg *pMsg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Generate machine code
|
* @brief Generate machine code
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -33,6 +33,7 @@ typedef struct SMnodeMgmt {
|
||||||
SSingleWorker readWorker;
|
SSingleWorker readWorker;
|
||||||
SSingleWorker writeWorker;
|
SSingleWorker writeWorker;
|
||||||
SSingleWorker syncWorker;
|
SSingleWorker syncWorker;
|
||||||
|
SSingleWorker applyWorker;
|
||||||
SSingleWorker monitorWorker;
|
SSingleWorker monitorWorker;
|
||||||
SReplica replicas[TSDB_MAX_REPLICA];
|
SReplica replicas[TSDB_MAX_REPLICA];
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
|
@ -59,6 +60,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
||||||
void mmStopWorker(SMnodeMgmt *pMgmt);
|
void mmStopWorker(SMnodeMgmt *pMgmt);
|
||||||
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -122,6 +122,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (syncInit() != 0) {
|
||||||
|
dError("failed to init sync since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
|
SMnodeMgmt *pMgmt = taosMemoryCalloc(1, sizeof(SMnodeMgmt));
|
||||||
if (pMgmt == NULL) {
|
if (pMgmt == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -56,6 +56,32 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void mmProcessApplyQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
|
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
|
int32_t code = -1;
|
||||||
|
tmsg_t msgType = pMsg->msgType;
|
||||||
|
bool isRequest = msgType & 1U;
|
||||||
|
dTrace("msg:%p, get from mnode-query queue", pMsg);
|
||||||
|
|
||||||
|
pMsg->info.node = pMgmt->pMnode;
|
||||||
|
|
||||||
|
mndProcessApplyMsg(pMsg);
|
||||||
|
|
||||||
|
/*
|
||||||
|
if (isRequest) {
|
||||||
|
if (pMsg->info.handle != NULL && code != 0) {
|
||||||
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
|
mmSendRsp(pMsg, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
dTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
static void mmProcessQueryQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -92,6 +118,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
|
return mmPutNodeMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mmPutNodeMsgToApplyQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
|
return mmPutNodeMsgToWorker(&pMgmt->applyWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
|
return mmPutNodeMsgToWorker(&pMgmt->readWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
@ -179,6 +209,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSingleWorkerCfg aCfg = {
|
||||||
|
.min = 1,
|
||||||
|
.max = 1,
|
||||||
|
.name = "mnode-apply",
|
||||||
|
.fp = (FItem)mmProcessApplyQueue,
|
||||||
|
.param = pMgmt,
|
||||||
|
};
|
||||||
|
if (tSingleWorkerInit(&pMgmt->applyWorker, &aCfg) != 0) {
|
||||||
|
dError("failed to start mnode mnode-apply worker since %s", terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
SSingleWorkerCfg mCfg = {
|
SSingleWorkerCfg mCfg = {
|
||||||
.min = 1,
|
.min = 1,
|
||||||
.max = 1,
|
.max = 1,
|
||||||
|
|
|
@ -75,7 +75,9 @@ typedef struct {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
sem_t syncSem;
|
sem_t syncSem;
|
||||||
SWal *pWal;
|
SWal *pWal;
|
||||||
SSyncNode *pSyncNode;
|
//SSyncNode *pSyncNode;
|
||||||
|
int64_t sync;
|
||||||
|
|
||||||
ESyncState state;
|
ESyncState state;
|
||||||
} SSyncMgmt;
|
} SSyncMgmt;
|
||||||
|
|
||||||
|
|
|
@ -46,6 +46,12 @@ static void mndCloseWal(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndRestoreWal(SMnode *pMnode) {
|
static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
|
|
||||||
|
// do nothing
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
SWal *pWal = pMnode->syncMgmt.pWal;
|
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
|
int64_t lastSdbVer = sdbUpdateVer(pSdb, 0);
|
||||||
|
@ -114,6 +120,70 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
|
||||||
_OVER:
|
_OVER:
|
||||||
walCloseReadHandle(pHandle);
|
walCloseReadHandle(pHandle);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSyncEqMsg(const SMsgCb* msgcb, SRpcMsg *pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
if (msgcb->queueFps[SYNC_QUEUE] != NULL) {
|
||||||
|
tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
|
} else {
|
||||||
|
mError("mndSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
pMsg->info.noResp = 1;
|
||||||
|
tmsgSendReq(pEpSet, pMsg);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||||
|
if (pFsm->FpGetSnapshot != NULL) {
|
||||||
|
SSnapshot snapshot;
|
||||||
|
pFsm->FpGetSnapshot(pFsm, &snapshot);
|
||||||
|
beginIndex = snapshot.lastApplyIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cbMeta.index > beginIndex) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
|
||||||
|
mndProcessApplyMsg((SRpcMsg*)pMsg);
|
||||||
|
//mmPutNodeMsgToApplyQueue(pMnode->pWrapper->pMgmt, pMsg);
|
||||||
|
|
||||||
|
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
tsem_post(&pMgmt->syncSem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
// strict consistent, do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
// strict consistent, do nothing
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
|
// snapshot
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncFSM *syncMnodeMakeFsm(SMnode *pMnode) {
|
||||||
|
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
|
pFsm->data = pMnode;
|
||||||
|
pFsm->FpCommitCb = mndSyncCommitCb;
|
||||||
|
pFsm->FpPreCommitCb = mndSyncPreCommitCb;
|
||||||
|
pFsm->FpRollBackCb = mndSyncRollBackCb;
|
||||||
|
pFsm->FpGetSnapshot = mndSyncGetSnapshotCb;
|
||||||
|
return pFsm;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndInitSync(SMnode *pMnode) {
|
int32_t mndInitSync(SMnode *pMnode) {
|
||||||
|
@ -133,7 +203,27 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
if (pMnode->selfId == 1) {
|
if (pMnode->selfId == 1) {
|
||||||
pMgmt->state = TAOS_SYNC_STATE_LEADER;
|
pMgmt->state = TAOS_SYNC_STATE_LEADER;
|
||||||
}
|
}
|
||||||
pMgmt->pSyncNode = NULL;
|
|
||||||
|
// pMgmt->pSyncNode = NULL;
|
||||||
|
SSyncInfo syncInfo;
|
||||||
|
syncInfo.vgId = 1;
|
||||||
|
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
||||||
|
pCfg->replicaNum = pMnode->replica;
|
||||||
|
pCfg->myIndex = pMnode->selfIndex;
|
||||||
|
for (int i = 0; i < pMnode->replica; ++i) {
|
||||||
|
snprintf((pCfg->nodeInfo)->nodeFqdn, sizeof((pCfg->nodeInfo)->nodeFqdn), "%s", (pMnode->replicas)[i].fqdn);
|
||||||
|
(pCfg->nodeInfo)->nodePort = (pMnode->replicas)[i].port;
|
||||||
|
}
|
||||||
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pMnode->path);
|
||||||
|
syncInfo.pWal = pMnode->syncMgmt.pWal;
|
||||||
|
|
||||||
|
syncInfo.pFsm = syncMnodeMakeFsm(pMnode);
|
||||||
|
syncInfo.FpSendMsg = mndSendMsg;
|
||||||
|
syncInfo.FpEqMsg = mndSyncEqMsg;
|
||||||
|
|
||||||
|
pMnode->syncMgmt.sync = syncOpen(&syncInfo);
|
||||||
|
ASSERT(pMnode->syncMgmt.sync > 0);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,6 +247,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
SWal *pWal = pMnode->syncMgmt.pWal;
|
SWal *pWal = pMnode->syncMgmt.pWal;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
#if 0
|
||||||
int64_t ver = sdbUpdateVer(pSdb, 1);
|
int64_t ver = sdbUpdateVer(pSdb, 1);
|
||||||
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
if (walWrite(pWal, ver, 1, pRaw, sdbGetRawTotalSize(pRaw)) < 0) {
|
||||||
sdbUpdateVer(pSdb, -1);
|
sdbUpdateVer(pSdb, -1);
|
||||||
|
@ -168,24 +259,32 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||||
walCommit(pWal, ver);
|
walCommit(pWal, ver);
|
||||||
walFsync(pWal, true);
|
walFsync(pWal, true);
|
||||||
|
|
||||||
#if 1
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
if (pMnode->replica == 1) return 0;
|
if (pMnode->replica == 1) return 0;
|
||||||
|
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
pMgmt->errCode = 0;
|
pMgmt->errCode = 0;
|
||||||
|
|
||||||
SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
|
//SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)};
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
rpcMsg.code = TDMT_MND_APPLY_MSG;
|
||||||
|
rpcMsg.contLen = sdbGetRawTotalSize(pRaw);
|
||||||
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
|
memcpy(rpcMsg.pCont, pRaw, rpcMsg.contLen);
|
||||||
|
|
||||||
bool isWeak = false;
|
bool isWeak = false;
|
||||||
int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak);
|
int32_t code = syncPropose(pMgmt->sync, &rpcMsg, isWeak);
|
||||||
|
|
||||||
if (code != 0) return code;
|
if (code != 0) return code;
|
||||||
|
|
||||||
tsem_wait(&pMgmt->syncSem);
|
tsem_wait(&pMgmt->syncSem);
|
||||||
return pMgmt->errCode;
|
return pMgmt->errCode;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndIsMaster(SMnode *pMnode) {
|
bool mndIsMaster(SMnode *pMnode) {
|
||||||
|
|
|
@ -683,11 +683,14 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
||||||
|
|
||||||
mDebug("trans:%d, sync finished", pTrans->id);
|
mDebug("trans:%d, sync finished", pTrans->id);
|
||||||
|
|
||||||
code = sdbWrite(pMnode->pSdb, pRaw);
|
// do it in state machine commit cb
|
||||||
|
#if 0
|
||||||
|
code = sdbWriteWithout(pMnode->pSdb, pRaw);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,9 +336,25 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndStart(SMnode *pMnode) { return mndInitTimer(pMnode); }
|
int32_t mndStart(SMnode *pMnode) {
|
||||||
|
syncStart(pMnode->syncMgmt.sync);
|
||||||
|
return mndInitTimer(pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
void mndStop(SMnode *pMnode) { return mndCleanupTimer(pMnode); }
|
void mndStop(SMnode *pMnode) {
|
||||||
|
syncStop(pMnode->syncMgmt.sync);
|
||||||
|
return mndCleanupTimer(pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mndProcessApplyMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
|
SSdbRaw *pRaw = pMsg->pCont;
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessMsg(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
|
|
@ -55,14 +55,17 @@ static void syncFreeNode(void* param);
|
||||||
// ---------------------------------
|
// ---------------------------------
|
||||||
|
|
||||||
int32_t syncInit() {
|
int32_t syncInit() {
|
||||||
int32_t ret;
|
int32_t ret = 0;
|
||||||
tsNodeRefId = taosOpenRef(200, syncFreeNode);
|
|
||||||
if (tsNodeRefId < 0) {
|
if (!syncEnvIsStart()) {
|
||||||
sError("failed to init node ref");
|
tsNodeRefId = taosOpenRef(200, syncFreeNode);
|
||||||
syncCleanUp();
|
if (tsNodeRefId < 0) {
|
||||||
ret = -1;
|
sError("failed to init node ref");
|
||||||
} else {
|
syncCleanUp();
|
||||||
ret = syncEnvStart();
|
ret = -1;
|
||||||
|
} else {
|
||||||
|
ret = syncEnvStart();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
Loading…
Reference in New Issue