Merge branch '3.0' into enh/opt-transport

This commit is contained in:
yihaoDeng 2024-09-16 13:23:10 +08:00
parent 9ecb9b23e7
commit 723e863ec4
1 changed files with 23 additions and 2 deletions

View File

@ -628,11 +628,13 @@ bool filterToRmTimoutReq(void* key, void* arg) {
return false;
}
void cliConnTimeout__checkReq(uv_timer_t* handle) {
int32_t code = 0;
queue set;
QUEUE_INIT(&set);
SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
if (transQueueSize(&conn->reqsSentOut) == 0) {
return;
}
@ -645,8 +647,27 @@ void cliConnTimeout__checkReq(uv_timer_t* handle) {
SCliReq* pReq = QUEUE_DATA(el, SCliReq, q);
STraceId* trace = &pReq->msg.info.traceId;
tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, pReq->msg.msgType);
SReqCtx* pCtx = pReq ? pReq->ctx : NULL;
STransMsg resp = {0};
resp.code = TSDB_CODE_RPC_TIMEOUT;
resp.msgType = pReq ? pReq->msg.msgType + 1 : 0;
resp.info.cliVer = pInst->compatibilityVer;
resp.info.ahandle = pCtx ? pCtx->ahandle : 0;
resp.info.handle = pReq->msg.info.handle;
if (pReq) {
resp.info.traceId = pReq->msg.info.traceId;
}
pReq->seq = 0;
code = cliNotifyCb(conn, pReq, &resp);
if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
continue;
} else {
// already notify user
destroyReqWrapper(pReq, pThrd);
}
}
}
void* createConnPool(int size) {