sync refactor
This commit is contained in:
parent
1c3113686e
commit
f7432337fd
|
@ -69,17 +69,20 @@ typedef struct SSnapshot {
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
void* data;
|
void* data;
|
||||||
|
|
||||||
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result
|
// when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result
|
||||||
// user can do something according to the code and isWeak. for example, write data into tsdb
|
// user can do something according to the code and isWeak. for example, write data into tsdb
|
||||||
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
|
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state);
|
||||||
|
|
||||||
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result
|
// when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result
|
||||||
// user can do something according to the code and isWeak. for example, write data into tsdb
|
// user can do something according to the code and isWeak. for example, write data into tsdb
|
||||||
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
|
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state);
|
||||||
|
|
||||||
// when log entry is updated by a new one, FpRollBackCb is called
|
// when log entry is updated by a new one, FpRollBackCb is called
|
||||||
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
|
// user can do something to roll back. for example, delete data from tsdb, or just ignore it
|
||||||
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code);
|
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state);
|
||||||
|
|
||||||
// user should implement this function, use "data" to take snapshot into "snapshot"
|
// user should implement this function, use "data" to take snapshot into "snapshot"
|
||||||
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
|
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
|
||||||
|
@ -159,7 +162,7 @@ int64_t syncStart(const SSyncInfo* pSyncInfo);
|
||||||
void syncStop(int64_t rid);
|
void syncStop(int64_t rid);
|
||||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
||||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function
|
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function
|
||||||
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility
|
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // for compatibility, the same as syncPropose
|
||||||
ESyncState syncGetMyRole(int64_t rid);
|
ESyncState syncGetMyRole(int64_t rid);
|
||||||
|
|
||||||
extern int32_t sDebugFlag;
|
extern int32_t sDebugFlag;
|
||||||
|
|
|
@ -213,7 +213,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0);
|
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state);
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
syncEntryDestory(pRollBackEntry);
|
syncEntryDestory(pRollBackEntry);
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2);
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
@ -255,7 +255,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3);
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
@ -326,7 +326,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
|
||||||
if (ths->pFsm->FpCommitCb != NULL) {
|
if (ths->pFsm->FpCommitCb != NULL) {
|
||||||
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
|
@ -91,7 +91,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
|
||||||
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
if (pSyncNode->pFsm->FpCommitCb != NULL) {
|
||||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state);
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
|
@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
|
||||||
static int32_t syncIOStartInternal(SSyncIO *io);
|
static int32_t syncIOStartInternal(SSyncIO *io);
|
||||||
static int32_t syncIOStopInternal(SSyncIO *io);
|
static int32_t syncIOStopInternal(SSyncIO *io);
|
||||||
|
|
||||||
static void *syncIOConsumerFunc(void *param);
|
static void * syncIOConsumerFunc(void *param);
|
||||||
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
|
@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *syncIOConsumerFunc(void *param) {
|
static void *syncIOConsumerFunc(void *param) {
|
||||||
SSyncIO *io = param;
|
SSyncIO * io = param;
|
||||||
STaosQall *qall;
|
STaosQall *qall;
|
||||||
SRpcMsg *pRpcMsg, rpcMsg;
|
SRpcMsg * pRpcMsg, rpcMsg;
|
||||||
qall = taosAllocateQall();
|
qall = taosAllocateQall();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -849,7 +849,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
||||||
|
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0);
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
@ -864,7 +864,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
|
||||||
|
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL) {
|
if (ths->pFsm->FpPreCommitCb != NULL) {
|
||||||
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1);
|
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
|
|
@ -28,19 +28,29 @@ SSyncFSM * pFsm;
|
||||||
SWal * pWal;
|
SWal * pWal;
|
||||||
SSyncNode *gSyncNode;
|
SSyncNode *gSyncNode;
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
|
||||||
|
code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initFsm() {
|
void initFsm() {
|
||||||
|
|
|
@ -27,24 +27,28 @@ SSyncInfo syncInfo;
|
||||||
SSyncFSM *pFsm;
|
SSyncFSM *pFsm;
|
||||||
SWal * pWal;
|
SWal * pWal;
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) {
|
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index,
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
isWeak, code);
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) {
|
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm,
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
index, isWeak, code);
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
|
||||||
|
code, state, syncUtilState2String(state));
|
||||||
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code) {
|
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
|
ESyncState state) {
|
||||||
char logBuf[256];
|
char logBuf[256];
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index,
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
isWeak, code);
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,19 +28,29 @@ SSyncFSM * pFsm;
|
||||||
SWal * pWal;
|
SWal * pWal;
|
||||||
SSyncNode *gSyncNode;
|
SSyncNode *gSyncNode;
|
||||||
|
|
||||||
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
|
||||||
|
code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) {
|
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
|
||||||
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code);
|
ESyncState state) {
|
||||||
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf);
|
char logBuf[256];
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
||||||
|
pFsm, index, isWeak, code, state, syncUtilState2String(state));
|
||||||
|
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initFsm() {
|
void initFsm() {
|
||||||
|
|
Loading…
Reference in New Issue