diff --git a/contrib/test/craft/CMakeLists.txt b/contrib/test/craft/CMakeLists.txt index e0f6ae64bd..d4d5a6365f 100644 --- a/contrib/test/craft/CMakeLists.txt +++ b/contrib/test/craft/CMakeLists.txt @@ -1,2 +1,2 @@ add_executable(simulate_vnode "simulate_vnode.c") -target_link_libraries(simulate_vnode PUBLIC craft lz4 uv_a) \ No newline at end of file +target_link_libraries(simulate_vnode craft lz4 uv_a) \ No newline at end of file diff --git a/contrib/test/traft/single_node/CMakeLists.txt b/contrib/test/traft/single_node/CMakeLists.txt index 666ce271b8..84b65978b9 100644 --- a/contrib/test/traft/single_node/CMakeLists.txt +++ b/contrib/test/traft/single_node/CMakeLists.txt @@ -3,4 +3,4 @@ target_sources(singleNode PRIVATE "singleNode.c" ) -target_link_libraries(singleNode PUBLIC traft lz4 uv_a) +target_link_libraries(singleNode traft lz4 uv_a) diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt index 71a9f9f4c5..51e1e996dc 100644 --- a/example/CMakeLists.txt +++ b/example/CMakeLists.txt @@ -2,11 +2,7 @@ aux_source_directory(src TMQ_DEMO_SRC) add_executable(tmq ${TMQ_DEMO_SRC}) target_link_libraries( - tmq - PUBLIC taos - #PUBLIC util - #PUBLIC common - #PUBLIC os + tmq taos ) target_include_directories( tmq diff --git a/include/common/tglobal.h b/include/common/tglobal.h index e09f6d11bd..2261170e63 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -52,6 +52,13 @@ extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; +// monitor +extern bool tsEnableMonitor; +extern int32_t tsMonitorInterval; +extern char tsMonitorFqdn[]; +extern uint16_t tsMonitorPort; +extern int32_t tsMonitorMaxLogs; + // query buffer management extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node diff --git a/include/common/ttime.h b/include/common/ttime.h index a4b2aa6673..b71426e312 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -52,6 +52,8 @@ void deltaToUtcInitOnce(); int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); +void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision); + #ifdef __cplusplus } #endif diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 2680bb83ed..d295e772e8 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -16,6 +16,8 @@ #ifndef _TD_MND_H_ #define _TD_MND_H_ +#include "monitor.h" + #ifdef __cplusplus extern "C" { #endif @@ -30,20 +32,6 @@ typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef int32_t (*PutReqToMReadQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectRspFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef struct SMnodeLoad { - int64_t numOfDnode; - int64_t numOfMnode; - int64_t numOfVgroup; - int64_t numOfDatabase; - int64_t numOfSuperTable; - int64_t numOfChildTable; - int64_t numOfNormalTable; - int64_t numOfColumn; - int64_t totalPoints; - int64_t totalStorage; - int64_t compStorage; -} SMnodeLoad; - typedef struct { int32_t dnodeId; int64_t clusterId; @@ -92,13 +80,16 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption); void mndDestroy(const char *path); /** - * @brief Get mnode statistics info. + * @brief Get mnode monitor info. * * @param pMnode The mnode object. - * @param pLoad Statistics of the mnode. + * @param pClusterInfo + * @param pVgroupInfo + * @param pGrantInfo * @return int32_t 0 for success, -1 for failure. */ -int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo); /** * @brief Get user authentication info. diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h new file mode 100644 index 0000000000..4b7c6a9bc9 --- /dev/null +++ b/include/libs/monitor/monitor.h @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MONITOR_H_ +#define _TD_MONITOR_H_ + +#include "tarray.h" +#include "tdef.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int32_t dnode_id; + char dnode_ep[TSDB_EP_LEN]; +} SMonBasicInfo; + +typedef struct { + int32_t dnode_id; + char dnode_ep[TSDB_EP_LEN]; + char status[8]; +} SMonDnodeDesc; + +typedef struct { + int32_t mnode_id; + char mnode_ep[TSDB_EP_LEN]; + char role[8]; +} SMonMnodeDesc; + +typedef struct { + char first_ep[TSDB_EP_LEN]; + int32_t first_ep_dnode_id; + char version[12]; + float master_uptime; // day + int32_t monitor_interval; // sec + int32_t vgroups_total; + int32_t vgroups_alive; + int32_t vnodes_total; + int32_t vnodes_alive; + int32_t connections_total; + SArray *dnodes; // array of SMonDnodeDesc + SArray *mnodes; // array of SMonMnodeDesc +} SMonClusterInfo; + +typedef struct { + int32_t dnode_id; + int8_t vnode_online; + char vnode_role[8]; +} SMonVnodeDesc; + +typedef struct { + int32_t vgroup_id; + SMonVnodeDesc vnodes[TSDB_MAX_REPLICA]; +} SMonVgroupDesc; + +typedef struct { + char database_name[TSDB_DB_NAME_LEN]; + int32_t tables_num; + int8_t status; + SArray *vgroups; // array of SMonVgroupDesc +} SMonVgroupInfo; + +typedef struct { + int32_t expire_time; + int32_t timeseries_used; + int32_t timeseries_total; +} SMonGrantInfo; + +typedef struct { + float uptime; // day + float cpu_engine; + float cpu_system; + float cpu_cores; + float mem_engine; // MB + float mem_system; // MB + float mem_total; // MB + float disk_engine; // GB + float disk_used; // GB + float disk_total; // GB + float net_in; // Kb/s + float net_out; // Kb/s + float io_read; // Mb/s + float io_write; // Mb/s + float io_read_disk; // Mb/s + float io_write_disk; // Mb/s + int32_t req_select; + float req_select_rate; + int32_t req_insert; + int32_t req_insert_success; + float req_insert_rate; + int32_t req_insert_batch; + int32_t req_insert_batch_success; + float req_insert_batch_rate; + int32_t errors; + int32_t vnodes_num; + int32_t masters; + int32_t has_mnode; +} SMonDnodeInfo; + +typedef struct { + char name[TSDB_FILENAME_LEN]; + int32_t level; + SDiskSize size; +} SMonDiskDesc; + +typedef struct { + SArray *disks; // array of SMonDiskDesc +} SMonDiskInfo; + +typedef struct { + int64_t ts; + int8_t level; + char content[1024]; +} SMonLogItem; + +typedef struct SMonInfo SMonInfo; + +typedef struct { + const char *server; + uint16_t port; + int32_t maxLogs; +} SMonCfg; + +int32_t monInit(const SMonCfg *pCfg); +void monCleanup(); +void monAddLogItem(SMonLogItem *pItem); + +SMonInfo *monCreateMonitorInfo(); +void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo); +void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo); +void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo); +void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo); +void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo); +void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo); +void monSendReport(SMonInfo *pMonitor); +void monCleanupMonitorInfo(SMonInfo *pMonitor); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MONITOR_H_*/ diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a619e66622..cd04783dbc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,7 +34,9 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_LEADER = 2, -} ESyncState; +} ESyncRole; + +typedef ESyncRole ESyncState; typedef struct SSyncBuffer { void* data; @@ -69,15 +71,15 @@ typedef struct SSyncFSM { // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when log entry is updated by a new one, FpRollBackCb is called // user can do something to roll back. for example, delete data from tsdb, or just ignore it - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // user should implement this function, use "data" to take snapshot into "snapshot" int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); @@ -93,10 +95,10 @@ typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); // update log store commit index with "index" int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); @@ -135,7 +137,9 @@ typedef struct SSyncInfo { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; SSyncFSM* pFsm; - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; @@ -149,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 35ac032a8a..bf9d7662e6 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -42,7 +42,6 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage); bool taosGetTotalSysMemoryKB(uint64_t *kb); bool taosGetProcMemory(float *memoryUsedMB); // bool taosGetSysMemory(float *memoryUsedMB); // -void taosGetDisk(); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); bool taosReadProcIO(int64_t *rchars, int64_t *wchars); bool taosGetProcIO(float *readKB, float *writeKB); diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index 04e5a36e86..1e4ad95504 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src COMMON_SRC) -add_library(common ${COMMON_SRC}) +add_library(common STATIC ${COMMON_SRC}) target_include_directories( common PUBLIC "${CMAKE_SOURCE_DIR}/include/common" diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d66895888b..5998b7c9ce 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -46,6 +46,13 @@ int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; +// monitor +bool tsEnableMonitor = 1; +int32_t tsMonitorInterval = 5; +char tsMonitorFqdn[TSDB_FQDN_LEN] = {0}; +uint16_t tsMonitorPort = 6043; +int32_t tsMonitorMaxLogs = 100; + /* * denote if the server needs to compress response message at the application layer to client, including query rsp, * metricmeta rsp, and multi-meter query rsp message body. The client compress the submit message to server. @@ -314,6 +321,13 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; + + if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; + if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorPort", tsMonitorPort, 1, 65056, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "monitorMaxLogs", tsMonitorMaxLogs, 1, 1000000, 0) != 0) return -1; + return 0; } @@ -345,7 +359,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { } static void taosSetClientCfg(SConfig *pCfg) { - tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_EP_LEN); + tstrncpy(tsLocalFqdn, cfgGetItem(pCfg, "fqdn")->str, TSDB_FQDN_LEN); tsServerPort = (uint16_t)cfgGetItem(pCfg, "serverPort")->i32; snprintf(tsLocalEp, sizeof(tsLocalEp), "%s:%u", tsLocalFqdn, tsServerPort); @@ -425,6 +439,12 @@ static void taosSetServerCfg(SConfig *pCfg) { tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; + tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; + tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; + tstrncpy(tsMonitorFqdn, cfgGetItem(pCfg, "monitorFqdn")->str, TSDB_FQDN_LEN); + tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; + tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; + if (tsQueryBufferSize >= 0) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index e061862856..fb77417cac 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -22,6 +22,7 @@ bool tscValidateTableNameLength(size_t len) { return len < TSDB_TABLE_NAME_LEN; } +#if 0 // TODO refactor SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFilters) { if (numOfFilters == 0 || src == NULL) { @@ -46,7 +47,7 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil return pFilter; } - +#endif #if 0 int64_t taosGetIntervalStartTimestamp(int64_t startTime, int64_t slidingTime, int64_t intervalTime, char timeUnit, int16_t precision) { if (slidingTime == 0) { diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 460c4a6fc0..7a318c2eb8 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -627,3 +627,50 @@ const char* fmtts(int64_t ts) { return buf; } + +void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) { + char ts[40] = {0}; + struct tm* ptm; + + int32_t fractionLen; + char* format = NULL; + time_t quot = 0; + long mod = 0; + + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: { + quot = t / 1000; + fractionLen = 5; + format = ".%03" PRId64; + mod = t % 1000; + break; + } + + case TSDB_TIME_PRECISION_MICRO: { + quot = t / 1000000; + fractionLen = 8; + format = ".%06" PRId64; + mod = t % 1000000; + break; + } + + case TSDB_TIME_PRECISION_NANO: { + quot = t / 1000000000; + fractionLen = 11; + format = ".%09" PRId64; + mod = t % 1000000000; + break; + } + + default: + fractionLen = 0; + assert(false); + } + + ptm = localtime("); + int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm); + length += snprintf(ts + length, fractionLen, format, mod); + length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); + + tstrncpy(buf, ts, bufLen); +} \ No newline at end of file diff --git a/source/dnode/bnode/CMakeLists.txt b/source/dnode/bnode/CMakeLists.txt index a284437450..f51969f6bf 100644 --- a/source/dnode/bnode/CMakeLists.txt +++ b/source/dnode/bnode/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src BNODE_SRC) -add_library(bnode ${BNODE_SRC}) +add_library(bnode STATIC ${BNODE_SRC}) target_include_directories( bnode PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/bnode" diff --git a/source/dnode/mgmt/daemon/CMakeLists.txt b/source/dnode/mgmt/daemon/CMakeLists.txt index e07c15c95a..3238bbf3f0 100644 --- a/source/dnode/mgmt/daemon/CMakeLists.txt +++ b/source/dnode/mgmt/daemon/CMakeLists.txt @@ -6,4 +6,4 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -target_link_libraries(taosd dnode util os) +target_link_libraries(taosd dnode) diff --git a/source/dnode/mgmt/impl/CMakeLists.txt b/source/dnode/mgmt/impl/CMakeLists.txt index 171f9649fb..c99a405527 100644 --- a/source/dnode/mgmt/impl/CMakeLists.txt +++ b/source/dnode/mgmt/impl/CMakeLists.txt @@ -1,17 +1,7 @@ aux_source_directory(src DNODE_SRC) add_library(dnode STATIC ${DNODE_SRC}) target_link_libraries( - dnode - PUBLIC cjson - PUBLIC mnode - PUBLIC vnode - PUBLIC qnode - PUBLIC snode - PUBLIC bnode - PUBLIC wal - PUBLIC sync - PUBLIC taos - PUBLIC tfs + dnode cjson mnode vnode qnode snode bnode wal sync taos tfs monitor ) target_include_directories( dnode diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 417bc1e041..9fabe40186 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -36,6 +36,7 @@ extern "C" { #include "ttime.h" #include "tworker.h" #include "tglobal.h" +#include "monitor.h" #include "dnode.h" diff --git a/source/dnode/mgmt/impl/inc/dndMnode.h b/source/dnode/mgmt/impl/inc/dndMnode.h index dafbae10ad..0f03bb3832 100644 --- a/source/dnode/mgmt/impl/inc/dndMnode.h +++ b/source/dnode/mgmt/impl/inc/dndMnode.h @@ -32,6 +32,9 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index 9467b56e6b..2539443b41 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -21,6 +21,7 @@ #include "dndSnode.h" #include "dndTransport.h" #include "dndVnodes.h" +#include "monitor.h" #include "sync.h" #include "tfs.h" #include "wal.h" @@ -140,7 +141,7 @@ static int32_t dndInitDir(SDnode *pDnode, SDnodeObjCfg *pCfg) { return 0; } -static void dndCloseImp(SDnode *pDnode) { +static void dndCloseDir(SDnode *pDnode) { tfree(pDnode->dir.mnode); tfree(pDnode->dir.vnodes); tfree(pDnode->dir.dnode); @@ -260,7 +261,7 @@ void dndClose(SDnode *pDnode) { dndCleanupMgmt(pDnode); tfsClose(pDnode->pTfs); - dndCloseImp(pDnode); + dndCloseDir(pDnode); free(pDnode); dInfo("dnode object is closed, data:%p", pDnode); } @@ -289,11 +290,7 @@ int32_t dndInit() { } SVnodeOpt vnodeOpt = { - .sver = tsVersion, - .nthreads = tsNumOfCommitThreads, - .putReqToVQueryQFp = dndPutReqToVQueryQ, - .sendReqToDnodeFp = dndSendReqToDnode - }; + .nthreads = tsNumOfCommitThreads, .putReqToVQueryQFp = dndPutReqToVQueryQ, .sendReqToDnodeFp = dndSendReqToDnode}; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); @@ -301,6 +298,13 @@ int32_t dndInit() { return -1; } + SMonCfg monCfg = {.maxLogs = tsMonitorMaxLogs, .port = tsMonitorPort, .server = tsMonitorFqdn}; + if (monInit(&monCfg) != 0) { + dError("failed to init monitor since %s", terrstr()); + dndCleanup(); + return -1; + } + dInfo("dnode env is initialized"); return 0; } @@ -314,19 +318,8 @@ void dndCleanup() { walCleanUp(); vnodeCleanup(); rpcCleanup(); + monCleanup(); taosStopCacheRefreshWorker(); dInfo("dnode env is cleaned up"); } - -// OTHER FUNCTIONS =================================== -void taosGetDisk() { -#if 0 - const double unit = 1024 * 1024 * 1024; - - SDiskSize diskSize = tfsGetSize(pTfs); - - tfsUpdateSize(&fsMeta); - -#endif -} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 8f18222ab6..4bf8b82043 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -22,6 +22,7 @@ #include "dndTransport.h" #include "dndVnodes.h" #include "dndWorker.h" +#include "monitor.h" static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); @@ -473,19 +474,62 @@ void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { rpcSendResponse(&rpcRsp); } +void dndGetBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { + pInfo->dnode_id = dndGetDnodeId(pDnode); + tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN); +} + +static void dndSendMonitorReport(SDnode *pDnode) { + if (!tsEnableMonitor || tsMonitorFqdn[0] == 0) return; + SMonInfo *pMonitor = monCreateMonitorInfo(); + if (pMonitor == NULL) return; + + dTrace("pDnode:%p, send monitor report to %s:%u", pDnode, tsMonitorFqdn, tsMonitorPort); + + SMonBasicInfo basicInfo = {0}; + dndGetBasicInfo(pDnode, &basicInfo); + monSetBasicInfo(pMonitor, &basicInfo); + + SMonClusterInfo clusterInfo = {0}; + SMonVgroupInfo vgroupInfo = {0}; + SMonGrantInfo grantInfo = {0}; + if (dndGetMnodeMonitorInfo(pDnode, &clusterInfo, &vgroupInfo, &grantInfo) == 0) { + monSetClusterInfo(pMonitor, &clusterInfo); + monSetVgroupInfo(pMonitor, &vgroupInfo); + monSetGrantInfo(pMonitor, &grantInfo); + } + + monSendReport(pMonitor); + monCleanupMonitorInfo(pMonitor); +} + static void *dnodeThreadRoutine(void *param) { SDnode *pDnode = param; SDnodeMgmt *pMgmt = &pDnode->dmgmt; - int32_t ms = tsStatusInterval * 1000; + int64_t lastStatusTime = taosGetTimestampMs(); + int64_t lastMonitorTime = lastStatusTime; setThreadName("dnode-hb"); while (true) { pthread_testcancel(); - taosMsleep(ms); + taosMsleep(200); + if (dndGetStat(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { + continue; + } - if (dndGetStat(pDnode) == DND_STAT_RUNNING && !pMgmt->statusSent && !pMgmt->dropped) { + int64_t curTime = taosGetTimestampMs(); + + float statusInterval = (curTime - lastStatusTime) / 1000.0f; + if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { dndSendStatusReq(pDnode); + lastStatusTime = curTime; + } + + float monitorInterval = (curTime - lastMonitorTime) / 1000.0f; + if (monitorInterval >= tsMonitorInterval) { + dndSendMonitorReport(pDnode); + lastMonitorTime = curTime; } } } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index e1b16a188f..6cb117867f 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -630,3 +630,13 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc dTrace("user:%s, retrieve auth spi:%d encrypt:%d", user, *spi, *encrypt); return code; } + +int32_t dndGetMnodeMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo) { + SMnode *pMnode = dndAcquireMnode(pDnode); + if (pMnode == NULL) return -1; + + int32_t code = mndGetMonitorInfo(pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); + dndReleaseMnode(pDnode, pMnode); + return code; +} \ No newline at end of file diff --git a/source/dnode/mgmt/impl/test/sut/CMakeLists.txt b/source/dnode/mgmt/impl/test/sut/CMakeLists.txt index d20c680fad..3a993986fe 100644 --- a/source/dnode/mgmt/impl/test/sut/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/sut/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src SUT_SRC) -add_library(sut STATIC ${SUT_SRC}) +add_library(sut STATIC STATIC ${SUT_SRC}) target_link_libraries( sut PUBLIC dnode diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 3aad002d40..514bba19f4 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -1,18 +1,12 @@ aux_source_directory(src MNODE_SRC) -add_library(mnode ${MNODE_SRC}) +add_library(mnode STATIC ${MNODE_SRC}) target_include_directories( mnode PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - mnode - PRIVATE scheduler - PRIVATE sdb - PRIVATE wal - PRIVATE transport - PRIVATE cjson - PRIVATE sync + mnode scheduler sdb wal transport cjson sync monitor ) if(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 54595fb105..929d47184b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -21,11 +21,11 @@ #include "sdb.h" #include "tcache.h" #include "tep.h" +#include "tglobal.h" #include "tqueue.h" #include "ttime.h" -#include "wal.h" #include "version.h" -#include "tglobal.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -38,6 +38,20 @@ typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *p typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); +typedef struct SMnodeLoad { + int64_t numOfDnode; + int64_t numOfMnode; + int64_t numOfVgroup; + int64_t numOfDatabase; + int64_t numOfSuperTable; + int64_t numOfChildTable; + int64_t numOfNormalTable; + int64_t numOfColumn; + int64_t totalPoints; + int64_t totalStorage; + int64_t compStorage; +} SMnodeLoad; + typedef struct { const char *name; MndInitFp initFp; @@ -104,7 +118,9 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg); void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); -uint64_t mndGenerateUid(char *name, int32_t len) ; +uint64_t mndGenerateUid(char *name, int32_t len); + +int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 90d66a80fd..c95857a2d4 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -22,6 +22,7 @@ #include "mndDb.h" #include "mndDnode.h" #include "mndFunc.h" +#include "mndInfoSchema.h" #include "mndMnode.h" #include "mndOffset.h" #include "mndProfile.h" @@ -36,7 +37,6 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "mndInfoSchema.h" #define MQ_TIMER_MS 3000 #define TRNAS_TIMER_MS 6000 @@ -400,6 +400,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; } +int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, + SMonGrantInfo *pGrantInfo) { + return 0; + } + SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg)); if (pMsg == NULL) { @@ -505,10 +510,9 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } - // Note: uid 0 is reserved uint64_t mndGenerateUid(char *name, int32_t len) { - int32_t hashval = MurmurHash3_32(name, len); + int32_t hashval = MurmurHash3_32(name, len); do { int64_t us = taosGetTimestampUs(); diff --git a/source/dnode/mnode/sdb/CMakeLists.txt b/source/dnode/mnode/sdb/CMakeLists.txt index b6620f8be4..168d5063c2 100644 --- a/source/dnode/mnode/sdb/CMakeLists.txt +++ b/source/dnode/mnode/sdb/CMakeLists.txt @@ -1,13 +1,10 @@ aux_source_directory(src MNODE_SRC) -add_library(sdb ${MNODE_SRC}) +add_library(sdb STATIC ${MNODE_SRC}) target_include_directories( sdb PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/mnode/sdb" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - sdb - PRIVATE os - PRIVATE common - PRIVATE util + sdb os common util ) \ No newline at end of file diff --git a/source/dnode/qnode/CMakeLists.txt b/source/dnode/qnode/CMakeLists.txt index f6f78f7357..92cd8fcb5c 100644 --- a/source/dnode/qnode/CMakeLists.txt +++ b/source/dnode/qnode/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src QNODE_SRC) -add_library(qnode ${QNODE_SRC}) +add_library(qnode STATIC ${QNODE_SRC}) target_include_directories( qnode PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/qnode" diff --git a/source/dnode/snode/CMakeLists.txt b/source/dnode/snode/CMakeLists.txt index a94dd9edd8..dafd5d6594 100644 --- a/source/dnode/snode/CMakeLists.txt +++ b/source/dnode/snode/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src SNODE_SRC) -add_library(snode ${SNODE_SRC}) +add_library(snode STATIC ${SNODE_SRC}) target_include_directories( snode PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/snode" diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 31f04e840a..b5b53d9361 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -60,10 +60,6 @@ typedef struct { } SVnodeCfg; typedef struct { - int32_t sver; - const char *timezone; - const char *locale; - const char *charset; uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) PutReqToVQueryQFp putReqToVQueryQFp; SendReqToDnodeFp sendReqToDnodeFp; diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index e07e46948f..78625d1eed 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -14,5 +14,6 @@ add_subdirectory(function) add_subdirectory(qcom) add_subdirectory(qworker) add_subdirectory(tfs) +add_subdirectory(monitor) add_subdirectory(nodes) add_subdirectory(scalar) diff --git a/source/libs/cache/CMakeLists.txt b/source/libs/cache/CMakeLists.txt index 5ba59ef160..c99b5602ad 100644 --- a/source/libs/cache/CMakeLists.txt +++ b/source/libs/cache/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src CACHE_SRC) -add_library(cache ${CACHE_SRC}) +add_library(cache STATIC ${CACHE_SRC}) target_include_directories( cache PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/cache" diff --git a/source/libs/catalog/CMakeLists.txt b/source/libs/catalog/CMakeLists.txt index bb9ed686a7..09cd252a89 100644 --- a/source/libs/catalog/CMakeLists.txt +++ b/source/libs/catalog/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src CATALOG_SRC) -add_library(catalog ${CATALOG_SRC}) +add_library(catalog STATIC ${CATALOG_SRC}) target_include_directories( catalog PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/catalog" diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 9f700dbb3c..a10a542b6b 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src FUNCTION_SRC) -add_library(function ${FUNCTION_SRC}) +add_library(function STATIC ${FUNCTION_SRC}) target_include_directories( function PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/function" diff --git a/source/libs/index/CMakeLists.txt b/source/libs/index/CMakeLists.txt index 50e76abd3f..047fc555a0 100644 --- a/source/libs/index/CMakeLists.txt +++ b/source/libs/index/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src INDEX_SRC) -add_library(index ${INDEX_SRC}) +add_library(index STATIC ${INDEX_SRC}) target_include_directories( index PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/index" diff --git a/source/libs/monitor/CMakeLists.txt b/source/libs/monitor/CMakeLists.txt new file mode 100644 index 0000000000..309d63691c --- /dev/null +++ b/source/libs/monitor/CMakeLists.txt @@ -0,0 +1,13 @@ +aux_source_directory(src MONITOR_SRC) +add_library(monitor STATIC ${MONITOR_SRC}) +target_include_directories( + monitor + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/monitor" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries(monitor os util common) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/libs/monitor/inc/monInt.h b/source/libs/monitor/inc/monInt.h new file mode 100644 index 0000000000..61f9980e4e --- /dev/null +++ b/source/libs/monitor/inc/monInt.h @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MONITOR_INT_H_ +#define _TD_MONITOR_INT_H_ + +#include "monitor.h" + +#include "tarray.h" +#include "tlockfree.h" +#include "tjson.h" + +typedef struct { + SRWLatch lock; + SArray *logs; // array of SMonLogItem + int32_t maxLogs; + const char *server; + uint16_t port; +} SMonitor; + +typedef struct SMonInfo { + SArray *logs; // array of SMonLogItem + SJson *pJson; +} SMonInfo; + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MONITOR_INT_H_*/ diff --git a/source/libs/monitor/src/monitor.c b/source/libs/monitor/src/monitor.c new file mode 100644 index 0000000000..811ee40dc8 --- /dev/null +++ b/source/libs/monitor/src/monitor.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "monInt.h" +#include "taoserror.h" +#include "thttp.h" +#include "tlog.h" +#include "ttime.h" + +static SMonitor tsMonitor = {0}; + +int32_t monInit(const SMonCfg *pCfg) { + tsMonitor.logs = taosArrayInit(16, sizeof(SMonInfo)); + if (tsMonitor.logs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tsMonitor.maxLogs = pCfg->maxLogs; + tsMonitor.server = pCfg->server; + tsMonitor.port = pCfg->port; + taosInitRWLatch(&tsMonitor.lock); + return 0; +} + +void monCleanup() { + taosArrayDestroy(tsMonitor.logs); + tsMonitor.logs = NULL; +} + +void monAddLogItem(SMonLogItem *pItem) { + taosWLockLatch(&tsMonitor.lock); + int32_t size = taosArrayGetSize(tsMonitor.logs); + if (size > tsMonitor.maxLogs) { + uInfo("too many logs for monitor"); + } else { + taosArrayPush(tsMonitor.logs, pItem); + } + taosWUnLockLatch(&tsMonitor.lock); +} + +SMonInfo *monCreateMonitorInfo() { + SMonInfo *pMonitor = calloc(1, sizeof(SMonInfo)); + if (pMonitor == NULL) return NULL; + + taosWLockLatch(&tsMonitor.lock); + pMonitor->logs = taosArrayDup(tsMonitor.logs); + taosArrayClear(tsMonitor.logs); + taosWUnLockLatch(&tsMonitor.lock); + + pMonitor->pJson = tjsonCreateObject(); + if (pMonitor->pJson == NULL || pMonitor->logs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + monCleanupMonitorInfo(pMonitor); + return NULL; + } + + return pMonitor; +} + +void monCleanupMonitorInfo(SMonInfo *pMonitor) { + taosArrayDestroy(pMonitor->logs); + tjsonDelete(pMonitor->pJson); + free(pMonitor); +} + +void monSendReport(SMonInfo *pMonitor) { + char *pCont = tjsonToString(pMonitor->pJson); + if (pCont != NULL) { + taosSendHttpReport(tsMonitor.server, tsMonitor.port, pCont, strlen(pCont)); + free(pCont); + } +} + +void monSetBasicInfo(SMonInfo *pMonitor, SMonBasicInfo *pInfo) { + SJson *pJson = pMonitor->pJson; + tjsonAddDoubleToObject(pJson, "dnode_id", pInfo->dnode_id); + tjsonAddStringToObject(pJson, "dnode_ep", pInfo->dnode_ep); + + int64_t ms = taosGetTimestampMs(); + char buf[40] = {0}; + taosFormatUtcTime(buf, sizeof(buf), ms, TSDB_TIME_PRECISION_MILLI); + tjsonAddStringToObject(pJson, "ts", buf); +} + +void monSetClusterInfo(SMonInfo *pMonitor, SMonClusterInfo *pInfo) { + +} + +void monSetVgroupInfo(SMonInfo *pMonitor, SMonVgroupInfo *pInfo) { + +} + +void monSetGrantInfo(SMonInfo *pMonitor, SMonGrantInfo *pInfo) { + +} + +void monSetDnodeInfo(SMonInfo *pMonitor, SMonDnodeInfo *pInfo) { + +} + +void monSetDiskInfo(SMonInfo *pMonitor, SMonDiskInfo *pInfo) { + +} diff --git a/source/libs/monitor/test/CMakeLists.txt b/source/libs/monitor/test/CMakeLists.txt new file mode 100644 index 0000000000..e3ab7e7337 --- /dev/null +++ b/source/libs/monitor/test/CMakeLists.txt @@ -0,0 +1,14 @@ +enable_testing() + +aux_source_directory(. MONITOR_TEST_SRC) +add_executable(monitor_test ${MONITOR_TEST_SRC}) +target_link_libraries( + monitor_test + PUBLIC monitor + PUBLIC gtest_main +) + +add_test( + NAME monitor_test + COMMAND monitor_test +) diff --git a/source/libs/monitor/test/monTest.cpp b/source/libs/monitor/test/monTest.cpp new file mode 100644 index 0000000000..a1805d0c9c --- /dev/null +++ b/source/libs/monitor/test/monTest.cpp @@ -0,0 +1,33 @@ +/** + * @file monTest.cpp + * @author slguan (slguan@taosdata.com) + * @brief monitor module tests + * @version 1.0 + * @date 2022-03-05 + * + * @copyright Copyright (c) 2022 + * + */ + +#include +#include "os.h" + +#include "monitor.h" + +class MonitorTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { root = "/tmp/monTest"; } + static void TearDownTestSuite() {} + + public: + void SetUp() override {} + void TearDown() override {} + + static const char *root; +}; + +const char *MonitorTest::root; + +TEST_F(MonitorTest, 01_Open_Close) { + +} diff --git a/source/libs/parser/CMakeLists.txt b/source/libs/parser/CMakeLists.txt index 417c56aba1..5d02868657 100644 --- a/source/libs/parser/CMakeLists.txt +++ b/source/libs/parser/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src PARSER_SRC) -add_library(parser ${PARSER_SRC}) +add_library(parser STATIC ${PARSER_SRC}) target_include_directories( parser PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/parser" diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index 14b4488e6a..db5d31f22b 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src PLANNER_SRC) -add_library(planner ${PLANNER_SRC}) +add_library(planner STATIC ${PLANNER_SRC}) target_include_directories( planner PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/planner" diff --git a/source/libs/qcom/CMakeLists.txt b/source/libs/qcom/CMakeLists.txt index c63e54fb9b..a9bf0f5594 100644 --- a/source/libs/qcom/CMakeLists.txt +++ b/source/libs/qcom/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src QUERY_SRC) -add_library(qcom ${QUERY_SRC}) +add_library(qcom STATIC ${QUERY_SRC}) target_include_directories( qcom PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qcom" diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 1b4aee3ccf..862ae7ccae 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src SCHEDULER_SRC) -add_library(scheduler ${SCHEDULER_SRC}) +add_library(scheduler STATIC ${SCHEDULER_SRC}) target_include_directories( scheduler diff --git a/source/libs/sync/CMakeLists.txt b/source/libs/sync/CMakeLists.txt index cb38d7e363..cb196acc02 100644 --- a/source/libs/sync/CMakeLists.txt +++ b/source/libs/sync/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src SYNC_SRC) -add_library(sync ${SYNC_SRC}) +add_library(sync STATIC ${SYNC_SRC}) target_link_libraries( sync diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index f1c4327b69..7e60822a28 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -26,19 +26,26 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" #include "trpc.h" +#include "ttimer.h" + +#define TIMER_MAX_MS 0x7FFFFFFF typedef struct SSyncEnv { - void *pTimer; - void *pTimerManager; + tmr_h pEnvTickTimer; + tmr_h pTimerManager; + char name[128]; + } SSyncEnv; +extern SSyncEnv* gSyncEnv; + int32_t syncEnvStart(); int32_t syncEnvStop(); -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +void syncEnvStopTimer(tmr_h* pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 4b788efd79..238948b403 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -30,42 +30,37 @@ extern "C" { #include "trpc.h" typedef struct SSyncIO { - void * serverRpc; - void * clientRpc; STaosQueue *pMsgQ; STaosQset * pQset; - pthread_t tid; - int8_t isStart; + pthread_t consumerTid; - SEpSet epSet; + void * serverRpc; + void * clientRpc; + SEpSet myAddr; - void *syncTimer; - void *syncTimerManager; - - int32_t (*start)(struct SSyncIO *ths); - int32_t (*stop)(struct SSyncIO *ths); - int32_t (*ping)(struct SSyncIO *ths); - - int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); - int32_t (*destroy)(struct SSyncIO *ths); + void *ioTimerTickQ; + void *ioTimerTickPing; + void *ioTimerManager; void *pSyncNode; - int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); + int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); + int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); + int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); + int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); + int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + + int8_t isStart; } SSyncIO; extern SSyncIO *gSyncIO; -int32_t syncIOStart(); -int32_t syncIOStop(); -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); -SSyncIO *syncIOCreate(); - -static int32_t doSyncIOStart(SSyncIO *io); -static int32_t doSyncIOStop(SSyncIO *io); -static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t doSyncIODestroy(SSyncIO *io); +int32_t syncIOStart(char *host, uint16_t port); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOTickQ(); +int32_t syncIOTickPing(); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ad8484662a..0901330488 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -26,6 +26,7 @@ extern "C" { #include "sync.h" #include "taosdef.h" #include "tlog.h" +#include "ttimer.h" extern int32_t sDebugFlag; @@ -47,23 +48,23 @@ extern int32_t sDebugFlag; taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \ } \ } -#define sInfo(...) \ - { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sInfo(...) \ + { \ + if (sDebugFlag & DEBUG_INFO) { \ + taosPrintLog("SYN INFO ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sDebug(...) \ - { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sDebug(...) \ + { \ + if (sDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("SYN DEBUG ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sTrace(...) \ - { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sTrace(...) \ + { \ + if (sDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("SYN TRACE ", sDebugFlag, __VA_ARGS__); \ + } \ } struct SRaft; @@ -87,35 +88,64 @@ typedef struct SyncAppendEntries SyncAppendEntries; struct SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; -typedef struct SSyncNode { - int8_t replica; - int8_t quorum; +struct SSyncEnv; +typedef struct SSyncEnv SSyncEnv; +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; + +typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + SSyncFSM* pFsm; - SRaft* pRaft; + // passed from outside + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); + int32_t refCount; + int64_t rid; - int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); + SNodeInfo me; + SNodeInfo peers[TSDB_MAX_REPLICA]; + int32_t peersNum; - int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); + ESyncRole role; + SRaftId raftId; - int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg); + tmr_h pPingTimer; + int32_t pingTimerMS; + uint8_t pingTimerStart; + TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp + uint64_t pingTimerCounter; - int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); + tmr_h pElectTimer; + int32_t electTimerMS; + uint8_t electTimerStart; + TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp + uint64_t electTimerCounter; - int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + tmr_h pHeartbeatTimer; + int32_t heartbeatTimerMS; + uint8_t heartbeatTimerStart; + TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp + uint64_t heartbeatTimerCounter; - int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + // callback + int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); - int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); + int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); - int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); + + int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); } SSyncNode; @@ -123,23 +153,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); +void syncNodePingAll(SSyncNode* pSyncNode); -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); +void syncNodePingPeers(SSyncNode* pSyncNode); -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); +void syncNodePingSelf(SSyncNode* pSyncNode); -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); - -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); - -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); - -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); - -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 2ee5e0109c..be53559e8a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -23,13 +23,15 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "sync.h" #include "syncRaftEntry.h" #include "taosdef.h" +// encode as uint64 typedef enum ESyncMessageType { - SYNC_PING = 0, - SYNC_PING_REPLY, + SYNC_PING = 101, + SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST, SYNC_CLIENT_REQUEST_REPLY, SYNC_REQUEST_VOTE, @@ -38,29 +40,86 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY, } ESyncMessageType; +/* +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; +*/ + typedef struct SyncPing { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPing, RaftPing; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char data[]; +} SyncPing; + +#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) + +SyncPing* syncPingBuild(uint32_t dataLen); + +void syncPingDestroy(SyncPing* pMsg); + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); + +cJSON* syncPing2Json(const SyncPing* pMsg); + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncPingReply { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPingReply, RaftPingReply; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char data[]; +} SyncPingReply; + +#define SYNC_PING_REPLY_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) + +SyncPingReply* syncPingReplyBuild(uint32_t dataLen); + +void syncPingReplyDestroy(SyncPingReply* pMsg); + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncClientRequest { - ESyncMessageType msgType; - const SSyncBuffer *pData; - int64_t seqNum; - bool isWeak; -} SyncClientRequest, RaftClientRequest; + ESyncMessageType msgType; + char* data; + uint32_t dataLen; + int64_t seqNum; + bool isWeak; +} SyncClientRequest; typedef struct SyncClientRequestReply { - ESyncMessageType msgType; - int32_t errCode; - const SSyncBuffer *pErrMsg; - const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply, RaftClientRequestReply; + ESyncMessageType msgType; + int32_t errCode; + SSyncBuffer* pErrMsg; + SSyncBuffer* pLeaderHint; +} SyncClientRequestReply; typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +128,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote, RaftRequestVote; +} SyncRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +136,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply, RaftRequestVoteReply; +} SyncRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -86,9 +145,9 @@ typedef struct SyncAppendEntries { SyncIndex prevLogIndex; SyncTerm prevLogTerm; int32_t entryCount; - SSyncRaftEntry * logEntries; + SSyncRaftEntry* logEntries; SyncIndex commitIndex; -} SyncAppendEntries, RaftAppendEntries; +} SyncAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +155,7 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply, RaftAppendEntriesReply; +} SyncAppendEntriesReply; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index a247a29fc4..bc5cf26a4c 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -27,6 +27,8 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" +#if 0 + typedef struct SRaftId { SyncNodeId addr; SyncGroupId vgId; @@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 0fdbd7a150..c480486ff0 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -34,8 +34,9 @@ extern "C" { typedef struct SRaftStore { SyncTerm currentTerm; SRaftId voteFor; - //FileFd fd; - char path[RAFT_STORE_PATH_LEN]; + // FileFd fd; + TdFilePtr pFile; + char path[RAFT_STORE_PATH_LEN]; } SRaftStore; SRaftStore *raftStoreOpen(const char *path); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h new file mode 100644 index 0000000000..93d2c12525 --- /dev/null +++ b/source/libs/sync/inc/syncUtil.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_UTIL_H +#define _TD_LIBS_SYNC_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "syncMessage.h" +#include "taosdef.h" + +// ---- encode / decode + +uint64_t syncUtilAddr2U64(const char* host, uint16_t port); + +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); + +// ---- SSyncBuffer ---- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); + +void syncUtilbufDestroy(SSyncBuffer* syncBuf); + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); +#endif + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_UTIL_H*/ diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index e71cf55cb1..a9cf035650 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -18,6 +18,14 @@ SSyncEnv *gSyncEnv = NULL; +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId); +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param); +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); +// -------------------------------- + int32_t syncEnvStart() { int32_t ret; gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); @@ -31,6 +39,40 @@ int32_t syncEnvStop() { return ret; } -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param); +} -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } +void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); } + +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = (SSyncEnv *)param; + sTrace("syncEnvTick ... name:%s ", pSyncEnv->name); + + pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); +} + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { + snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv); + + // start tmr thread + pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + + // pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); + + sTrace("SyncEnv start ok, name:%s", pSyncEnv->name); + + return 0; +} + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { + taosTmrCleanUp(pSyncEnv->pTimerManager); + return 0; +} + +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager); +} + +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {} diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index bb20d11e37..8d04b5bb26 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -22,118 +22,72 @@ SSyncIO *gSyncIO = NULL; -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } +// local function ------------ +static int32_t syncIOStartInternal(SSyncIO *io); +static int32_t syncIOStopInternal(SSyncIO *io); +static SSyncIO *syncIOCreate(char *host, uint16_t port); +static int32_t syncIODestroy(SSyncIO *io); -int32_t syncIOStart() { return 0; } +static void *syncIOConsumerFunc(void *param); +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t syncIOStop() { return 0; } +static int32_t syncIOTickQInternal(SSyncIO *io); +static void syncIOTickQFunc(void *param, void *tmrId); +static int32_t syncIOTickPingInternal(SSyncIO *io); +static void syncIOTickPingFunc(void *param, void *tmrId); +// ---------------------------- -static void syncTick(void *param, void *tmrId) { - SSyncIO *io = (SSyncIO *)param; - sDebug("syncTick ... "); - - SRpcMsg rpcMsg; - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "TICK"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 2; - - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); - - taosWriteQitem(io->pMsgQ, pTemp); - - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); +// public function ------------ +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; } -void *syncConsumer(void *param) { - SSyncIO *io = param; +int32_t syncIOStart(char *host, uint16_t port) { + gSyncIO = syncIOCreate(host, port); + assert(gSyncIO != NULL); - STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - int type; + int32_t ret = syncIOStartInternal(gSyncIO); + assert(ret == 0); - qall = taosAllocateQall(); - - while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); - sDebug("%d sync-io msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; - - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); - } - - taosResetQitems(qall); - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - rpcFreeCont(pRpcMsg->pCont); - - if (pRpcMsg->handle != NULL) { - int msgSize = 128; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; - rpcSendResponse(&rpcMsg); - } - - taosFreeQitem(pRpcMsg); - } - } - - taosFreeQall(qall); - return NULL; + sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc); + return 0; } -static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; +int32_t syncIOStop() { + int32_t ret = syncIOStopInternal(gSyncIO); + assert(ret == 0); + + ret = syncIODestroy(gSyncIO); + assert(ret == 0); return ret; } -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sDebug("processResponse ... "); - rpcFreeCont(pMsg->pCont); +int32_t syncIOTickQ() { + int32_t ret = syncIOTickQInternal(gSyncIO); + assert(ret == 0); + return ret; } -static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSyncIO *io = pParent; - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(io->pMsgQ, pTemp); +int32_t syncIOTickPing() { + int32_t ret = syncIOTickPingInternal(gSyncIO); + assert(ret == 0); + return ret; } -SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMsg = doSyncIOOnMsg; - io->destroy = doSyncIODestroy; - - return io; -} - -static int32_t doSyncIOStart(SSyncIO *io) { +// local function ------------ +static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); + rpcInit(); tsRpcForceTcp = 1; // cient rpc init @@ -143,7 +97,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 0; rpcInit.label = "SYNC-IO-CLIENT"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; + rpcInit.cfp = syncIOProcessReply; rpcInit.sessions = 100; rpcInit.idleTime = 100; rpcInit.user = "sync-io"; @@ -163,13 +117,13 @@ static int32_t doSyncIOStart(SSyncIO *io) { { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 38000; + rpcInit.localPort = io->myAddr.eps[0].port; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; + rpcInit.cfp = syncIOProcessRequest; rpcInit.sessions = 1000; rpcInit.idleTime = 2 * 1500; - rpcInit.afp = retrieveAuthInfo; + rpcInit.afp = syncIOAuth; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; @@ -180,12 +134,9 @@ static int32_t doSyncIOStart(SSyncIO *io) { } } - io->epSet.inUse = 0; - addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); - // start consumer thread { - if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { + if (pthread_create(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) { sError("failed to create sync consumer thread since %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -193,35 +144,32 @@ static int32_t doSyncIOStart(SSyncIO *io) { } // start tmr thread - io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); return 0; } -static int32_t doSyncIOStop(SSyncIO *io) { +static int32_t syncIOStopInternal(SSyncIO *io) { atomic_store_8(&io->isStart, 0); - pthread_join(io->tid, NULL); + pthread_join(io->consumerTid, NULL); return 0; } -static int32_t doSyncIOPing(SSyncIO *io) { - SRpcMsg rpcMsg, rspMsg; +static SSyncIO *syncIOCreate(char *host, uint16_t port) { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "ping"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 1; + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); + io->myAddr.inUse = 0; + addEpIntoEpSet(&io->myAddr, host, port); - return 0; + return io; } -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } - -static int32_t doSyncIODestroy(SSyncIO *io) { +static int32_t syncIODestroy(SSyncIO *io) { int8_t start = atomic_load_8(&io->isStart); assert(start == 0); @@ -235,15 +183,149 @@ static int32_t doSyncIODestroy(SSyncIO *io) { io->clientRpc = NULL; } - if (io->pMsgQ != NULL) { - free(io->pMsgQ); - io->pMsgQ = NULL; - } - - if (io->pQset != NULL) { - free(io->pQset); - io->pQset = NULL; - } + taosCloseQueue(io->pMsgQ); + taosCloseQset(io->pQset); return 0; } + +static void *syncIOConsumerFunc(void *param) { + SSyncIO *io = param; + + STaosQall *qall; + SRpcMsg * pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, + (char *)(pRpcMsg->pCont)); + + if (pRpcMsg->msgType == SYNC_PING) { + if (io->FpOnSyncPing != NULL) { + SyncPing *pSyncMsg; + + SRpcMsg tmpRpcMsg; + memcpy(&tmpRpcMsg, pRpcMsg, sizeof(SRpcMsg)); + pSyncMsg = syncPingBuild(tmpRpcMsg.contLen); + + syncPingFromRpcMsg(pRpcMsg, pSyncMsg); + + // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); + + io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); + syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); + + if (io->FpOnSyncPingReply != NULL) { + io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + } + } else { + ; + } + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + if (pRpcMsg->handle != NULL) { + int msgSize = 128; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + + sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen); + rpcSendResponse(&rpcMsg); + } + + taosFreeQitem(pRpcMsg); + } + } + + taosFreeQall(qall); + return NULL; +} + +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + return ret; +} + +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, + (char *)pMsg->pCont); + + SSyncIO *io = pParent; + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); +} + +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); + rpcFreeCont(pMsg->pCont); +} + +static int32_t syncIOTickQInternal(SSyncIO *io) { + io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickQFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickQFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 55; + + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ); +} + +static int32_t syncIOTickPingInternal(SSyncIO *io) { + io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickPingFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickPingFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickPing"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); + taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bd2952505e..49fac038da 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -15,12 +15,36 @@ #include #include "sync.h" +#include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncUtil.h" -int32_t syncInit() { return 0; } +static int32_t tsNodeRefId = -1; -void syncCleanUp() {} +// ------ local funciton --------- +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +static void syncNodePingTimerCb(void* param, void* tmrId); + +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +// --------------------------------- + +int32_t syncInit() { + sTrace("syncInit ok"); + return 0; +} + +void syncCleanUp() { sTrace("syncCleanUp ok"); } int64_t syncStart(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); @@ -32,7 +56,9 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } + +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -41,69 +67,238 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); + memset(pSyncNode, 0, sizeof(SSyncNode)); + pSyncNode->vgId = pSyncInfo->vgId; + pSyncNode->syncCfg = pSyncInfo->syncCfg; + memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); + pSyncNode->pFsm = pSyncInfo->pFsm; + + pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpPing = doSyncNodePing; - pSyncNode->FpOnPing = onSyncNodePing; - pSyncNode->FpOnPingReply = onSyncNodePingReply; - pSyncNode->FpRequestVote = doSyncNodeRequestVote; - pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; - pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; - pSyncNode->FpAppendEntries = doSyncNodeAppendEntries; - pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; - pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; + pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; + pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; + + int j = 0; + for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { + if (i != pSyncInfo->syncCfg.myIndex) { + pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i]; + j++; + } + } + + pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; + syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); + + pSyncNode->pPingTimer = NULL; + pSyncNode->pingTimerMS = 1000; + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->pingTimerCounter = 0; + + pSyncNode->FpOnPing = syncNodeOnPingCb; + pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; + pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; + pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; + pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; return pSyncNode; } void syncNodeClose(SSyncNode* pSyncNode) { assert(pSyncNode != NULL); - raftClose(pSyncNode->pRaft); free(pSyncNode); } -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg); +void syncNodePingAll(SSyncNode* pSyncNode) { + sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } +} + +void syncNodePingPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } +} + +void syncNodePingSelf(SSyncNode* pSyncNode) { + int32_t ret; + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); +} + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pPingTimer == NULL) { + pSyncNode->pPingTimer = + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } + + atomic_store_8(&pSyncNode->pingTimerStart, 1); + return 0; +} + +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->pingTimerCounter = TIMER_MAX_MS; + return 0; +} + +// ------ local funciton --------- +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + sTrace("syncNodePing pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + + /* + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 1; + */ + + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + { + SyncPing* pMsg2 = rpcMsg.pCont; + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg); +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { - int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg); +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { + int32_t ret = 0; return ret; } -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg); +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnPingCb -->"); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + return ret; } -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg); +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnPingReplyCb -->"); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg); +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = 0; return ret; } -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg); +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg); +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg); +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; return ret; +} + +static void syncNodePingTimerCb(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerStart)) { + ++(pSyncNode->pingTimerCounter); + // pSyncNode->pingTimerMS += 100; + + sTrace( + "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " + "tmrId:%p ", + pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); + + syncNodePingAll(pSyncNode); + + taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 8937303725..a26c8401b2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,5 +15,265 @@ #include "syncMessage.h" #include "syncRaft.h" +#include "syncUtil.h" +#include "tcoding.h" -void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file +void onMessage(SRaft* pRaft, void* pMsg) {} + +// ---- message process SyncPing---- +SyncPing* syncPingBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; + SyncPing* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING; + pMsg->dataLen = dataLen; +} + +void syncPingDestroy(SyncPing* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); +} + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { + syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncPing2Json(const SyncPing* pMsg) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping"); + return pMsg; +} + +// ---- message process SyncPingReply---- +SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen; + SyncPingReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING_REPLY; + pMsg->dataLen = dataLen; +} + +void syncPingReplyDestroy(SyncPingReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); +} + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { + syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); + return pJson; +} + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPingReply* pMsg = syncPingReplyBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang"); + return pMsg; +} + +#if 0 +void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { + *bufLen = sizeof(SyncPing) + pMsg->dataLen; + *ppBuf = (char*)malloc(*bufLen); + void* pStart = *ppBuf; + uint32_t allBytes = *bufLen; + + int len = 0; + len = taosEncodeFixedU32(&pStart, pMsg->msgType); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pMsg->destId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU32(&pStart, pMsg->dataLen); + allBytes -= len; + assert(len > 0); + pStart += len; + + memcpy(pStart, pMsg->data, pMsg->dataLen); + allBytes -= pMsg->dataLen; + assert(allBytes == 0); +} + + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + void* pStart = (void*)buf; + uint64_t u64; + int32_t i32; + uint32_t u32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->msgType = u64; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->srcId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pMsg->srcId.vgId = i32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->destId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pMsg->destId.vgId = i32; + + pStart = taosDecodeFixedU32(pStart, &u32); + pMsg->dataLen = u32; +} +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 9f139730d1..b07c6ea797 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -16,6 +16,8 @@ #include "syncRaft.h" #include "sync.h" +#if 0 + SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); assert(pRaft != NULL); @@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } + +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 964cc78490..59c85c38de 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -18,23 +18,125 @@ // to complie success: FileIO interface is modified -SRaftStore *raftStoreOpen(const char *path) { return NULL;} +SRaftStore *raftStoreOpen(const char *path) { + int32_t ret; -static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;} + SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); + if (pRaftStore == NULL) { + sError("raftStoreOpen malloc error"); + return NULL; + } + memset(pRaftStore, 0, sizeof(*pRaftStore)); + snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); -int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;} + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + memset(storeBuf, 0, sizeof(storeBuf)); -int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;} + if (!raftStoreFileExist(pRaftStore->path)) { + ret = raftStoreInit(pRaftStore); + assert(ret == 0); + } -static bool raftStoreFileExist(char *path) { return 0;} + pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); + assert(pRaftStore->pFile != NULL); -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); + assert(len == RAFT_STORE_BLOCK_SIZE); -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + ret = raftStoreDeserialize(pRaftStore, storeBuf, len); + assert(ret == 0); -void raftStorePrint(SRaftStore *pRaftStore) {} + return pRaftStore; +} +static int32_t raftStoreInit(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE); + assert(pRaftStore->pFile != NULL); + + pRaftStore->currentTerm = 0; + pRaftStore->voteFor.addr = 0; + pRaftStore->voteFor.vgId = 0; + + int32_t ret = raftStorePersist(pRaftStore); + assert(ret == 0); + + taosCloseFile(&pRaftStore->pFile); + return 0; +} + +int32_t raftStoreClose(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + + taosCloseFile(&pRaftStore->pFile); + free(pRaftStore); + pRaftStore = NULL; + return 0; +} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + + int32_t ret; + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + assert(ret == 0); + + taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); + + ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); + assert(ret == RAFT_STORE_BLOCK_SIZE); + + taosFsyncFile(pRaftStore->pFile); + return 0; +} + +static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(pRaftStore != NULL); + + cJSON *pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); + cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); + cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); + + char *serialized = cJSON_Print(pRoot); + int len2 = strlen(serialized); + assert(len2 < len); + memset(buf, 0, len); + snprintf(buf, len, "%s", serialized); + free(serialized); + + cJSON_Delete(pRoot); + return 0; +} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(pRaftStore != NULL); + + assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); + cJSON *pRoot = cJSON_Parse(buf); + + cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); + pRaftStore->currentTerm = pCurrentTerm->valueint; + + cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); + pRaftStore->voteFor.addr = pVoteForAddr->valueint; + + cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); + pRaftStore->voteFor.vgId = pVoteForVgid->valueint; + + cJSON_Delete(pRoot); + return 0; +} + +void raftStorePrint(SRaftStore *pRaftStore) { + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + printf("%s\n", storeBuf); +} #if 0 diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c new file mode 100644 index 0000000000..b4959a810b --- /dev/null +++ b/source/libs/sync/src/syncUtil.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncUtil.h" +#include +#include +#include + +// ---- encode / decode +uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { + uint64_t u64; + uint32_t hostU32 = (uint32_t)inet_addr(host); + // assert(hostU32 != (uint32_t)-1); + u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); + return u64; +} + +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) { + uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); + + struct in_addr addr; + addr.s_addr = hostU32; + snprintf(host, len, "%s", inet_ntoa(addr)); + *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); +} + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) { + pEpSet->inUse = 0; + pEpSet->numOfEps = 0; + addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort); +} + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { + char host[TSDB_FQDN_LEN]; + uint16_t port; + + syncUtilU642Addr(raftId->addr, host, sizeof(host), &port); + + /* + pEpSet->numOfEps = 1; + pEpSet->inUse = 0; + pEpSet->eps[0].port = port; + snprintf(pEpSet->eps[0].fqdn, sizeof(pEpSet->eps[0].fqdn), "%s", host); + */ + pEpSet->inUse = 0; + pEpSet->numOfEps = 0; + addEpIntoEpSet(pEpSet, host, port); +} + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { + uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); + assert(ipv4 != 0xFFFFFFFF); + char ipbuf[128]; + tinet_ntoa(ipbuf, ipv4); + raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); + raftId->vgId = vgId; +} + +// ---- SSyncBuffer ----- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { + syncBuf->len = len; + syncBuf->data = malloc(syncBuf->len); +} + +void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); } + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = src->data; +} + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = malloc(dest->len); + memcpy(dest->data, src->data, dest->len); +} +#endif \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e655ac01be..2913d230b2 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -1,6 +1,13 @@ add_executable(syncTest "") add_executable(syncEnvTest "") add_executable(syncPingTest "") +add_executable(syncEncodeTest "") +add_executable(syncIOTickQTest "") +add_executable(syncIOTickPingTest "") +add_executable(syncIOSendMsgTest "") +add_executable(syncIOSendMsgClientTest "") +add_executable(syncIOSendMsgServerTest "") +add_executable(syncRaftStoreTest "") target_sources(syncTest @@ -15,6 +22,34 @@ target_sources(syncPingTest PRIVATE "syncPingTest.cpp" ) +target_sources(syncEncodeTest + PRIVATE + "syncEncodeTest.cpp" +) +target_sources(syncIOTickQTest + PRIVATE + "syncIOTickQTest.cpp" +) +target_sources(syncIOTickPingTest + PRIVATE + "syncIOTickPingTest.cpp" +) +target_sources(syncIOSendMsgTest + PRIVATE + "syncIOSendMsgTest.cpp" +) +target_sources(syncIOSendMsgClientTest + PRIVATE + "syncIOSendMsgClientTest.cpp" +) +target_sources(syncIOSendMsgServerTest + PRIVATE + "syncIOSendMsgServerTest.cpp" +) +target_sources(syncRaftStoreTest + PRIVATE + "syncRaftStoreTest.cpp" +) target_include_directories(syncTest @@ -32,6 +67,41 @@ target_include_directories(syncPingTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEncodeTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOTickQTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOTickPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgClientTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgServerTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncRaftStoreTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -46,6 +116,34 @@ target_link_libraries(syncPingTest sync gtest_main ) +target_link_libraries(syncEncodeTest + sync + gtest_main +) +target_link_libraries(syncIOTickQTest + sync + gtest_main +) +target_link_libraries(syncIOTickPingTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgClientTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgServerTest + sync + gtest_main +) +target_link_libraries(syncRaftStoreTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp new file mode 100644 index 0000000000..4deceb603e --- /dev/null +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -0,0 +1,184 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +#define PING_MSG_LEN 20 + +void test1() { + sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "test ping"); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 1; + pMsg->srcId.vgId = 2; + pMsg->destId.addr = 3; + pMsg->destId.vgId = 4; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPing: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncPingSerialize(pMsg, buf, bufLen); + + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPing2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); + free(buf); +} + +void test2() { + sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "hello raft"); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 100; + pMsg->srcId.vgId = 200; + pMsg->destId.addr = 300; + pMsg->destId.vgId = 400; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPing: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPing2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +void test3() { + sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "test ping"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 19; + pMsg->srcId.vgId = 29; + pMsg->destId.addr = 39; + pMsg->destId.vgId = 49; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncPingReplySerialize(pMsg, buf, bufLen); + + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); + free(buf); +} + +void test4() { + sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "hello raft"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 66; + pMsg->srcId.vgId = 77; + pMsg->destId.addr = 88; + pMsg->destId.vgId = 99; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} +int main() { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + test1(); + test2(); + test3(); + test4(); + + return 0; +} diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 1d050e7094..1ac4357c5a 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -34,19 +34,20 @@ void doSync() { } int main() { - //taosInitLog((char*)"syncEnvTest.log", 100000, 10); + // taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; + int32_t ret; logTest(); - int32_t ret = syncIOStart(); - assert(ret == 0); + // ret = syncIOStart(); + // assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - doSync(); + // doSync(); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncIOSendMsgClientTest.cpp b/source/libs/sync/test/syncIOSendMsgClientTest.cpp new file mode 100644 index 0000000000..9e8c7c8b65 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgClientTest.cpp @@ -0,0 +1,50 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7010); + assert(ret == 0); + + for (int i = 0; i < 10; ++i) { + SEpSet epSet; + epSet.inUse = 0; + epSet.numOfEps = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7030); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + sleep(1); + } + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOSendMsgServerTest.cpp b/source/libs/sync/test/syncIOSendMsgServerTest.cpp new file mode 100644 index 0000000000..8af9344ed6 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgServerTest.cpp @@ -0,0 +1,33 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7030); + assert(ret == 0); + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp new file mode 100644 index 0000000000..a297981ee5 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -0,0 +1,50 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7010); + assert(ret == 0); + + for (int i = 0; i < 10; ++i) { + SEpSet epSet; + epSet.inUse = 0; + epSet.numOfEps = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7010); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + sleep(1); + } + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOTickPingTest.cpp b/source/libs/sync/test/syncIOTickPingTest.cpp new file mode 100644 index 0000000000..1559b57585 --- /dev/null +++ b/source/libs/sync/test/syncIOTickPingTest.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickPing(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncIOTickQTest.cpp b/source/libs/sync/test/syncIOTickQTest.cpp new file mode 100644 index 0000000000..90304079e3 --- /dev/null +++ b/source/libs/sync/test/syncIOTickQTest.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickQ(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index e62d051946..24d9ead5e3 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -13,49 +13,71 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void doSync() { +SSyncNode* doSync() { SSyncFSM* pFsm; SSyncInfo syncInfo; syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = 0; - pCfg->replicaNum = 3; + pCfg->replicaNum = 2; pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); } int main() { - //taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; logTest(); - int32_t ret = syncIOStart(); + int32_t ret = syncIOStart((char*)"127.0.0.1", 7010); assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - doSync(); + SSyncNode* pSyncNode = doSync(); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + /* + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + */ while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp new file mode 100644 index 0000000000..e533b89a92 --- /dev/null +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -0,0 +1,42 @@ +#include "syncRaftStore.h" +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" + +void *pingFunc(void *param) { + SSyncIO *io = (SSyncIO *)param; + while (1) { + sDebug("io->ping"); + // io->ping(io); + sleep(1); + } + return NULL; +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + sTrace("sync log test: trace"); + sDebug("sync log test: debug"); + sInfo("sync log test: info"); + sWarn("sync log test: warn"); + sError("sync log test: error"); + sFatal("sync log test: fatal"); + + SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); + assert(pRaftStore != NULL); + + raftStorePrint(pRaftStore); + + pRaftStore->currentTerm = 100; + pRaftStore->voteFor.addr = 200; + pRaftStore->voteFor.vgId = 300; + + raftStorePrint(pRaftStore); + raftStorePersist(pRaftStore); + + return 0; +} diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 0f72fd822f..0b397ef921 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,21 +1,21 @@ #include +#include "gtest/gtest.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" -#include "gtest/gtest.h" void *pingFunc(void *param) { SSyncIO *io = (SSyncIO *)param; while (1) { sDebug("io->ping"); - io->ping(io); + // io->ping(io); sleep(1); } return NULL; } int main() { - //taosInitLog((char *)"syncTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; diff --git a/source/libs/transport/CMakeLists.txt b/source/libs/transport/CMakeLists.txt index a2e82201bf..465646ac95 100644 --- a/source/libs/transport/CMakeLists.txt +++ b/source/libs/transport/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src TRANSPORT_SRC) -add_library(transport ${TRANSPORT_SRC}) +add_library(transport STATIC ${TRANSPORT_SRC}) target_include_directories( transport PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/transport" diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index bcf759e04f..4cf7cff818 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -1,5 +1,5 @@ aux_source_directory(src WAL_SRC) -add_library(wal ${WAL_SRC}) +add_library(wal STATIC ${WAL_SRC}) target_include_directories( wal PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/wal" diff --git a/source/os/CMakeLists.txt b/source/os/CMakeLists.txt index 0eda9d637d..c3bf94e888 100644 --- a/source/os/CMakeLists.txt +++ b/source/os/CMakeLists.txt @@ -1,14 +1,10 @@ aux_source_directory(src OS_SRC) -add_library(os ${OS_SRC}) +add_library(os STATIC ${OS_SRC}) target_include_directories( os PUBLIC "${CMAKE_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/include" ) target_link_libraries( - os - PUBLIC pthread - PUBLIC dl - PUBLIC rt - PUBLIC m + os pthread dl rt m ) \ No newline at end of file diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 136afb9a15..c2e9b52f88 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -15,8 +15,6 @@ #define ALLOW_FORBID_FUNC #include "os.h" -#define MAX_FPRINTFLINE_BUFFER_SIZE (1000) - #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include @@ -46,10 +44,15 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */ typedef int32_t FileFd; +#define FILE_WITH_LOCK 1 + typedef struct TdFile { - int refId; - FileFd fd; - FILE *fp; +#if FILE_WITH_LOCK + pthread_rwlock_t rwlock; +#endif + int refId; + FileFd fd; + FILE *fp; } * TdFilePtr, TdFile; void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath) { @@ -238,6 +241,9 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { if (fp != NULL) fclose(fp); return NULL; } +#if FILE_WITH_LOCK + pthread_rwlock_init(&(pFile->rwlock),NULL); +#endif pFile->fd = fd; pFile->fp = fp; pFile->refId = 0; @@ -249,6 +255,12 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) return 0; #else + if (ppFile == NULL || *ppFile == NULL) { + return 0; + } +#if FILE_WITH_LOCK + pthread_rwlock_wrlock(&((*ppFile)->rwlock)); +#endif if (ppFile == NULL || *ppFile == NULL || (*ppFile)->fd == -1) { return 0; } @@ -263,6 +275,10 @@ int64_t taosCloseFile(TdFilePtr *ppFile) { (*ppFile)->fd = -1; } (*ppFile)->refId = 0; +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&((*ppFile)->rwlock)); + pthread_rwlock_destroy(&((*ppFile)->rwlock)); +#endif free(*ppFile); *ppFile = NULL; return 0; @@ -273,6 +289,9 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { if (pFile == NULL) { return 0; } +#if FILE_WITH_LOCK + pthread_rwlock_rdlock(&(pFile->rwlock)); +#endif assert(pFile->fd >= 0); int64_t leftbytes = count; int64_t readbytes; @@ -284,9 +303,15 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { if (errno == EINTR) { continue; } else { +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif return -1; } } else if (readbytes == 0) { +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif return (int64_t)(count - leftbytes); } @@ -294,6 +319,9 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) { tbuf += readbytes; } +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif return count; } @@ -301,14 +329,24 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset) if (pFile == NULL) { return 0; } +#if FILE_WITH_LOCK + pthread_rwlock_rdlock(&(pFile->rwlock)); +#endif assert(pFile->fd >= 0); - return pread(pFile->fd, buf, count, offset); + int64_t ret = pread(pFile->fd, buf, count, offset); +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif + return ret; } int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { if (pFile == NULL) { return 0; } +#if FILE_WITH_LOCK + pthread_rwlock_wrlock(&(pFile->rwlock)); +#endif assert(pFile->fd >= 0); int64_t nleft = count; @@ -321,12 +359,18 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { if (errno == EINTR) { continue; } +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif return -1; } nleft -= nwritten; tbuf += nwritten; } - fsync(pFile->fd); + +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif return count; } @@ -334,8 +378,15 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) { if (pFile == NULL) { return 0; } +#if FILE_WITH_LOCK + pthread_rwlock_rdlock(&(pFile->rwlock)); +#endif assert(pFile->fd >= 0); - return (int64_t)lseek(pFile->fd, (long)offset, whence); + int64_t ret = lseek(pFile->fd, (long)offset, whence); +#if FILE_WITH_LOCK + pthread_rwlock_unlock(&(pFile->rwlock)); +#endif + return ret; } int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) { @@ -637,7 +688,6 @@ void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { } assert(pFile->fp != NULL); - char buffer[MAX_FPRINTFLINE_BUFFER_SIZE] = {0}; va_list ap; va_start(ap, format); vfprintf(pFile->fp, format, ap); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 163fad803f..3e761c6d91 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -167,11 +167,9 @@ void taosGetSystemInfo() { tsTotalMemoryMB = taosGetTotalMemory(); float tmp1, tmp2; - // taosGetDisk(); taosGetBandSpeed(&tmp1); taosGetCpuUsage(&tmp1, &tmp2); taosGetProcIO(&tmp1, &tmp2); - } void taosKillSystem() { @@ -712,7 +710,6 @@ void taosGetSystemInfo() { float tmp1, tmp2; taosGetSysMemory(&tmp1); taosGetProcMemory(&tmp2); - // taosGetDisk(); taosGetBandSpeed(&tmp1); taosGetCpuUsage(&tmp1, &tmp2); taosGetProcIO(&tmp1, &tmp2); diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index 14c39d3f03..adf29b1aa9 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -32,7 +32,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, fd = taosOpenTcpClientSocket(ip, port, 0); if (fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to create http socket since %s", terrstr()); + uError("failed to create http socket to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } @@ -46,24 +46,24 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, if (taosWriteSocket(fd, (void*)header, headLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http header since %s", terrstr()); + uError("failed to send http header to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } if (taosWriteSocket(fd, (void*)pCont, contLen) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to send http content since %s", terrstr()); + uError("failed to send http content to %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } // read something to avoid nginx error 499 if (taosReadSocket(fd, header, 10) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - uError("failed to receive response since %s", terrstr()); + uError("failed to receive response from %s:%u since %s", server, port, terrstr()); goto SEND_OVER; } - uInfo("send http to %s:%d, len:%d content: %s", server, port, contLen, pCont); + uInfo("send http to %s:%u, len:%d content: %s", server, port, contLen, pCont); code = 0; SEND_OVER: diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 2c117771b1..27848c4e23 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -26,7 +26,11 @@ SJson* tjsonCreateObject() { return pJson; } -void tjsonDelete(SJson* pJson) { cJSON_Delete((cJSON*)pJson); } +void tjsonDelete(SJson* pJson) { + if (pJson != NULL) { + cJSON_Delete((cJSON*)pJson); + } +} int32_t tjsonAddIntegerToObject(SJson* pJson, const char* pName, const uint64_t number) { char tmp[40] = {0};