diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 10df19d3c8..72b9889afa 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -628,11 +628,13 @@ bool filterToRmTimoutReq(void* key, void* arg) { return false; } void cliConnTimeout__checkReq(uv_timer_t* handle) { - queue set; + int32_t code = 0; + queue set; QUEUE_INIT(&set); SCliConn* conn = handle->data; SCliThrd* pThrd = conn->hostThrd; + STrans* pInst = pThrd->pInst; if (transQueueSize(&conn->reqsSentOut) == 0) { return; } @@ -645,7 +647,26 @@ void cliConnTimeout__checkReq(uv_timer_t* handle) { SCliReq* pReq = QUEUE_DATA(el, SCliReq, q); STraceId* trace = &pReq->msg.info.traceId; tDebug("%s conn %p req %s timeout, start to free", CONN_GET_INST_LABEL(conn), conn, pReq->msg.msgType); - destroyReqWrapper(pReq, pThrd); + + SReqCtx* pCtx = pReq ? pReq->ctx : NULL; + STransMsg resp = {0}; + resp.code = TSDB_CODE_RPC_TIMEOUT; + resp.msgType = pReq ? pReq->msg.msgType + 1 : 0; + resp.info.cliVer = pInst->compatibilityVer; + resp.info.ahandle = pCtx ? pCtx->ahandle : 0; + resp.info.handle = pReq->msg.info.handle; + if (pReq) { + resp.info.traceId = pReq->msg.info.traceId; + } + + pReq->seq = 0; + code = cliNotifyCb(conn, pReq, &resp); + if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + continue; + } else { + // already notify user + destroyReqWrapper(pReq, pThrd); + } } }