diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index a948de8ac1..160fefd086 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -31,11 +31,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset *pQset; + STaosQset * pQset; pthread_t consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; void *ioTimerTickQ; diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index a2e745b3d9..b022044528 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,8 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 99, + SYNC_UNKNOWN = 77, + SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -39,7 +40,7 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, - SYNC_TIMEOUT = 117, + } ESyncMessageType; // --------------------------------------------- @@ -48,17 +49,28 @@ cJSON* syncRpcUnknownMsg2Json(); // --------------------------------------------- typedef enum ESyncTimeoutType { - SYNC_TIMEOUT_PING = 0, + SYNC_TIMEOUT_PING = 100, SYNC_TIMEOUT_ELECTION, SYNC_TIMEOUT_HEARTBEAT, } ESyncTimeoutType; typedef struct SyncTimeout { - ESyncTimeoutType type; + uint32_t bytes; + uint32_t msgType; + ESyncTimeoutType timeoutType; void* data; } SyncTimeout; +SyncTimeout* syncTimeoutBuild(); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index ffd982f233..55c4d57201 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -246,7 +246,14 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { - } else { + if (io->FpOnSyncTimeout != NULL) { + SyncTimeout *pSyncMsg; + pSyncMsg = syncTimeoutBuild(); + syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + } + } else { ; } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 57bc754735..5f3a276a68 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -27,6 +27,10 @@ static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNo static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); static void syncNodePingTimerCb(void* param, void* tmrId); +static void syncNodeEqPingTimer(void* param, void* tmrId); +static void syncNodeEqElectTimer(void* param, void* tmrId); +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); + static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); @@ -95,7 +99,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; atomic_store_8(&pSyncNode->pingTimerEnable, 0); - pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; pSyncNode->FpOnPing = syncNodeOnPingCb; @@ -104,6 +108,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; + pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; return pSyncNode; } @@ -329,6 +334,27 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_8(&ths->pingTimerEnable)) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + } else { + } + return ret; } @@ -336,7 +362,6 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_8(&pSyncNode->pingTimerEnable)) { ++(pSyncNode->pingTimerCounter); - // pSyncNode->pingTimerMS += 100; sTrace( "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " @@ -350,4 +375,26 @@ static void syncNodePingTimerCb(void* param, void* tmrId) { } else { sTrace("syncNodePingTimerCb: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } -} \ No newline at end of file +} + +static void syncNodeEqPingTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { + // pSyncNode->pingTimerMS += 100; + + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); + } +} + +static void syncNodeEqElectTimer(void* param, void* tmrId) {} + +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4dbae9bcba..f1434947a1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -24,7 +24,12 @@ void onMessage(SRaft* pRaft, void* pMsg) {} cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { cJSON* pRoot; - if (pRpcMsg->msgType == SYNC_PING) { + // in compiler optimization, switch case = if else constants + if (pRpcMsg->msgType == SYNC_TIMEOUT) { + SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + pRoot = syncTimeout2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING) { SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; pRoot = syncPing2Json(pSyncMsg); @@ -73,6 +78,66 @@ cJSON* syncRpcUnknownMsg2Json() { return pJson; } +// ---- message process SyncTimeout---- +SyncTimeout* syncTimeoutBuild() { + uint32_t bytes = sizeof(SyncTimeout); + SyncTimeout* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_TIMEOUT; + return pMsg; +} + +void syncTimeoutDestroy(SyncTimeout* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { + syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); + return pJson; +} + +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { + SyncTimeout* pMsg = syncTimeoutBuild(); + pMsg->timeoutType = timeoutType; + pMsg->data = data; + return pMsg; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 06ba5ba6ce..77413a713b 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -78,6 +78,7 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = doSync(myIndex); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0);