Merge pull request #18057 from taosdata/enh/TD-20052
refact: adjust sync debug codes
This commit is contained in:
commit
fd4de459a1
|
@ -30,8 +30,6 @@ extern "C" {
|
||||||
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
|
||||||
#define HEARTBEAT_TIMER_MS 1000
|
#define HEARTBEAT_TIMER_MS 1000
|
||||||
|
|
||||||
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
|
||||||
|
|
||||||
typedef struct SSyncEnv {
|
typedef struct SSyncEnv {
|
||||||
uint8_t isStart;
|
uint8_t isStart;
|
||||||
|
|
||||||
|
|
|
@ -22,11 +22,9 @@ extern "C" {
|
||||||
|
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
|
|
||||||
#define CONFIG_FILE_LEN 2048
|
#define CONFIG_FILE_LEN 2048
|
||||||
|
|
||||||
#define MAX_CONFIG_INDEX_COUNT 256
|
#define MAX_CONFIG_INDEX_COUNT 256
|
||||||
|
|
||||||
// SRaftCfgIndex ------------------------------------------
|
|
||||||
typedef struct SRaftCfgIndex {
|
typedef struct SRaftCfgIndex {
|
||||||
TdFilePtr pFile;
|
TdFilePtr pFile;
|
||||||
char path[TSDB_FILENAME_LEN * 2];
|
char path[TSDB_FILENAME_LEN * 2];
|
||||||
|
@ -44,11 +42,8 @@ cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex);
|
||||||
char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex);
|
char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex);
|
||||||
int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex);
|
int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex);
|
||||||
int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex);
|
int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex);
|
||||||
|
|
||||||
int32_t raftCfgIndexCreateFile(const char *path);
|
int32_t raftCfgIndexCreateFile(const char *path);
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
|
|
||||||
typedef struct SRaftCfg {
|
typedef struct SRaftCfg {
|
||||||
SSyncCfg cfg;
|
SSyncCfg cfg;
|
||||||
TdFilePtr pFile;
|
TdFilePtr pFile;
|
||||||
|
@ -68,11 +63,9 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
|
||||||
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
|
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
|
||||||
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
|
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
|
||||||
|
|
||||||
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
|
|
||||||
char *syncCfg2Str(SSyncCfg *pSyncCfg);
|
|
||||||
void syncCfg2SimpleStr(const SSyncCfg *pCfg, char *str, int32_t bufLen);
|
void syncCfg2SimpleStr(const SSyncCfg *pCfg, char *str, int32_t bufLen);
|
||||||
|
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
|
||||||
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
|
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
|
||||||
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
|
|
||||||
|
|
||||||
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
|
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
|
||||||
char *raftCfg2Str(SRaftCfg *pRaftCfg);
|
char *raftCfg2Str(SRaftCfg *pRaftCfg);
|
||||||
|
|
|
@ -38,7 +38,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
|
||||||
void logStoreDestory(SSyncLogStore* pLogStore);
|
void logStoreDestory(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
|
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
|
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
|
SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
|
||||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
||||||
#define RAFT_STORE_BLOCK_SIZE 512
|
#define RAFT_STORE_BLOCK_SIZE 512
|
||||||
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
|
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
|
||||||
|
|
||||||
|
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
|
||||||
|
|
||||||
typedef struct SRaftStore {
|
typedef struct SRaftStore {
|
||||||
SyncTerm currentTerm;
|
SyncTerm currentTerm;
|
||||||
SRaftId voteFor;
|
SRaftId voteFor;
|
||||||
|
@ -38,20 +40,11 @@ int32_t raftStorePersist(SRaftStore *pRaftStore);
|
||||||
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
|
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
|
||||||
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
|
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
|
||||||
|
|
||||||
bool raftStoreHasVoted(SRaftStore *pRaftStore);
|
bool raftStoreHasVoted(SRaftStore *pRaftStore);
|
||||||
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
|
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
|
||||||
void raftStoreClearVote(SRaftStore *pRaftStore);
|
void raftStoreClearVote(SRaftStore *pRaftStore);
|
||||||
void raftStoreNextTerm(SRaftStore *pRaftStore);
|
void raftStoreNextTerm(SRaftStore *pRaftStore);
|
||||||
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
|
void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term);
|
||||||
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson);
|
|
||||||
cJSON *raftStore2Json(SRaftStore *pRaftStore);
|
|
||||||
char *raftStore2Str(SRaftStore *pRaftStore);
|
|
||||||
|
|
||||||
// for debug -------------------
|
|
||||||
void raftStorePrint(SRaftStore *pObj);
|
|
||||||
void raftStorePrint2(char *s, SRaftStore *pObj);
|
|
||||||
void raftStoreLog(SRaftStore *pObj);
|
|
||||||
void raftStoreLog2(char *s, SRaftStore *pObj);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,6 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncMessage.h"
|
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// HandleRequestVoteRequest(i, j, m) ==
|
// HandleRequestVoteRequest(i, j, m) ==
|
||||||
|
|
|
@ -21,7 +21,6 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncMessage.h"
|
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
// HandleRequestVoteResponse(i, j, m) ==
|
// HandleRequestVoteResponse(i, j, m) ==
|
||||||
|
|
|
@ -13,10 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRaftCfg.h"
|
#include "syncRaftCfg.h"
|
||||||
#include "cJSON.h"
|
|
||||||
#include "syncEnv.h"
|
|
||||||
#include "syncUtil.h"
|
|
||||||
|
|
||||||
// file must already exist!
|
// file must already exist!
|
||||||
SRaftCfgIndex *raftCfgIndexOpen(const char *path) {
|
SRaftCfgIndex *raftCfgIndexOpen(const char *path) {
|
||||||
|
@ -242,13 +240,6 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
||||||
return pRoot;
|
return pRoot;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|
||||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
|
||||||
char *serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
|
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
|
||||||
memset(pSyncCfg, 0, sizeof(SSyncCfg));
|
memset(pSyncCfg, 0, sizeof(SSyncCfg));
|
||||||
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
|
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
|
||||||
|
@ -283,17 +274,6 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
|
|
||||||
cJSON *pRoot = cJSON_Parse(s);
|
|
||||||
ASSERT(pRoot != NULL);
|
|
||||||
|
|
||||||
int32_t ret = syncCfgFromJson(pRoot, pSyncCfg);
|
|
||||||
ASSERT(ret == 0);
|
|
||||||
|
|
||||||
cJSON_Delete(pRoot);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
|
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
|
||||||
|
|
|
@ -102,7 +102,6 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
||||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------
|
|
||||||
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
|
SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
|
@ -256,8 +255,6 @@ int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
//-----------------------------------
|
|
||||||
static char* keyFn(const void* pData) {
|
static char* keyFn(const void* pData) {
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
||||||
return (char*)(&(pEntry->index));
|
return (char*)(&(pEntry->index));
|
||||||
|
|
|
@ -13,33 +13,33 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRaftLog.h"
|
#include "syncRaftLog.h"
|
||||||
#include "syncRaftCfg.h"
|
#include "syncRaftCfg.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
|
|
||||||
//-------------------------------
|
|
||||||
// log[m .. n]
|
// log[m .. n]
|
||||||
|
|
||||||
// public function
|
// public function
|
||||||
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
|
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
|
||||||
|
|
||||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||||
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||||
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
|
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
|
static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore);
|
||||||
|
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
|
||||||
|
|
||||||
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
|
|
||||||
|
|
||||||
//-------------------------------
|
|
||||||
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
|
SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore));
|
||||||
ASSERT(pLogStore != NULL);
|
if (pLogStore == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
|
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
|
||||||
if (pLogStore->pCache == NULL) {
|
if (pLogStore->pCache == NULL) {
|
||||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
|
||||||
taosMemoryFree(pLogStore);
|
taosMemoryFree(pLogStore);
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//-------------------------------
|
|
||||||
// log[m .. n]
|
// log[m .. n]
|
||||||
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
|
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) {
|
||||||
ASSERT(snapshotIndex >= 0);
|
ASSERT(snapshotIndex >= 0);
|
||||||
|
|
|
@ -13,9 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "cJSON.h"
|
|
||||||
#include "syncEnv.h"
|
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
// private function
|
// private function
|
||||||
|
@ -26,22 +25,19 @@ static bool raftStoreFileExist(char *path);
|
||||||
SRaftStore *raftStoreOpen(const char *path) {
|
SRaftStore *raftStoreOpen(const char *path) {
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
|
|
||||||
SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore));
|
SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore));
|
||||||
if (pRaftStore == NULL) {
|
if (pRaftStore == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
memset(pRaftStore, 0, sizeof(*pRaftStore));
|
|
||||||
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
|
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
|
||||||
|
|
||||||
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
|
|
||||||
memset(storeBuf, 0, sizeof(storeBuf));
|
|
||||||
|
|
||||||
if (!raftStoreFileExist(pRaftStore->path)) {
|
if (!raftStoreFileExist(pRaftStore->path)) {
|
||||||
ret = raftStoreInit(pRaftStore);
|
ret = raftStoreInit(pRaftStore);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
|
||||||
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
|
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
|
||||||
ASSERT(pRaftStore->pFile != NULL);
|
ASSERT(pRaftStore->pFile != NULL);
|
||||||
|
|
||||||
|
@ -72,9 +68,7 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreClose(SRaftStore *pRaftStore) {
|
int32_t raftStoreClose(SRaftStore *pRaftStore) {
|
||||||
if (pRaftStore == NULL) {
|
if (pRaftStore == NULL) return 0;
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosCloseFile(&pRaftStore->pFile);
|
taosCloseFile(&pRaftStore->pFile);
|
||||||
taosMemoryFree(pRaftStore);
|
taosMemoryFree(pRaftStore);
|
||||||
|
@ -182,69 +176,3 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
|
||||||
pRaftStore->currentTerm = term;
|
pRaftStore->currentTerm = term;
|
||||||
raftStorePersist(pRaftStore);
|
raftStorePersist(pRaftStore);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
|
|
||||||
|
|
||||||
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
|
|
||||||
char u64buf[128] = {0};
|
|
||||||
cJSON *pRoot = cJSON_CreateObject();
|
|
||||||
|
|
||||||
if (pRaftStore != NULL) {
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm);
|
|
||||||
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
|
|
||||||
|
|
||||||
cJSON *pVoteFor = cJSON_CreateObject();
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
|
|
||||||
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
|
|
||||||
{
|
|
||||||
uint64_t u64 = pRaftStore->voteFor.addr;
|
|
||||||
char host[128] = {0};
|
|
||||||
uint16_t port;
|
|
||||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
|
||||||
cJSON_AddStringToObject(pVoteFor, "addr_host", host);
|
|
||||||
cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
|
|
||||||
}
|
|
||||||
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
|
|
||||||
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
|
|
||||||
|
|
||||||
int hasVoted = raftStoreHasVoted(pRaftStore);
|
|
||||||
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
|
|
||||||
}
|
|
||||||
|
|
||||||
cJSON *pJson = cJSON_CreateObject();
|
|
||||||
cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
|
|
||||||
return pJson;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *raftStore2Str(SRaftStore *pRaftStore) {
|
|
||||||
cJSON *pJson = raftStore2Json(pRaftStore);
|
|
||||||
char *serialized = cJSON_Print(pJson);
|
|
||||||
cJSON_Delete(pJson);
|
|
||||||
return serialized;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for debug -------------------
|
|
||||||
void raftStorePrint(SRaftStore *pObj) {
|
|
||||||
char *serialized = raftStore2Str(pObj);
|
|
||||||
printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
|
||||||
char *serialized = raftStore2Str(pObj);
|
|
||||||
printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
fflush(NULL);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
void raftStoreLog(SRaftStore *pObj) {
|
|
||||||
char *serialized = raftStore2Str(pObj);
|
|
||||||
sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
||||||
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
|
||||||
char *serialized = raftStore2Str(pObj);
|
|
||||||
sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
|
||||||
taosMemoryFree(serialized);
|
|
||||||
}
|
|
||||||
|
|
|
@ -13,14 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
#include "syncIndexMgr.h"
|
#include "syncIndexMgr.h"
|
||||||
#include "syncMessage.h"
|
|
||||||
#include "syncRaftCfg.h"
|
|
||||||
#include "syncRaftEntry.h"
|
#include "syncRaftEntry.h"
|
||||||
#include "syncRaftLog.h"
|
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncSnapshot.h"
|
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRequestVote.h"
|
#include "syncRequestVote.h"
|
||||||
#include "syncInt.h"
|
|
||||||
#include "syncRaftCfg.h"
|
#include "syncRaftCfg.h"
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
|
|
|
@ -13,11 +13,9 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncRequestVoteReply.h"
|
#include "syncRequestVoteReply.h"
|
||||||
#include "syncInt.h"
|
|
||||||
#include "syncRaftCfg.h"
|
|
||||||
#include "syncRaftStore.h"
|
#include "syncRaftStore.h"
|
||||||
#include "syncUtil.h"
|
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
|
|
||||||
// TLA+ Spec
|
// TLA+ Spec
|
||||||
|
|
|
@ -143,7 +143,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
|
||||||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||||
|
|
||||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
||||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64,
|
sTrace("==callback== ==ReConfigCb== flag:0x%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
|
||||||
|
", term:%" PRIu64,
|
||||||
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
|
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,8 @@ int32_t GetSnapshotCb(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
||||||
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
|
||||||
|
|
||||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
||||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64,
|
sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
|
||||||
|
", term:%" PRIu64,
|
||||||
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
|
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -148,8 +148,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini
|
||||||
|
|
||||||
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
|
||||||
char* s = syncCfg2Str(&(cbMeta->newCfg));
|
char* s = syncCfg2Str(&(cbMeta->newCfg));
|
||||||
sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64
|
sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
|
||||||
", newCfg:%s",
|
", term:%" PRIu64 ", newCfg:%s",
|
||||||
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s);
|
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s);
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,6 +49,9 @@ void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj);
|
||||||
void syncEntryLog(const SSyncRaftEntry* pObj);
|
void syncEntryLog(const SSyncRaftEntry* pObj);
|
||||||
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
||||||
|
|
||||||
|
char* syncCfg2Str(SSyncCfg* pSyncCfg);
|
||||||
|
int32_t syncCfgFromStr(const char* s, SSyncCfg* pSyncCfg);
|
||||||
|
|
||||||
cJSON* raftCache2Json(SRaftEntryHashCache* pObj);
|
cJSON* raftCache2Json(SRaftEntryHashCache* pObj);
|
||||||
char* raftCache2Str(SRaftEntryHashCache* pObj);
|
char* raftCache2Str(SRaftEntryHashCache* pObj);
|
||||||
void raftCachePrint(SRaftEntryHashCache* pObj);
|
void raftCachePrint(SRaftEntryHashCache* pObj);
|
||||||
|
@ -63,6 +66,14 @@ void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj);
|
||||||
void raftEntryCacheLog(SRaftEntryCache* pObj);
|
void raftEntryCacheLog(SRaftEntryCache* pObj);
|
||||||
void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj);
|
void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj);
|
||||||
|
|
||||||
|
int32_t raftStoreFromJson(SRaftStore* pRaftStore, cJSON* pJson);
|
||||||
|
cJSON* raftStore2Json(SRaftStore* pRaftStore);
|
||||||
|
char* raftStore2Str(SRaftStore* pRaftStore);
|
||||||
|
void raftStorePrint(SRaftStore* pObj);
|
||||||
|
void raftStorePrint2(char* s, SRaftStore* pObj);
|
||||||
|
void raftStoreLog(SRaftStore* pObj);
|
||||||
|
void raftStoreLog2(char* s, SRaftStore* pObj);
|
||||||
|
|
||||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
||||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
||||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
||||||
|
@ -99,6 +110,8 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender);
|
||||||
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
|
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
|
||||||
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
|
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -16,4 +16,20 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "syncTest.h"
|
#include "syncTest.h"
|
||||||
|
|
||||||
|
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||||
|
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||||
|
char *serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
|
||||||
|
cJSON *pRoot = cJSON_Parse(s);
|
||||||
|
ASSERT(pRoot != NULL);
|
||||||
|
|
||||||
|
int32_t ret = syncCfgFromJson(pRoot, pSyncCfg);
|
||||||
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
|
cJSON_Delete(pRoot);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "syncTest.h"
|
||||||
|
#include "cJSON.h"
|
||||||
|
|
||||||
|
int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; }
|
||||||
|
|
||||||
|
cJSON *raftStore2Json(SRaftStore *pRaftStore) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON *pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pRaftStore != NULL) {
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm);
|
||||||
|
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
|
||||||
|
|
||||||
|
cJSON *pVoteFor = cJSON_CreateObject();
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr);
|
||||||
|
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
|
||||||
|
{
|
||||||
|
uint64_t u64 = pRaftStore->voteFor.addr;
|
||||||
|
char host[128] = {0};
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||||
|
cJSON_AddStringToObject(pVoteFor, "addr_host", host);
|
||||||
|
cJSON_AddNumberToObject(pVoteFor, "addr_port", port);
|
||||||
|
}
|
||||||
|
cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId);
|
||||||
|
cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor);
|
||||||
|
|
||||||
|
int hasVoted = raftStoreHasVoted(pRaftStore);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON *pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SRaftStore", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *raftStore2Str(SRaftStore *pRaftStore) {
|
||||||
|
cJSON *pJson = raftStore2Json(pRaftStore);
|
||||||
|
char *serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug -------------------
|
||||||
|
void raftStorePrint(SRaftStore *pObj) {
|
||||||
|
char *serialized = raftStore2Str(pObj);
|
||||||
|
printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftStorePrint2(char *s, SRaftStore *pObj) {
|
||||||
|
char *serialized = raftStore2Str(pObj);
|
||||||
|
printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
void raftStoreLog(SRaftStore *pObj) {
|
||||||
|
char *serialized = raftStore2Str(pObj);
|
||||||
|
sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftStoreLog2(char *s, SRaftStore *pObj) {
|
||||||
|
char *serialized = raftStore2Str(pObj);
|
||||||
|
sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
Loading…
Reference in New Issue