From 5c73c1ffd8f4a7dfece8953471ee33fef02269f7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 6 Jan 2022 19:53:14 +0800 Subject: [PATCH] refine heartbeat interface --- source/client/inc/clientHb.h | 145 ++++++++++++++++++----------------- source/client/src/clientHb.c | 11 +-- 2 files changed, 78 insertions(+), 78 deletions(-) diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 676b5c4c40..11fea4de5e 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -20,6 +20,8 @@ typedef enum { mq = 0, + // type can be added here + // HEARTBEAT_TYPE_MAX } EHbType; @@ -30,80 +32,16 @@ typedef struct SKlv { void* value; } SKlv; -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); - return buf; -} - typedef struct SClientHbKey { int32_t connId; int32_t hbType; } SClientHbKey; -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - typedef struct SClientHbReq { SClientHbKey hbKey; - SHashObj* info; // hash + SHashObj* info; // hash } SClientHbReq; -static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { - int tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); - - void* pIter = NULL; - void* data; - SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; - taosEncodeSKlv(buf, &klv); - - data = taosHashIterate(pReq->info, pIter); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { - ASSERT(pReq->info != NULL); - buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); - - //TODO: error handling - if(pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); - - return buf; -} - typedef struct SClientHbBatchReq { int64_t reqId; SArray* reqs; // SArray @@ -123,15 +61,16 @@ typedef struct SClientHbBatchRsp { SArray* rsps; // SArray } SClientHbBatchRsp; -typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq); -typedef int32_t (*FGetConnInfo)(int32_t conn, void* self); +typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SClientHbMgr { int8_t inited; int32_t reportInterval; // unit ms int32_t stats; SRWLatch lock; - SHashObj* info; //hash + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; // input queue } SClientHbMgr; @@ -141,8 +80,72 @@ static SClientHbMgr clientHbMgr = {0}; int hbMgrInit(); void hbMgrCleanUp(); -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle); +int hbHandleRsp(void* hbMsg); -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle); +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen); +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); + + void* pIter = NULL; + void* data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); + + // TODO: error handling + if (pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index f1f7409f05..1de295384b 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,7 +15,7 @@ #include "clientHb.h" -static int32_t mqHbRspHandle(SClientHbReq* pReq) { +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { return 0; } @@ -42,15 +42,12 @@ void hbMgrCleanUp() { } -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + return 0; } -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) { - return 0; -} - -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { //lock //find req by connection id