From 2982a8ae511889e9330466f5c93952c2712bcb90 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 22 Aug 2024 19:32:20 +0800 Subject: [PATCH] fix random_err crash --- source/libs/transport/src/transCli.c | 106 +++++++++++++++++---------- source/libs/transport/src/transSvr.c | 21 +++--- 2 files changed, 79 insertions(+), 48 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1a3afc03f9..6d6336425d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -696,6 +696,11 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { plist = taosHashGet(pool, key, klen); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + if (nList == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } QUEUE_INIT(&nList->msgQ); nList->numOfConn++; @@ -817,7 +822,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); - if (conn->list == NULL) { + if (conn->list == NULL && conn->dstAddr != NULL) { conn->list = taosHashGet((SHashObj*)pool, conn->dstAddr, strlen(conn->dstAddr)); } @@ -962,6 +967,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { int32_t code = 0; + int8_t registed = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); if (conn == NULL) { @@ -981,8 +987,24 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { code = TSDB_CODE_THIRDPARTY_ERROR; TAOS_CHECK_GOTO(code, NULL, _failed); } + + registed = 1; conn->stream->data = conn; + conn->connReq.data = conn; + + transReqQueueInit(&conn->wreqQueue); + QUEUE_INIT(&conn->q); + conn->hostThrd = pThrd; + conn->status = ConnNormal; + conn->broken = false; + + TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + + transRefCliHandle(conn); + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -996,18 +1018,6 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { timer->data = conn; conn->timer = timer; - conn->connReq.data = conn; - transReqQueueInit(&conn->wreqQueue); - - TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); - - TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); - - QUEUE_INIT(&conn->q); - conn->hostThrd = pThrd; - conn->status = ConnNormal; - conn->broken = false; - transRefCliHandle(conn); (void)atomic_add_fetch_32(&pThrd->connCount, 1); @@ -1016,12 +1026,16 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { *pCliConn = conn; return code; _failed: - if (conn) { - taosMemoryFree(conn->stream); - (void)transDestroyBuffer(&conn->readBuf); - transQueueDestroy(&conn->cliMsgs); + if (registed == 1) { + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } else { + if (conn) { + taosMemoryFree(conn->stream); + (void)transDestroyBuffer(&conn->readBuf); + transQueueDestroy(&conn->cliMsgs); + } + taosMemoryFree(conn); } - taosMemoryFree(conn); return code; } static void cliDestroyConn(SCliConn* conn, bool clear) { @@ -1032,7 +1046,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { QUEUE_INIT(&conn->q); conn->broken = true; - if (conn->list == NULL) { + if (conn->list == NULL && conn->dstAddr) { conn->list = taosHashGet((SHashObj*)pThrd->pool, conn->dstAddr, strlen(conn->dstAddr)); } @@ -1287,19 +1301,19 @@ void cliSend(SCliConn* pConn) { STraceId* trace = &pMsg->info.traceId; - if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { - uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; - if (timer == NULL) { - timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - tDebug("no available timer, create a timer %p", timer); - (void)uv_timer_init(pThrd->loop, timer); - } - timer->data = pConn; - pConn->timer = timer; + // if (pTransInst->startTimer != NULL && pTransInst->startTimer(0, pMsg->msgType)) { + // uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : + // NULL; if (timer == NULL) { + // timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + // tDebug("no available timer, create a timer %p", timer); + // (void)uv_timer_init(pThrd->loop, timer); + // } + // timer->data = pConn; + // pConn->timer = timer; - tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); - (void)uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); - } + // tGTrace("%s conn %p start timer for msg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType)); + // (void)uv_timer_start((uv_timer_t*)pConn->timer, cliReadTimeoutCb, TRANS_READ_TIMEOUT, 0); + // } if (pHead->comp == 0 && pMsg->info.compressed == 0 && pConn->clientIp != pConn->serverIp) { if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { @@ -1385,7 +1399,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { if (conn->dstAddr == NULL) { tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - cliHandleFastFail(conn, -1); + uv_close((uv_handle_t*)conn->stream, cliDestroy); return; } @@ -1727,8 +1741,6 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; - // memset(pResp, 0, sizeof(STransMsg)); - if (pResp->code == 0) { pResp->code = TSDB_CODE_RPC_BROKEN_LINK; } @@ -1869,6 +1881,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { (void)transQueuePush(&conn->cliMsgs, pMsg); conn->dstAddr = taosStrdup(addr); + if (conn->dstAddr == NULL) { + tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + cliHandleFastFail(conn, -1); + return; + } uint32_t ipaddr; int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); @@ -2564,16 +2582,25 @@ static void cliSchedMsgToDebug(SCliMsg* pMsg, char* label) { return; } -static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { +static int32_t cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; cliSchedMsgToDebug(pMsg, transLabel(pThrd->pTransInst)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + arg->param1 = pMsg; arg->param2 = pThrd; - (void)transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); + SDelayTask* pTask = transDQSched(pThrd->delayQueue, doDelayTask, arg, pCtx->retryNextInterval); + if (pTask == NULL) { + taosMemoryFree(arg); + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; } FORCE_INLINE bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { @@ -2748,7 +2775,12 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } pMsg->sent = 0; - cliSchedMsgToNextNode(pMsg, pThrd); + code = cliSchedMsgToNextNode(pMsg, pThrd); + if (code != 0) { + pResp->code = code; + tError("failed to sched msg to next node, reason:%s", tstrerror(code)); + return false; + } return true; } int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 88cfd547b6..d4c98591d7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -1182,22 +1182,22 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { int32_t code = 0; SWorkThrd* pThrd = hThrd; STrans* pTransInst = pThrd->pTransInst; + int32_t lino; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); if (pConn == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(code, &lino, _end); } if ((code = transInitBuffer(&pConn->readBuf)) != 0) { - TAOS_CHECK_GOTO(code, NULL, _end); + TAOS_CHECK_GOTO(code, &lino, _end); } memset(&pConn->regArg, 0, sizeof(pConn->regArg)); @@ -1206,14 +1206,14 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); if (exh == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } exh->handle = pConn; exh->pThrd = pThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); if (exh->refId < 0) { - TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, &lino, _end); } QUEUE_INIT(&exh->q); @@ -1233,15 +1233,16 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { // init client handle pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); if (pConn->pTcp == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _end); } code = uv_tcp_init(pThrd->loop, pConn->pTcp); if (code != 0) { tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); - TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, &lino, _end); } pConn->pTcp->data = pConn; + QUEUE_PUSH(&pThrd->conn, &pConn->queue); return pConn; _end: @@ -1252,7 +1253,7 @@ _end: taosMemoryFree(pConn); pConn = NULL; } - tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code)); + tError("%s failed to create conn since %s, lino:%d" PRId64, transLabel(pTransInst), tstrerror(code), lino); return NULL; } @@ -1895,5 +1896,3 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { } return code; } - -int32_t transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }