Merge remote-tracking branch 'origin/feature/sync-mnode-integration' into fix/mnode
This commit is contained in:
commit
7ec3c82cd6
|
@ -17,7 +17,13 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
|
|
||||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
|
|
|
@ -149,7 +149,7 @@ typedef struct SSyncNode {
|
||||||
|
|
||||||
// restore state
|
// restore state
|
||||||
bool restoreFinish;
|
bool restoreFinish;
|
||||||
sem_t restoreSem;
|
//sem_t restoreSem;
|
||||||
SSnapshot* pSnapshot;
|
SSnapshot* pSnapshot;
|
||||||
|
|
||||||
} SSyncNode;
|
} SSyncNode;
|
||||||
|
|
|
@ -366,8 +366,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
ths->pFsm->FpRestoreFinish(ths->pFsm);
|
ths->pFsm->FpRestoreFinish(ths->pFsm);
|
||||||
}
|
}
|
||||||
ths->restoreFinish = true;
|
ths->restoreFinish = true;
|
||||||
|
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
|
||||||
|
|
||||||
|
/*
|
||||||
tsem_post(&ths->restoreSem);
|
tsem_post(&ths->restoreSem);
|
||||||
|
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -143,8 +143,12 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
|
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
|
||||||
}
|
}
|
||||||
pSyncNode->restoreFinish = true;
|
pSyncNode->restoreFinish = true;
|
||||||
|
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||||
|
|
||||||
|
/*
|
||||||
tsem_post(&pSyncNode->restoreSem);
|
tsem_post(&pSyncNode->restoreSem);
|
||||||
|
sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -499,7 +499,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
||||||
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
||||||
}
|
}
|
||||||
tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
//tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
||||||
|
|
||||||
// start in syncNodeStart
|
// start in syncNodeStart
|
||||||
// start raft
|
// start raft
|
||||||
|
@ -521,7 +521,19 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
||||||
syncNodeAppendNoop(pSyncNode);
|
syncNodeAppendNoop(pSyncNode);
|
||||||
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
|
||||||
|
|
||||||
|
/*
|
||||||
|
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
|
||||||
tsem_wait(&pSyncNode->restoreSem);
|
tsem_wait(&pSyncNode->restoreSem);
|
||||||
|
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
while (pSyncNode->restoreFinish != true) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
sInfo("==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,7 +544,18 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
||||||
// ret = syncNodeStartPingTimer(pSyncNode);
|
// ret = syncNodeStartPingTimer(pSyncNode);
|
||||||
assert(ret == 0);
|
assert(ret == 0);
|
||||||
|
|
||||||
|
/*
|
||||||
|
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
|
||||||
tsem_wait(&pSyncNode->restoreSem);
|
tsem_wait(&pSyncNode->restoreSem);
|
||||||
|
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
while (pSyncNode->restoreFinish != true) {
|
||||||
|
taosMsleep(10);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
sInfo("==syncNodeStart== restoreFinish ok multi replica %p vgId:%d", pSyncNode, pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
|
@ -573,7 +596,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
taosMemoryFree(pSyncNode->pSnapshot);
|
taosMemoryFree(pSyncNode->pSnapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_destroy(&pSyncNode->restoreSem);
|
//tsem_destroy(&pSyncNode->restoreSem);
|
||||||
|
|
||||||
// free memory in syncFreeNode
|
// free memory in syncFreeNode
|
||||||
// taosMemoryFree(pSyncNode);
|
// taosMemoryFree(pSyncNode);
|
||||||
|
|
|
@ -94,7 +94,7 @@ typedef void* queue[2];
|
||||||
/* Return the structure holding the given element. */
|
/* Return the structure holding the given element. */
|
||||||
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
|
||||||
|
|
||||||
#define TRANS_RETRY_COUNT_LIMIT 20 // retry count limit
|
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
|
||||||
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
|
||||||
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
#define TRANS_CONN_TIMEOUT 3 // connect timeout
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue