diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a5cc492b2b..66fedf07ac 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1315,6 +1315,12 @@ void cliSend(SCliConn* pConn) { uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); + if (req == NULL) { + tGError("%s conn %p failed to send msg:%s, errmsg:%s", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pMsg->msgType), + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + cliHandleExcept(pConn, -1); + return; + } int status = uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); if (status != 0) { @@ -2224,13 +2230,13 @@ static void* cliWorkThread(void* arg) { SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - + tsEnableRandErr = true; (void)strtolower(threadName, pThrd->pTransInst->label); setThreadName(threadName); (void)uv_run(pThrd->loop, UV_RUN_DEFAULT); - tDebug("thread quit-thread:%08" PRId64, pThrd->pid); + tDebug("thread quit-thread:%08 " PRId64, pThrd->pid); return NULL; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index aea8282d44..85d7470871 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -414,6 +414,9 @@ void transReqQueueInit(queue* q) { } void* transReqQueuePush(queue* q) { STransReq* req = taosMemoryCalloc(1, sizeof(STransReq)); + if (req == NULL) { + return NULL; + } req->wreq.data = req; QUEUE_PUSH(q, &req->q); return &req->wreq; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 84938126c3..e03d5ddc34 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -696,6 +696,14 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrMsg* smsg) { transRefSrvHandle(pConn); uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); + if (req == NULL) { + if (!uv_is_closing((uv_handle_t*)(pConn->pTcp))) { + tError("conn %p failed to write data, reason:%s", pConn, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + pConn->broken = true; + transUnrefSrvHandle(pConn); + return; + } + } (void)uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSvrMsg* smsg) { @@ -1174,6 +1182,7 @@ static int32_t addHandleToAcceptloop(void* arg) { void* transWorkerThread(void* arg) { setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; + tsEnableRandErr = true; (void)uv_run(pThrd->loop, UV_RUN_DEFAULT); return NULL;