From 57d341706bb4e7f23f2eb39f8e40b62040ed5d3e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 7 Jan 2022 14:57:45 +0800 Subject: [PATCH 1/8] implement client heartbeat --- include/common/tmsg.h | 70 +++++++------ source/client/inc/clientHb.h | 28 ++---- source/client/src/clientHb.c | 77 -------------- source/client/src/clientHbMgmt.c | 166 +++++++++++++++++++++++++++++++ source/client/src/clientHbMq.c | 25 +++++ source/common/src/tmsg.c | 14 ++- source/libs/wal/src/walMgmt.c | 3 +- 7 files changed, 249 insertions(+), 134 deletions(-) delete mode 100644 source/client/src/clientHb.c create mode 100644 source/client/src/clientHbMgmt.c create mode 100644 source/client/src/clientHbMq.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 954f24ef6a..d57fec3d5a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,6 +140,46 @@ typedef struct SKlv { void* value; } SKlv; +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +typedef struct SClientHbReq { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +typedef struct SClientHbRsp { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return taosIntHash_64(key, keyLen); +} + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); +static FORCE_INLINE void tFreeClientHbReq(void *pReq) { + SClientHbReq* req = pReq; + taosHashCleanup(req->info); + free(pReq); +} + static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pKlv->keyLen); @@ -156,10 +196,6 @@ static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); return buf; } -typedef struct SClientHbKey { - int32_t connId; - int32_t hbType; -} SClientHbKey; static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { int tlen = 0; @@ -174,32 +210,6 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) return buf; } -typedef struct SClientHbReq { - SClientHbKey connKey; - SHashObj* info; // hash -} SClientHbReq; - -typedef struct SClientHbBatchReq { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); - -typedef struct SClientHbRsp { - SClientHbKey connKey; - int32_t status; - int32_t bodyLen; - void* body; -} SClientHbRsp; - -typedef struct SClientHbBatchRsp { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 73adb41308..f410c5e3ee 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -18,9 +18,11 @@ #include "thash.h" #include "tmsg.h" +#define HEARTBEAT_INTERVAL 1500 //ms + typedef enum { - mq = 0, - // type can be added here + HEARTBEAT_TYPE_MQ = 0, + // types can be added here // HEARTBEAT_TYPE_MAX } EHbType; @@ -28,26 +30,16 @@ typedef enum { typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); -typedef struct SClientHbMgr { - int8_t inited; - int32_t reportInterval; // unit ms - int32_t stats; - SRWLatch lock; - SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; - // input queue -} SClientHbMgr; - -static SClientHbMgr clientHbMgr = {0}; - +// called by mgmt int hbMgrInit(); void hbMgrCleanUp(); -int hbHandleRsp(void* hbMsg); - +int hbHandleRsp(SClientHbBatchRsp* hbRsp); +//called by user int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); - +void hbDeregisterConn(SClientHbKey connKey); int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +// mq +void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c deleted file mode 100644 index 7daa1204d0..0000000000 --- a/source/client/src/clientHb.c +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "clientHb.h" - -static int32_t mqHbRspHandle(SClientHbRsp* pReq) { - return 0; -} - -uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return 0; -} - -static void hbMgrInitMqHbFunc() { - clientHbMgr.handle[mq] = mqHbRspHandle; -} - -int hbMgrInit() { - //init once - int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); - if (old == 1) return 0; - - //init config - clientHbMgr.reportInterval = 1500; - - //init stat - clientHbMgr.stats = 0; - - //init lock - taosInitRWLatch(&clientHbMgr.lock); - - //init handle funcs - hbMgrInitMqHbFunc(); - - //init hash info - clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - //init getInfoFunc - clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - return 0; -} - -void hbMgrCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); - if (old == 0) return; - - taosHashCleanup(clientHbMgr.activeInfo); - taosHashCleanup(clientHbMgr.getInfoFuncs); -} - -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { - - return 0; -} - -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { - //lock - - //find req by connection id - SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); - ASSERT(data != NULL); - taosHashPut(data->info, key, keyLen, value, valueLen); - - //unlock - return 0; -} diff --git a/source/client/src/clientHbMgmt.c b/source/client/src/clientHbMgmt.c new file mode 100644 index 0000000000..6ac2263039 --- /dev/null +++ b/source/client/src/clientHbMgmt.c @@ -0,0 +1,166 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +typedef struct SClientHbMgr { + int8_t inited; + + // statistics + int32_t reportCnt; + int32_t connKeyCnt; + int64_t reportBytes; // not implemented + int64_t startTime; + // thread + pthread_t thread; + + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; +} SClientHbMgr; + +static SClientHbMgr clientHbMgr = {0}; + +static int32_t hbCreateThread(); +static void hbStopThread(); + +static FORCE_INLINE void hbMgrInitHandle() { + // init all handle + hbMgrInitMqHbRspHandle(); +} + +static SClientHbBatchReq* hbGatherAllInfo() { + SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq)); + if(pReq == NULL) { + terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; + return NULL; + } + int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); + pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + + void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + while (pIter != NULL) { + taosArrayPush(pReq->reqs, pIter); + SClientHbReq* pOneReq = pIter; + taosHashClear(pOneReq->info); + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + } + + return pReq; +} + +static void* hbThreadFunc(void* param) { + setThreadName("hb"); + while (1) { + atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); + taosMsleep(HEARTBEAT_INTERVAL); + } + + return NULL; +} + +static int32_t hbCreateThread() { + pthread_attr_t thAttr; + pthread_attr_init(&thAttr); + pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); + + if (pthread_create(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + pthread_attr_destroy(&thAttr); + return 0; +} + +int hbMgrInit() { + // init once + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); + if (old == 1) return 0; + + // init stat + clientHbMgr.startTime = taosGetTimestampMs(); + + // init handle funcs + hbMgrInitHandle(); + + // init hash info + clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + clientHbMgr.activeInfo->freeFp = tFreeClientHbReq; + // init getInfoFunc + clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + + // init backgroud thread + return 0; +} + +void hbMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); + if (old == 0) return; + + taosHashCleanup(clientHbMgr.activeInfo); + taosHashCleanup(clientHbMgr.getInfoFuncs); +} + +int hbHandleRsp(SClientHbBatchRsp* hbRsp) { + int64_t reqId = hbRsp->reqId; + int64_t rspId = hbRsp->rspId; + + SArray* rsps = hbRsp->rsps; + int32_t sz = taosArrayGetSize(rsps); + for (int i = 0; i < sz; i++) { + SClientHbRsp* pRsp = taosArrayGet(rsps, i); + if (pRsp->connKey.hbType < HEARTBEAT_TYPE_MAX) { + clientHbMgr.handle[pRsp->connKey.hbType](pRsp); + } else { + // discard rsp + } + } + return 0; +} + +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + // init hash in activeinfo + void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + if (data != NULL) { + return 0; + } + SClientHbReq hbReq; + hbReq.connKey = connKey; + hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + // init hash + if (func != NULL) { + taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + } + + atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1); + return 0; +} + +void hbDeregisterConn(SClientHbKey connKey) { + taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey)); + atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1); +} + +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { + // find req by connection id + SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(pReq != NULL); + + taosHashPut(pReq->info, key, keyLen, value, valueLen); + + return 0; +} diff --git a/source/client/src/clientHbMq.c b/source/client/src/clientHbMq.c new file mode 100644 index 0000000000..8f2400aedd --- /dev/null +++ b/source/client/src/clientHbMq.c @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "clientHb.h" + +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { + return 0; +} + +void hbMgrInitMqHbRspHandle() { + clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle; +} + diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5cbb42c1e6..2985d5dfe8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,17 +31,15 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - void *pIter = NULL; - void *data; SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; + void* pIter = taosHashIterate(pReq->info, pIter); + while (pIter != NULL) { + taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(pIter); + klv.value = pIter; taosEncodeSKlv(buf, &klv); - data = taosHashIterate(pReq->info, pIter); + pIter = taosHashIterate(pReq->info, pIter); } return tlen; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 189881c86d..d12acb52c6 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -232,7 +232,8 @@ static int32_t walCreateThread() { if (pthread_create(&tsWal.thread, &thAttr, walThreadFunc, NULL) != 0) { wError("failed to create wal thread since %s", strerror(errno)); - return TAOS_SYSTEM_ERROR(errno); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } pthread_attr_destroy(&thAttr); From 29cb7c67929a91505ea84038ed53e817328f95a6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 7 Jan 2022 16:48:35 +0800 Subject: [PATCH 2/8] implement client heartbeat --- include/common/tmsg.h | 36 +++++++----- include/libs/planner/planner.h | 1 + include/libs/qcom/query.h | 49 ---------------- include/libs/transport/trpc.h | 49 ++++++++++++++++ include/util/thash.h | 18 ++++++ source/client/inc/clientHb.h | 7 ++- .../client/src/{clientHbMgmt.c => clientHb.c} | 56 ++++++++++++++++++- source/client/src/clientHbMq.c | 25 --------- source/common/src/tmsg.c | 25 ++++++--- source/libs/qcom/test/CMakeLists.txt | 2 +- source/libs/qcom/test/queryTest.cpp | 3 +- source/util/src/thash.c | 4 +- 12 files changed, 172 insertions(+), 103 deletions(-) rename source/client/src/{clientHbMgmt.c => clientHb.c} (77%) delete mode 100644 source/client/src/clientHbMq.c diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d57fec3d5a..f173662770 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -133,12 +133,12 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) -typedef struct SKlv { +typedef struct SKv { int32_t keyLen; int32_t valueLen; void* key; void* value; -} SKlv; +} SKv; typedef struct SClientHbKey { int32_t connId; @@ -174,26 +174,36 @@ static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + static FORCE_INLINE void tFreeClientHbReq(void *pReq) { - SClientHbReq* req = pReq; + SClientHbReq* req = (SClientHbReq*)pReq; taosHashCleanup(req->info); free(pReq); } -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); + +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { + SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + free(pReq); +} + +static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->valueLen); + tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); + tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); return tlen; } -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); +static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { + buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->valueLen); + buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); + buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); return buf; } diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index a55a4fedab..ae76b022f2 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -23,6 +23,7 @@ extern "C" { #include "query.h" #include "tmsg.h" #include "tarray.h" +#include "trpc.h" #define QUERY_TYPE_MERGE 1 #define QUERY_TYPE_PARTIAL 2 diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4bfb774435..17a06a941a 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -94,55 +94,6 @@ typedef struct STableMetaOutput { STableMeta *tbMeta; } STableMetaOutput; -typedef struct SDataBuf { - void *pData; - uint32_t len; -} SDataBuf; - -typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); -typedef int32_t (*__async_exec_fn_t)(void* param); - -typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; -} SMsgSendInfo; - -typedef struct SQueryNodeAddr{ - int32_t nodeId; //vgId or qnodeId - int8_t inUse; - int8_t numOfEps; - SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; -} SQueryNodeAddr; - -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); - -int32_t initTaskQueue(); -int32_t cleanupTaskQueue(); - -/** - * - * @param execFn The asynchronously execution function - * @param execParam The parameters of the execFn - * @param code The response code during execution the execFn - * @return - */ -int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); - -/** - * Asynchronously send message to server, after the response received, the callback will be incured. - * - * @param pTransporter - * @param epSet - * @param pTransporterId - * @param pInfo - * @return - */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); - const SSchema* tGetTbnameColumnSchema(); void initQueryModuleMsgHandle(); diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b88d45fbd6..e28098dfbf 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -84,6 +84,55 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +typedef struct SDataBuf { + void *pData; + uint32_t len; +} SDataBuf; + +typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); +typedef int32_t (*__async_exec_fn_t)(void* param); + +typedef struct SMsgSendInfo { + __async_send_cb_fn_t fp; //async callback function + void *param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; +} SMsgSendInfo; + +typedef struct SQueryNodeAddr{ + int32_t nodeId; //vgId or qnodeId + int8_t inUse; + int8_t numOfEps; + SEpAddrMsg epAddr[TSDB_MAX_REPLICA]; +} SQueryNodeAddr; + +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); + +int32_t initTaskQueue(); +int32_t cleanupTaskQueue(); + +/** + * + * @param execFn The asynchronously execution function + * @param execParam The parameters of the execFn + * @param code The response code during execution the execFn + * @return + */ +int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); + +/** + * Asynchronously send message to server, after the response received, the callback will be incured. + * + * @param pTransporter + * @param epSet + * @param pTransporterId + * @param pInfo + * @return + */ +int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); + #ifdef __cplusplus } #endif diff --git a/include/util/thash.h b/include/util/thash.h index 4558162ac5..9dc6630461 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -211,6 +211,24 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); +/** + * Get the corresponding key information for a given data in hash table, using memcpy + * @param data + * @param dst + * @return + */ +static FORCE_INLINE int32_t taosHashCopyKey(void *data, void* dst) { + if (NULL == data || NULL == dst) { + return -1; + } + + SHashNode * node = GET_HASH_PNODE(data); + void* key = GET_HASH_NODE_KEY(node); + memcpy(dst, key, node->keyLen); + + return 0; +} + /** * Get the corresponding data length for a given data in hash table * @param data diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index f410c5e3ee..2624c8f833 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -28,10 +28,13 @@ typedef enum { } EHbType; typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); -typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); + +//TODO: embed param into function +//return type: SArray +typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); // called by mgmt -int hbMgrInit(); +int hbMgrInit(void* transporter, SEpSet epSet); void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); diff --git a/source/client/src/clientHbMgmt.c b/source/client/src/clientHb.c similarity index 77% rename from source/client/src/clientHbMgmt.c rename to source/client/src/clientHb.c index 6ac2263039..8672b52343 100644 --- a/source/client/src/clientHbMgmt.c +++ b/source/client/src/clientHb.c @@ -14,6 +14,7 @@ */ #include "clientHb.h" +#include "trpc.h" typedef struct SClientHbMgr { int8_t inited; @@ -23,8 +24,13 @@ typedef struct SClientHbMgr { int32_t connKeyCnt; int64_t reportBytes; // not implemented int64_t startTime; - // thread + // ctl + int8_t threadStop; pthread_t thread; + SRWLatch lock; // lock is used in serialization + // connection + void* transporter; + SEpSet epSet; SHashObj* activeInfo; // hash SHashObj* getInfoFuncs; // hash @@ -36,6 +42,14 @@ static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); static void hbStopThread(); +static int32_t hbMqHbRspHandle(SClientHbRsp* pReq) { + return 0; +} + +void hbMgrInitMqHbRspHandle() { + clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = hbMqHbRspHandle; +} + static FORCE_INLINE void hbMgrInitHandle() { // init all handle hbMgrInitMqHbRspHandle(); @@ -50,11 +64,22 @@ static SClientHbBatchReq* hbGatherAllInfo() { int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - void *pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL); while (pIter != NULL) { taosArrayPush(pReq->reqs, pIter); SClientHbReq* pOneReq = pIter; taosHashClear(pOneReq->info); + + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + } + + pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL); + while (pIter != NULL) { + FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; + SClientHbKey connKey; + taosHashCopyKey(pIter, &connKey); + getConnInfoFp(connKey, NULL); + pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); } @@ -64,8 +89,23 @@ static SClientHbBatchReq* hbGatherAllInfo() { static void* hbThreadFunc(void* param) { setThreadName("hb"); while (1) { + int8_t threadStop = atomic_load_8(&clientHbMgr.threadStop); + if(threadStop) { + break; + } + + SClientHbBatchReq* pReq = hbGatherAllInfo(); + void* reqStr = NULL; + tSerializeSClientHbBatchReq(&reqStr, pReq); + SMsgSendInfo info; + + int64_t transporterId = 0; + asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info); + tFreeClientHbBatchReq(pReq); + atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); taosMsleep(HEARTBEAT_INTERVAL); + } return NULL; @@ -84,7 +124,11 @@ static int32_t hbCreateThread() { return 0; } -int hbMgrInit() { +static void hbStopThread() { + atomic_store_8(&clientHbMgr.threadStop, 1); +} + +int hbMgrInit(void* transporter, SEpSet epSet) { // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; @@ -101,7 +145,13 @@ int hbMgrInit() { // init getInfoFunc clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + //init connection info + clientHbMgr.transporter = transporter; + clientHbMgr.epSet = epSet; + // init backgroud thread + hbCreateThread(); + return 0; } diff --git a/source/client/src/clientHbMq.c b/source/client/src/clientHbMq.c deleted file mode 100644 index 8f2400aedd..0000000000 --- a/source/client/src/clientHbMq.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "clientHb.h" - -static int32_t mqHbRspHandle(SClientHbRsp* pReq) { - return 0; -} - -void hbMgrInitMqHbRspHandle() { - clientHbMgr.handle[HEARTBEAT_TYPE_MQ] = mqHbRspHandle; -} - diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2985d5dfe8..408ce899b1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,13 +31,13 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - SKlv klv; + SKv kv; void* pIter = taosHashIterate(pReq->info, pIter); while (pIter != NULL) { - taosHashGetKey(pIter, &klv.key, (size_t *)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(pIter); - klv.value = pIter; - taosEncodeSKlv(buf, &klv); + taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); + kv.valueLen = taosHashGetDataLen(pIter); + kv.value = pIter; + taosEncodeSKv(buf, &kv); pIter = taosHashIterate(pReq->info, pIter); } @@ -52,13 +52,22 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { if (pReq->info == NULL) { pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + SKv kv; + buf = taosDecodeSKv(buf, &kv); + taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); return buf; } +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq) { + int tlen = 0; + return tlen; +} + +void* tDeserializeClientHbBatchReq(void* buf, SClientHbBatchReq* pReq) { + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; diff --git a/source/libs/qcom/test/CMakeLists.txt b/source/libs/qcom/test/CMakeLists.txt index 7adec3752a..e3a0e11a32 100644 --- a/source/libs/qcom/test/CMakeLists.txt +++ b/source/libs/qcom/test/CMakeLists.txt @@ -15,5 +15,5 @@ TARGET_INCLUDE_DIRECTORIES( TARGET_LINK_LIBRARIES( queryUtilTest - PUBLIC os util gtest qcom common + PUBLIC os util gtest qcom common transport ) diff --git a/source/libs/qcom/test/queryTest.cpp b/source/libs/qcom/test/queryTest.cpp index ddf89c6272..8fc6b7e529 100644 --- a/source/libs/qcom/test/queryTest.cpp +++ b/source/libs/qcom/test/queryTest.cpp @@ -17,6 +17,7 @@ #include #include "tmsg.h" #include "query.h" +#include "trpc.h" #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" @@ -80,4 +81,4 @@ TEST(testCase, error_in_async_test) { taosAsyncExec(testPrintError, p, &code); usleep(1000); printf("Error code:%d after asynchronously exec function\n", code); -} \ No newline at end of file +} diff --git a/source/util/src/thash.c b/source/util/src/thash.c index 181661d304..6ec7072a1b 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -794,7 +794,9 @@ FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { SHashNode * node = GET_HASH_PNODE(data); *key = GET_HASH_NODE_KEY(node); - *keyLen = node->keyLen; + if (keyLen) { + *keyLen = node->keyLen; + } return 0; } From 4d635478fea25359ad812b8282bd747809a125d5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 7 Jan 2022 18:59:17 +0800 Subject: [PATCH 3/8] fix --- source/client/src/clientHb.c | 3 ++- source/common/src/tmsg.c | 2 +- source/libs/wal/src/walWrite.c | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 8672b52343..1d50f7574a 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -96,8 +96,9 @@ static void* hbThreadFunc(void* param) { SClientHbBatchReq* pReq = hbGatherAllInfo(); void* reqStr = NULL; - tSerializeSClientHbBatchReq(&reqStr, pReq); + int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); SMsgSendInfo info; + /*info.fp = hbHandleRsp;*/ int64_t transporterId = 0; asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 408ce899b1..09523da965 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -37,7 +37,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); kv.valueLen = taosHashGetDataLen(pIter); kv.value = pIter; - taosEncodeSKv(buf, &kv); + tlen += taosEncodeSKv(buf, &kv); pIter = taosHashIterate(pReq->info, pIter); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 39bf06cedd..5eda16b061 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -244,6 +244,7 @@ int walRoll(SWal *pWal) { pWal->writeIdxTfd = idxTfd; pWal->writeLogTfd = logTfd; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; + ASSERT(pWal->writeCur >= 0); pWal->lastRollSeq = walGetSeq(); return 0; From 87411e72a8448f13a81290ff0f5052e6041596c3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 10 Jan 2022 01:30:12 -0500 Subject: [PATCH 4/8] TD-12678 bug fix --- source/libs/parser/test/mockCatalogService.cpp | 3 +++ source/libs/planner/src/physicalPlan.c | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 2a0f6b38eb..e2b8766700 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -112,6 +112,9 @@ public: int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { // todo vgInfo->vgId = 1; + vgInfo->numOfEps = 1; + vgInfo->epAddr[0].port = 6030; + strcpy(vgInfo->epAddr[0].fqdn, "node1"); return 0; } diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 7c427efb5a..a38d110d5f 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -234,7 +234,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) { execNode->nodeId = vg->vgId; - execNode->inUse = 0; // todo + execNode->inUse = vg->inUse; execNode->numOfEps = vg->numOfEps; for (int8_t i = 0; i < vg->numOfEps; ++i) { execNode->epAddr[i] = vg->epAddr[i]; From 1e17d776f905e998ea69e13fbde9726156d39954 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 10 Jan 2022 16:13:05 +0800 Subject: [PATCH 5/8] fix crash --- source/client/inc/clientHb.h | 48 +++++++-- source/client/src/clientHb.c | 142 ++++++++++++++------------ source/common/src/tmsg.c | 16 +-- source/dnode/mnode/impl/src/mndSync.c | 4 +- source/libs/wal/inc/walInt.h | 1 + source/libs/wal/src/walMeta.c | 1 + source/libs/wal/src/walWrite.c | 8 ++ 7 files changed, 137 insertions(+), 83 deletions(-) diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 2624c8f833..7bc4311b29 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -18,7 +18,7 @@ #include "thash.h" #include "tmsg.h" -#define HEARTBEAT_INTERVAL 1500 //ms +#define HEARTBEAT_INTERVAL 1500 // ms typedef enum { HEARTBEAT_TYPE_MQ = 0, @@ -29,20 +29,50 @@ typedef enum { typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); -//TODO: embed param into function -//return type: SArray +typedef struct SAppHbMgr { + // statistics + int32_t reportCnt; + int32_t connKeyCnt; + int64_t reportBytes; // not implemented + int64_t startTime; + // ctl + SRWLatch lock; // lock is used in serialization + // connection + void* transporter; + SEpSet epSet; + // info + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash +} SAppHbMgr; + +typedef struct SClientHbMgr { + int8_t inited; + // ctl + int8_t threadStop; + pthread_t thread; + pthread_mutex_t lock; // used when app init and cleanup + SArray* appHbMgrs; // SArray one for each cluster + FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; +} SClientHbMgr; + +// TODO: embed param into function +// return type: SArray typedef SArray* (*FGetConnInfo)(SClientHbKey connKey, void* param); -// called by mgmt -int hbMgrInit(void* transporter, SEpSet epSet); +// global, called by mgmt +int hbMgrInit(); void hbMgrCleanUp(); int hbHandleRsp(SClientHbBatchRsp* hbRsp); -//called by user -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); -void hbDeregisterConn(SClientHbKey connKey); +// cluster level +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet); +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr); -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); +// conn level +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); + +int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); // mq void hbMgrInitMqHbRspHandle(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1d50f7574a..9bbd62c1d9 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -16,27 +16,6 @@ #include "clientHb.h" #include "trpc.h" -typedef struct SClientHbMgr { - int8_t inited; - - // statistics - int32_t reportCnt; - int32_t connKeyCnt; - int64_t reportBytes; // not implemented - int64_t startTime; - // ctl - int8_t threadStop; - pthread_t thread; - SRWLatch lock; // lock is used in serialization - // connection - void* transporter; - SEpSet epSet; - - SHashObj* activeInfo; // hash - SHashObj* getInfoFuncs; // hash - FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; -} SClientHbMgr; - static SClientHbMgr clientHbMgr = {0}; static int32_t hbCreateThread(); @@ -55,32 +34,32 @@ static FORCE_INLINE void hbMgrInitHandle() { hbMgrInitMqHbRspHandle(); } -static SClientHbBatchReq* hbGatherAllInfo() { +SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { SClientHbBatchReq* pReq = malloc(sizeof(SClientHbBatchReq)); - if(pReq == NULL) { + if (pReq == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - int32_t connKeyCnt = atomic_load_32(&clientHbMgr.connKeyCnt); + int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); - void *pIter = taosHashIterate(clientHbMgr.activeInfo, NULL); + void *pIter = taosHashIterate(pAppHbMgr->activeInfo, NULL); while (pIter != NULL) { taosArrayPush(pReq->reqs, pIter); SClientHbReq* pOneReq = pIter; taosHashClear(pOneReq->info); - pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } - pIter = taosHashIterate(clientHbMgr.getInfoFuncs, NULL); + pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); while (pIter != NULL) { FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; SClientHbKey connKey; taosHashCopyKey(pIter, &connKey); getConnInfoFp(connKey, NULL); - pIter = taosHashIterate(clientHbMgr.activeInfo, pIter); + pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } return pReq; @@ -93,22 +72,24 @@ static void* hbThreadFunc(void* param) { if(threadStop) { break; } - - SClientHbBatchReq* pReq = hbGatherAllInfo(); - void* reqStr = NULL; - int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); - SMsgSendInfo info; - /*info.fp = hbHandleRsp;*/ - int64_t transporterId = 0; - asyncSendMsgToServer(clientHbMgr.transporter, &clientHbMgr.epSet, &transporterId, &info); - tFreeClientHbBatchReq(pReq); + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for(int i = 0; i < sz; i++) { + SAppHbMgr* pAppHbMgr = taosArrayGet(clientHbMgr.appHbMgrs, i); + SClientHbBatchReq* pReq = hbGatherAllInfo(pAppHbMgr); + void* reqStr = NULL; + int tlen = tSerializeSClientHbBatchReq(&reqStr, pReq); + SMsgSendInfo info; + /*info.fp = hbHandleRsp;*/ - atomic_add_fetch_32(&clientHbMgr.reportCnt, 1); - taosMsleep(HEARTBEAT_INTERVAL); + int64_t transporterId = 0; + asyncSendMsgToServer(pAppHbMgr->transporter, &pAppHbMgr->epSet, &transporterId, &info); + tFreeClientHbBatchReq(pReq); + atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); + taosMsleep(HEARTBEAT_INTERVAL); + } } - return NULL; } @@ -129,27 +110,55 @@ static void hbStopThread() { atomic_store_8(&clientHbMgr.threadStop, 1); } -int hbMgrInit(void* transporter, SEpSet epSet) { +SAppHbMgr* appHbMgrInit(void* transporter, SEpSet epSet) { + SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr)); + if (pAppHbMgr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + // init stat + pAppHbMgr->startTime = taosGetTimestampMs(); + + // init connection info + pAppHbMgr->transporter = transporter; + pAppHbMgr->epSet = epSet; + + // init hash info + pAppHbMgr->activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq; + // init getInfoFunc + pAppHbMgr->getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + + taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr); + return pAppHbMgr; +} + +void appHbMgrCleanup(SAppHbMgr* pAppHbMgr) { + pthread_mutex_lock(&clientHbMgr.lock); + + int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); + for (int i = 0; i < sz; i++) { + SAppHbMgr* pTarget = taosArrayGet(clientHbMgr.appHbMgrs, i); + if (pAppHbMgr == pTarget) { + taosHashCleanup(pTarget->activeInfo); + taosHashCleanup(pTarget->getInfoFuncs); + } + } + + pthread_mutex_unlock(&clientHbMgr.lock); +} + +int hbMgrInit() { // init once int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); if (old == 1) return 0; - // init stat - clientHbMgr.startTime = taosGetTimestampMs(); + clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void*)); + pthread_mutex_init(&clientHbMgr.lock, NULL); // init handle funcs hbMgrInitHandle(); - // init hash info - clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - clientHbMgr.activeInfo->freeFp = tFreeClientHbReq; - // init getInfoFunc - clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - - //init connection info - clientHbMgr.transporter = transporter; - clientHbMgr.epSet = epSet; - // init backgroud thread hbCreateThread(); @@ -157,11 +166,12 @@ int hbMgrInit(void* transporter, SEpSet epSet) { } void hbMgrCleanUp() { + // destroy all appHbMgr int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); if (old == 0) return; - taosHashCleanup(clientHbMgr.activeInfo); - taosHashCleanup(clientHbMgr.getInfoFuncs); + taosArrayDestroy(clientHbMgr.appHbMgrs); + } int hbHandleRsp(SClientHbBatchRsp* hbRsp) { @@ -181,34 +191,34 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp) { return 0; } -int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { +int hbRegisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, FGetConnInfo func) { // init hash in activeinfo - void* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + void* data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); if (data != NULL) { return 0; } SClientHbReq hbReq; hbReq.connKey = connKey; hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); - taosHashPut(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); + taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq)); // init hash if (func != NULL) { - taosHashPut(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); + taosHashPut(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey), func, sizeof(FGetConnInfo)); } - atomic_add_fetch_32(&clientHbMgr.connKeyCnt, 1); + atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1); return 0; } -void hbDeregisterConn(SClientHbKey connKey) { - taosHashRemove(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); - taosHashRemove(clientHbMgr.getInfoFuncs, &connKey, sizeof(SClientHbKey)); - atomic_sub_fetch_32(&clientHbMgr.connKeyCnt, 1); +void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey) { + taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); + taosHashRemove(pAppHbMgr->getInfoFuncs, &connKey, sizeof(SClientHbKey)); + atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1); } -int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { // find req by connection id - SClientHbReq* pReq = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + SClientHbReq* pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey)); ASSERT(pReq != NULL); taosHashPut(pReq->info, key, keyLen, value, valueLen); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 09523da965..b94bd6f715 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -31,7 +31,9 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int tlen = 0; tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - SKv kv; + int kvNum = taosHashGetSize(pReq->info); + tlen += taosEncodeFixedI32(buf, kvNum); + SKv kv; void* pIter = taosHashIterate(pReq->info, pIter); while (pIter != NULL) { taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); @@ -49,12 +51,14 @@ void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { buf = taosDecodeSClientHbKey(buf, &pReq->connKey); // TODO: error handling - if (pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + int kvNum; + taosDecodeFixedI32(buf, &kvNum); + pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(int i = 0; i < kvNum; i++) { + SKv kv; + buf = taosDecodeSKv(buf, &kv); + taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); } - SKv kv; - buf = taosDecodeSKv(buf, &kv); - taosHashPut(pReq->info, kv.key, kv.keyLen, kv.value, kv.valueLen); return buf; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 47d0ce4105..e55da73f62 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -95,11 +95,11 @@ static int32_t mndRestoreWal(SMnode *pMnode) { if (sdbWriteFile(pSdb) != 0) { goto WAL_RESTORE_OVER; } - } if (walEndSnapshot(pWal) < 0) { goto WAL_RESTORE_OVER; } + } code = 0; @@ -181,4 +181,4 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; return pMgmt->state == TAOS_SYNC_STATE_LEADER; -} \ No newline at end of file +} diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 871c95193f..7631593dd8 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -131,6 +131,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); // seek section int walChangeFile(SWal* pWal, int64_t ver); +int walChangeFileToLast(SWal* pWal); // seek section end int64_t walGetSeq(); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 6164ec2baf..270a26bf80 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -184,6 +184,7 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { } taosArraySetSize(pArray, sz); pWal->fileInfoSet = pArray; + pWal->writeCur = sz - 1; cJSON_Delete(pRoot); return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 5eda16b061..975f232e3d 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -254,6 +254,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { SWalIdxEntry entry = {.ver = ver, .offset = offset}; int size = tfWrite(pWal->writeIdxTfd, &entry, sizeof(SWalIdxEntry)); if (size != sizeof(SWalIdxEntry)) { + terrno = TAOS_SYSTEM_ERROR(errno); // TODO truncate return -1; } @@ -287,7 +288,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } /*if (!tfValid(pWal->writeLogTfd)) return -1;*/ + ASSERT(pWal->writeCur >= 0); + pthread_mutex_lock(&pWal->mutex); + if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) { + walChangeFileToLast(pWal); + } + pWal->writeHead.head.version = index; int64_t offset = walGetCurFileOffset(pWal); @@ -309,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } + code = walWriteIndex(pWal, index, offset); if (code != 0) { // TODO From 3d7b737bbc4b7c9c245cd723bfc6f5f0b2d64771 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 10 Jan 2022 16:23:35 +0800 Subject: [PATCH 6/8] fix compile error --- source/dnode/mnode/impl/src/mndSync.c | 13 +++++++------ source/libs/planner/CMakeLists.txt | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 581b57ea65..591367c519 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -92,13 +92,14 @@ static int32_t mndRestoreWal(SMnode *pMnode) { goto WAL_RESTORE_OVER; } - if (walBeginSnapshot(pWal, sdbVer) < 0) { - goto WAL_RESTORE_OVER; - } + if (walBeginSnapshot(pWal, sdbVer) < 0) { + goto WAL_RESTORE_OVER; + } + + if (walEndSnapshot(pWal) < 0) { + goto WAL_RESTORE_OVER; + } - if (walEndSnapshot(pWal) < 0) { - goto WAL_RESTORE_OVER; - } } code = 0; diff --git a/source/libs/planner/CMakeLists.txt b/source/libs/planner/CMakeLists.txt index 6234dbe0ac..8d8c148fde 100644 --- a/source/libs/planner/CMakeLists.txt +++ b/source/libs/planner/CMakeLists.txt @@ -8,7 +8,8 @@ target_include_directories( target_link_libraries( planner - PRIVATE os util catalog cjson parser transport function qcom + PRIVATE os util catalog cjson parser function qcom + PUBLIC transport ) if(${BUILD_TEST}) From 155978b883628afbfbb3c9dfee053144d1e15c26 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 10 Jan 2022 00:24:23 -0800 Subject: [PATCH 7/8] minor changes --- source/dnode/mnode/impl/inc/mndVgroup.h | 4 +- source/dnode/mnode/impl/src/mndAcct.c | 2 +- source/dnode/mnode/impl/src/mndBnode.c | 2 +- source/dnode/mnode/impl/src/mndCluster.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 211 +++++++++-------------- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mndFunc.c | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 2 +- source/dnode/mnode/impl/src/mndQnode.c | 2 +- source/dnode/mnode/impl/src/mndSnode.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 2 +- source/dnode/mnode/impl/src/mndUser.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 6 +- 13 files changed, 100 insertions(+), 141 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 9e4656fec8..6ab11aa1b4 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); -SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); -SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index 945a6523ef..aa87eb43a1 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -177,7 +177,7 @@ static int32_t mndAcctActionDelete(SSdb *pSdb, SAcctObj *pAcct) { } static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) { - mTrace("acct:%s, perform update action, old_row:%p new_row:%p", pOld->acct, pOld, pNew); + mTrace("acct:%s, perform update action, old row:%p new row:%p", pOld->acct, pOld, pNew); pOld->updateTime = pNew->updateTime; pOld->status = pNew->status; diff --git a/source/dnode/mnode/impl/src/mndBnode.c b/source/dnode/mnode/impl/src/mndBnode.c index 6325bbb6fd..dbe925992d 100644 --- a/source/dnode/mnode/impl/src/mndBnode.c +++ b/source/dnode/mnode/impl/src/mndBnode.c @@ -155,7 +155,7 @@ static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj) { } static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew) { - mTrace("bnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("bnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index 8cb98a148f..92b9017627 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -135,7 +135,7 @@ static int32_t mndClusterActionDelete(SSdb *pSdb, SClusterObj *pCluster) { } static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj *pNew) { - mTrace("cluster:%" PRId64 ", perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); return 0; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 12f839358c..1a5ea53d3c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -28,7 +28,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb); static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); -static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb); +static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq); static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq); static int32_t mndProcessDropDbReq(SMnodeMsg *pReq); @@ -182,12 +182,12 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { return 0; } -static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { - mTrace("db:%s, perform update action, old_row:%p new_row:%p", pOldDb->name, pOldDb, pNewDb); - pOldDb->updateTime = pNewDb->updateTime; - pOldDb->cfgVersion = pNewDb->cfgVersion; - pOldDb->vgVersion = pNewDb->vgVersion; - memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); +static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) { + mTrace("db:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); + pOld->updateTime = pNew->updateTime; + pOld->cfgVersion = pNew->cfgVersion; + pOld->vgVersion = pNew->vgVersion; + memcpy(&pOld->cfg, &pNew->cfg, sizeof(SDbCfg)); return 0; } @@ -331,14 +331,15 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) return -1; + SCreateVnodeReq *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup); + if (pReq == NULL) return -1; - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_CREATE_VNODE; + action.acceptableCode = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -360,14 +361,15 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) return -1; + SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup); + if (pReq == NULL) return -1; - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SDropVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; + action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; if (mndTransAppendUndoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -376,7 +378,7 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) { +static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pReq, SCreateDbMsg *pCreate, SUserObj *pUser) { SDbObj dbObj = {0}; memcpy(dbObj.name, pCreate->db, TSDB_DB_FNAME_LEN); memcpy(dbObj.acct, pUser->acct, TSDB_USER_LEN); @@ -425,43 +427,17 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("db:%s, failed to create since %s", pCreate->db, terrstr()); - goto CREATE_DB_OVER; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL) goto CREATE_DB_OVER; mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); - if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } - - if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { - mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } - - if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } - - if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } - - if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto CREATE_DB_OVER; - } + if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; + if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; + if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; + if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; + if (mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto CREATE_DB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_DB_OVER; code = 0; @@ -471,9 +447,9 @@ CREATE_DB_OVER: return code; } -static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont; +static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SCreateDbMsg *pCreate = pReq->rpcMsg.pCont; pCreate->numOfVgroups = htonl(pCreate->numOfVgroups); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); @@ -502,13 +478,13 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pMsg) { } } - SUserObj *pOperUser = mndAcquireUser(pMnode, pMsg->user); + SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user); if (pOperUser == NULL) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); return -1; } - int32_t code = mndCreateDb(pMnode, pMsg, pCreate, pOperUser); + int32_t code = mndCreateDb(pMnode, pReq, pCreate, pOperUser); mndReleaseUser(pMnode, pOperUser); if (code != 0) { @@ -565,8 +541,8 @@ static int32_t mndSetDbCfgFromAlterDbMsg(SDbObj *pDb, SAlterDbMsg *pAlter) { return terrno; } -static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { - SSdbRaw *pRedoRaw = mndDbActionEncode(pOldDb); +static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { + SSdbRaw *pRedoRaw = mndDbActionEncode(pOld); if (pRedoRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_UPDATING) != 0) return -1; @@ -574,8 +550,8 @@ static int32_t mndSetUpdateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pO return 0; } -static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { - SSdbRaw *pCommitRaw = mndDbActionEncode(pNewDb); +static int32_t mndSetUpdateDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { + SSdbRaw *pCommitRaw = mndDbActionEncode(pNew); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; @@ -593,14 +569,14 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) return -1; + SAlterVnodeReq *pReq = (SAlterVnodeReq *)mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup); + if (pReq == NULL) return -1; - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SAlterVnodeReq); action.msgType = TDMT_DND_ALTER_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -608,7 +584,7 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -617,8 +593,8 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - if (pVgroup->dbUid == pNewDb->uid) { - if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNewDb, pVgroup) != 0) { + if (pVgroup->dbUid == pNew->uid) { + if (mndBuildUpdateVgroupAction(pMnode, pTrans, pNew, pVgroup) != 0) { sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); return -1; @@ -631,27 +607,27 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj return 0; } -static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pOld, SDbObj *pNew) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); if (pTrans == NULL) { - mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); + mError("db:%s, failed to update since %s", pOld->name, terrstr()); return terrno; } - mDebug("trans:%d, used to update db:%s", pTrans->id, pOldDb->name); + mDebug("trans:%d, used to update db:%s", pTrans->id, pOld->name); - if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + if (mndSetUpdateDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; } - if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOldDb, pNewDb) != 0) { + if (mndSetUpdateDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; } - if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOldDb, pNewDb) != 0) { + if (mndSetUpdateDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) { mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); goto UPDATE_DB_OVER; } @@ -668,9 +644,9 @@ UPDATE_DB_OVER: return code; } -static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SAlterDbMsg *pAlter = pMsg->rpcMsg.pCont; +static int32_t mndProcessAlterDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SAlterDbMsg *pAlter = pReq->rpcMsg.pCont; pAlter->totalBlocks = htonl(pAlter->totalBlocks); pAlter->daysToKeep0 = htonl(pAlter->daysToKeep0); pAlter->daysToKeep1 = htonl(pAlter->daysToKeep1); @@ -697,7 +673,7 @@ static int32_t mndProcessAlterDbReq(SMnodeMsg *pMsg) { dbObj.cfgVersion++; dbObj.updateTime = taosGetTimestampMs(); - code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj); + code = mndUpdateDb(pMnode, pReq, pDb, &dbObj); mndReleaseDb(pMnode, pDb); if (code != 0) { @@ -757,14 +733,15 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); - if (pMsg == NULL) return -1; + SDropVnodeReq *pReq = mndBuildDropVnodeReq(pMnode, pDnode, pDb, pVgroup); + if (pReq == NULL) return -1; - action.pCont = pMsg; + action.pCont = pReq; action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; + action.acceptableCode = TSDB_CODE_DND_VNODE_NOT_DEPLOYED; if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMsg); + free(pReq); return -1; } } @@ -795,35 +772,17 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p return 0; } -static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { +static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pReq, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - mError("db:%s, failed to drop since %s", pDb->name, terrstr()); - return -1; - } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg); + if (pTrans == NULL) goto DROP_DB_OVER; mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); - if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - - if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - - if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) { - mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - goto DROP_DB_OVER; - } + if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; + if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; + if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto DROP_DB_OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_DB_OVER; code = 0; @@ -832,9 +791,9 @@ DROP_DB_OVER: return code; } -static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SDropDbMsg *pDrop = pMsg->rpcMsg.pCont; +static int32_t mndProcessDropDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SDropDbMsg *pDrop = pReq->rpcMsg.pCont; mDebug("db:%s, start to drop", pDrop->db); @@ -850,7 +809,7 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { } } - int32_t code = mndDropDb(pMnode, pMsg, pDb); + int32_t code = mndDropDb(pMnode, pReq, pDb); mndReleaseDb(pMnode, pDb); if (code != 0) { @@ -861,16 +820,16 @@ static int32_t mndProcessDropDbReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; - SUseDbMsg *pUse = pMsg->rpcMsg.pCont; + SUseDbMsg *pUse = pReq->rpcMsg.pCont; pUse->vgVersion = htonl(pUse->vgVersion); SDbObj *pDb = mndAcquireDb(pMnode, pUse->db); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_EXIST; - mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr()); + mError("db:%s, failed to process use db req since %s", pUse->db, terrstr()); return -1; } @@ -922,19 +881,19 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pMsg) { pRsp->vgNum = htonl(vindex); pRsp->hashMethod = pDb->hashMethod; - pMsg->pCont = pRsp; - pMsg->contLen = contLen; + pReq->pCont = pRsp; + pReq->contLen = contLen; mndReleaseDb(pMnode, pDb); return 0; } -static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SSyncDbMsg *pSync = pMsg->rpcMsg.pCont; +static int32_t mndProcessSyncDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SSyncDbMsg *pSync = pReq->rpcMsg.pCont; SDbObj *pDb = mndAcquireDb(pMnode, pSync->db); if (pDb == NULL) { - mError("db:%s, failed to process sync db msg since %s", pSync->db, terrstr()); + mError("db:%s, failed to process sync db req since %s", pSync->db, terrstr()); return -1; } @@ -942,12 +901,12 @@ static int32_t mndProcessSyncDbReq(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - SCompactDbMsg *pCompact = pMsg->rpcMsg.pCont; +static int32_t mndProcessCompactDbReq(SMnodeMsg *pReq) { + SMnode *pMnode = pReq->pMnode; + SCompactDbMsg *pCompact = pReq->rpcMsg.pCont; SDbObj *pDb = mndAcquireDb(pMnode, pCompact->db); if (pDb == NULL) { - mError("db:%s, failed to process compact db msg since %s", pCompact->db, terrstr()); + mError("db:%s, failed to process compact db req since %s", pCompact->db, terrstr()); return -1; } @@ -955,8 +914,8 @@ static int32_t mndProcessCompactDbReq(SMnodeMsg *pMsg) { return 0; } -static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { - SMnode *pMnode = pMsg->pMnode; +static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t cols = 0; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d94078cfe1..1b62a47c91 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -183,7 +183,7 @@ static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { } static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOld, SDnodeObj *pNew) { - mTrace("dnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("dnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index f7a39ce9e5..1c809a9b9a 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -152,7 +152,7 @@ static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) { } static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOldFunc, SFuncObj *pNewFunc) { - mTrace("func:%s, perform update action, old_row:%p new_row:%p", pOldFunc->name, pOldFunc, pNewFunc); + mTrace("func:%s, perform update action, old row:%p new row:%p", pOldFunc->name, pOldFunc, pNewFunc); return 0; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index c14d1f51f8..2c09b29872 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -208,7 +208,7 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { } static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNew) { - mTrace("mnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("mnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index a223890e8c..88cade08ed 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -155,7 +155,7 @@ static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { } static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew) { - mTrace("qnode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("qnode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 1b66bedbcc..dabd1f0142 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -155,7 +155,7 @@ static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj) { } static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew) { - mTrace("snode:%d, perform update action, old_row:%p new_row:%p", pOld->id, pOld, pNew); + mTrace("snode:%d, perform update action, old row:%p new row:%p", pOld->id, pOld, pNew); pOld->updateTime = pNew->updateTime; return 0; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d6b6a07de0..b01c291ce5 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -178,7 +178,7 @@ static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) { } static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb) { - mTrace("stb:%s, perform update action, old_row:%p new_row:%p", pOldStb->name, pOldStb, pNewStb); + mTrace("stb:%s, perform update action, old row:%p new row:%p", pOldStb->name, pOldStb, pNewStb); atomic_exchange_32(&pOldStb->updateTime, pNewStb->updateTime); atomic_exchange_32(&pOldStb->version, pNewStb->version); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index e73b34e688..8c15398215 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -192,7 +192,7 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { } static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pOld, SUserObj *pNew) { - mTrace("user:%s, perform update action, old_row:%p new_row:%p", pOld->user, pOld, pNew); + mTrace("user:%s, perform update action, old row:%p new row:%p", pOld->user, pOld, pNew); memcpy(pOld->pass, pNew->pass, TSDB_PASSWORD_LEN); pOld->updateTime = pNew->updateTime; return 0; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e9d35a6e4c..c5c58a075d 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -165,7 +165,7 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) { } static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { - mTrace("vgId:%d, perform update action, old_row:%p new_row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup); + mTrace("vgId:%d, perform update action, old row:%p new row:%p", pOldVgroup->vgId, pOldVgroup, pNewVgroup); pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->version = pNewVgroup->version; pOldVgroup->hashBegin = pNewVgroup->hashBegin; @@ -189,7 +189,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { +SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -248,7 +248,7 @@ SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb return pCreate; } -SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { +SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; From 7508af0057043b3f8f414b8337b6e9fe933dd2db Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 10 Jan 2022 04:48:30 -0500 Subject: [PATCH 8/8] TD-12678 bug fix --- source/client/src/clientImpl.c | 2 + source/libs/planner/src/physicalPlanJson.c | 57 +++++++++++++++++++--- 2 files changed, 51 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a6b04624d7..d18142cebf 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -305,6 +305,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { } else { CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); + pRequest->code = terrno; + return pRequest; } _return: diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 20da1842cf..d67ff956b9 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -62,7 +62,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f return func(jObj, *obj); } -static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) { +static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) { size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); if (size > 0) { cJSON* jArray = cJSON_AddArrayToObject(json, name); @@ -70,7 +70,7 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* return false; } for (size_t i = 0; i < size; ++i) { - if (!addItem(jArray, func, taosArrayGetP(array, i))) { + if (!addItem(jArray, func, isPoint ? taosArrayGetP(array, i) : taosArrayGet(array, i))) { return false; } } @@ -78,11 +78,19 @@ static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* return true; } -static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) { +static bool addInlineArray(cJSON* json, const char* name, FToJson func, const SArray* array) { + return addTarray(json, name, func, array, false); +} + +static bool addArray(cJSON* json, const char* name, FToJson func, const SArray* array) { + return addTarray(json, name, func, array, true); +} + +static bool fromTarray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize, bool isPoint) { const cJSON* jArray = cJSON_GetObjectItem(json, name); int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); if (size > 0) { - *array = taosArrayInit(size, POINTER_BYTES); + *array = taosArrayInit(size, isPoint ? POINTER_BYTES : itemSize); if (NULL == *array) { return false; } @@ -92,11 +100,19 @@ static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArra if (NULL == item || !func(cJSON_GetArrayItem(jArray, i), item)) { return false; } - taosArrayPush(*array, &item); + taosArrayPush(*array, isPoint ? &item : item); } return true; } +static bool fromInlineArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) { + return fromTarray(json, name, func, array, itemSize, false); +} + +static bool fromArray(const cJSON* json, const char* name, FFromJson func, SArray** array, int32_t itemSize) { + return fromTarray(json, name, func, array, itemSize, true); +} + static bool addRawArray(cJSON* json, const char* name, FToJson func, const void* array, int32_t itemSize, int32_t size) { if (size > 0) { cJSON* jArray = cJSON_AddArrayToObject(json, name); @@ -556,6 +572,32 @@ static bool epAddrFromJson(const cJSON* json, void* obj) { return true; } +static const char* jkNodeAddrId = "NodeId"; +static const char* jkNodeAddrInUse = "InUse"; +static const char* jkNodeAddrEpAddrs = "EpAddrs"; + +static bool nodeAddrToJson(const void* obj, cJSON* json) { + const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj; + bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId); + if (res) { + res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); + } + if (res) { + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddrMsg)); + } + return res; +} + +static bool nodeAddrFromJson(const cJSON* json, void* obj) { + SQueryNodeAddr* ep = (SQueryNodeAddr*)obj; + ep->nodeId = getNumber(json, jkNodeAddrId); + ep->inUse = getNumber(json, jkNodeAddrInUse); + int32_t numOfEps = 0; + bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddrMsg), &numOfEps); + ep->numOfEps = numOfEps; + return res; +} + static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints"; @@ -563,7 +605,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); if (res) { - res = addArray(json, jkExchangeNodeSrcEndPoints, epAddrToJson, exchange->pSrcEndPoints); + res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints); } return res; } @@ -571,7 +613,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) { SExchangePhyNode* exchange = (SExchangePhyNode*)obj; exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); - return fromArray(json, jkExchangeNodeSrcEndPoints, epAddrFromJson, &exchange->pSrcEndPoints, sizeof(SEpAddrMsg)); + return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr)); } static bool specificPhyNodeToJson(const void* obj, cJSON* json) { @@ -803,7 +845,6 @@ static cJSON* subplanToJson(const SSubplan* subplan) { if (res) { res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink); } - if (!res) { cJSON_Delete(jSubplan); return NULL;