From 46447c2bb2a47482cec00885d5688490ec822bfc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 24 Aug 2024 20:44:51 +0800 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 164 +++++++++++++++------------ 1 file changed, 92 insertions(+), 72 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9a0f5fa450..9f35965431 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -162,7 +162,8 @@ static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); -static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn); +static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn); +static int32_t cliDestroyConn2(SCliConn* pConn); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port); // register conn timer @@ -195,12 +196,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); void cliResetConnTimer(SCliConn* conn); -static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static void cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); @@ -1162,31 +1162,62 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } +static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { + if (transQueuePush(&conn->reqs, pReq) != 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; +} + +static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { + // do nothing + SCliReq* pTail = transQueuePop(&conn->reqs); + if (pTail == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pReq != NULL) { + *pReq = pTail; + } + return 0; +} static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { SCliConn* pConn = NULL; - int32_t code = cliCreateConn(pThrd, &pConn); + int32_t code = cliCreateConn(pThrd, pReq, &pConn); if (code != 0) { return code; } + code = cliAddReqToConn(pConn, pReq); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + + code = cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + if (code != TSDB_CODE_RPC_ASYNC_IN_PROCESS) { + cliRmReqFromConn(pConn, NULL); + cliDestroyConn2(pConn); + return code; + } else { + } + return code; +} + +// not any ref, +static int32_t cliDestroyConn2(SCliConn* conn) { return 0; } +static int32_t cliCreateConn(SCliThrd* pThrd, const SCliReq* pReq, SCliConn** pCliConn) { char addr[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); - pConn->dstAddr = taosStrdup(addr); - code = addConnToHeapCache(pThrd->connHeapCache, pConn); - - transQueuePush(&pConn->reqs, pReq); - return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); -} - -static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { int32_t code = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); if (conn == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + conn->dstAddr = taosStrdup(addr); + if (conn->dstAddr == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _failed); + } transReqQueueInit(&conn->wreqQueue); QUEUE_INIT(&conn->q); @@ -1239,48 +1270,12 @@ _failed: taosMemoryFree(conn->stream); (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); + taosMemoryFree(conn->dstAddr); } taosMemoryFree(conn); return code; } -static void cliDestroyConn(SCliConn* conn, bool clear) { - SCliThrd* pThrd = conn->hostThrd; - tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - conn->broken = true; - QUEUE_REMOVE(&conn->q); - QUEUE_INIT(&conn->q); - - conn->broken = true; - if (conn->list == NULL && conn->dstAddr) { - conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); - } - - if (conn->list) { - SConnList* list = conn->list; - list->list->numOfConn--; - if (conn->status == ConnInPool) { - list->size--; - } - } - conn->list = NULL; - - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; - - if (conn->task != NULL) { - transDQCancel(pThrd->timeoutQueue, conn->task); - conn->task = NULL; - } - cliResetConnTimer(conn); - - if (clear) { - if (!uv_is_closing((uv_handle_t*)conn->stream)) { - (void)uv_read_stop(conn->stream); - uv_close((uv_handle_t*)conn->stream, cliDestroy); - } - } -} +static void cliDestroyConn(SCliConn* conn, bool clear) {} static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; @@ -1563,6 +1558,19 @@ _exception: pConn->pBatch = NULL; return; } + + +int32_t cliSend2(SCliConn* pConn) { + +} +int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { + int32_t code = 0; + transQueuePush(&pConn->reqs, pCliMsg); + cliSend(pConn); + + return 0; +} + void cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; @@ -1662,7 +1670,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); if (code != 0) { - TAOS_CHECK_GOTO(code, &lino, _exception); + TAOS_CHECK_GOTO(code, &lino, _exception1); } struct sockaddr_in addr; @@ -1674,25 +1682,25 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd < 0) { - TAOS_CHECK_GOTO(terrno, &lino, _exception); + TAOS_CHECK_GOTO(terrno, &lino, _exception1); } int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); if (ret != 0) { tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); return code; } uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); if (timer == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception1); } tDebug("no available timer, create a timer %p", timer); @@ -1703,24 +1711,33 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - // cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + tError("%s conn %p failed to connect, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2); } ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); if (ret != 0) { tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2); } + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; -_exception: - tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - taosMemoryFree(conn); // free conn later + +_exception1: + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); + // taosMemoryFree(conn); // free conn later + return code; + +_exception2: + // already registered to uv, callback handle error + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); + // cliRmReqFromConn(conn, NULL); + + // cliResetConnTimer(conn); + // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + // cliHandleFastFail(conn, code); + + // // taosMemoryFree(conn); return code; } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { @@ -1747,7 +1764,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { return; } if (conn == NULL) { - code = cliCreateConn(pThrd, &conn); + code = cliCreateConn(pThrd, NULL, &conn); if (code != 0) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code)); @@ -2167,7 +2184,7 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) { return; } - code = cliCreateConn(pThrd, &pConn); + code = cliCreateConn(pThrd, NULL, &pConn); pConn->dstAddr = taosStrdup(addr); code = addConnToHeapCache(pThrd->connHeapCache, pConn); @@ -2197,9 +2214,12 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { // do nothing, notifyCb return; } else { + code = cliSendReq(pConn, pReq); } tTrace("%s conn %p ready", pInst->label, pConn); + return; + _exception: resp.code = code; (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp);