add sync io

This commit is contained in:
Minghao Li 2022-02-23 16:58:47 +08:00
parent c2a01bfd91
commit 7a47508c0e
2 changed files with 39 additions and 17 deletions

View File

@ -37,6 +37,11 @@ typedef struct SSyncIO {
pthread_t tid; pthread_t tid;
int8_t isStart; int8_t isStart;
SEpSet epSet;
void *syncTimer;
void *syncTimerManager;
int32_t (*start)(struct SSyncIO *ths); int32_t (*start)(struct SSyncIO *ths);
int32_t (*stop)(struct SSyncIO *ths); int32_t (*stop)(struct SSyncIO *ths);
int32_t (*ping)(struct SSyncIO *ths); int32_t (*ping)(struct SSyncIO *ths);
@ -45,7 +50,7 @@ typedef struct SSyncIO {
} SSyncIO; } SSyncIO;
SSyncIO * syncIOCreate(); SSyncIO *syncIOCreate();
static int32_t syncIOStart(SSyncIO *io); static int32_t syncIOStart(SSyncIO *io);
static int32_t syncIOStop(SSyncIO *io); static int32_t syncIOStop(SSyncIO *io);

View File

@ -17,8 +17,30 @@
#include <tep.h> #include <tep.h>
#include "syncOnMessage.h" #include "syncOnMessage.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h"
#include "tutil.h" #include "tutil.h"
static void syncTick(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param;
sDebug("syncTick ... ");
SRpcMsg rpcMsg;
rpcMsg.pCont = rpcMallocCont(10);
snprintf(rpcMsg.pCont, 10, "TICK");
rpcMsg.contLen = 10;
rpcMsg.handle = io;
rpcMsg.msgType = 2;
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp);
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
}
void *syncConsumer(void *param) { void *syncConsumer(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
@ -58,6 +80,7 @@ void *syncConsumer(void *param) {
} }
taosFreeQall(qall); taosFreeQall(qall);
return NULL;
} }
static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
@ -68,17 +91,8 @@ static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encryp
} }
static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
/* sDebug("processResponse ... ");
// SInfo *pInfo = (SInfo *)pMsg->ahandle; rpcFreeCont(pMsg->pCont);
sDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen,
pMsg->code);
if (pEpSet) pInfo->epSet = *pEpSet;
rpcFreeCont(pMsg->pCont);
// tsem_post(&pInfo->rspSem);
tsem_post(&pInfo->rspSem);
*/
} }
static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
@ -158,6 +172,9 @@ static int32_t syncIOStart(SSyncIO *io) {
} }
} }
io->epSet.inUse = 0;
addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000);
// start consumer thread // start consumer thread
{ {
if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) {
@ -167,6 +184,10 @@ static int32_t syncIOStart(SSyncIO *io) {
} }
} }
// start tmr thread
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager);
return 0; return 0;
} }
@ -185,11 +206,7 @@ static int32_t syncIOPing(SSyncIO *io) {
rpcMsg.handle = io; rpcMsg.handle = io;
rpcMsg.msgType = 1; rpcMsg.msgType = 1;
SEpSet epSet; rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL);
epSet.inUse = 0;
addEpIntoEpSet(&epSet, "127.0.0.1", 38000);
rpcSendRequest(io->clientRpc, &epSet, &rpcMsg, NULL);
return 0; return 0;
} }