add sync io

This commit is contained in:
Minghao Li 2022-02-23 16:14:02 +08:00
parent 0e0af1de05
commit c2a01bfd91
2 changed files with 44 additions and 6 deletions

View File

@ -14,7 +14,10 @@
*/ */
#include "syncIO.h" #include "syncIO.h"
#include <tep.h>
#include "syncOnMessage.h" #include "syncOnMessage.h"
#include "tglobal.h"
#include "tutil.h"
void *syncConsumer(void *param) { void *syncConsumer(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
@ -32,7 +35,7 @@ void *syncConsumer(void *param) {
for (int i = 0; i < numOfMsgs; ++i) { for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg); taosGetQitem(qall, (void **)&pRpcMsg);
sDebug("sync-io recv msg: %s", (char *)(pRpcMsg->pCont)); sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont));
} }
taosResetQitems(qall); taosResetQitems(qall);
@ -109,6 +112,8 @@ SSyncIO *syncIOCreate() {
static int32_t syncIOStart(SSyncIO *io) { static int32_t syncIOStart(SSyncIO *io) {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
tsRpcForceTcp = 1;
// cient rpc init // cient rpc init
{ {
SRpcInit rpcInit; SRpcInit rpcInit;
@ -122,7 +127,7 @@ static int32_t syncIOStart(SSyncIO *io) {
rpcInit.user = "sync-io"; rpcInit.user = "sync-io";
rpcInit.secret = "sync-io"; rpcInit.secret = "sync-io";
rpcInit.ckey = "key"; rpcInit.ckey = "key";
rpcInit.spi = 1; rpcInit.spi = 0;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
io->clientRpc = rpcOpen(&rpcInit); io->clientRpc = rpcOpen(&rpcInit);
@ -155,7 +160,7 @@ static int32_t syncIOStart(SSyncIO *io) {
// start consumer thread // start consumer thread
{ {
if (pthread_create(&io->tid, NULL, syncConsumer, NULL) != 0) { if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) {
sError("failed to create sync consumer thread since %s", strerror(errno)); sError("failed to create sync consumer thread since %s", strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -171,7 +176,23 @@ static int32_t syncIOStop(SSyncIO *io) {
return 0; return 0;
} }
static int32_t syncIOPing(SSyncIO *io) { return 0; } static int32_t syncIOPing(SSyncIO *io) {
SRpcMsg rpcMsg, rspMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "ping");
rpcMsg.contLen = 10;
rpcMsg.handle = io;
rpcMsg.msgType = 1;
SEpSet epSet;
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 38000);
rpcSendRequest(io->clientRpc, &epSet, &rpcMsg, NULL);
return 0;
}
static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } static int32_t syncIOOnMessage(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; }

View File

@ -2,9 +2,19 @@
#include "syncIO.h" #include "syncIO.h"
#include "syncInt.h" #include "syncInt.h"
void *pingFunc(void *param) {
SSyncIO *io = (SSyncIO *)param;
while (1) {
sDebug("io->ping");
io->ping(io);
sleep(1);
}
return NULL;
}
int main() { int main() {
tsAsyncLog = 0; tsAsyncLog = 0;
taosInitLog((char*)"syncTest.log", 100000, 10); taosInitLog((char *)"syncTest.log", 100000, 10);
sDebug("sync test"); sDebug("sync test");
syncStartEnv(); syncStartEnv();
@ -12,8 +22,15 @@ int main() {
SSyncIO *syncIO = syncIOCreate(); SSyncIO *syncIO = syncIOCreate();
assert(syncIO != NULL); assert(syncIO != NULL);
syncIO->start(syncIO);
sleep(2);
pthread_t tid;
pthread_create(&tid, NULL, pingFunc, syncIO);
while (1) { while (1) {
sleep(3); sleep(1);
} }
return 0; return 0;
} }