diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index cf4e3309f1..628e8874b4 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -30,8 +30,6 @@ extern "C" { #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define HEARTBEAT_TIMER_MS 1000 -#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) - typedef struct SSyncEnv { uint8_t isStart; diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 17ea36c640..de8bd81b30 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -38,7 +38,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); void logStoreDestory(SSyncLogStore* pLogStore); SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore); - SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore); SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 077e6a7658..bb6405f6b2 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -25,6 +25,8 @@ extern "C" { #define RAFT_STORE_BLOCK_SIZE 512 #define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2) +#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) + typedef struct SRaftStore { SyncTerm currentTerm; SRaftId voteFor; @@ -38,20 +40,11 @@ int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); -bool raftStoreHasVoted(SRaftStore *pRaftStore); -void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); -void raftStoreClearVote(SRaftStore *pRaftStore); -void raftStoreNextTerm(SRaftStore *pRaftStore); -void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); -int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson); -cJSON *raftStore2Json(SRaftStore *pRaftStore); -char *raftStore2Str(SRaftStore *pRaftStore); - -// for debug ------------------- -void raftStorePrint(SRaftStore *pObj); -void raftStorePrint2(char *s, SRaftStore *pObj); -void raftStoreLog(SRaftStore *pObj); -void raftStoreLog2(char *s, SRaftStore *pObj); +bool raftStoreHasVoted(SRaftStore *pRaftStore); +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); +void raftStoreClearVote(SRaftStore *pRaftStore); +void raftStoreNextTerm(SRaftStore *pRaftStore); +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d4a1d35c74..2cdb18a109 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -13,33 +13,33 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "syncRaftLog.h" #include "syncRaftCfg.h" #include "syncRaftStore.h" -//------------------------------- // log[m .. n] // public function static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); - static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore); +static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); -static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry); - -//------------------------------- SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { - SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); - ASSERT(pLogStore != NULL); + SSyncLogStore* pLogStore = taosMemoryCalloc(1, sizeof(SSyncLogStore)); + if (pLogStore == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5); if (pLogStore->pCache == NULL) { - terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; taosMemoryFree(pLogStore); + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return NULL; } @@ -96,7 +96,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } } -//------------------------------- // log[m .. n] static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex) { ASSERT(snapshotIndex >= 0); diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 1c96e70858..e328ed3d31 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -15,8 +15,6 @@ #define _DEFAULT_SOURCE #include "syncRaftStore.h" -#include "cJSON.h" -#include "syncEnv.h" #include "syncUtil.h" // private function @@ -27,22 +25,19 @@ static bool raftStoreFileExist(char *path); SRaftStore *raftStoreOpen(const char *path) { int32_t ret; - SRaftStore *pRaftStore = taosMemoryMalloc(sizeof(SRaftStore)); + SRaftStore *pRaftStore = taosMemoryCalloc(1, sizeof(SRaftStore)); if (pRaftStore == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - memset(pRaftStore, 0, sizeof(*pRaftStore)); + snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); - - char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; - memset(storeBuf, 0, sizeof(storeBuf)); - if (!raftStoreFileExist(pRaftStore->path)) { ret = raftStoreInit(pRaftStore); ASSERT(ret == 0); } + char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); ASSERT(pRaftStore->pFile != NULL); @@ -73,9 +68,7 @@ static int32_t raftStoreInit(SRaftStore *pRaftStore) { } int32_t raftStoreClose(SRaftStore *pRaftStore) { - if (pRaftStore == NULL) { - return 0; - } + if (pRaftStore == NULL) return 0; taosCloseFile(&pRaftStore->pFile); taosMemoryFree(pRaftStore); @@ -183,69 +176,3 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { pRaftStore->currentTerm = term; raftStorePersist(pRaftStore); } - -int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; } - -cJSON *raftStore2Json(SRaftStore *pRaftStore) { - char u64buf[128] = {0}; - cJSON *pRoot = cJSON_CreateObject(); - - if (pRaftStore != NULL) { - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm); - cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); - - cJSON *pVoteFor = cJSON_CreateObject(); - snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr); - cJSON_AddStringToObject(pVoteFor, "addr", u64buf); - { - uint64_t u64 = pRaftStore->voteFor.addr; - char host[128] = {0}; - uint16_t port; - syncUtilU642Addr(u64, host, sizeof(host), &port); - cJSON_AddStringToObject(pVoteFor, "addr_host", host); - cJSON_AddNumberToObject(pVoteFor, "addr_port", port); - } - cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId); - cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor); - - int hasVoted = raftStoreHasVoted(pRaftStore); - cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted); - } - - cJSON *pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SRaftStore", pRoot); - return pJson; -} - -char *raftStore2Str(SRaftStore *pRaftStore) { - cJSON *pJson = raftStore2Json(pRaftStore); - char *serialized = cJSON_Print(pJson); - cJSON_Delete(pJson); - return serialized; -} - -// for debug ------------------- -void raftStorePrint(SRaftStore *pObj) { - char *serialized = raftStore2Str(pObj); - printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); - fflush(NULL); - taosMemoryFree(serialized); -} - -void raftStorePrint2(char *s, SRaftStore *pObj) { - char *serialized = raftStore2Str(pObj); - printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); - fflush(NULL); - taosMemoryFree(serialized); -} -void raftStoreLog(SRaftStore *pObj) { - char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized); - taosMemoryFree(serialized); -} - -void raftStoreLog2(char *s, SRaftStore *pObj) { - char *serialized = raftStore2Str(pObj); - sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); - taosMemoryFree(serialized); -} diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h index a1881bf1b3..34ce8c3175 100644 --- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h +++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h @@ -63,6 +63,14 @@ void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj); void raftEntryCacheLog(SRaftEntryCache* pObj); void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj); +int32_t raftStoreFromJson(SRaftStore* pRaftStore, cJSON* pJson); +cJSON* raftStore2Json(SRaftStore* pRaftStore); +char* raftStore2Str(SRaftStore* pRaftStore); +void raftStorePrint(SRaftStore* pObj); +void raftStorePrint2(char* s, SRaftStore* pObj); +void raftStoreLog(SRaftStore* pObj); +void raftStoreLog2(char* s, SRaftStore* pObj); + cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg); char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg); @@ -99,6 +107,8 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender); cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver); char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver); + + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c b/source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c new file mode 100644 index 0000000000..0c4dd95e48 --- /dev/null +++ b/source/libs/sync/test/sync_test_lib/src/syncRaftStoreDebug.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "syncTest.h" +#include "cJSON.h" + +int32_t raftStoreFromJson(SRaftStore *pRaftStore, cJSON *pJson) { return 0; } + +cJSON *raftStore2Json(SRaftStore *pRaftStore) { + char u64buf[128] = {0}; + cJSON *pRoot = cJSON_CreateObject(); + + if (pRaftStore != NULL) { + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->currentTerm); + cJSON_AddStringToObject(pRoot, "currentTerm", u64buf); + + cJSON *pVoteFor = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64 "", pRaftStore->voteFor.addr); + cJSON_AddStringToObject(pVoteFor, "addr", u64buf); + { + uint64_t u64 = pRaftStore->voteFor.addr; + char host[128] = {0}; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pVoteFor, "addr_host", host); + cJSON_AddNumberToObject(pVoteFor, "addr_port", port); + } + cJSON_AddNumberToObject(pVoteFor, "vgId", pRaftStore->voteFor.vgId); + cJSON_AddItemToObject(pRoot, "voteFor", pVoteFor); + + int hasVoted = raftStoreHasVoted(pRaftStore); + cJSON_AddNumberToObject(pRoot, "hasVoted", hasVoted); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SRaftStore", pRoot); + return pJson; +} + +char *raftStore2Str(SRaftStore *pRaftStore) { + cJSON *pJson = raftStore2Json(pRaftStore); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ------------------- +void raftStorePrint(SRaftStore *pObj) { + char *serialized = raftStore2Str(pObj); + printf("raftStorePrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void raftStorePrint2(char *s, SRaftStore *pObj) { + char *serialized = raftStore2Str(pObj); + printf("raftStorePrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} +void raftStoreLog(SRaftStore *pObj) { + char *serialized = raftStore2Str(pObj); + sTrace("raftStoreLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void raftStoreLog2(char *s, SRaftStore *pObj) { + char *serialized = raftStore2Str(pObj); + sTrace("raftStoreLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); +} \ No newline at end of file