From 7cb3112d77fcfca0872e3ef6f4b4c768ab83f5a8 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 15:52:48 +0800 Subject: [PATCH 01/13] refactor(sync): modify function name --- include/libs/sync/syncTools.h | 4 ++-- source/dnode/mnode/impl/src/mndMain.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSync.c | 4 ++-- source/libs/sync/src/syncMain.c | 27 +++++++++++++------------- source/libs/sync/src/syncReplication.c | 2 +- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index d5c015bfb2..1caf21b6a7 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -679,8 +679,8 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // on message ---------------------- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index fbfa1b73be..94c4ba8c79 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -540,12 +540,12 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + code = syncNodeOnPing(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + code = syncNodeOnPingReply(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d3ae1015d0..7acf5b4003 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -301,13 +301,13 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } else if (pMsg->msgType == TDMT_SYNC_PING) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + code = syncNodeOnPing(pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); ASSERT(pSyncMsg != NULL); - code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + code = syncNodeOnPingReply(pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f1c900de50..bf76762757 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -49,8 +49,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths); static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId); // process message ---- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); // --------------------------------- static void syncNodeFreeCb(void* param) { @@ -1327,8 +1327,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { } // init callback - pSyncNode->FpOnPing = syncNodeOnPingCb; - pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnPing = syncNodeOnPing; + pSyncNode->FpOnPingReply = syncNodeOnPingReply; pSyncNode->FpOnClientRequest = syncNodeOnClientRequest; pSyncNode->FpOnTimeout = syncNodeOnTimer; pSyncNode->FpOnSnapshot = syncNodeOnSnapshot; @@ -3003,18 +3003,19 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { } // on message ---- -int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { - // log state +int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg) { + sTrace("vgId:%d, recv sync-ping", ths->vgId); + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->myRaftId, &pMsg->srcId, ths->vgId); SRpcMsg rpcMsg; syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); -*/ + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + */ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); syncPingReplyDestroy(pMsgReply); @@ -3022,9 +3023,9 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { return 0; } -int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { +int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg) { int32_t ret = 0; - syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); + sTrace("vgId:%d, recv sync-ping-reply", ths->vgId); return ret; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 7296e30e02..af53123421 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -57,7 +57,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, + snprintf(logBuf, sizeof(logBuf), "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex, logStartIndex, logEndIndex); syncNodeEventLog(pSyncNode, logBuf); From 04c06841495bc5859ebda47abe3e10c467484c1d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 16:21:54 +0800 Subject: [PATCH 02/13] refactor(sync): add SyncLocalCmd --- include/libs/sync/syncTools.h | 41 +++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 1caf21b6a7..b4eabfcbcd 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -678,24 +678,57 @@ void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg); void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); +// --------------------------------------------- +typedef struct SyncLocalCmd { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + + int8_t cmd; + SyncTerm sdNewTerm // step down new term + +} SyncLocalCmd; + +SyncLocalCmd* syncLocalCmdBuild(uint32_t dataLen, int32_t vgId); +void syncLocalCmdDestroy(SyncLocalCmd* pMsg); +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len); +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len); +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg); +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg); +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg); +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg); + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg); +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg); +void syncLocalCmdLog(const SyncLocalCmd* pMsg); +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg); + // on message ---------------------- int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg); -int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); -int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); -int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); -int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); + int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); +int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex); +int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg); +int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); From 8b5d005db70f73dca410b6112cf0c7593b0ac65a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 17:16:46 +0800 Subject: [PATCH 03/13] refactor(sync): add SyncLocalCmd --- include/common/tmsgdef.h | 1 + include/libs/sync/syncTools.h | 8 +- source/libs/sync/src/syncMessage.c | 150 +++++++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 2 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index be4bf0e4d2..571d14fe3c 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -269,6 +269,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) // no longer used TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT, "sync-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_HEARTBEAT_REPLY, "sync-heartbeat-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_LOCAL_CMD, "sync-local-cmd", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index b4eabfcbcd..1c219f98bd 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -679,6 +679,10 @@ void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // --------------------------------------------- +typedef enum { + SYNC_STEP_DOWN = 0, +} ESyncLocalCmd; + typedef struct SyncLocalCmd { uint32_t bytes; int32_t vgId; @@ -686,8 +690,8 @@ typedef struct SyncLocalCmd { SRaftId srcId; SRaftId destId; - int8_t cmd; - SyncTerm sdNewTerm // step down new term + int32_t cmd; + SyncTerm sdNewTerm; // step down new term } SyncLocalCmd; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 7a8ea0c4d7..b62c564fcd 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3095,3 +3095,153 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { taosMemoryFree(serialized); } } + +// --------------------------------------------- +SyncLocalCmd* syncLocalCmdBuild(uint32_t dataLen, int32_t vgId) { + uint32_t bytes = sizeof(SyncLocalCmd); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_LOCAL_CMD; + return pMsg; +} + +void syncLocalCmdDestroy(SyncLocalCmd* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen) { + ASSERT(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg) { + memcpy(pMsg, buf, len); + ASSERT(len == pMsg->bytes); +} + +char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + ASSERT(buf != NULL); + syncLocalCmdSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); + ASSERT(pMsg != NULL); + syncLocalCmdDeserialize(buf, len, pMsg); + ASSERT(len == pMsg->bytes); + return pMsg; +} + +void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncLocalCmdSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg) { + syncLocalCmdDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncLocalCmd* pMsg = syncLocalCmdDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + ASSERT(pMsg != NULL); + return pMsg; +} + +cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr); + cJSON_AddStringToObject(pSrcId, "addr", u64buf); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr); + cJSON_AddStringToObject(pDestId, "addr", u64buf); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "cmd", pMsg->cmd); + + snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm); + cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot); + return pJson; +} + +char* syncLocalCmd2Str(const SyncLocalCmd* pMsg) { + cJSON* pJson = syncLocalCmd2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncLocalCmdPrint(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + printf("syncLocalCmdPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog(const SyncLocalCmd* pMsg) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog | len:%d | %s", (int32_t)strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncLocalCmd2Str(pMsg); + sTrace("syncLocalCmdLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } +} \ No newline at end of file From 54dfc8cd524b6e11ba07d2b19076dae2641f16fc Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 17:38:33 +0800 Subject: [PATCH 04/13] enh(sync): add test --- source/libs/sync/test/CMakeLists.txt | 14 +++ .../test/syncConfigChangeSnapshotTest.cpp | 2 +- .../libs/sync/test/syncConfigChangeTest.cpp | 2 +- source/libs/sync/test/syncEncodeTest.cpp | 2 +- source/libs/sync/test/syncIndexTest.cpp | 2 +- source/libs/sync/test/syncLocalCmdTest.cpp | 100 ++++++++++++++++++ source/libs/sync/test/syncRaftIdCheck.cpp | 2 +- source/libs/sync/test/syncRaftLogTest3.cpp | 16 +-- source/libs/sync/test/syncReplicateTest.cpp | 2 +- source/libs/sync/test/syncRespMgrTest.cpp | 6 +- source/libs/sync/test/syncTestTool.cpp | 4 +- 11 files changed, 133 insertions(+), 19 deletions(-) create mode 100644 source/libs/sync/test/syncLocalCmdTest.cpp diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index b9cc7a391d..26dd32942b 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -59,6 +59,7 @@ add_executable(syncRestoreFromSnapshot "") add_executable(syncRaftCfgIndexTest "") add_executable(syncHeartbeatTest "") add_executable(syncHeartbeatReplyTest "") +add_executable(syncLocalCmdTest "") target_sources(syncTest @@ -305,6 +306,10 @@ target_sources(syncHeartbeatReplyTest PRIVATE "syncHeartbeatReplyTest.cpp" ) +target_sources(syncLocalCmdTest + PRIVATE + "syncLocalCmdTest.cpp" +) target_include_directories(syncTest @@ -612,6 +617,11 @@ target_include_directories(syncHeartbeatReplyTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLocalCmdTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -858,6 +868,10 @@ target_link_libraries(syncHeartbeatReplyTest sync gtest_main ) +target_link_libraries(syncLocalCmdTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 95677e592b..8f16be27e7 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -270,7 +270,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index c04ab9b000..d1244546c9 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -191,7 +191,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp index 8b209c4c9e..9f1a81e7ed 100644 --- a/source/libs/sync/test/syncEncodeTest.cpp +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -186,7 +186,7 @@ int main(int argc, char **argv) { int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, pEntry->index, &pEntry); ASSERT(code == 0); - syncEntryLog2((char *)"==pEntry2==", pEntry2); + syncEntryLog2((char *)"==pEntry==", pEntry); // step5 uint32_t len; diff --git a/source/libs/sync/test/syncIndexTest.cpp b/source/libs/sync/test/syncIndexTest.cpp index 93cd5f79f0..763117c0c9 100644 --- a/source/libs/sync/test/syncIndexTest.cpp +++ b/source/libs/sync/test/syncIndexTest.cpp @@ -13,7 +13,7 @@ void print(SHashObj *pNextIndex) { SRaftId *pRaftId = (SRaftId *)key; - printf("key:<" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p); + printf("key:<%" PRIu64 ", %d>, value:%" PRIu64 " \n", pRaftId->addr, pRaftId->vgId, *p); p = (uint64_t *)taosHashIterate(pNextIndex, p); } } diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp new file mode 100644 index 0000000000..256c13e267 --- /dev/null +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -0,0 +1,100 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SyncAppendEntries *createMsg() { + SyncAppendEntries *pMsg = syncAppendEntriesBuild(20, 1000); + pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); + pMsg->srcId.vgId = 100; + pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); + pMsg->destId.vgId = 100; + pMsg->prevLogIndex = 11; + pMsg->prevLogTerm = 22; + pMsg->commitIndex = 33; + pMsg->privateTerm = 44; + strcpy(pMsg->data, "hello world"); + return pMsg; +} + +void test1() { + SyncAppendEntries *pMsg = createMsg(); + syncAppendEntriesLog2((char *)"test1:", pMsg); + syncAppendEntriesDestroy(pMsg); +} + +void test2() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char *serialized = (char *)taosMemoryMalloc(len); + syncAppendEntriesSerialize(pMsg, serialized, len); + SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000); + syncAppendEntriesDeserialize(serialized, len, pMsg2); + syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test3() { + SyncAppendEntries *pMsg = createMsg(); + uint32_t len; + char *serialized = syncAppendEntriesSerialize2(pMsg, &len); + SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len); + syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test4() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen); + syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); + syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +void test5() { + SyncAppendEntries *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncAppendEntries2RpcMsg(pMsg, &rpcMsg); + SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg); + syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncAppendEntriesDestroy(pMsg); + syncAppendEntriesDestroy(pMsg2); +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncRaftIdCheck.cpp b/source/libs/sync/test/syncRaftIdCheck.cpp index 65da0f6631..e7ef69da20 100644 --- a/source/libs/sync/test/syncRaftIdCheck.cpp +++ b/source/libs/sync/test/syncRaftIdCheck.cpp @@ -15,7 +15,7 @@ int main(int argc, char** argv) { char host[128]; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); - printf("" PRIu64 " -> %s:%d \n", u64, host, port); + printf("%" PRIu64 " -> %s:%d \n", u64, host, port); } else if (argc == 3) { uint64_t u64; diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index fd4cade31c..1171ef8785 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -97,8 +97,8 @@ void test1() { sTrace("lastIndex: %" PRId64, lastIndex); sTrace("lastTerm: %" PRIu64, lastTerm); sTrace("syncStartIndex: %" PRId64, syncStartIndex); - sTrace("" PRId64 "'s preIndex: %" PRId64, testIndex, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, testIndex, preTerm); + sTrace("testIndex: %" PRId64 " preIndex: %" PRId64, testIndex, preIndex); + sTrace("testIndex: %" PRId64 " preTerm: %" PRIu64, testIndex, preTerm); if (gAssert) { assert(lastIndex == -1); @@ -170,8 +170,8 @@ void test2() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); if (gAssert) { SyncIndex preIndexArr[12] = {-1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; @@ -292,8 +292,8 @@ void test4() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); } logStoreDestory(pLogStore); @@ -354,8 +354,8 @@ void test5() { SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, i); SyncTerm preTerm = syncNodeGetPreTerm(pSyncNode, i); - sTrace("" PRId64 "'s preIndex: %" PRId64, i, preIndex); - sTrace("" PRId64 "'s preTerm: %" PRIu64, i, preTerm); + sTrace("i: %" PRId64 " preIndex: %" PRId64, i, preIndex); + sTrace("i: %" PRId64 " preTerm: %" PRIu64, i, preTerm); if (gAssert) { SyncIndex preIndexArr[12] = {9999, 9999, 9999, 9999, 9999, 9999, 5, 6, 7, 8, 9, 10}; diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index d3ba4bc136..adb3deb22d 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -145,7 +145,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = 9999; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index 35daff796f..cad6eec91d 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -58,18 +58,18 @@ void syncRespMgrGetTest(uint64_t i) { if (ret == 1) { printStub(&stub); } else if (ret == 0) { - printf("" PRId64 " notFound \n", i); + printf("%" PRId64 " notFound \n", i); } } void syncRespMgrGetAndDelTest(uint64_t i) { - printf("------syncRespMgrGetAndDelTest-------" PRIu64 "-- \n", i); + printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i); SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub); if (ret == 1) { printStub(&stub); } else if (ret == 0) { - printf("" PRId64 " notFound \n", i); + printf("%" PRId64 " notFound \n", i); } } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 1cdecfe5b3..bdb4d7d2d8 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -154,7 +154,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) { - char* s = syncCfg2Str(&(cbMeta.newCfg)); + char* s = syncCfg2Str(&(cbMeta->newCfg)); sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%" PRId64 ", code:%d, currentTerm:%" PRIu64 ", term:%" PRIu64 ", newCfg:%s", cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term, s); @@ -308,7 +308,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { pMsg->msgType = TDMT_VND_SUBMIT; pMsg->contLen = 256; pMsg->pCont = rpcMallocCont(pMsg->contLen); - snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-" PRId64, myIndex, i, count, + snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%" PRId64, myIndex, i, count, taosGetTimestampMs()); return pMsg; } From d7c866ee123a49b221cae2c5efd07fc8357a1191 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 27 Oct 2022 17:59:16 +0800 Subject: [PATCH 05/13] enh(sync): add test --- include/libs/sync/syncTools.h | 4 +- source/libs/sync/src/syncMessage.c | 4 +- source/libs/sync/test/syncLocalCmdTest.cpp | 72 +++++++++++----------- 3 files changed, 40 insertions(+), 40 deletions(-) diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 1c219f98bd..eedc403493 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -680,7 +680,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); // --------------------------------------------- typedef enum { - SYNC_STEP_DOWN = 0, + SYNC_LOCAL_CMD_STEP_DOWN = 100, } ESyncLocalCmd; typedef struct SyncLocalCmd { @@ -695,7 +695,7 @@ typedef struct SyncLocalCmd { } SyncLocalCmd; -SyncLocalCmd* syncLocalCmdBuild(uint32_t dataLen, int32_t vgId); +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId); void syncLocalCmdDestroy(SyncLocalCmd* pMsg); void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen); void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index b62c564fcd..3c36633fe8 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -3097,7 +3097,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { } // --------------------------------------------- -SyncLocalCmd* syncLocalCmdBuild(uint32_t dataLen, int32_t vgId) { +SyncLocalCmd* syncLocalCmdBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncLocalCmd); SyncLocalCmd* pMsg = taosMemoryMalloc(bytes); memset(pMsg, 0, bytes); @@ -3206,7 +3206,7 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) { } cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SyncSnapshotRsp", pRoot); + cJSON_AddItemToObject(pJson, "SyncLocalCmd2Json", pRoot); return pJson; } diff --git a/source/libs/sync/test/syncLocalCmdTest.cpp b/source/libs/sync/test/syncLocalCmdTest.cpp index 256c13e267..de908bf9c1 100644 --- a/source/libs/sync/test/syncLocalCmdTest.cpp +++ b/source/libs/sync/test/syncLocalCmdTest.cpp @@ -14,78 +14,78 @@ void logTest() { sFatal("--- sync log test: fatal"); } -SyncAppendEntries *createMsg() { - SyncAppendEntries *pMsg = syncAppendEntriesBuild(20, 1000); +SyncLocalCmd *createMsg() { + SyncLocalCmd *pMsg = syncLocalCmdBuild(1000); pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234); pMsg->srcId.vgId = 100; pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; - pMsg->prevLogIndex = 11; - pMsg->prevLogTerm = 22; - pMsg->commitIndex = 33; - pMsg->privateTerm = 44; - strcpy(pMsg->data, "hello world"); + pMsg->sdNewTerm = 123; + pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; + return pMsg; } void test1() { - SyncAppendEntries *pMsg = createMsg(); - syncAppendEntriesLog2((char *)"test1:", pMsg); - syncAppendEntriesDestroy(pMsg); + SyncLocalCmd *pMsg = createMsg(); + syncLocalCmdLog2((char *)"test1:", pMsg); + syncLocalCmdDestroy(pMsg); } void test2() { - SyncAppendEntries *pMsg = createMsg(); + SyncLocalCmd *pMsg = createMsg(); uint32_t len = pMsg->bytes; char *serialized = (char *)taosMemoryMalloc(len); - syncAppendEntriesSerialize(pMsg, serialized, len); - SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000); - syncAppendEntriesDeserialize(serialized, len, pMsg2); - syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2); + syncLocalCmdSerialize(pMsg, serialized, len); + SyncLocalCmd *pMsg2 = syncLocalCmdBuild(1000); + syncLocalCmdDeserialize(serialized, len, pMsg2); + syncLocalCmdLog2((char *)"test2: syncLocalCmdSerialize -> syncLocalCmdDeserialize ", pMsg2); taosMemoryFree(serialized); - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); } void test3() { - SyncAppendEntries *pMsg = createMsg(); + SyncLocalCmd *pMsg = createMsg(); uint32_t len; - char *serialized = syncAppendEntriesSerialize2(pMsg, &len); - SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len); - syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2); + char *serialized = syncLocalCmdSerialize2(pMsg, &len); + SyncLocalCmd *pMsg2 = syncLocalCmdDeserialize2(serialized, len); + syncLocalCmdLog2((char *)"test3: syncLocalCmdSerialize3 -> syncLocalCmdDeserialize2 ", pMsg2); taosMemoryFree(serialized); - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); } void test4() { - SyncAppendEntries *pMsg = createMsg(); + SyncLocalCmd *pMsg = createMsg(); SRpcMsg rpcMsg; - syncAppendEntries2RpcMsg(pMsg, &rpcMsg); - SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen); - syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2); - syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2); + syncLocalCmd2RpcMsg(pMsg, &rpcMsg); + SyncLocalCmd *pMsg2 = (SyncLocalCmd *)taosMemoryMalloc(rpcMsg.contLen); + syncLocalCmdFromRpcMsg(&rpcMsg, pMsg2); + syncLocalCmdLog2((char *)"test4: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg ", pMsg2); rpcFreeCont(rpcMsg.pCont); - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); } void test5() { - SyncAppendEntries *pMsg = createMsg(); + SyncLocalCmd *pMsg = createMsg(); SRpcMsg rpcMsg; - syncAppendEntries2RpcMsg(pMsg, &rpcMsg); - SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg); - syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2); + syncLocalCmd2RpcMsg(pMsg, &rpcMsg); + SyncLocalCmd *pMsg2 = syncLocalCmdFromRpcMsg2(&rpcMsg); + syncLocalCmdLog2((char *)"test5: syncLocalCmd2RpcMsg -> syncLocalCmdFromRpcMsg2 ", pMsg2); rpcFreeCont(rpcMsg.pCont); - syncAppendEntriesDestroy(pMsg); - syncAppendEntriesDestroy(pMsg2); + syncLocalCmdDestroy(pMsg); + syncLocalCmdDestroy(pMsg2); } int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; logTest(); From 438786501b20560eaf8b0a4fe1428a943a7a5e38 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 28 Oct 2022 15:31:34 +0800 Subject: [PATCH 06/13] refactor(sync): adjust elect timer ms --- source/libs/sync/inc/syncEnv.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index 2a37e000e2..06da0eb3df 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -28,13 +28,13 @@ extern "C" { #include "trpc.h" #include "ttimer.h" -#define TIMER_MAX_MS 0x7FFFFFFF -#define ENV_TICK_TIMER_MS 1000 -#define PING_TIMER_MS 5000 -#define ELECT_TIMER_MS_MIN 5000 -#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) +#define TIMER_MAX_MS 0x7FFFFFFF +#define ENV_TICK_TIMER_MS 1000 +#define PING_TIMER_MS 5000 +#define ELECT_TIMER_MS_MIN 2500 +#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) -#define HEARTBEAT_TIMER_MS 900 +#define HEARTBEAT_TIMER_MS 1000 #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) From a7c2e0e6883f6bf9f80c33d1e078c501432bf102 Mon Sep 17 00:00:00 2001 From: huolibo Date: Fri, 28 Oct 2022 15:31:25 +0800 Subject: [PATCH 07/13] docs: what's updated on new version (2.0.41, 3.0.2) --- docs/en/14-reference/03-connector/04-java.mdx | 8 + docs/zh/08-connector/14-java.mdx | 238 ++++++++++-------- 2 files changed, 136 insertions(+), 110 deletions(-) diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index ecf632c624..abae520b43 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -878,7 +878,9 @@ The source code of the sample application is under `TDengine/examples/JDBC`: | taos-jdbcdriver version | major changes | | :---------------------: | :--------------------------------------------: | +| 3.0.1 - 3.0.2 | fix the resultSet data is parsed incorrectly sometimes. 3.0.1 is compiled on JDK 11, you are advised to use 3.0.2 in the JDK 8 environment | | 3.0.0 | Support for TDengine 3.0 | +| 2.0.41 | fix decode method of username and password in REST connection | | 2.0.39 - 2.0.40 | Add REST connection/request timeout parameters | | 2.0.38 | JDBC REST connections add bulk pull function | | 2.0.37 | Support json tags | @@ -910,6 +912,12 @@ The source code of the sample application is under `TDengine/examples/JDBC`: **Solution**: Use taos-jdbcdriver 2.* with your TDengine 2.* deployment. +5. java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer; ... taos-jdbcdriver-3.0.1.jar + +**Cause**:taos-jdbcdriver 3.0.1 is compiled on JDK 11. + +**Solution**: Use taos-jdbcdriver 3.0.2. + For additional troubleshooting, see [FAQ](../../../train-faq/faq). ## API Reference diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index d13e21ad54..74ab629101 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -68,38 +68,39 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对 ### 安装连接器 - + -目前 taos-jdbcdriver 已经发布到 [Sonatype Repository](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) 仓库,且各大仓库都已同步。 + 目前 taos-jdbcdriver 已经发布到 [Sonatype Repository](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) + 仓库,且各大仓库都已同步。 -- [sonatype](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) -- [mvnrepository](https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver) -- [maven.aliyun](https://maven.aliyun.com/mvn/search) + - [sonatype](https://search.maven.org/artifact/com.taosdata.jdbc/taos-jdbcdriver) + - [mvnrepository](https://mvnrepository.com/artifact/com.taosdata.jdbc/taos-jdbcdriver) + - [maven.aliyun](https://maven.aliyun.com/mvn/search) -Maven 项目中,在 pom.xml 中添加以下依赖: + Maven 项目中,在 pom.xml 中添加以下依赖: -```xml-dtd - - com.taosdata.jdbc - taos-jdbcdriver - 3.0.0 - -``` + ```xml-dtd + + com.taosdata.jdbc + taos-jdbcdriver + 3.0.0 + + ``` - - + + -可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector + 可以通过下载 TDengine 的源码,自己编译最新版本的 Java connector -```shell -git clone https://github.com/taosdata/taos-connector-jdbc.git -cd taos-connector-jdbc -mvn clean install -Dmaven.test.skip=true -``` + ```shell + git clone https://github.com/taosdata/taos-connector-jdbc.git + cd taos-connector-jdbc + mvn clean install -Dmaven.test.skip=true + ``` -编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。 + 编译后,在 target 目录下会产生 taos-jdbcdriver-3.0.*-dist.jar 的 jar 包,并自动将编译的 jar 文件放在本地的 Maven 仓库中。 - + ## 建立连接 @@ -110,116 +111,125 @@ TDengine 的 JDBC URL 规范格式为: 对于建立连接,原生连接与 REST 连接有细微不同。 - + -```java -Class.forName("com.taosdata.jdbc.TSDBDriver"); -String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/test?user=root&password=taosdata"; -Connection conn = DriverManager.getConnection(jdbcUrl); -``` + ```java + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = "jdbc:TAOS://taosdemo.com:6030/test?user=root&password=taosdata"; + Connection conn = DriverManager.getConnection(jdbcUrl); + ``` -以上示例,使用了 JDBC 原生连接的 TSDBDriver,建立了到 hostname 为 taosdemo.com,端口为 6030(TDengine 的默认端口),数据库名为 test 的连接。这个 URL 中指定用户名(user)为 root,密码(password)为 taosdata。 + 以上示例,使用了 JDBC 原生连接的 TSDBDriver,建立了到 hostname 为 taosdemo.com,端口为 6030(TDengine 的默认端口),数据库名为 test 的连接。这个 URL + 中指定用户名(user)为 root,密码(password)为 taosdata。 -**注意**:使用 JDBC 原生连接,taos-jdbcdriver 需要依赖客户端驱动(Linux 下是 libtaos.so;Windows 下是 taos.dll;macOS 下是 libtaos.dylib)。 + **注意**:使用 JDBC 原生连接,taos-jdbcdriver 需要依赖客户端驱动(Linux 下是 libtaos.so;Windows 下是 taos.dll;macOS 下是 libtaos.dylib)。 -url 中的配置参数如下: + url 中的配置参数如下: -- user:登录 TDengine 用户名,默认值 'root'。 -- password:用户登录密码,默认值 'taosdata'。 -- cfgdir:客户端配置文件目录路径,Linux OS 上默认值 `/etc/taos`,Windows OS 上默认值 `C:/TDengine/cfg`。 -- charset:客户端使用的字符集,默认值为系统字符集。 -- locale:客户端语言环境,默认值系统当前 locale。 -- timezone:客户端使用的时区,默认值为系统当前时区。 -- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:true。开启批量拉取同时获取一批数据在查询数据量较大时批量拉取可以有效的提升查询性能。 -- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败将继续执行下面的 SQL。false:不再执行失败 SQL 后的任何语句。默认值为:false。 + - user:登录 TDengine 用户名,默认值 'root'。 + - password:用户登录密码,默认值 'taosdata'。 + - cfgdir:客户端配置文件目录路径,Linux OS 上默认值 `/etc/taos`,Windows OS 上默认值 `C:/TDengine/cfg`。 + - charset:客户端使用的字符集,默认值为系统字符集。 + - locale:客户端语言环境,默认值系统当前 locale。 + - timezone:客户端使用的时区,默认值为系统当前时区。 + - batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:true。开启批量拉取同时获取一批数据在查询数据量较大时批量拉取可以有效的提升查询性能。 + - batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败将继续执行下面的 SQL。false:不再执行失败 SQL + 后的任何语句。默认值为:false。 -JDBC 原生连接的使用请参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1955.html)。 + JDBC 原生连接的使用请参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1955.html)。 -**使用 TDengine 客户端驱动配置文件建立连接 ** + **使用 TDengine 客户端驱动配置文件建立连接 ** -当使用 JDBC 原生连接连接 TDengine 集群时,可以使用 TDengine 客户端驱动配置文件,在配置文件中指定集群的 firstEp、secondEp 等参数。如下所示: + 当使用 JDBC 原生连接连接 TDengine 集群时,可以使用 TDengine 客户端驱动配置文件,在配置文件中指定集群的 firstEp、secondEp 等参数。如下所示: -1. 在 Java 应用中不指定 hostname 和 port + 1. 在 Java 应用中不指定 hostname 和 port -```java -public Connection getConn() throws Exception{ - Class.forName("com.taosdata.jdbc.TSDBDriver"); - String jdbcUrl = "jdbc:TAOS://:/test?user=root&password=taosdata"; - Properties connProps = new Properties(); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - Connection conn = DriverManager.getConnection(jdbcUrl, connProps); - return conn; -} -``` + ```java + public Connection getConn() throws Exception{ + Class.forName("com.taosdata.jdbc.TSDBDriver"); + String jdbcUrl = "jdbc:TAOS://:/test?user=root&password=taosdata"; + Properties connProps = new Properties(); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + Connection conn = DriverManager.getConnection(jdbcUrl, connProps); + return conn; + } + ``` -2. 在配置文件中指定 firstEp 和 secondEp + 2. 在配置文件中指定 firstEp 和 secondEp -```shell -# first fully qualified domain name (FQDN) for TDengine system -firstEp cluster_node1:6030 + ```shell + # first fully qualified domain name (FQDN) for TDengine system + firstEp cluster_node1:6030 -# second fully qualified domain name (FQDN) for TDengine system, for cluster only -secondEp cluster_node2:6030 + # second fully qualified domain name (FQDN) for TDengine system, for cluster only + secondEp cluster_node2:6030 -# default system charset -# charset UTF-8 + # default system charset + # charset UTF-8 -# system locale -# locale en_US.UTF-8 -``` + # system locale + # locale en_US.UTF-8 + ``` -以上示例,jdbc 会使用客户端的配置文件,建立到 hostname 为 cluster_node1、端口为 6030、数据库名为 test 的连接。当集群中 firstEp 节点失效时,JDBC 会尝试使用 secondEp 连接集群。 + 以上示例,jdbc 会使用客户端的配置文件,建立到 hostname 为 cluster_node1、端口为 6030、数据库名为 test 的连接。当集群中 firstEp 节点失效时,JDBC 会尝试使用 secondEp + 连接集群。 -TDengine 中,只要保证 firstEp 和 secondEp 中一个节点有效,就可以正常建立到集群的连接。 + TDengine 中,只要保证 firstEp 和 secondEp 中一个节点有效,就可以正常建立到集群的连接。 -> **注意**:这里的配置文件指的是调用 JDBC Connector 的应用程序所在机器上的配置文件,Linux OS 上默认值 /etc/taos/taos.cfg ,Windows OS 上默认值 C://TDengine/cfg/taos.cfg。 + > **注意**:这里的配置文件指的是调用 JDBC Connector 的应用程序所在机器上的配置文件,Linux OS 上默认值 /etc/taos/taos.cfg ,Windows OS 上默认值 + C://TDengine/cfg/taos.cfg。 - - + + -```java -Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); -String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata"; -Connection conn = DriverManager.getConnection(jdbcUrl); -``` + ```java + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata"; + Connection conn = DriverManager.getConnection(jdbcUrl); + ``` -以上示例,使用了 JDBC REST 连接的 RestfulDriver,建立了到 hostname 为 taosdemo.com,端口为 6041,数据库名为 test 的连接。这个 URL 中指定用户名(user)为 root,密码(password)为 taosdata。 + 以上示例,使用了 JDBC REST 连接的 RestfulDriver,建立了到 hostname 为 taosdemo.com,端口为 6041,数据库名为 test 的连接。这个 URL 中指定用户名(user)为 + root,密码(password)为 taosdata。 -使用 JDBC REST 连接,不需要依赖客户端驱动。与 JDBC 原生连接相比,仅需要: + 使用 JDBC REST 连接,不需要依赖客户端驱动。与 JDBC 原生连接相比,仅需要: -1. driverClass 指定为“com.taosdata.jdbc.rs.RestfulDriver”; -2. jdbcUrl 以“jdbc:TAOS-RS://”开头; -3. 使用 6041 作为连接端口。 + 1. driverClass 指定为“com.taosdata.jdbc.rs.RestfulDriver”; + 2. jdbcUrl 以“jdbc:TAOS-RS://”开头; + 3. 使用 6041 作为连接端口。 -url 中的配置参数如下: + url 中的配置参数如下: -- user:登录 TDengine 用户名,默认值 'root'。 -- password:用户登录密码,默认值 'taosdata'。 -- batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。 -- charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。 -- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。 -- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。 -- httpSocketTimeout: socket 超时时间,单位 ms,默认值为 5000。仅在 batchfetch 设置为 false 时生效。 -- messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 3000。 仅在 batchfetch 设置为 true 时生效。 -- useSSL: 连接中是否使用 SSL。 + - user:登录 TDengine 用户名,默认值 'root'。 + - password:用户登录密码,默认值 'taosdata'。 + - batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST + 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。 + - charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。 + - batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL + 后的任何语句。默认值为:false。 + - httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 5000。 + - httpSocketTimeout: socket 超时时间,单位 ms,默认值为 5000。仅在 batchfetch 设置为 false 时生效。 + - messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 3000。 仅在 batchfetch 设置为 true 时生效。 + - useSSL: 连接中是否使用 SSL。 -**注意**:部分配置项(比如:locale、timezone)在 REST 连接中不生效。 + **注意**:部分配置项(比如:locale、timezone)在 REST 连接中不生效。 -:::note + :::note -- 与原生连接方式不同,REST 接口是无状态的。在使用 JDBC REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。例如: + - 与原生连接方式不同,REST 接口是无状态的。在使用 JDBC REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。例如: -```sql -INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6); -``` + ```sql + INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6); + ``` -- 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6); + - 如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用/rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 + jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:insert into t1 using weather(ts, temperature) + tags('California.SanFrancisco') values(now, 24.6); -::: + ::: - + ### 指定 URL 和 Properties 获取连接 @@ -271,7 +281,7 @@ properties 中的配置参数如下: - TSDBDriver.HTTP_SOCKET_TIMEOUT: socket 超时时间,单位 ms,默认值为 5000。仅在 REST 连接且 batchfetch 设置为 false 时生效。 - TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: 消息超时时间, 单位 ms, 默认值为 3000。 仅在 REST 连接且 batchfetch 设置为 true 时生效。 - TSDBDriver.PROPERTY_KEY_USE_SSL: 连接中是否使用 SSL。仅在 REST 连接时生效。 - 此外对 JDBC 原生连接,通过指定 URL 和 Properties 还可以指定其他参数,比如日志级别、SQL 长度等。更多详细配置请参考[客户端配置](/reference/config/#仅客户端适用)。 +此外对 JDBC 原生连接,通过指定 URL 和 Properties 还可以指定其他参数,比如日志级别、SQL 长度等。更多详细配置请参考[客户端配置](/reference/config/#仅客户端适用)。 ### 配置参数的优先级 @@ -880,7 +890,9 @@ public static void main(String[] args) throws Exception { | taos-jdbcdriver 版本 | 主要变化 | | :------------------: | :----------------------------: | +| 3.0.1 - 3.0.2 | 修复一些情况下结果集数据解析错误的问题。3.0.1 在 JDK 11 环境编译,JDK 8 环境下建议使用 3.0.2 版本 | | 3.0.0 | 支持 TDengine 3.0 | +| 2.0.41 | 修正 REST 连接中用户名和密码转码方式 | | 2.0.39 - 2.0.40 | 增加 REST 连接/请求 超时设置 | | 2.0.38 | JDBC REST 连接增加批量拉取功能 | | 2.0.37 | 增加对 json tag 支持 | @@ -890,27 +902,33 @@ public static void main(String[] args) throws Exception { 1. 使用 Statement 的 `addBatch()` 和 `executeBatch()` 来执行“批量写入/更新”,为什么没有带来性能上的提升? - **原因**:TDengine 的 JDBC 实现中,通过 `addBatch` 方法提交的 SQL 语句,会按照添加的顺序,依次执行,这种方式没有减少与服务端的交互次数,不会带来性能上的提升。 +**原因**:TDengine 的 JDBC 实现中,通过 `addBatch` 方法提交的 SQL 语句,会按照添加的顺序,依次执行,这种方式没有减少与服务端的交互次数,不会带来性能上的提升。 - **解决方法**:1. 在一条 insert 语句中拼接多个 values 值;2. 使用多线程的方式并发插入;3. 使用参数绑定的写入方式 +**解决方法**:1. 在一条 insert 语句中拼接多个 values 值;2. 使用多线程的方式并发插入;3. 使用参数绑定的写入方式 2. java.lang.UnsatisfiedLinkError: no taos in java.library.path - **原因**:程序没有找到依赖的本地函数库 taos。 +**原因**:程序没有找到依赖的本地函数库 taos。 - **解决方法**:Windows 下可以将 C:\TDengine\driver\taos.dll 拷贝到 C:\Windows\System32\ 目录下,Linux 下将建立如下软链 `ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so` 即可,macOS 下需要建立软链 `ln -s /usr/local/lib/libtaos.dylib`。 +**解决方法**:Windows 下可以将 C:\TDengine\driver\taos.dll 拷贝到 C:\Windows\System32\ 目录下,Linux 下将建立如下软链 `ln -s /usr/local/taos/driver/libtaos.so.x.x.x.x /usr/lib/libtaos.so` 即可,macOS 下需要建立软链 `ln -s /usr/local/lib/libtaos.dylib`。 3. java.lang.UnsatisfiedLinkError: taos.dll Can't load AMD 64 bit on a IA 32-bit platform - **原因**:目前 TDengine 只支持 64 位 JDK。 +**原因**:目前 TDengine 只支持 64 位 JDK。 - **解决方法**:重新安装 64 位 JDK。 +**解决方法**:重新安装 64 位 JDK。 4. java.lang.NoSuchMethodError: setByteArray - **原因**:taos-jdbcdriver 3.* 版本仅支持 TDengine 3.0 及以上版本。 +**原因**:taos-jdbcdriver 3.* 版本仅支持 TDengine 3.0 及以上版本。 - **解决方法**: 使用 taos-jdbcdriver 2.* 版本连接 TDengine 2.* 版本。 +**解决方法**: 使用 taos-jdbcdriver 2.* 版本连接 TDengine 2.* 版本。 + +5. java.lang.NoSuchMethodError: java.nio.ByteBuffer.position(I)Ljava/nio/ByteBuffer; ... taos-jdbcdriver-3.0.1.jar + +**原因**:taos-jdbcdriver 3.0.1 版本需要在 JDK 11+ 环境使用。 + +**解决方法**: 更换 taos-jdbcdriver 3.0.2 版本。 其它问题请参考 [FAQ](../../../train-faq/faq) From d625cb56b7a5936dfc0d7459239aaf33f2a30dce Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 28 Oct 2022 15:47:55 +0800 Subject: [PATCH 08/13] docs: fix few typos --- docs/en/14-reference/03-connector/04-java.mdx | 8 ++++---- docs/zh/08-connector/14-java.mdx | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en/14-reference/03-connector/04-java.mdx b/docs/en/14-reference/03-connector/04-java.mdx index abae520b43..7b3440ebac 100644 --- a/docs/en/14-reference/03-connector/04-java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -201,7 +201,7 @@ The configuration parameters in the URL are as follows: - httpConnectTimeout: REST connection timeout in milliseconds, the default value is 5000 ms. - httpSocketTimeout: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when batchfetch is false. - messageWaitTimeout: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when batchfetch is true. -- useSSL: connecting Securely Using SSL. true: using SSL conneciton, false: not using SSL connection. +- useSSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection. **Note**: Some configuration items (e.g., locale, timezone) do not work in the REST connection. @@ -268,7 +268,7 @@ The configuration parameters in properties are as follows. - TSDBDriver.HTTP_CONNECT_TIMEOUT: REST connection timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection. - TSDBDriver.HTTP_SOCKET_TIMEOUT: socket timeout in milliseconds, the default value is 5000 ms. It only takes effect when using JDBC REST connection and batchfetch is false. - TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT: message transmission timeout in milliseconds, the default value is 3000 ms. It only takes effect when using JDBC REST connection and batchfetch is true. -- TSDBDriver.PROPERTY_KEY_USE_SSL: connecting Securely Using SSL. true: using SSL conneciton, false: not using SSL connection. It only takes effect when using JDBC REST connection. +- TSDBDriver.PROPERTY_KEY_USE_SSL: connecting Securely Using SSL. true: using SSL connection, false: not using SSL connection. It only takes effect when using JDBC REST connection. For JDBC native connections, you can specify other parameters, such as log level, SQL length, etc., by specifying URL and Properties. For more detailed configuration, please refer to [Client Configuration](/reference/config/#Client-Only). ### Priority of configuration parameters @@ -824,7 +824,7 @@ Example usage is as follows. //query or insert // ... - connection.close(); // put back to conneciton pool + connection.close(); // put back to connection pool } ``` @@ -856,7 +856,7 @@ public static void main(String[] args) throws Exception { //query or insert // ... - connection.close(); // put back to conneciton pool + connection.close(); // put back to connection pool } ``` diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 74ab629101..1ee59d2df4 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -836,7 +836,7 @@ public abstract class ConsumerLoop { //query or insert // ... - connection.close(); // put back to conneciton pool + connection.close(); // put back to connection pool } ``` @@ -868,7 +868,7 @@ public static void main(String[] args) throws Exception { //query or insert // ... - connection.close(); // put back to conneciton pool + connection.close(); // put back to connection pool } ``` From 030195e72ac35b105545abdf4399fc6a78389405 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 16:42:58 +0800 Subject: [PATCH 09/13] support macos --- source/libs/transport/src/transSvr.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 6819068b64..025eab3f3e 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -655,7 +655,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { uv_tcp_init(pObj->loop, cli); if (uv_accept(stream, (uv_stream_t*)cli) == 0) { -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) if (pObj->numOfWorkerReady < pObj->numOfThreads) { tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); @@ -779,7 +779,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { return false; } -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) uv_pipe_init(pThrd->loop, pThrd->pipe, 1); #else uv_pipe_init(pThrd->loop, pThrd->pipe, 1); @@ -799,7 +799,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->conn); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); #else uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); @@ -976,17 +976,18 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); char pipeName[PATH_MAX]; -#ifdef WINDOWS +#if defined(WINDOWS) || defined(DARWIN) int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); if (ret != 0) { tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); goto End; } - +#if defined(WINDOWS) snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); - // char pipeName[PATH_MAX] = {0}; - // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), - // taosGetSelfPthreadId()); +#elif defined(DARWIN) + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), + taosGetSelfPthreadId()); +#endif ret = uv_pipe_bind(&srv->pipeListen, pipeName); if (ret != 0) { @@ -1059,8 +1060,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, goto End; } } - #endif + if (false == taosValidIpAndPort(srv->ip, srv->port)) { terrno = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); From 6c19e886df8b44f97cdc95f13adb3f270cad3388 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Oct 2022 17:58:59 +0800 Subject: [PATCH 10/13] fix(query): move the init function for different reader. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index f83755fc4f..5e5e287cf9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3601,16 +3601,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (code != TSDB_CODE_SUCCESS) { return code; } - - code = doOpenReaderImpl(pNextReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } } } @@ -3741,9 +3731,14 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); - resetDataBlockScanInfo(pReader->innerReader[0]->status.pTableMap, pReader->innerReader[0]->window.ekey); - pReader->step = EXTERNAL_ROWS_PREV; + // prepare for the main scan + int32_t code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + pReader->step = EXTERNAL_ROWS_PREV; if (ret) { return ret; } @@ -3759,7 +3754,12 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) { - resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); + // prepare for the next row scan + int32_t code = doOpenReaderImpl(pReader->innerReader[1]); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + bool ret1 = doTsdbNextDataBlock(pReader->innerReader[1]); pReader->step = EXTERNAL_ROWS_NEXT; if (ret1) { From df786f1516cfb3692694e5cf4e2027bc49be06ff Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 19:27:06 +0800 Subject: [PATCH 11/13] use sys mem --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 025eab3f3e..b829251ccd 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1037,7 +1037,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pThreadObj[i] = thrd; uv_os_sock_t fds[2]; - if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { goto End; } From bae6d824c2576e9fcc84ebade3992b6f93adb8f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Oct 2022 12:54:39 +0800 Subject: [PATCH 12/13] fix(query): init blockscanInfo correctly. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 17 ++++++++++------- tests/system-test/2-query/interp.py | 2 +- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5e5e287cf9..9add18a4dc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3478,6 +3478,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); +// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey); // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { @@ -3731,13 +3732,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->innerReader[0] != NULL && pReader->step == 0) { bool ret = doTsdbNextDataBlock(pReader->innerReader[0]); - - // prepare for the main scan - int32_t code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - pReader->step = EXTERNAL_ROWS_PREV; if (ret) { return ret; @@ -3745,6 +3739,14 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { } if (pReader->step == EXTERNAL_ROWS_PREV) { + // prepare for the main scan + int32_t code = doOpenReaderImpl(pReader); + resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey); + + if (code != TSDB_CODE_SUCCESS) { + return code; + } + pReader->step = EXTERNAL_ROWS_MAIN; } @@ -3756,6 +3758,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) { // prepare for the next row scan int32_t code = doOpenReaderImpl(pReader->innerReader[1]); + resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/tests/system-test/2-query/interp.py b/tests/system-test/2-query/interp.py index bee20710b5..71501d5967 100644 --- a/tests/system-test/2-query/interp.py +++ b/tests/system-test/2-query/interp.py @@ -11,7 +11,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) def run(self): dbname = "db" From 21364c4041ca2b02246fa71d8f0babf4d01a7dac Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 29 Oct 2022 18:34:42 +0800 Subject: [PATCH 13/13] fix: add api ut cases --- source/libs/catalog/src/catalog.c | 2 + source/libs/catalog/test/catalogTests.cpp | 94 +++++++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 8f621545c9..b6a22b5fa7 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1114,6 +1114,7 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup, exists)); } +#if 0 int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, SMetaData* pRsp) { CTG_API_ENTER(); @@ -1176,6 +1177,7 @@ _return: CTG_API_LEAVE(code); } +#endif int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index de34868c55..0bdd9841ab 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -63,6 +63,7 @@ enum { CTGT_RSP_QNODELIST, CTGT_RSP_UDF, CTGT_RSP_SVRVER, + CTGT_RSP_DNODElIST, CTGT_RSP_TBMETA_NOT_EXIST, }; @@ -702,6 +703,30 @@ void ctgTestRspSvrVer(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRs pRsp->pCont = pReq; } +void ctgTestRspDndeList(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + rpcFreeCont(pMsg->pCont); + + SDnodeListRsp dRsp = {0}; + dRsp.dnodeList = taosArrayInit(1, sizeof(SEpSet)); + SEpSet epSet = {0}; + epSet.numOfEps = 1; + tstrncpy(epSet.eps[0].fqdn, "localhost", TSDB_FQDN_LEN); + epSet.eps[0].port = 6030; + + (void)taosArrayPush(dRsp.dnodeList, &epSet); + + int32_t rspLen = tSerializeSDnodeListRsp(NULL, 0, &dRsp); + void *pReq = rpcMallocCont(rspLen); + tSerializeSDnodeListRsp(pReq, rspLen, &dRsp); + + pRsp->code = 0; + pRsp->contLen = rspLen; + pRsp->pCont = pReq; + + tFreeSDnodeListRsp(&dRsp); +} + + void ctgTestRspAuto(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { switch (pMsg->msgType) { @@ -727,6 +752,9 @@ void ctgTestRspAuto(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) case TDMT_MND_SERVER_VERSION: ctgTestRspSvrVer(shandle, pEpSet, pMsg, pRsp); break; + case TDMT_MND_DNODE_LIST: + ctgTestRspDndeList(shandle, pEpSet, pMsg, pRsp); + break; default: break; } @@ -779,6 +807,9 @@ void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp case CTGT_RSP_SVRVER: ctgTestRspSvrVer(shandle, pEpSet, pMsg, pRsp); break; + case CTGT_RSP_DNODElIST: + ctgTestRspDndeList(shandle, pEpSet, pMsg, pRsp); + break; default: ctgTestRspAuto(shandle, pEpSet, pMsg, pRsp); break; @@ -2948,6 +2979,69 @@ TEST(apiTest, catalogGetServerVersion_test) { catalogDestroy(); } +TEST(apiTest, catalogUpdateTableIndex_test) { + struct SCatalog *pCtg = NULL; + SRequestConnInfo connInfo = {0}; + SRequestConnInfo *mockPointer = (SRequestConnInfo *)&connInfo; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_SVRVER; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + STableIndexRsp rsp = {0}; + strcpy(rsp.dbFName, ctgTestDbname); + strcpy(rsp.tbName, ctgTestSTablename); + rsp.suid = ctgTestSuid; + rsp.version = 1; + code = catalogUpdateTableIndex(pCtg, &rsp); + ASSERT_EQ(code, 0); + + catalogDestroy(); +} + + +TEST(apiTest, catalogGetDnodeList_test) { + struct SCatalog *pCtg = NULL; + SRequestConnInfo connInfo = {0}; + SRequestConnInfo *mockPointer = (SRequestConnInfo *)&connInfo; + + ctgTestInitLogFile(); + + memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc)); + ctgTestRspIdx = 0; + ctgTestRspFunc[0] = CTGT_RSP_DNODElIST; + + ctgTestSetRspByIdx(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SArray* pList = NULL; + code = catalogGetDnodeList(pCtg, mockPointer, &pList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize(pList), 1); + + taosArrayDestroy(pList); + + catalogDestroy(); +} int main(int argc, char **argv) {