From 7c6fbbfbd12689e172cd2fd994a44a4a4a9a5501 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 22 Mar 2022 16:58:36 +0800 Subject: [PATCH] sync refactor --- include/libs/sync/sync.h | 3 ++ source/libs/sync/src/syncMain.c | 49 +++++++++++++++++---------------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 44efc33dbb..7802aaca96 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -165,6 +165,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // us int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose ESyncState syncGetMyRole(int64_t rid); +// propose with sequence number, to implement linearizable semantics +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum); + extern int32_t sDebugFlag; #ifdef __cplusplus diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bbb0142584..f2341ef521 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -104,29 +104,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { } int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { - int32_t ret = 0; - - // todo : get pointer from rid - SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); - if (pSyncNode == NULL) { - return -1; - } - assert(rid == pSyncNode->rid); - - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); - SRpcMsg rpcMsg; - syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); - pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); - syncClientRequestDestroy(pSyncMsg); - ret = 0; - - } else { - sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); - ret = -1; // todo : need define err code !! - } - - taosReleaseRef(tsNodeRefId, pSyncNode->rid); + int32_t ret = syncPropose2(rid, pMsg, isWeak, 0); return ret; } @@ -144,6 +122,31 @@ ESyncState syncGetMyRole(int64_t rid) { return pSyncNode->state; } +int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) { + int32_t ret = 0; + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + assert(rid == pSyncNode->rid); + + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncClientRequestDestroy(pSyncMsg); + ret = 0; + + } else { + sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); + ret = -1; // todo : need define err code !! + } + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return ret; +} + // open/close -------------- SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));