refactor(sync): add syncAppendEntriesBatch
This commit is contained in:
parent
c3b2b98454
commit
1a8cf049b7
|
@ -236,6 +236,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_BATCH, "sync-append-entries-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
|
||||
|
|
|
@ -325,22 +325,49 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
|
|||
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SOffsetAndContLen {
|
||||
int32_t offset;
|
||||
int32_t contLen;
|
||||
} SOffsetAndContLen;
|
||||
|
||||
typedef struct SyncAppendEntriesBatch {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
|
||||
// private data
|
||||
SyncTerm term;
|
||||
SyncIndex prevLogIndex;
|
||||
SyncTerm prevLogTerm;
|
||||
SyncIndex commitIndex;
|
||||
SyncTerm privateTerm;
|
||||
int32_t dataCount;
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncAppendEntriesBatch;
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
|
||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
|
||||
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
|
||||
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
||||
int32_t* pRetArrSize);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncAppendEntriesReply {
|
||||
uint32_t bytes;
|
||||
|
|
|
@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender {
|
|||
bool start;
|
||||
int32_t seq;
|
||||
int32_t ack;
|
||||
void *pReader;
|
||||
void *pCurrentBlock;
|
||||
void * pReader;
|
||||
void * pCurrentBlock;
|
||||
int32_t blockLen;
|
||||
SSnapshot snapshot;
|
||||
SSyncCfg lastConfig;
|
||||
|
@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
|||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||
|
||||
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
|
||||
//---------------------------------------------------
|
||||
typedef struct SSyncSnapshotReceiver {
|
||||
bool start;
|
||||
int32_t ack;
|
||||
void *pWriter;
|
||||
void * pWriter;
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
SSnapshot snapshot;
|
||||
|
@ -85,8 +85,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
|||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||
|
||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
|
||||
//---------------------------------------------------
|
||||
// on message
|
||||
|
|
|
@ -591,7 +591,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe
|
|||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) {
|
||||
|
@ -1426,6 +1426,279 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncAppendEntriesBatch----
|
||||
|
||||
// block1: SOffsetAndContLen
|
||||
// block2: SOffsetAndContLen Array
|
||||
// block3: SRpcMsg Array
|
||||
// block4: SRpcMsg pCont Array
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) {
|
||||
ASSERT(rpcMsgArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg
|
||||
int32_t contArrayLen = 0;
|
||||
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
|
||||
contArrayLen += rpcMsgArr[i].contLen;
|
||||
}
|
||||
dataLen += (metaArrayLen + rpcArrayLen + contArrayLen);
|
||||
|
||||
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
|
||||
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH;
|
||||
pMsg->dataCount = arrSize;
|
||||
pMsg->dataLen = dataLen;
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen);
|
||||
char* pData = pMsg->data;
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
// init <offset, contLen>
|
||||
if (i == 0) {
|
||||
metaArr[i].offset = metaArrayLen + rpcArrayLen;
|
||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
||||
} else {
|
||||
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
|
||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
||||
}
|
||||
|
||||
// init msgArr
|
||||
msgArr[i] = rpcMsgArr[i];
|
||||
|
||||
// init data
|
||||
ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen);
|
||||
memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen);
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncAppendEntriesBatchDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) {
|
||||
syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex);
|
||||
cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
|
||||
cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg
|
||||
int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen;
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
|
||||
cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen);
|
||||
cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen);
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen);
|
||||
void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen);
|
||||
|
||||
cJSON* pMetaArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
|
||||
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||
cJSON* pMeta = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset);
|
||||
cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen);
|
||||
cJSON_AddItemToArray(pMetaArr, pMeta);
|
||||
}
|
||||
|
||||
cJSON* pMsgArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
|
||||
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||
cJSON* pRpcMsgJson = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code);
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen);
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType);
|
||||
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
|
||||
}
|
||||
|
||||
char* s;
|
||||
s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", s);
|
||||
taosMemoryFree(s);
|
||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
|
||||
cJSON* pJson = syncAppendEntriesBatch2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
||||
int32_t* pRetArrSize) {
|
||||
if (pRetArrSize != NULL) {
|
||||
*pRetArrSize = pSyncMsg->dataCount;
|
||||
}
|
||||
|
||||
int32_t arrSize = pSyncMsg->dataCount;
|
||||
if (arrSize > maxArrSize) {
|
||||
arrSize = maxArrSize;
|
||||
}
|
||||
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pSyncMsg->dataCount; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pSyncMsg->dataCount; // SRpcMsg
|
||||
int32_t contArrayLen = pSyncMsg->dataLen - metaArrayLen - rpcArrayLen;
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pSyncMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)(pSyncMsg->data + metaArrayLen);
|
||||
void* pData = pSyncMsg->data + metaArrayLen + rpcArrayLen;
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
rpcMsgArr[i] = msgArr[i];
|
||||
rpcMsgArr[i].pCont = rpcMallocCont(msgArr[i].contLen);
|
||||
void* pRpcCont = pSyncMsg->data + metaArr[i].offset;
|
||||
memcpy(rpcMsgArr[i].pCont, pRpcCont, rpcMsgArr[i].contLen);
|
||||
}
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
printf("syncAppendEntriesBatchPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
printf("syncAppendEntriesBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
sTrace("syncAppendEntriesBatchLog | len:%lu | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
sTraceLong("syncAppendEntriesBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncAppendEntriesReply----
|
||||
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncAppendEntriesReply);
|
||||
|
|
|
@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
|
||||
if (pSyncCfg != NULL) {
|
||||
int32_t len = 512;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
memset(s, 0, len);
|
||||
|
||||
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
|
||||
|
@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -349,14 +349,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
|||
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||
cJSON *pJson = snapshotSender2Json(pSender);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
||||
int32_t len = 256;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
char host[64];
|
||||
|
@ -604,7 +604,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pReceiver->fromId.addr;
|
||||
cJSON *pTmp = pFromId;
|
||||
cJSON * pTmp = pFromId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
|
@ -637,14 +637,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
||||
int32_t len = 256;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId fromId = pReceiver->fromId;
|
||||
char host[128];
|
||||
|
|
|
@ -21,6 +21,7 @@ add_executable(syncEntryCacheTest "")
|
|||
add_executable(syncRequestVoteTest "")
|
||||
add_executable(syncRequestVoteReplyTest "")
|
||||
add_executable(syncAppendEntriesTest "")
|
||||
add_executable(syncAppendEntriesBatchTest "")
|
||||
add_executable(syncAppendEntriesReplyTest "")
|
||||
add_executable(syncClientRequestTest "")
|
||||
add_executable(syncTimeoutTest "")
|
||||
|
@ -146,6 +147,10 @@ target_sources(syncAppendEntriesTest
|
|||
PRIVATE
|
||||
"syncAppendEntriesTest.cpp"
|
||||
)
|
||||
target_sources(syncAppendEntriesBatchTest
|
||||
PRIVATE
|
||||
"syncAppendEntriesBatchTest.cpp"
|
||||
)
|
||||
target_sources(syncAppendEntriesReplyTest
|
||||
PRIVATE
|
||||
"syncAppendEntriesReplyTest.cpp"
|
||||
|
@ -387,6 +392,11 @@ target_include_directories(syncAppendEntriesTest
|
|||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncAppendEntriesBatchTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncAppendEntriesReplyTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
|
@ -636,6 +646,10 @@ target_link_libraries(syncAppendEntriesTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncAppendEntriesBatchTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncAppendEntriesReplyTest
|
||||
sync
|
||||
gtest_main
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
//#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncUtil.h"
|
||||
#include "trpc.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
|
||||
memset(pRpcMsg, 0, sizeof(SRpcMsg));
|
||||
|
||||
pRpcMsg->msgType = TDMT_SYNC_PING;
|
||||
pRpcMsg->contLen = dataLen;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
pRpcMsg->code = 10 * i;
|
||||
snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i);
|
||||
|
||||
return pRpcMsg;
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch *createMsg() {
|
||||
SRpcMsg rpcMsgArr[5];
|
||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||
|
||||
for (int32_t i = 0; i < 5; ++i) {
|
||||
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
|
||||
rpcMsgArr[i] = *pRpcMsg;
|
||||
taosMemoryFree(pRpcMsg);
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234);
|
||||
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||
pMsg->srcId.vgId = 100;
|
||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||
pMsg->destId.vgId = 100;
|
||||
pMsg->prevLogIndex = 11;
|
||||
pMsg->prevLogTerm = 22;
|
||||
pMsg->commitIndex = 33;
|
||||
pMsg->privateTerm = 44;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void test1() {
|
||||
SyncAppendEntriesBatch *pMsg = createMsg();
|
||||
syncAppendEntriesBatchLog2((char *)"test1:", pMsg);
|
||||
|
||||
SRpcMsg rpcMsgArr[5];
|
||||
int32_t retArrSize;
|
||||
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize);
|
||||
for (int i = 0; i < retArrSize; ++i) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i,
|
||||
rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont);
|
||||
sTrace("%s", logBuf);
|
||||
}
|
||||
|
||||
syncAppendEntriesBatchDestroy(pMsg);
|
||||
}
|
||||
|
||||
/*
|
||||
void test2() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncAppendEntriesSerialize(pMsg, serialized, len);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000);
|
||||
syncAppendEntriesDeserialize(serialized, len, pMsg2);
|
||||
syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test3() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
uint32_t len;
|
||||
char * serialized = syncAppendEntriesSerialize2(pMsg, &len);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len);
|
||||
syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen);
|
||||
syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2);
|
||||
syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg);
|
||||
syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
*/
|
||||
|
||||
int main() {
|
||||
gRaftDetailLog = true;
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
logTest();
|
||||
|
||||
test1();
|
||||
|
||||
/*
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
test5();
|
||||
*/
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue