diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d57fec3d5a..f173662770 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -133,12 +133,12 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -typedef struct SKlv { +typedef struct SKv { int32_t keyLen; int32_t valueLen; void* key; void* value; -} SKlv; +} SKv; typedef struct SClientHbKey { int32_t connId; @@ -174,26 +174,36 @@ static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + static FORCE_INLINE void tFreeClientHbReq(void *pReq) { - SClientHbReq* req = pReq; + SClientHbReq* req = (SClientHbReq*)pReq; taosHashCleanup(req->info); free(pReq); } -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); + +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { + SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + free(pReq); +} + +static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->valueLen); + tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); + tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); +static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { + buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->valueLen); + buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); + buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index a55a4fedab..ae76b022f2 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -23,6 +23,7 @@ extern "C" { #include "query.h" #include "tmsg.h" #include "tarray.h" +#include "trpc.h" #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4bfb774435..17a06a941a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -94,55 +94,6 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr{ - int32_t nodeId; //vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - const SSchema* tGetTbnameColumnSchema(); void initQueryModuleMsgHandle(); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b88d45fbd6..e28098dfbf 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr{ + int32_t nodeId; //vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + #ifdef __cplusplus } #endif diff --git a/include/util/thash.h b/include/util/thash.h index 4558162ac5..9dc6630461 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -211,6 +211,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); +/** + * Get the corresponding key information for a given data in hash table, using memcpy + * @param data + * @param dst + * @return + */ +static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) { + if (NULL == data || NULL == dst) { + return -1; + } + + SHashNode * node = GET_HASH_PNODE(data); + void* key = GET_HASH_NODE_KEY(node); + memcpy(dst, key, node->keyLen); + + return 0; +} + /** * Get the corresponding data length for a given data in hash table * @param data diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index f410c5e3ee..2624c8f833 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -28,10 +28,13 @@ typedef enum { } EHbType; typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); -typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); + +//TODO: embed param into function +//return type: SArray +typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); // called by mgmt -int hbMgrInit(); +int hbMgrInit(void* transporter, SEpSet epSet); void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); diff --git a/source/client/src/clientHbMgmt.c b/source/client/src/clientHb.c similarity index 77% rename from source/client/src/clientHbMgmt.c rename to source/client/src/clientHb.c index 6ac2263039..8672b52343 100644 --- a/source/client/src/clientHbMgmt.c +++ b/source/client/src/clientHb.c @@ -14,6 +14,7 @@ */ #include "clientHb.h" +#include "trpc.h" typedef struct SClientHbMgr { int8_t inited; @@ -23,8 +24,13 @@ typedef struct SClientHbMgr { int32_t connKeyCnt; int64_t reportBytes; // not implemented int64_t startTime; - // thread + // ctl + int8_t threadStop; pthread_t thread; + SRWLatch lock; // lock is used in serialization + // connection + void* transporter; + SEpSet epSet; SHashObj* activeInfo; // hash SHashObj* getInfoFuncs; // hash @@ -36,6 +42,14 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); +static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) { + return 0; +} + +void hbMgrInitMqHbRspHandle() { + clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +} + static FORCE_INLINE void hbMgrInitHandle() { // init all handle hbMgrInitMqHbRspHandle(); @@ -50,11 +64,22 @@ static SClientHbBatchReq* hbGatherAllInfo() { int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL); while (pIter != NULL) { taosArrayPush(pReq->reqs, pIter); SClientHbReq* pOneReq = pIter; taosHashClear(pOneReq->info); + + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + } + + pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL); + while (pIter != NULL) { + FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; + SClientHbKey connKey; + taosHashCopyKey(pIter, &connKey); + getConnInfoFp(connKey, NULL); + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); } @@ -64,8 +89,23 @@ static SClientHbBatchReq* hbGatherAllInfo() { static void* hbThreadFunc(void* param) { setThreadName("hb"); while (1) { + int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop); + if(threadStop) { + break; + } + + SClientHbBatchReq* pReq = hbGatherAllInfo(); + void* reqStr = NULL; + tSerializeSClientHbBatchReq(&reqStr, pReq); + SMsgSendInfo info; + + int64_t transporterId = 0; + asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info); + tFreeClientHbBatchReq(pReq); + atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); taosMsleep(HEARTBEAT_INTERVAL); + } return NULL; @@ -84,7 +124,11 @@ static int32_t hbCreateThread() { return 0; } -int hbMgrInit() { +static void hbStopThread() { + atomic_store_8(&clientHbMgr.threadStop, 1); +} + +int hbMgrInit(void* transporter, SEpSet epSet) { // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -101,7 +145,13 @@ int hbMgrInit() { // init getInfoFunc clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + //init connection info + clientHbMgr.transporter = transporter; + clientHbMgr.epSet = epSet; + // init backgroud thread + hbCreateThread(); + return 0; } diff --git a/source/client/src/clientHbMq.c b/source/client/src/clientHbMq.c deleted file mode 100644 index 8f2400aedd..0000000000 --- a/source/client/src/clientHbMq.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "clientHb.h" - -static int32_t mqHbRspHandle(SClientHbRsp* pReq) { - return 0; -} - -void hbMgrInitMqHbRspHandle() { - clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle; -} - diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2985d5dfe8..408ce899b1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,13 +31,13 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - SKlv klv; + SKv kv; void* pIter = taosHashIterate(pReq->info, pIter); while (pIter != NULL) { - taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(pIter); - klv.value = pIter; - taosEncodeSKlv(buf, &klv); + taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); + kv.valueLen = taosHashGetDataLen(pIter); + kv.value = pIter; + taosEncodeSKv(buf, &kv); pIter = taosHashIterate(pReq->info, pIter); } @@ -52,13 +52,22 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { if (pReq->info == NULL) { pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + SKv kv; + buf = taosDecodeSKv(buf, &kv); + taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); return buf; } +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) { + int tlen = 0; + return tlen; +} + +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) { + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; diff --git a/source/libs/qcom/test/CMakeLists.txt b/source/libs/qcom/test/CMakeLists.txt index 7adec3752a..e3a0e11a32 100644 --- a/source/libs/qcom/test/CMakeLists.txt +++ b/source/libs/qcom/test/CMakeLists.txt @@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES( TARGET_LINK_LIBRARIES( queryUtilTest - PUBLIC os util gtest qcom common + PUBLIC os util gtest qcom common transport ) diff --git a/source/libs/qcom/test/queryTest.cpp b/source/libs/qcom/test/queryTest.cpp index ddf89c6272..8fc6b7e529 100644 --- a/source/libs/qcom/test/queryTest.cpp +++ b/source/libs/qcom/test/queryTest.cpp @@ -17,6 +17,7 @@ #include #include "tmsg.h" #include "query.h" +#include "trpc.h" #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" @@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) { taosAsyncExec(testPrintError, p, &code); usleep(1000); printf("Error code:%d after asynchronously exec function\n", code); -} \ No newline at end of file +} diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 181661d304..6ec7072a1b 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -794,7 +794,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { SHashNode * node = GET_HASH_PNODE(data); *key = GET_HASH_NODE_KEY(node); - *keyLen = node->keyLen; + if (keyLen) { + *keyLen = node->keyLen; + } return 0; }