diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index ff14e637d0..7ed8414906 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -45,7 +45,7 @@ extern bool gRaftDetailLog;
 #define SYNC_MAX_BATCH_SIZE 1
 #define SYNC_INDEX_BEGIN 0
 #define SYNC_INDEX_INVALID -1
-#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
+#define SYNC_TERM_INVALID (-1)  // was 0xFFFFFFFFFFFFFFFF while SyncTerm was unsigned
 
 typedef enum {
   SYNC_STRATEGY_NO_SNAPSHOT = 0,
@@ -56,7 +56,7 @@ typedef enum {
 typedef uint64_t SyncNodeId;
 typedef int32_t SyncGroupId;
 typedef int64_t SyncIndex;
-typedef uint64_t SyncTerm;
+typedef int64_t SyncTerm;
 
 typedef struct SSyncNode SSyncNode;
 typedef struct SSyncBuffer SSyncBuffer;
@@ -201,7 +201,7 @@ typedef struct SSyncInfo {
 int32_t syncInit();
 void syncCleanUp();
 int64_t syncOpen(SSyncInfo* pSyncInfo);
-void syncStart(int64_t rid);
+int32_t syncStart(int64_t rid);
 void syncStop(int64_t rid);
 int32_t syncSetStandby(int64_t rid);
 ESyncState syncGetMyRole(int64_t rid);
diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index adf244e32a..e908ae6d88 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -170,7 +170,7 @@ int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo
 
 // Assign version automatically and return to caller,
 // -1 will be returned for failed writes
-int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
+int64_t walAppendLog(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen);
 
 void walFsync(SWal *, bool force);
 
diff --git a/include/util/tdef.h b/include/util/tdef.h
index 936fbdf0d5..0b7f9b28fa 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -281,6 +281,7 @@ typedef enum ELogicConditionType {
 #define TSDB_DNODE_ROLE_VNODE 2
 
 #define TSDB_MAX_REPLICA 5
+#define TSDB_SYNC_LOG_BUFFER_SIZE 500
 
 #define TSDB_TBNAME_COLUMN_INDEX (-1)
 #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c
index cd6fe380e1..74f9d8bbdb 100644
--- a/source/dnode/mnode/impl/src/mndSync.c
+++ b/source/dnode/mnode/impl/src/mndSync.c
@@ -309,8 +309,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
 
 void mndSyncStart(SMnode *pMnode) {
   SSyncMgmt *pMgmt = &pMnode->syncMgmt;
   syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
-  syncStart(pMgmt->sync);
+  // keep the message callbacks registered before the node is started, as before this change
+  if (syncStart(pMgmt->sync) < 0) {
+    mError("vgId:1, failed to start sync subsystem");
+    return;
+  }
   mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
 }
 
diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h
index 988ecc5dd3..fe23087c25 100644
--- a/source/dnode/vnode/src/inc/vnd.h
+++ b/source/dnode/vnode/src/inc/vnd.h
@@ -97,7 +97,7 @@ bool vnodeShouldRollback(SVnode* pVnode);
 
 // vnodeSync.c
 int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
-void vnodeSyncStart(SVnode* pVnode);
+int32_t vnodeSyncStart(SVnode* pVnode);
 void vnodeSyncClose(SVnode* pVnode);
 void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
 bool vnodeIsLeader(SVnode* pVnode);
diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c
index 4e02a28cdf..a2f3ca2911 100644
--- a/source/dnode/vnode/src/tsdb/tsdbUtil.c
+++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c
@@ -714,7 +714,8 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
       taosArraySet(pMerger->pArray, iCol, pColVal);
     }
   } else {
-    ASSERT(0);
+    // ASSERT(0);
+    tsdbError("dup key accounted: key version:%" PRId64 ", merger version:%" PRId64, key.version, pMerger->version);
   }
 }
 
@@ -888,7 +889,6 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
     code = TSDB_CODE_OUT_OF_MEMORY;
     goto _clear;
   }
-
   midx = (sidx + eidx) / 2;
 
   code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1);
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c 
b/source/dnode/vnode/src/vnd/vnodeSync.c
index 1863203f4a..c3ccbddc53 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -665,9 +665,14 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
   return 0;
 }
 
-void vnodeSyncStart(SVnode *pVnode) {
+int32_t vnodeSyncStart(SVnode *pVnode) {
   syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
-  syncStart(pVnode->sync);
+  // keep the message callbacks registered before the node is started, as before this change
+  if (syncStart(pVnode->sync) < 0) {
+    vError("vgId:%d, failed to start sync subsystem since %s", pVnode->config.vgId, terrstr());
+    return -1;
+  }
+  return 0;
 }
 
 void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index a158430a0f..bd97187ce7 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -88,6 +88,44 @@ typedef struct SPeerState {
   int64_t lastSendTime;
 } SPeerState;
 
+typedef struct SSyncLogBufEntry {
+  SSyncRaftEntry* pItem;
+  SyncIndex prevLogIndex;
+  SyncTerm prevLogTerm;
+} SSyncLogBufEntry;
+
+typedef struct SSyncLogBuffer {
+  SSyncLogBufEntry entries[TSDB_SYNC_LOG_BUFFER_SIZE];
+  int64_t startIndex;
+  int64_t commitIndex;
+  int64_t matchIndex;
+  int64_t endIndex;
+  int64_t size;
+  TdThreadMutex mutex;
+} SSyncLogBuffer;
+
+SSyncLogBuffer* syncLogBufferCreate();
+void syncLogBufferDestroy(SSyncLogBuffer* pBuf);
+int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
+
+// access
+int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
+int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
+int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
+int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex);
+int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode);
+int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex);
+
+int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex);
+SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
+
+// private
+int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf);
+int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex);
+int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index);
+void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index);
+bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index);
+
 typedef struct SSyncNode {
   // init by SSyncInfo
   SyncGroupId vgId;
@@ -97,6 +135,7 @@ typedef struct SSyncNode {
   char configPath[TSDB_FILENAME_LEN * 2];
 
   // sync io
+  SSyncLogBuffer* pLogBuf;
   SWal* pWal;
   const SMsgCb* msgcb;
   int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
@@ -186,7 +225,7 @@ typedef struct SSyncNode {
   SSyncRespMgr* pSyncRespMgr;
 
   // restore state
-  bool restoreFinish;
+  _Atomic bool restoreFinish;
   // SSnapshot* pSnapshot;
   SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
   SSyncSnapshotReceiver* pNewNodeReceiver;
@@ -208,10 +247,11 @@ typedef struct SSyncNode {
 
 // open/close --------------
 SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
-void syncNodeStart(SSyncNode* pSyncNode);
-void syncNodeStartStandBy(SSyncNode* pSyncNode);
+int32_t syncNodeStart(SSyncNode* pSyncNode);
+int32_t syncNodeStartStandBy(SSyncNode* pSyncNode);
 void syncNodeClose(SSyncNode* pSyncNode);
 int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
+int32_t syncNodeRestore(SSyncNode* pSyncNode);
 
 // option
 bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
@@ -298,7 +338,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
 int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
 
 void syncStartNormal(int64_t rid);
-void syncStartStandBy(int64_t rid);
+int32_t syncStartStandBy(int64_t rid);
 
 bool syncNodeCanChange(SSyncNode* pSyncNode);
 bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h
index bab1dcc661..75ecd2d2a1 100644
--- a/source/libs/sync/inc/syncRaftEntry.h
+++ b/source/libs/sync/inc/syncRaftEntry.h
@@ -47,7 +47,7 @@ SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncInde
 SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
 SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index);
 SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
-void syncEntryDestory(SSyncRaftEntry* pEntry);
+void syncEntryDestroy(SSyncRaftEntry* pEntry);
 char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
 SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); // step 6
 cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h
index 4f15a45cec..1d34a41456 100644
--- a/source/libs/sync/inc/syncReplication.h
+++ b/source/libs/sync/inc/syncReplication.h
@@ -57,8 +57,8 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, cons
 
 int32_t syncNodeReplicate(SSyncNode* pSyncNode);
 int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId);
-int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
-int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg);
+int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId, SyncAppendEntries* pMsg);
+int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId, SyncAppendEntries* pMsg);
 
 #ifdef __cplusplus
 }
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 170a57a7a9..9560ea269b 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ 
b/source/libs/sync/src/syncAppendEntries.c
@@ -118,7 +118,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
       rpcFreeCont(rpcMsg.pCont);
     }
 
-    syncEntryDestory(pRollBackEntry);
+    syncEntryDestroy(pRollBackEntry);
   }
 }
 
@@ -161,7 +161,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
       rpcFreeCont(rpcMsg.pCont);
     }
 
-    syncEntryDestory(pRollBackEntry);
+    syncEntryDestroy(pRollBackEntry);
   }
 }
 
@@ -308,7 +308,558 @@ int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
   return 0;
 }
 
+SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId) {
+  return syncEntryBuildNoop(term, index, vgId);
+}
+
+int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
+  taosThreadMutexLock(&pBuf->mutex);
+  ASSERT(pNode->pLogStore != NULL && "log store not created");
+  ASSERT(pNode->pFsm != NULL && "pFsm not registered");
+  ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
+
+  SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore);
+  SSnapshot snapshot = {0};
+  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
+    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
+    goto _err;
+  }
+
+  SyncIndex commitIndex = snapshot.lastApplyIndex;
+  SyncTerm commitTerm = snapshot.lastApplyTerm;
+  SyncIndex toIndex = TMAX(lastVer, commitIndex);
+
+  // update match index
+  pBuf->commitIndex = commitIndex;
+  pBuf->matchIndex = toIndex;
+  pBuf->endIndex = toIndex + 1;
+
+  // load log entries in reverse order
+  SSyncLogStore* pLogStore = pNode->pLogStore;
+  SyncIndex index = toIndex;
+  SSyncRaftEntry* pEntry = NULL;
+  bool takeDummy = false;
+
+  while (true) {
+    if (index <= pBuf->commitIndex) {
+      takeDummy = true;
+      break;
+    }
+
+    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
+      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
+      ASSERT(0);
+      break;
+    }
+
+    bool taken = false;
+    if (toIndex <= index + pBuf->size - 1) {
+      SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = -1, .prevLogTerm = -1};
+      pBuf->entries[index % pBuf->size] = tmp;
+      taken = true;
+    }
+
+    if (index < toIndex) {
+      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = pEntry->index;
+      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = pEntry->term;
+    }
+
+    if (!taken) {
+      syncEntryDestroy(pEntry);
+      pEntry = NULL;
+      break;
+    }
+
+    index--;
+  }
+
+  // put a dummy record at commitIndex if present in log buffer
+  if (takeDummy) {
+    ASSERT(index == pBuf->commitIndex);
+
+    SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
+    if (pDummy == NULL) {
+      terrno = TSDB_CODE_OUT_OF_MEMORY;
+      goto _err;
+    }
+    SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
+    pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
+
+    if (index < toIndex) {
+      pBuf->entries[(index + 1) % pBuf->size].prevLogIndex = commitIndex;
+      pBuf->entries[(index + 1) % pBuf->size].prevLogTerm = commitTerm;
+    }
+  }
+
+  // update startIndex: the slot at index holds an entry only when the dummy record was stored there
+  pBuf->startIndex = takeDummy ? index : index + 1;
+
+  // validate
+  syncLogBufferValidate(pBuf);
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return 0;
+
+_err:
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return -1;
+}
+
+int64_t syncLogBufferLoadOld(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
+  taosThreadMutexLock(&pBuf->mutex);
+  syncLogBufferValidate(pBuf);
+
+  SSyncLogStore* pLogStore = pNode->pLogStore;
+  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
+  ASSERT(pBuf->matchIndex + 1 == pBuf->endIndex);
+  SyncIndex index = pBuf->endIndex;
+  SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem;
+  ASSERT(pMatch != NULL);
+
+  while (index - pBuf->startIndex < pBuf->size && index <= toIndex) {
+    SSyncRaftEntry* pEntry = NULL;
+    if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
+      sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
+      ASSERT(0);
+      break;
+    }
+    ASSERT(pMatch->index + 1 == pEntry->index);
+    SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term};
+    pBuf->entries[pBuf->endIndex % pBuf->size] = tmp;
+
+    sInfo("vgId:%d, loaded log entry into log buffer. index: %" PRId64 ", term: %" PRId64, pNode->vgId, pEntry->index,
+          pEntry->term);
+
+    pBuf->matchIndex = index;
+    pBuf->endIndex = index + 1;
+    pMatch = pEntry;
+    index++;
+  }
+
+  syncLogBufferValidate(pBuf);
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return index;
+}
+
+int32_t syncLogBufferInitOld(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
+  taosThreadMutexLock(&pBuf->mutex);
+  ASSERT(pNode->pLogStore != NULL && "log store not created");
+  ASSERT(pNode->pFsm != NULL && "pFsm not registered");
+  ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered");
+
+  SSnapshot snapshot;
+  if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) {
+    sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr());
+    goto _err;
+  }
+  SyncIndex commitIndex = snapshot.lastApplyIndex;
+  SyncTerm commitTerm = snapshot.lastApplyTerm;
+
+  // init log buffer indexes
+  pBuf->startIndex = commitIndex;
+  pBuf->matchIndex = commitIndex;
+  pBuf->commitIndex = commitIndex;
+  pBuf->endIndex = commitIndex + 1;
+
+  // put a dummy record at initial commitIndex
+  SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId);
+  if (pDummy == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    goto _err;
+  }
+  SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm};
+  pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp;
+
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return 0;
+
+_err:
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return -1;
+}
+
+int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
+  if (toIndex <= pBuf->commitIndex) {
+    sError("vgId:%d, cannot rollback across commit index:%" PRId64 ", to index:%" PRId64 "", pNode->vgId,
+           pBuf->commitIndex, toIndex);
+    return -1;
+  }
+
+  pBuf->matchIndex = TMIN(pBuf->matchIndex, toIndex - 1);
+
+  // update my match index
+  syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
+  return 0;
+}
+
+int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
+  taosThreadMutexLock(&pBuf->mutex);
+  syncLogBufferValidate(pBuf);
+
+  int32_t ret = 0;
+  SyncIndex index = pEntry->index;
+  SyncIndex prevIndex = pEntry->index - 1;
+  // only indexes in (commitIndex, startIndex + size) fit; startIndex + size would alias the slot of startIndex
+  if (index <= pBuf->commitIndex || index - pBuf->startIndex >= pBuf->size) {
+    sInfo("vgId:%d, cannot accept index:%" PRId64 " into log buffer. start index: %" PRId64 ", commit index: %" PRId64
+          ", end index:%" PRId64 ")",
+          pNode->vgId, index, pBuf->startIndex, pBuf->commitIndex, pBuf->endIndex);
+    ret = (index <= pBuf->commitIndex) ? 0 : -1;
+    goto _out;
+  }
+
+  // check current in buffer
+  SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem;
+  if (pExist != NULL) {
+    ASSERT(pEntry->index == pExist->index);
+
+    if (pEntry->term > pExist->term) {
+      (void)syncLogBufferRollback(pBuf, index);
+    } else {
+      sInfo("vgId:%d, %s raft entry received. index:%" PRId64 ", term: %" PRId64 "", pNode->vgId,
+            ((pEntry->term < pExist->term) ? "stale" : "duplicate"), pEntry->index, pEntry->term);
+      SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm;
+      ASSERT(pEntry->term < pExist->term || (pEntry->term == pExist->term && prevTerm == existPrevTerm));
+      // NOTE(review): a stale entry reports success while an exact duplicate reports failure -- confirm intended
+      ret = (pEntry->term < pExist->term) ? 0 : -1;
+      goto _out;
+    }
+  }
+
+  // update
+  SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
+  pEntry = NULL;
+  pBuf->entries[index % pBuf->size] = tmp;
+
+  // update end index
+  pBuf->endIndex = TMAX(index + 1, pBuf->endIndex);
+
+_out:
+  syncEntryDestroy(pEntry);
+  syncLogBufferValidate(pBuf);
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return ret;
+}
+
+SSyncRaftEntry* syncLogAppendEntriesToRaftEntry(const SyncAppendEntries* pMsg) {
+  SSyncRaftEntry* pEntry = taosMemoryMalloc(pMsg->dataLen);
+  if (pEntry == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return NULL;
+  }
+  (void)memcpy(pEntry, pMsg->data, pMsg->dataLen);
+  ASSERT(pEntry->bytes == pMsg->dataLen);
+  return pEntry;
+}
+
+int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
+  SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
+  if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
+    sError("failed to truncate log store since %s. from index:%" PRId64 "", terrstr(), pEntry->index);
+    return -1;
+  }
+  lastVer = pLogStore->syncLogLastIndex(pLogStore);
+  ASSERT(pEntry->index == lastVer + 1);
+
+  if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) {
+    sError("failed to append raft log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
+           pEntry->term);
+    return -1;
+  }
+  return 0;
+}
+
+int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
+  taosThreadMutexLock(&pBuf->mutex);
+  syncLogBufferValidate(pBuf);
+
+  SSyncLogStore* pLogStore = pNode->pLogStore;
+  int64_t matchIndex = pBuf->matchIndex;
+
+  while (pBuf->matchIndex + 1 < pBuf->endIndex) {
+    int64_t index = pBuf->matchIndex + 1;
+    ASSERT(index >= 0);
+
+    // try to proceed
+    SSyncLogBufEntry* pBufEntry = &pBuf->entries[index % pBuf->size];
+    SyncIndex prevLogIndex = pBufEntry->prevLogIndex;
+    SyncTerm prevLogTerm = pBufEntry->prevLogTerm;
+    SSyncRaftEntry* pEntry = pBufEntry->pItem;
+    if (pEntry == NULL) {
+      sDebug("vgId:%d, cannot proceed match index in log buffer. no raft entry at next pos of matchIndex:%" PRId64,
+             pNode->vgId, pBuf->matchIndex);
+      goto _out;
+    }
+
+    ASSERT(index == pEntry->index);
+
+    // match
+    SSyncRaftEntry* pMatch = pBuf->entries[(pBuf->matchIndex + pBuf->size) % pBuf->size].pItem;
+    ASSERT(pMatch != NULL);
+    ASSERT(pMatch->index == pBuf->matchIndex);
+    ASSERT(pMatch->index + 1 == pEntry->index);
+    ASSERT(prevLogIndex == pMatch->index);
+
+    if (pMatch->term != prevLogTerm) {
+      sError(
+          "vgId:%d, mismatching raft log entries encountered. "
+          "{ index:%" PRId64 ", term:%" PRId64
+          " } "
+          "{ index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " } ",
+          pNode->vgId, pMatch->index, pMatch->term, pEntry->index, pEntry->term, prevLogIndex, prevLogTerm);
+      goto _out;
+    }
+
+    // replicate on demand
+    if (pNode->state == TAOS_SYNC_STATE_LEADER && pNode->replicaNum > 1) {
+      (void)syncLogBufferReplicate(pBuf, pNode, index);
+    }
+
+    // persist
+    if (syncLogStorePersist(pLogStore, pEntry) < 0) {
+      sError("vgId:%d, failed to persist raft log entry from log buffer since %s. index:%" PRId64, pNode->vgId,
+             terrstr(), pEntry->index);
+      goto _out;
+    }
+
+    // increment
+    pBuf->matchIndex = index;
+    matchIndex = pBuf->matchIndex;
+
+    // update my match index
+    syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex);
+  }  // end of while
+
+_out:
+  syncLogBufferValidate(pBuf);
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return matchIndex;
+}
+
+int32_t syncLogFsmExecute(SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry) {
+  ASSERT(pFsm->FpCommitCb != NULL && "No commit cb registered for the FSM");
+
+  SRpcMsg rpcMsg;
+  syncEntry2OriginalRpc(pEntry, &rpcMsg);
+
+  SFsmCbMeta cbMeta = {0};
+  cbMeta.index = pEntry->index;
+  cbMeta.lastConfigIndex = -1;
+  cbMeta.isWeak = pEntry->isWeak;
+  cbMeta.code = 0;
+  cbMeta.state = role;
+  cbMeta.seqNum = pEntry->seqNum;
+  cbMeta.term = pEntry->term;
+  cbMeta.currentTerm = term;
+  cbMeta.flag = -1;
+
+  pFsm->FpCommitCb(pFsm, &rpcMsg, cbMeta);
+  return 0;
+}
+
+int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
+  ASSERT(pBuf->startIndex <= pBuf->matchIndex);
+  ASSERT(pBuf->commitIndex <= pBuf->matchIndex);
+  ASSERT(pBuf->matchIndex < pBuf->endIndex);
+  ASSERT(pBuf->endIndex - pBuf->startIndex <= pBuf->size);
+  for (SyncIndex index = pBuf->commitIndex; index <= pBuf->matchIndex; index++) {
+    SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
+    ASSERT(pEntry != NULL);
+  }
+  return 0;
+}
+
+int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) {
+  taosThreadMutexLock(&pBuf->mutex);
+  syncLogBufferValidate(pBuf);
+
+  SSyncLogStore* pLogStore = pNode->pLogStore;
+  SSyncFSM* pFsm = pNode->pFsm;
+  ESyncState role = pNode->state;
+  SyncTerm term = pNode->pRaftStore->currentTerm;
+  SyncGroupId vgId = pNode->vgId;
+  int32_t ret = 0;
+  int64_t upperIndex = TMIN(commitIndex, pBuf->matchIndex);
+  SSyncRaftEntry* pEntry = NULL;
+  bool inBuf = false;
+
+  if (commitIndex <= pBuf->commitIndex) {
+    sDebug("vgId:%d, stale commit update. current:%" PRId64 ", notified:%" PRId64 "", vgId, pBuf->commitIndex,
+           commitIndex);
+    ret = 0;
+    goto _out;
+  }
+
+  sDebug("vgId:%d, log buffer info. role: %d, term: %" PRId64 ". start index:%" PRId64 ", commit index:%" PRId64
+         ", match index: %" PRId64 ", end index:%" PRId64 "",
+         pNode->vgId, role, term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
+
+  // execute in fsm
+  for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) {
+    // get a log entry
+    if (index >= pBuf->startIndex) {
+      inBuf = true;
+      pEntry = pBuf->entries[index % pBuf->size].pItem;
+    } else {
+      inBuf = false;
+      if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) {
+        sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index);
+        ret = -1;
+        goto _out;
+      }
+    }
+
+    ASSERT(pEntry != NULL);
+
+    // execute it
+    if (!syncUtilUserCommit(pEntry->originalRpcType)) {
+      sInfo("vgId:%d, non-user msg in raft log entry. index: %" PRId64 ", term:%" PRId64 "", vgId, pEntry->index,
+            pEntry->term);
+      pBuf->commitIndex = index;
+      if (!inBuf) {
+        syncEntryDestroy(pEntry);
+        pEntry = NULL;
+      }
+      continue;
+    }
+
+    if (syncLogFsmExecute(pFsm, role, term, pEntry) != 0) {
+      sError("vgId:%d, failed to execute raft entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
+             pEntry->index, pEntry->term);
+      ret = -1;
+      goto _out;
+    }
+    pBuf->commitIndex = index;
+
+    sInfo("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId,
+          pEntry->index, pEntry->term, role, term);
+
+    if (!inBuf) {
+      syncEntryDestroy(pEntry);
+      pEntry = NULL;
+    }
+  }
+
+  // recycle
+  // TODO: with a grace period of one third of free space before commitIndex in ring buffer
+  SyncIndex until = pBuf->commitIndex;
+  for (SyncIndex index = pBuf->startIndex; index < until; index++) {
+    SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
+    ASSERT(pEntry != NULL);
+    syncEntryDestroy(pEntry);
+    memset(&pBuf->entries[(index + pBuf->size) % pBuf->size], 0, sizeof(pBuf->entries[0]));
+    pBuf->startIndex = index + 1;
+  }
+
+_out:
+  // mark as restored if needed
+  if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) {
+    pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
+    pNode->restoreFinish = true;
+    sInfo("vgId:%d, restore finished. commit index:%" PRId64 ", match index:%" PRId64 ", last index:%" PRId64 "",
+          pNode->vgId, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex - 1);
+  }
+
+  if (!inBuf) {
+    syncEntryDestroy(pEntry);
+    pEntry = NULL;
+  }
+  syncLogBufferValidate(pBuf);
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return ret;
+}
+
 int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
+  SyncAppendEntriesReply* pReply = NULL;
+  // if already drop replica, do not process
+  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
+    syncLogRecvAppendEntries(ths, pMsg, "not in my config");
+    goto _IGNORE;
+  }
+
+  // prepare response msg
+  pReply = syncAppendEntriesReplyBuild(ths->vgId);
+  pReply->srcId = ths->myRaftId;
+  pReply->destId = pMsg->srcId;
+  pReply->term = ths->pRaftStore->currentTerm;
+  pReply->success = false;
+  pReply->matchIndex = SYNC_INDEX_INVALID;
+  pReply->lastSendIndex = pMsg->prevLogIndex + 1;
+  pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
+  pReply->startTime = ths->startTime;
+
+  if (pMsg->term < ths->pRaftStore->currentTerm) {
+    goto _SEND_RESPONSE;
+  }
+
+  if (pMsg->term > ths->pRaftStore->currentTerm) {
+    pReply->term = pMsg->term;
+  }
+
+  syncNodeStepDown(ths, pMsg->term);
+  syncNodeResetElectTimer(ths);
+
+  // update commit index
+  (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
+
+  if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
+    sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
+           ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
+    goto _IGNORE;
+  }
+
+  SSyncRaftEntry* pEntry = syncLogAppendEntriesToRaftEntry(pMsg);
+
+  if (pEntry == NULL) {
+    sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
+    goto _IGNORE;
+  }
+
+  if (pMsg->prevLogIndex + 1 != pEntry->index) {
+    sError("vgId:%d, invalid previous log index in msg. index:%" PRId64 ", term:%" PRId64 ", prevLogIndex:%" PRId64
+           ", prevLogTerm:%" PRId64,
+           ths->vgId, pEntry->index, pEntry->term, pMsg->prevLogIndex, pMsg->prevLogTerm);
+    syncEntryDestroy(pEntry);  // not handed to the log buffer yet, so release it here
+    goto _IGNORE;
+  }
+
+  sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64
+        ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "",
+        pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex);
+
+  // accept
+  // syncLogBufferAccept() either stores pEntry in the buffer or destroys it before returning, so
+  // pEntry must not be dereferenced after the call; copy what the warning below needs first
+  SyncIndex entryIndex = pEntry->index;
+  SyncTerm entryTerm = pEntry->term;
+  if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) {
+    sWarn("vgId:%d, failed to accept raft entry into log buffer. index:%" PRId64 ", term:%" PRId64, ths->vgId,
+          entryIndex, entryTerm);
+    goto _SEND_RESPONSE;
+  }
+  pReply->success = true;
+
+_SEND_RESPONSE:
+  // update match index
+  pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths);
+
+  // ack, i.e. send response
+  SRpcMsg rpcMsg;
+  syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
+  (void)syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
+
+  // commit index, i.e. leader notice me
+  if (syncLogBufferCommit(ths->pLogBuf, ths, pMsg->commitIndex) < 0) {
+    sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr());
+    goto _out;
+  }
+
+_out:
+_IGNORE:
+  syncAppendEntriesReplyDestroy(pReply);
+  return 0;
+}
+
+int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, SyncAppendEntries* pMsg) {
   // if already drop replica, do not process
   if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
     syncLogRecvAppendEntries(ths, pMsg, "not in my config");
@@ -386,6 +937,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
       goto _IGNORE;
     }
 
+    ASSERT(pAppendEntry->index == appendIndex);
+
     // append
     code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
     if (code != 0) {
@@ -431,34 +984,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
       }
     }
 
-#if 0
-    if (code != 0 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
-      code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
-      ASSERT(code == 0);
-
-      code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
-      ASSERT(code == 0);
-
-    } else {
-      ASSERT(code == 0);
-
-      if (pLocalEntry->term == pAppendEntry->term) {
-        // do nothing
-      } else {
-        code = ths->pLogStore->syncLogTruncate(ths->pLogStore, appendIndex);
-        ASSERT(code == 0);
-
-        code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
-        ASSERT(code == 0);
-      }
-    }
-#endif
-
     // update match index
     pReply->matchIndex = pAppendEntry->index;
 
-    syncEntryDestory(pLocalEntry);
-    syncEntryDestory(pAppendEntry);
+    syncEntryDestroy(pLocalEntry);
+    syncEntryDestroy(pAppendEntry);
 
   } else {
     // no append entries, do nothing
@@ -489,4 +1019,4 @@ _SEND_RESPONSE:
   syncAppendEntriesReplyDestroy(pReply);
 
   return 0;
-}
\ No newline at end of file
+}
diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c
index 5e6c9f1534..e37c40455c 100644
--- a/source/libs/sync/src/syncAppendEntriesReply.c
+++ 
b/source/libs/sync/src/syncAppendEntriesReply.c
@@ -84,6 +84,70 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
   }
 }
 
+int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) {
+  ths->commitIndex = TMAX(commitIndex, ths->commitIndex);
+  SyncIndex lastVer = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
+  commitIndex = TMIN(ths->commitIndex, lastVer);  // never report past the last index in the log store
+  ths->pLogStore->syncLogUpdateCommitIndex(ths->pLogStore, commitIndex);
+  return commitIndex;
+}
+
+int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
+  if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) {
+    SyncIndex commitIndex = indexLikely;
+    syncNodeUpdateCommitIndex(ths, commitIndex);
+    sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state,
+          ths->pRaftStore->currentTerm, commitIndex);
+  }
+  return ths->commitIndex;
+}
+
+int32_t syncLogBufferCatchingUpReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex fromIndex, SRaftId destId) {
+  taosThreadMutexLock(&pBuf->mutex);
+  SyncAppendEntries* pMsgOut = NULL;
+  SyncIndex index = fromIndex;
+
+  if (pNode->state != TAOS_SYNC_STATE_LEADER || pNode->replicaNum <= 1) {
+    goto _out;
+  }
+
+  if (index < pBuf->startIndex) {
+    sError("vgId:%d, (not implemented yet) replication fromIndex: %" PRId64
+           " that is less than pBuf->startIndex: %" PRId64 ". destId: 0x%016" PRIx64 "",
+           pNode->vgId, fromIndex, pBuf->startIndex, destId.addr);  // addr printed in hex like the sWarn below
+    goto _out;
+  }
+
+  if (index > pBuf->matchIndex) {
+    goto _out;
+  }
+
+  do {
+    pMsgOut = syncLogToAppendEntries(pBuf, pNode, index);
+    if (pMsgOut == NULL) {
+      sError("vgId:%d, failed to assembly append entries msg since %s. index: %" PRId64 "", pNode->vgId, terrstr(),
+             index);
+      goto _out;
+    }
+
+    if (syncNodeSendAppendEntries(pNode, &destId, pMsgOut) < 0) {
+      sWarn("vgId:%d, failed to send append entries msg since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "",
+            pNode->vgId, terrstr(), index, destId.addr);
+      goto _out;
+    }
+
+    index += 1;
+    syncAppendEntriesDestroy(pMsgOut);
+    pMsgOut = NULL;
+  } while (false && index <= pBuf->commitIndex);  // constant-false condition: one entry is sent per call
+
+_out:
+  syncAppendEntriesDestroy(pMsgOut);
+  pMsgOut = NULL;
+  taosThreadMutexUnlock(&pBuf->mutex);
+  return 0;
+}
+
 int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
   int32_t ret = 0;
 
@@ -99,6 +163,63 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs
     return 0;
   }
 
+  if (ths->state == TAOS_SYNC_STATE_LEADER) {
+    if (pMsg->term > ths->pRaftStore->currentTerm) {
+      syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
+      syncNodeStepDown(ths, pMsg->term);
+      return -1;
+    }
+
+    ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
+
+    sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "",
+          pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex);
+
+    if (pMsg->success) {
+      SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
+      if (pMsg->matchIndex > oldMatchIndex) {
+        syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
+      }
+
+      // commit if needed
+      SyncIndex indexLikely = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex);
+      SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely);
+      (void)syncLogBufferCommit(ths->pLogBuf, ths, commitIndex);
+    } else {
+      SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
+      if (nextIndex > SYNC_INDEX_BEGIN) {
+        --nextIndex;
+      }
+      syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
+    }
+
+    // send next append entries
+    SPeerState* pState = syncNodeGetPeerState(ths, &(pMsg->srcId));
+    ASSERT(pState != NULL);
+
+    if (pMsg->lastSendIndex == pState->lastSendIndex) {
+      syncNodeReplicateOne(ths, &(pMsg->srcId));
+    }
+  }
+
+  return 0;
+}
+
+int32_t syncNodeOnAppendEntriesReplyOld(SSyncNode* ths, 
SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; + + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + syncLogRecvAppendEntriesReply(ths, pMsg, "not in my config"); + return 0; + } + + // drop stale response + if (pMsg->term < ths->pRaftStore->currentTerm) { + syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response"); + return 0; + } + if (ths->state == TAOS_SYNC_STATE_LEADER) { if (pMsg->term > ths->pRaftStore->currentTerm) { syncLogRecvAppendEntriesReply(ths, pMsg, "error term"); @@ -135,4 +256,4 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs syncLogRecvAppendEntriesReply(ths, pMsg, "process"); return 0; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 811a7b8e99..a96fa31f83 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -44,12 +44,100 @@ // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // + void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); return; } + // update commit index + SyncIndex newCommitIndex = pSyncNode->commitIndex; + for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) { + bool agree = syncAgree(pSyncNode, index); + + if (agree) { + // term + SSyncRaftEntry* pEntry = NULL; + SLRUCache* pCache = pSyncNode->pLogStore->pCache; + LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); + if (h) { + pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); + } else { + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "advance commit index error, read wal index:%" PRId64, index); + syncNodeErrorLog(pSyncNode, logBuf); + return; 
+ } + } + // cannot commit, even if quorum agree. need check term! + if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { + // update commit index + newCommitIndex = index; + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestroy(pEntry); + } + + break; + } else { + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%" PRId64 ", term:%" PRIu64, + pEntry->index, pEntry->term); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); + } + + if (h) { + taosLRUCacheRelease(pCache, h, false); + } else { + syncEntryDestroy(pEntry); + } + } + } + + // advance commit index as large as possible + SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore); + if (walCommitVer > newCommitIndex) { + newCommitIndex = walCommitVer; + } + + // maybe execute fsm + if (newCommitIndex > pSyncNode->commitIndex) { + SyncIndex beginIndex = pSyncNode->commitIndex + 1; + SyncIndex endIndex = newCommitIndex; + + // update commit index + pSyncNode->commitIndex = newCommitIndex; + + // call back Wal + pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); + + // execute fsm + if (pSyncNode->pFsm != NULL) { + int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); + if (code != 0) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64, + beginIndex, endIndex); + syncNodeErrorLog(pSyncNode, logBuf); + return; + } + } + } +} + +void syncMaybeAdvanceCommitIndexOld(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); + return; + } + // advance commit index to sanpshot first SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -93,7 +181,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (h) { taosLRUCacheRelease(pCache, h, 
false); } else { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } break; @@ -109,7 +197,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (h) { taosLRUCacheRelease(pCache, h, false); } else { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } } } @@ -245,13 +333,28 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } */ -bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { +bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index) { + int count = 0; + SSyncIndexMgr* pMatches = pNode->pMatchIndex; + ASSERT(pNode->replicaNum == pMatches->replicaNum); + + for (int i = 0; i < pNode->replicaNum; i++) { + SyncIndex matchIndex = pMatches->index[i]; + if (matchIndex >= index) { + count++; + } + } + + return count >= pNode->quorum; +} + +bool syncAgree(SSyncNode* pNode, SyncIndex index) { int agreeCount = 0; - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - if (syncAgreeIndex(pSyncNode, &(pSyncNode->replicasId[i]), index)) { + for (int i = 0; i < pNode->replicaNum; ++i) { + if (syncAgreeIndex(pNode, &(pNode->replicasId[i]), index)) { ++agreeCount; } - if (agreeCount >= pSyncNode->quorum) { + if (agreeCount >= pNode->quorum) { return true; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 00c4ea76aa..c3267bafdc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -105,19 +105,32 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { return pSyncNode->rid; } -void syncStart(int64_t rid) { +int32_t syncStart(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return; + sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid); + return -1; + } + + if (syncNodeRestore(pSyncNode) < 0) { + sError("vgId:%d, failed to restore raft log buffer since %s", pSyncNode->vgId, terrstr()); + return -1; } if (pSyncNode->pRaftCfg->isStandBy) { - syncNodeStartStandBy(pSyncNode); + if 
(syncNodeStartStandBy(pSyncNode) < 0) { + sError("vgId:%d, failed to start raft node as standby since %s", pSyncNode->vgId, terrstr()); + return -1; + } } else { - syncNodeStart(pSyncNode); + if (syncNodeStart(pSyncNode) < 0) { + sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr()); + return -1; + } } taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; } void syncStartNormal(int64_t rid) { @@ -130,14 +143,15 @@ void syncStartNormal(int64_t rid) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); } -void syncStartStandBy(int64_t rid) { +int32_t syncStartStandBy(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { - return; + return -1; } syncNodeStartStandBy(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; } void syncStop(int64_t rid) { @@ -661,7 +675,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); if (code != 0) { if (pEntry != NULL) { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } taosReleaseRef(tsNodeRefId, pSyncNode->rid); return -1; @@ -673,7 +687,7 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho pSnapshot->lastApplyTerm = pEntry->term; pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; } @@ -1089,6 +1103,38 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { return ret; } +SSyncLogBuffer* syncLogBufferCreate() { + SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); + + ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); + + if (taosThreadMutexInit(&pBuf->mutex, NULL) < 
0) { + sError("failed to init log buffer mutex due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + return pBuf; + +_err: + taosMemoryFree(pBuf); + return NULL; +} + +void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { + if (pBuf == NULL) { + return; + } + (void)taosThreadMutexDestroy(&pBuf->mutex); + (void)taosMemoryFree(pBuf); + return; +} + // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; @@ -1150,6 +1196,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; + // create raft log ring buffer + pSyncNode->pLogBuf = syncLogBufferCreate(); + if (pSyncNode->pLogBuf == NULL) { + sError("failed to init log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); + goto _error; + } + // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); if (pSyncNode->pRaftCfg == NULL) { @@ -1362,6 +1415,12 @@ SSyncNode* syncNodeOpen(SSyncInfo* pOldSyncInfo) { // snapshotting atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); + // init log buffer + if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) { + sError("vgId:%d, failed to init raft log buffer since %s", pSyncNode->vgId, terrstr()); + ASSERT(false); + } + syncNodeEventLog(pSyncNode, "sync open"); return pSyncNode; @@ -1387,7 +1446,48 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { } } -void syncNodeStart(SSyncNode* pSyncNode) { +int32_t syncNodeRestore(SSyncNode* pSyncNode) { + ASSERT(pSyncNode->pLogStore != NULL && "log store not created"); + ASSERT(pSyncNode->pLogBuf != NULL && "ring log buffer not created"); + + SyncIndex lastVer = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); + SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; + + commitIndex = 
TMAX(pSyncNode->commitIndex, commitIndex); + + if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { + return -1; + } + + if (endIndex <= lastVer) { + sError("vgId:%d, failed to load log entries into log buffers. commit index:%" PRId64 ", lastVer: %" PRId64 "", + pSyncNode->vgId, commitIndex, lastVer); + return -1; + } + + return 0; +} + +int32_t syncNodeStart(SSyncNode* pSyncNode) { + // start raft + if (pSyncNode->replicaNum == 1) { + raftStoreNextTerm(pSyncNode->pRaftStore); + syncNodeBecomeLeader(pSyncNode, "one replica start"); + + // Raft 3.6.2 Committing entries from previous terms + syncNodeAppendNoop(pSyncNode); + } else { + syncNodeBecomeFollower(pSyncNode, "first start"); + } + + int32_t ret = 0; + ret = syncNodeStartPingTimer(pSyncNode); + ASSERT(ret == 0); + return ret; +} + +void syncNodeStartOld(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { raftStoreNextTerm(pSyncNode->pRaftStore); @@ -1406,7 +1506,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { ASSERT(ret == 0); } -void syncNodeStartStandBy(SSyncNode* pSyncNode) { +int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; syncNodeStopHeartbeatTimer(pSyncNode); @@ -1419,6 +1519,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ret = 0; ret = syncNodeStartPingTimer(pSyncNode); ASSERT(ret == 0); + return ret; } void syncNodeClose(SSyncNode* pSyncNode) { @@ -1443,6 +1544,8 @@ void syncNodeClose(SSyncNode* pSyncNode) { pSyncNode->pMatchIndex = NULL; logStoreDestory(pSyncNode->pLogStore); pSyncNode->pLogStore = NULL; + syncLogBufferDestroy(pSyncNode->pLogBuf); + pSyncNode->pLogBuf = NULL; raftCfgClose(pSyncNode->pRaftCfg); pSyncNode->pRaftCfg = NULL; @@ -2341,6 +2444,43 @@ void syncNodeStepDown(SSyncNode* pSyncNode, SyncTerm newTerm) { void syncNodeLeaderChangeRsp(SSyncNode* pSyncNode) { syncRespCleanRsp(pSyncNode->pSyncRespMgr); } +int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex 
toIndex) { + ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); + + SyncIndex index = pBuf->endIndex - 1; + while (index >= toIndex) { + SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; + if (pEntry != NULL) { + syncEntryDestroy(pEntry); + pEntry = NULL; + memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); + } + index--; + } + pBuf->endIndex = toIndex; + pBuf->matchIndex = TMIN(pBuf->matchIndex, index); + ASSERT(index + 1 == toIndex); + return 0; +} + +int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { + taosThreadMutexLock(&pBuf->mutex); + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + ASSERT(lastVer == pBuf->matchIndex); + SyncIndex index = pBuf->endIndex - 1; + + (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); + + sInfo("vgId:%d, reset log buffer. start index: %" PRId64 ", commit index: %" PRId64 ", match Index: %" PRId64 + ", end index: %" PRId64 "", + pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + + pBuf->endIndex = pBuf->matchIndex + 1; + + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // maybe clear leader cache if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { @@ -2365,6 +2505,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // min match index pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; + // reset log buffer + syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -2403,7 +2546,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->leaderTime = taosGetTimestampMs(); // reset restoreFinish - pSyncNode->restoreFinish = false; + // TODO: disable it temporarily + // pSyncNode->restoreFinish = false; // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; @@ -2467,6 +2611,9 @@ void 
syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // min match index pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; + // reset log buffer + syncLogBufferReset(pSyncNode->pLogBuf, pSyncNode); + // trace log do { int32_t debugStrLen = strlen(debugStr); @@ -2490,6 +2637,17 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); + // Raft 3.6.2 Committing entries from previous terms + syncNodeAppendNoop(pSyncNode); +} + +void syncNodeCandidate2LeaderOld(SSyncNode* pSyncNode) { + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); + syncNodeBecomeLeader(pSyncNode, "candidate to leader"); + + syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); + // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); @@ -2941,7 +3099,46 @@ static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, return code; } +int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { + // append to log buffer + if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { + sError("vgId:%d, failed to enqueue log buffer. 
index:%" PRId64 "", ths->vgId, pEntry->index); + return -1; + } + + // proceed match index, with replicating on needed + SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); + + // multi replica + if (ths->replicaNum > 1) { + return 0; + } + + // single replica + (void)syncNodeUpdateCommitIndex(ths, matchIndex); + + if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex); + return -1; + } + + return 0; +} + static int32_t syncNodeAppendNoop(SSyncNode* ths) { + SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); + SyncTerm term = ths->pRaftStore->currentTerm; + + SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); + if (pEntry == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return syncNodeAppend(ths, pEntry); +} + +static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { int32_t ret = 0; SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); @@ -2963,7 +3160,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (h) { taosLRUCacheRelease(ths->pLogStore->pCache, h, false); } else { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } return ret; @@ -3055,6 +3252,114 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { return 0; } +int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { + taosThreadMutexLock(&pBuf->mutex); + int64_t index = pBuf->endIndex; + taosThreadMutexUnlock(&pBuf->mutex); + return index; +} + +int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry) { + taosThreadMutexLock(&pBuf->mutex); + syncLogBufferValidate(pBuf); + SyncIndex index = pEntry->index; + + if (index - pBuf->startIndex > pBuf->size) { + sError("vgId:%d, failed to append due to log buffer full. 
index:%" PRId64 "", pNode->vgId, index); + goto _out; + } + + ASSERT(index == pBuf->endIndex); + + SSyncRaftEntry* pExist = pBuf->entries[index % pBuf->size].pItem; + ASSERT(pExist == NULL); + + // initial log buffer with at least one item, e.g. commitIndex + SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; + ASSERT(pMatch != NULL && "no matched raft log entry"); + ASSERT(pMatch->index + 1 == index); + + SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; + pBuf->entries[index % pBuf->size] = tmp; + pBuf->endIndex = index + 1; + + syncLogBufferValidate(pBuf); + taosThreadMutexUnlock(&pBuf->mutex); + return 0; + +_out: + syncLogBufferValidate(pBuf); + syncEntryDestroy(pEntry); + taosThreadMutexUnlock(&pBuf->mutex); + return -1; +} + +SyncTerm syncLogBufferGetTerm(SSyncLogBuffer* pBuf, SyncIndex index) { + ASSERT(pBuf->startIndex <= index && index < pBuf->endIndex); + SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; + ASSERT(pEntry != NULL); + return pEntry->term; +} + +SyncAppendEntries* syncLogToAppendEntries(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) { + SyncAppendEntries* pMsg = NULL; + + if (index < pBuf->startIndex || index >= pBuf->endIndex) { + sError("vgId:%d, log entry (%" PRId64 ") out of range of log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, + index, pBuf->startIndex, pBuf->endIndex); + return pMsg; + } + + SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; + if (pEntry == NULL) { + sError("vgId:%d, log entry (%" PRId64 ") not exist in log buffer [%" PRId64 ", %" PRId64 ").", pNode->vgId, index, + pBuf->startIndex, pBuf->endIndex); + return pMsg; + } + + uint32_t datalen = pEntry->bytes; + pMsg = syncAppendEntriesBuild(datalen, pNode->vgId); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + (void)memcpy(pMsg->data, pEntry, datalen); + + pMsg->prevLogIndex = index - 1; + 
pMsg->prevLogTerm = syncLogBufferGetTerm(pBuf, pMsg->prevLogIndex); + pMsg->vgId = pNode->vgId; + pMsg->srcId = pNode->myRaftId; + pMsg->term = pNode->pRaftStore->currentTerm; + pMsg->commitIndex = pNode->commitIndex; + pMsg->privateTerm = 0; + return pMsg; +} + +void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) { + for (int i = 0; i < pNode->replicaNum; i++) { + SRaftId* pDestId = &pNode->peersId[i]; + if (!syncUtilSameId(pDestId, &pNode->myRaftId)) { + (void)syncNodeSendAppendEntries(pNode, pDestId, pMsg); + } + } +} + +int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index) { + SyncAppendEntries* pMsgOut = syncLogToAppendEntries(pNode->pLogBuf, pNode, index); + if (pMsgOut == NULL) { + sError("vgId:%d, failed to get append entries for index:%" PRId64 "", pNode->vgId, index); + goto _err; + } + + // replicate pMsgOut + (void)syncLogReplicateAppendEntries(pNode, pMsgOut); + +_err: + syncAppendEntriesDestroy(pMsgOut); + return 0; +} + // TLA+ Spec // ClientRequest(i, v) == // /\ state[i] = Leader @@ -3069,6 +3374,31 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { syncNodeEventLog(ths, "on client request"); + int32_t code = 0; + + SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); + SyncTerm term = ths->pRaftStore->currentTerm; + SSyncRaftEntry* pEntry = NULL; + pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); + if (pEntry == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + if (pRetIndex) { + (*pRetIndex) = index; + } + + return syncNodeAppend(ths, pEntry); + } + + return 0; +} + +int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { + syncNodeEventLog(ths, "on client request"); + int32_t ret = 0; int32_t code = 0; @@ -3085,11 +3415,11 @@ int32_t 
syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { // del resp mgr, call FpCommitCb - ASSERT(0); + sError("vgId:%d, failed to append log entry since %s", ths->vgId, terrstr()); return -1; } - // if mulit replica, start replicate right now + // if multi replica, start replicate right now if (ths->replicaNum > 1) { syncNodeReplicate(ths); } @@ -3111,7 +3441,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd if (h) { taosLRUCacheRelease(ths->pLogStore->pCache, h, false); } else { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } return ret; @@ -3305,6 +3635,7 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin } bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { + return false; return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1); } @@ -3432,7 +3763,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde if (h) { taosLRUCacheRelease(pCache, h, false); } else { - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } } } @@ -3703,4 +4034,4 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 940aaca055..38b8574afa 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -96,7 +96,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) return pEntry; } -void syncEntryDestory(SSyncRaftEntry* pEntry) { +void syncEntryDestroy(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { taosMemoryFree(pEntry); 
} @@ -454,7 +454,7 @@ static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof( static void freeRaftEntry(void* param) { SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param; - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) { @@ -588,7 +588,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) { SSkipListNode* pNode = tSkipListIterGet(pIter); ASSERT(pNode != NULL); SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); ++returnCnt; } tSkipListDestroyIter(pIter); @@ -617,7 +617,7 @@ int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) { ++returnCnt; SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode); - // syncEntryDestory(pEntry); + // syncEntryDestroy(pEntry); taosRemoveRef(pCache->refMgr, pEntry->rid); } tSkipListDestroyIter(pIter); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 23d076cfbc..3db970ba00 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -152,7 +152,6 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { } static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { - SyncIndex lastIndex; SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastVer = walGetLastVer(pWal); @@ -207,7 +206,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; - index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + index = walAppendLog(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -218,11 +217,10 @@ static int32_t 
raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pEntry->index, err, err, errStr, sysErr, sysErrStr); syncNodeErrorLog(pData->pSyncNode, logBuf); - - ASSERT(0); return -1; } - pEntry->index = index; + + ASSERT(pEntry->index == index); do { char eventLog[128]; @@ -326,8 +324,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn const char* sysErrStr = strerror(errno); sError("vgId:%d, wal truncate error, from-index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, fromIndex, err, err, errStr, sysErr, sysErrStr); - - ASSERT(0); + return -1; } // event log @@ -365,7 +362,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - // ASSERT(walCommit(pWal, index) == 0); int32_t code = walCommit(pWal, index); if (code != 0) { int32_t err = terrno; @@ -374,8 +370,7 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { const char* sysErrStr = strerror(errno); sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index, err, err, errStr, sysErr, sysErrStr); - - ASSERT(0); + return -1; } return 0; } @@ -427,7 +422,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { raftLogGetEntry(pLogStore, i, &pEntry); cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } } } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index e040310e15..181b9f2b74 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -55,7 +55,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, 
SRaftId* pDestId) { // maybe start snapshot SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { + if (nextIndex > logEndIndex) { + return 0; + } + + if (nextIndex < logStartIndex) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex, logStartIndex, logEndIndex); @@ -90,7 +94,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { memcpy(pMsg->data, serialized, len); taosMemoryFree(serialized); - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } else { if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { @@ -154,8 +158,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { +int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) { int32_t ret = 0; + pMsg->destId = *destRaftId; + syncLogSendAppendEntries(pSyncNode, pMsg, ""); SRpcMsg rpcMsg; @@ -163,7 +169,10 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); - ASSERT(pState != NULL); + if (pState == NULL) { + sError("vgId:%d, failed to get peer state for addr:0x%016" PRIx64 "", pSyncNode->vgId, destRaftId->addr); + return -1; + } if (pMsg->dataLen > 0) { pState->lastSendIndex = pMsg->prevLogIndex + 1; @@ -173,7 +182,7 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return ret; } -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* 
destRaftId, SyncAppendEntries* pMsg) { int32_t ret = 0; if (syncNodeNeedSendAppendEntries(pSyncNode, destRaftId, pMsg)) { ret = syncNodeSendAppendEntries(pSyncNode, destRaftId, pMsg); @@ -231,4 +240,4 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { } return 0; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index a7bafa9f28..43c9ec2980 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -139,7 +139,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho getLastConfig = true; rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); + syncEntryDestroy(pEntry); } else { if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) { sTrace("vgId:%d, sync sender get cfg from local", pSender->pSyncNode->vgId); diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 3f36a058e5..6dda38653d 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -194,7 +194,8 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in btc.idx++; } else if (c == 0) { // dup key not allowed - ASSERT(0); + tdbError("unable to insert dup key. 
pKey: %p, kLen: %d, btc: %p, pTxn: %p", pKey, kLen, &btc, pTxn); + // ASSERT(0); return -1; } } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 527ffa0056..7ddebc5424 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -519,10 +519,15 @@ END: return -1; } -int64_t walAppendLog(SWal *pWal, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen) { +int64_t walAppendLog(SWal *pWal, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, + int32_t bodyLen) { taosThreadMutexLock(&pWal->mutex); - int64_t index = pWal->vers.lastVer + 1; + if (index != pWal->vers.lastVer + 1) { + terrno = TSDB_CODE_WAL_INVALID_VER; + taosThreadMutexUnlock(&pWal->mutex); + return -1; + } if (walCheckAndRoll(pWal) < 0) { taosThreadMutexUnlock(&pWal->mutex);