refact: adjust head file and sync log

This commit is contained in:
Shengliang Guan 2022-11-11 11:03:01 +08:00
parent 41c76d662f
commit 4d0c5c9d57
7 changed files with 113 additions and 103 deletions

View File

@ -30,8 +30,6 @@ extern "C" {
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 1000 #define HEARTBEAT_TIMER_MS 1000
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SSyncEnv { typedef struct SSyncEnv {
uint8_t isStart; uint8_t isStart;

View File

@ -38,7 +38,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore); void logStoreDestory(SSyncLogStore* pLogStore);
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore); SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);

View File

@ -25,6 +25,8 @@ extern "C" {
#define RAFT_STORE_BLOCK_SIZE 512 #define RAFT_STORE_BLOCK_SIZE 512
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef struct SRaftStore { typedef struct SRaftStore {
SyncTerm currentTerm; SyncTerm currentTerm;
SRaftId voteFor; SRaftId voteFor;
@ -38,20 +40,11 @@ int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
bool raftStoreHasVoted(SRaftStore *pRaftStore); bool raftStoreHasVoted(SRaftStore *pRaftStore);
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void raftStoreClearVote(SRaftStore *pRaftStore); void raftStoreClearVote(SRaftStore *pRaftStore);
void raftStoreNextTerm(SRaftStore *pRaftStore); void raftStoreNextTerm(SRaftStore *pRaftStore);
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
cJSON *raftStore2Json(SRaftStore *pRaftStore);
char *raftStore2Str(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
void raftStorePrint2(char *s, SRaftStore *pObj);
void raftStoreLog(SRaftStore *pObj);
void raftStoreLog2(char *s, SRaftStore *pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -13,33 +13,33 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
//-------------------------------
// log[m .. n] // log[m .. n]
// public function // public function
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore); static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
//-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore));
ASSERT(pLogStore != NULL); if (pLogStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
if (pLogStore->pCache == NULL) { if (pLogStore->pCache == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
taosMemoryFree(pLogStore); taosMemoryFree(pLogStore);
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -96,7 +96,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
} }
} }
//-------------------------------
// log[m .. n] // log[m .. n]
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
ASSERT(snapshotIndex >= 0); ASSERT(snapshotIndex >= 0);

View File

@ -15,8 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "cJSON.h"
#include "syncEnv.h"
#include "syncUtil.h" #include "syncUtil.h"
// private function // private function
@ -27,22 +25,19 @@ static bool raftStoreFileExist(char *path);
SRaftStore *raftStoreOpen(const char *path) { SRaftStore *raftStoreOpen(const char *path) {
int32_t ret; int32_t ret;
SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore));
if (pRaftStore == NULL) { if (pRaftStore == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
memset(pRaftStore, 0, sizeof(*pRaftStore));
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
memset(storeBuf, 0, sizeof(storeBuf));
if (!raftStoreFileExist(pRaftStore->path)) { if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore); ret = raftStoreInit(pRaftStore);
ASSERT(ret == 0); ASSERT(ret == 0);
} }
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
ASSERT(pRaftStore->pFile != NULL); ASSERT(pRaftStore->pFile != NULL);
@ -73,9 +68,7 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
} }
int32_t raftStoreClose(SRaftStore *pRaftStore) { int32_t raftStoreClose(SRaftStore *pRaftStore) {
if (pRaftStore == NULL) { if (pRaftStore == NULL) return 0;
return 0;
}
taosCloseFile(&pRaftStore->pFile); taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore); taosMemoryFree(pRaftStore);
@ -183,69 +176,3 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
pRaftStore->currentTerm = term; pRaftStore->currentTerm = term;
raftStorePersist(pRaftStore); raftStorePersist(pRaftStore);
} }
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pRaftStore != NULL) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
cJSON *pVoteFor = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
{
uint64_t u64 = pRaftStore->voteFor.addr;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pVoteFor, "addr_host", host);
cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
}
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
int hasVoted = raftStoreHasVoted(pRaftStore);
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
return pJson;
}
char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStoreLog(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}

View File

@ -63,6 +63,14 @@ void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj);
void raftEntryCacheLog(SRaftEntryCache* pObj); void raftEntryCacheLog(SRaftEntryCache* pObj);
void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj); void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj);
int32_t raftStoreFromJson(SRaftStore* pRaftStore, cJSON* pJson);
cJSON* raftStore2Json(SRaftStore* pRaftStore);
char* raftStore2Str(SRaftStore* pRaftStore);
void raftStorePrint(SRaftStore* pObj);
void raftStorePrint2(char* s, SRaftStore* pObj);
void raftStoreLog(SRaftStore* pObj);
void raftStoreLog2(char* s, SRaftStore* pObj);
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg); cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg); char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
@ -99,6 +107,8 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender);
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver); cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver); char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -0,0 +1,84 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
#include "cJSON.h"
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pRaftStore != NULL) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
cJSON *pVoteFor = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
{
uint64_t u64 = pRaftStore->voteFor.addr;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pVoteFor, "addr_host", host);
cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
}
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
int hasVoted = raftStoreHasVoted(pRaftStore);
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
return pJson;
}
char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStoreLog(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}