From c2cccbcf40f35cd21e3a4f99894d05ea118762c3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 28 Aug 2024 01:01:59 +0000 Subject: [PATCH] refactor transport --- source/libs/transport/src/transCli.c | 292 +++++++++++++++------------ 1 file changed, 162 insertions(+), 130 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9a0f5fa450..4c0a5ec5f3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -162,7 +162,7 @@ static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn); -static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn); +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int port); static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port); // register conn timer @@ -195,12 +195,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); void cliResetConnTimer(SCliConn* conn); -static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static void cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); @@ -413,6 +412,20 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { return false; } +int32_t cliGetTimerFrom(SCliThrd* pThrd, SCliConn* pConn) { + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tDebug("no available timer, create a timer %p", timer); + (void)uv_timer_init(pThrd->loop, timer); + } + timer->data = pConn; + pConn->timer = timer; + return 0; +} void cliResetConnTimer(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; if (conn->timer) { @@ -837,7 +850,6 @@ static int32_t cliGetConnFromPool(SCliThrd* pThrd, const char* key, SCliConn** p int32_t code = 0; void* pool = pThrd->pool; STrans* pInst = pThrd->pInst; - // size_t klen = strlen(key); SConnList* plist = NULL; code = getOrCreateMsgList(pThrd, key, &plist); @@ -1163,31 +1175,35 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { + int32_t code = 0; SCliConn* pConn = NULL; + char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); - int32_t code = cliCreateConn(pThrd, &pConn); - if (code != 0) { - return code; - } - - char addr[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); - - pConn->dstAddr = taosStrdup(addr); - code = addConnToHeapCache(pThrd->connHeapCache, pConn); + TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); + TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); transQueuePush(&pConn->reqs, pReq); - return cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + + return cliDoConn(pThrd, pConn, ip, port); +_exception: + // free conn + return code; } -static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int32_t port) { int32_t code = 0; + int32_t lino = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); if (conn == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _failed); } + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, ip, port); + conn->dstAddr = taosStrdup(addr); + transReqQueueInit(&conn->wreqQueue); QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; @@ -1215,6 +1231,8 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); + TAOS_CHECK_GOTO(cliGetTimerFrom(pThrd, conn), &lino, _failed); + // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); if (conn->stream == NULL) { @@ -1240,6 +1258,7 @@ _failed: (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); } + tError("failed to create conn, code:%d", code); taosMemoryFree(conn); return code; } @@ -1412,7 +1431,6 @@ void cliSendBatch_shareConn(SCliConn* pConn) { if (size == 0) { tError("%s conn %p not msg to send", pInst->label, pConn); ASSERT(0); - // cliHandleExcept(pConn); return; } uv_buf_t* wb = taosMemoryCalloc(size, sizeof(uv_buf_t)); @@ -1657,8 +1675,9 @@ static void cliDestroyBatch(SCliBatch* pBatch) { } static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { - int32_t lino = 0; - STrans* pInst = pThrd->pInst; + int32_t lino = 0; + STrans* pInst = pThrd->pInst; + uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); if (code != 0) { @@ -1688,25 +1707,11 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); return code; } - uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; - if (timer == NULL) { - timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - if (timer == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exception); - } - - tDebug("no available timer, create a timer %p", timer); - (void)uv_timer_init(pThrd->loop, timer); - } - timer->data = conn; - conn->timer = timer; ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { - // cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); } ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); @@ -1747,7 +1752,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { return; } if (conn == NULL) { - code = cliCreateConn(pThrd, &conn); + code = cliCreateConn(pThrd, &conn, pList->ip, pList->port); if (code != 0) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(code)); @@ -1757,7 +1762,13 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); - (void)cliDoConn(pThrd, conn, pList->ip, pList->port); + if (conn->dstAddr == NULL) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pInst->label, + pBatch->wLen, pBatch->batchSize, pInst->connLimitNum, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + cliDestroyBatch(pBatch); + return; + } + code = cliDoConn(pThrd, conn, pList->ip, pList->port); } conn->pBatch = pBatch; @@ -1826,6 +1837,25 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { cliHandleExcept(pConn, status); } +int32_t cliConnSetSockInfo(SCliConn* pConn) { + struct sockaddr peername, sockname; + int addrlen = sizeof(peername); + + (void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); + (void)transSockInfo2Str(&peername, pConn->dst); + + addrlen = sizeof(sockname); + (void)uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); + (void)transSockInfo2Str(&sockname, pConn->src); + + struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; + struct sockaddr_in saddr = *(struct sockaddr_in*)&peername; + + pConn->clientIp = addr.sin_addr.s_addr; + pConn->serverIp = saddr.sin_addr.s_addr; + + return 0; +}; void cliConnCb(uv_connect_t* req, int status) { SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; @@ -1840,29 +1870,22 @@ void cliConnCb(uv_connect_t* req, int status) { STUB_RAND_NETWORK_ERR(status); if (status != 0) { - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); - if (timeout == false) { - cliHandleFastFail(pConn, status); - } else if (timeout == true) { - // already deal by timeout - } - return; + tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, + uv_strerror(status)); + // handle err + // 1. update statis + // 2. notifyCb or retry + // 3. clear conn and + // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, pConn->dstAddr); + // if (timeout == false) { + // cliHandleFastFail(pConn, status); + // } else if (timeout == true) { + // // already deal by timeout + // } + // return; } - struct sockaddr peername, sockname; - int addrlen = sizeof(peername); - (void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); - (void)transSockInfo2Str(&peername, pConn->dst); - - addrlen = sizeof(sockname); - (void)uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); - (void)transSockInfo2Str(&sockname, pConn->src); - - struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; - struct sockaddr_in saddr = *(struct sockaddr_in*)&peername; - - pConn->clientIp = addr.sin_addr.s_addr; - pConn->serverIp = saddr.sin_addr.s_addr; + cliConnSetSockInfo(pConn); tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); if (pConn->pBatch != NULL) { @@ -1906,6 +1929,7 @@ static void cliHandleQuit(SCliThrd* pThrd, SCliReq* pReq) { } pThrd->stopMsg = NULL; pThrd->quit = true; + tDebug("cli work thread %p start to quit", pThrd); destroyReq(pReq); @@ -2147,8 +2171,10 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) { STraceId* trace = &pReq->msg.info.traceId; STrans* pInst = pThrd->pInst; - char addr[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(addr, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + char addr[TSDB_FQDN_LEN + 64] = {0}; + char* ip = EPSET_GET_INUSE_IP(&pReq->ctx->epSet); + int32_t port = EPSET_GET_INUSE_PORT(&pReq->ctx->epSet); + CONN_CONSTRUCT_HASH_KEY(addr, ip, port); SCliConn* pConn = getConnFromHeapCache(pThrd->connHeapCache, addr); if (pConn == NULL) { @@ -2167,13 +2193,13 @@ void cliHandleReq__shareConn(SCliThrd* pThrd, SCliReq* pReq) { return; } - code = cliCreateConn(pThrd, &pConn); - pConn->dstAddr = taosStrdup(addr); - code = addConnToHeapCache(pThrd->connHeapCache, pConn); + TAOS_CHECK_GOTO(cliCreateConn(pThrd, &pConn, ip, port), NULL, _exception); + + TAOS_CHECK_GOTO(addConnToHeapCache(pThrd->connHeapCache, pConn), NULL, _exception); transQueuePush(&pConn->reqs, pReq); - cliDoConn(pThrd, pConn, EPSET_GET_INUSE_IP(&pReq->ctx->epSet), EPSET_GET_INUSE_PORT(&pReq->ctx->epSet)); + code = cliDoConn(pThrd, pConn, ip, port); _exception: resp.code = code; @@ -2248,9 +2274,68 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); return batch; } +static int32_t createBatch(SCliBatch** ppBatch, SCliBatchList* pList, SCliReq* pReq); + +static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port); + +static void destroyBatchList(SCliBatchList* pList); static void cliBuildBatch(SCliReq* pReq, queue* h, SCliThrd* pThrd) { + int32_t code = 0; STrans* pInst = pThrd->pInst; SReqCtx* pCtx = pReq->ctx; + + char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, ip, port); + size_t klen = strlen(key); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); + if (ppBatchList == NULL || *ppBatchList == NULL) { + SCliBatchList* pBatchList = NULL; + code = createBatchList(&pBatchList, key, ip, port); + if (code != 0) { + destroyReq(pReq); + return; + } + + pBatchList->batchLenLimit = pInst->batchSize; + + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, pBatchList, pReq); + if (code != 0) { + destroyBatchList(pBatchList); + destroyReq(pReq); + return; + } + + code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); + if (code != 0) { + destroyBatchList(pBatchList); + } + } else { + if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { + SCliBatch* pBatch = NULL; + code = createBatch(&pBatch, *ppBatchList, pReq); + if (code != 0) { + destroyReq(pReq); + cliDestroyBatch(pBatch); + } + } else { + queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); + SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); + if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { + QUEUE_PUSH(&pBatch->wq, h); + pBatch->batchSize += pReq->msg.contLen; + pBatch->wLen += 1; + } else { + SCliBatch* tBatch = NULL; + code = createBatch(&tBatch, *ppBatchList, pReq); + if (code != 0) { + destroyReq(pReq); + } + } + } + } return; } static int32_t createBatchList(SCliBatchList** ppBatchList, char* key, char* ip, uint32_t port) { @@ -2329,64 +2414,6 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { cliBuildBatch(pReq, h, pThrd); continue; } - - if (pReq->type == Normal && REQUEST_NO_RESP(&pReq->msg)) { - SReqCtx* pCtx = pReq->ctx; - - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - size_t klen = strlen(key); - SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, klen); - if (ppBatchList == NULL || *ppBatchList == NULL) { - SCliBatchList* pBatchList = NULL; - code = createBatchList(&pBatchList, key, ip, port); - if (code != 0) { - destroyReq(pReq); - continue; - } - - pBatchList->batchLenLimit = pInst->batchSize; - - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, pBatchList, pReq); - if (code != 0) { - destroyBatchList(pBatchList); - destroyReq(pReq); - continue; - } - - code = taosHashPut(pThrd->batchCache, key, klen, &pBatchList, sizeof(void*)); - if (code != 0) { - destroyBatchList(pBatchList); - } - } else { - if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { - SCliBatch* pBatch = NULL; - code = createBatch(&pBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - cliDestroyBatch(pBatch); - } - } else { - queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); - SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); - if ((pBatch->batchSize + pReq->msg.contLen) < (*ppBatchList)->batchLenLimit) { - QUEUE_PUSH(&pBatch->wq, h); - pBatch->batchSize += pReq->msg.contLen; - pBatch->wLen += 1; - } else { - SCliBatch* tBatch = NULL; - code = createBatch(&tBatch, *ppBatchList, pReq); - if (code != 0) { - destroyReq(pReq); - } - } - } - } - continue; - } (*cliAsyncHandle[pReq->type])(pThrd, pReq); count++; } @@ -2896,6 +2923,10 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { int32_t len = pResp->contLen - tlen; if (len != 0) { buf = rpcMallocCont(len); + if (buf == NULL) { + pResp->code = TSDB_CODE_OUT_OF_MEMORY; + return false; + } // TODO: check buf memcpy(buf, (char*)pResp->pCont + tlen, len); } @@ -2909,6 +2940,7 @@ FORCE_INLINE bool cliTryUpdateEpset(SCliReq* pReq, STransMsg* pResp) { epsetAssign(dst, &epset); return true; } + bool cliResetEpset(SReqCtx* pCtx, STransMsg* pResp, bool hasEpSet) { bool noDelay = true; if (hasEpSet == false) { @@ -3749,7 +3781,7 @@ _exception: return code; } -static int32_t getOrCreateHeapIfNotExist(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) { +static int32_t getOrCreateHeap(SHashObj* pConnHeapCache, char* key, SHeap** pHeap) { int32_t code = 0; size_t klen = strlen(key); @@ -3780,7 +3812,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { int code = 0; SHeap* pHeap = NULL; SCliConn* pConn = NULL; - code = getOrCreateHeapIfNotExist(pConnHeapCache, key, &pHeap); + code = getOrCreateHeap(pConnHeapCache, key, &pHeap); if (code != 0) { tDebug("failed to get conn heap from cache for key:%s", key); return NULL; @@ -3795,7 +3827,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) { SHeap* p = NULL; - int32_t code = getOrCreateHeapIfNotExist(pConnHeapCacahe, pConn->dstAddr, &p); + int32_t code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p); if (code != 0) { return code; }