diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index b811d192bd..9ab2d918b8 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -476,6 +476,8 @@ int32_t subnetDebugInfoToBuf(SubnetUtils* pUtils, char* buf); int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf); int32_t transUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf); +enum { REQ_STATUS_INIT = 0, REQ_STATUS_PROCESSING }; + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4c0a5ec5f3..11791fac00 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -90,6 +90,8 @@ typedef struct SCliConn { int64_t refId; int32_t seq; int32_t shareCnt; + + int8_t registered; } SCliConn; typedef struct SCliReq { @@ -195,11 +197,11 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliNotifyCb(SCliConn* pConn, SCliReq* pReq, STransMsg* pResp); void cliResetConnTimer(SCliConn* conn); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static void cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +static int32_t cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliReq** pReq); @@ -323,6 +325,17 @@ int32_t transHeapDelete(SHeap* heap, SCliConn* p); } \ } while (0) +static int32_t cliConnFindToSendMsg(SCliConn* pConn, SCliReq** pReq) { + int32_t code = 0; + for (int32_t i = 0; i < transQueueSize(&pConn->reqs); i++) { + SCliReq* p = transQueueGet(&pConn->reqs, i); + if (p->sent == 0) { + *pReq = p; + return 0; + } + } + return TSDB_CODE_OUT_OF_RANGE; +} #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ if (conn->status == ConnNormal) { \ @@ -376,7 +389,7 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->reqs)) { SCliReq* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - cliSend(conn); + (void)cliSend(conn); return true; } return false; @@ -403,7 +416,7 @@ bool cliConnSendSeqMsg(int64_t refId, SCliConn* conn) { (void)transQueuePush(&conn->reqs, t); tDebug("pop from conn %p, refId: %" PRId64 "", conn, refId); (void)transReleaseExHandle(transGetRefMgt(), refId); - cliSend(conn); + (void)cliSend(conn); return true; } taosWUnLockLatch(&exh->latch); @@ -1049,7 +1062,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { (void)transQueuePush(&conn->reqs, pReq); conn->status = ConnNormal; - cliSend(conn); + (void)cliSend(conn); return; } @@ -1174,6 +1187,24 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } +static int32_t cliAddReqToConn(SCliConn* conn, SCliReq* pReq) { + if (transQueuePush(&conn->reqs, pReq) != 0) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; +} + +static int32_t cliRmReqFromConn(SCliConn* conn, SCliReq** pReq) { + // do nothing + SCliReq* pTail = transQueuePop(&conn->reqs); + if (pTail == NULL) { + return TSDB_CODE_INVALID_PARA; + } + if (pReq != NULL) { + *pReq = pTail; + } + return 0; +} static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) { int32_t code = 0; SCliConn* pConn = NULL; @@ -1257,49 +1288,13 @@ _failed: taosMemoryFree(conn->stream); (void)transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqs); + taosMemoryFree(conn->dstAddr); } tError("failed to create conn, code:%d", code); taosMemoryFree(conn); return code; } -static void cliDestroyConn(SCliConn* conn, bool clear) { - SCliThrd* pThrd = conn->hostThrd; - tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - conn->broken = true; - QUEUE_REMOVE(&conn->q); - QUEUE_INIT(&conn->q); - - conn->broken = true; - if (conn->list == NULL && conn->dstAddr) { - conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); - } - - if (conn->list) { - SConnList* list = conn->list; - list->list->numOfConn--; - if (conn->status == ConnInPool) { - list->size--; - } - } - conn->list = NULL; - - (void)transReleaseExHandle(transGetRefMgt(), conn->refId); - (void)transRemoveExHandle(transGetRefMgt(), conn->refId); - conn->refId = -1; - - if (conn->task != NULL) { - transDQCancel(pThrd->timeoutQueue, conn->task); - conn->task = NULL; - } - cliResetConnTimer(conn); - - if (clear) { - if (!uv_is_closing((uv_handle_t*)conn->stream)) { - (void)uv_read_stop(conn->stream); - uv_close((uv_handle_t*)conn->stream, cliDestroy); - } - } -} +static void cliDestroyConn(SCliConn* conn, bool clear) {} static void cliDestroy(uv_handle_t* handle) { if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; @@ -1581,25 +1576,34 @@ _exception: pConn->pBatch = NULL; return; } -void cliSend(SCliConn* pConn) { + +// int32_t cliSend2(SCliConn* pConn) {} +int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { + int32_t code = 0; + transQueuePush(&pConn->reqs, pCliMsg); + code = cliSend(pConn); + return code; +} + +int32_t cliSend(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pInst = pThrd->pInst; + SCliReq* pCliReq = NULL; + int32_t code = cliConnFindToSendMsg(pConn, &pCliReq); - if (transQueueEmpty(&pConn->reqs)) { - tError("%s conn %p not msg to send", pInst->label, pConn); - cliHandleExcept(pConn, -1); - return; + if (code != 0) { + return code; } - SCliReq* pCliMsg = NULL; - CONN_GET_NEXT_SENDMSG(pConn); - pCliMsg->sent = 1; + SReqCtx* pCtx = pCliReq->ctx; - SReqCtx* pCtx = pCliMsg->ctx; - - STransMsg* pReq = (STransMsg*)(&pCliMsg->msg); + STransMsg* pReq = (STransMsg*)(&pCliReq->msg); if (pReq->pCont == 0) { pReq->pCont = (void*)rpcMallocCont(0); + if (pReq->pCont == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + tDebug("malloc memory: %p", pReq->pCont); pReq->contLen = 0; } @@ -1613,7 +1617,7 @@ void cliSend(SCliConn* pConn) { pHead->persist = REQUEST_PERSIS_HANDLE(pReq) ? 1 : 0; pHead->msgType = pReq->msgType; pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; + pHead->release = REQUEST_RELEASE_HANDLE(pCliReq) ? 1 : 0; memcpy(pHead->user, pInst->user, strlen(pInst->user)); pHead->traceId = pReq->info.traceId; pHead->magicNum = htonl(TRANS_MAGIC_NUM); @@ -1646,18 +1650,19 @@ void cliSend(SCliConn* pConn) { tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); cliHandleExcept(pConn, -1); - return; + return TSDB_CODE_OUT_OF_MEMORY; } + pCliReq->sent = 1; int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), uv_err_name(status)); cliHandleExcept(pConn, -1); + return TSDB_CODE_THIRDPARTY_ERROR; } - return; -_RETURN: - return; + + return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } static void cliDestroyBatch(SCliBatch* pBatch) { @@ -1681,7 +1686,7 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip, &ipaddr); if (code != 0) { - TAOS_CHECK_GOTO(code, &lino, _exception); + TAOS_CHECK_GOTO(code, &lino, _exception1); } struct sockaddr_in addr; @@ -1693,39 +1698,50 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn, char* ip, int port) { int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 10); if (fd < 0) { - TAOS_CHECK_GOTO(terrno, &lino, _exception); + TAOS_CHECK_GOTO(terrno, &lino, _exception1); } int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tError("%s conn %p failed to set stream, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } ret = transSetConnOption((uv_tcp_t*)conn->stream, 20); if (ret != 0) { tError("%s conn %p failed to set socket opt, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); return code; } ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tError("failed connect to %s, reason:%s", conn->dstAddr, uv_err_name(ret)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception1); } ret = uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); if (ret != 0) { tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - cliResetConnTimer(conn); - cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); - cliHandleFastFail(conn, -1); - return TSDB_CODE_RPC_ASYNC_IN_PROCESS; + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _exception2); } + conn->registered = 1; return TSDB_CODE_RPC_ASYNC_IN_PROCESS; -_exception: - tError("%s conn %p failed to start timer, reason:%s", transLabel(pInst), conn, uv_err_name(ret)); - taosMemoryFree(conn); // free conn later + +_exception1: + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); + // taosMemoryFree(conn); // free conn later + return code; + +_exception2: + // already registered to uv, callback handle error + tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, uv_err_name(code)); + // cliRmReqFromConn(conn, NULL); + + // cliResetConnTimer(conn); + // cliMayUpdateFqdnCache(pThrd->fqdn2ipCache, conn->dstAddr); + // cliHandleFastFail(conn, code); + + // // taosMemoryFree(conn); return code; } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { @@ -1895,7 +1911,7 @@ void cliConnCb(uv_connect_t* req, int status) { return cliSendBatch_shareConn(pConn); } - return cliSend(pConn); + (void)cliSend(pConn); } static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { @@ -1957,7 +1973,7 @@ static void cliHandleRelease(SCliThrd* pThrd, SCliReq* pReq) { if (!transQueuePush(&conn->reqs, pReq)) { return; } - cliSend(conn); + (void)cliSend(conn); } else { tError("%s conn %p already released", CONN_GET_INST_LABEL(conn), conn); destroyReq(pReq); @@ -2220,12 +2236,15 @@ void cliHandleReq__noShareConn(SCliThrd* pThrd, SCliReq* pReq) { if (code == TSDB_CODE_RPC_MAX_SESSIONS) { TAOS_CHECK_GOTO(code, &lino, _exception); } else if (code == TSDB_CODE_RPC_ASYNC_IN_PROCESS) { - // do nothing, notifyCb + // do nothing, notiy return; } else { + code = cliSendReq(pConn, pReq); } tTrace("%s conn %p ready", pInst->label, pConn); + return; + _exception: resp.code = code; (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); @@ -3538,6 +3557,7 @@ _RETURN1: pReq->pCont = NULL; return code; } + int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) { int32_t code = 0; tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); @@ -3574,6 +3594,7 @@ _EXIT: taosMemoryFree(pSyncMsg); return code; } + int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { int32_t code = 0;