sync refactor

This commit is contained in:
Minghao Li 2022-03-22 16:58:36 +08:00
parent f7432337fd
commit 7c6fbbfbd1
2 changed files with 29 additions and 23 deletions

View File

@ -165,6 +165,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // us
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
// propose with sequence number, to implement linearizable semantics
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum);
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -104,29 +104,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
} }
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = syncPropose2(rid, pMsg, isWeak, 0);
// todo : get pointer from rid
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncClientRequestDestroy(pSyncMsg);
ret = 0;
} else {
sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !!
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret; return ret;
} }
@ -144,6 +122,31 @@ ESyncState syncGetMyRole(int64_t rid) {
return pSyncNode->state; return pSyncNode->state;
} }
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) {
int32_t ret = 0;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak);
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncClientRequestDestroy(pSyncMsg);
ret = 0;
} else {
sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !!
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return ret;
}
// open/close -------------- // open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));