From 57d341706bb4e7f23f2eb39f8e40b62040ed5d3e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 7 Jan 2022 14:57:45 +0800 Subject: [PATCH] implement client heartbeat --- include/common/tmsg.h | 70 +++++++------ source/client/inc/clientHb.h | 28 ++---- source/client/src/clientHb.c | 77 -------------- source/client/src/clientHbMgmt.c | 166 +++++++++++++++++++++++++++++++ source/client/src/clientHbMq.c | 25 +++++ source/common/src/tmsg.c | 14 ++- source/libs/wal/src/walMgmt.c | 3 +- 7 files changed, 249 insertions(+), 134 deletions(-) delete mode 100644 source/client/src/clientHb.c create mode 100644 source/client/src/clientHbMgmt.c create mode 100644 source/client/src/clientHbMq.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 954f24ef6a..d57fec3d5a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,6 +140,46 @@ typedef struct SKlv { void* value; } SKlv; +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +typedef struct SClientHbReq { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +typedef struct SClientHbRsp { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return taosIntHash_64(key, keyLen); +} + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); +static FORCE_INLINE void tFreeClientHbReq(void *pReq) { + SClientHbReq* req = pReq; + taosHashCleanup(req->info); + free(pReq); +} + static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pKlv->keyLen); @@ -156,10 +196,6 @@ static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); return buf; } -typedef struct SClientHbKey { - int32_t connId; - int32_t hbType; -} SClientHbKey; static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { int tlen = 0; @@ -174,32 +210,6 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) return buf; } -typedef struct SClientHbReq { - SClientHbKey connKey; - SHashObj* info; // hash -} SClientHbReq; - -typedef struct SClientHbBatchReq { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); - -typedef struct SClientHbRsp { - SClientHbKey connKey; - int32_t status; - int32_t bodyLen; - void* body; -} SClientHbRsp; - -typedef struct SClientHbBatchRsp { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 73adb41308..f410c5e3ee 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -18,9 +18,11 @@ #include "thash.h" #include "tmsg.h" +#define HEARTBEAT_INTERVAL 1500 //ms + typedef enum { - mq = 0, - // type can be added here + HEARTBEAT_TYPE_MQ = 0, + // types can be added here // HEARTBEAT_TYPE_MAX } EHbType; @@ -28,26 +30,16 @@ typedef enum { typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); -typedef struct SClientHbMgr { - int8_t inited; - int32_t reportInterval; // unit ms - int32_t stats; - SRWLatch lock; - SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; - // input queue -} SClientHbMgr; - -static SClientHbMgr clientHbMgr = {0}; - +// called by mgmt int hbMgrInit(); void hbMgrCleanUp(); -int hbHandleRsp(void* hbMsg); - +int hbHandleRsp(SClientHbBatchRsp* hbRsp); +//called by user int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); - +void hbDeregisterConn(SClientHbKey connKey); int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +// mq +void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c deleted file mode 100644 index 7daa1204d0..0000000000 --- a/source/client/src/clientHb.c +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "clientHb.h" - -static int32_t mqHbRspHandle(SClientHbRsp* pReq) { - return 0; -} - -uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return 0; -} - -static void hbMgrInitMqHbFunc() { - clientHbMgr.handle[mq] = mqHbRspHandle; -} - -int hbMgrInit() { - //init once - int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); - if (old == 1) return 0; - - //init config - clientHbMgr.reportInterval = 1500; - - //init stat - clientHbMgr.stats = 0; - - //init lock - taosInitRWLatch(&clientHbMgr.lock); - - //init handle funcs - hbMgrInitMqHbFunc(); - - //init hash info - clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - //init getInfoFunc - clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - return 0; -} - -void hbMgrCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); - if (old == 0) return; - - taosHashCleanup(clientHbMgr.activeInfo); - taosHashCleanup(clientHbMgr.getInfoFuncs); -} - -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { - - return 0; -} - -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { - //lock - - //find req by connection id - SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); - ASSERT(data != NULL); - taosHashPut(data->info, key, keyLen, value, valueLen); - - //unlock - return 0; -} diff --git a/source/client/src/clientHbMgmt.c b/source/client/src/clientHbMgmt.c new file mode 100644 index 0000000000..6ac2263039 --- /dev/null +++ b/source/client/src/clientHbMgmt.c @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +typedef struct SClientHbMgr { + int8_t inited; + + // statistics + int32_t reportCnt; + int32_t connKeyCnt; + int64_t reportBytes; // not implemented + int64_t startTime; + // thread + pthread_t thread; + + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; +} SClientHbMgr; + +static SClientHbMgr clientHbMgr = {0}; + +static int32_t hbCreateThread(); +static void hbStopThread(); + +static FORCE_INLINE void hbMgrInitHandle() { + // init all handle + hbMgrInitMqHbRspHandle(); +} + +static SClientHbBatchReq* hbGatherAllInfo() { + SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq)); + if(pReq == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); + pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + + void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + while (pIter != NULL) { + taosArrayPush(pReq->reqs, pIter); + SClientHbReq* pOneReq = pIter; + taosHashClear(pOneReq->info); + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + } + + return pReq; +} + +static void* hbThreadFunc(void* param) { + setThreadName("hb"); + while (1) { + atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); + taosMsleep(HEARTBEAT_INTERVAL); + } + + return NULL; +} + +static int32_t hbCreateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pthread_attr_destroy(&thAttr); + return 0; +} + +int hbMgrInit() { + // init once + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); + if (old == 1) return 0; + + // init stat + clientHbMgr.startTime = taosGetTimestampMs(); + + // init handle funcs + hbMgrInitHandle(); + + // init hash info + clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + clientHbMgr.activeInfo->freeFp = tFreeClientHbReq; + // init getInfoFunc + clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + + // init backgroud thread + return 0; +} + +void hbMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); + if (old == 0) return; + + taosHashCleanup(clientHbMgr.activeInfo); + taosHashCleanup(clientHbMgr.getInfoFuncs); +} + +int hbHandleRsp(SClientHbBatchRsp* hbRsp) { + int64_t reqId = hbRsp->reqId; + int64_t rspId = hbRsp->rspId; + + SArray* rsps = hbRsp->rsps; + int32_t sz = taosArrayGetSize(rsps); + for (int i = 0; i < sz; i++) { + SClientHbRsp* pRsp = taosArrayGet(rsps, i); + if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { + clientHbMgr.handle[pRsp->connKey.hbType](pRsp); + } else { + // discard rsp + } + } + return 0; +} + +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + // init hash in activeinfo + void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + if (data != NULL) { + return 0; + } + SClientHbReq hbReq; + hbReq.connKey = connKey; + hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash + if (func != NULL) { + taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + } + + atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1); + return 0; +} + +void hbDeregisterConn(SClientHbKey connKey) { + taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey)); + atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1); +} + +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { + // find req by connection id + SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(pReq != NULL); + + taosHashPut(pReq->info, key, keyLen, value, valueLen); + + return 0; +} diff --git a/source/client/src/clientHbMq.c b/source/client/src/clientHbMq.c new file mode 100644 index 0000000000..8f2400aedd --- /dev/null +++ b/source/client/src/clientHbMq.c @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { + return 0; +} + +void hbMgrInitMqHbRspHandle() { + clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle; +} + diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5cbb42c1e6..2985d5dfe8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,17 +31,15 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - void *pIter = NULL; - void *data; SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; + void* pIter = taosHashIterate(pReq->info, pIter); + while (pIter != NULL) { + taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(pIter); + klv.value = pIter; taosEncodeSKlv(buf, &klv); - data = taosHashIterate(pReq->info, pIter); + pIter = taosHashIterate(pReq->info, pIter); } return tlen; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 189881c86d..d12acb52c6 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -232,7 +232,8 @@ static int32_t walCreateThread() { if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { wError("failed to create wal thread since %s", strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } pthread_attr_destroy(&thAttr);