diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 8b658c8776..97f2748420 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -392,9 +392,8 @@ typedef struct SyncAppendEntriesBatch { char data[]; // block1, block2 } SyncAppendEntriesBatch; -// SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId); - SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId); +SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg); void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen); void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 664f4c8b73..10e6231a70 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,6 +15,7 @@ #include "syncMessage.h" #include "syncRaftCfg.h" +#include "syncRaftEntry.h" #include "syncUtil.h" #include "tcoding.h" @@ -1600,22 +1601,20 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) { // block1: SOffsetAndContLen // block2: SOffsetAndContLen Array -// block3: SRpcMsg Array -// block4: SRpcMsg pCont Array +// block3: entry Array -/* -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) { - ASSERT(rpcMsgArr != NULL); +SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) { + ASSERT(entryPArr != NULL); ASSERT(arrSize > 0); int32_t dataLen = 0; int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // - int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg - int32_t contArrayLen = 0; + int32_t entryArrayLen = 0; for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont - contArrayLen += rpcMsgArr[i].contLen; + SSyncRaftEntry* pEntry = entryPArr[i]; + entryArrayLen += pEntry->bytes; } - dataLen += (metaArrayLen + rpcArrayLen + contArrayLen); + dataLen += (metaArrayLen + entryArrayLen); uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen; SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes); @@ -1627,37 +1626,28 @@ SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t pMsg->dataLen = dataLen; SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); - SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen); char* pData = pMsg->data; for (int i = 0; i < arrSize; ++i) { - // init + // init meta if (i == 0) { - metaArr[i].offset = metaArrayLen + rpcArrayLen; - metaArr[i].contLen = rpcMsgArr[i].contLen; + metaArr[i].offset = metaArrayLen; + metaArr[i].contLen = entryPArr[i]->bytes; } else { metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen; - metaArr[i].contLen = rpcMsgArr[i].contLen; + metaArr[i].contLen = entryPArr[i]->bytes; } - // init msgArr - msgArr[i] = rpcMsgArr[i]; - - // init data - ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen); - memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen); + // init entry array + ASSERT(metaArr[i].contLen == entryPArr[i]->bytes); + memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen); } return pMsg; } -*/ -// block1: SOffsetAndContLen -// block2: SOffsetAndContLen Array -// block3: entry Array - -SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) { - return NULL; +SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) { + return (SOffsetAndContLen*)(pMsg->data); } void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) { @@ -1772,16 +1762,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) { cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // - int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg - int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen; + int32_t entryArrayLen = pMsg->dataLen - metaArrayLen; cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen); - cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen); - cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen); + cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen); SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data); - SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen); - void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen); cJSON* pMetaArr = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr); @@ -1792,14 +1778,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) { cJSON_AddItemToArray(pMetaArr, pMeta); } - cJSON* pMsgArr = cJSON_CreateArray(); - cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr); + cJSON* pEntryArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr); for (int i = 0; i < pMsg->dataCount; ++i) { - cJSON* pRpcMsgJson = cJSON_CreateObject(); - cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code); - cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen); - cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType); - cJSON_AddItemToArray(pMsgArr, pRpcMsgJson); + SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); + cJSON* pEntryJson = syncEntry2Json(pEntry); + cJSON_AddItemToArray(pEntryArr, pEntryJson); } char* s; diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp index b9000269e6..515d580b35 100644 --- a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -3,6 +3,7 @@ #include "syncIO.h" #include "syncInt.h" #include "syncMessage.h" +#include "syncRaftEntry.h" #include "syncUtil.h" #include "trpc.h" @@ -15,31 +16,29 @@ void logTest() { sFatal("--- sync log test: fatal"); } -/* -SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { - SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg)); - memset(pRpcMsg, 0, sizeof(SRpcMsg)); - - pRpcMsg->msgType = TDMT_SYNC_PING; - pRpcMsg->contLen = dataLen; - pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); - pRpcMsg->code = 10 * i; - snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i); - - return pRpcMsg; +SSyncRaftEntry *createEntry(int i) { + SSyncRaftEntry *pEntry = syncEntryBuild(20); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = 200; + snprintf(pEntry->data, pEntry->dataLen, "value_%d", i); + return pEntry; } SyncAppendEntriesBatch *createMsg() { - SRpcMsg rpcMsgArr[5]; - memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + SSyncRaftEntry *entryPArr[5]; + memset(entryPArr, 0, sizeof(entryPArr)); for (int32_t i = 0; i < 5; ++i) { - SRpcMsg *pRpcMsg = createRpcMsg(i, 20); - rpcMsgArr[i] = *pRpcMsg; - taosMemoryFree(pRpcMsg); + SSyncRaftEntry *pEntry = createEntry(i); + entryPArr[i] = pEntry; } - SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234); + SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(entryPArr, 5, 1234); pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); @@ -53,21 +52,22 @@ SyncAppendEntriesBatch *createMsg() { void test1() { SyncAppendEntriesBatch *pMsg = createMsg(); - syncAppendEntriesBatchLog2((char *)"test1:", pMsg); + syncAppendEntriesBatchLog2((char *)"==test1==", pMsg); - SRpcMsg rpcMsgArr[5]; - int32_t retArrSize; - syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize); +/* + SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); + int32_t retArrSize = pMsg->dataCount; for (int i = 0; i < retArrSize; ++i) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i, - rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont); - sTrace("%s", logBuf); + SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); + ASSERT(pEntry->bytes == metaArr[i].contLen); + syncEntryPrint(pEntry); } +*/ syncAppendEntriesBatchDestroy(pMsg); } +/* void test2() { SyncAppendEntries *pMsg = createMsg(); uint32_t len = pMsg->bytes; @@ -126,8 +126,9 @@ int main() { sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); + test1(); + /* - test1(); test2(); test3(); test4();