refactor transport

This commit is contained in:
Yihao Deng 2024-07-23 11:25:36 +00:00
parent 4561a888b6
commit 7880800bee
3 changed files with 33 additions and 13 deletions

View File

@ -870,7 +870,10 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); int32_t code = transAllocBuffer(pBuf, buf);
if (code < 0) {
cliDestroyConn(conn, true);
}
} }
static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
// impl later // impl later

View File

@ -115,17 +115,20 @@ int32_t transInitBuffer(SConnBuffer* buf) {
buf->invalid = 0; buf->invalid = 0;
return 0; return 0;
} }
int transDestroyBuffer(SConnBuffer* p) { int32_t transDestroyBuffer(SConnBuffer* p) {
taosMemoryFree(p->buf); taosMemoryFree(p->buf);
p->buf = NULL; p->buf = NULL;
return 0; return 0;
} }
int transClearBuffer(SConnBuffer* buf) { int32_t transClearBuffer(SConnBuffer* buf) {
SConnBuffer* p = buf; SConnBuffer* p = buf;
if (p->cap > BUFFER_CAP) { if (p->cap > BUFFER_CAP) {
p->cap = BUFFER_CAP; p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP); p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP);
if (p->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
p->left = -1; p->left = -1;
p->len = 0; p->len = 0;
@ -134,27 +137,31 @@ int transClearBuffer(SConnBuffer* buf) {
return 0; return 0;
} }
int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) {
static const int HEADSIZE = sizeof(STransMsgHead); static const int HEADSIZE = sizeof(STransMsgHead);
int32_t code = 0;
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->left != 0 || p->total <= 0) { if (p->left != 0 || p->total <= 0) {
return -1; return TSDB_CODE_INVALID_MSG;
} }
int total = p->total; int total = p->total;
if (total >= HEADSIZE && !p->invalid) { if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total); *buf = taosMemoryCalloc(1, total);
if (*buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*buf, p->buf, total); memcpy(*buf, p->buf, total);
if (transResetBuffer(connBuf, resetBuf) < 0) { if ((code = transResetBuffer(connBuf, resetBuf)) < 0) {
return -1; return code;
} }
} else { } else {
total = -1; total = -1;
return TSDB_CODE_INVALID_MSG;
} }
return total; return total;
} }
int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
SConnBuffer* p = connBuf; SConnBuffer* p = connBuf;
if (p->total < p->len) { if (p->total < p->len) {
int left = p->len - p->total; int left = p->len - p->total;
@ -170,15 +177,18 @@ int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) {
if (resetBuf) { if (resetBuf) {
p->cap = BUFFER_CAP; p->cap = BUFFER_CAP;
p->buf = taosMemoryRealloc(p->buf, p->cap); p->buf = taosMemoryRealloc(p->buf, p->cap);
if (p->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
} else { } else {
ASSERTS(0, "invalid read from sock buf"); ASSERTS(0, "invalid read from sock buf");
return -1; return TSDB_CODE_INVALID_MSG;
} }
return 0; return 0;
} }
int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
/* /*
* formate of data buffer: * formate of data buffer:
* |<--------------------------data from socket------------------------------->| * |<--------------------------data from socket------------------------------->|
@ -195,6 +205,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) {
} else { } else {
p->cap = p->left + p->len; p->cap = p->left + p->len;
p->buf = taosMemoryRealloc(p->buf, p->cap); p->buf = taosMemoryRealloc(p->buf, p->cap);
if (p->buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
uvBuf->base = p->buf + p->len; uvBuf->base = p->buf + p->len;
uvBuf->len = p->left; uvBuf->len = p->left;
} }

View File

@ -202,7 +202,11 @@ static int32_t addHandleToAcceptloop(void* arg);
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSvrConn* conn = handle->data; SSvrConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf); int32_t code = transAllocBuffer(pBuf, buf);
if (code < 0) {
tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code));
destroyConn(conn, true);
}
} }
// refers specifically to query or insert timeout // refers specifically to query or insert timeout