fix: impl ring log buffer
This commit is contained in:
parent
569e776035
commit
5878c9a31a
|
@ -45,7 +45,7 @@ extern bool gRaftDetailLog;
|
||||||
#define SYNC_MAX_BATCH_SIZE 1
|
#define SYNC_MAX_BATCH_SIZE 1
|
||||||
#define SYNC_INDEX_BEGIN 0
|
#define SYNC_INDEX_BEGIN 0
|
||||||
#define SYNC_INDEX_INVALID -1
|
#define SYNC_INDEX_INVALID -1
|
||||||
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
|
#define SYNC_TERM_INVALID -1 // 0xFFFFFFFFFFFFFFFF
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
SYNC_STRATEGY_NO_SNAPSHOT = 0,
|
SYNC_STRATEGY_NO_SNAPSHOT = 0,
|
||||||
|
@ -56,7 +56,7 @@ typedef enum {
|
||||||
typedef uint64_t SyncNodeId;
|
typedef uint64_t SyncNodeId;
|
||||||
typedef int32_t SyncGroupId;
|
typedef int32_t SyncGroupId;
|
||||||
typedef int64_t SyncIndex;
|
typedef int64_t SyncIndex;
|
||||||
typedef uint64_t SyncTerm;
|
typedef int64_t SyncTerm;
|
||||||
|
|
||||||
typedef struct SSyncNode SSyncNode;
|
typedef struct SSyncNode SSyncNode;
|
||||||
typedef struct SSyncBuffer SSyncBuffer;
|
typedef struct SSyncBuffer SSyncBuffer;
|
||||||
|
@ -201,7 +201,7 @@ typedef struct SSyncInfo {
|
||||||
int32_t syncInit();
|
int32_t syncInit();
|
||||||
void syncCleanUp();
|
void syncCleanUp();
|
||||||
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
int64_t syncOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncStart(int64_t rid);
|
int32_t syncStart(int64_t rid);
|
||||||
void syncStop(int64_t rid);
|
void syncStop(int64_t rid);
|
||||||
int32_t syncSetStandby(int64_t rid);
|
int32_t syncSetStandby(int64_t rid);
|
||||||
ESyncState syncGetMyRole(int64_t rid);
|
ESyncState syncGetMyRole(int64_t rid);
|
||||||
|
|
|
@ -170,7 +170,7 @@ int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo
|
||||||
|
|
||||||
// Assign version automatically and return to caller,
|
// Assign version automatically and return to caller,
|
||||||
// -1 will be returned for failed writes
|
// -1 will be returned for failed writes
|
||||||
int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
|
||||||
|
|
||||||
void walFsync(SWal *, bool force);
|
void walFsync(SWal *, bool force);
|
||||||
|
|
||||||
|
|
|
@ -281,6 +281,7 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_DNODE_ROLE_VNODE 2
|
#define TSDB_DNODE_ROLE_VNODE 2
|
||||||
|
|
||||||
#define TSDB_MAX_REPLICA 5
|
#define TSDB_MAX_REPLICA 5
|
||||||
|
#define TSDB_SYNC_LOG_BUFFER_SIZE 500
|
||||||
|
|
||||||
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
#define TSDB_TBNAME_COLUMN_INDEX (-1)
|
||||||
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
|
||||||
|
|
|
@ -309,8 +309,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
|
|
||||||
void mndSyncStart(SMnode *pMnode) {
|
void mndSyncStart(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
|
if (syncStart(pMgmt->sync) < 0) {
|
||||||
|
mError("vgId:1, failed to start sync subsystem");
|
||||||
|
return;
|
||||||
|
}
|
||||||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||||
syncStart(pMgmt->sync);
|
|
||||||
mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
|
mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -97,7 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
|
||||||
|
|
||||||
// vnodeSync.c
|
// vnodeSync.c
|
||||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||||
void vnodeSyncStart(SVnode* pVnode);
|
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||||
void vnodeSyncClose(SVnode* pVnode);
|
void vnodeSyncClose(SVnode* pVnode);
|
||||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||||
bool vnodeIsLeader(SVnode* pVnode);
|
bool vnodeIsLeader(SVnode* pVnode);
|
||||||
|
|
|
@ -714,7 +714,8 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
||||||
taosArraySet(pMerger->pArray, iCol, pColVal);
|
taosArraySet(pMerger->pArray, iCol, pColVal);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
// ASSERT(0);
|
||||||
|
tsdbError("dup key accounted: key version:%" PRId64 ", merger version:%" PRId64, key.version, pMerger->version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -888,7 +889,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _clear;
|
goto _clear;
|
||||||
}
|
}
|
||||||
|
|
||||||
midx = (sidx + eidx) / 2;
|
midx = (sidx + eidx) / 2;
|
||||||
|
|
||||||
code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
|
code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
|
||||||
|
|
|
@ -665,9 +665,13 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncStart(SVnode *pVnode) {
|
int32_t vnodeSyncStart(SVnode *pVnode) {
|
||||||
|
if (syncStart(pVnode->sync) < 0) {
|
||||||
|
vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||||
syncStart(pVnode->sync);
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
|
|
|
@ -88,6 +88,44 @@ typedef struct SPeerState {
|
||||||
int64_t lastSendTime;
|
int64_t lastSendTime;
|
||||||
} SPeerState;
|
} SPeerState;
|
||||||
|
|
||||||
|
typedef struct SSyncLogBufEntry {
|
||||||
|
SSyncRaftEntry* pItem;
|
||||||
|
SyncIndex prevLogIndex;
|
||||||
|
SyncTerm prevLogTerm;
|
||||||
|
} SSyncLogBufEntry;
|
||||||
|
|
||||||
|
typedef struct SSyncLogBuffer {
|
||||||
|
SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE];
|
||||||
|
int64_t startIndex;
|
||||||
|
int64_t commitIndex;
|
||||||
|
int64_t matchIndex;
|
||||||
|
int64_t endIndex;
|
||||||
|
int64_t size;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
} SSyncLogBuffer;
|
||||||
|
|
||||||
|
SSyncLogBuffer* syncLogBufferCreate();
|
||||||
|
void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
|
||||||
|
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||||
|
|
||||||
|
// access
|
||||||
|
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
|
||||||
|
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
|
||||||
|
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
|
||||||
|
int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
|
||||||
|
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
|
||||||
|
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
|
||||||
|
|
||||||
|
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex);
|
||||||
|
SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
|
||||||
|
|
||||||
|
// private
|
||||||
|
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
|
||||||
|
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
|
||||||
|
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
|
||||||
|
void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
|
||||||
|
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
|
||||||
|
|
||||||
typedef struct SSyncNode {
|
typedef struct SSyncNode {
|
||||||
// init by SSyncInfo
|
// init by SSyncInfo
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
|
@ -97,6 +135,7 @@ typedef struct SSyncNode {
|
||||||
char configPath[TSDB_FILENAME_LEN * 2];
|
char configPath[TSDB_FILENAME_LEN * 2];
|
||||||
|
|
||||||
// sync io
|
// sync io
|
||||||
|
SSyncLogBuffer* pLogBuf;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
const SMsgCb* msgcb;
|
const SMsgCb* msgcb;
|
||||||
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||||
|
@ -186,7 +225,7 @@ typedef struct SSyncNode {
|
||||||
SSyncRespMgr* pSyncRespMgr;
|
SSyncRespMgr* pSyncRespMgr;
|
||||||
|
|
||||||
// restore state
|
// restore state
|
||||||
bool restoreFinish;
|
_Atomic bool restoreFinish;
|
||||||
// SSnapshot* pSnapshot;
|
// SSnapshot* pSnapshot;
|
||||||
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
|
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
|
||||||
SSyncSnapshotReceiver* pNewNodeReceiver;
|
SSyncSnapshotReceiver* pNewNodeReceiver;
|
||||||
|
@ -208,10 +247,11 @@ typedef struct SSyncNode {
|
||||||
|
|
||||||
// open/close --------------
|
// open/close --------------
|
||||||
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
|
SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
|
||||||
void syncNodeStart(SSyncNode* pSyncNode);
|
int32_t syncNodeStart(SSyncNode* pSyncNode);
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||||
void syncNodeClose(SSyncNode* pSyncNode);
|
void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||||
|
int32_t syncNodeRestore(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
// option
|
// option
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
@ -298,7 +338,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
|
||||||
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
|
||||||
|
|
||||||
void syncStartNormal(int64_t rid);
|
void syncStartNormal(int64_t rid);
|
||||||
void syncStartStandBy(int64_t rid);
|
int32_t syncStartStandBy(int64_t rid);
|
||||||
|
|
||||||
bool syncNodeCanChange(SSyncNode* pSyncNode);
|
bool syncNodeCanChange(SSyncNode* pSyncNode);
|
||||||
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
|
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
|
||||||
|
|
|
@ -47,7 +47,7 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde
|
||||||
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
||||||
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index);
|
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index);
|
||||||
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
|
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
|
||||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
void syncEntryDestroy(SSyncRaftEntry* pEntry);
|
||||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
||||||
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
|
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
|
||||||
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
|
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
|
||||||
|
|
|
@ -57,8 +57,8 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, cons
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
|
int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
|
||||||
|
|
||||||
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
|
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId, SyncAppendEntries* pMsg);
|
||||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
|
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId, SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,7 +118,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pRollBackEntry);
|
syncEntryDestroy(pRollBackEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -161,7 +161,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pRollBackEntry);
|
syncEntryDestroy(pRollBackEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +308,551 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build the placeholder entry used to seed a log buffer position.
// It is currently just a no-op raft entry for (term, index, vgId); the result
// of syncEntryBuildNoop() is returned unchanged (including NULL on failure).
SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
  SSyncRaftEntry* pPlaceholder = syncEntryBuildNoop(term, index, vgId);
  return pPlaceholder;
}
|
||||||
|
|
||||||
|
// Populate the log buffer from the FSM snapshot position and the persisted log.
//
// commitIndex is taken from the snapshot's last applied index; matchIndex is
// the larger of that and the log store's last index. Entries are then loaded
// from the log store in reverse order, starting at matchIndex, until either
//   (a) the commit index is reached, in which case a dummy entry is placed at
//       commitIndex to act as the match base, or
//   (b) the ring is full; one extra entry is read only to fill in the
//       prevLogIndex/prevLogTerm of the oldest buffered entry and is then
//       discarded.
//
// Returns 0 on success, -1 on failure (snapshot info unavailable, log entry
// unreadable, or dummy entry allocation failed). The buffer mutex is held for
// the duration of the call.
int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  ASSERT(pNode->pLogStore != NULL && "log store not created");
  ASSERT(pNode->pFsm != NULL && "pFsm not registered");
  ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");

  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  SSnapshot snapshot;
  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
    goto _err;
  }

  SyncIndex commitIndex = snapshot.lastApplyIndex;
  SyncTerm  commitTerm = snapshot.lastApplyTerm;
  SyncIndex toIndex = TMAX(lastVer, commitIndex);

  // update match index
  pBuf->commitIndex = commitIndex;
  pBuf->matchIndex = toIndex;
  pBuf->endIndex = toIndex + 1;

  // load log entries in reverse order
  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SyncIndex       index = toIndex;
  SSyncRaftEntry* pEntry = NULL;
  bool            takeDummy = false;

  while (true) {
    if (index <= pBuf->commitIndex) {
      takeDummy = true;
      break;
    }

    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      ASSERT(0);
      // Without this entry the range (index, toIndex] has no valid match base,
      // so report failure instead of returning a half-initialized buffer.
      goto _err;
    }

    // The entry is kept only while it still fits into the ring window ending at toIndex.
    bool taken = false;
    if (toIndex <= index + pBuf->size - 1) {
      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
      pBuf->entries[index % pBuf->size] = tmp;
      taken = true;
    }

    // This entry is the predecessor of the one loaded in the previous iteration.
    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
    }

    if (!taken) {
      // Only needed for its index/term; it does not live in the buffer.
      syncEntryDestroy(pEntry);
      pEntry = NULL;
      break;
    }

    index--;
  }

  // put a dummy record at commitIndex if present in log buffer
  if (takeDummy) {
    ASSERT(index == pBuf->commitIndex);

    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
    if (pDummy == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      goto _err;
    }
    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;

    if (index < toIndex) {
      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
    }
  }

  // update startIndex
  // When the loop stopped because the ring was full, `index` names the entry
  // that was read and discarded, so the oldest buffered entry is index + 1.
  // Using `index` there would make endIndex - startIndex exceed the capacity.
  pBuf->startIndex = takeDummy ? index : index + 1;

  // validate
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_err:
  taosThreadMutexUnlock(&pBuf->mutex);
  return -1;
}
|
||||||
|
|
||||||
|
// Append persisted log entries to the tail of the buffer.
//
// Requires matchIndex + 1 == endIndex on entry. Starting at endIndex, entries
// are read from the log store and stored one by one until either toIndex has
// been loaded or the ring window (relative to startIndex) is full. Each loaded
// entry advances matchIndex and endIndex.
//
// Returns the first index that was NOT loaded. The buffer mutex is held for
// the duration of the call.
int64_t syncLogBufferLoadOld(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pStore = pNode->pLogStore;
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex + 1 == pBuf->endIndex);

  SyncIndex       next = pBuf->endIndex;
  SSyncRaftEntry* pPrev = pBuf->entries[(next - 1 + pBuf->size) % pBuf->size].pItem;
  ASSERT(pPrev != NULL);

  for (; next - pBuf->startIndex < pBuf->size && next <= toIndex; next++) {
    SSyncRaftEntry* pLoaded = NULL;
    if (pStore->syncLogGetEntry(pStore, next, &pLoaded) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), next);
      ASSERT(0);
      break;
    }
    ASSERT(pPrev->index + 1 == pLoaded->index);

    // endIndex equals `next` here, so this writes the slot of the entry just read.
    SSyncLogBufEntry* pSlot = &pBuf->entries[pBuf->endIndex % pBuf->size];
    pSlot->pItem = pLoaded;
    pSlot->prevLogIndex = pPrev->index;
    pSlot->prevLogTerm = pPrev->term;

    sInfo("vgId:%d, loaded log entry into log buffer. index: %" PRId64 ", term: %" PRId64, pNode->vgId, pLoaded->index,
          pLoaded->term);

    pBuf->matchIndex = next;
    pBuf->endIndex = next + 1;
    pPrev = pLoaded;
  }

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return next;
}
|
||||||
|
|
||||||
|
// Reset the buffer so that it contains only a dummy entry at the snapshot's
// last applied index: start, match and commit index all point at that entry
// and endIndex is one past it.
//
// Returns 0 on success, -1 if the snapshot info cannot be obtained or the
// dummy entry cannot be allocated. The buffer mutex is held for the call.
int32_t syncLogBufferInitOld(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  int32_t code = -1;

  taosThreadMutexLock(&pBuf->mutex);
  ASSERT(pNode->pLogStore != NULL && "log store not created");
  ASSERT(pNode->pFsm != NULL && "pFsm not registered");
  ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");

  SSnapshot snap;
  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snap) < 0) {
    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
    goto _done;
  }
  SyncIndex appliedIndex = snap.lastApplyIndex;
  SyncTerm  appliedTerm = snap.lastApplyTerm;

  // every cursor starts at the applied position
  pBuf->startIndex = appliedIndex;
  pBuf->matchIndex = appliedIndex;
  pBuf->commitIndex = appliedIndex;
  pBuf->endIndex = appliedIndex + 1;

  // seed the slot at the applied position with a dummy record
  SSyncRaftEntry* pSeed = syncEntryBuildDummy(appliedTerm, appliedIndex, pNode->vgId);
  if (pSeed == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _done;
  }

  // "+ size" keeps the slot number non-negative when appliedIndex is -1
  SSyncLogBufEntry* pSlot = &pBuf->entries[(appliedIndex + pBuf->size) % pBuf->size];
  pSlot->pItem = pSeed;
  pSlot->prevLogIndex = appliedIndex - 1;
  pSlot->prevLogTerm = appliedTerm;

  code = 0;

_done:
  taosThreadMutexUnlock(&pBuf->mutex);
  return code;
}
|
||||||
|
|
||||||
|
// Lower matchIndex so that it is strictly below toIndex, then publish the new
// value as this node's own match index.
//
// Refuses (returns -1) when toIndex is at or below the commit index; returns 0
// otherwise. Does not take the buffer mutex itself.
int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
  if (toIndex > pBuf->commitIndex) {
    SyncIndex ceiling = toIndex - 1;
    if (ceiling < pBuf->matchIndex) {
      pBuf->matchIndex = ceiling;
    }

    // update my match index
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
    return 0;
  }

  sError("vgId:%d, cannot rollback across commit index:%" PRId64 ", to index:%" PRId64 "", pNode->vgId,
         pBuf->commitIndex, toIndex);
  return -1;
}
|
||||||
|
|
||||||
|
// Offer a received raft entry to the log buffer.
//
// pEntry is always consumed: it is either stored in the buffer or destroyed
// before returning (this relies on syncEntryDestroy() accepting NULL, which
// the stored path passes to it -- TODO confirm). prevTerm is the term of the
// entry at pEntry->index - 1 as claimed by the sender.
//
// Outcomes:
//   - index at or below commitIndex: ignored, returns 0.
//   - index outside the ring window [startIndex, startIndex + size): rejected,
//     returns -1.
//   - slot already holds an entry with a lower term: the buffer is rolled back
//     from that index and the new entry is stored.
//   - slot already holds an entry with a higher term (stale): returns 0.
//   - slot already holds an entry with the same term (duplicate): returns -1.
//   - otherwise the entry is stored and endIndex is extended if needed,
//     returns 0.
//
// The buffer mutex is held for the duration of the call.
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  int32_t   ret = 0;
  SyncIndex index = pEntry->index;
  SyncIndex prevIndex = pEntry->index - 1;

  // The window holds exactly `size` indexes starting at startIndex, so
  // index == startIndex + size is already out of range: it would alias the
  // slot of startIndex and push endIndex - startIndex past the capacity.
  if (index <= pBuf->commitIndex || index - pBuf->startIndex >= pBuf->size) {
    sInfo("vgId:%d, cannot accept index:%" PRId64 " into log buffer. start index: %" PRId64 ", commit index: %" PRId64
          ", end index:%" PRId64 ")",
          pNode->vgId, index, pBuf->startIndex, pBuf->commitIndex, pBuf->endIndex);
    ret = (index <= pBuf->commitIndex) ? 0 : -1;
    goto _out;
  }

  // check current in buffer
  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  if (pExist != NULL) {
    ASSERT(pEntry->index == pExist->index);

    if (pEntry->term > pExist->term) {
      // a newer term supersedes what is buffered from this index on
      (void)syncLogBufferRollback(pBuf, index);
    } else {
      sInfo("vgId:%d, %s raft entry received. index:%" PRId64 ", term: %" PRId64 "", pNode->vgId,
            ((pEntry->term < pExist->term) ? "stale" : "duplicate"), pEntry->index, pEntry->term);
      SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
      ASSERT(pEntry->term < pExist->term || (pEntry->term == pExist->term && prevTerm == existPrevTerm));
      ret = (pEntry->term < pExist->term) ? 0 : -1;
      goto _out;
    }
  }

  // update
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
  pEntry = NULL;  // ownership moved into the buffer; keeps _out from destroying it
  pBuf->entries[index % pBuf->size] = tmp;

  // update end index
  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);

_out:
  syncEntryDestroy(pEntry);
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
|
||||||
|
SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
|
||||||
|
if (pEntry == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
(void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
|
||||||
|
ASSERT(pEntry->bytes == pMsg->dataLen);
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write pEntry to the log store at its own index.
//
// If the store already reaches pEntry->index, the store is truncated at that
// index first, so the entry is always appended directly after the store's last
// index. Returns 0 on success, -1 if truncation or the append fails.
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  if (pLogStore->syncLogLastIndex(pLogStore) >= pEntry->index) {
    if (pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
      sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
      return -1;
    }
  }

  SyncIndex tail = pLogStore->syncLogLastIndex(pLogStore);
  ASSERT(pEntry->index == tail + 1);

  int32_t appended = pLogStore->syncLogAppendEntry(pLogStore, pEntry);
  if (appended < 0) {
    sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
           pEntry->term);
    return -1;
  }
  return 0;
}
|
||||||
|
|
||||||
|
// Advance matchIndex over consecutive buffered entries.
//
// For each entry directly after matchIndex: stop if the slot is empty or if
// the recorded prevLogTerm differs from the term of the entry at matchIndex;
// otherwise replicate it (leader with more than one replica only), persist it
// to the log store, move matchIndex onto it and publish the new value as this
// node's own match index. A persist failure also stops the walk.
//
// Returns the matchIndex reached. The buffer mutex is held for the call.
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore* pStore = pNode->pLogStore;
  int64_t        reached = pBuf->matchIndex;

  for (;;) {
    if (pBuf->matchIndex + 1 >= pBuf->endIndex) {
      break;  // nothing buffered beyond the match index
    }

    int64_t next = pBuf->matchIndex + 1;
    ASSERT(next >= 0);

    // try to proceed
    SSyncLogBufEntry* pSlot = &pBuf->entries[next % pBuf->size];
    SyncIndex         claimedPrevIndex = pSlot->prevLogIndex;
    SyncTerm          claimedPrevTerm = pSlot->prevLogTerm;
    SSyncRaftEntry*   pNext = pSlot->pItem;
    if (pNext == NULL) {
      sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
             pNode->vgId, pBuf->matchIndex);
      break;
    }

    ASSERT(next == pNext->index);

    // match
    SSyncRaftEntry* pBase = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
    ASSERT(pBase != NULL);
    ASSERT(pBase->index == pBuf->matchIndex);
    ASSERT(pBase->index + 1 == pNext->index);
    ASSERT(claimedPrevIndex == pBase->index);

    if (pBase->term != claimedPrevTerm) {
      sError(
          "vgId:%d, mismatching raft log entries encountered. "
          "{ index:%" PRId64 ", term:%" PRId64
          " } "
          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
          pNode->vgId, pBase->index, pBase->term, pNext->index, pNext->term, claimedPrevIndex, claimedPrevTerm);
      break;
    }

    // replicate on demand
    bool mustReplicate = (pNode->state == TAOS_SYNC_STATE_LEADER) && (pNode->replicaNum > 1);
    if (mustReplicate) {
      (void)syncLogBufferReplicate(pBuf, pNode, next);
    }

    // persist
    if (syncLogStorePersist(pStore, pNext) < 0) {
      sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId,
             terrstr(), pNext->index);
      break;
    }

    // increment
    pBuf->matchIndex = next;
    reached = pBuf->matchIndex;

    // update my match index
    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
  }

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return reached;
}
|
||||||
|
|
||||||
|
// Hand one raft entry to the state machine's commit callback.
//
// The entry is converted back into its original RPC message and passed to
// pFsm->FpCommitCb together with metadata describing the entry (index, term,
// seqNum, isWeak) and the caller's view of the node (role, current term).
// Always returns 0; the callback's own outcome is not inspected.
int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
  ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");

  // Zero-initialize so that any field syncEntry2OriginalRpc() leaves untouched
  // reaches the callback as 0/NULL rather than as indeterminate stack contents.
  SRpcMsg rpcMsg = {0};
  syncEntry2OriginalRpc(pEntry, &rpcMsg);

  SFsmCbMeta cbMeta = {0};
  cbMeta.index = pEntry->index;
  cbMeta.lastConfigIndex = -1;
  cbMeta.isWeak = pEntry->isWeak;
  cbMeta.code = 0;
  cbMeta.state = role;
  cbMeta.seqNum = pEntry->seqNum;
  cbMeta.term = pEntry->term;
  cbMeta.currentTerm = term;
  cbMeta.flag = -1;

  // NOTE(review): rpcMsg.pCont is not released here; confirm whether
  // syncEntry2OriginalRpc() allocates it and whether FpCommitCb takes ownership.
  pFsm->FpCommitCb(pFsm, &rpcMsg, cbMeta);
  return 0;
}
|
||||||
|
|
||||||
|
// Assert the structural invariants of the log buffer:
//   - startIndex and commitIndex do not exceed matchIndex,
//   - matchIndex is below endIndex,
//   - the span [startIndex, endIndex) fits into the ring,
//   - every slot from commitIndex through matchIndex holds an entry.
// Always returns 0; violations are reported through ASSERT only.
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
  ASSERT(pBuf->matchIndex < pBuf->endIndex);
  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);

  SyncIndex cursor = pBuf->commitIndex;
  while (cursor <= pBuf->matchIndex) {
    // "+ size" keeps the slot number non-negative for cursor == -1
    SSyncRaftEntry* pHeld = pBuf->entries[(cursor + pBuf->size) % pBuf->size].pItem;
    ASSERT(pHeld != NULL);
    cursor++;
  }
  return 0;
}
|
||||||
|
|
||||||
|
// Apply entries up to min(commitIndex, matchIndex) to the state machine.
//
// Entries at or above startIndex are taken from the buffer; older ones are
// read from the log store and destroyed after use. Entries whose original
// message type is not a user commit only advance the buffer's commitIndex.
// Afterwards every buffered entry below the new commitIndex is released and
// startIndex moves up to commitIndex. Independently of the outcome, the node
// is marked as restored (and FpRestoreFinishCb invoked) once the buffer's
// commitIndex has caught up with pNode->commitIndex.
//
// Returns 0 on success or for a stale request, -1 if a log entry cannot be
// read or the FSM execution fails. The buffer mutex is held for the call.
int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);

  SSyncLogStore*  pLogStore = pNode->pLogStore;
  SSyncFSM*       pFsm = pNode->pFsm;
  ESyncState      role = pNode->state;
  SyncTerm        term = pNode->pRaftStore->currentTerm;
  SyncGroupId     vgId = pNode->vgId;
  int32_t         ret = 0;
  int64_t         upperIndex = TMIN(commitIndex, pBuf->matchIndex);
  SSyncRaftEntry* pEntry = NULL;
  bool            inBuf = false;  // whether pEntry is owned by the buffer

  if (commitIndex <= pBuf->commitIndex) {
    sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
           commitIndex);
    ret = 0;
    goto _out;
  }

  sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64
         ", match index: %" PRId64 ", end index:%" PRId64 "",
         pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  // execute in fsm
  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
    // get a log entry
    inBuf = (index >= pBuf->startIndex);
    if (inBuf) {
      pEntry = pBuf->entries[index % pBuf->size].pItem;
    } else if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
      ret = -1;
      goto _out;
    }

    ASSERT(pEntry != NULL);

    // execute it
    if (syncUtilUserCommit(pEntry->originalRpcType)) {
      if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) {
        sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
               pEntry->index, pEntry->term);
        ret = -1;
        goto _out;
      }
      pBuf->commitIndex = index;

      sInfo("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
            pEntry->index, pEntry->term, role, term);
    } else {
      sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index,
            pEntry->term);
      pBuf->commitIndex = index;
    }

    // entries read from the log store are temporary copies
    if (!inBuf) {
      syncEntryDestroy(pEntry);
      pEntry = NULL;
    }
  }

  // recycle
  // TODO: with a grace period of one third of free space before commitIndex in ring buffer
  {
    SyncIndex keepFrom = pBuf->commitIndex;
    while (pBuf->startIndex < keepFrom) {
      SSyncLogBufEntry* pSlot = &pBuf->entries[(pBuf->startIndex + pBuf->size) % pBuf->size];
      ASSERT(pSlot->pItem != NULL);
      syncEntryDestroy(pSlot->pItem);
      memset(pSlot, 0, sizeof(pBuf->entries[0]));
      pBuf->startIndex++;
    }
  }

_out:
  // mark as restored if needed
  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
    pNode->restoreFinish = true;
    sInfo("vgId:%d, restore finished. commit index:%" PRId64 ", match index:%" PRId64 ", last index:%" PRId64 "",
          pNode->vgId, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex - 1);
  }

  // release a temporary copy left over from an early exit
  if (!inBuf) {
    syncEntryDestroy(pEntry);
    pEntry = NULL;
  }
  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return ret;
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
// Handle an AppendEntries message using the ring log buffer.
//
// Flow, as implemented below:
//   1. Messages from a sender that is not in the raft group are ignored.
//   2. A reply is prepared with success=false; a message with a stale term is
//      answered immediately without touching the log.
//   3. Otherwise the node steps down to the sender's term, resets its election
//      timer and records the leader's commit index.
//   4. The payload is decoded into a raft entry and handed to the log buffer.
//   5. The reply carries the match index returned by syncLogBufferProceed(),
//      and the buffer is then asked to commit up to the leader's commit index.
//
// Always returns 0; failures are reported through the log only.
int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
  SyncAppendEntriesReply* pReply = NULL;

  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
    syncLogRecvAppendEntries(ths, pMsg, "not in my config");
    goto _IGNORE;
  }

  // prepare response msg
  pReply = syncAppendEntriesReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->success = false;
  pReply->matchIndex = SYNC_INDEX_INVALID;
  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
  pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
  pReply->startTime = ths->startTime;

  // stale leader: answer with our (newer) term and success=false
  if (pMsg->term < ths->pRaftStore->currentTerm) {
    goto _SEND_RESPONSE;
  }

  if (pMsg->term > ths->pRaftStore->currentTerm) {
    pReply->term = pMsg->term;
  }

  syncNodeStepDown(ths, pMsg->term);
  syncNodeResetElectTimer(ths);

  // update commit index
  (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);

  // the payload must at least hold a raft entry header
  if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
    goto _IGNORE;
  }

  SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg);

  if (pEntry == NULL) {
    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
    goto _IGNORE;
  }

  if (pMsg->prevLogIndex + 1 != pEntry->index) {
    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64
           ", prevLogTerm:%" PRId64,
           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
    // the entry was never handed to the log buffer, so it must be released here
    syncEntryDestroy(pEntry);
    goto _IGNORE;
  }

  sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
        ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
        pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);

  // accept
  // NOTE(review): ownership of pEntry when syncLogBufferAccept() fails is not
  // visible from here - confirm the callee releases it on its error paths.
  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
    sWarn("vgId:%d, failed to accept raft entry into log buffer. index:%" PRId64 ", term:%" PRId64, ths->vgId,
          pEntry->index, pEntry->term);
    goto _SEND_RESPONSE;
  }
  pReply->success = true;

_SEND_RESPONSE:
  // update match index
  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);

  // ack, i.e. send response
  SRpcMsg rpcMsg;
  syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
  (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);

  // commit index, i.e. leader notice me
  if (syncLogBufferCommit(ths->pLogBuf, ths, pMsg->commitIndex) < 0) {
    sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
  }

_IGNORE:
  syncAppendEntriesReplyDestroy(pReply);
  return 0;
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
// if already drop replica, do not process
|
// if already drop replica, do not process
|
||||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
|
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
|
||||||
|
@ -386,6 +930,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
goto _IGNORE;
|
goto _IGNORE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pAppendEntry->index == appendIndex);
|
||||||
|
|
||||||
// append
|
// append
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -431,34 +977,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
|
||||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
if (pLocalEntry->term == pAppendEntry->term) {
|
|
||||||
// do nothing
|
|
||||||
} else {
|
|
||||||
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
|
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// update match index
|
// update match index
|
||||||
pReply->matchIndex = pAppendEntry->index;
|
pReply->matchIndex = pAppendEntry->index;
|
||||||
|
|
||||||
syncEntryDestory(pLocalEntry);
|
syncEntryDestroy(pLocalEntry);
|
||||||
syncEntryDestory(pAppendEntry);
|
syncEntryDestroy(pAppendEntry);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// no append entries, do nothing
|
// no append entries, do nothing
|
||||||
|
@ -489,4 +1012,4 @@ _SEND_RESPONSE:
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,6 +84,70 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Raise the node-level commit index to `commitIndex` if that is larger, then
// tell the log store about the part of it that is not beyond the store's last
// index. Returns the value handed to the log store.
int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
  // the node-level commit index never moves backwards
  if (commitIndex > ths->commitIndex) {
    ths->commitIndex = commitIndex;
  }

  // clamp to the last index reported by the log store
  SyncIndex storeLast = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
  SyncIndex effective = (ths->commitIndex < storeLast) ? ths->commitIndex : storeLast;

  ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, effective);
  return effective;
}
|
||||||
|
|
||||||
|
// Advance the commit index to `indexLikely` when it is ahead of the current
// commit index and syncNodeAgreedUpon() reports agreement on it.
// Returns the node's commit index after the check.
int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
  // nothing to do unless the candidate is new and agreed upon
  if (indexLikely <= ths->commitIndex || !syncNodeAgreedUpon(ths, indexLikely)) {
    return ths->commitIndex;
  }

  syncNodeUpdateCommitIndex(ths, indexLikely);
  sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
        ths->pRaftStore->currentTerm, indexLikely);

  return ths->commitIndex;
}
|
||||||
|
|
||||||
|
// Send the log entry at `fromIndex` to the peer `destId` while holding the
// buffer mutex.
//
// Nothing is sent when this node is not the leader, when there is at most one
// replica, when `fromIndex` is below the buffer's start index (reported as not
// implemented) or when it is above the buffer's match index.
//
// Always returns 0; failures are reported through the log only.
int32_t syncLogBufferCatchingUpReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex fromIndex, SRaftId destId) {
  taosThreadMutexLock(&pBuf->mutex);
  SyncAppendEntries* pMsgOut = NULL;
  SyncIndex          index = fromIndex;

  if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum <= 1) {
    goto _out;
  }

  if (index < pBuf->startIndex) {
    // destId.addr is printed in hex to match the "0x" prefix and the other
    // messages in this function
    sError("vgId:%d, (not implemented yet) replication fromIndex: %" PRId64
           " that is less than pBuf->startIndex: %" PRId64 ". destId: 0x%016" PRIx64 "",
           pNode->vgId, fromIndex, pBuf->startIndex, destId.addr);
    goto _out;
  }

  if (index > pBuf->matchIndex) {
    goto _out;
  }

  // The loop condition starts with `false`, so exactly one entry is sent per
  // call; the remainder of the condition is currently inactive.
  do {
    pMsgOut = syncLogToAppendEntries(pBuf, pNode, index);
    if (pMsgOut == NULL) {
      sError("vgId:%d, failed to assembly append entries msg since %s. index: %" PRId64 "", pNode->vgId, terrstr(),
             index);
      goto _out;
    }

    if (syncNodeSendAppendEntries(pNode, &destId, pMsgOut) < 0) {
      sWarn("vgId:%d, failed to send append entries msg since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "",
            pNode->vgId, terrstr(), index, destId.addr);
      goto _out;
    }

    index += 1;
    syncAppendEntriesDestroy(pMsgOut);
    pMsgOut = NULL;
  } while (false && index <= pBuf->commitIndex);

_out:
  // releases a message left over from a failed send; pMsgOut is NULL otherwise
  syncAppendEntriesDestroy(pMsgOut);
  pMsgOut = NULL;
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
@ -99,6 +163,63 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
|
syncNodeStepDown(ths, pMsg->term);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||||
|
|
||||||
|
sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
|
||||||
|
pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
|
||||||
|
|
||||||
|
if (pMsg->success) {
|
||||||
|
SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
|
||||||
|
if (pMsg->matchIndex > oldMatchIndex) {
|
||||||
|
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
// commit if needed
|
||||||
|
SyncIndex indexLikely = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex);
|
||||||
|
SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
|
||||||
|
(void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
|
||||||
|
} else {
|
||||||
|
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||||
|
if (nextIndex > SYNC_INDEX_BEGIN) {
|
||||||
|
--nextIndex;
|
||||||
|
}
|
||||||
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
// send next append entries
|
||||||
|
SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
|
||||||
|
ASSERT(pState != NULL);
|
||||||
|
|
||||||
|
if (pMsg->lastSendIndex == pState->lastSendIndex) {
|
||||||
|
syncNodeReplicateOne(ths, &(pMsg->srcId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
// if already drop replica, do not process
|
||||||
|
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// drop stale response
|
||||||
|
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||||
|
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
|
||||||
|
@ -135,4 +256,4 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
|
||||||
|
|
||||||
syncLogRecvAppendEntriesReply(ths, pMsg, "process");
|
syncLogRecvAppendEntriesReply(ths, pMsg, "process");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,12 +44,100 @@
|
||||||
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
|
// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
|
||||||
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
|
||||||
//
|
//
|
||||||
|
|
||||||
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
// Leader-only: look for the highest index above the current commit index that
// syncAgree() accepts and whose entry term is not greater than the current
// term, then advance the commit index (at least up to the WAL commit version)
// and run the newly committed range through syncNodeDoCommit().
//
// Returns early, after logging, when the node is not the leader, when an entry
// cannot be read from the log store, or when syncNodeDoCommit() fails.
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
    return;
  }

  // update commit index
  SyncIndex newCommitIndex = pSyncNode->commitIndex;
  for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
    if (!syncAgree(pSyncNode, index)) {
      continue;
    }

    // fetch the entry: from the LRU cache when present, otherwise from the log store
    SSyncRaftEntry* pEntry = NULL;
    SLRUCache*      pCache = pSyncNode->pLogStore->pCache;
    LRUHandle*      h = taosLRUCacheLookup(pCache, &index, sizeof(index));
    if (h) {
      pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
    } else {
      int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
      if (code != 0) {
        char logBuf[128];
        snprintf(logBuf, sizeof(logBuf), "advance commit index error, read wal index:%" PRId64, index);
        syncNodeErrorLog(pSyncNode, logBuf);
        return;
      }
    }

    // cannot commit, even if quorum agree. need check term!
    bool committable = pEntry->term <= pSyncNode->pRaftStore->currentTerm;
    if (!committable) {
      char logBuf[128];
      // SyncTerm is a signed 64-bit type, so the term is printed with PRId64
      snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%" PRId64 ", term:%" PRId64,
               pEntry->index, pEntry->term);
      syncNodeEventLog(pSyncNode, logBuf);
    }

    // give the entry back: cached entries are released, loaded ones destroyed
    if (h) {
      taosLRUCacheRelease(pCache, h, false);
    } else {
      syncEntryDestroy(pEntry);
    }

    if (committable) {
      newCommitIndex = index;
      break;
    }
  }

  // advance commit index as large as possible
  SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
  if (walCommitVer > newCommitIndex) {
    newCommitIndex = walCommitVer;
  }

  // maybe execute fsm
  if (newCommitIndex > pSyncNode->commitIndex) {
    SyncIndex beginIndex = pSyncNode->commitIndex + 1;
    SyncIndex endIndex = newCommitIndex;

    // update commit index
    pSyncNode->commitIndex = newCommitIndex;

    // call back Wal
    pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);

    // execute fsm
    if (pSyncNode->pFsm != NULL) {
      int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state);
      if (code != 0) {
        char logBuf[128];
        snprintf(logBuf, sizeof(logBuf), "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64,
                 beginIndex, endIndex);
        syncNodeErrorLog(pSyncNode, logBuf);
        return;
      }
    }
  }
}
|
||||||
|
|
||||||
|
void syncMaybeAdvanceCommitIndexOld(SSyncNode* pSyncNode) {
|
||||||
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
|
syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// advance commit index to sanpshot first
|
// advance commit index to sanpshot first
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
|
@ -93,7 +181,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -109,7 +197,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -245,13 +333,28 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
|
// Report whether at least `quorum` entries of the node's match-index table
// have reached `index`.
bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) {
  SSyncIndexMgr* pMatches = pNode->pMatchIndex;
  ASSERT(pNode->replicaNum == pMatches->replicaNum);

  // count the table slots whose match index is at or beyond `index`
  int reached = 0;
  for (int slot = 0; slot < pNode->replicaNum; ++slot) {
    reached += (pMatches->index[slot] >= index) ? 1 : 0;
  }

  return reached >= pNode->quorum;
}
|
||||||
|
|
||||||
|
bool syncAgree(SSyncNode* pNode, SyncIndex index) {
|
||||||
int agreeCount = 0;
|
int agreeCount = 0;
|
||||||
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
for (int i = 0; i < pNode->replicaNum; ++i) {
|
||||||
if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) {
|
if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) {
|
||||||
++agreeCount;
|
++agreeCount;
|
||||||
}
|
}
|
||||||
if (agreeCount >= pSyncNode->quorum) {
|
if (agreeCount >= pNode->quorum) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -105,19 +105,32 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
|
||||||
return pSyncNode->rid;
|
return pSyncNode->rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncStart(int64_t rid) {
|
int32_t syncStart(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return;
|
sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncNodeRestore(pSyncNode) < 0) {
|
||||||
|
sError("vgId:%d, failed to restore raft log buffer since %s", pSyncNode->vgId, terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncNode->pRaftCfg->isStandBy) {
|
if (pSyncNode->pRaftCfg->isStandBy) {
|
||||||
syncNodeStartStandBy(pSyncNode);
|
if (syncNodeStartStandBy(pSyncNode) < 0) {
|
||||||
|
sError("vgId:%d, failed to start raft node as standby since %s", pSyncNode->vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
syncNodeStart(pSyncNode);
|
if (syncNodeStart(pSyncNode) < 0) {
|
||||||
|
sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncStartNormal(int64_t rid) {
|
void syncStartNormal(int64_t rid) {
|
||||||
|
@ -130,14 +143,15 @@ void syncStartNormal(int64_t rid) {
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncStartStandBy(int64_t rid) {
|
int32_t syncStartStandBy(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
return;
|
return -1;
|
||||||
}
|
}
|
||||||
syncNodeStartStandBy(pSyncNode);
|
syncNodeStartStandBy(pSyncNode);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncStop(int64_t rid) {
|
void syncStop(int64_t rid) {
|
||||||
|
@ -661,7 +675,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
|
||||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
|
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (pEntry != NULL) {
|
if (pEntry != NULL) {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -673,7 +687,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
|
||||||
pSnapshot->lastApplyTerm = pEntry->term;
|
pSnapshot->lastApplyTerm = pEntry->term;
|
||||||
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
|
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1089,6 +1103,38 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSyncLogBuffer* syncLogBufferCreate() {
|
||||||
|
SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer));
|
||||||
|
if (pBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]);
|
||||||
|
|
||||||
|
ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE);
|
||||||
|
|
||||||
|
if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) {
|
||||||
|
sError("failed to init log buffer mutex due to %s", strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
return pBuf;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Destroy the buffer's mutex and free the buffer itself. A NULL buffer is
// accepted and ignored.
void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
  if (pBuf != NULL) {
    (void)taosThreadMutexDestroy(&pBuf->mutex);
    (void)taosMemoryFree(pBuf);
  }
}
|
||||||
|
|
||||||
// open/close --------------
|
// open/close --------------
|
||||||
SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
||||||
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
|
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
|
||||||
|
@ -1150,6 +1196,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
||||||
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
|
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
|
||||||
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg;
|
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg;
|
||||||
|
|
||||||
|
// create raft log ring buffer
|
||||||
|
pSyncNode->pLogBuf = syncLogBufferCreate();
|
||||||
|
if (pSyncNode->pLogBuf == NULL) {
|
||||||
|
sError("failed to init log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId);
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
|
||||||
// init raft config
|
// init raft config
|
||||||
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
|
||||||
if (pSyncNode->pRaftCfg == NULL) {
|
if (pSyncNode->pRaftCfg == NULL) {
|
||||||
|
@ -1362,6 +1415,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) {
|
||||||
// snapshotting
|
// snapshotting
|
||||||
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
|
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
|
||||||
|
|
||||||
|
// init log buffer
|
||||||
|
if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) {
|
||||||
|
sError("vgId:%d, failed to init raft log buffer since %s", pSyncNode->vgId, terrstr());
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
|
||||||
syncNodeEventLog(pSyncNode, "sync open");
|
syncNodeEventLog(pSyncNode, "sync open");
|
||||||
|
|
||||||
return pSyncNode;
|
return pSyncNode;
|
||||||
|
@ -1387,7 +1446,48 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeStart(SSyncNode* pSyncNode) {
|
// Replay committed entries through the log buffer at start-up.
//
// Commits up to the larger of the node's commit index and the log store's
// commit index, then checks that the buffer's end index (sampled before the
// commit) lies beyond the log store's last index.
// Returns 0 on success, -1 when the commit fails or the check does not hold.
int32_t syncNodeRestore(SSyncNode* pSyncNode) {
  ASSERT(pSyncNode->pLogStore != NULL && "log store not created");
  ASSERT(pSyncNode->pLogBuf != NULL && "ring log buffer not created");

  SyncIndex storeLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
  SyncIndex storeCommitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore);
  SyncIndex bufEndIndex = pSyncNode->pLogBuf->endIndex;

  // commit target: whichever of the two commit indexes is further ahead
  SyncIndex target = (pSyncNode->commitIndex > storeCommitIndex) ? pSyncNode->commitIndex : storeCommitIndex;

  if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, target) < 0) {
    return -1;
  }

  if (bufEndIndex > storeLastIndex) {
    return 0;
  }

  sError("vgId:%d, failed to load log entries into log buffers. commit index:%" PRId64 ", lastVer: %" PRId64 "",
         pSyncNode->vgId, target, storeLastIndex);
  return -1;
}
|
||||||
|
|
||||||
|
// Start raft on this node: with more than one replica it begins as a
// follower; a single replica moves to the next term, becomes leader at once
// and appends a no-op entry. The ping timer is started in both cases and its
// result is returned.
int32_t syncNodeStart(SSyncNode* pSyncNode) {
  if (pSyncNode->replicaNum != 1) {
    syncNodeBecomeFollower(pSyncNode, "first start");
  } else {
    raftStoreNextTerm(pSyncNode->pRaftStore);
    syncNodeBecomeLeader(pSyncNode, "one replica start");

    // Raft 3.6.2 Committing entries from previous terms
    syncNodeAppendNoop(pSyncNode);
  }

  int32_t ret = syncNodeStartPingTimer(pSyncNode);
  ASSERT(ret == 0);
  return ret;
}
|
||||||
|
|
||||||
|
void syncNodeStartOld(SSyncNode* pSyncNode) {
|
||||||
// start raft
|
// start raft
|
||||||
if (pSyncNode->replicaNum == 1) {
|
if (pSyncNode->replicaNum == 1) {
|
||||||
raftStoreNextTerm(pSyncNode->pRaftStore);
|
raftStoreNextTerm(pSyncNode->pRaftStore);
|
||||||
|
@ -1406,7 +1506,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
// state change
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||||
|
@ -1419,6 +1519,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
||||||
ret = 0;
|
ret = 0;
|
||||||
ret = syncNodeStartPingTimer(pSyncNode);
|
ret = syncNodeStartPingTimer(pSyncNode);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeClose(SSyncNode* pSyncNode) {
|
void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
|
@ -1443,6 +1544,8 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
pSyncNode->pMatchIndex = NULL;
|
pSyncNode->pMatchIndex = NULL;
|
||||||
logStoreDestory(pSyncNode->pLogStore);
|
logStoreDestory(pSyncNode->pLogStore);
|
||||||
pSyncNode->pLogStore = NULL;
|
pSyncNode->pLogStore = NULL;
|
||||||
|
syncLogBufferDestroy(pSyncNode->pLogBuf);
|
||||||
|
pSyncNode->pLogBuf = NULL;
|
||||||
raftCfgClose(pSyncNode->pRaftCfg);
|
raftCfgClose(pSyncNode->pRaftCfg);
|
||||||
pSyncNode->pRaftCfg = NULL;
|
pSyncNode->pRaftCfg = NULL;
|
||||||
|
|
||||||
|
@ -2341,6 +2444,43 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) {
|
||||||
|
|
||||||
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
|
void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); }
|
||||||
|
|
||||||
|
// Drop every buffered entry with index >= toIndex, walking down from
// endIndex - 1, then set endIndex to toIndex and cap matchIndex at
// toIndex - 1. `toIndex` must lie in (commitIndex, endIndex]. Returns 0.
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex) {
  ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);

  SyncIndex index;
  for (index = pBuf->endIndex - 1; index >= toIndex; index--) {
    SSyncRaftEntry* pItem = pBuf->entries[index % pBuf->size].pItem;
    if (pItem == NULL) {
      continue;  // empty slot, nothing to release
    }
    syncEntryDestroy(pItem);
    memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
  }

  // the loop leaves index at toIndex - 1
  pBuf->endIndex = toIndex;
  pBuf->matchIndex = TMIN(pBuf->matchIndex, index);
  ASSERT(index + 1 == toIndex);
  return 0;
}
|
||||||
|
|
||||||
|
// Under the buffer mutex, discard every buffered entry beyond the match index
// so that the buffer ends right after the last matched entry. The log store's
// last index is asserted to equal the buffer's match index. Returns 0.
int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
  taosThreadMutexLock(&pBuf->mutex);
  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
  ASSERT(lastVer == pBuf->matchIndex);

  // drop everything after the last matched entry
  (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1);

  sInfo("vgId:%d, reset log buffer. start index: %" PRId64 ", commit index: %" PRId64 ", match Index: %" PRId64
        ", end index: %" PRId64 "",
        pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);

  pBuf->endIndex = pBuf->matchIndex + 1;

  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;
}
|
||||||
|
|
||||||
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// maybe clear leader cache
|
// maybe clear leader cache
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
@ -2365,6 +2505,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// min match index
|
// min match index
|
||||||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
|
// reset log buffer
|
||||||
|
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
int32_t debugStrLen = strlen(debugStr);
|
int32_t debugStrLen = strlen(debugStr);
|
||||||
|
@ -2403,7 +2546,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
pSyncNode->leaderTime = taosGetTimestampMs();
|
pSyncNode->leaderTime = taosGetTimestampMs();
|
||||||
|
|
||||||
// reset restoreFinish
|
// reset restoreFinish
|
||||||
pSyncNode->restoreFinish = false;
|
// TODO: disable it temporarily
|
||||||
|
// pSyncNode->restoreFinish = false;
|
||||||
|
|
||||||
// state change
|
// state change
|
||||||
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
|
||||||
|
@ -2467,6 +2611,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
||||||
// min match index
|
// min match index
|
||||||
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
|
||||||
|
|
||||||
|
// reset log buffer
|
||||||
|
syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode);
|
||||||
|
|
||||||
// trace log
|
// trace log
|
||||||
do {
|
do {
|
||||||
int32_t debugStrLen = strlen(debugStr);
|
int32_t debugStrLen = strlen(debugStr);
|
||||||
|
@ -2490,6 +2637,17 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
|
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
|
||||||
|
|
||||||
|
// Raft 3.6.2 Committing entries from previous terms
|
||||||
|
syncNodeAppendNoop(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) {
|
||||||
|
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
|
||||||
|
ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted));
|
||||||
|
syncNodeBecomeLeader(pSyncNode, "candidate to leader");
|
||||||
|
|
||||||
|
syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode);
|
||||||
|
|
||||||
// Raft 3.6.2 Committing entries from previous terms
|
// Raft 3.6.2 Committing entries from previous terms
|
||||||
syncNodeAppendNoop(pSyncNode);
|
syncNodeAppendNoop(pSyncNode);
|
||||||
syncMaybeAdvanceCommitIndex(pSyncNode);
|
syncMaybeAdvanceCommitIndex(pSyncNode);
|
||||||
|
@ -2941,7 +3099,46 @@ static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Append pEntry to the node's log buffer, advance the match index, and for a
// single-replica group commit immediately.
//
// ths    - sync node that owns the log buffer
// pEntry - entry to append; handed over to syncLogBufferAppend()
//
// Returns 0 on success, -1 if the entry could not be enqueued or committed.
int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
  // syncLogBufferAppend() destroys pEntry on its failure path, so remember the
  // index up front instead of reading freed memory in the error log below
  SyncIndex index = pEntry->index;

  // append to log buffer
  if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) {
    sError("vgId:%d, failed to enqueue log buffer. index:%" PRId64 "", ths->vgId, index);
    return -1;
  }

  // proceed match index, with replicating on needed
  SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);

  // multi replica
  if (ths->replicaNum > 1) {
    return 0;
  }

  // single replica
  (void)syncNodeUpdateCommitIndex(ths, matchIndex);

  if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
    sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex);
    return -1;
  }

  return 0;
}
|
||||||
|
|
||||||
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
// Build a no-op entry for the current term at the end of the log buffer and
// append it through syncNodeAppend().
//
// Returns syncNodeAppend()'s result, or -1 with terrno set to
// TSDB_CODE_OUT_OF_MEMORY when the entry cannot be built.
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
  SyncIndex endIndex = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm  curTerm = ths->pRaftStore->currentTerm;

  SSyncRaftEntry* pNoop = syncEntryBuildNoop(curTerm, endIndex, ths->vgId);
  if (pNoop != NULL) {
    return syncNodeAppend(ths, pNoop);
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}
|
||||||
|
|
||||||
|
static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||||
|
@ -2963,7 +3160,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -3055,6 +3252,114 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Read pBuf->endIndex while holding pBuf->mutex and return the value read.
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
  int64_t endIndex;

  taosThreadMutexLock(&pBuf->mutex);
  endIndex = pBuf->endIndex;
  taosThreadMutexUnlock(&pBuf->mutex);

  return endIndex;
}
|
||||||
|
|
||||||
|
// Append pEntry at the end of the ring log buffer.
//
// pBuf   - log buffer; pBuf->mutex is held for the whole operation
// pNode  - owning sync node (used for logging only)
// pEntry - entry to store; its index must equal pBuf->endIndex. On success the
//          buffer keeps the pointer; on failure the entry is destroyed here.
//
// Returns 0 on success, -1 when the buffer is full.
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
  taosThreadMutexLock(&pBuf->mutex);
  syncLogBufferValidate(pBuf);
  SyncIndex index = pEntry->index;

  // The ring holds at most `size` entries in [startIndex, startIndex + size).
  // index == startIndex + size would map onto the slot of startIndex, so the
  // buffer is already full at that point (>=, not >).
  if (index - pBuf->startIndex >= pBuf->size) {
    sError("vgId:%d, failed to append due to log buffer full. index:%" PRId64 "", pNode->vgId, index);
    goto _out;
  }

  ASSERT(index == pBuf->endIndex);

  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
  ASSERT(pExist == NULL);

  // initial log buffer with at least one item, e.g. commitIndex
  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
  ASSERT(pMatch != NULL && "no matched raft log entry");
  ASSERT(pMatch->index + 1 == index);

  // record the preceding entry's index/term alongside the new item
  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
  pBuf->entries[index % pBuf->size] = tmp;
  pBuf->endIndex = index + 1;

  syncLogBufferValidate(pBuf);
  taosThreadMutexUnlock(&pBuf->mutex);
  return 0;

_out:
  syncLogBufferValidate(pBuf);
  // the entry is destroyed on failure; callers must not touch it afterwards
  syncEntryDestroy(pEntry);
  taosThreadMutexUnlock(&pBuf->mutex);
  return -1;
}
|
||||||
|
|
||||||
|
// Return the term of the entry stored at `index`.
// `index` must lie in [startIndex, endIndex) and its slot must hold an entry;
// both conditions are asserted. No locking is done here.
SyncTerm syncLogBufferGetTerm(SSyncLogBuffer* pBuf, SyncIndex index) {
  ASSERT(pBuf->startIndex <= index && index < pBuf->endIndex);

  SSyncRaftEntry* pItem = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
  ASSERT(pItem != NULL);

  SyncTerm term = pItem->term;
  return term;
}
|
||||||
|
|
||||||
|
// Build an AppendEntries message carrying the buffered entry at `index`.
//
// pBuf  - log buffer to read from (no locking is done here)
// pNode - sync node supplying vgId, source id, current term and commit index
// index - index of the entry to send
//
// Returns a newly built message, or NULL when `index` is outside
// [startIndex, endIndex), the slot is empty, or the message cannot be built
// (terrno = TSDB_CODE_OUT_OF_MEMORY in that last case).
//
// NOTE(review): prevLogTerm is looked up for index - 1 through
// syncLogBufferGetTerm(), which asserts startIndex <= index - 1; this looks
// like it would trip for index == startIndex -- confirm with callers.
SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) {
  bool inRange = (index >= pBuf->startIndex) && (index < pBuf->endIndex);
  if (!inRange) {
    sError("vgId:%d, log entry (%" PRId64 ") out of range of log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId,
           index, pBuf->startIndex, pBuf->endIndex);
    return NULL;
  }

  SSyncRaftEntry* pItem = pBuf->entries[index % pBuf->size].pItem;
  if (pItem == NULL) {
    sError("vgId:%d, log entry (%" PRId64 ") not exist in log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, index,
           pBuf->startIndex, pBuf->endIndex);
    return NULL;
  }

  uint32_t payloadLen = pItem->bytes;
  SyncAppendEntries* pOut = syncAppendEntriesBuild(payloadLen, pNode->vgId);
  if (pOut == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // the payload is the raw entry itself
  (void)memcpy(pOut->data, pItem, payloadLen);

  pOut->prevLogIndex = index - 1;
  pOut->prevLogTerm = syncLogBufferGetTerm(pBuf, pOut->prevLogIndex);
  pOut->vgId = pNode->vgId;
  pOut->srcId = pNode->myRaftId;
  pOut->term = pNode->pRaftStore->currentTerm;
  pOut->commitIndex = pNode->commitIndex;
  pOut->privateTerm = 0;
  return pOut;
}
|
||||||
|
|
||||||
|
// Send pMsg to every entry of pNode->peersId[0 .. replicaNum) whose id differs
// from this node's own raft id. Send results are ignored.
void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) {
  for (int i = 0; i < pNode->replicaNum; i++) {
    SRaftId* pPeer = &pNode->peersId[i];
    if (syncUtilSameId(pPeer, &pNode->myRaftId)) {
      continue;  // never send to ourselves
    }
    (void)syncNodeSendAppendEntries(pNode, pPeer, pMsg);
  }
}
|
||||||
|
|
||||||
|
// Build an AppendEntries message for the buffered entry at `index` and send it
// to all peers.
//
// pBuf  - unused here; the message is built from pNode->pLogBuf
// pNode - sync node whose log buffer and peer list are used
// index - index of the entry to replicate
//
// Returns 0 after the message was handed to the peers, -1 when the message
// could not be built (previously this failure was reported as success).
int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) {
  SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode->pLogBuf, pNode, index);
  if (pMsgOut == NULL) {
    sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index);
    return -1;
  }

  // replicate pMsgOut
  (void)syncLogReplicateAppendEntries(pNode, pMsgOut);

  // the message is destroyed here once it has been handed to the peers
  syncAppendEntriesDestroy(pMsgOut);
  return 0;
}
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// ClientRequest(i, v) ==
|
// ClientRequest(i, v) ==
|
||||||
// /\ state[i] = Leader
|
// /\ state[i] = Leader
|
||||||
|
@ -3069,6 +3374,31 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) {
|
||||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
|
// Handle a client request: wrap it in a raft entry at the end of the log
// buffer and, when this node is leader, append it.
//
// ths       - sync node receiving the request
// pMsg      - client request to wrap
// pRetIndex - optional out-parameter; receives the entry's index when the
//             node is leader
//
// Returns syncNodeAppend()'s result when leader, 0 when not leader (nothing
// is appended), or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the entry
// cannot be built.
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
  syncNodeEventLog(ths, "on client request");

  SyncIndex       index = syncLogBufferGetEndIndex(ths->pLogBuf);
  SyncTerm        term = ths->pRaftStore->currentTerm;
  SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
  if (pEntry == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (ths->state == TAOS_SYNC_STATE_LEADER) {
    if (pRetIndex) {
      (*pRetIndex) = index;
    }

    // the entry is handed over to the append path from here on
    return syncNodeAppend(ths, pEntry);
  }

  // not leader: nobody takes the entry, so release it instead of leaking it
  syncEntryDestroy(pEntry);
  return 0;
}
|
||||||
|
|
||||||
|
int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
|
||||||
|
syncNodeEventLog(ths, "on client request");
|
||||||
|
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -3085,11 +3415,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
// del resp mgr, call FpCommitCb
|
// del resp mgr, call FpCommitCb
|
||||||
ASSERT(0);
|
sError("vgId:%d, failed to append log entry since %s", ths->vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if mulit replica, start replicate right now
|
// if multi replica, start replicate right now
|
||||||
if (ths->replicaNum > 1) {
|
if (ths->replicaNum > 1) {
|
||||||
syncNodeReplicate(ths);
|
syncNodeReplicate(ths);
|
||||||
}
|
}
|
||||||
|
@ -3111,7 +3441,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -3305,6 +3635,7 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
|
// Report whether the single-replica fast path applies to pMsg.
//
// The optimization is currently switched off, so this always returns false.
// The condition used before it was disabled was: exactly one replica, a
// user-commit message type (syncUtilUserCommit), and vgId != 1.
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
  (void)ths;
  (void)pMsg;
  return false;
}
|
||||||
|
|
||||||
|
@ -3432,7 +3763,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
|
||||||
if (h) {
|
if (h) {
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3703,4 +4034,4 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
|
snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s",
|
||||||
host, port, pMsg->term, pMsg->privateTerm, s);
|
host, port, pMsg->term, pMsg->privateTerm, s);
|
||||||
syncNodeEventLog(pSyncNode, logBuf);
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncEntryDestory(SSyncRaftEntry* pEntry) {
|
void syncEntryDestroy(SSyncRaftEntry* pEntry) {
|
||||||
if (pEntry != NULL) {
|
if (pEntry != NULL) {
|
||||||
taosMemoryFree(pEntry);
|
taosMemoryFree(pEntry);
|
||||||
}
|
}
|
||||||
|
@ -454,7 +454,7 @@ static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(
|
||||||
|
|
||||||
static void freeRaftEntry(void* param) {
|
static void freeRaftEntry(void* param) {
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
|
@ -588,7 +588,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
ASSERT(pNode != NULL);
|
ASSERT(pNode != NULL);
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
++returnCnt;
|
++returnCnt;
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(pIter);
|
tSkipListDestroyIter(pIter);
|
||||||
|
@ -617,7 +617,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
||||||
++returnCnt;
|
++returnCnt;
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
|
||||||
// syncEntryDestory(pEntry);
|
// syncEntryDestroy(pEntry);
|
||||||
taosRemoveRef(pCache->refMgr, pEntry->rid);
|
taosRemoveRef(pCache->refMgr, pEntry->rid);
|
||||||
}
|
}
|
||||||
tSkipListDestroyIter(pIter);
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
|
@ -152,7 +152,6 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
|
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
|
||||||
SyncIndex lastIndex;
|
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
SyncIndex lastVer = walGetLastVer(pWal);
|
SyncIndex lastVer = walGetLastVer(pWal);
|
||||||
|
@ -207,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
syncMeta.isWeek = pEntry->isWeak;
|
syncMeta.isWeek = pEntry->isWeak;
|
||||||
syncMeta.seqNum = pEntry->seqNum;
|
syncMeta.seqNum = pEntry->seqNum;
|
||||||
syncMeta.term = pEntry->term;
|
syncMeta.term = pEntry->term;
|
||||||
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
index = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||||
if (index < 0) {
|
if (index < 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
|
@ -218,11 +217,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
pEntry->index, err, err, errStr, sysErr, sysErrStr);
|
||||||
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
syncNodeErrorLog(pData->pSyncNode, logBuf);
|
||||||
|
|
||||||
ASSERT(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pEntry->index = index;
|
|
||||||
|
ASSERT(pEntry->index == index);
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
|
@ -326,8 +324,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);
|
pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
|
@ -365,7 +362,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
|
||||||
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
// ASSERT(walCommit(pWal, index) == 0);
|
|
||||||
int32_t code = walCommit(pWal, index);
|
int32_t code = walCommit(pWal, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
|
@ -374,8 +370,7 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
const char* sysErrStr = strerror(errno);
|
const char* sysErrStr = strerror(errno);
|
||||||
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s",
|
||||||
pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr);
|
pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr);
|
||||||
|
return -1;
|
||||||
ASSERT(0);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -427,7 +422,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
raftLogGetEntry(pLogStore, i, &pEntry);
|
raftLogGetEntry(pLogStore, i, &pEntry);
|
||||||
|
|
||||||
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,7 +55,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
|
||||||
// maybe start snapshot
|
// maybe start snapshot
|
||||||
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||||
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
||||||
if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) {
|
if (nextIndex > logEndIndex) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nextIndex < logStartIndex) {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64,
|
snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64,
|
||||||
nextIndex, logStartIndex, logEndIndex);
|
nextIndex, logStartIndex, logEndIndex);
|
||||||
|
@ -90,7 +94,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
|
||||||
memcpy(pMsg->data, serialized, len);
|
memcpy(pMsg->data, serialized, len);
|
||||||
|
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
|
||||||
|
@ -154,8 +158,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
pMsg->destId = *destRaftId;
|
||||||
|
|
||||||
syncLogSendAppendEntries(pSyncNode, pMsg, "");
|
syncLogSendAppendEntries(pSyncNode, pMsg, "");
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -163,7 +169,10 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
||||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
|
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
|
||||||
ASSERT(pState != NULL);
|
if (pState == NULL) {
|
||||||
|
sError("vgId:%d, failed to get peer state for addr:0x%016" PRIx64 "", pSyncNode->vgId, destRaftId->addr);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg->dataLen > 0) {
|
if (pMsg->dataLen > 0) {
|
||||||
pState->lastSendIndex = pMsg->prevLogIndex + 1;
|
pState->lastSendIndex = pMsg->prevLogIndex + 1;
|
||||||
|
@ -173,7 +182,7 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
|
int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
|
if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) {
|
||||||
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg);
|
ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg);
|
||||||
|
@ -231,4 +240,4 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,7 +139,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
|
||||||
getLastConfig = true;
|
getLastConfig = true;
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
} else {
|
} else {
|
||||||
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
|
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
|
||||||
sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
|
sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId);
|
||||||
|
|
|
@ -194,7 +194,8 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
|
||||||
btc.idx++;
|
btc.idx++;
|
||||||
} else if (c == 0) {
|
} else if (c == 0) {
|
||||||
// dup key not allowed
|
// dup key not allowed
|
||||||
ASSERT(0);
|
tdbError("unable to insert dup key. pKey: %p, kLen: %d, btc: %p, pTxn: %p", pKey, kLen, &btc, pTxn);
|
||||||
|
// ASSERT(0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -519,10 +519,15 @@ END:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) {
|
int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body,
|
||||||
|
int32_t bodyLen) {
|
||||||
taosThreadMutexLock(&pWal->mutex);
|
taosThreadMutexLock(&pWal->mutex);
|
||||||
|
|
||||||
int64_t index = pWal->vers.lastVer + 1;
|
if (index != pWal->vers.lastVer + 1) {
|
||||||
|
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (walCheckAndRoll(pWal) < 0) {
|
if (walCheckAndRoll(pWal) < 0) {
|
||||||
taosThreadMutexUnlock(&pWal->mutex);
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
|
|
Loading…
Reference in New Issue