refactor(sync): add SyncAppendEntriesBatch

This commit is contained in:
Minghao Li 2022-07-02 16:01:47 +08:00
parent 086ec29ca0
commit 4fc500c91c
3 changed files with 53 additions and 69 deletions

View File

@ -392,9 +392,8 @@ typedef struct SyncAppendEntriesBatch {
char data[]; // block1, block2 char data[]; // block1, block2
} SyncAppendEntriesBatch; } SyncAppendEntriesBatch;
// SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);

View File

@ -15,6 +15,7 @@
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftEntry.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "tcoding.h" #include "tcoding.h"
@ -1600,22 +1601,20 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
// block1: SOffsetAndContLen // block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array // block2: SOffsetAndContLen Array
// block3: SRpcMsg Array // block3: entry Array
// block4: SRpcMsg pCont Array
/* SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) { ASSERT(entryPArr != NULL);
ASSERT(rpcMsgArr != NULL);
ASSERT(arrSize > 0); ASSERT(arrSize > 0);
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen> int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg int32_t entryArrayLen = 0;
int32_t contArrayLen = 0;
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
contArrayLen += rpcMsgArr[i].contLen; SSyncRaftEntry* pEntry = entryPArr[i];
entryArrayLen += pEntry->bytes;
} }
dataLen += (metaArrayLen + rpcArrayLen + contArrayLen); dataLen += (metaArrayLen + entryArrayLen);
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen; uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
@ -1627,37 +1626,28 @@ SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t
pMsg->dataLen = dataLen; pMsg->dataLen = dataLen;
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen);
char* pData = pMsg->data; char* pData = pMsg->data;
for (int i = 0; i < arrSize; ++i) { for (int i = 0; i < arrSize; ++i) {
// init <offset, contLen> // init meta <offset, contLen>
if (i == 0) { if (i == 0) {
metaArr[i].offset = metaArrayLen + rpcArrayLen; metaArr[i].offset = metaArrayLen;
metaArr[i].contLen = rpcMsgArr[i].contLen; metaArr[i].contLen = entryPArr[i]->bytes;
} else { } else {
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen; metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
metaArr[i].contLen = rpcMsgArr[i].contLen; metaArr[i].contLen = entryPArr[i]->bytes;
} }
// init msgArr // init entry array
msgArr[i] = rpcMsgArr[i]; ASSERT(metaArr[i].contLen == entryPArr[i]->bytes);
memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen);
// init data
ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen);
memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen);
} }
return pMsg; return pMsg;
} }
*/
// block1: SOffsetAndContLen SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) {
// block2: SOffsetAndContLen Array return (SOffsetAndContLen*)(pMsg->data);
// block3: entry Array
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
return NULL;
} }
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
@ -1772,16 +1762,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen> int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg int32_t entryArrayLen = pMsg->dataLen - metaArrayLen;
int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen;
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen); cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen); cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen);
cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen);
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen);
void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen);
cJSON* pMetaArr = cJSON_CreateArray(); cJSON* pMetaArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
@ -1792,14 +1778,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
cJSON_AddItemToArray(pMetaArr, pMeta); cJSON_AddItemToArray(pMetaArr, pMeta);
} }
cJSON* pMsgArr = cJSON_CreateArray(); cJSON* pEntryArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr);
for (int i = 0; i < pMsg->dataCount; ++i) { for (int i = 0; i < pMsg->dataCount; ++i) {
cJSON* pRpcMsgJson = cJSON_CreateObject(); SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code); cJSON* pEntryJson = syncEntry2Json(pEntry);
cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen); cJSON_AddItemToArray(pEntryArr, pEntryJson);
cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType);
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
} }
char* s; char* s;

View File

@ -3,6 +3,7 @@
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncUtil.h" #include "syncUtil.h"
#include "trpc.h" #include "trpc.h"
@ -15,31 +16,29 @@ void logTest() {
sFatal("--- sync log test: fatal"); sFatal("--- sync log test: fatal");
} }
/* SSyncRaftEntry *createEntry(int i) {
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { SSyncRaftEntry *pEntry = syncEntryBuild(20);
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); assert(pEntry != NULL);
memset(pRpcMsg, 0, sizeof(SRpcMsg)); pEntry->msgType = 1;
pEntry->originalRpcType = 2;
pRpcMsg->msgType = TDMT_SYNC_PING; pEntry->seqNum = 3;
pRpcMsg->contLen = dataLen; pEntry->isWeak = true;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); pEntry->term = 100;
pRpcMsg->code = 10 * i; pEntry->index = 200;
snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i); snprintf(pEntry->data, pEntry->dataLen, "value_%d", i);
return pEntry;
return pRpcMsg;
} }
SyncAppendEntriesBatch *createMsg() { SyncAppendEntriesBatch *createMsg() {
SRpcMsg rpcMsgArr[5]; SSyncRaftEntry *entryPArr[5];
memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); memset(entryPArr, 0, sizeof(entryPArr));
for (int32_t i = 0; i < 5; ++i) { for (int32_t i = 0; i < 5; ++i) {
SRpcMsg *pRpcMsg = createRpcMsg(i, 20); SSyncRaftEntry *pEntry = createEntry(i);
rpcMsgArr[i] = *pRpcMsg; entryPArr[i] = pEntry;
taosMemoryFree(pRpcMsg);
} }
SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234); SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(entryPArr, 5, 1234);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100; pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
@ -53,21 +52,22 @@ SyncAppendEntriesBatch *createMsg() {
void test1() { void test1() {
SyncAppendEntriesBatch *pMsg = createMsg(); SyncAppendEntriesBatch *pMsg = createMsg();
syncAppendEntriesBatchLog2((char *)"test1:", pMsg); syncAppendEntriesBatchLog2((char *)"==test1==", pMsg);
SRpcMsg rpcMsgArr[5]; /*
int32_t retArrSize; SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize); int32_t retArrSize = pMsg->dataCount;
for (int i = 0; i < retArrSize; ++i) { for (int i = 0; i < retArrSize; ++i) {
char logBuf[128]; SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i, ASSERT(pEntry->bytes == metaArr[i].contLen);
rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont); syncEntryPrint(pEntry);
sTrace("%s", logBuf);
} }
*/
syncAppendEntriesBatchDestroy(pMsg); syncAppendEntriesBatchDestroy(pMsg);
} }
/*
void test2() { void test2() {
SyncAppendEntries *pMsg = createMsg(); SyncAppendEntries *pMsg = createMsg();
uint32_t len = pMsg->bytes; uint32_t len = pMsg->bytes;
@ -126,8 +126,9 @@ int main() {
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest(); logTest();
test1();
/* /*
test1();
test2(); test2();
test3(); test3();
test4(); test4();