refactor transport

This commit is contained in:
Yihao Deng 2024-05-16 11:42:14 +08:00
parent 29215d2411
commit 77c7ee264c
1 changed files with 45 additions and 50 deletions

View File

@ -175,6 +175,7 @@ static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead);
static int32_t allocConnRef(SCliConn* conn, bool update); static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
void cliResetConnTimer(SCliConn* conn);
static SCliConn* cliCreateConn(SCliThrd* thrd); static SCliConn* cliCreateConn(SCliThrd* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
@ -367,10 +368,8 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) {
return false; return false;
} }
void cliHandleResp(SCliConn* conn) { void cliResetConnTimer(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (conn->timer) { if (conn->timer) {
if (uv_is_active((uv_handle_t*)conn->timer)) { if (uv_is_active((uv_handle_t*)conn->timer)) {
tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn);
@ -380,6 +379,40 @@ void cliHandleResp(SCliConn* conn) {
conn->timer->data = NULL; conn->timer->data = NULL;
conn->timer = NULL; conn->timer = NULL;
} }
}
void cliHandleBatchResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
cliResetConnTimer(conn);
STransMsgHead* pHead = NULL;
int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead);
if (transDecompressMsg((char**)&pHead, msgLen) < 0) {
tDebug("%s conn %p recv invalid packet, failed to decompress", CONN_GET_INST_LABEL(conn), conn);
}
pHead->code = htonl(pHead->code);
pHead->msgLen = htonl(pHead->msgLen);
STransMsg transMsg = {0};
transMsg.contLen = transContLenFromMsg(pHead->msgLen);
transMsg.pCont = transContFromHead((char*)pHead);
transMsg.code = pHead->code;
transMsg.msgType = pHead->msgType;
transMsg.info.ahandle = NULL;
transMsg.info.traceId = pHead->traceId;
transMsg.info.hasEpSet = pHead->hasEpSet;
transMsg.info.cliVer = htonl(pHead->compatibilityVer);
SCliMsg* pMsg = NULL;
STransConnCtx* pCtx = pMsg->ctx;
if (cliAppCb(conn, &transMsg, pMsg) != 0) {
return;
}
}
void cliHandleResp(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
cliResetConnTimer(conn);
STransMsgHead* pHead = NULL; STransMsgHead* pHead = NULL;
@ -578,11 +611,7 @@ void cliConnTimeout(uv_timer_t* handle) {
tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_timer_stop(handle); cliResetConnTimer(conn);
handle->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, UV_ECANCELED); cliHandleFastFail(conn, UV_ECANCELED);
} }
@ -757,12 +786,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
allocConnRef(conn, true); allocConnRef(conn, true);
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
if (conn->timer != NULL) { cliResetConnTimer(conn);
uv_timer_stop(conn->timer);
taosArrayPush(thrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
if (T_REF_VAL_GET(conn) > 1) { if (T_REF_VAL_GET(conn) > 1) {
transUnrefCliHandle(conn); transUnrefCliHandle(conn);
} }
@ -948,12 +972,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
transDQCancel(pThrd->timeoutQueue, conn->task); transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL; conn->task = NULL;
} }
if (conn->timer != NULL) { cliResetConnTimer(conn);
uv_timer_stop(conn->timer);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
}
if (clear) { if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) { if (!uv_is_closing((uv_handle_t*)conn->stream)) {
@ -968,12 +987,7 @@ static void cliDestroy(uv_handle_t* handle) {
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SCliThrd* pThrd = conn->hostThrd; SCliThrd* pThrd = conn->hostThrd;
if (conn->timer != NULL) { cliResetConnTimer(conn);
uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL;
conn->timer = NULL;
}
transReleaseExHandle(transGetRefMgt(), conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
@ -1230,11 +1244,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip);
if (ipaddr == 0xffffffff) { if (ipaddr == 0xffffffff) {
uv_timer_stop(conn->timer); cliResetConnTimer(conn);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleFastFail(conn, -1); cliHandleFastFail(conn, -1);
return; return;
} }
@ -1266,11 +1276,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
uv_timer_stop(conn->timer); cliResetConnTimer(conn);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1); cliHandleFastFail(conn, -1);
return; return;
@ -1366,10 +1372,7 @@ void cliConnCb(uv_connect_t* req, int status) {
if (pConn->timer == NULL) { if (pConn->timer == NULL) {
timeout = true; timeout = true;
} else { } else {
uv_timer_stop(pConn->timer); cliResetConnTimer(pConn);
pConn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &pConn->timer);
pConn->timer = NULL;
} }
if (status != 0) { if (status != 0) {
@ -1641,11 +1644,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
if (ipaddr == 0xffffffff) { if (ipaddr == 0xffffffff) {
uv_timer_stop(conn->timer); cliResetConnTimer(conn);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
} }
@ -1679,11 +1678,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
uv_timer_stop(conn->timer); cliResetConnTimer(conn);
conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL;
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, ret); cliHandleFastFail(conn, ret);
return; return;