diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a903f26dc9..8b7311fe3b 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -297,7 +297,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); int32_t transInitBuffer(SConnBuffer* buf); int32_t transClearBuffer(SConnBuffer* buf); -int32_t transDestroyBuffer(SConnBuffer* buf); +void transDestroyBuffer(SConnBuffer* buf); int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); bool transReadComplete(SConnBuffer* connBuf); int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); @@ -327,7 +327,7 @@ int32_t transRegisterMsg(const STransMsg* msg); int32_t transSetDefaultAddr(void* pInit, const char* ip, const char* fqdn); int32_t transSetIpWhiteList(void* pInit, void* arg, FilteFunc* func); -int32_t transSockInfo2Str(struct sockaddr* sockname, char* dst); +void transSockInfo2Str(struct sockaddr* sockname, char* dst); int32_t transAllocHandle(int64_t* refId); @@ -367,7 +367,7 @@ int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(void* arg)); * put arg into queue * if queue'size > 1, return false; else return true */ -int32_t transQueuePush(STransQueue* queue, void* arg); +void transQueuePush(STransQueue* queue, void* arg); /* * the size of queue */ @@ -457,9 +457,9 @@ int32_t transDecompressMsg(char** msg, int32_t* len); int32_t transOpenRefMgt(int size, void (*func)(void*)); void transCloseRefMgt(int32_t refMgt); int64_t transAddExHandle(int32_t refMgt, void* p); -int32_t transRemoveExHandle(int32_t refMgt, int64_t refId); +void transRemoveExHandle(int32_t refMgt, int64_t refId); void* transAcquireExHandle(int32_t refMgt, int64_t refId); -int32_t transReleaseExHandle(int32_t refMgt, int64_t refId); +void transReleaseExHandle(int32_t refMgt, int64_t refId); void transDestroyExHandle(void* handle); int32_t transGetRefMgt(); @@ -510,6 +510,8 @@ void destroyWQ(queue* wq); uv_write_t* allocWReqFromWQ(queue* wq, void* arg); void freeWReqToWQ(queue* wq, SWReqsWrapper* w); + +int32_t transSetReadOption(uv_handle_t* handle); #ifdef __cplusplus } #endif diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 6abca83f4c..a9ca9d47ce 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -131,13 +131,8 @@ void rpcClose(void* arg) { if (arg == NULL) { return; } - if (transRemoveExHandle(transGetInstMgt(), (int64_t)arg) != 0) { - tError("failed to remove rpc handle"); - } - - if (transReleaseExHandle(transGetInstMgt(), (int64_t)arg) != 0) { - tError("failed to release rpc handle"); - } + transRemoveExHandle(transGetInstMgt(), (int64_t)arg); + transReleaseExHandle(transGetInstMgt(), (int64_t)arg); tInfo("end to close rpc"); return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 23048f4a7e..3cf2f2c563 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -308,11 +308,11 @@ int32_t transHeapGet(SHeap* heap, SCliConn** p); int32_t transHeapInsert(SHeap* heap, SCliConn* p); int32_t transHeapDelete(SHeap* heap, SCliConn* p); -#define CLI_RELEASE_UV(loop) \ - do { \ - uv_walk(loop, cliWalkCb, NULL); \ - (void)uv_run(loop, UV_RUN_DEFAULT); \ - (void)uv_loop_close(loop); \ +#define CLI_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, cliWalkCb, NULL); \ + (TAOS_UNUSED(uv_run(loop, UV_RUN_DEFAULT))); \ + (TAOS_UNUSED(uv_loop_close(loop))); \ } while (0); // snprintf may cause performance problem @@ -352,7 +352,11 @@ int32_t cliGetConnTimer(SCliThrd* pThrd, SCliConn* pConn) { return TSDB_CODE_OUT_OF_MEMORY; } tDebug("no available timer, create a timer %p", timer); - (void)uv_timer_init(pThrd->loop, timer); + int ret = uv_timer_init(pThrd->loop, timer); + if (ret != 0) { + tError("conn %p failed to init timer %p, ret:%d", pConn, timer, uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } } timer->data = pConn; pConn->timer = timer; @@ -365,7 +369,10 @@ void cliResetConnTimer(SCliConn* conn) { tDebug("%s conn %p stop timer", CONN_GET_INST_LABEL(conn), conn); TAOS_UNUSED(uv_timer_stop(conn->timer)); } - (void)taosArrayPush(pThrd->timerList, &conn->timer); + if (taosArrayPush(pThrd->timerList, &conn->timer) == NULL) { + tError("%s conn %p failed to push timer %p to list since %s", CONN_GET_INST_LABEL(conn), conn, conn->timer, + tstrerror(terrno)); + } conn->timer->data = NULL; conn->timer = NULL; } @@ -392,19 +399,24 @@ void cliConnMayUpdateTimer(SCliConn* conn, int timeout) { if (cliGetConnTimer(conn->hostThrd, conn) != 0) { return; } - (void)uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0); + int ret = uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0); + if (ret != 0) { + tError("%s conn %p failed to start timer %p, ret:%d", CONN_GET_INST_LABEL(conn), conn, conn->timer, + uv_err_name(ret)); + } } void destroyCliConnQTable(SCliConn* conn) { - void* pIter = taosHashIterate(conn->pQTable, NULL); + int32_t code = 0; + void* pIter = taosHashIterate(conn->pQTable, NULL); while (pIter != NULL) { int64_t* qid = taosHashGetKey(pIter, NULL); STransCtx* ctx = pIter; transCtxCleanup(ctx); pIter = taosHashIterate(conn->pQTable, pIter); - (void)transReleaseExHandle(transGetRefMgt(), *qid); - (void)transRemoveExHandle(transGetRefMgt(), *qid); + transReleaseExHandle(transGetRefMgt(), *qid); + transRemoveExHandle(transGetRefMgt(), *qid); } taosHashCleanup(conn->pQTable); conn->pQTable = NULL; @@ -439,10 +451,15 @@ int32_t cliGetReqBySeq(SCliConn* conn, int64_t seq, int32_t msgType, SCliReq** p } int8_t cliMayRecycleConn(SCliConn* conn) { + int32_t code = 0; SCliThrd* pThrd = conn->hostThrd; if (transQueueSize(&conn->reqsToSend) == 0 && transQueueSize(&conn->reqsSentOut) == 0 && taosHashGetSize(conn->pQTable) == 0) { - (void)delConnFromHeapCache(pThrd->connHeapCache, conn); + code = delConnFromHeapCache(pThrd->connHeapCache, conn); + if (code != 0) { + tError("%s conn %p failed to remove conn from heap cache since %s", CONN_GET_INST_LABEL(conn), conn, + tstrerror(code)); + } addConnToPool(pThrd->pool, conn); return 1; } @@ -508,8 +525,8 @@ int32_t cliHandleState_mayHandleReleaseResp(SCliConn* conn, STransMsgHead* pHead transQueueRemoveByFilter(&conn->reqsSentOut, filterByQid, &qId, &set, -1); transQueueRemoveByFilter(&conn->reqsToSend, filterByQid, &qId, &set, -1); - (void)transReleaseExHandle(transGetRefMgt(), qId); - (void)transRemoveExHandle(transGetRefMgt(), qId); + transReleaseExHandle(transGetRefMgt(), qId); + transRemoveExHandle(transGetRefMgt(), qId); while (!QUEUE_IS_EMPTY(&set)) { queue* el = QUEUE_HEAD(&set); @@ -562,9 +579,12 @@ void cliHandleResp(SCliConn* conn) { int32_t msgLen = transDumpFromBuffer(&conn->readBuf, (char**)&pHead, 0); if (msgLen < 0) { taosMemoryFree(pHead); - tDebug("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); + tWarn("%s conn %p recv invalid packet ", CONN_GET_INST_LABEL(conn), conn); // TODO: notify cb - (void)pThrd->notifyExceptCb(pThrd, NULL, NULL); + code = pThrd->notifyExceptCb(pThrd, NULL, NULL); + if (code != 0) { + tError("%s conn %p failed to notify user since %s", tstrerror(code)); + } return; } @@ -864,16 +884,18 @@ static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_ } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + int32_t code = 0; STUB_RAND_NETWORK_ERR(nread); if (handle->data == NULL) { return; } - int32_t fd; - (void)uv_fileno((uv_handle_t*)handle, &fd); - (void)taosSetSockOpt2(fd); SCliConn* conn = handle->data; + code = transSetReadOption((uv_handle_t*)handle); + if (code != 0) { + tWarn("%s conn %p failed to set recv opt since %s", CONN_GET_INST_LABEL(conn), conn, code); + } SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { @@ -882,7 +904,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { conn->broken = true; - (void)transUnrefCliHandle(conn); + TAOS_UNUSED(transUnrefCliHandle(conn)); return; break; } else { @@ -903,7 +925,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), transGetRefCount(conn)); conn->broken = true; - (void)transUnrefCliHandle(conn); + TAOS_UNUSED(transUnrefCliHandle(conn)); } } @@ -917,8 +939,11 @@ static int32_t cliCreateConn2(SCliThrd* pThrd, SCliReq* pReq, SCliConn** ppConn) code = cliHandleState_mayUpdateState(pConn, pReq); - (void)addConnToHeapCache(pThrd->connHeapCache, pConn); - (void)transQueuePush(&pConn->reqsToSend, &pReq->q); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + if (code != 0) { + TAOS_CHECK_GOTO(code, NULL, _exception); + } + transQueuePush(&pConn->reqsToSend, &pReq->q); return cliDoConn(pThrd, pConn); _exception: // free conn @@ -983,8 +1008,12 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int conn->bufSize = BUFFER_LIMIT; conn->buf = (uv_buf_t*)taosMemoryCalloc(1, BUFFER_LIMIT * sizeof(uv_buf_t)); + if (conn->buf == NULL) { + TAOS_CHECK_GOTO(terrno, NULL, _failed); + } + + TAOS_CHECK_GOTO(initWQ(&conn->wq), NULL, _failed); - (void)initWQ(&conn->wq); conn->stream->data = conn; conn->connReq.data = conn; @@ -993,11 +1022,11 @@ static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn, char* ip, int return code; _failed: if (conn) { + taosMemoryFree(conn->buf); taosMemoryFree(conn->stream); - destroyCliConnQTable(conn); taosHashCleanup(conn->pQTable); - (void)transDestroyBuffer(&conn->readBuf); + transDestroyBuffer(&conn->readBuf); transQueueDestroy(&conn->reqsToSend); transQueueDestroy(&conn->reqsSentOut); taosMemoryFree(conn->dstAddr); @@ -1008,6 +1037,7 @@ _failed: } static void cliDestroyConn(SCliConn* conn, bool clear) { cliHandleException(conn); } static void cliDestroy(uv_handle_t* handle) { + int32_t code = 0; if (uv_handle_get_type(handle) != UV_TCP || handle->data == NULL) { return; } @@ -1015,9 +1045,15 @@ static void cliDestroy(uv_handle_t* handle) { SCliThrd* pThrd = conn->hostThrd; cliResetConnTimer(conn); - (void)destroyAllReqs(conn); + code = destroyAllReqs(conn); + if (code != 0) { + tDebug("%s conn %p failed to all reqs since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + } - (void)delConnFromHeapCache(pThrd->connHeapCache, conn); + code = delConnFromHeapCache(pThrd->connHeapCache, conn); + if (code != 0) { + tDebug("%s conn %p failed to del conn from heapcach since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + } taosMemoryFree(conn->dstAddr); taosMemoryFree(conn->stream); taosMemoryFree(conn->ipStr); @@ -1025,7 +1061,10 @@ static void cliDestroy(uv_handle_t* handle) { void* pIter = taosHashIterate(conn->pQTable, NULL); while (pIter) { int64_t* qid = taosHashGetKey(pIter, NULL); - (void)taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); + code = taosHashRemove(pThrd->pIdConnTable, qid, sizeof(*qid)); + if (code != 0) { + tDebug("%s conn %p failed to remove state %" PRId64 " since %s", CONN_GET_INST_LABEL(conn), conn, *qid, code); + } pIter = taosHashIterate(conn->pQTable, pIter); tDebug("%s conn %p destroy state %" PRId64 "", CONN_GET_INST_LABEL(conn), conn, *qid); } @@ -1039,7 +1078,7 @@ static void cliDestroy(uv_handle_t* handle) { taosMemoryFree(conn->buf); destroyWQ(&conn->wq); - (void)transDestroyBuffer(&conn->readBuf); + transDestroyBuffer(&conn->readBuf); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); @@ -1100,7 +1139,10 @@ static void cliHandleException(SCliConn* conn) { STrans* pInst = pThrd->pInst; cliResetConnTimer(conn); - (void)destroyAllReqs(conn); + code = destroyAllReqs(conn); + if (code != 0) { + tError("%s conn %p failed to destroy all reqs on conn since %s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + } QUEUE_REMOVE(&conn->q); if (conn->registered) { @@ -1134,6 +1176,7 @@ static void cliConnRmReqs(SCliConn* conn) { } static void cliBatchSendCb(uv_write_t* req, int status) { + int32_t code = 0; SWReqsWrapper* wrapper = (SWReqsWrapper*)req->data; SCliConn* conn = wrapper->arg; @@ -1148,18 +1191,27 @@ static void cliBatchSendCb(uv_write_t* req, int status) { cliConnRmReqs(conn); if (status != 0) { tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(status)); - (void)transUnrefCliHandle(conn); + TAOS_UNUSED(transUnrefCliHandle(conn)); return; } cliConnMayUpdateTimer(conn, READ_TIMEOUT); if (conn->readerStart == 0) { - (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + code = uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); + if (code != 0) { + tDebug("%s conn %p failed to start read since%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + TAOS_UNUSED(transUnrefCliHandle(conn)); + return; + } conn->readerStart = 1; } if (!cliMayRecycleConn(conn)) { - (void)cliBatchSend(conn); + code = cliBatchSend(conn); + if (code != 0) { + tDebug("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(conn), conn, tstrerror(code)); + TAOS_UNUSED(transUnrefCliHandle(conn)); + } } } bool cliConnMayAddUserInfo(SCliConn* pConn, STransMsgHead** ppHead, int32_t* msgLen) { @@ -1265,7 +1317,7 @@ int32_t cliBatchSend(SCliConn* pConn) { STraceId* trace = &pCliMsg->msg.info.traceId; tGDebug("%s conn %p %s is sent to %s, local info:%s, seq:%" PRId64 ", qid:%" PRId64 "", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pReq->msgType), pConn->dst, pConn->src, pConn->seq, pReq->info.qId); - (void)transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); + transQueuePush(&pConn->reqsSentOut, &pCliMsg->q); } transRefCliHandle(pConn); uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); @@ -1274,14 +1326,14 @@ int32_t cliBatchSend(SCliConn* pConn) { if (ret != 0) { tError("%s conn %p failed to send msg, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_err_name(ret)); freeWReqToWQ(&pConn->wq, req->data); - (void)transUnrefCliHandle(pConn); + TAOS_UNUSED(transUnrefCliHandle(pConn)); } return 0; } int32_t cliSendReq(SCliConn* pConn, SCliReq* pCliMsg) { int32_t code = 0; - (void)transQueuePush(&pConn->reqsToSend, &pCliMsg->q); + transQueuePush(&pConn->reqsToSend, &pCliMsg->q); code = cliBatchSend(pConn); return code; @@ -1357,7 +1409,7 @@ _exception1: return code; _exception2: - (void)transUnrefCliHandle(conn); + TAOS_UNUSED(transUnrefCliHandle(conn)); tError("%s conn %p failed to do connect, reason:%s", transLabel(pInst), conn, tstrerror(code)); return code; } @@ -1366,12 +1418,22 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) { struct sockaddr peername, sockname; int addrlen = sizeof(peername); - (void)uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); - (void)transSockInfo2Str(&peername, pConn->dst); + int32_t code = uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); + if (code != 0) { + tWarn("failed to get perrname since %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + return code; + } + transSockInfo2Str(&peername, pConn->dst); addrlen = sizeof(sockname); - TAOS_UNUSED(uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen)); - TAOS_UNUSED(transSockInfo2Str(&sockname, pConn->src)); + code = uv_tcp_getsockname((uv_tcp_t*)pConn->stream, &sockname, &addrlen); + if (code != 0) { + tWarn("failed to get sock name since %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + return code; + } + transSockInfo2Str(&sockname, pConn->src); struct sockaddr_in addr = *(struct sockaddr_in*)&sockname; struct sockaddr_in saddr = *(struct sockaddr_in*)&peername; @@ -1399,6 +1461,7 @@ int32_t cliConnSetSockInfo(SCliConn* pConn) { bool filteGetAll(void* q, void* arg) { return true; } void cliConnCb(uv_connect_t* req, int status) { + int32_t code = 0; SCliConn* pConn = req->data; SCliThrd* pThrd = pConn->hostThrd; bool timeout = false; @@ -1419,14 +1482,24 @@ void cliConnCb(uv_connect_t* req, int status) { if (status != 0) { tDebug("%s conn %p failed to connect to %s, reason:%s", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, uv_strerror(status)); - (void)transUnrefCliHandle(pConn); + TAOS_UNUSED(transUnrefCliHandle(pConn)); return; } pConn->connnected = 1; - (void)cliConnSetSockInfo(pConn); + code = cliConnSetSockInfo(pConn); + if (code != 0) { + tDebug("%s conn %p failed to get sock info,reason:%s ", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, + tstrerror(code)); + TAOS_UNUSED(transUnrefCliHandle(pConn)); + } tTrace("%s conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); - (void)cliBatchSend(pConn); + code = cliBatchSend(pConn); + if (code != 0) { + tDebug("%s conn %p failed to get sock info,reason:%s ", CONN_GET_INST_LABEL(pConn), pConn, pConn->dstAddr, + tstrerror(code)); + TAOS_UNUSED(transUnrefCliHandle(pConn)); + } } static void doNotifyCb(SCliReq* pReq, SCliThrd* pThrd, int32_t code) { @@ -1637,17 +1710,17 @@ int32_t cliMayGetStateByQid(SCliThrd* pThrd, SCliReq* pReq, SCliConn** pConn) { if (pState == NULL) { if (pReq->ctx == NULL) { - (void)transReleaseExHandle(transGetRefMgt(), qid); + transReleaseExHandle(transGetRefMgt(), qid); return TSDB_CODE_RPC_STATE_DROPED; } tDebug("%s conn %p failed to get statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); - (void)transReleaseExHandle(transGetRefMgt(), qid); + transReleaseExHandle(transGetRefMgt(), qid); return TSDB_CODE_RPC_ASYNC_IN_PROCESS; } else { *pConn = pState->conn; tDebug("%s conn %p succ to get conn of statue, qid:%" PRId64 "", transLabel(pThrd->pInst), pConn, qid); } - (void)transReleaseExHandle(transGetRefMgt(), qid); + transReleaseExHandle(transGetRefMgt(), qid); return 0; } } @@ -1668,7 +1741,7 @@ int32_t cliHandleState_mayUpdateState(SCliConn* pConn, SCliReq* pReq) { tDebug("%s conn %p succ to add statue, qid:%" PRId64 " (1)", transLabel(pThrd->pInst), pConn, qid); } - (void)cliHandleState_mayUpdateStateCtx(pConn, pReq); + TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq)); return code; } void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { @@ -1681,7 +1754,7 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { SCliConn* pConn = NULL; code = cliMayGetStateByQid(pThrd, pReq, &pConn); if (code == 0) { - (void)cliHandleState_mayUpdateStateCtx(pConn, pReq); + TAOS_UNUSED(cliHandleState_mayUpdateStateCtx(pConn, pReq)); } else if (code == TSDB_CODE_RPC_STATE_DROPED) { TAOS_CHECK_GOTO(code, &lino, _exception); return; @@ -1702,9 +1775,12 @@ void cliHandleBatchReq(SCliThrd* pThrd, SCliReq* pReq) { // do nothing, notiy return; } else if (code == 0) { - (void)addConnToHeapCache(pThrd->connHeapCache, pConn); + code = addConnToHeapCache(pThrd->connHeapCache, pConn); + if (code != 0) { + TAOS_CHECK_GOTO(code, &lino, _exception); + } } else { - // do nothing, notiy + TAOS_CHECK_GOTO(code, &lino, _exception); return; } } @@ -1719,7 +1795,11 @@ _exception: resp.code = code; STraceId* trace = &pReq->msg.info.traceId; tGWarn("%s failed to process req, reason:%s", pInst->label, tstrerror(code)); - (void)(pThrd->notifyExceptCb)(pThrd, pReq, &resp); + + code = (pThrd->notifyExceptCb)(pThrd, pReq, &resp); + if (code != 0) { + tGWarn("%s failed to notify user since %s", pInst->label, tstrerror(code)); + } return; } @@ -1948,7 +2028,7 @@ static void* cliWorkThread(void* arg) { pThrd->pid = taosGetSelfPthreadId(); tsEnableRandErr = true; - (void)strtolower(threadName, pThrd->pInst->label); + TAOS_UNUSED(strtolower(threadName, pThrd->pInst->label)); setThreadName(threadName); TAOS_UNUSED(uv_run(pThrd->loop, UV_RUN_DEFAULT)); @@ -2075,10 +2155,17 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); if (timer == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + TAOS_CHECK_GOTO(terrno, NULL, _end); + } + code = uv_timer_init(pThrd->loop, timer); + if (code != 0) { + tError("failed to init timer since %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _end); + } + if (taosArrayPush(pThrd->timerList, &timer) == NULL) { + TAOS_CHECK_GOTO(terrno, NULL, _end); } - (void)uv_timer_init(pThrd->loop, timer); - (void)taosArrayPush(pThrd->timerList, &timer); } pThrd->pool = createConnPool(4); @@ -2137,7 +2224,7 @@ _end: TAOS_UNUSED(uv_loop_close(pThrd->loop)); taosMemoryFree(pThrd->loop); - (void)taosThreadMutexDestroy(&pThrd->msgMtx); + TAOS_UNUSED((taosThreadMutexDestroy(&pThrd->msgMtx))); transAsyncPoolDestroy(pThrd->asyncPool); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); @@ -2169,7 +2256,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { } CLI_RELEASE_UV(pThrd->loop); - (void)taosThreadMutexDestroy(&pThrd->msgMtx); + TAOS_UNUSED(taosThreadMutexDestroy(&pThrd->msgMtx)); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliReq, destroyReqWrapper, (void*)pThrd); transAsyncPoolDestroy(pThrd->asyncPool); @@ -2284,26 +2371,38 @@ static FORCE_INLINE void doCloseIdleConn(void* param) { taosMemoryFree(arg); } static FORCE_INLINE void cliPerfLog_schedMsg(SCliReq* pReq, char* label) { + int32_t code = 0; if (!(rpcDebugFlag & DEBUG_DEBUG)) { return; } SReqCtx* pCtx = pReq->ctx; STraceId* trace = &pReq->msg.info.traceId; char tbuf[512] = {0}; - (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + + code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + if (code != 0) { + tWarn("failed to debug epset since %s", tstrerror(code)); + return; + } tGDebug("%s retry on next node,use:%s, step: %d,timeout:%" PRId64 "", label, tbuf, pCtx->retryStep, pCtx->retryNextInterval); return; } static FORCE_INLINE void cliPerfLog_epset(SCliConn* pConn, SCliReq* pReq) { + int32_t code = 0; if (!(rpcDebugFlag & DEBUG_TRACE)) { return; } SReqCtx* pCtx = pReq->ctx; char tbuf[512] = {0}; - (void)epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + + code = epsetToStr(&pCtx->epSet, tbuf, tListLen(tbuf)); + if (code != 0) { + tWarn("failed to debug epset since %s", tstrerror(code)); + return; + } tTrace("%s conn %p extract epset from msg", CONN_GET_INST_LABEL(pConn), pConn); return; } @@ -2699,7 +2798,7 @@ static FORCE_INLINE SCliThrd* transGetWorkThrdFromHandle(STrans* trans, int64_t pThrd = exh->pThrd; taosWUnLockLatch(&exh->latch); - TAOS_UNUSED(transReleaseExHandle(transGetRefMgt(), handle)); + transReleaseExHandle(transGetRefMgt(), handle); return pThrd; } @@ -2806,16 +2905,16 @@ int32_t transSendRequest(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyReq(pCliMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; _exception: transFreeMsg(pReq->pCont); pReq->pCont = NULL; - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, int64_t* transpointId) { @@ -2852,16 +2951,16 @@ int32_t transSendRequestWithId(void* pInstRef, const SEpSet* pEpSet, STransMsg* EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyReq(pCliMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return 0; _exception: transFreeMsg(pReq->pCont); pReq->pCont = NULL; - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } @@ -2913,7 +3012,7 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr SCliReq* pCliReq = taosMemoryCalloc(1, sizeof(SCliReq)); if (pCliReq == NULL) { - (void)tsem_destroy(sem); + (TAOS_UNUSED(tsem_destroy(sem))); taosMemoryFree(sem); taosMemoryFree(pCtx); TAOS_CHECK_GOTO(terrno, NULL, _RETURN1); @@ -2940,11 +3039,11 @@ int32_t transSendRecv(void* pInstRef, const SEpSet* pEpSet, STransMsg* pReq, STr _RETURN: tsem_destroy(sem); taosMemoryFree(sem); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); taosMemoryFree(pTransRsp); return code; _RETURN1: - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); taosMemoryFree(pTransRsp); taosMemoryFree(pReq->pCont); pReq->pCont = NULL; @@ -3066,15 +3165,15 @@ int32_t transSendRecvWithTimeout(void* pInstRef, SEpSet* pEpSet, STransMsg* pReq } } _RETURN: - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); - (void)taosReleaseRef(transGetSyncMsgMgt(), ref); - (void)taosRemoveRef(transGetSyncMsgMgt(), ref); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + TAOS_UNUSED(taosReleaseRef(transGetSyncMsgMgt(), ref)); + TAOS_UNUSED(taosRemoveRef(transGetSyncMsgMgt(), ref)); return code; _RETURN2: transFreeMsg(pReq->pCont); pReq->pCont = NULL; taosMemoryFree(pTransMsg); - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } /* @@ -3128,7 +3227,7 @@ int32_t transSetDefaultAddr(void* pInstRef, const char* ip, const char* fqdn) { } } - (void)transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); + transReleaseExHandle(transGetInstMgt(), (int64_t)pInstRef); return code; } @@ -3243,7 +3342,7 @@ static FORCE_INLINE bool filterToDebug(void* e, void* arg) { tGWarn("%s is sent to, and no resp from server", TMSG_INFO(pReq->msg.msgType)); return false; } -static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) { +static FORCE_INLINE void logConnMissHit(SCliConn* pConn) { // queue set; // QUEUE_INIT(&set); pConn->heapMissHit++; @@ -3253,7 +3352,6 @@ static FORCE_INLINE int32_t logConnMissHit(SCliConn* pConn) { // if (transQueueSize(&pConn->reqsSentOut) >= BUFFER_LIMIT) { // transQueueRemoveByFilter(&pConn->reqsSentOut, filterToDebug, NULL, &set, 1); // } - return 0; } static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { int code = 0; @@ -3275,7 +3373,7 @@ static SCliConn* getConnFromHeapCache(SHashObj* pConnHeapCache, char* key) { int32_t stateNum = taosHashGetSize(pConn->pQTable); if (shouldSWitchToOtherConn(reqsNum, reqsSentOut, stateNum)) { - (void)logConnMissHit(pConn); + logConnMissHit(pConn); return NULL; } } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 49f7c6e32b..59be5c4bde 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -97,13 +97,12 @@ void transFreeMsg(void* msg) { tTrace("rpc free cont:%p", (char*)msg - TRANS_MSG_OVERHEAD); taosMemoryFree((char*)msg - sizeof(STransMsgHead)); } -int transSockInfo2Str(struct sockaddr* sockname, char* dst) { +void transSockInfo2Str(struct sockaddr* sockname, char* dst) { struct sockaddr_in addr = *(struct sockaddr_in*)sockname; char buf[20] = {0}; int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); - return r; } int32_t transInitBuffer(SConnBuffer* buf) { buf->buf = taosMemoryCalloc(1, BUFFER_CAP); @@ -118,10 +117,9 @@ int32_t transInitBuffer(SConnBuffer* buf) { buf->invalid = 0; return 0; } -int32_t transDestroyBuffer(SConnBuffer* p) { +void transDestroyBuffer(SConnBuffer* p) { taosMemoryFree(p->buf); p->buf = NULL; - return 0; } int32_t transClearBuffer(SConnBuffer* buf) { @@ -335,11 +333,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { SAsyncItem* item = async->data; if (taosThreadMutexLock(&item->mtx) != 0) { - tError("failed to lock mutex"); + tError("failed to lock mutex since %s", tstrerror(terrno)); + return terrno; } QUEUE_PUSH(&item->qmsg, q); - (void)taosThreadMutexUnlock(&item->mtx); + TAOS_UNUSED(taosThreadMutexUnlock(&item->mtx)); int ret = uv_async_send(async); if (ret != 0) { @@ -426,12 +425,10 @@ int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { wq->size = 0; return 0; } -int32_t transQueuePush(STransQueue* q, void* arg) { +void transQueuePush(STransQueue* q, void* arg) { queue* node = arg; QUEUE_PUSH(&q->node, node); q->size++; - - return 0; } void* transQueuePop(STransQueue* q) { if (q->size == 0) return NULL; @@ -737,18 +734,24 @@ int64_t transAddExHandle(int32_t refMgt, void* p) { // acquire extern handle return taosAddRef(refMgt, p); } -int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) { +void transRemoveExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle - return taosRemoveRef(refMgt, refId); + int32_t code = taosRemoveRef(refMgt, refId); + if (code != 0) { + tTrace("failed to remove %" PRId64 " from resetId:%d", refId, refMgt); + } } void* transAcquireExHandle(int32_t refMgt, int64_t refId) { // acquire extern handle return (void*)taosAcquireRef(refMgt, refId); } -int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) { +void transReleaseExHandle(int32_t refMgt, int64_t refId) { // release extern handle - return taosReleaseRef(refMgt, refId); + int32_t code = taosReleaseRef(refMgt, refId); + if (code != 0) { + tTrace("failed to release %" PRId64 " from resetId:%d", refId, refMgt); + } } void transDestroyExHandle(void* handle) { if (handle == NULL) { @@ -869,15 +872,22 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { // } int32_t initWQ(queue* wq) { + int32_t code = 0; QUEUE_INIT(wq); for (int i = 0; i < 4; i++) { SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); + if (w == NULL) { + TAOS_CHECK_GOTO(terrno, NULL, _exception); + } w->wreq.data = w; w->arg = NULL; QUEUE_INIT(&w->node); QUEUE_PUSH(wq, &w->q); } return 0; +_exception: + destroyWQ(wq); + return code; } void destroyWQ(queue* wq) { while (!QUEUE_IS_EMPTY(wq)) { @@ -899,6 +909,9 @@ uv_write_t* allocWReqFromWQ(queue* wq, void* arg) { return &w->wreq; } else { SWReqsWrapper* w = taosMemoryCalloc(1, sizeof(SWReqsWrapper)); + if (w == NULL) { + return NULL; + } w->wreq.data = w; w->arg = arg; QUEUE_INIT(&w->node); @@ -910,3 +923,15 @@ void freeWReqToWQ(queue* wq, SWReqsWrapper* w) { QUEUE_INIT(&w->node); QUEUE_PUSH(wq, &w->q); } + +int32_t transSetReadOption(uv_handle_t* handle) { + int32_t code = 0; + int32_t fd; + int ret = uv_fileno((uv_handle_t*)handle, &fd); + if (ret != 0) { + tWarn("failed to get fd since %s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + code = taosSetSockOpt2(fd); + return code; +} \ No newline at end of file diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 17a4de1737..4d1410bee1 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -161,8 +161,8 @@ static void uvFreeCb(uv_handle_t* handle); static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg); -static int uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb); -static void uvStartSendResp(SSvrRespMsg* msg); +static int32_t uvPrepareSendData(SSvrRespMsg* msg, uv_buf_t* wb); +static void uvStartSendResp(SSvrRespMsg* msg); static void uvNotifyLinkBrokenToApp(SSvrConn* conn); @@ -453,7 +453,10 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { (pInst->cfp)(pInst->parent, &(arg->msg), NULL); tTrace("conn %p recv release, notify server app, qid:%" PRId64 "", pConn, qId); - (void)taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); + code = taosHashRemove(pConn->pQTable, &qId, sizeof(qId)); + if (code != 0) { + tDebug("conn %p failed to remove qid:%" PRId64 "", pConn, qId); + } tTrace("conn %p clear state,qid:%" PRId64 "", pConn, qId); } @@ -464,11 +467,15 @@ static int32_t uvMayHandleReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { .info.seqNum = taosHton64(pHead->seqNum)}; SSvrRespMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrRespMsg)); + if (srvMsg == NULL) { + tError("conn %p recv release, but invalid qid:%" PRId64 "", pConn, qId); + return terrno; + } srvMsg->msg = tmsg; srvMsg->type = Normal; srvMsg->pConn = pConn; - (void)transQueuePush(&pConn->resps, &srvMsg->q); + transQueuePush(&pConn->resps, &srvMsg->q); uvStartSendRespImpl(srvMsg); taosMemoryFree(pHead); @@ -582,14 +589,16 @@ static bool uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = pConn->port; tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId); + transReleaseExHandle(uvGetConnRefOfThrd(pThrd), pConn->refId); (*pInst->cfp)(pInst->parent, &transMsg, NULL); return true; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + int32_t code = 0; SSvrConn* conn = cli->data; + STrans* pInst = conn->pInst; SWorkThrd* pThrd = conn->hostThrd; STUB_RAND_NETWORK_ERR(nread); @@ -599,10 +608,11 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { destroyConn(conn, true); return; } - STrans* pInst = conn->pInst; - int32_t fd = 0; - (void)uv_fileno((uv_handle_t*)cli, &fd); - (void)taosSetSockOpt2(fd); + + code = transSetReadOption((uv_handle_t*)cli); + if (code != 0) { + tWarn("%s conn %p failed to set recv opt since %s", transLabel(pInst), conn, code); + } SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { @@ -705,7 +715,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { taosMemoryFree(req); } -static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { +static int32_t uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { SSvrConn* pConn = smsg->pConn; STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { @@ -726,12 +736,10 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { pHead->withUserInfo = pConn->userInited == 0 ? 1 : 0; // handle invalid drop_task resp, TD-20098 - if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - // (void)transQueuePop(&pConn->resps); - // destroySmsg(smsg); - // return TSDB_CODE_INVALID_MSG; - return 0; - } + // if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { + // // return TSDB_CODE_INVALID_MSG; + // return 0; + // } pHead->msgType = (0 == pMsg->msgType ? pConn->inType + 1 : pMsg->msgType); // pHead->msgType = pMsg->msgType; @@ -758,6 +766,7 @@ static int uvPrepareSendData(SSvrRespMsg* smsg, uv_buf_t* wb) { return 0; } static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* bufNum, queue* toSendQ) { + int32_t code = 0; int32_t size = transQueueSize(&pConn->resps); tDebug("%s conn %p has %d msg to send", transLabel(pConn->pInst), pConn, size); if (size == 0) { @@ -766,6 +775,9 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf if (pConn->bufSize < size) { pConn->buf = taosMemoryRealloc(pConn->buf, size * sizeof(uv_buf_t)); + if (pConn->buf == NULL) { + return terrno; + } pConn->bufSize = size; } uv_buf_t* pWb = pConn->buf; @@ -776,7 +788,10 @@ static int32_t uvBuildToSendData(SSvrConn* pConn, uv_buf_t** ppBuf, int32_t* buf queue* el = transQueuePop(&pConn->resps); SSvrRespMsg* pMsg = QUEUE_DATA(el, SSvrRespMsg, q); uv_buf_t wb; - (void)uvPrepareSendData(pMsg, &wb); + code = uvPrepareSendData(pMsg, &wb); + if (code != 0) { + return code; + } pWb[count] = wb; pMsg->sent = 1; QUEUE_PUSH(toSendQ, &pMsg->q); @@ -805,7 +820,12 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) { return; } - uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); + uv_write_t* req = allocWReqFromWQ(&pConn->wq, pConn); + if (req == NULL) { + uError("%s conn %p failed to alloc write req since %s", transLabel(pConn->pInst), pConn, tstrerror(terrno)); + transUnrefSrvHandle(pConn); + return; + } SWReqsWrapper* pWreq = req->data; uv_buf_t* pBuf = NULL; @@ -838,6 +858,7 @@ static FORCE_INLINE void uvStartSendRespImpl(SSvrRespMsg* smsg) { } } int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) { + int32_t code = 0; SSvrConn* pConn = pMsg->pConn; int64_t qid = pMsg->msg.info.qId; if (pMsg->msg.msgType == TDMT_SCH_TASK_RELEASE && qid > 0) { @@ -847,7 +868,10 @@ int32_t uvMayHandleReleaseResp(SSvrRespMsg* pMsg) { return TSDB_CODE_RPC_NO_STATE; } else { transFreeMsg(p->msg.pCont); - (void)taosHashRemove(pConn->pQTable, &qid, sizeof(qid)); + code = taosHashRemove(pConn->pQTable, &qid, sizeof(qid)); + if (code != 0) { + tError("%s conn %p failed release qid:%d since %s", tstrerror(code)); + } } } return 0; @@ -860,7 +884,7 @@ static void uvStartSendResp(SSvrRespMsg* smsg) { return; } - (void)transQueuePush(&pConn->resps, &smsg->q); + transQueuePush(&pConn->resps, &smsg->q); uvStartSendRespImpl(smsg); return; } @@ -928,12 +952,12 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh2 = transAcquireExHandle(uvGetConnRefOfThrd(pThrd), refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); + transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - (void)transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); + transReleaseExHandle(uvGetConnRefOfThrd(pThrd), refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -1031,6 +1055,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + int32_t code = 0; STUB_RAND_NETWORK_ERR(nread); if (nread < 0) { if (nread != UV_EOF) { @@ -1100,9 +1125,16 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->serverIp = saddr.sin_addr.s_addr; pConn->port = ntohs(addr.sin_port); - (void)transSetConnOption((uv_tcp_t*)pConn->pTcp, 20); - (void)uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); - + code = transSetConnOption((uv_tcp_t*)pConn->pTcp, 20); + if (code != 0) { + tWarn("failed to set tcp option since %s", tstrerror(code)); + } + code = uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocRecvBufferCb, uvOnRecvCb); + if (code != 0) { + tWarn("%s conn %p failed to start to read since %s", uv_err_name(code)); + transUnrefSrvHandle(pConn); + return; + } } else { tDebug("failed to create new connection"); transUnrefSrvHandle(pConn); @@ -1225,10 +1257,14 @@ static int32_t addHandleToAcceptloop(void* arg) { } void* transWorkerThread(void* arg) { + int32_t code = 0; setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; tsEnableRandErr = true; - (void)uv_run(pThrd->loop, UV_RUN_DEFAULT); + code = uv_run(pThrd->loop, UV_RUN_DEFAULT); + if (code != 0) { + tWarn("failed to start to worker thread since %s", uv_err_name(code)); + } return NULL; } @@ -1318,7 +1354,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { _end: if (pConn) { transQueueDestroy(&pConn->resps); - (void)transDestroyBuffer(&pConn->readBuf); + transDestroyBuffer(&pConn->readBuf); taosHashCleanup(pConn->pQTable); taosMemoryFree(pConn->pTcp); destroyWQ(&pConn->wq); @@ -1369,8 +1405,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - (void)transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId); - (void)transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId); + transReleaseExHandle(uvGetConnRefOfThrd(thrd), conn->refId); + transRemoveExHandle(uvGetConnRefOfThrd(thrd), conn->refId); STrans* pInst = thrd->pInst; tDebug("%s conn %p destroy", transLabel(pInst), conn); @@ -1383,7 +1419,7 @@ static void uvDestroyConn(uv_handle_t* handle) { uvConnDestroyAllState(conn); - (void)transDestroyBuffer(&conn->readBuf); + transDestroyBuffer(&conn->readBuf); destroyWQ(&conn->wq); taosMemoryFree(conn->buf); @@ -1839,15 +1875,15 @@ int32_t transReleaseSrvHandle(void* handle) { qId); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(info->refIdMgt, refId); + transReleaseExHandle(info->refIdMgt, refId); return code; } - (void)transReleaseExHandle(info->refIdMgt, refId); + transReleaseExHandle(info->refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to send to release handle", exh); - (void)transReleaseExHandle(info->refIdMgt, refId); + transReleaseExHandle(info->refIdMgt, refId); return code; _return2: tDebug("handle %p failed to send to release handle", exh); @@ -1893,17 +1929,17 @@ int32_t transSendResponse(const STransMsg* msg) { tGDebug("conn %p start to send resp (1/2)", exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return code; } - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return code; _return2: tDebug("handle %p failed to send resp", exh); @@ -1941,17 +1977,17 @@ int32_t transRegisterMsg(const STransMsg* msg) { tDebug("%s conn %p start to register brokenlink callback", transLabel(pInst), exh->handle); if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return code; } - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return 0; _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - (void)transReleaseExHandle(msg->info.refIdMgt, refId); + transReleaseExHandle(msg->info.refIdMgt, refId); return code; _return2: tDebug("handle %p failed to register brokenlink", exh); @@ -1996,7 +2032,7 @@ int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { break; } } - TAOS_UNUSED(transReleaseExHandle(transGetInstMgt(), (int64_t)thandle)); + transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); if (code != 0) { tError("ip-white-list update failed since %s", tstrerror(code));