diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index c3679636e6..40ff79287b 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -42,7 +42,7 @@ typedef struct SSyncEnv { // tick timer tmr_h pEnvTickTimer; int32_t envTickTimerMS; - uint64_t envTickTimerLogicClock; + uint64_t envTickTimerLogicClock; // if use queue, should pass logic clock into queue item uint64_t envTickTimerLogicClockUser; TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp uint64_t envTickTimerCounter; diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 160fefd086..352d30c8d7 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -29,6 +29,9 @@ extern "C" { #include "tqueue.h" #include "trpc.h" +#define TICK_Q_TIMER_MS 1000 +#define TICK_Ping_TIMER_MS 1000 + typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; @@ -38,9 +41,11 @@ typedef struct SSyncIO { void * clientRpc; SEpSet myAddr; - void *ioTimerTickQ; - void *ioTimerTickPing; - void *ioTimerManager; + tmr_h qTimer; + int32_t qTimerMS; + tmr_h pingTimer; + int32_t pingTimerMS; + tmr_h timerMgr; void *pSyncNode; int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); @@ -59,11 +64,14 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOTickQ(); -int32_t syncIOTickPing(); int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); +int32_t syncIOQTimerStart(); +int32_t syncIOQTimerStop(); +int32_t syncIOPingTimerStart(); +int32_t syncIOPingTimerStop(); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 2876577410..9bb5b6195e 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -39,6 +39,7 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + SYNC_RESPONSE = 119, } ESyncMessageType; diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index 2830d2d4aa..dd7161800d 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -55,13 +55,19 @@ static void syncEnvTick(void *param, void *tmrId) { if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { ++(pSyncEnv->envTickTimerCounter); sTrace( - "syncEnvTick ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " - "envTickTimerMS:%d", + "syncEnvTick do ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " + "envTickTimerMS:%d, tmrId:%p", pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, - pSyncEnv->envTickTimerMS); + pSyncEnv->envTickTimerMS, tmrId); // do something, tick ... taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); + } else { + sTrace( + "syncEnvTick pass ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " + "envTickTimerMS:%d, tmrId:%p", + pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, + pSyncEnv->envTickTimerMS, tmrId); } } @@ -88,15 +94,16 @@ static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { int32_t ret = 0; - pSyncEnv->pEnvTickTimer = - taosTmrStart(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager); + taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, + &pSyncEnv->pEnvTickTimer); atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); return ret; } static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { + int32_t ret = 0; atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); taosTmrStop(pSyncEnv->pEnvTickTimer); pSyncEnv->pEnvTickTimer = NULL; - return 0; + return ret; } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index af97c4663c..c8745ca138 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -16,6 +16,7 @@ #include "syncIO.h" #include #include "syncMessage.h" +#include "syncUtil.h" #include "tglobal.h" #include "ttimer.h" #include "tutil.h" @@ -23,33 +24,36 @@ SSyncIO *gSyncIO = NULL; // local function ------------ -static int32_t syncIOStartInternal(SSyncIO *io); -static int32_t syncIOStopInternal(SSyncIO *io); static SSyncIO *syncIOCreate(char *host, uint16_t port); static int32_t syncIODestroy(SSyncIO *io); +static int32_t syncIOStartInternal(SSyncIO *io); +static int32_t syncIOStopInternal(SSyncIO *io); -static void *syncIOConsumerFunc(void *param); -static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); -static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void *syncIOConsumerFunc(void *param); +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); -static int32_t syncIOTickQInternal(SSyncIO *io); -static void syncIOTickQFunc(void *param, void *tmrId); -static int32_t syncIOTickPingInternal(SSyncIO *io); -static void syncIOTickPingFunc(void *param, void *tmrId); +static int32_t syncIOStartQ(SSyncIO *io); +static int32_t syncIOStopQ(SSyncIO *io); +static int32_t syncIOStartPing(SSyncIO *io); +static int32_t syncIOStopPing(SSyncIO *io); +static void syncIOTickQ(void *param, void *tmrId); +static void syncIOTickPing(void *param, void *tmrId); // ---------------------------- // public function ------------ int32_t syncIOStart(char *host, uint16_t port) { + int32_t ret = 0; gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); taosSeedRand(taosGetTimestampSec()); - int32_t ret = syncIOStartInternal(gSyncIO); + ret = syncIOStartInternal(gSyncIO); assert(ret == 0); - sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc); - return 0; + sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO); + return ret; } int32_t syncIOStop() { @@ -61,37 +65,25 @@ int32_t syncIOStop() { return ret; } -int32_t syncIOTickQ() { - int32_t ret = syncIOTickQInternal(gSyncIO); - assert(ret == 0); - return ret; -} - -int32_t syncIOTickPing() { - int32_t ret = syncIOTickPingInternal(gSyncIO); - assert(ret == 0); - return ret; -} - int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { - sTrace( - "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " - "pMsg->msgType:%d, pMsg->contLen:%d", - clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, - pMsg->msgType, pMsg->contLen); - { - cJSON *pJson = syncRpcMsg2Json(pMsg); - char * serialized = cJSON_Print(pJson); - sTrace("process syncMessage send: pMsg:%s ", serialized); - free(serialized); - cJSON_Delete(pJson); - } + assert(pEpSet->inUse == 0); + assert(pEpSet->numOfEps == 1); + + int32_t ret = 0; + char logBuf[256]; + snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port); + syncRpcMsgPrint2(logBuf, pMsg); + pMsg->handle = NULL; rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); - return 0; + return ret; } int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { + int32_t ret = 0; + char logBuf[128]; + syncRpcMsgPrint2((char *)"==syncIOEqMsg==", pMsg); + SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); @@ -99,11 +91,75 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { STaosQueue *pMsgQ = queue; taosWriteQitem(pMsgQ, pTemp); - return 0; + return ret; +} + +int32_t syncIOQTimerStart() { + int32_t ret = syncIOStartQ(gSyncIO); + assert(ret == 0); + return ret; +} + +int32_t syncIOQTimerStop() { + int32_t ret = syncIOStopQ(gSyncIO); + assert(ret == 0); + return ret; +} + +int32_t syncIOPingTimerStart() { + int32_t ret = syncIOStartPing(gSyncIO); + assert(ret == 0); + return ret; +} + +int32_t syncIOPingTimerStop() { + int32_t ret = syncIOStopPing(gSyncIO); + assert(ret == 0); + return ret; } // local function ------------ +static SSyncIO *syncIOCreate(char *host, uint16_t port) { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); + + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); + + io->myAddr.inUse = 0; + io->myAddr.numOfEps = 0; + addEpIntoEpSet(&io->myAddr, host, port); + + io->qTimerMS = TICK_Q_TIMER_MS; + io->pingTimerMS = TICK_Ping_TIMER_MS; + + return io; +} + +static int32_t syncIODestroy(SSyncIO *io) { + int32_t ret = 0; + int8_t start = atomic_load_8(&io->isStart); + assert(start == 0); + + if (io->serverRpc != NULL) { + rpcClose(io->serverRpc); + io->serverRpc = NULL; + } + + if (io->clientRpc != NULL) { + rpcClose(io->clientRpc); + io->clientRpc = NULL; + } + + taosCloseQueue(io->pMsgQ); + taosCloseQset(io->pQset); + + return ret; +} + static int32_t syncIOStartInternal(SSyncIO *io) { + int32_t ret = 0; taosBlockSIGPIPE(); rpcInit(); @@ -163,58 +219,24 @@ static int32_t syncIOStartInternal(SSyncIO *io) { } // start tmr thread - io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); + io->timerMgr = taosTmrInit(1000, 50, 10000, "SYNC-IO"); - return 0; + atomic_store_8(&io->isStart, 1); + return ret; } static int32_t syncIOStopInternal(SSyncIO *io) { + int32_t ret = 0; atomic_store_8(&io->isStart, 0); pthread_join(io->consumerTid, NULL); - return 0; -} - -static SSyncIO *syncIOCreate(char *host, uint16_t port) { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->myAddr.inUse = 0; - addEpIntoEpSet(&io->myAddr, host, port); - - return io; -} - -static int32_t syncIODestroy(SSyncIO *io) { - int8_t start = atomic_load_8(&io->isStart); - assert(start == 0); - - if (io->serverRpc != NULL) { - free(io->serverRpc); - io->serverRpc = NULL; - } - - if (io->clientRpc != NULL) { - free(io->clientRpc); - io->clientRpc = NULL; - } - - taosCloseQueue(io->pMsgQ); - taosCloseQset(io->pQset); - - return 0; + taosTmrCleanUp(io->timerMgr); + return ret; } static void *syncIOConsumerFunc(void *param) { - SSyncIO *io = param; - + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - int type; - + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -226,77 +248,67 @@ static void *syncIOConsumerFunc(void *param) { for (int i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); + syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg); - char *s = syncRpcMsg2Str(pRpcMsg); - sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, - s); - free(s); - + // use switch case instead of if else if (pRpcMsg->msgType == SYNC_PING) { if (io->FpOnSyncPing != NULL) { - SyncPing *pSyncMsg; + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + /* pSyncMsg = syncPingBuild(pRpcMsg->contLen); syncPingFromRpcMsg(pRpcMsg, pSyncMsg); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); + */ } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { if (io->FpOnSyncPingReply != NULL) { - SyncPingReply *pSyncMsg; - pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); - syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { - SyncRequestVote *pSyncMsg; - pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); - syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { - SyncRequestVoteReply *pSyncMsg; - pSyncMsg = syncRequestVoteReplyBuild(); - syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { if (io->FpOnSyncAppendEntries != NULL) { - SyncAppendEntries *pSyncMsg; - pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); - syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { if (io->FpOnSyncAppendEntriesReply != NULL) { - SyncAppendEntriesReply *pSyncMsg; - pSyncMsg = syncAppendEntriesReplyBuild(); - syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { - SyncTimeout *pSyncMsg; - pSyncMsg = syncTimeoutBuild(); - syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg); + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } } else { - ; + sTrace("unknown msgType:%d, no operator", pRpcMsg->msgType); } } @@ -306,15 +318,16 @@ static void *syncIOConsumerFunc(void *param) { rpcFreeCont(pRpcMsg->pCont); if (pRpcMsg->handle != NULL) { - int msgSize = 128; + int msgSize = 32; memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.msgType = SYNC_RESPONSE; rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); rpcMsg.handle = pRpcMsg->handle; rpcMsg.code = 0; - sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen); + syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg); rpcSendResponse(&rpcMsg); } @@ -326,60 +339,102 @@ static void *syncIOConsumerFunc(void *param) { return NULL; } -static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; - return ret; -} - static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sTrace("<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, - (char *)pMsg->pCont); - + syncRpcMsgPrint2((char *)"==syncIOProcessRequest==", pMsg); SSyncIO *io = pParent; SRpcMsg *pTemp; - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - taosWriteQitem(io->pMsgQ, pTemp); } static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); + if (pMsg->msgType == SYNC_RESPONSE) { + sTrace("==syncIOProcessReply=="); + } else { + syncRpcMsgPrint2((char *)"==syncIOProcessReply==", pMsg); + } rpcFreeCont(pMsg->pCont); } -static int32_t syncIOTickQInternal(SSyncIO *io) { - io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager); - return 0; +static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int32_t ret = 0; + return ret; } -static void syncIOTickQFunc(void *param, void *tmrId) { +static int32_t syncIOStartQ(SSyncIO *io) { + int32_t ret = 0; + taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer); + return ret; +} + +static int32_t syncIOStopQ(SSyncIO *io) { + int32_t ret = 0; + taosTmrStop(io->qTimer); + io->qTimer = NULL; + return ret; +} + +static int32_t syncIOStartPing(SSyncIO *io) { + int32_t ret = 0; + taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); + return ret; +} + +static int32_t syncIOStopPing(SSyncIO *io) { + int32_t ret = 0; + taosTmrStop(io->pingTimer); + io->pingTimer = NULL; + return ret; +} + +static void syncIOTickQ(void *param, void *tmrId) { SSyncIO *io = (SSyncIO *)param; - sTrace("<-- syncIOTickQFunc -->"); + + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port); + srcId.vgId = -1; + destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port); + destId.vgId = -1; + SyncPingReply *pMsg = syncPingReplyBuild2(&srcId, &destId, "syncIOTickQ"); SRpcMsg rpcMsg; - rpcMsg.contLen = 64; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ"); - rpcMsg.handle = NULL; - rpcMsg.msgType = 55; - + syncPingReply2RpcMsg(pMsg, &rpcMsg); SRpcMsg *pTemp; pTemp = taosAllocateQitem(sizeof(SRpcMsg)); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); - + syncRpcMsgPrint2((char *)"==syncIOTickQ==", &rpcMsg); taosWriteQitem(io->pMsgQ, pTemp); - taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ); + syncPingReplyDestroy(pMsg); + + taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer); } -static int32_t syncIOTickPingInternal(SSyncIO *io) { - io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager); - return 0; +static void syncIOTickPing(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + + SRaftId srcId, destId; + srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port); + srcId.vgId = -1; + destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port); + destId.vgId = -1; + SyncPing *pMsg = syncPingBuild2(&srcId, &destId, "syncIOTickPing"); + // SyncPing *pMsg = syncPingBuild3(&srcId, &destId); + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + syncRpcMsgPrint2((char *)"==syncIOTickPing==", &rpcMsg); + rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); + syncPingDestroy(pMsg); + + taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer); } +#if 0 + + static void syncIOTickPingFunc(void *param, void *tmrId) { SSyncIO *io = (SSyncIO *)param; sTrace("<-- syncIOTickPingFunc -->"); @@ -393,4 +448,5 @@ static void syncIOTickPingFunc(void *param, void *tmrId) { rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing); -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 5a55bbc11f..8d2ff6a9a5 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -65,10 +65,32 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncAppendEntriesReply2Json(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == SYNC_RESPONSE) { + pRoot = cJSON_CreateObject(); + char* s; + s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); + cJSON_AddStringToObject(pRoot, "pCont", s); + free(s); + s = syncUtilprintBin2((char*)(pRpcMsg->pCont), pRpcMsg->contLen); + cJSON_AddStringToObject(pRoot, "pCont2", s); + free(s); + } else { pRoot = syncRpcUnknownMsg2Json(); + char* s; + s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen); + cJSON_AddStringToObject(pRoot, "pCont", s); + free(s); + s = syncUtilprintBin2((char*)(pRpcMsg->pCont), pRpcMsg->contLen); + cJSON_AddStringToObject(pRoot, "pCont2", s); + free(s); } + cJSON_AddNumberToObject(pRoot, "msgType", pRpcMsg->msgType); + cJSON_AddNumberToObject(pRoot, "contLen", pRpcMsg->contLen); + cJSON_AddNumberToObject(pRoot, "code", pRpcMsg->code); + cJSON_AddNumberToObject(pRoot, "persist", pRpcMsg->persist); + cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); return pJson; @@ -77,7 +99,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* syncRpcUnknownMsg2Json() { cJSON* pRoot = cJSON_CreateObject(); cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); - cJSON_AddStringToObject(pRoot, "data", "known message"); + cJSON_AddStringToObject(pRoot, "data", "unknown message"); cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot); diff --git a/source/libs/sync/test/syncIOTickPingTest.cpp b/source/libs/sync/test/syncIOTickPingTest.cpp index 8be93e6fc0..9c2342828e 100644 --- a/source/libs/sync/test/syncIOTickPingTest.cpp +++ b/source/libs/sync/test/syncIOTickPingTest.cpp @@ -25,8 +25,15 @@ int main() { ret = syncIOStart((char*)"127.0.0.1", 7010); assert(ret == 0); - ret = syncIOTickPing(); - assert(ret == 0); + for (int i = 0; i < 3; ++i) { + ret = syncIOPingTimerStart(); + assert(ret == 0); + taosMsleep(5000); + + ret = syncIOPingTimerStop(); + assert(ret == 0); + taosMsleep(5000); + } while (1) { taosSsleep(1); diff --git a/source/libs/sync/test/syncIOTickQTest.cpp b/source/libs/sync/test/syncIOTickQTest.cpp index 76f5e33e82..64b65f25c8 100644 --- a/source/libs/sync/test/syncIOTickQTest.cpp +++ b/source/libs/sync/test/syncIOTickQTest.cpp @@ -25,11 +25,18 @@ int main() { ret = syncIOStart((char*)"127.0.0.1", 7010); assert(ret == 0); - ret = syncIOTickQ(); + for (int i = 0; i < 3; ++i) { + ret = syncIOQTimerStart(); + assert(ret == 0); + taosMsleep(5000); + + ret = syncIOQTimerStop(); + assert(ret == 0); + taosMsleep(5000); + } + + ret = syncIOStop(); assert(ret == 0); - while (1) { - taosSsleep(1); - } return 0; } diff --git a/source/libs/sync/test/syncUtilTest.cpp b/source/libs/sync/test/syncUtilTest.cpp index 9a1c113620..663db3a7b3 100644 --- a/source/libs/sync/test/syncUtilTest.cpp +++ b/source/libs/sync/test/syncUtilTest.cpp @@ -26,6 +26,7 @@ int main() { tsAsyncLog = 0; sDebugFlag = 143 + 64; logTest(); + electRandomMSTest(); return 0;