sync refactor

This commit is contained in:
Minghao Li 2022-03-12 17:02:15 +08:00
parent be532bd5eb
commit 81be154e0d
9 changed files with 266 additions and 157 deletions

View File

@ -42,7 +42,7 @@ typedef struct SSyncEnv {
// tick timer // tick timer
tmr_h pEnvTickTimer; tmr_h pEnvTickTimer;
int32_t envTickTimerMS; int32_t envTickTimerMS;
uint64_t envTickTimerLogicClock; uint64_t envTickTimerLogicClock; // if use queue, should pass logic clock into queue item
uint64_t envTickTimerLogicClockUser; uint64_t envTickTimerLogicClockUser;
TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp TAOS_TMR_CALLBACK FpEnvTickTimer; // Timer Fp
uint64_t envTickTimerCounter; uint64_t envTickTimerCounter;

View File

@ -29,6 +29,9 @@ extern "C" {
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#define TICK_Q_TIMER_MS 1000
#define TICK_Ping_TIMER_MS 1000
typedef struct SSyncIO { typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset * pQset; STaosQset * pQset;
@ -38,9 +41,11 @@ typedef struct SSyncIO {
void * clientRpc; void * clientRpc;
SEpSet myAddr; SEpSet myAddr;
void *ioTimerTickQ; tmr_h qTimer;
void *ioTimerTickPing; int32_t qTimerMS;
void *ioTimerManager; tmr_h pingTimer;
int32_t pingTimerMS;
tmr_h timerMgr;
void *pSyncNode; void *pSyncNode;
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
@ -59,11 +64,14 @@ extern SSyncIO *gSyncIO;
int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop(); int32_t syncIOStop();
int32_t syncIOTickQ();
int32_t syncIOTickPing();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg);
int32_t syncIOQTimerStart();
int32_t syncIOQTimerStop();
int32_t syncIOPingTimerStart();
int32_t syncIOPingTimerStop();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -39,6 +39,7 @@ typedef enum ESyncMessageType {
SYNC_REQUEST_VOTE_REPLY = 111, SYNC_REQUEST_VOTE_REPLY = 111,
SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES = 113,
SYNC_APPEND_ENTRIES_REPLY = 115, SYNC_APPEND_ENTRIES_REPLY = 115,
SYNC_RESPONSE = 119,
} ESyncMessageType; } ESyncMessageType;

View File

@ -55,13 +55,19 @@ static void syncEnvTick(void *param, void *tmrId) {
if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) { if (atomic_load_64(&pSyncEnv->envTickTimerLogicClockUser) <= atomic_load_64(&pSyncEnv->envTickTimerLogicClock)) {
++(pSyncEnv->envTickTimerCounter); ++(pSyncEnv->envTickTimerCounter);
sTrace( sTrace(
"syncEnvTick ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, " "syncEnvTick do ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
"envTickTimerMS:%d", "envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter, pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS); pSyncEnv->envTickTimerMS, tmrId);
// do something, tick ... // do something, tick ...
taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer); taosTmrReset(syncEnvTick, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager, &pSyncEnv->pEnvTickTimer);
} else {
sTrace(
"syncEnvTick pass ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
"envTickTimerMS:%d, tmrId:%p",
pSyncEnv->envTickTimerLogicClockUser, pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerCounter,
pSyncEnv->envTickTimerMS, tmrId);
} }
} }
@ -88,15 +94,16 @@ static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStartTimer(SSyncEnv *pSyncEnv) {
int32_t ret = 0; int32_t ret = 0;
pSyncEnv->pEnvTickTimer = taosTmrReset(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager,
taosTmrStart(pSyncEnv->FpEnvTickTimer, pSyncEnv->envTickTimerMS, pSyncEnv, pSyncEnv->pTimerManager); &pSyncEnv->pEnvTickTimer);
atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser); atomic_store_64(&pSyncEnv->envTickTimerLogicClock, pSyncEnv->envTickTimerLogicClockUser);
return ret; return ret;
} }
static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStopTimer(SSyncEnv *pSyncEnv) {
int32_t ret = 0;
atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1); atomic_add_fetch_64(&pSyncEnv->envTickTimerLogicClockUser, 1);
taosTmrStop(pSyncEnv->pEnvTickTimer); taosTmrStop(pSyncEnv->pEnvTickTimer);
pSyncEnv->pEnvTickTimer = NULL; pSyncEnv->pEnvTickTimer = NULL;
return 0; return ret;
} }

View File

@ -16,6 +16,7 @@
#include "syncIO.h" #include "syncIO.h"
#include <tdatablock.h> #include <tdatablock.h>
#include "syncMessage.h" #include "syncMessage.h"
#include "syncUtil.h"
#include "tglobal.h" #include "tglobal.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
@ -23,33 +24,36 @@
SSyncIO *gSyncIO = NULL; SSyncIO *gSyncIO = NULL;
// local function ------------ // local function ------------
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static SSyncIO *syncIOCreate(char *host, uint16_t port); static SSyncIO *syncIOCreate(char *host, uint16_t port);
static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param); static void *syncIOConsumerFunc(void *param);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
static int32_t syncIOTickQInternal(SSyncIO *io); static int32_t syncIOStartQ(SSyncIO *io);
static void syncIOTickQFunc(void *param, void *tmrId); static int32_t syncIOStopQ(SSyncIO *io);
static int32_t syncIOTickPingInternal(SSyncIO *io); static int32_t syncIOStartPing(SSyncIO *io);
static void syncIOTickPingFunc(void *param, void *tmrId); static int32_t syncIOStopPing(SSyncIO *io);
static void syncIOTickQ(void *param, void *tmrId);
static void syncIOTickPing(void *param, void *tmrId);
// ---------------------------- // ----------------------------
// public function ------------ // public function ------------
int32_t syncIOStart(char *host, uint16_t port) { int32_t syncIOStart(char *host, uint16_t port) {
int32_t ret = 0;
gSyncIO = syncIOCreate(host, port); gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL); assert(gSyncIO != NULL);
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
int32_t ret = syncIOStartInternal(gSyncIO); ret = syncIOStartInternal(gSyncIO);
assert(ret == 0); assert(ret == 0);
sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc); sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO);
return 0; return ret;
} }
int32_t syncIOStop() { int32_t syncIOStop() {
@ -61,37 +65,25 @@ int32_t syncIOStop() {
return ret; return ret;
} }
int32_t syncIOTickQ() {
int32_t ret = syncIOTickQInternal(gSyncIO);
assert(ret == 0);
return ret;
}
int32_t syncIOTickPing() {
int32_t ret = syncIOTickPingInternal(gSyncIO);
assert(ret == 0);
return ret;
}
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
sTrace( assert(pEpSet->inUse == 0);
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " assert(pEpSet->numOfEps == 1);
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, int32_t ret = 0;
pMsg->msgType, pMsg->contLen); char logBuf[256];
{ snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port);
cJSON *pJson = syncRpcMsg2Json(pMsg); syncRpcMsgPrint2(logBuf, pMsg);
char * serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized);
free(serialized);
cJSON_Delete(pJson);
}
pMsg->handle = NULL; pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0; return ret;
} }
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
int32_t ret = 0;
char logBuf[128];
syncRpcMsgPrint2((char *)"==syncIOEqMsg==", pMsg);
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
@ -99,11 +91,75 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
STaosQueue *pMsgQ = queue; STaosQueue *pMsgQ = queue;
taosWriteQitem(pMsgQ, pTemp); taosWriteQitem(pMsgQ, pTemp);
return 0; return ret;
}
int32_t syncIOQTimerStart() {
int32_t ret = syncIOStartQ(gSyncIO);
assert(ret == 0);
return ret;
}
int32_t syncIOQTimerStop() {
int32_t ret = syncIOStopQ(gSyncIO);
assert(ret == 0);
return ret;
}
int32_t syncIOPingTimerStart() {
int32_t ret = syncIOStartPing(gSyncIO);
assert(ret == 0);
return ret;
}
int32_t syncIOPingTimerStop() {
int32_t ret = syncIOStopPing(gSyncIO);
assert(ret == 0);
return ret;
} }
// local function ------------ // local function ------------
static SSyncIO *syncIOCreate(char *host, uint16_t port) {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->myAddr.inUse = 0;
io->myAddr.numOfEps = 0;
addEpIntoEpSet(&io->myAddr, host, port);
io->qTimerMS = TICK_Q_TIMER_MS;
io->pingTimerMS = TICK_Ping_TIMER_MS;
return io;
}
static int32_t syncIODestroy(SSyncIO *io) {
int32_t ret = 0;
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
if (io->serverRpc != NULL) {
rpcClose(io->serverRpc);
io->serverRpc = NULL;
}
if (io->clientRpc != NULL) {
rpcClose(io->clientRpc);
io->clientRpc = NULL;
}
taosCloseQueue(io->pMsgQ);
taosCloseQset(io->pQset);
return ret;
}
static int32_t syncIOStartInternal(SSyncIO *io) { static int32_t syncIOStartInternal(SSyncIO *io) {
int32_t ret = 0;
taosBlockSIGPIPE(); taosBlockSIGPIPE();
rpcInit(); rpcInit();
@ -163,58 +219,24 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
} }
// start tmr thread // start tmr thread
io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); io->timerMgr = taosTmrInit(1000, 50, 10000, "SYNC-IO");
return 0; atomic_store_8(&io->isStart, 1);
return ret;
} }
static int32_t syncIOStopInternal(SSyncIO *io) { static int32_t syncIOStopInternal(SSyncIO *io) {
int32_t ret = 0;
atomic_store_8(&io->isStart, 0); atomic_store_8(&io->isStart, 0);
pthread_join(io->consumerTid, NULL); pthread_join(io->consumerTid, NULL);
return 0; taosTmrCleanUp(io->timerMgr);
} return ret;
static SSyncIO *syncIOCreate(char *host, uint16_t port) {
SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO));
memset(io, 0, sizeof(*io));
io->pMsgQ = taosOpenQueue();
io->pQset = taosOpenQset();
taosAddIntoQset(io->pQset, io->pMsgQ, NULL);
io->myAddr.inUse = 0;
addEpIntoEpSet(&io->myAddr, host, port);
return io;
}
static int32_t syncIODestroy(SSyncIO *io) {
int8_t start = atomic_load_8(&io->isStart);
assert(start == 0);
if (io->serverRpc != NULL) {
free(io->serverRpc);
io->serverRpc = NULL;
}
if (io->clientRpc != NULL) {
free(io->clientRpc);
io->clientRpc = NULL;
}
taosCloseQueue(io->pMsgQ);
taosCloseQset(io->pQset);
return 0;
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
@ -226,77 +248,67 @@ static void *syncIOConsumerFunc(void *param) {
for (int i = 0; i < numOfMsgs; ++i) { for (int i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pRpcMsg); taosGetQitem(qall, (void **)&pRpcMsg);
syncRpcMsgLog2((char *)"==syncIOConsumerFunc==", pRpcMsg);
char *s = syncRpcMsg2Str(pRpcMsg); // use switch case instead of if else
sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen,
s);
free(s);
if (pRpcMsg->msgType == SYNC_PING) { if (pRpcMsg->msgType == SYNC_PING) {
if (io->FpOnSyncPing != NULL) { if (io->FpOnSyncPing != NULL) {
SyncPing *pSyncMsg; SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
/*
pSyncMsg = syncPingBuild(pRpcMsg->contLen); pSyncMsg = syncPingBuild(pRpcMsg->contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg); syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg); io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
*/
} }
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
if (io->FpOnSyncPingReply != NULL) { if (io->FpOnSyncPingReply != NULL) {
SyncPingReply *pSyncMsg; SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) { if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg; SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen);
syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) { if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg; SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
pSyncMsg = syncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries != NULL) { if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg; SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen);
syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply != NULL) { if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg; SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
pSyncMsg = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) { } else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) { if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg; SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
pSyncMsg = syncTimeoutBuild();
syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} }
} else { } else {
; sTrace("unknown msgType:%d, no operator", pRpcMsg->msgType);
} }
} }
@ -306,15 +318,16 @@ static void *syncIOConsumerFunc(void *param) {
rpcFreeCont(pRpcMsg->pCont); rpcFreeCont(pRpcMsg->pCont);
if (pRpcMsg->handle != NULL) { if (pRpcMsg->handle != NULL) {
int msgSize = 128; int msgSize = 32;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = SYNC_RESPONSE;
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply");
rpcMsg.handle = pRpcMsg->handle; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 0; rpcMsg.code = 0;
sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen); syncRpcMsgPrint2((char *)"syncIOConsumerFunc rpcSendResponse --> ", &rpcMsg);
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
@ -326,60 +339,102 @@ static void *syncIOConsumerFunc(void *param) {
return NULL; return NULL;
} }
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int ret = 0;
return ret;
}
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, syncRpcMsgPrint2((char *)"==syncIOProcessRequest==", pMsg);
(char *)pMsg->pCont);
SSyncIO *io = pParent; SSyncIO *io = pParent;
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
} }
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); if (pMsg->msgType == SYNC_RESPONSE) {
sTrace("==syncIOProcessReply==");
} else {
syncRpcMsgPrint2((char *)"==syncIOProcessReply==", pMsg);
}
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
static int32_t syncIOTickQInternal(SSyncIO *io) { static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager); // app shall retrieve the auth info based on meterID from DB or a data file
return 0; // demo code here only for simple demo
int32_t ret = 0;
return ret;
} }
static void syncIOTickQFunc(void *param, void *tmrId) { static int32_t syncIOStartQ(SSyncIO *io) {
int32_t ret = 0;
taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer);
return ret;
}
static int32_t syncIOStopQ(SSyncIO *io) {
int32_t ret = 0;
taosTmrStop(io->qTimer);
io->qTimer = NULL;
return ret;
}
static int32_t syncIOStartPing(SSyncIO *io) {
int32_t ret = 0;
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
return ret;
}
static int32_t syncIOStopPing(SSyncIO *io) {
int32_t ret = 0;
taosTmrStop(io->pingTimer);
io->pingTimer = NULL;
return ret;
}
static void syncIOTickQ(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param; SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickQFunc -->");
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
srcId.vgId = -1;
destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
destId.vgId = -1;
SyncPingReply *pMsg = syncPingReplyBuild2(&srcId, &destId, "syncIOTickQ");
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
rpcMsg.contLen = 64; syncPingReply2RpcMsg(pMsg, &rpcMsg);
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ");
rpcMsg.handle = NULL;
rpcMsg.msgType = 55;
SRpcMsg *pTemp; SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg)); pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg));
syncRpcMsgPrint2((char *)"==syncIOTickQ==", &rpcMsg);
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ); syncPingReplyDestroy(pMsg);
taosTmrReset(syncIOTickQ, io->qTimerMS, io, io->timerMgr, &io->qTimer);
} }
static int32_t syncIOTickPingInternal(SSyncIO *io) { static void syncIOTickPing(void *param, void *tmrId) {
io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager); SSyncIO *io = (SSyncIO *)param;
return 0;
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
srcId.vgId = -1;
destId.addr = syncUtilAddr2U64(io->myAddr.eps[0].fqdn, io->myAddr.eps[0].port);
destId.vgId = -1;
SyncPing *pMsg = syncPingBuild2(&srcId, &destId, "syncIOTickPing");
// SyncPing *pMsg = syncPingBuild3(&srcId, &destId);
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"==syncIOTickPing==", &rpcMsg);
rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL);
syncPingDestroy(pMsg);
taosTmrReset(syncIOTickPing, io->pingTimerMS, io, io->timerMgr, &io->pingTimer);
} }
#if 0
static void syncIOTickPingFunc(void *param, void *tmrId) { static void syncIOTickPingFunc(void *param, void *tmrId) {
SSyncIO *io = (SSyncIO *)param; SSyncIO *io = (SSyncIO *)param;
sTrace("<-- syncIOTickPingFunc -->"); sTrace("<-- syncIOTickPingFunc -->");
@ -394,3 +449,4 @@ static void syncIOTickPingFunc(void *param, void *tmrId) {
rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL);
taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing); taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing);
} }
#endif

View File

@ -65,10 +65,32 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot = syncAppendEntriesReply2Json(pSyncMsg); pRoot = syncAppendEntriesReply2Json(pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_RESPONSE) {
pRoot = cJSON_CreateObject();
char* s;
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
cJSON_AddStringToObject(pRoot, "pCont", s);
free(s);
s = syncUtilprintBin2((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
cJSON_AddStringToObject(pRoot, "pCont2", s);
free(s);
} else { } else {
pRoot = syncRpcUnknownMsg2Json(); pRoot = syncRpcUnknownMsg2Json();
char* s;
s = syncUtilprintBin((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
cJSON_AddStringToObject(pRoot, "pCont", s);
free(s);
s = syncUtilprintBin2((char*)(pRpcMsg->pCont), pRpcMsg->contLen);
cJSON_AddStringToObject(pRoot, "pCont2", s);
free(s);
} }
cJSON_AddNumberToObject(pRoot, "msgType", pRpcMsg->msgType);
cJSON_AddNumberToObject(pRoot, "contLen", pRpcMsg->contLen);
cJSON_AddNumberToObject(pRoot, "code", pRpcMsg->code);
cJSON_AddNumberToObject(pRoot, "persist", pRpcMsg->persist);
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); cJSON_AddItemToObject(pJson, "RpcMsg", pRoot);
return pJson; return pJson;
@ -77,7 +99,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* syncRpcUnknownMsg2Json() { cJSON* syncRpcUnknownMsg2Json() {
cJSON* pRoot = cJSON_CreateObject(); cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN);
cJSON_AddStringToObject(pRoot, "data", "known message"); cJSON_AddStringToObject(pRoot, "data", "unknown message");
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot); cJSON_AddItemToObject(pJson, "SyncUnknown", pRoot);

View File

@ -25,8 +25,15 @@ int main() {
ret = syncIOStart((char*)"127.0.0.1", 7010); ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0); assert(ret == 0);
ret = syncIOTickPing(); for (int i = 0; i < 3; ++i) {
assert(ret == 0); ret = syncIOPingTimerStart();
assert(ret == 0);
taosMsleep(5000);
ret = syncIOPingTimerStop();
assert(ret == 0);
taosMsleep(5000);
}
while (1) { while (1) {
taosSsleep(1); taosSsleep(1);

View File

@ -25,11 +25,18 @@ int main() {
ret = syncIOStart((char*)"127.0.0.1", 7010); ret = syncIOStart((char*)"127.0.0.1", 7010);
assert(ret == 0); assert(ret == 0);
ret = syncIOTickQ(); for (int i = 0; i < 3; ++i) {
ret = syncIOQTimerStart();
assert(ret == 0);
taosMsleep(5000);
ret = syncIOQTimerStop();
assert(ret == 0);
taosMsleep(5000);
}
ret = syncIOStop();
assert(ret == 0); assert(ret == 0);
while (1) {
taosSsleep(1);
}
return 0; return 0;
} }

View File

@ -26,6 +26,7 @@ int main() {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = 143 + 64; sDebugFlag = 143 + 64;
logTest(); logTest();
electRandomMSTest(); electRandomMSTest();
return 0; return 0;