refactor(sync): make leader life longer

This commit is contained in:
Minghao Li 2022-08-06 19:40:10 +08:00
parent 05229b6650
commit f83ca89ea2
5 changed files with 91 additions and 47 deletions

View File

@ -401,7 +401,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeSyncThreads = tsNumOfCores; // tsNumOfVnodeSyncThreads = tsNumOfCores;
tsNumOfVnodeSyncThreads = 32;
tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1);
if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1;

View File

@ -47,6 +47,8 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore);
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
// for debug // for debug
void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore); void logStorePrint2(char* s, SSyncLogStore* pLogStore);

View File

@ -357,16 +357,14 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd);
syncNodeEventLog(ths, eventLog);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
} }
// if FromIndex > walCommitVer, return 0
// else return num of pass entries
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
int32_t code; int32_t code = 0;
int32_t pass = 0;
SyncIndex delBegin = FromIndex; SyncIndex delBegin = FromIndex;
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore); SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
@ -398,16 +396,31 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
} }
} }
// update delete begin
SyncIndex walCommitVer = logStoreWalCommitVer(ths->pLogStore);
if (delBegin <= walCommitVer) {
delBegin = walCommitVer + 1;
pass = walCommitVer - delBegin + 1;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "update delete begin to %ld", delBegin);
syncNodeEventLog(ths, logBuf);
} while (0);
}
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
char eventLog[128]; do {
snprintf(eventLog, sizeof(eventLog), "log truncate, from %" PRId64 " to %" PRId64, delBegin, delEnd); char logBuf[128];
syncNodeEventLog(ths, eventLog); snprintf(logBuf, sizeof(logBuf), "make log same from:%ld, delbegin:%ld, pass:%d", FromIndex, delBegin, pass);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); syncNodeEventLog(ths, logBuf);
} while (0);
return code; return pass;
} }
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
@ -543,31 +556,34 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg); SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) { if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
int32_t pass = 0;
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
// make log same // make log same
do { if (hasExtraEntries) {
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore); // make log same, rollback deleted entries
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex; pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(pass >= 0);
if (hasExtraEntries) { }
// make log same, rollback deleted entries
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0);
}
} while (0);
// append entry batch // append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) { if (pass == 0) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); // assert! no batch
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(pMsg->dataCount == 1);
if (code != 0) {
return -1; for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
} }
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
} }
// fsync once // fsync once
@ -670,25 +686,33 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
syncLogRecvAppendEntriesBatch(ths, pMsg, "really match"); syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
int32_t pass = 0;
if (hasExtraEntries) { if (hasExtraEntries) {
// make log same, rollback deleted entries // make log same, rollback deleted entries
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1); pass = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
ASSERT(code == 0); ASSERT(pass >= 0);
} }
if (hasAppendEntries) { if (hasAppendEntries) {
// append entry batch // append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) { if (pass == 0) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset); // assert! no batch
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ASSERT(pMsg->dataCount == 1);
if (code != 0) {
return -1; // append entry batch
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
if (code != 0) {
return -1;
}
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
} }
code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0);
// syncEntryDestory(pAppendEntry);
} }
// fsync once // fsync once

View File

@ -2409,6 +2409,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param; SSyncNode* pSyncNode = (SSyncNode*)param;
syncNodeEventLog(pSyncNode, "eq hb timer");
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {

View File

@ -305,10 +305,18 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return code; return code;
} }
// truncate semantic
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
int32_t code = walRollback(pWal, fromIndex);
// need not truncate
SyncIndex wallastVer = walGetLastVer(pWal);
if (fromIndex > wallastVer) {
return 0;
}
int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
@ -323,7 +331,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log // event log
do { do {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "wal truncate, from-index:%" PRId64, fromIndex); snprintf(logBuf, sizeof(logBuf), "log truncate, from-index:%" PRId64, fromIndex);
syncNodeEventLog(pData->pSyncNode, logBuf); syncNodeEventLog(pData->pSyncNode, logBuf);
} while (0); } while (0);
@ -637,6 +645,12 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
return walGetFirstVer(pWal); return walGetFirstVer(pWal);
} }
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
return walGetCommittedVer(pWal);
}
// for debug ----------------- // for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) { void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore); char* serialized = logStore2Str(pLogStore);