From 5a4fbcf2621f54aec1fd4fd9db2429c3c32228bb Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 2 Mar 2022 15:11:54 +0800 Subject: [PATCH] sync io --- source/libs/sync/inc/syncIO.h | 38 +-- source/libs/sync/src/syncEnv.c | 1 - source/libs/sync/src/syncIO.c | 327 +++++++++++++---------- source/libs/sync/test/CMakeLists.txt | 28 ++ source/libs/sync/test/syncIOTickPing.cpp | 35 +++ source/libs/sync/test/syncIOTickQ.cpp | 35 +++ source/libs/sync/test/syncPingTest.cpp | 20 +- source/libs/sync/test/syncTest.cpp | 2 +- 8 files changed, 310 insertions(+), 176 deletions(-) create mode 100644 source/libs/sync/test/syncIOTickPing.cpp create mode 100644 source/libs/sync/test/syncIOTickQ.cpp diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 6723fa66c5..e5c55baff4 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -30,35 +30,37 @@ extern "C" { #include "trpc.h" typedef struct SSyncIO { - void * serverRpc; - void * clientRpc; STaosQueue *pMsgQ; - STaosQset * pQset; - pthread_t tid; - int8_t isStart; + STaosQset *pQset; + pthread_t consumerTid; - SEpSet epSet; + void *serverRpc; + void *clientRpc; + SEpSet myAddr; - void *syncTimer; - void *syncTimerManager; - - int32_t (*start)(struct SSyncIO *ths); - int32_t (*stop)(struct SSyncIO *ths); - int32_t (*ping)(struct SSyncIO *ths); - - int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); - int32_t (*destroy)(struct SSyncIO *ths); + void *ioTimerTickQ; + void *ioTimerTickPing; + void *ioTimerManager; void *pSyncNode; - int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); + int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); + int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); + int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); + int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); + int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + + int8_t isStart; } SSyncIO; extern SSyncIO *gSyncIO; -int32_t syncIOStart(); +int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOTickQ(); +int32_t syncIOTickPing(); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index bfdf2a9c5e..a9cf035650 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -76,4 +76,3 @@ static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int m } static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {} -// -------------------------------- \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 27434a4029..932c27d64a 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -23,146 +23,66 @@ SSyncIO *gSyncIO = NULL; // local function ------------ -static int32_t doSyncIOStart(SSyncIO *io); -static int32_t doSyncIOStop(SSyncIO *io); -static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t doSyncIODestroy(SSyncIO *io); +static int32_t syncIOStartInternal(SSyncIO *io); +static int32_t syncIOStopInternal(SSyncIO *io); +static SSyncIO *syncIOCreate(char *host, uint16_t port); +static int32_t syncIODestroy(SSyncIO *io); -static SSyncIO *syncIOCreate(); -static void *syncIOConsumer(void *param); -static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); -static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static void syncIOTick(void *param, void *tmrId); +static void *syncIOConsumerFunc(void *param); +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); + +static int32_t syncIOTickQInternal(SSyncIO *io); +static void syncIOTickQFunc(void *param, void *tmrId); +static int32_t syncIOTickPingInternal(SSyncIO *io); +static void syncIOTickPingFunc(void *param, void *tmrId); // ---------------------------- -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { +// public function ------------ +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { sTrace("syncIOSendMsg ... "); pMsg->handle = NULL; - rpcSendRequest(handle, pEpSet, pMsg, NULL); + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); return 0; } -int32_t syncIOStart() { - gSyncIO = syncIOCreate(); +int32_t syncIOStart(char *host, uint16_t port) { + gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); - int32_t ret = doSyncIOStart(gSyncIO); + int32_t ret = syncIOStartInternal(gSyncIO); assert(ret == 0); return 0; } -int32_t syncIOStop() { return 0; } +int32_t syncIOStop() { + int32_t ret = syncIOStopInternal(gSyncIO); + assert(ret == 0); -// local function ------------ -static void syncIOTick(void *param, void *tmrId) { - SSyncIO *io = (SSyncIO *)param; - sDebug("syncIOTick ... "); - - SRpcMsg rpcMsg; - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "TICK"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 2; - - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); - - taosWriteQitem(io->pMsgQ, pTemp); - - taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, &io->syncTimer); -} - -static void *syncIOConsumer(void *param) { - SSyncIO *io = param; - - STaosQall *qall; - SRpcMsg *pRpcMsg, rpcMsg; - int type; - - qall = taosAllocateQall(); - - while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); - sDebug("%d sync-io msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; - - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); - } - - taosResetQitems(qall); - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - rpcFreeCont(pRpcMsg->pCont); - - if (pRpcMsg->handle != NULL) { - int msgSize = 128; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; - rpcSendResponse(&rpcMsg); - } - - taosFreeQitem(pRpcMsg); - } - } - - taosFreeQall(qall); - return NULL; -} - -static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; + ret = syncIODestroy(gSyncIO); + assert(ret == 0); return ret; } -static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sDebug("syncIODoReply ... "); - rpcFreeCont(pMsg->pCont); +int32_t syncIOTickQ() { + int32_t ret = syncIOTickQInternal(gSyncIO); + assert(ret == 0); + return ret; } -static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSyncIO *io = pParent; - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(io->pMsgQ, pTemp); +int32_t syncIOTickPing() { + int32_t ret = syncIOTickPingInternal(gSyncIO); + assert(ret == 0); + return ret; } -static SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMsg = doSyncIOOnMsg; - io->destroy = doSyncIODestroy; - - return io; -} - -static int32_t doSyncIOStart(SSyncIO *io) { +// local function ------------ +static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); + rpcInit(); tsRpcForceTcp = 1; // cient rpc init @@ -172,7 +92,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 0; rpcInit.label = "SYNC-IO-CLIENT"; rpcInit.numOfThreads = 1; - rpcInit.cfp = syncIODoReply; + rpcInit.cfp = syncIOProcessReply; rpcInit.sessions = 100; rpcInit.idleTime = 100; rpcInit.user = "sync-io"; @@ -195,7 +115,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 7010; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = syncIODoRequest; + rpcInit.cfp = syncIOProcessRequest; rpcInit.sessions = 1000; rpcInit.idleTime = 2 * 1500; rpcInit.afp = syncIOAuth; @@ -209,12 +129,9 @@ static int32_t doSyncIOStart(SSyncIO *io) { } } - io->epSet.inUse = 0; - addEpIntoEpSet(&io->epSet, "127.0.0.1", 7010); - // start consumer thread { - if (pthread_create(&io->tid, NULL, syncIOConsumer, io) != 0) { + if (pthread_create(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) { sError("failed to create sync consumer thread since %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -222,35 +139,32 @@ static int32_t doSyncIOStart(SSyncIO *io) { } // start tmr thread - io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); + io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); return 0; } -static int32_t doSyncIOStop(SSyncIO *io) { +static int32_t syncIOStopInternal(SSyncIO *io) { atomic_store_8(&io->isStart, 0); - pthread_join(io->tid, NULL); + pthread_join(io->consumerTid, NULL); return 0; } -static int32_t doSyncIOPing(SSyncIO *io) { - SRpcMsg rpcMsg, rspMsg; +static SSyncIO *syncIOCreate(char *host, uint16_t port) { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "ping"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 1; + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); + io->myAddr.inUse = 0; + addEpIntoEpSet(&io->myAddr, host, port); - return 0; + return io; } -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } - -static int32_t doSyncIODestroy(SSyncIO *io) { +static int32_t syncIODestroy(SSyncIO *io) { int8_t start = atomic_load_8(&io->isStart); assert(start == 0); @@ -264,15 +178,136 @@ static int32_t doSyncIODestroy(SSyncIO *io) { io->clientRpc = NULL; } - if (io->pMsgQ != NULL) { - free(io->pMsgQ); - io->pMsgQ = NULL; - } - - if (io->pQset != NULL) { - free(io->pQset); - io->pQset = NULL; - } + taosCloseQueue(io->pMsgQ); + taosCloseQset(io->pQset); return 0; } + +static void *syncIOConsumerFunc(void *param) { + SSyncIO *io = param; + + STaosQall *qall; + SRpcMsg *pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, + (char *)(pRpcMsg->pCont)); + + if (pRpcMsg->msgType == SYNC_PING) { + if (io->FpOnSyncPing != NULL) { + SyncPing *pSyncMsg = syncPingBuild(pRpcMsg->contLen); + syncPingFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); + syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + } else { + ; + } + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + if (pRpcMsg->handle != NULL) { + int msgSize = 128; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + + sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen); + rpcSendResponse(&rpcMsg); + } + + taosFreeQitem(pRpcMsg); + } + } + + taosFreeQall(qall); + return NULL; +} + +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + return ret; +} + +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("syncIOProcessRequest: type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); + + SSyncIO *io = pParent; + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); +} + +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); + rpcFreeCont(pMsg->pCont); +} + +static int32_t syncIOTickQInternal(SSyncIO *io) { + io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickQFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickQFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 55; + + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ); +} + +static int32_t syncIOTickPingInternal(SSyncIO *io) { + io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickPingFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickPingFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickPing"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); + taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing); +} diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 8e1c99d179..9abe3f232e 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -2,6 +2,8 @@ add_executable(syncTest "") add_executable(syncEnvTest "") add_executable(syncPingTest "") add_executable(syncEncodeTest "") +add_executable(syncIOTickQ "") +add_executable(syncIOTickPing "") target_sources(syncTest @@ -20,6 +22,14 @@ target_sources(syncEncodeTest PRIVATE "syncEncodeTest.cpp" ) +target_sources(syncIOTickQ + PRIVATE + "syncIOTickQ.cpp" +) +target_sources(syncIOTickPing + PRIVATE + "syncIOTickPing.cpp" +) target_include_directories(syncTest @@ -42,6 +52,16 @@ target_include_directories(syncEncodeTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncIOTickQ + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOTickPing + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -60,6 +80,14 @@ target_link_libraries(syncEncodeTest sync gtest_main ) +target_link_libraries(syncIOTickQ + sync + gtest_main +) +target_link_libraries(syncIOTickPing + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncIOTickPing.cpp b/source/libs/sync/test/syncIOTickPing.cpp new file mode 100644 index 0000000000..1559b57585 --- /dev/null +++ b/source/libs/sync/test/syncIOTickPing.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickPing(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncIOTickQ.cpp b/source/libs/sync/test/syncIOTickQ.cpp new file mode 100644 index 0000000000..90304079e3 --- /dev/null +++ b/source/libs/sync/test/syncIOTickQ.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickQ(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index b57f6c5259..bca634765e 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -39,7 +39,7 @@ SSyncNode* doSync() { SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; @@ -57,23 +57,23 @@ int main() { logTest(); - int32_t ret = syncIOStart(); + int32_t ret = syncIOStart((char*)"127.0.0.1", 7010); assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); -/* - SSyncNode* pSyncNode = doSync(); + /* + SSyncNode* pSyncNode = doSync(); - ret = syncNodeStartPingTimer(pSyncNode); - assert(ret == 0); + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); - taosMsleep(5000); + taosMsleep(5000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); -*/ + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + */ while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 0843a48bb8..0b397ef921 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -8,7 +8,7 @@ void *pingFunc(void *param) { SSyncIO *io = (SSyncIO *)param; while (1) { sDebug("io->ping"); - io->ping(io); + // io->ping(io); sleep(1); } return NULL;