Merge pull request #16728 from taosdata/feature/3.0_mhli
refactor(sync): add syncHeartbeat, syncHeartbeatReply
This commit is contained in:
commit
e0d5ec3962
|
@ -261,6 +261,8 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_BATCH, "sync-append-entries-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
|
||||
|
|
|
@ -444,6 +444,70 @@ void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
|
|||
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
|
||||
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncHeartbeat {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
|
||||
// private data
|
||||
SyncTerm term;
|
||||
SyncIndex commitIndex;
|
||||
SyncTerm privateTerm;
|
||||
} SyncHeartbeat;
|
||||
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg);
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncHeartbeatReply {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
|
||||
// private data
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
int64_t startTime;
|
||||
} SyncHeartbeatReply;
|
||||
|
||||
SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId);
|
||||
void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg);
|
||||
void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg);
|
||||
char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len);
|
||||
SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len);
|
||||
void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg);
|
||||
SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg);
|
||||
char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg);
|
||||
void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
|
||||
void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
|
||||
void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncApplyMsg {
|
||||
uint32_t bytes;
|
||||
|
|
|
@ -1992,6 +1992,313 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncHeartbeat----
|
||||
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncHeartbeat);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeatDestroy(SyncHeartbeat* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncHeartbeatSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncHeartbeat* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncHeartbeatDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncHeartbeatSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg) {
|
||||
syncHeartbeatDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeat* pMsg = syncHeartbeatDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncHeartbeat", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncHeartbeat2Str(const SyncHeartbeat* pMsg) {
|
||||
cJSON* pJson = syncHeartbeat2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
printf("syncHeartbeatPrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog(const SyncHeartbeat* pMsg) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncHeartbeat2Str(pMsg);
|
||||
sTrace("syncHeartbeatLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncHeartbeatReply----
|
||||
SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncHeartbeatReply);
|
||||
SyncHeartbeatReply* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
}
|
||||
|
||||
char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncHeartbeatReplySerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncHeartbeatReply* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncHeartbeatReplyDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncHeartbeatReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg) {
|
||||
syncHeartbeatReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncHeartbeatReply* pMsg = syncHeartbeatReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
|
||||
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncHeartbeatReply", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg) {
|
||||
cJSON* pJson = syncHeartbeatReply2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg) {
|
||||
char* serialized = syncHeartbeatReply2Str(pMsg);
|
||||
printf("syncHeartbeatReplyPrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg) {
|
||||
char* serialized = syncHeartbeatReply2Str(pMsg);
|
||||
printf("syncHeartbeatReplyPrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg) {
|
||||
char* serialized = syncHeartbeatReply2Str(pMsg);
|
||||
sTrace("syncHeartbeatReplyLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncHeartbeatReply2Str(pMsg);
|
||||
sTrace("syncHeartbeatReplyLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncApplyMsg----
|
||||
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) {
|
||||
uint32_t bytes = sizeof(SyncApplyMsg) + dataLen;
|
||||
|
|
|
@ -57,6 +57,8 @@ add_executable(syncLeaderTransferTest "")
|
|||
add_executable(syncReconfigFinishTest "")
|
||||
add_executable(syncRestoreFromSnapshot "")
|
||||
add_executable(syncRaftCfgIndexTest "")
|
||||
add_executable(syncHeartbeatTest "")
|
||||
add_executable(syncHeartbeatReplyTest "")
|
||||
|
||||
|
||||
target_sources(syncTest
|
||||
|
@ -295,6 +297,14 @@ target_sources(syncRaftCfgIndexTest
|
|||
PRIVATE
|
||||
"syncRaftCfgIndexTest.cpp"
|
||||
)
|
||||
target_sources(syncHeartbeatTest
|
||||
PRIVATE
|
||||
"syncHeartbeatTest.cpp"
|
||||
)
|
||||
target_sources(syncHeartbeatReplyTest
|
||||
PRIVATE
|
||||
"syncHeartbeatReplyTest.cpp"
|
||||
)
|
||||
|
||||
|
||||
target_include_directories(syncTest
|
||||
|
@ -592,6 +602,16 @@ target_include_directories(syncRaftCfgIndexTest
|
|||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncHeartbeatTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncHeartbeatReplyTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
|
||||
|
||||
target_link_libraries(syncTest
|
||||
|
@ -830,6 +850,14 @@ target_link_libraries(syncRaftCfgIndexTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncHeartbeatTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncHeartbeatReplyTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
|
||||
|
||||
enable_testing()
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SyncHeartbeatReply *createMsg() {
|
||||
SyncHeartbeatReply *pMsg = syncHeartbeatReplyBuild(1000);
|
||||
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||
pMsg->srcId.vgId = 100;
|
||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||
pMsg->destId.vgId = 100;
|
||||
|
||||
pMsg->term = 33;
|
||||
pMsg->privateTerm = 44;
|
||||
pMsg->startTime = taosGetTimestampMs();
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void test1() {
|
||||
SyncHeartbeatReply *pMsg = createMsg();
|
||||
syncHeartbeatReplyLog2((char *)"test1:", pMsg);
|
||||
syncHeartbeatReplyDestroy(pMsg);
|
||||
}
|
||||
|
||||
void test2() {
|
||||
SyncHeartbeatReply *pMsg = createMsg();
|
||||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncHeartbeatReplySerialize(pMsg, serialized, len);
|
||||
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000);
|
||||
syncHeartbeatReplyDeserialize(serialized, len, pMsg2);
|
||||
syncHeartbeatReplyLog2((char *)"test2: syncHeartbeatReplySerialize -> syncHeartbeatReplyDeserialize ",
|
||||
pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncHeartbeatReplyDestroy(pMsg);
|
||||
syncHeartbeatReplyDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test3() {
|
||||
SyncHeartbeatReply *pMsg = createMsg();
|
||||
uint32_t len;
|
||||
char * serialized = syncHeartbeatReplySerialize2(pMsg, &len);
|
||||
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyDeserialize2(serialized, len);
|
||||
syncHeartbeatReplyLog2((char *)"test3: syncHeartbeatReplySerialize3 -> syncHeartbeatReplyDeserialize2 ",
|
||||
pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncHeartbeatReplyDestroy(pMsg);
|
||||
syncHeartbeatReplyDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
SyncHeartbeatReply *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyBuild(1000);
|
||||
syncHeartbeatReplyFromRpcMsg(&rpcMsg, pMsg2);
|
||||
syncHeartbeatReplyLog2((char *)"test4: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg ",
|
||||
pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncHeartbeatReplyDestroy(pMsg);
|
||||
syncHeartbeatReplyDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SyncHeartbeatReply *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeatReply2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncHeartbeatReply *pMsg2 = syncHeartbeatReplyFromRpcMsg2(&rpcMsg);
|
||||
syncHeartbeatReplyLog2((char *)"test5: syncHeartbeatReply2RpcMsg -> syncHeartbeatReplyFromRpcMsg2 ",
|
||||
pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncHeartbeatReplyDestroy(pMsg);
|
||||
syncHeartbeatReplyDestroy(pMsg2);
|
||||
}
|
||||
|
||||
int main() {
|
||||
gRaftDetailLog = true;
|
||||
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
logTest();
|
||||
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
test5();
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,99 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncUtil.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SyncHeartbeat *createMsg() {
|
||||
SyncHeartbeat *pMsg = syncHeartbeatBuild(789);
|
||||
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||
pMsg->srcId.vgId = 100;
|
||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||
pMsg->destId.vgId = 100;
|
||||
pMsg->term = 8;
|
||||
pMsg->commitIndex = 33;
|
||||
pMsg->privateTerm = 44;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void test1() {
|
||||
SyncHeartbeat *pMsg = createMsg();
|
||||
syncHeartbeatLog2((char *)"test1:", pMsg);
|
||||
syncHeartbeatDestroy(pMsg);
|
||||
}
|
||||
|
||||
void test2() {
|
||||
SyncHeartbeat *pMsg = createMsg();
|
||||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncHeartbeatSerialize(pMsg, serialized, len);
|
||||
SyncHeartbeat *pMsg2 = syncHeartbeatBuild(789);
|
||||
syncHeartbeatDeserialize(serialized, len, pMsg2);
|
||||
syncHeartbeatLog2((char *)"test2: syncHeartbeatSerialize -> syncHeartbeatDeserialize ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncHeartbeatDestroy(pMsg);
|
||||
syncHeartbeatDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test3() {
|
||||
SyncHeartbeat *pMsg = createMsg();
|
||||
uint32_t len;
|
||||
char * serialized = syncHeartbeatSerialize2(pMsg, &len);
|
||||
SyncHeartbeat *pMsg2 = syncHeartbeatDeserialize2(serialized, len);
|
||||
syncHeartbeatLog2((char *)"test3: syncHeartbeatSerialize2 -> syncHeartbeatDeserialize2 ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncHeartbeatDestroy(pMsg);
|
||||
syncHeartbeatDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
SyncHeartbeat *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncHeartbeat *pMsg2 = (SyncHeartbeat *)taosMemoryMalloc(rpcMsg.contLen);
|
||||
syncHeartbeatFromRpcMsg(&rpcMsg, pMsg2);
|
||||
syncHeartbeatLog2((char *)"test4: syncHeartbeat2RpcMsg -> syncHeartbeatFromRpcMsg ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncHeartbeatDestroy(pMsg);
|
||||
syncHeartbeatDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SyncHeartbeat *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncHeartbeat *pMsg2 =syncHeartbeatFromRpcMsg2(&rpcMsg);
|
||||
syncHeartbeatLog2((char *)"test5: syncHeartbeat2RpcMsg -> syncHeartbeatFromRpcMsg2 ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncHeartbeatDestroy(pMsg);
|
||||
syncHeartbeatDestroy(pMsg2);
|
||||
}
|
||||
|
||||
int main() {
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
gRaftDetailLog = true;
|
||||
logTest();
|
||||
|
||||
test1();
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
test5();
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue