From 5c73c1ffd8f4a7dfece8953471ee33fef02269f7 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 6 Jan 2022 19:53:14 +0800 Subject: [PATCH 01/13] refine heartbeat interface --- source/client/inc/clientHb.h | 145 ++++++++++++++++++----------------- source/client/src/clientHb.c | 11 +-- 2 files changed, 78 insertions(+), 78 deletions(-) diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 676b5c4c40..11fea4de5e 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -20,6 +20,8 @@ typedef enum { mq = 0, + // type can be added here + // HEARTBEAT_TYPE_MAX } EHbType; @@ -30,80 +32,16 @@ typedef struct SKlv { void* value; } SKlv; -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); - return buf; -} - typedef struct SClientHbKey { int32_t connId; int32_t hbType; } SClientHbKey; -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - typedef struct SClientHbReq { SClientHbKey hbKey; - SHashObj* info; // hash + SHashObj* info; // hash } SClientHbReq; -static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { - int tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); - - void* pIter = NULL; - void* data; - SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; - taosEncodeSKlv(buf, &klv); - - data = taosHashIterate(pReq->info, pIter); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { - ASSERT(pReq->info != NULL); - buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); - - //TODO: error handling - if(pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); - - return buf; -} - typedef struct SClientHbBatchReq { int64_t reqId; SArray* reqs; // SArray @@ -123,15 +61,16 @@ typedef struct SClientHbBatchRsp { SArray* rsps; // SArray } SClientHbBatchRsp; -typedef int32_t (*FHbRspHandle)(SClientHbReq* pReq); -typedef int32_t (*FGetConnInfo)(int32_t conn, void* self); +typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); +typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); typedef struct SClientHbMgr { int8_t inited; int32_t reportInterval; // unit ms int32_t stats; SRWLatch lock; - SHashObj* info; //hash + SHashObj* activeInfo; // hash + SHashObj* getInfoFuncs; // hash FHbRspHandle handle[HEARTBEAT_TYPE_MAX]; // input queue } SClientHbMgr; @@ -141,8 +80,72 @@ static SClientHbMgr clientHbMgr = {0}; int hbMgrInit(); void hbMgrCleanUp(); -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle); +int hbHandleRsp(void* hbMsg); -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle); +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen); +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); + + void* pIter = NULL; + void* data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); + + // TODO: error handling + if (pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index f1f7409f05..1de295384b 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -15,7 +15,7 @@ #include "clientHb.h" -static int32_t mqHbRspHandle(SClientHbReq* pReq) { +static int32_t mqHbRspHandle(SClientHbRsp* pReq) { return 0; } @@ -42,15 +42,12 @@ void hbMgrCleanUp() { } -int registerConn(int32_t connId, FGetConnInfo func, FHbRspHandle rspHandle) { +int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { + return 0; } -int registerHbRspHandle(int32_t connId, int32_t hbType, FHbRspHandle rspHandle) { - return 0; -} - -int HbAddConnInfo(int32_t connId, void* key, void* value, int32_t keyLen, int32_t valueLen) { +int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen) { //lock //find req by connection id From 11449f3d988136bc7429ba5a0c5fa540caaf098f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 6 Jan 2022 19:58:54 +0800 Subject: [PATCH 02/13] refine heartbeat interface --- source/client/inc/clientHb.h | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 11fea4de5e..4ab11239b7 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -38,7 +38,7 @@ typedef struct SClientHbKey { } SClientHbKey; typedef struct SClientHbReq { - SClientHbKey hbKey; + SClientHbKey connKey; SHashObj* info; // hash } SClientHbReq; @@ -51,8 +51,10 @@ typedef struct SClientHbHandleResult { } SClientHbHandleResult; typedef struct SClientHbRsp { - int32_t connId; - int32_t hbType; + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; } SClientHbRsp; typedef struct SClientHbBatchRsp { @@ -118,7 +120,7 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { int tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->hbKey); + tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); void* pIter = NULL; void* data; @@ -137,7 +139,7 @@ static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* p static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { ASSERT(pReq->info != NULL); - buf = taosDecodeSClientHbKey(buf, &pReq->hbKey); + buf = taosDecodeSClientHbKey(buf, &pReq->connKey); // TODO: error handling if (pReq->info == NULL) { From cdb3fd245043cafc898be4e60fce2446c3c0f6ec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 04:17:25 -0800 Subject: [PATCH 03/13] minor changes --- include/common/tmsg.h | 2 +- source/dnode/mgmt/impl/src/dndTransport.c | 74 +++++++++++------------ 2 files changed, 38 insertions(+), 38 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 477d356818..f69f46a601 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -928,7 +928,7 @@ typedef struct { char encrypt; char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; -} SAuthMsg, SAuthRsp; +} SAuthReq, SAuthRsp; typedef struct { int8_t finished; diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 49b9e8d6e1..509e8f4cab 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -143,26 +143,26 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; } -static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SDnode *pDnode = parent; STransMgmt *pMgmt = &pDnode->tmgmt; - tmsg_t msgType = pMsg->msgType; + tmsg_t msgType = pRsp->msgType; if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pMsg->handle, TMSG_INFO(msgType)); - rpcFreeCont(pMsg->pCont); + if (pRsp == NULL || pRsp->pCont == NULL) return; + dTrace("RPC %p, rsp:%s is ignored since dnode is stopping", pRsp->handle, TMSG_INFO(msgType)); + rpcFreeCont(pRsp->pCont); return; } DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { - dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pMsg->handle, TMSG_INFO(msgType), pMsg->code & 0XFFFF); - (*fp)(pDnode, pMsg, pEpSet); + dTrace("RPC %p, rsp:%s will be processed, code:0x%x", pRsp->handle, TMSG_INFO(msgType), pRsp->code & 0XFFFF); + (*fp)(pDnode, pRsp, pEpSet); } else { - dError("RPC %p, rsp:%s not processed", pMsg->handle, TMSG_INFO(msgType)); - rpcFreeCont(pMsg->pCont); + dError("RPC %p, rsp:%s not processed", pRsp->handle, TMSG_INFO(msgType)); + rpcFreeCont(pRsp->pCont); } } @@ -201,48 +201,48 @@ static void dndCleanupClient(SDnode *pDnode) { } } -static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { SDnode *pDnode = param; STransMgmt *pMgmt = &pDnode->tmgmt; - tmsg_t msgType = pMsg->msgType; + tmsg_t msgType = pReq->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { - dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); - dndProcessStartupReq(pDnode, pMsg); + dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pReq->handle, pReq->ahandle, pReq->code); + dndProcessStartupReq(pDnode, pReq); return; } if (dndGetStat(pDnode) == DND_STAT_STOPPED) { - dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_OFFLINE}; + dError("RPC %p, req:%s app:%p is ignored since dnode exiting", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); return; } else if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - dError("RPC %p, req:%s app:%p is ignored since dnode not running", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY}; + dError("RPC %p, req:%s app:%p is ignored since dnode not running", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); return; } - if (pMsg->pCont == NULL) { - dTrace("RPC %p, req:%s app:%p not processed since content is null", pMsg->handle, TMSG_INFO(msgType), - pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; + if (pReq->pCont == NULL) { + dTrace("RPC %p, req:%s app:%p not processed since content is null", pReq->handle, TMSG_INFO(msgType), + pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_INVALID_MSG_LEN}; rpcSendResponse(&rspMsg); return; } DndMsgFp fp = pMgmt->msgFp[TMSG_INDEX(msgType)]; if (fp != NULL) { - dTrace("RPC %p, req:%s app:%p will be processed", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - (*fp)(pDnode, pMsg, pEpSet); + dTrace("RPC %p, req:%s app:%p will be processed", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + (*fp)(pDnode, pReq, pEpSet); } else { - dError("RPC %p, req:%s app:%p is not processed since no handle", pMsg->handle, TMSG_INFO(msgType), pMsg->ahandle); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; + dError("RPC %p, req:%s app:%p is not processed since no handle", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); + SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); + rpcFreeCont(pReq->pCont); } } @@ -254,7 +254,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } -static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { +static int32_t dndAuthInternalReq(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { if (strcmp(user, INTERNAL_USER) == 0) { // A simple temporary implementation char pass[TSDB_PASSWORD_LEN] = {0}; @@ -281,7 +281,7 @@ static int32_t dndAuthInternalMsg(SDnode *pDnode, char *user, char *spi, char *e static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { SDnode *pDnode = parent; - if (dndAuthInternalMsg(parent, user, spi, encrypt, secret, ckey) == 0) { + if (dndAuthInternalReq(parent, user, spi, encrypt, secret, ckey) == 0) { // dTrace("get internal auth success"); return 0; } @@ -298,10 +298,10 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char // dDebug("user:%s, send auth msg to other mnodes", user); - SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); - tstrncpy(pMsg->user, user, TSDB_USER_LEN); + SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq)); + tstrncpy(pReq->user, user, TSDB_USER_LEN); - SRpcMsg rpcMsg = {.pCont = pMsg, .contLen = sizeof(SAuthMsg), .msgType = TDMT_MND_AUTH}; + SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH}; SRpcMsg rpcRsp = {0}; dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); @@ -381,19 +381,19 @@ void dndCleanupTrans(SDnode *pDnode) { dInfo("dnode-transport is cleaned up"); } -int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pMsg) { +int32_t dndSendReqToDnode(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq) { STransMgmt *pMgmt = &pDnode->tmgmt; if (pMgmt->clientRpc == NULL) { terrno = TSDB_CODE_DND_OFFLINE; return -1; } - rpcSendRequest(pMgmt->clientRpc, pEpSet, pMsg, NULL); + rpcSendRequest(pMgmt->clientRpc, pEpSet, pReq, NULL); return 0; } -int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pMsg) { +int32_t dndSendReqToMnode(SDnode *pDnode, SRpcMsg *pReq) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); - return dndSendReqToDnode(pDnode, &epSet, pMsg); + return dndSendReqToDnode(pDnode, &epSet, pReq); } From 782dcf1162ca7a23c8b204e362babfb08a16aac2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 04:24:46 -0800 Subject: [PATCH 04/13] minor changes --- source/dnode/mgmt/impl/src/dndMnode.c | 50 +++++++++++++-------------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index e947b590ba..5f2d90123c 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -305,18 +305,18 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pMsg) { +static int32_t dndBuildMnodeOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeMsg *pReq) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); - pOption->replica = pMsg->replica; + pOption->replica = pReq->replica; pOption->selfIndex = -1; - for (int32_t i = 0; i < pMsg->replica; ++i) { + for (int32_t i = 0; i < pReq->replica; ++i) { SReplica *pReplica = &pOption->replicas[i]; - pReplica->id = pMsg->replicas[i].id; - pReplica->port = pMsg->replicas[i].port; - memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); + pReplica->id = pReq->replicas[i].id; + pReplica->port = pReq->replicas[i].port; + memcpy(pReplica->fqdn, pReq->replicas[i].fqdn, TSDB_FQDN_LEN); if (pReplica->id == pOption->dnodeId) { pOption->selfIndex = i; } @@ -423,26 +423,26 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SDCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { - SDCreateMnodeMsg *pMsg = pRpcMsg->pCont; - pMsg->dnodeId = htonl(pMsg->dnodeId); - for (int32_t i = 0; i < pMsg->replica; ++i) { - pMsg->replicas[i].id = htonl(pMsg->replicas[i].id); - pMsg->replicas[i].port = htons(pMsg->replicas[i].port); +static SDCreateMnodeMsg *dndParseCreateMnodeReq(SRpcMsg *pReq) { + SDCreateMnodeMsg *pCreate = pReq->pCont; + pCreate->dnodeId = htonl(pCreate->dnodeId); + for (int32_t i = 0; i < pCreate->replica; ++i) { + pCreate->replicas[i].id = htonl(pCreate->replicas[i].id); + pCreate->replicas[i].port = htons(pCreate->replicas[i].port); } - return pMsg; + return pCreate; } -int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDCreateMnodeMsg *pCreate = dndParseCreateMnodeReq(pReq); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pCreate->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) { return -1; } @@ -450,16 +450,16 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDAlterMnodeMsg *pAlter = dndParseCreateMnodeReq(pReq); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pAlter->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) { return -1; } @@ -470,11 +470,11 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { return dndWriteMnodeFile(pDnode); } -int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDDropMnodeMsg *pMsg = pRpcMsg->pCont; - pMsg->dnodeId = htonl(pMsg->dnodeId); +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDDropMnodeMsg *pDrop = pReq->pCont; + pDrop->dnodeId = htonl(pDrop->dnodeId); - if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + if (pDrop->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; } else { From 7583b45ecb263e69526c4a0bef1deec2d9d4b2f6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 7 Jan 2022 10:36:42 +0800 Subject: [PATCH 05/13] refine heartbeat interface --- include/common/tmsg.h | 80 +++++++++++++++++++++++++-- source/client/inc/clientHb.h | 104 +---------------------------------- source/client/src/clientHb.c | 36 +++++++++--- source/common/src/tmsg.c | 36 +++++++++++- 4 files changed, 139 insertions(+), 117 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7d232aa852..954f24ef6a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -26,6 +26,7 @@ extern "C" { #include "tarray.h" #include "tcoding.h" #include "tdataformat.h" +#include "thash.h" #include "tlist.h" /* ------------------------ MESSAGE DEFINITIONS ------------------------ */ @@ -132,6 +133,73 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) +typedef struct SKlv { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKlv; + +static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKlv->keyLen); + tlen += taosEncodeFixedI32(buf, pKlv->valueLen); + tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); + tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { + buf = taosDecodeFixedI32(buf, &pKlv->keyLen); + buf = taosDecodeFixedI32(buf, &pKlv->valueLen); + buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); + buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); + return buf; +} +typedef struct SClientHbKey { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SClientHbReq { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct SClientHbBatchReq { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq); + +typedef struct SClientHbRsp { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct SClientHbBatchRsp { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + typedef struct SBuildTableMetaInput { int32_t vgId; char* dbName; @@ -1126,7 +1194,7 @@ typedef struct { int32_t topicNum; int64_t consumerId; char* consumerGroup; - SArray* topicNames; // SArray + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { @@ -1135,7 +1203,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeReq(void** buf, const SCMSubscribe tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->consumerGroup); - for(int i = 0; i < pReq->topicNum; i++) { + for (int i = 0; i < pReq->topicNum; i++) { tlen += taosEncodeString(buf, (char*)taosArrayGetP(pReq->topicNames, i)); } return tlen; @@ -1146,7 +1214,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeString(buf, &pReq->consumerGroup); pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*)); - for(int i = 0; i < pReq->topicNum; i++) { + for (int i = 0; i < pReq->topicNum; i++) { char* name = NULL; buf = taosDecodeString(buf, &name); taosArrayPush(pReq->topicNames, &name); @@ -1161,14 +1229,14 @@ typedef struct SMqSubTopic { } SMqSubTopic; typedef struct { - int32_t topicNum; + int32_t topicNum; SMqSubTopic topics[]; } SCMSubscribeRsp; static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) { int tlen = 0; tlen += taosEncodeFixedI32(buf, pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { tlen += taosEncodeFixedI32(buf, pRsp->topics[i].vgId); tlen += taosEncodeFixedI64(buf, pRsp->topics[i].topicId); tlen += taosEncodeSEpSet(buf, &pRsp->topics[i].epSet); @@ -1178,7 +1246,7 @@ static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribe static FORCE_INLINE void* tDeserializeSCMSubscribeRsp(void* buf, SCMSubscribeRsp* pRsp) { buf = taosDecodeFixedI32(buf, &pRsp->topicNum); - for(int i = 0; i < pRsp->topicNum; i++) { + for (int i = 0; i < pRsp->topicNum; i++) { buf = taosDecodeFixedI32(buf, &pRsp->topics[i].vgId); buf = taosDecodeFixedI64(buf, &pRsp->topics[i].topicId); buf = taosDecodeSEpSet(buf, &pRsp->topics[i].epSet); diff --git a/source/client/inc/clientHb.h b/source/client/inc/clientHb.h index 4ab11239b7..73adb41308 100644 --- a/source/client/inc/clientHb.h +++ b/source/client/inc/clientHb.h @@ -25,44 +25,6 @@ typedef enum { HEARTBEAT_TYPE_MAX } EHbType; -typedef struct SKlv { - int32_t keyLen; - int32_t valueLen; - void* key; - void* value; -} SKlv; - -typedef struct SClientHbKey { - int32_t connId; - int32_t hbType; -} SClientHbKey; - -typedef struct SClientHbReq { - SClientHbKey connKey; - SHashObj* info; // hash -} SClientHbReq; - -typedef struct SClientHbBatchReq { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -typedef struct SClientHbHandleResult { -} SClientHbHandleResult; - -typedef struct SClientHbRsp { - SClientHbKey connKey; - int32_t status; - int32_t bodyLen; - void* body; -} SClientHbRsp; - -typedef struct SClientHbBatchRsp { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - typedef int32_t (*FHbRspHandle)(SClientHbRsp* pReq); typedef int32_t (*FGetConnInfo)(SClientHbKey connKey, void* param); @@ -81,73 +43,11 @@ static SClientHbMgr clientHbMgr = {0}; int hbMgrInit(); void hbMgrCleanUp(); - int hbHandleRsp(void* hbMsg); + int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func); + int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen); -static FORCE_INLINE int taosEncodeSKlv(void** buf, const SKlv* pKlv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKlv->keyLen); - tlen += taosEncodeFixedI32(buf, pKlv->valueLen); - tlen += taosEncodeBinary(buf, pKlv->key, pKlv->keyLen); - tlen += taosEncodeBinary(buf, pKlv->value, pKlv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKlv(void* buf, SKlv* pKlv) { - buf = taosDecodeFixedI32(buf, &pKlv->keyLen); - buf = taosDecodeFixedI32(buf, &pKlv->valueLen); - buf = taosDecodeBinary(buf, &pKlv->key, pKlv->keyLen); - buf = taosDecodeBinary(buf, &pKlv->value, pKlv->valueLen); - return buf; -} - -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - -static FORCE_INLINE int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq) { - int tlen = 0; - tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); - - void* pIter = NULL; - void* data; - SKlv klv; - data = taosHashIterate(pReq->info, pIter); - while (data != NULL) { - taosHashGetKey(data, &klv.key, (size_t*)&klv.keyLen); - klv.valueLen = taosHashGetDataLen(data); - klv.value = data; - taosEncodeSKlv(buf, &klv); - - data = taosHashIterate(pReq->info, pIter); - } - return tlen; -} - -static FORCE_INLINE void* tDeserializeClientHbReq(void* buf, SClientHbReq* pReq) { - ASSERT(pReq->info != NULL); - buf = taosDecodeSClientHbKey(buf, &pReq->connKey); - - // TODO: error handling - if (pReq->info == NULL) { - pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - } - SKlv klv; - buf = taosDecodeSKlv(buf, &klv); - taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); - - return buf; -} diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1de295384b..7daa1204d0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -19,27 +19,44 @@ static int32_t mqHbRspHandle(SClientHbRsp* pReq) { return 0; } +uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return 0; +} + +static void hbMgrInitMqHbFunc() { + clientHbMgr.handle[mq] = mqHbRspHandle; +} + int hbMgrInit() { //init once - // - //init lock - // - //init handle funcs - clientHbMgr.handle[mq] = mqHbRspHandle; + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1); + if (old == 1) return 0; + + //init config + clientHbMgr.reportInterval = 1500; //init stat clientHbMgr.stats = 0; - //init config - clientHbMgr.reportInterval = 1500; + //init lock + taosInitRWLatch(&clientHbMgr.lock); + + //init handle funcs + hbMgrInitMqHbFunc(); //init hash info - // + clientHbMgr.activeInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); + //init getInfoFunc + clientHbMgr.getInfoFuncs = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK); return 0; } void hbMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 1, 0); + if (old == 0) return; + taosHashCleanup(clientHbMgr.activeInfo); + taosHashCleanup(clientHbMgr.getInfoFuncs); } int hbRegisterConn(SClientHbKey connKey, FGetConnInfo func) { @@ -51,6 +68,9 @@ int hbAddConnInfo(SClientHbKey connKey, void* key, void* value, int32_t keyLen, //lock //find req by connection id + SClientHbReq* data = taosHashGet(clientHbMgr.activeInfo, &connKey, sizeof(SClientHbKey)); + ASSERT(data != NULL); + taosHashPut(data->info, key, keyLen, value, valueLen); //unlock return 0; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c1048f8482..5cbb42c1e6 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -27,6 +27,40 @@ #undef TD_MSG_SEG_CODE_ #include "tmsgdef.h" +int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { + int tlen = 0; + tlen += taosEncodeSClientHbKey(buf, &pReq->connKey); + + void *pIter = NULL; + void *data; + SKlv klv; + data = taosHashIterate(pReq->info, pIter); + while (data != NULL) { + taosHashGetKey(data, &klv.key, (size_t *)&klv.keyLen); + klv.valueLen = taosHashGetDataLen(data); + klv.value = data; + taosEncodeSKlv(buf, &klv); + + data = taosHashIterate(pReq->info, pIter); + } + return tlen; +} + +void *tDeserializeClientHbReq(void *buf, SClientHbReq *pReq) { + ASSERT(pReq->info != NULL); + buf = taosDecodeSClientHbKey(buf, &pReq->connKey); + + // TODO: error handling + if (pReq->info == NULL) { + pReq->info = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + SKlv klv; + buf = taosDecodeSKlv(buf, &klv); + taosHashPut(pReq->info, klv.key, klv.keyLen, klv.value, klv.valueLen); + + return buf; +} + int tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { int tlen = 0; @@ -148,4 +182,4 @@ void *tSVCreateTbBatchReqDeserialize(void *buf, SVCreateTbBatchReq *pReq) { } return buf; -} \ No newline at end of file +} From 8a1dafa4822757c221d9433192962f0bca69218b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jan 2022 10:40:54 +0800 Subject: [PATCH 06/13] [td-11818] add check for hash iterator, add log config. refactor test script. --- source/client/test/clientTests.cpp | 467 ++++++++++++++--------------- source/common/src/tglobal.c | 12 +- source/libs/catalog/src/catalog.c | 10 - source/libs/parser/src/parser.c | 8 - source/util/src/thash.c | 26 +- 5 files changed, 264 insertions(+), 259 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index aeaa1d8361..0ef96e657d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -101,13 +101,13 @@ TEST(testCase, show_user_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show users"); - TAOS_ROW pRow = NULL; + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { + while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t code = taos_print_row(str, pRow, pFields, numOfFields); printf("%s\n", str); } @@ -134,13 +134,13 @@ TEST(testCase, show_db_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show databases"); - TAOS_ROW pRow = NULL; + TAOS_ROW pRow = NULL; TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { + while ((pRow = taos_fetch_row(pRes)) != NULL) { int32_t code = taos_print_row(str, pRow, pFields, numOfFields); printf("%s\n", str); } @@ -228,29 +228,29 @@ TEST(testCase, use_db_test) { taos_close(pConn); } -//TEST(testCase, drop_db_test) { -//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -//// -//// showDB(pConn); -//// -//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// -//// showDB(pConn); -//// -//// pRes = taos_query(pConn, "create database abc1"); -//// if (taos_errno(pRes) != 0) { -//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); -//// } -//// taos_free_result(pRes); -//// taos_close(pConn); +// TEST(testCase, drop_db_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// showDB(pConn); +// +// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// showDB(pConn); +// +// pRes = taos_query(pConn, "create database abc1"); +// if (taos_errno(pRes) != 0) { +// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// taos_close(pConn); //} - TEST(testCase, create_stable_Test) { +TEST(testCase, create_stable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -281,128 +281,211 @@ TEST(testCase, use_db_test) { taos_close(pConn); } -//TEST(testCase, create_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)"); -// taos_free_result(pRes); -// -// taos_close(pConn); -//} +TEST(testCase, create_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -//TEST(testCase, create_ctable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); -// if (taos_errno(pRes) != 0) { -// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -//TEST(testCase, show_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show stables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); -//TEST(testCase, show_vgroup_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show vgroups"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// -// taos_close(pConn); -//} + pRes = taos_query(pConn, "create table tm0(ts timestamp, k int)"); + taos_free_result(pRes); -//TEST(testCase, drop_stable_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in creating db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "use abc1"); -// if (taos_errno(pRes) != 0) { -// printf("error in using db, reason:%s\n", taos_errstr(pRes)); -// } -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "drop stable st1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} + taos_close(pConn); +} -//TEST(testCase, create_topic_Test) { +TEST(testCase, create_ctable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show stables"); + if (taos_errno(pRes) != 0) { + printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, show_vgroup_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "show vgroups"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_multiple_tables) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to use db, reason:%s", taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(pConn); + return; + } + + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + taos_free_result(pRes); + pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + if (taos_errno(pRes) != 0) { + printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + + for (int32_t i = 0; i < 20; ++i) { + char sql[512] = {0}; + snprintf(sql, tListLen(sql), + "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, + (i + 1) * 30, (i + 2) * 40); + TAOS_RES* pres = taos_query(pConn, sql); + if (taos_errno(pres) != 0) { + printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); + } + taos_free_result(pres); + } + + taos_close(pConn); +} + +TEST(testCase, show_table_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "show tables"); + if (taos_errno(pRes) != 0) { + printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT_TRUE(false); + } + + TAOS_ROW pRow = NULL; + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while ((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, drop_stable_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in creating db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in using db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop stable st1"); + if (taos_errno(pRes) != 0) { + printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + +// TEST(testCase, create_topic_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // @@ -435,97 +518,12 @@ TEST(testCase, use_db_test) { // tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // taos_close(pConn); //} - -//TEST(testCase, show_table_Test) { -// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "use abc1"); -// taos_free_result(pRes); -// -// pRes = taos_query(pConn, "show tables"); -// if (taos_errno(pRes) != 0) { -// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); -// taos_free_result(pRes); -// ASSERT_TRUE(false); -// } -// -// TAOS_ROW pRow = NULL; -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} - -TEST(testCase, create_multiple_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s", taos_errstr(pRes)); - taos_free_result(pRes); - taos_close(pConn); - return; - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - - for(int32_t i = 0; i < 200000; ++i) { - char sql[512] = {0}; - snprintf(sql, tListLen(sql), "create table t_x_%d using st1 tags(2)", i); - TAOS_RES* pres = taos_query(pConn, sql); - if (taos_errno(pres) != 0) { - printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); - } - - printf("%d\n", i); - taos_free_result(pres); - } - - taos_close(pConn); -} - TEST(testCase, generated_request_id_test) { - SHashObj *phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - for(int32_t i = 0; i < 50000; ++i) { + for (int32_t i = 0; i < 50000; ++i) { uint64_t v = generateRequestId(); - void* result = taosHashGet(phash, &v, sizeof(v)); + void* result = taosHashGet(phash, &v, sizeof(v)); if (result != nullptr) { printf("0x%lx, index:%d\n", v, i); } @@ -536,7 +534,7 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } -//TEST(testCase, projection_query_tables) { +// TEST(testCase, projection_query_tables) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // ASSERT_EQ(pConn, nullptr); // @@ -563,4 +561,3 @@ TEST(testCase, generated_request_id_test) { // taos_free_result(pRes); // taos_close(pConn); //} - diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9ddadc9ba6..d9043861e7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -910,7 +910,7 @@ static void doInitGlobalConfig(void) { cfg.option = "tsdbDebugFlag"; cfg.ptr = &tsdbDebugFlag; cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG; cfg.minValue = 0; cfg.maxValue = 255; cfg.ptrLength = 0; @@ -927,6 +927,16 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); + cfg.option = "ctgDebugFlag"; + cfg.ptr = &ctgDebugFlag; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 255; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosAddConfigOption(cfg); + cfg.option = "enableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT8; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 0a90ea3306..79c8e4ea63 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -310,18 +310,8 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName vgInfo = NULL; } - ctgInfo("numOfVgroup:%d", taosHashGetSize(dbInfo->vgInfo)); - if (NULL == vgInfo) { ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); - - void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); - while (pIter1) { - vgInfo = pIter1; - ctgError("valid range:[%u, %u], vgId:%d", vgInfo->hashBegin, vgInfo->hashEnd, vgInfo->vgId); - pIter1 = taosHashIterate(dbInfo->vgInfo, pIter1); - } - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 85a8d9e047..3971f90ac4 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -44,15 +44,7 @@ int32_t parseQuerySql(SParseContext* pCxt, SQueryNode** pQuery) { } if (!isDqlSqlStatement(&info)) { -// bool toVnode = false; if (info.type == TSDB_SQL_CREATE_TABLE) { -// SCreateTableSql* pCreateSql = info.pCreateTableInfo; -// if (pCreateSql->type == TSQL_CREATE_CTABLE || pCreateSql->type == TSQL_CREATE_TABLE) { -// toVnode = true; -// } -// } - -// if (toVnode) { SVnodeModifOpStmtInfo * pModifStmtInfo = qParserValidateCreateTbSqlNode(&info, &pCxt->ctx, pCxt->pMsg, pCxt->msgLen); if (pModifStmtInfo == NULL) { return terrno; diff --git a/source/util/src/thash.c b/source/util/src/thash.c index f90b157558..17200df08e 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -19,8 +19,9 @@ #include "taos.h" #include "tdef.h" -#define EXT_SIZE 1024 - +// the add ref count operation may trigger the warning if the reference count is greater than the MAX_WARNING_REF_COUNT +#define MAX_WARNING_REF_COUNT 10000 +#define EXT_SIZE 1024 #define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define DO_FREE_HASH_NODE(_n) \ @@ -902,8 +903,24 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { if (pNode) { SHashEntry *pe = pHashObj->hashList[slot]; - pNode->count++; - data = GET_HASH_NODE_DATA(pNode); + + uint16_t prevRef = atomic_load_16(&pNode->count); + uint16_t afterRef = atomic_add_fetch_16(&pNode->count, 1); + + // the reference count value is overflow, which will cause the delete node operation immediately. + if (prevRef > afterRef) { + uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef); + // restore the value + atomic_sub_fetch_16(&pNode->count, 1); + data = NULL; + } else { + data = GET_HASH_NODE_DATA(pNode); + } + + if (afterRef >= MAX_WARNING_REF_COUNT) { + uWarn("hash entry ref count is abnormally high: %d", afterRef); + } + if (pHashObj->type == HASH_ENTRY_LOCK) { taosWUnLockLatch(&pe->latch); } @@ -911,7 +928,6 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { __rd_unlock((void*) &pHashObj->lock, pHashObj->type); return data; - } void taosHashCancelIterate(SHashObj *pHashObj, void *p) { From a1060c3eb811607bc3784a8fbda464b5c349be3f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jan 2022 10:44:06 +0800 Subject: [PATCH 07/13] [td-11818] fix log error. --- source/client/src/clientImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 26c27a5cae..bda813eded 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -140,7 +140,7 @@ int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj* (*pRequest)->sqlstr[sqlLen] = 0; (*pRequest)->sqlLen = sqlLen; - tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); + tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } From 572cb5840a94b86efe2e561e9c30a404bc434478 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 20:03:18 -0800 Subject: [PATCH 08/13] create table test --- tests/test/c/create_table.c | 63 +++++++++++++++++++++++++++---------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index c46812c9a5..51f719ec7e 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -28,7 +28,7 @@ int32_t numOfThreads = 1; int32_t numOfTables = 10000; int32_t createTable = 1; int32_t insertData = 0; -int32_t batchNum = 10; +int32_t batchNum = 1; int32_t numOfVgroups = 2; typedef struct { @@ -39,6 +39,7 @@ typedef struct { char stbName[64]; float createTableSpeed; float insertDataSpeed; + int64_t startMs; pthread_t thread; } SThreadInfo; @@ -130,6 +131,26 @@ void createDbAndStb() { taos_close(con); } +void printCreateProgress(SThreadInfo *pInfo, int32_t t) { + int64_t endMs = taosGetTimestampMs(); + int32_t totalTables = t - pInfo->tableBeginIndex; + float seconds = (endMs - pInfo->startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->createTableSpeed = speed; + pPrint("thread:%d, %d tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, totalTables, + seconds, speed); +} + +void printInsertProgress(SThreadInfo *pInfo, int32_t t) { + int64_t endMs = taosGetTimestampMs(); + int32_t totalTables = t - pInfo->tableBeginIndex; + float seconds = (endMs - pInfo->startMs) / 1000.0; + float speed = totalTables / seconds; + pInfo->insertDataSpeed = speed; + pPrint("thread:%d, %d rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, totalTables, + seconds, speed); +} + void *threadFunc(void *param) { SThreadInfo *pInfo = (SThreadInfo *)param; char *qstr = malloc(2000 * 1000); @@ -146,7 +167,7 @@ void *threadFunc(void *param) { taos_free_result(pSql); if (createTable) { - int64_t startMs = taosGetTimestampMs(); + pInfo->startMs = taosGetTimestampMs(); for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { int32_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); @@ -155,38 +176,46 @@ void *threadFunc(void *param) { for (int32_t i = 0; i < batch; ++i) { len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i); } + TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { pError("failed to create table t%d, reason:%s", t, tstrerror(code)); } taos_free_result(pSql); + + if (t % 1000 == 0) { + printCreateProgress(pInfo, t); + } + t += (batch - 1); } - int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; - float seconds = (endMs - startMs) / 1000.0; - float speed = totalTables / seconds; - pInfo->createTableSpeed = speed; - pPrint("thread:%d, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, seconds, speed); + printCreateProgress(pInfo, pInfo->tableEndIndex); } if (insertData) { - int64_t startMs = taosGetTimestampMs(); + pInfo->startMs = taosGetTimestampMs(); for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - sprintf(qstr, "insert into %s%d values(now, 1)", stbName, t); + int32_t batch = (pInfo->tableEndIndex - t); + batch = MIN(batch, batchNum); + + int32_t len = sprintf(qstr, "insert into"); + for (int32_t i = 0; i < batch; ++i) { + len += sprintf(qstr + len, " t%d values(now, %d)", t + i, t + i); + } + TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to create table %s%d, reason:%s", stbName, t, tstrerror(code)); + pError("failed to insert table t%d, reason:%s", t, tstrerror(code)); } taos_free_result(pSql); + + if (t % 100000 == 0) { + printInsertProgress(pInfo, t); + } + t += (batch - 1); } - int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = pInfo->tableEndIndex - pInfo->tableBeginIndex; - float seconds = (endMs - startMs) / 1000.0; - float speed = totalTables / seconds; - pInfo->insertDataSpeed = speed; - pPrint("thread:%d, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, seconds, speed); + printInsertProgress(pInfo, pInfo->tableEndIndex); } taos_close(con); From 0cc54f2fac5974e2acfcbaf95e5077f9b597b0d2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 20:07:23 -0800 Subject: [PATCH 09/13] minor changes --- source/dnode/mgmt/impl/test/mnode/qmnode.cpp | 157 +++++++++++++++++++ 1 file changed, 157 insertions(+) diff --git a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp b/source/dnode/mgmt/impl/test/mnode/qmnode.cpp index 1bf692d892..00098a856a 100644 --- a/source/dnode/mgmt/impl/test/mnode/qmnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/qmnode.cpp @@ -24,3 +24,160 @@ class DndTestMnode : public ::testing::Test { }; Testbase DndTestMnode::test; + +#if 0 +TEST_F(DndTestMnode, 01_Create_Mnode) { + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } +} + +TEST_F(DndTestMnode, 02_Alter_Mnode) { + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED); + } +} + +TEST_F(DndTestMnode, 03_Drop_Mnode) { + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_ID_INVALID); + } + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); + } + + test.Restart(); + + { + int32_t contLen = sizeof(SDDropMnodeReq); + + SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); + } + + { + int32_t contLen = sizeof(SDCreateMnodeReq); + + SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); + + SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } +} +#endif \ No newline at end of file From cb67c5006d26141e884dddcd3a82db61a7068cd4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 20:13:18 -0800 Subject: [PATCH 10/13] minor changes --- tests/test/c/create_table.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index 51f719ec7e..c95b37c35d 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -25,10 +25,10 @@ char dbName[32] = "db"; char stbName[64] = "st"; int32_t numOfThreads = 1; -int32_t numOfTables = 10000; +int32_t numOfTables = 200000; int32_t createTable = 1; int32_t insertData = 0; -int32_t batchNum = 1; +int32_t batchNum = 100; int32_t numOfVgroups = 2; typedef struct { @@ -84,8 +84,9 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %.1f tables/second, threads:%d %s", GREEN, createTableSpeed, numOfThreads, NC); - pPrint("%s total %.1f rows/second, threads:%d %s", GREEN, insertDataSpeed, numOfThreads, NC); + pPrint("%s total %d tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, numOfThreads, + NC); + pPrint("%s total %d tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfThreads, NC); pthread_attr_destroy(&thattr); free(pInfo); @@ -184,7 +185,7 @@ void *threadFunc(void *param) { } taos_free_result(pSql); - if (t % 1000 == 0) { + if (t % 100000 == 0) { printCreateProgress(pInfo, t); } t += (batch - 1); From 5f72c178fd06efc031610ac0b492dc9884690f01 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 6 Jan 2022 20:19:30 -0800 Subject: [PATCH 11/13] minor changes --- tests/test/c/create_table.c | 53 +++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index c95b37c35d..f2db9d0a0c 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -25,15 +25,15 @@ char dbName[32] = "db"; char stbName[64] = "st"; int32_t numOfThreads = 1; -int32_t numOfTables = 200000; +int64_t numOfTables = 200000; int32_t createTable = 1; int32_t insertData = 0; int32_t batchNum = 100; int32_t numOfVgroups = 2; typedef struct { - int32_t tableBeginIndex; - int32_t tableEndIndex; + int64_t tableBeginIndex; + int64_t tableEndIndex; int32_t threadIndex; char dbName[32]; char stbName[64]; @@ -58,7 +58,7 @@ int32_t main(int32_t argc, char *argv[]) { pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); SThreadInfo *pInfo = (SThreadInfo *)calloc(numOfThreads, sizeof(SThreadInfo)); - int32_t numOfTablesPerThread = numOfTables / numOfThreads; + int64_t numOfTablesPerThread = numOfTables / numOfThreads; numOfTables = numOfTablesPerThread * numOfThreads; for (int32_t i = 0; i < numOfThreads; ++i) { pInfo[i].tableBeginIndex = i * numOfTablesPerThread; @@ -84,9 +84,10 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %d tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, numOfThreads, - NC); - pPrint("%s total %d tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, numOfThreads, NC); + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d %s", GREEN, numOfTables, createTableSpeed, + numOfThreads, NC); + pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, + numOfThreads, NC); pthread_attr_destroy(&thattr); free(pInfo); @@ -132,24 +133,24 @@ void createDbAndStb() { taos_close(con); } -void printCreateProgress(SThreadInfo *pInfo, int32_t t) { +void printCreateProgress(SThreadInfo *pInfo, int64_t t) { int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = t - pInfo->tableBeginIndex; + int64_t totalTables = t - pInfo->tableBeginIndex; float seconds = (endMs - pInfo->startMs) / 1000.0; float speed = totalTables / seconds; pInfo->createTableSpeed = speed; - pPrint("thread:%d, %d tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, totalTables, - seconds, speed); + pPrint("thread:%d, %" PRId64 " tables created, time:%.2f sec, speed:%.1f tables/second, ", pInfo->threadIndex, + totalTables, seconds, speed); } -void printInsertProgress(SThreadInfo *pInfo, int32_t t) { +void printInsertProgress(SThreadInfo *pInfo, int64_t t) { int64_t endMs = taosGetTimestampMs(); - int32_t totalTables = t - pInfo->tableBeginIndex; + int64_t totalTables = t - pInfo->tableBeginIndex; float seconds = (endMs - pInfo->startMs) / 1000.0; float speed = totalTables / seconds; pInfo->insertDataSpeed = speed; - pPrint("thread:%d, %d rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, totalTables, - seconds, speed); + pPrint("thread:%d, %" PRId64 " rows inserted, time:%.2f sec, speed:%.1f rows/second, ", pInfo->threadIndex, + totalTables, seconds, speed); } void *threadFunc(void *param) { @@ -169,19 +170,19 @@ void *threadFunc(void *param) { if (createTable) { pInfo->startMs = taosGetTimestampMs(); - for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - int32_t batch = (pInfo->tableEndIndex - t); + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + int64_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); int32_t len = sprintf(qstr, "create table"); for (int32_t i = 0; i < batch; ++i) { - len += sprintf(qstr + len, " t%d using %s tags(%d)", t + i, stbName, t + i); + len += sprintf(qstr + len, " t%" PRId64 " using %s tags(%" PRId64 ")", t + i, stbName, t + i); } TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to create table t%d, reason:%s", t, tstrerror(code)); + pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pSql); @@ -195,19 +196,19 @@ void *threadFunc(void *param) { if (insertData) { pInfo->startMs = taosGetTimestampMs(); - for (int32_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - int32_t batch = (pInfo->tableEndIndex - t); + for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { + int64_t batch = (pInfo->tableEndIndex - t); batch = MIN(batch, batchNum); int32_t len = sprintf(qstr, "insert into"); for (int32_t i = 0; i < batch; ++i) { - len += sprintf(qstr + len, " t%d values(now, %d)", t + i, t + i); + len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); } TAOS_RES *pSql = taos_query(con, qstr); code = taos_errno(pSql); if (code != 0) { - pError("failed to insert table t%d, reason:%s", t, tstrerror(code)); + pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pSql); @@ -237,7 +238,7 @@ void printHelp() { printf("%s%s\n", indent, "-t"); printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", numOfThreads); printf("%s%s\n", indent, "-n"); - printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", numOfTables); + printf("%s%s%s%" PRId64 "\n", indent, indent, "numOfTables, default is ", numOfTables); printf("%s%s\n", indent, "-v"); printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", numOfVgroups); printf("%s%s\n", indent, "-a"); @@ -264,7 +265,7 @@ void parseArgument(int32_t argc, char *argv[]) { } else if (strcmp(argv[i], "-t") == 0) { numOfThreads = atoi(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { - numOfTables = atoi(argv[++i]); + numOfTables = atoll(argv[++i]); } else if (strcmp(argv[i], "-n") == 0) { numOfVgroups = atoi(argv[++i]); } else if (strcmp(argv[i], "-a") == 0) { @@ -280,7 +281,7 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s dbName:%s %s", GREEN, dbName, NC); pPrint("%s stbName:%s %s", GREEN, stbName, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC); - pPrint("%s numOfTables:%d %s", GREEN, numOfTables, NC); + pPrint("%s numOfTables:%" PRId64 " %s", GREEN, numOfTables, NC); pPrint("%s numOfThreads:%d %s", GREEN, numOfThreads, NC); pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC); pPrint("%s createTable:%d %s", GREEN, createTable, NC); From bd386044a14ed7d7ebcdf2957d7670ac83509f4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 7 Jan 2022 13:50:06 +0800 Subject: [PATCH 12/13] [td-11818] fix invalid free. --- source/client/src/clientImpl.c | 2 +- source/libs/catalog/src/catalog.c | 6 ++---- source/libs/parser/inc/astGenerator.h | 2 +- source/libs/parser/src/astGenerator.c | 2 +- source/libs/parser/src/dCDAstProcess.c | 1 - 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9673937811..3205524e3c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -181,7 +181,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { if (pDcl->msgType == TDMT_VND_SHOW_TABLES) { SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; if (pShowReqInfo->pArray == NULL) { - pShowReqInfo->currentIndex = 0; + pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode pShowReqInfo->pArray = pDcl->pExtension; } } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 79c8e4ea63..d4caf730b4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -568,18 +568,17 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - SDBVgroupInfo* db = NULL; + SDBVgroupInfo* db = NULL; SVgroupInfo *vgInfo = NULL; int32_t code = 0; SArray *vgList = NULL; - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db)); vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo)); if (NULL == vgList) { ctgError("taosArrayInit failed"); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } void *pIter = taosHashIterate(db->vgInfo, NULL); @@ -599,7 +598,6 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* vgList = NULL; _return: - if (db) { CTG_UNLOCK(CTG_READ, &db->lock); taosHashRelease(pCatalog->dbCache.cache, db); diff --git a/source/libs/parser/inc/astGenerator.h b/source/libs/parser/inc/astGenerator.h index 22806969af..7f357a2bbd 100644 --- a/source/libs/parser/inc/astGenerator.h +++ b/source/libs/parser/inc/astGenerator.h @@ -123,7 +123,7 @@ typedef struct SCreatedTableInfo { SToken name; // table name token SToken stbName; // super table name token , for using clause SArray *pTagNames; // create by using super table, tag name - SArray *pTagVals; // create by using super table, tag value + SArray *pTagVals; // create by using super table, tag value. SArray char *fullname; // table full name int8_t igExist; // ignore if exists } SCreatedTableInfo; diff --git a/source/libs/parser/src/astGenerator.c b/source/libs/parser/src/astGenerator.c index 0cb3cea95f..34ed8bd355 100644 --- a/source/libs/parser/src/astGenerator.c +++ b/source/libs/parser/src/astGenerator.c @@ -686,7 +686,7 @@ void destroySqlNode(SSqlNode *pSqlNode) { void freeCreateTableInfo(void* p) { SCreatedTableInfo* pInfo = (SCreatedTableInfo*) p; taosArrayDestroy(pInfo->pTagNames); - taosArrayDestroyEx(pInfo->pTagVals, freeItem); + taosArrayDestroy(pInfo->pTagVals); tfree(pInfo->fullname); } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index abee162255..60f4d4835b 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -332,7 +332,6 @@ static int32_t doParseSerializeTagValue(SSchema* pTagSchema, int32_t numOfInputT char* endPtr = NULL; char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0}; - SKvParam param = {.builder = pKvRowBuilder, .schema = pSchema}; SToken* pItem = taosArrayGet(pTagValList, i); From 1c7b2a5ddfe29e431299f552e89b2c786a996955 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 14:57:08 +0800 Subject: [PATCH 13/13] feature/qnode --- source/common/src/tglobal.c | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index d9043861e7..9a20fadbfb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -927,16 +927,6 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosAddConfigOption(cfg); - cfg.option = "ctgDebugFlag"; - cfg.ptr = &ctgDebugFlag; - cfg.valType = TAOS_CFG_VTYPE_INT32; - cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT; - cfg.minValue = 0; - cfg.maxValue = 255; - cfg.ptrLength = 0; - cfg.unitType = TAOS_CFG_UTYPE_NONE; - taosAddConfigOption(cfg); - cfg.option = "enableRecordSql"; cfg.ptr = &tsTscEnableRecordSql; cfg.valType = TAOS_CFG_VTYPE_INT8;