opt reset buf
This commit is contained in:
parent
f8eb217c75
commit
427a98279e
|
@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf);
|
||||||
int transDestroyBuffer(SConnBuffer* buf);
|
int transDestroyBuffer(SConnBuffer* buf);
|
||||||
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
|
||||||
bool transReadComplete(SConnBuffer* connBuf);
|
bool transReadComplete(SConnBuffer* connBuf);
|
||||||
int transResetBuffer(SConnBuffer* connBuf);
|
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
|
||||||
|
|
||||||
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
int transSetConnOption(uv_tcp_t* stream, int keepalive);
|
||||||
|
|
||||||
|
|
|
@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
|
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
|
||||||
|
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (resetBuf == 0) {
|
||||||
|
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn);
|
||||||
|
}
|
||||||
|
|
||||||
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
|
||||||
static const int HEADSIZE = sizeof(STransMsgHead);
|
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||||
|
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
|
@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
if (total >= HEADSIZE && !p->invalid) {
|
if (total >= HEADSIZE && !p->invalid) {
|
||||||
*buf = taosMemoryCalloc(1, total);
|
*buf = taosMemoryCalloc(1, total);
|
||||||
memcpy(*buf, p->buf, total);
|
memcpy(*buf, p->buf, total);
|
||||||
if (transResetBuffer(connBuf) < 0) {
|
if (transResetBuffer(connBuf, resetBuf) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
int transResetBuffer(SConnBuffer* connBuf) {
|
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
|
||||||
SConnBuffer* p = connBuf;
|
SConnBuffer* p = connBuf;
|
||||||
if (p->total < p->len) {
|
if (p->total < p->len) {
|
||||||
int left = p->len - p->total;
|
int left = p->len - p->total;
|
||||||
|
@ -159,9 +159,11 @@ int transResetBuffer(SConnBuffer* connBuf) {
|
||||||
p->total = 0;
|
p->total = 0;
|
||||||
p->len = 0;
|
p->len = 0;
|
||||||
if (p->cap > BUFFER_CAP) {
|
if (p->cap > BUFFER_CAP) {
|
||||||
|
if (resetBuf) {
|
||||||
p->cap = BUFFER_CAP;
|
p->cap = BUFFER_CAP;
|
||||||
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
p->buf = taosMemoryRealloc(p->buf, p->cap);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(0, "invalid read from sock buf");
|
ASSERTS(0, "invalid read from sock buf");
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
|
||||||
|
|
||||||
STransMsgHead* pHead = NULL;
|
STransMsgHead* pHead = NULL;
|
||||||
|
|
||||||
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
|
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
|
||||||
|
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
|
||||||
if (msgLen <= 0) {
|
if (msgLen <= 0) {
|
||||||
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (resetBuf == 0) {
|
||||||
|
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
|
||||||
|
}
|
||||||
|
|
||||||
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
|
||||||
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
|
||||||
|
|
Loading…
Reference in New Issue