Merge pull request #25809 from taosdata/fix/optReadBufForQueryLargeData3

Fix/optReadBufForQueryLargeData3
This commit is contained in:
Hongze Cheng 2024-05-17 11:43:33 +08:00 committed by GitHub
commit 671eb83d4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 22 additions and 10 deletions

View File

@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
int transSetConnOption(uv_tcp_t* stream, int keepalive);

View File

@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}

View File

@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) {
return 0;
}
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf;
@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
if (transResetBuffer(connBuf) < 0) {
if (transResetBuffer(connBuf, resetBuf) < 0) {
return -1;
}
} else {
@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
return total;
}
int transResetBuffer(SConnBuffer* connBuf) {
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
SConnBuffer* p = connBuf;
if (p->total < p->len) {
int left = p->len - p->total;
@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) {
p->total = 0;
p->len = 0;
if (p->cap > BUFFER_CAP) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
if (resetBuf) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
}
}
} else {
ASSERTS(0, "invalid read from sock buf");

View File

@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
STransMsgHead* pHead = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
@ -676,7 +680,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
taosMemoryFree(smsg);
}
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) {
static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn);