new fileIO
This commit is contained in:
parent
8c289ed38b
commit
a78d7028ef
|
@ -35,7 +35,8 @@ typedef struct SRaftStore {
|
|||
SyncTerm currentTerm;
|
||||
SRaftId voteFor;
|
||||
// FileFd fd;
|
||||
char path[RAFT_STORE_PATH_LEN];
|
||||
TdFilePtr pFile;
|
||||
char path[RAFT_STORE_PATH_LEN];
|
||||
} SRaftStore;
|
||||
|
||||
SRaftStore *raftStoreOpen(const char *path);
|
||||
|
|
|
@ -193,7 +193,7 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
SSyncIO *io = param;
|
||||
|
||||
STaosQall *qall;
|
||||
SRpcMsg *pRpcMsg, rpcMsg;
|
||||
SRpcMsg * pRpcMsg, rpcMsg;
|
||||
int type;
|
||||
|
||||
qall = taosAllocateQall();
|
||||
|
|
|
@ -18,21 +18,125 @@
|
|||
|
||||
// to complie success: FileIO interface is modified
|
||||
|
||||
SRaftStore *raftStoreOpen(const char *path) { return NULL; }
|
||||
SRaftStore *raftStoreOpen(const char *path) {
|
||||
int32_t ret;
|
||||
|
||||
static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0; }
|
||||
SRaftStore *pRaftStore = malloc(sizeof(SRaftStore));
|
||||
if (pRaftStore == NULL) {
|
||||
sError("raftStoreOpen malloc error");
|
||||
return NULL;
|
||||
}
|
||||
memset(pRaftStore, 0, sizeof(*pRaftStore));
|
||||
snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path);
|
||||
|
||||
int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0; }
|
||||
char storeBuf[RAFT_STORE_BLOCK_SIZE];
|
||||
memset(storeBuf, 0, sizeof(storeBuf));
|
||||
|
||||
int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0; }
|
||||
if (!raftStoreFileExist(pRaftStore->path)) {
|
||||
ret = raftStoreInit(pRaftStore);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static bool raftStoreFileExist(char *path) { return 0; }
|
||||
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
|
||||
assert(pRaftStore->pFile != NULL);
|
||||
|
||||
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; }
|
||||
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
|
||||
assert(len == RAFT_STORE_BLOCK_SIZE);
|
||||
|
||||
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; }
|
||||
ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
|
||||
assert(ret == 0);
|
||||
|
||||
void raftStorePrint(SRaftStore *pRaftStore) {}
|
||||
return pRaftStore;
|
||||
}
|
||||
|
||||
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||
assert(pRaftStore->pFile != NULL);
|
||||
|
||||
pRaftStore->currentTerm = 0;
|
||||
pRaftStore->voteFor.addr = 0;
|
||||
pRaftStore->voteFor.vgId = 0;
|
||||
|
||||
int32_t ret = raftStorePersist(pRaftStore);
|
||||
assert(ret == 0);
|
||||
|
||||
taosCloseFile(&pRaftStore->pFile);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftStoreClose(SRaftStore *pRaftStore) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
taosCloseFile(&pRaftStore->pFile);
|
||||
free(pRaftStore);
|
||||
pRaftStore = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftStorePersist(SRaftStore *pRaftStore) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
int32_t ret;
|
||||
char storeBuf[RAFT_STORE_BLOCK_SIZE];
|
||||
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
|
||||
assert(ret == 0);
|
||||
|
||||
taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);
|
||||
|
||||
ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
|
||||
assert(ret == RAFT_STORE_BLOCK_SIZE);
|
||||
|
||||
taosFsyncFile(pRaftStore->pFile);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; }
|
||||
|
||||
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
cJSON *pRoot = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm);
|
||||
cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr);
|
||||
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
|
||||
|
||||
char *serialized = cJSON_Print(pRoot);
|
||||
int len2 = strlen(serialized);
|
||||
assert(len2 < len);
|
||||
memset(buf, 0, len);
|
||||
snprintf(buf, len, "%s", serialized);
|
||||
free(serialized);
|
||||
|
||||
cJSON_Delete(pRoot);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
|
||||
cJSON *pRoot = cJSON_Parse(buf);
|
||||
|
||||
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
|
||||
pRaftStore->currentTerm = pCurrentTerm->valueint;
|
||||
|
||||
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
|
||||
pRaftStore->voteFor.addr = pVoteForAddr->valueint;
|
||||
|
||||
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
|
||||
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
|
||||
|
||||
cJSON_Delete(pRoot);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void raftStorePrint(SRaftStore *pRaftStore) {
|
||||
char storeBuf[RAFT_STORE_BLOCK_SIZE];
|
||||
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
|
||||
printf("%s\n", storeBuf);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ add_executable(syncIOTickPingTest "")
|
|||
add_executable(syncIOSendMsgTest "")
|
||||
add_executable(syncIOSendMsgClientTest "")
|
||||
add_executable(syncIOSendMsgServerTest "")
|
||||
add_executable(syncRaftStoreTest "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -45,6 +46,10 @@ target_sources(syncIOSendMsgServerTest
|
|||
PRIVATE
|
||||
"syncIOSendMsgServerTest.cpp"
|
||||
)
|
||||
target_sources(syncRaftStoreTest
|
||||
PRIVATE
|
||||
"syncRaftStoreTest.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -92,6 +97,11 @@ target_include_directories(syncIOSendMsgServerTest
|
|||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncRaftStoreTest
|
||||
PUBLIC
|
||||
"${CMAKE_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -130,6 +140,10 @@ target_link_libraries(syncIOSendMsgServerTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncRaftStoreTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
#include "syncRaftStore.h"
|
||||
#include <stdio.h>
|
||||
#include "gtest/gtest.h"
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
|
||||
void *pingFunc(void *param) {
|
||||
SSyncIO *io = (SSyncIO *)param;
|
||||
while (1) {
|
||||
sDebug("io->ping");
|
||||
// io->ping(io);
|
||||
sleep(1);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int main() {
|
||||
// taosInitLog((char *)"syncTest.log", 100000, 10);
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = 143 + 64;
|
||||
|
||||
sTrace("sync log test: trace");
|
||||
sDebug("sync log test: debug");
|
||||
sInfo("sync log test: info");
|
||||
sWarn("sync log test: warn");
|
||||
sError("sync log test: error");
|
||||
sFatal("sync log test: fatal");
|
||||
|
||||
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
raftStorePrint(pRaftStore);
|
||||
|
||||
pRaftStore->currentTerm = 100;
|
||||
pRaftStore->voteFor.addr = 200;
|
||||
pRaftStore->voteFor.vgId = 300;
|
||||
|
||||
raftStorePrint(pRaftStore);
|
||||
raftStorePersist(pRaftStore);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue