refactor(sync) add trace log

This commit is contained in:
Minghao Li 2022-06-21 15:09:23 +08:00
parent 6c71ae8985
commit 0fb64add90
9 changed files with 106 additions and 70 deletions

View File

@ -253,6 +253,7 @@ void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj); void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj); void syncNodeLog2(char* s, SSyncNode* pObj);
void syncNodeLog3(char* s, SSyncNode* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -27,7 +27,7 @@ extern "C" {
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
#define CONFIG_FILE_LEN 1024 #define CONFIG_FILE_LEN 2048
#define MAX_CONFIG_INDEX_COUNT 512 #define MAX_CONFIG_INDEX_COUNT 512

View File

@ -55,7 +55,7 @@ typedef struct SSyncSnapshotSender {
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
void snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);

View File

@ -173,6 +173,28 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
SSnapshot snapshot;
void* pReader = NULL;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot, NULL, &pReader);
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN && nextIndex <= snapshot.lastApplyIndex + 1 &&
!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
// has snapshot
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
} else {
// no snapshot
if (pReader != NULL) {
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
/*
bool hasSnapshot = syncNodeHasSnapshot(ths); bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
@ -187,6 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncNodeEventLog(ths, eventLog); syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog); taosMemoryFree(eventLog);
} }
*/
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1; SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
@ -207,12 +230,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex);
if (gRaftDetailLog) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
return ret; return ret;
} }

View File

@ -78,7 +78,9 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
return idx; return idx;
} }
} }
assert(0);
syncNodeLog3("syncIndexMgrGetIndex", pSyncIndexMgr->pSyncNode);
ASSERT(0);
} }
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {

View File

@ -1282,6 +1282,9 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// snapshot receivers // snapshot receivers
cJSON* pReceivers = cJSON_CreateArray(); cJSON* pReceivers = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver)); cJSON_AddItemToObject(pRoot, "receiver", snapshotReceiver2Json(pSyncNode->pNewNodeReceiver));
// changing
cJSON_AddNumberToObject(pRoot, "changing", pSyncNode->changing);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
@ -1400,7 +1403,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
pSyncNode->pRaftCfg->isStandBy = 1; // set standby pSyncNode->pRaftCfg->isStandBy = 1; // set standby
} }
// persist last config index // add last config index
raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex); raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);
if (IamInNew) { if (IamInNew) {
@ -1827,7 +1830,11 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
ASSERT(index <= syncStartIndex);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreIndex", pSyncNode);
ASSERT(0);
}
SyncIndex preIndex = index - 1; SyncIndex preIndex = index - 1;
return preIndex; return preIndex;
@ -1836,7 +1843,11 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN); ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
ASSERT(index <= syncStartIndex);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreTerm", pSyncNode);
ASSERT(0);
}
if (index == SYNC_INDEX_BEGIN) { if (index == SYNC_INDEX_BEGIN) {
return 0; return 0;
@ -1929,6 +1940,12 @@ void syncNodeLog2(char* s, SSyncNode* pObj) {
} }
} }
void syncNodeLog3(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTraceLong("syncNodeLog3 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
// ------ local funciton --------- // ------ local funciton ---------
// enqueue message ---- // enqueue message ----
static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqPingTimer(void* param, void* tmrId) {

View File

@ -53,7 +53,12 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
char buf[CONFIG_FILE_LEN] = {0}; char buf[CONFIG_FILE_LEN] = {0};
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN);
if (strlen(s) + 1 > CONFIG_FILE_LEN) {
sError("too long config str:%s", s);
ASSERT(0);
}
snprintf(buf, sizeof(buf), "%s", s); snprintf(buf, sizeof(buf), "%s", s);
int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf)); int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
assert(ret == sizeof(buf)); assert(ret == sizeof(buf));

View File

@ -67,38 +67,34 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
// begin send snapshot (current term, seq begin) // begin send snapshot (current term, seq begin)
void snapshotSenderStart(SSyncSnapshotSender *pSender) { void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) {
ASSERT(!snapshotSenderIsStart(pSender)); ASSERT(!snapshotSenderIsStart(pSender));
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
// open snapshot reader
ASSERT(pSender->pReader == NULL);
pSender->pReader = pReader;
pSender->snapshot = snapshot;
/*
// open snapshot reader // open snapshot reader
ASSERT(pSender->pReader == NULL); ASSERT(pSender->pReader == NULL);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader)); int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0); ASSERT(ret == 0);
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
*/
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
} }
pSender->blockLen = 0; pSender->blockLen = 0;
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld",
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex);
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
/*
SSyncRaftEntry *pEntry = NULL;
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry != NULL);
*/
SSyncRaftEntry *pEntry = SSyncRaftEntry *pEntry =
pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex);
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);

View File

@ -60,14 +60,12 @@ SyncReconfigFinish *createMsg() {
return pMsg; return pMsg;
} }
void test1() { void test1() {
SyncReconfigFinish *pMsg = createMsg(); SyncReconfigFinish *pMsg = createMsg();
syncReconfigFinishLog2((char *)"test1:", pMsg); syncReconfigFinishLog2((char *)"test1:", pMsg);
syncReconfigFinishDestroy(pMsg); syncReconfigFinishDestroy(pMsg);
} }
void test2() { void test2() {
SyncReconfigFinish *pMsg = createMsg(); SyncReconfigFinish *pMsg = createMsg();
uint32_t len = pMsg->bytes; uint32_t len = pMsg->bytes;