diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5c539f0ef3..8a6d6b7722 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); +int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 48550b890a..d59a0a64b3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -27,7 +27,7 @@ extern "C" { #define TAOS_CONN_SERVER 0 #define TAOS_CONN_CLIENT 1 -#define IsReq(pMsg) (pMsg->msgType & 1U) +#define IsReq(pMsg) (pMsg->msgType & 1U) extern int32_t tsRpcHeadSize; @@ -35,12 +35,13 @@ typedef struct { uint32_t clientIp; uint16_t clientPort; int64_t applyIndex; + uint64_t applyTerm; char user[TSDB_USER_LEN]; } SRpcConnInfo; typedef struct SRpcHandleInfo { // rpc info - void * handle; // rpc handle returned to app + void *handle; // rpc handle returned to app int64_t refId; // refid, used by server int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t persistHandle; // persist handle or not @@ -53,7 +54,7 @@ typedef struct SRpcHandleInfo { void *node; // node mgmt handle // resp info - void * rsp; + void *rsp; int32_t rspLen; // conn info @@ -62,7 +63,7 @@ typedef struct SRpcHandleInfo { typedef struct SRpcMsg { tmsg_t msgType; - void * pCont; + void *pCont; int32_t contLen; int32_t code; SRpcHandleInfo info; @@ -74,7 +75,7 @@ typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType); typedef struct SRpcInit { char localFqdn[TSDB_FQDN_LEN]; uint16_t localPort; // local port - char * label; // for debug purpose + char *label; // for debug purpose int32_t numOfThreads; // number of threads to handle connections int32_t sessions; // number of sessions allowed int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS @@ -99,12 +100,12 @@ typedef struct { typedef struct { int32_t msgType; - void * val; + void *val; int32_t (*clone)(void *src, void **dst); } SRpcBrokenlinkVal; typedef struct { - SHashObj * args; + SHashObj *args; SRpcBrokenlinkVal brokenVal; void (*freeFunc)(const void *arg); } SRpcCtx; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index add8c6069a..399a5b6299 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -234,17 +234,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } #if 1 - char *syncNodeStr = sync2SimpleStr(pVnode->sync); - static int64_t vndTick = 0; - if (++vndTick % 10 == 1) { - vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); - } - if (gRaftDetailLog) { - char logBuf[512] = {0}; - snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr); - syncRpcMsgLog2(logBuf, pMsg); - } - taosMemoryFree(syncNodeStr); + do { + char *syncNodeStr = sync2SimpleStr(pVnode->sync); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); + } + taosMemoryFree(syncNodeStr); + } while (0); #endif if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { @@ -297,7 +294,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); code = -1; } - } else { + + } else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) { // use wal first strategy if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); @@ -403,43 +401,53 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode *pVnode = pFsm->data; - SSnapshot snapshot = {0}; - SyncIndex beginIndex = SYNC_INDEX_INVALID; - char logBuf[256] = {0}; - - snprintf(logBuf, sizeof(logBuf), - "commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + SVnode *pVnode = pFsm->data; + vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, + syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info.conn.applyIndex = cbMeta.index; + rpcMsg.info.conn.applyTerm = cbMeta.term; tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); } static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + SVnode *pVnode = pFsm->data; + vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, + syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); } static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); - syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); + SVnode *pVnode = pFsm->data; + vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", + syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, + syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); } -static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; } +static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { + SVnode *pVnode = pFsm->data; + SSnapshotParam *pSnapshotParam = pParam; + int32_t code = + vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end); + return code; +} -static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } +static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { + SVnode *pVnode = pFsm->data; + int32_t code = vnodeSnapshotReaderClose(pReader); + return code; +} -static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } +static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { + SVnode *pVnode = pFsm->data; + int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len); + return code; +} static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; } diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index 086a6aa074..fead7cdc76 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -36,6 +36,7 @@ typedef struct SRaftCfg { TdFilePtr pFile; char path[TSDB_FILENAME_LEN * 2]; int8_t isStandBy; + int32_t batchSize; int8_t snapshotStrategy; SyncIndex lastConfigIndex; @@ -49,19 +50,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); -char *syncCfg2Str(SSyncCfg *pSyncCfg); -char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); +char * syncCfg2Str(SSyncCfg *pSyncCfg); +char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); -char *raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); +char * raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); typedef struct SRaftCfgMeta { int8_t isStandBy; + int32_t batchSize; int8_t snapshotStrategy; SyncIndex lastConfigIndex; } SRaftCfgMeta; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 3b1e4f4560..0dc67cf150 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshotParam snapshotParam; SSnapshot snapshot; SSyncCfg lastConfig; int64_t sendingMS; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; int32_t replicaIndex; SyncTerm term; SyncTerm privateTerm; @@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); -char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); +char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); //--------------------------------------------------- typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; SyncTerm privateTerm; SSnapshotParam snapshotParam; SSnapshot snapshot; SRaftId fromId; - SSyncNode *pSyncNode; + SSyncNode * pSyncNode; } SSyncSnapshotReceiver; @@ -88,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); //--------------------------------------------------- // on message diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index c923ee3d1d..885ab1acae 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -834,7 +834,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc // // operation: // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry - // match my-commit-index or my-commit-index + 1 + // match my-commit-index or my-commit-index + batchSize do { bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && (pMsg->prevLogIndex <= ths->commitIndex); @@ -928,11 +928,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc bool condition = condition1 || condition2; if (condition) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), - "recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex, - pMsg->prevLogTerm, pMsg->dataLen); - syncNodeEventLog(ths, logBuf); + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), + "recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", + pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen); + syncNodeEventLog(ths, logBuf); + } while (0); // prepare response msg SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f3206e9ccc..e4724fc999 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -109,19 +109,30 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p } // only start once -static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, - SyncAppendEntriesReply* pMsg) { +static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, + SyncAppendEntriesReply* pMsg) { // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); ASSERT(pSender != NULL); + if (snapshotSenderIsStart(pSender)) { + do { + char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender already start"); + syncNodeErrorLog(ths, eventLog); + taosMemoryFree(eventLog); + } while (0); + + return; + } + SSnapshot snapshot = { .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; - ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); - if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { + int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); + ASSERT(code == 0); + + if (pMsg->privateTerm < pSender->privateTerm) { ASSERT(pReader != NULL); snapshotSenderStart(pSender, readerParam, snapshot, pReader); @@ -178,7 +189,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie // start snapshot SSnapshot oldSnapshot; ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); - syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg); + ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1); + syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, + pMsg); // term maybe not ok? syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); @@ -187,7 +200,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie } else { SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); - // notice! int64, uint64 if (nextIndex > SYNC_INDEX_BEGIN) { --nextIndex; @@ -198,7 +210,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie SSyncRaftEntry* pEntry; int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); ASSERT(code == 0); - syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); + syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); // get sender SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2192418c50..e207d89212 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -397,6 +397,38 @@ bool syncIsRestoreFinish(int64_t rid) { return b; } +int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) { + if (index < SYNC_INDEX_BEGIN) { + return -1; + } + + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return -1; + } + ASSERT(rid == pSyncNode->rid); + + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); + if (code != 0) { + if (pEntry != NULL) { + syncEntryDestory(pEntry); + } + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return -1; + } + ASSERT(pEntry != NULL); + + pSnapshot->data = NULL; + pSnapshot->lastApplyIndex = index; + pSnapshot->lastApplyTerm = pEntry->term; + pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index); + + syncEntryDestory(pEntry); + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return 0; +} + int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -786,6 +818,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex); if (code == 0) { pMsg->info.conn.applyIndex = retIndex; + pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm; rpcFreeCont(rpcMsg.pCont); syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); ret = 1; @@ -846,6 +879,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.isStandBy = pSyncInfo->isStandBy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy; meta.lastConfigIndex = SYNC_INDEX_INVALID; + meta.batchSize = pSyncInfo->batchSize; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ASSERT(ret == 0); diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 7eb7eb0db1..ead1168632 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -183,6 +183,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy); + cJSON_AddNumberToObject(pRoot, "batchSize", pRaftCfg->batchSize); char buf64[128]; snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); @@ -205,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -228,6 +229,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { SRaftCfg raftCfg; raftCfg.cfg = *pCfg; raftCfg.isStandBy = meta.isStandBy; + raftCfg.batchSize = meta.batchSize; raftCfg.snapshotStrategy = meta.snapshotStrategy; raftCfg.lastConfigIndex = meta.lastConfigIndex; raftCfg.configIndexCount = 1; @@ -257,6 +259,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); + cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize"); + pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize); + cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy"); pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy); @@ -280,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 990a92aad7..8dd1349edb 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) { while (pStub) { size_t len; - void *key = taosHashGetKey(pStub, &len); + void * key = taosHashGetKey(pStub, &len); SyncIndex *pIndex = (SyncIndex *)key; int64_t nowMS = taosGetTimestampMs(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5cdfec72c5..a33f66733b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -644,7 +644,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON *pTmp = pFromId; + cJSON * pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -677,14 +677,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp index 515d580b35..f2544d8fec 100644 --- a/source/libs/sync/test/syncAppendEntriesBatchTest.cpp +++ b/source/libs/sync/test/syncAppendEntriesBatchTest.cpp @@ -54,15 +54,15 @@ void test1() { SyncAppendEntriesBatch *pMsg = createMsg(); syncAppendEntriesBatchLog2((char *)"==test1==", pMsg); -/* - SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); - int32_t retArrSize = pMsg->dataCount; - for (int i = 0; i < retArrSize; ++i) { - SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); - ASSERT(pEntry->bytes == metaArr[i].contLen); - syncEntryPrint(pEntry); - } -*/ + /* + SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); + int32_t retArrSize = pMsg->dataCount; + for (int i = 0; i < retArrSize; ++i) { + SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); + ASSERT(pEntry->bytes == metaArr[i].contLen); + syncEntryPrint(pEntry); + } + */ syncAppendEntriesBatchDestroy(pMsg); } diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 968baff952..7cd97695f9 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { *ppWriter = (void*)0xCDEF; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp index 8627a6c174..07a05d437e 100644 --- a/source/libs/sync/test/syncIndexTest.cpp +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -8,11 +8,10 @@ void print(SHashObj *pNextIndex) { printf("----------------\n"); uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); while (p) { - size_t len; - void* key = taosHashGetKey(p, &len); + void * key = taosHashGetKey(p, &len); - SRaftId *pRaftId = (SRaftId*)key; + SRaftId *pRaftId = (SRaftId *)key; printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p); p = (uint64_t *)taosHashIterate(pNextIndex, p); diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index a3773604fb..2823a7826b 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -26,6 +26,7 @@ SRaftCfg* createRaftCfg() { snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); } pCfg->isStandBy = taosGetTimestampSec() % 100; + pCfg->batchSize = taosGetTimestampSec() % 100; pCfg->configIndexCount = 5; for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { @@ -84,6 +85,7 @@ void test3() { SRaftCfgMeta meta; meta.isStandBy = 7; meta.snapshotStrategy = 9; + meta.batchSize = 10; meta.lastConfigIndex = 789; raftCfgCreateFile(pCfg, meta, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s); @@ -109,6 +111,7 @@ void test5() { pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->isStandBy += 2; pCfg->snapshotStrategy += 3; + pCfg->batchSize += 4; pCfg->lastConfigIndex += 1000; pCfg->configIndexCount = 5; diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index fd18109280..93a7ce430f 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -74,7 +74,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) { } SSyncNode *createSyncNode() { - SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); + SSyncNode *pSyncNode = (SSyncNode *)taosMemoryMalloc(sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode)); return pSyncNode; } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index b4bf08dd40..e5d93ddff4 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; } +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index c6d3a3e4af..2c08910aa8 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 return 0; } -int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { +int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { *ppWriter = (void*)0xCDEF; char logBuf[256] = {0}; @@ -314,18 +314,18 @@ int main(int argc, char** argv) { exit(-1); } - int32_t replicaNum = atoi(argv[1]); - int32_t myIndex = atoi(argv[2]); - ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); - int32_t lastApplyIndex = atoi(argv[4]); - int32_t lastApplyTerm = atoi(argv[5]); - int32_t writeRecordNum = atoi(argv[6]); - bool isStandBy = atoi(argv[7]); - int32_t isConfigChange = atoi(argv[8]); - int32_t iterTimes = atoi(argv[9]); - int32_t finishLastApplyIndex = atoi(argv[10]); - int32_t finishLastApplyTerm = atoi(argv[11]); - int32_t leaderTransfer = atoi(argv[12]); + int32_t replicaNum = atoi(argv[1]); + int32_t myIndex = atoi(argv[2]); + ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); + int32_t lastApplyIndex = atoi(argv[4]); + int32_t lastApplyTerm = atoi(argv[5]); + int32_t writeRecordNum = atoi(argv[6]); + bool isStandBy = atoi(argv[7]); + int32_t isConfigChange = atoi(argv[8]); + int32_t iterTimes = atoi(argv[9]); + int32_t finishLastApplyIndex = atoi(argv[10]); + int32_t finishLastApplyTerm = atoi(argv[11]); + int32_t leaderTransfer = atoi(argv[12]); sInfo( "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "