From c2a01bfd91e53b659c580bcca833dd10d4b580c2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 23 Feb 2022 16:14:02 +0800 Subject: [PATCH] add sync io --- source/libs/sync/src/syncIO.c | 29 +++++++++++++++++++++++++---- source/libs/sync/test/syncTest.cpp | 21 +++++++++++++++++++-- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index b20367cf56..023836f74f 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -14,7 +14,10 @@ */ #include "syncIO.h" +#include #include "syncOnMessage.h" +#include "tglobal.h" +#include "tutil.h" void *syncConsumer(void *param) { SSyncIO *io = param; @@ -32,7 +35,7 @@ void *syncConsumer(void *param) { for (int i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); - sDebug("sync-io recv msg: %s", (char *)(pRpcMsg->pCont)); + sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); } taosResetQitems(qall); @@ -109,6 +112,8 @@ SSyncIO *syncIOCreate() { static int32_t syncIOStart(SSyncIO *io) { taosBlockSIGPIPE(); + tsRpcForceTcp = 1; + // cient rpc init { SRpcInit rpcInit; @@ -122,7 +127,7 @@ static int32_t syncIOStart(SSyncIO *io) { rpcInit.user = "sync-io"; rpcInit.secret = "sync-io"; rpcInit.ckey = "key"; - rpcInit.spi = 1; + rpcInit.spi = 0; rpcInit.connType = TAOS_CONN_CLIENT; io->clientRpc = rpcOpen(&rpcInit); @@ -155,7 +160,7 @@ static int32_t syncIOStart(SSyncIO *io) { // start consumer thread { - if (pthread_create(&io->tid, NULL, syncConsumer, NULL) != 0) { + if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { sError("failed to create sync consumer thread since %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -171,7 +176,23 @@ static int32_t syncIOStop(SSyncIO *io) { return 0; } -static int32_t syncIOPing(SSyncIO *io) { return 0; } +static int32_t syncIOPing(SSyncIO *io) { + SRpcMsg rpcMsg, rspMsg; + + rpcMsg.pCont = rpcMallocCont(10); + snprintf(rpcMsg.pCont, 10, "ping"); + rpcMsg.contLen = 10; + rpcMsg.handle = io; + rpcMsg.msgType = 1; + + SEpSet epSet; + epSet.inUse = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 38000); + + rpcSendRequest(io->clientRpc, &epSet, &rpcMsg, NULL); + + return 0; +} static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index cba196db27..955beea693 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -2,9 +2,19 @@ #include "syncIO.h" #include "syncInt.h" +void *pingFunc(void *param) { + SSyncIO *io = (SSyncIO *)param; + while (1) { + sDebug("io->ping"); + io->ping(io); + sleep(1); + } + return NULL; +} + int main() { tsAsyncLog = 0; - taosInitLog((char*)"syncTest.log", 100000, 10); + taosInitLog((char *)"syncTest.log", 100000, 10); sDebug("sync test"); syncStartEnv(); @@ -12,8 +22,15 @@ int main() { SSyncIO *syncIO = syncIOCreate(); assert(syncIO != NULL); + syncIO->start(syncIO); + + sleep(2); + + pthread_t tid; + pthread_create(&tid, NULL, pingFunc, syncIO); + while (1) { - sleep(3); + sleep(1); } return 0; }