refactor transport

This commit is contained in:
yihaoDeng 2024-08-24 20:44:51 +08:00
parent bdc446314d
commit 46447c2bb2
1 changed files with 92 additions and 72 deletions

View File

@ -162,7 +162,8 @@ static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn);
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn); static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn);
static int32_t cliDestroyConn2(SCliConn* pConn);
static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port);
// register conn timer // register conn timer
@ -195,12 +196,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp);
void cliResetConnTimer(SCliConn* conn); void cliResetConnTimer(SCliConn* conn);
static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroy(uv_handle_t* handle);
static void cliDestroy(uv_handle_t* handle); static void cliSend(SCliConn* pConn);
static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static void doFreeTimeoutMsg(void* param); static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq);
@ -1162,31 +1162,62 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
} }
} }
static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) {
if (transQueuePush(&conn->reqs, pReq) != 0) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return 0;
}
static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) {
// do nothing
SCliReq* pTail = transQueuePop(&conn->reqs);
if (pTail == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReq != NULL) {
*pReq = pTail;
}
return 0;
}
static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) {
SCliConn* pConn = NULL; SCliConn* pConn = NULL;
int32_t code = cliCreateConn(pThrd, &pConn); int32_t code = cliCreateConn(pThrd, pReq, &pConn);
if (code != 0) { if (code != 0) {
return code; return code;
} }
code = cliAddReqToConn(pConn, pReq);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
code = cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
if (code != TSDB_CODE_RPC_ASYNC_IN_PROCESS) {
cliRmReqFromConn(pConn, NULL);
cliDestroyConn2(pConn);
return code;
} else {
}
return code;
}
// not any ref,
static int32_t cliDestroyConn2(SCliConn* conn) { return 0; }
static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn) {
char addr[TSDB_FQDN_LEN + 64] = {0}; char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
pConn->dstAddr = taosStrdup(addr);
code = addConnToHeapCache(pThrd->connHeapCache, pConn);
transQueuePush(&pConn->reqs, pReq);
return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet));
}
static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) {
int32_t code = 0; int32_t code = 0;
SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn));
if (conn == NULL) { if (conn == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
conn->dstAddr = taosStrdup(addr);
if (conn->dstAddr == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(code, NULL, _failed);
}
transReqQueueInit(&conn->wreqQueue); transReqQueueInit(&conn->wreqQueue);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
@ -1239,48 +1270,12 @@ _failed:
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
(void)transDestroyBuffer(&conn->readBuf); (void)transDestroyBuffer(&conn->readBuf);
transQueueDestroy(&conn->reqs); transQueueDestroy(&conn->reqs);
taosMemoryFree(conn->dstAddr);
} }
taosMemoryFree(conn); taosMemoryFree(conn);
return code; return code;
} }
static void cliDestroyConn(SCliConn* conn, bool clear) { static void cliDestroyConn(SCliConn* conn, bool clear) {}
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
conn->broken = true;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
conn->broken = true;
if (conn->list == NULL && conn->dstAddr) {
conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr));
}
if (conn->list) {
SConnList* list = conn->list;
list->list->numOfConn--;
if (conn->status == ConnInPool) {
list->size--;
}
}
conn->list = NULL;
(void)transReleaseExHandle(transGetRefMgt(), conn->refId);
(void)transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
if (conn->task != NULL) {
transDQCancel(pThrd->timeoutQueue, conn->task);
conn->task = NULL;
}
cliResetConnTimer(conn);
if (clear) {
if (!uv_is_closing((uv_handle_t*)conn->stream)) {
(void)uv_read_stop(conn->stream);
uv_close((uv_handle_t*)conn->stream, cliDestroy);
}
}
}
static void cliDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) {
return; return;
@ -1563,6 +1558,19 @@ _exception:
pConn->pBatch = NULL; pConn->pBatch = NULL;
return; return;
} }
int32_t cliSend2(SCliConn* pConn) {
}
int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) {
int32_t code = 0;
transQueuePush(&pConn->reqs, pCliMsg);
cliSend(pConn);
return 0;
}
void cliSend(SCliConn* pConn) { void cliSend(SCliConn* pConn) {
SCliThrd* pThrd = pConn->hostThrd; SCliThrd* pThrd = pConn->hostThrd;
STrans* pInst = pThrd->pInst; STrans* pInst = pThrd->pInst;
@ -1662,7 +1670,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
uint32_t ipaddr; uint32_t ipaddr;
int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr);
if (code != 0) { if (code != 0) {
TAOS_CHECK_GOTO(code, &lino, _exception); TAOS_CHECK_GOTO(code, &lino, _exception1);
} }
struct sockaddr_in addr; struct sockaddr_in addr;
@ -1674,25 +1682,25 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10);
if (fd < 0) { if (fd < 0) {
TAOS_CHECK_GOTO(terrno, &lino, _exception); TAOS_CHECK_GOTO(terrno, &lino, _exception1);
} }
int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd);
if (ret != 0) { if (ret != 0) {
tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
} }
ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); ret = transSetConnOption((uv_tcp_t*)conn->stream, 20);
if (ret != 0) { if (ret != 0) {
tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1);
return code; return code;
} }
uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL;
if (timer == NULL) { if (timer == NULL) {
timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) { if (timer == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception1);
} }
tDebug("no available timer, create a timer %p", timer); tDebug("no available timer, create a timer %p", timer);
@ -1703,24 +1711,33 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) {
ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
// cliResetConnTimer(conn); tError("%s conn %p failed to connect, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
cliHandleFastFail(conn, -1);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} }
ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
if (ret != 0) { if (ret != 0) {
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret));
cliResetConnTimer(conn); TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2);
cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
cliHandleFastFail(conn, -1);
return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
} }
return TSDB_CODE_RPC_ASYNC_IN_PROCESS; return TSDB_CODE_RPC_ASYNC_IN_PROCESS;
_exception:
tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); _exception1:
taosMemoryFree(conn); // free conn later tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
// taosMemoryFree(conn); // free conn later
return code;
_exception2:
// already registered to uv, callback handle error
tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code));
// cliRmReqFromConn(conn, NULL);
// cliResetConnTimer(conn);
// cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr);
// cliHandleFastFail(conn, code);
// // taosMemoryFree(conn);
return code; return code;
} }
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
@ -1747,7 +1764,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
return; return;
} }
if (conn == NULL) { if (conn == NULL) {
code = cliCreateConn(pThrd, &conn); code = cliCreateConn(pThrd, NULL, &conn);
if (code != 0) { if (code != 0) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label,
pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code)); pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code));
@ -2167,7 +2184,7 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) {
return; return;
} }
code = cliCreateConn(pThrd, &pConn); code = cliCreateConn(pThrd, NULL, &pConn);
pConn->dstAddr = taosStrdup(addr); pConn->dstAddr = taosStrdup(addr);
code = addConnToHeapCache(pThrd->connHeapCache, pConn); code = addConnToHeapCache(pThrd->connHeapCache, pConn);
@ -2197,9 +2214,12 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) {
// do nothing, notifyCb // do nothing, notifyCb
return; return;
} else { } else {
code = cliSendReq(pConn, pReq);
} }
tTrace("%s conn %p ready", pInst->label, pConn); tTrace("%s conn %p ready", pInst->label, pConn);
return;
_exception: _exception:
resp.code = code; resp.code = code;
(void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);