From de4efca56f283e7ce0488410eed590649e2f0ff8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jul 2023 12:12:56 +0000 Subject: [PATCH 1/5] update ip --- source/libs/transport/src/transCli.c | 53 ++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 11 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8062a0618b..01223a2be9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -73,7 +73,7 @@ typedef struct SCliConn { SDelayTask* task; - char* ip; + char* dstAddr; char src[32]; char dst[32]; @@ -196,6 +196,7 @@ static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later static void cliHandleResp(SCliConn* conn); // handle except about conn @@ -543,6 +544,7 @@ void cliConnTimeout(uv_timer_t* handle) { taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, UV_ECANCELED); } void cliReadTimeoutCb(uv_timer_t* handle) { @@ -719,7 +721,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1); + conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr) + 1); } SConnList* pList = conn->list; @@ -878,7 +880,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { connList->list->numOfConn--; connList->size--; } else { - SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1); + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr) + 1); if (connList != NULL) connList->list->numOfConn--; } conn->list = NULL; @@ -923,7 +925,7 @@ static void cliDestroy(uv_handle_t* handle) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); - taosMemoryFree(conn->ip); + taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); cliDestroyConnMsgs(conn, true); @@ -1168,7 +1170,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { if (conn == NULL) { conn = cliCreateConn(pThrd); conn->pBatch = pBatch; - conn->ip = taosStrdup(pList->dst); + conn->dstAddr = taosStrdup(pList->dst); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); if (ipaddr == 0xffffffff) { @@ -1213,6 +1215,8 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, -1); return; } @@ -1271,11 +1275,11 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { STraceId* trace = &pMsg->msg.info.traceId; tGError("%s msg %s failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - TMSG_INFO(pMsg->msg.msgType), pConn, pConn->ip, uv_strerror(status)); + TMSG_INFO(pMsg->msg.msgType), pConn, pConn->dstAddr, uv_strerror(status)); if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -1287,12 +1291,12 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr) + 1, &item, sizeof(SFailFastItem)); } } } else { tError("%s batch msg failed to send, conn %p failed to connect to %s, reason: %s", CONN_GET_INST_LABEL(pConn), - pConn, pConn->ip, uv_strerror(status)); + pConn, pConn->dstAddr, uv_strerror(status)); cliDestroyBatch(pConn->pBatch); pConn->pBatch = NULL; } @@ -1314,6 +1318,7 @@ void cliConnCb(uv_connect_t* req, int status) { } if (status != 0) { + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); if (timeout == false) { cliHandleFastFail(pConn, status); } else if (timeout == true) { @@ -1483,9 +1488,34 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) } static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later + uint32_t addr = taosGetIpv4FromFqdn(fqdn); + if (addr != 0xffffffff) { + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); + if (addr != *v) { + char old[64] = {0}, new[64] = {0}; + tinet_ntoa(old, *v); + tinet_ntoa(new, addr); + tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); + taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); + } + } return; } +static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { + if (dst == NULL) return; + + int16_t i = 0, len = strlen(dst); + for (i = len - 1; i >= 0; i--) { + if (dst[i] == ':') break; + } + if (i > 0) { + char fqdn[TSDB_FQDN_LEN + 1] = {0}; + memcpy(fqdn, dst, i); + cliUpdateFqdnCache(cache, fqdn); + } +} + static void doFreeTimeoutMsg(void* param) { STaskArg* arg = param; SCliMsg* pMsg = arg->param1; @@ -1560,7 +1590,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - conn->ip = taosStrdup(addr); + conn->dstAddr = taosStrdup(addr); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { @@ -1578,7 +1608,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons(port); - tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->ip); + tGTrace("%s conn %p try to connect to %s", pTransInst->label, conn, conn->dstAddr); pThrd->newConnCount++; int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { @@ -1608,6 +1638,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; + cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliHandleFastFail(conn, ret); return; } From 8f464bd21f380ce7e492e60a900678565472d215 Mon Sep 17 00:00:00 2001 From: wade zhang <95411902+gccgdb1234@users.noreply.github.com> Date: Wed, 19 Jul 2023 09:42:10 +0800 Subject: [PATCH 2/5] docs: correct the description of auto.offset.reset --- docs/en/07-develop/07-tmq.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index 506a8dcc46..ccf39ef581 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -298,7 +298,7 @@ You configure the following parameters when creating a consumer: | `td.connect.port` | string | Port of the server side | | | `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. | | `client.id` | string | Client ID | Maximum length: 192. | -| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | +| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without committed offset| | `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true | | `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | | `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false From 4cdfd0e1e8be1d6ed556fe3638efeb542a7c5eb0 Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Wed, 19 Jul 2023 10:50:46 +0800 Subject: [PATCH 3/5] conflict --- source/dnode/mnode/impl/src/mndUser.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 65393399d5..06523c7c9b 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -620,7 +620,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { goto _OVER; } - if (createReq.user[0] == 0) { + if (createReq.user[0] == 0 || strlen(createReq.pass) >= TSDB_PASSWORD_LEN) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; goto _OVER; } From 8f709ea7829173345c7cccc505643327f927d022 Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Wed, 19 Jul 2023 13:06:59 +0800 Subject: [PATCH 4/5] fix test case --- tests/system-test/1-insert/boundary.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/1-insert/boundary.py b/tests/system-test/1-insert/boundary.py index 29dcbc7c46..4476236ca6 100644 --- a/tests/system-test/1-insert/boundary.py +++ b/tests/system-test/1-insert/boundary.py @@ -33,7 +33,7 @@ class TDTestCase: self.colname_length_boundary = self.boundary.COL_KEY_MAX_LENGTH self.tagname_length_boundary = self.boundary.TAG_KEY_MAX_LENGTH self.username_length_boundary = 23 - self.password_length_boundary = 128 + self.password_length_boundary = 31 def dbname_length_check(self): dbname_length = randint(1,self.dbname_length_boundary-1) for dbname in [tdCom.get_long_name(self.dbname_length_boundary),tdCom.get_long_name(dbname_length)]: From 69fae6200387df5d79f970864658e9d5373405be Mon Sep 17 00:00:00 2001 From: tangfangzhi Date: Wed, 19 Jul 2023 15:04:39 +0800 Subject: [PATCH 5/5] error message --- source/dnode/mnode/impl/src/mndUser.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 06523c7c9b..cbde56a860 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -620,7 +620,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { goto _OVER; } - if (createReq.user[0] == 0 || strlen(createReq.pass) >= TSDB_PASSWORD_LEN) { + if (createReq.user[0] == 0) { terrno = TSDB_CODE_MND_INVALID_USER_FORMAT; goto _OVER; } @@ -630,6 +630,11 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { goto _OVER; } + if (strlen(createReq.pass) >= TSDB_PASSWORD_LEN){ + terrno = TSDB_CODE_PAR_NAME_OR_PASSWD_TOO_LONG; + goto _OVER; + } + pUser = mndAcquireUser(pMnode, createReq.user); if (pUser != NULL) { terrno = TSDB_CODE_MND_USER_ALREADY_EXIST;