diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f256c96037..a81d6db80f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -7,8 +7,7 @@ * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * + * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ @@ -211,6 +210,7 @@ typedef struct SConnBuffer { char* buf; int len; int cap; + int left; int total; } SConnBuffer; @@ -282,6 +282,8 @@ int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); +int transResetBuffer(SConnBuffer* connBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf); int transSetConnOption(uv_tcp_t* stream); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index c747e69339..ce9c6fb306 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -17,10 +17,10 @@ #ifdef USE_UV #include #endif -#include "zlib.h" -#include "thttp.h" #include "taoserror.h" +#include "thttp.h" #include "tlog.h" +#include "zlib.h" static int32_t taosBuildHttpHeader(const char* server, int32_t contLen, char* pHead, int32_t headLen, EHttpCompFlag flag) { @@ -174,7 +174,7 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32 #else int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag) { - int32_t code = -1; + int32_t code = -1; TdSocketPtr pSocket = NULL; uint32_t ip = taosGetIpv4FromFqdn(server); @@ -231,4 +231,4 @@ SEND_OVER: return code; } -#endif \ No newline at end of file +#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 70d56dca13..54ffcabc8d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -323,7 +323,8 @@ void cliHandleResp(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); + STransMsgHead* pHead = NULL; + transDumpFromBuffer(&conn->readBuf, (char**)&pHead); pHead->code = htonl(pHead->code); pHead->msgLen = htonl(pHead->msgLen); @@ -366,7 +367,6 @@ void cliHandleResp(SCliConn* conn) { } } // buf's mem alread translated to transMsg.pCont - transClearBuffer(&conn->readBuf); if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.info.handle = (void*)conn->refId; tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); @@ -636,6 +636,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); + + transInitBuffer(&conn->readBuf); QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -651,8 +653,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; + transDestroyBuffer(&conn->readBuf); + conn->refId = -1; if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); if (clear) { @@ -678,7 +681,6 @@ static void cliDestroy(uv_handle_t* handle) { tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); transReqQueueClear(&conn->wreqQueue); - transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 155cdd1062..fb59aafb33 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -16,6 +16,8 @@ #include "transComm.h" +#define BUFFER_CAP 4096 + static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; @@ -111,12 +113,56 @@ int transGetSockDebugInfo(struct sockaddr* sockname, char* dst) { return r; } int transInitBuffer(SConnBuffer* buf) { - transClearBuffer(buf); + buf->cap = BUFFER_CAP; + buf->buf = taosMemoryCalloc(1, BUFFER_CAP); + buf->left = -1; + buf->len = 0; + buf->total = 0; return 0; } +int transDestroyBuffer(SConnBuffer* buf) { + taosMemoryFree(buf->buf); + return 0; +} + int transClearBuffer(SConnBuffer* buf) { - memset(buf, 0, sizeof(*buf)); - buf->total = -1; + SConnBuffer* p = buf; + if (p->cap > BUFFER_CAP) { + p->cap = BUFFER_CAP; + p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP); + } + p->left = -1; + p->len = 0; + p->total = 0; + return 0; +} + +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) { + SConnBuffer* p = connBuf; + if (p->left != 0) { + return -1; + } + int total = connBuf->total; + *buf = taosMemoryCalloc(1, total); + memcpy(*buf, p->buf, total); + + transResetBuffer(connBuf); + return total; +} + +int transResetBuffer(SConnBuffer* connBuf) { + SConnBuffer* p = connBuf; + if (p->total <= p->len) { + int left = p->len - p->total; + memmove(p->buf, p->buf + p->total, left); + p->left = -1; + p->total = 0; + p->len = left; + } else { + p->left = -1; + p->total = 0; + p->len = 0; + } return 0; } int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { @@ -126,54 +172,39 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { * |<------STransMsgHead------->|<-------------------userdata--------------->|<-----auth data----->|<----user * info--->| */ - static const int CAPACITY = sizeof(STransMsgHead); - SConnBuffer* p = connBuf; - if (p->cap == 0) { - p->buf = (char*)taosMemoryCalloc(CAPACITY, sizeof(char)); - tTrace("internal malloc mem:%p, size:%d", p->buf, CAPACITY); - p->len = 0; - p->cap = CAPACITY; - p->total = -1; - uvBuf->base = p->buf; - uvBuf->len = CAPACITY; - } else if (p->total == -1 && p->len < CAPACITY) { - uvBuf->base = p->buf + p->len; - uvBuf->len = CAPACITY - p->len; - } else { - p->cap = p->total; - p->buf = taosMemoryRealloc(p->buf, p->cap); - tTrace("internal realloc mem:%p, size:%d", p->buf, p->cap); - - uvBuf->base = p->buf + p->len; + uvBuf->base = p->buf + p->len; + if (p->left == -1) { uvBuf->len = p->cap - p->len; + } else { + if (p->left < p->cap - p->len) { + uvBuf->len = p->left; + } else { + p->buf = taosMemoryRealloc(p->buf, p->left + p->len); + uvBuf->base = p->buf + p->len; + uvBuf->len = p->left; + } } return 0; } // check whether already read complete bool transReadComplete(SConnBuffer* connBuf) { - if (connBuf->total == -1 && connBuf->len >= sizeof(STransMsgHead)) { - STransMsgHead head; - memcpy((char*)&head, connBuf->buf, sizeof(head)); - int32_t msgLen = (int32_t)htonl(head.msgLen); - connBuf->total = msgLen; + SConnBuffer* p = connBuf; + if (p->len >= sizeof(STransMsgHead)) { + if (p->left == -1) { + STransMsgHead head; + memcpy((char*)&head, connBuf->buf, sizeof(head)); + int32_t msgLen = (int32_t)htonl(head.msgLen); + p->total = msgLen; + } + if (p->total >= p->len) { + p->left = p->total - p->len; + } else { + p->left = 0; + } } - if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { - return true; - } - return false; -} -int transPackMsg(STransMsgHead* msgHead, bool sercured, bool auth) { return 0; } - -int transUnpackMsg(STransMsgHead* msgHead) { return 0; } -int transDestroyBuffer(SConnBuffer* buf) { - if (buf->cap > 0) { - taosMemoryFreeClear(buf->buf); - } - transClearBuffer(buf); - - return 0; + return p->left == 0 ? true : false; } int transSetConnOption(uv_tcp_t* stream) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index fe7ab47fee..e360926b40 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -212,9 +212,10 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) { } static void uvHandleReq(SSvrConn* pConn) { - SConnBuffer* pBuf = &pConn->readBuf; - char* msg = pBuf->buf; - uint32_t msgLen = pBuf->len; + STransMsgHead* msg = NULL; + int msgLen = 0; + + msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&msg); STransMsgHead* pHead = (STransMsgHead*)msg; pHead->code = htonl(pHead->code); @@ -761,6 +762,7 @@ static SSvrConn* createConn(void* hThrd) { memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; + transInitBuffer(&pConn->readBuf); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn;