opt read buf

This commit is contained in:
Yihao Deng 2024-05-17 10:27:56 +08:00
parent 15181eb14b
commit c5b3c04036
4 changed files with 29 additions and 18 deletions

View File

@ -302,8 +302,8 @@ int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf);
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf);
bool transReadComplete(SConnBuffer* connBuf);
int transResetBuffer(SConnBuffer* connBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf);
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf);
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf);
int transSetConnOption(uv_tcp_t* stream, int keepalive);

View File

@ -382,13 +382,18 @@ void cliHandleResp(SCliConn* conn) {
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
int8_t resetBuf = conn->status == ConnAcquire ? 0 : 1;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
taosMemoryFree(pHead);
tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn);
return;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), conn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}

View File

@ -126,7 +126,7 @@ int transClearBuffer(SConnBuffer* buf) {
return 0;
}
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf;
@ -137,7 +137,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);
if (transResetBuffer(connBuf) < 0) {
if (transResetBuffer(connBuf, resetBuf) < 0) {
return -1;
}
} else {
@ -146,7 +146,7 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
return total;
}
int transResetBuffer(SConnBuffer* connBuf) {
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
SConnBuffer* p = connBuf;
if (p->total < p->len) {
int left = p->len - p->total;
@ -159,8 +159,10 @@ int transResetBuffer(SConnBuffer* connBuf) {
p->total = 0;
p->len = 0;
if (p->cap > BUFFER_CAP) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
if (resetBuf) {
p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap);
}
}
} else {
ASSERTS(0, "invalid read from sock buf");

View File

@ -342,11 +342,15 @@ static bool uvHandleReq(SSvrConn* pConn) {
STransMsgHead* pHead = NULL;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead);
int8_t resetBuf = pConn->status == ConnAcquire ? 0 : 1;
int msgLen = transDumpFromBuffer(&pConn->readBuf, (char**)&pHead, resetBuf);
if (msgLen <= 0) {
tError("%s conn %p read invalid packet", transLabel(pTransInst), pConn);
return false;
}
if (resetBuf == 0) {
tTrace("%s conn %p not reset read buf", transLabel(pTransInst), pConn);
}
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tError("%s conn %p recv invalid packet, failed to decompress", transLabel(pTransInst), pConn);
@ -677,17 +681,17 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
}
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
static void destroyAllConn(SWorkThrd* pThrd) {
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn);
QUEUE_REMOVE(h);
QUEUE_INIT(h);
tTrace("thread %p destroy all conn ", pThrd);
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
queue* h = QUEUE_HEAD(&pThrd->conn);
QUEUE_REMOVE(h);
QUEUE_INIT(h);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
}
transUnrefSrvHandle(c);
transUnrefSrvHandle(c);
}
}
void uvWorkerAsyncCb(uv_async_t* handle) {