From 427a98279eb4d3d46f7de735ba673ca88c9336d3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 16 May 2024 10:30:09 +0800 Subject: [PATCH 1/3] opt reset buf --- source/libs/transport/inc/transComm.h | 4 ++-- source/libs/transport/src/transCli.c | 7 ++++++- source/libs/transport/src/transComm.c | 12 +++++++----- source/libs/transport/src/transSvr.c | 24 ++++++++++++++---------- 4 files changed, 29 insertions(+), 18 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index da6d71e07b..00944115c0 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index dfd7630f35..a1b09b9ee8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) { STransMsgHead* pHead = NULL; - int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead); + int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1; + int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { taosMemoryFree(pHead); tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); return; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn); + } + if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index fff13e7ebb..5598aa39e3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) { return 0; } -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { static const int HEADSIZE = sizeof(STransMsgHead); SConnBuffer* p = connBuf; @@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); memcpy(*buf, p->buf, total); - if (transResetBuffer(connBuf) < 0) { + if (transResetBuffer(connBuf, resetBuf) < 0) { return -1; } } else { @@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { return total; } -int transResetBuffer(SConnBuffer* connBuf) { +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { SConnBuffer* p = connBuf; if (p->total < p->len) { int left = p->len - p->total; @@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) { p->total = 0; p->len = 0; if (p->cap > BUFFER_CAP) { - p->cap = BUFFER_CAP; - p->buf = taosMemoryRealloc(p->buf, p->cap); + if (resetBuf) { + p->cap = BUFFER_CAP; + p->buf = taosMemoryRealloc(p->buf, p->cap); + } } } else { ASSERTS(0, "invalid read from sock buf"); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 21ad5be869..c0b2eba90a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) { STransMsgHead* pHead = NULL; - int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead); + int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1; + int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf); if (msgLen <= 0) { tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn); return false; } + if (resetBuf == 0) { + tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn); + } if (transDecompressMsg((char**)&pHead, msgLen) < 0) { tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn); @@ -677,17 +681,17 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { } static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } static void destroyAllConn(SWorkThrd* pThrd) { - tTrace("thread %p destroy all conn ", pThrd); - while (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* h = QUEUE_HEAD(&pThrd->conn); - QUEUE_REMOVE(h); - QUEUE_INIT(h); + tTrace("thread %p destroy all conn ", pThrd); + while (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* h = QUEUE_HEAD(&pThrd->conn); + QUEUE_REMOVE(h); + QUEUE_INIT(h); - SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); - while (T_REF_VAL_GET(c) >= 2) { - transUnrefSrvHandle(c); + SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); } - transUnrefSrvHandle(c); + transUnrefSrvHandle(c); } } void uvWorkerAsyncCb(uv_async_t* handle) { From c155cf5eb750ecb1626941a8c60116b4152cc1c0 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 16 May 2024 13:53:41 +0800 Subject: [PATCH 2/3] fix(tsim/less): use int64_t instead of int32_t for 13 digits ts --- utils/tsim/src/simExe.c | 46 +++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/utils/tsim/src/simExe.c b/utils/tsim/src/simExe.c index 394b168b08..bea839057e 100644 --- a/utils/tsim/src/simExe.c +++ b/utils/tsim/src/simExe.c @@ -37,15 +37,15 @@ void simLogSql(char *sql, bool useSharp) { char *simParseHostName(char *varName) { static char hostName[140]; -//#ifdef WINDOWS -// hostName[0] = '\"'; -// taosGetFqdn(&hostName[1]); -// int strEndIndex = strlen(hostName); -// hostName[strEndIndex] = '\"'; -// hostName[strEndIndex + 1] = '\0'; -//#else + //#ifdef WINDOWS + // hostName[0] = '\"'; + // taosGetFqdn(&hostName[1]); + // int strEndIndex = strlen(hostName); + // hostName[strEndIndex] = '\"'; + // hostName[strEndIndex + 1] = '\0'; + //#else sprintf(hostName, "%s", "localhost"); -//#endif + //#endif return hostName; } @@ -276,12 +276,16 @@ int32_t simExecuteExpression(SScript *script, char *exp) { if (op1[0] == '=') { strcpy(simGetVariable(script, var1 + 1, var1Len - 1), t3); } else if (op1[0] == '<') { - val0 = atoi(t0); - val1 = atoi(t3); + int64_t val0 = atoll(t0); + int64_t val1 = atoll(t3); + // val0 = atoi(t0); + // val1 = atoi(t3); if (val0 >= val1) result = -1; } else if (op1[0] == '>') { - val0 = atoi(t0); - val1 = atoi(t3); + int64_t val0 = atoll(t0); + int64_t val1 = atoll(t3); + // val0 = atoi(t0); + // val1 = atoi(t3); if (val0 <= val1) result = -1; } } else { @@ -378,16 +382,14 @@ bool simExecuteRunBackCmd(SScript *script, char *option) { return true; } -void simReplaceDirSep (char *buf){ +void simReplaceDirSep(char *buf) { #ifdef WINDOWS - int i=0; - while(buf[i] != '\0') - { - if(buf[i] == '/') - { - buf[i] = '\\'; - } - i++; + int i = 0; + while (buf[i] != '\0') { + if (buf[i] == '/') { + buf[i] = '\\'; + } + i++; } #endif } @@ -505,7 +507,7 @@ bool simExecuteSystemContentCmd(SScript *script, char *option) { } bool simExecuteSetBIModeCmd(SScript *script, char *option) { - char buf[1024]; + char buf[1024]; simVisuallizeOption(script, option, buf); option = buf; From bc41a5ee3cbabad5effc56d62449589fb0533cbb Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 16 May 2024 14:14:09 +0800 Subject: [PATCH 3/3] refactor transport --- source/libs/transport/src/transSvr.c | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index c0b2eba90a..04f6ac8c95 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -680,18 +680,19 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) { taosMemoryFree(smsg); } static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); } -static void destroyAllConn(SWorkThrd* pThrd) { - tTrace("thread %p destroy all conn ", pThrd); - while (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* h = QUEUE_HEAD(&pThrd->conn); - QUEUE_REMOVE(h); - QUEUE_INIT(h); - SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); - while (T_REF_VAL_GET(c) >= 2) { - transUnrefSrvHandle(c); +static void destroyAllConn(SWorkThrd* pThrd) { + tTrace("thread %p destroy all conn ", pThrd); + while (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* h = QUEUE_HEAD(&pThrd->conn); + QUEUE_REMOVE(h); + QUEUE_INIT(h); + + SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); + while (T_REF_VAL_GET(c) >= 2) { + transUnrefSrvHandle(c); } - transUnrefSrvHandle(c); + transUnrefSrvHandle(c); } } void uvWorkerAsyncCb(uv_async_t* handle) {