Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 16:31:02 +08:00
parent 723e863ec4
commit bbb3d4cc61
1 changed files with 29 additions and 11 deletions

View File

@ -196,6 +196,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn);
static void cliBatchSendCb(uv_write_t* req, int status); static void cliBatchSendCb(uv_write_t* req, int status);
void cliBatchSendImpl(SCliConn* pConn); void cliBatchSendImpl(SCliConn* pConn);
static int32_t cliBatchSend(SCliConn* conn); static int32_t cliBatchSend(SCliConn* conn);
void cliConnCheckTimoutMsg(SCliConn* conn);
bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead); bool cliConnRmReleaseReq(SCliConn* conn, STransMsgHead* pHead);
// register conn timer // register conn timer
static void cliConnTimeout(uv_timer_t* handle); static void cliConnTimeout(uv_timer_t* handle);
@ -364,7 +365,23 @@ void cliResetConnTimer(SCliConn* conn) {
} }
} }
void cliConnSetReadTimeout(SCliConn* conn, int timeout) { void cliConnMaySetReadTimeout(SCliConn* conn, int timeout) {
if (conn->timer != NULL) {
// reset previous timer
cliResetConnTimer(conn);
}
int32_t reqsSentNum = transQueueSize(&conn->reqsSentOut);
if (reqsSentNum == 0) {
return;
}
cliConnCheckTimoutMsg(conn);
if (conn->timer != NULL) {
// reset previous timer
cliResetConnTimer(conn);
}
if (conn->timer == NULL) { if (conn->timer == NULL) {
if (cliGetConnTimer(conn->hostThrd, conn) != 0) { if (cliGetConnTimer(conn->hostThrd, conn) != 0) {
return; return;
@ -529,8 +546,6 @@ void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
cliResetConnTimer(conn);
cliConnClearInitUserMsg(conn); cliConnClearInitUserMsg(conn);
SCliReq* pReq = NULL; SCliReq* pReq = NULL;
@ -599,7 +614,8 @@ void cliHandleResp(SCliConn* conn) {
if (cliMayRecycleConn(conn)) { if (cliMayRecycleConn(conn)) {
return; return;
} }
if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
cliConnMaySetReadTimeout(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
} }
@ -627,12 +643,11 @@ bool filterToRmTimoutReq(void* key, void* arg) {
} }
return false; return false;
} }
void cliConnTimeout__checkReq(uv_timer_t* handle) {
void cliConnCheckTimoutMsg(SCliConn* conn) {
int32_t code = 0; int32_t code = 0;
queue set; queue set;
QUEUE_INIT(&set); QUEUE_INIT(&set);
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
if (transQueueSize(&conn->reqsSentOut) == 0) { if (transQueueSize(&conn->reqsSentOut) == 0) {
@ -668,6 +683,12 @@ void cliConnTimeout__checkReq(uv_timer_t* handle) {
destroyReqWrapper(pReq, pThrd); destroyReqWrapper(pReq, pThrd);
} }
} }
return;
}
void cliConnTimeout__checkReq(uv_timer_t* handle) {
SCliConn* conn = handle->data;
cliConnCheckTimoutMsg(conn);
} }
void* createConnPool(int size) { void* createConnPool(int size) {
@ -931,7 +952,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int
conn->ipStr = taosStrdup(ip); conn->ipStr = taosStrdup(ip);
conn->port = port; conn->port = port;
QUEUE_INIT(&conn->q);
conn->hostThrd = pThrd; conn->hostThrd = pThrd;
conn->status = ConnNormal; conn->status = ConnNormal;
conn->broken = false; conn->broken = false;
@ -1136,9 +1156,7 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
return; return;
} }
cliResetConnTimer(conn); cliConnMaySetReadTimeout(conn, READ_TIMEOUT);
if (transQueueSize(&conn->reqsSentOut)) cliConnSetReadTimeout(conn, READ_TIMEOUT);
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);