diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 41c9184d27..741bd5ceb4 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -90,6 +90,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) #define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) +#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) +#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index acb9bd20f3..cc744fe14f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,11 +103,11 @@ typedef void* queue[2]; #define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) -typedef struct SRpcMsg STransMsg; -typedef SRpcCtx STransCtx; -typedef SRpcCtxVal STransCtxVal; -typedef SRpcInfo STrans; -typedef SRpcConnInfo STransHandleInfo; +typedef struct SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; +typedef SRpcInfo STrans; +typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { @@ -250,10 +250,10 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transAsyncPoolDestroy(SAsyncPool* pool); -int transAsyncSend(SAsyncPool* pool, queue* mq); -bool transAsyncPoolIsEmpty(SAsyncPool* pool); +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool); +void transAsyncPoolDestroy(SAsyncPool* pool); +int transAsyncSend(SAsyncPool* pool, queue* mq); +bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \ do { \ diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index ba12774c18..10a96d8af0 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -76,9 +76,9 @@ static void httpHandleReq(SHttpMsg* msg); static void httpHandleQuit(SHttpMsg* msg); static int32_t httpSendQuit(SHttpModule* http, int64_t chanId); -static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, - EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); -static void httpDestroyMsg(SHttpMsg* msg); +static int32_t httpCreateMsg(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, + EHttpCompFlag flag, int64_t chanId, SHttpMsg** httpMsg); +static void httpDestroyMsg(SHttpMsg* msg); static bool httpFailFastShoudIgnoreMsg(SHashObj* pTable, char* server, int16_t port); static void httpFailFastMayUpdate(SHashObj* pTable, char* server, int16_t port, int8_t succ); @@ -91,27 +91,27 @@ static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, static int32_t taosBuildHttpHeader(const char* server, const char* uri, int32_t contLen, char* pHead, int32_t headLen, - EHttpCompFlag flag) { - int32_t code = 0; + EHttpCompFlag flag) { + int32_t code = 0; int32_t len = 0; if (flag == HTTP_FLAT) { len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Length: %d\n\n", - uri, server, contLen); + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + uri, server, contLen); if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } } else if (flag == HTTP_GZIP) { len = snprintf(pHead, headLen, - "POST %s HTTP/1.1\n" - "Host: %s\n" - "Content-Type: application/json\n" - "Content-Encoding: gzip\n" - "Content-Length: %d\n\n", - uri, server, contLen); + "POST %s HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Encoding: gzip\n" + "Content-Length: %d\n\n", + uri, server, contLen); if (len < 0 || len >= headLen) { code = TSDB_CODE_OUT_OF_RANGE; } @@ -127,7 +127,7 @@ static int32_t taosCompressHttpRport(char* pSrc, int32_t srcLen) { void* pDest = taosMemoryMalloc(destLen); if (pDest == NULL) { - code= TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -184,7 +184,7 @@ _OVER: if (code == 0) { memcpy(pSrc, pDest, gzipStream.total_out); code = gzipStream.total_out; - } + } taosMemoryFree(pDest); return code; } @@ -635,8 +635,8 @@ void httpModuleDestroy2(SHttpModule* http) { static int32_t taosSendHttpReportImplByChan(const char* server, const char* uri, uint16_t port, char* pCont, int32_t contLen, EHttpCompFlag flag, int64_t chanId) { SHttpModule* load = NULL; - SHttpMsg *msg = NULL; - int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId,&msg); + SHttpMsg* msg = NULL; + int32_t code = httpCreateMsg(server, uri, port, pCont, contLen, flag, chanId, &msg); if (code != 0) { goto _ERROR; } @@ -718,9 +718,8 @@ int64_t transInitHttpChanImpl() { goto _ERROR; } - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (http->asyncPool == NULL) { - code = terrno; + code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool); + if (code != 0) { goto _ERROR; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index cf466f3cd9..fa7ae8c376 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2063,6 +2063,7 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { } static SCliThrd* createThrdObj(void* trans) { + int32_t code = 0; STrans* pTransInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); @@ -2080,10 +2081,9 @@ static SCliThrd* createThrdObj(void* trans) { return NULL; } int32_t nSync = pTransInst->supportBatch ? 4 : 8; - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); - - if (pThrd->asyncPool == NULL) { - tError("failed to init async pool"); + code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); uv_loop_close(pThrd->loop); taosMemoryFree(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); @@ -2561,9 +2561,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { return pThrd; } int transReleaseCliHandle(void* handle) { - int idx = -1; - bool valid = false; - + int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { return TSDB_CODE_RPC_BROKEN_LINK; @@ -2573,9 +2571,17 @@ int transReleaseCliHandle(void* handle) { TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCtx->ahandle = tmsg.info.ahandle; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cmsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } cmsg->msg = tmsg; cmsg->st = taosGetTimestampUs(); cmsg->type = Release; @@ -2584,15 +2590,19 @@ int transReleaseCliHandle(void* handle) { STraceId* trace = &tmsg.info.traceId; tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg); - if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &cmsg->q)) != 0) { destroyCmsg(cmsg); - return TSDB_CODE_RPC_BROKEN_LINK; + return code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code; } - return 0; + return code; } -static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { +static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); @@ -2602,13 +2612,20 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe if (ctx != NULL) pCtx->appCtx = *ctx; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; QUEUE_INIT(&cliMsg->seqq); - return cliMsg; + *pCliMsg = cliMsg; + + return 0; } int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { @@ -2617,7 +2634,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transFreeMsg(pReq->pCont); return TSDB_CODE_RPC_BROKEN_LINK; } - + int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); if (pThrd == NULL) { @@ -2630,7 +2647,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (exh != NULL) { taosWLockLatch(&exh->latch); if (exh->handle == NULL && exh->inited != 0) { - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + ASSERT(code == 0); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); @@ -2642,43 +2662,65 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetRefMgt(), handle); } } - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + if (code != 0) { + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; + } STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); return TSDB_CODE_RPC_BROKEN_LINK; } + int32_t code = 0; + + STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransRsp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN1); } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + if (sem == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + + code = tsem_init(sem, 0, 0); + if (code != 0) { + taosMemoryFree(sem); + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _RETURN1); + } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + sem_destroy(sem); + taosMemoryFree(sem); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; @@ -2687,6 +2729,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs pCtx->pRsp = pTransRsp; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + sem_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2697,11 +2746,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; - goto _RETURN; + TAOS_CHECK_GOTO((code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code), NULL, _RETURN); } tsem_wait(sem); @@ -2712,13 +2760,28 @@ _RETURN: taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosMemoryFree(pTransRsp); - return ret; + return code; +_RETURN1: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosMemoryFree(pTransRsp); + taosMemoryFree(pReq->pCont); + return code; } -int64_t transCreateSyncMsg(STransMsg* pTransMsg) { +int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) { + int32_t code = 0; tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); - tsem2_init(sem, 0, 0); + if (sem == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (tsem2_init(sem, 0, 0) != 0) { + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _EXIT); + } STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + if (pSyncMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _EXIT); + } taosInitRWLatch(&pSyncMsg->latch); pSyncMsg->inited = 0; @@ -2726,39 +2789,69 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) { pSyncMsg->pSem = sem; pSyncMsg->hasEpSet = 0; - return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + int64_t id = taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + if (id < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _EXIT); + } else { + *refId = id; + } + return 0; + +_EXIT: + tsem2_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pSyncMsg); + return code; } int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + int32_t code = 0; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); return TSDB_CODE_RPC_BROKEN_LINK; } + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN2); } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + + if ((code = transCreateSyncMsg(pTransMsg, &pCtx->syncMsgRef)) != 0) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(code, NULL, _RETURN2); + } int64_t ref = pCtx->syncMsgRef; STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); + if (pSyncMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _RETURN2); + } SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2769,17 +2862,17 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code, NULL, _RETURN); goto _RETURN; } - ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); - if (ret < 0) { + code = tsem2_timewait(pSyncMsg->pSem, timeoutMs); + if (code < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; - ret = TSDB_CODE_TIMEOUT_ERROR; + code = TSDB_CODE_TIMEOUT_ERROR; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); pSyncMsg->pRsp->pCont = NULL; @@ -2787,13 +2880,18 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr epsetAssign(pEpSet, &pSyncMsg->epSet); *epUpdated = 1; } - ret = 0; + code = 0; } _RETURN: transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosReleaseRef(transGetSyncMsgMgt(), ref); taosRemoveRef(transGetSyncMsgMgt(), ref); - return ret; + return code; +_RETURN2: + transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } /* * @@ -2810,11 +2908,24 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); cvtAddr.cvt = true; } - for (int i = 0; i < pTransInst->numOfThreads; i++) { + + int32_t code = 0; + int8_t i = 0; + for (i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } cliMsg->ctx = pCtx; cliMsg->type = Update; cliMsg->refId = (int64_t)shandle; @@ -2822,21 +2933,30 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); - if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { + if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + break; } } + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return code; } int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - ASSERT(exh == self); if (exh != self) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b9223e7b39..d31ba34afe 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -67,7 +67,11 @@ int32_t transDecompressMsg(char** msg, int32_t len) { STransCompMsg* pComp = (STransCompMsg*)pCont; int32_t oriLen = htonl(pComp->contLen); - char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STransMsgHead* pNewHead = (STransMsgHead*)buf; int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content, len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen); @@ -78,7 +82,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) { taosMemoryFree(pHead); *msg = buf; if (decompLen != oriLen) { - return -1; + return TSDB_CODE_INVALID_MSG; } return 0; } @@ -222,19 +226,19 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) { // int ret = uv_tcp_keepalive(stream, 5, 60); } -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); if (pool == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; + // return NULL; } + int32_t code = 0; pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); if (pool->asyncs == NULL) { taosMemoryFree(pool); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } int i = 0, err = 0; @@ -243,7 +247,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); if (item == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } item->pThrd = arg; @@ -254,7 +258,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) err = uv_async_init(loop, async, cb); if (err != 0) { tError("failed to init async, reason:%s", uv_err_name(err)); - terrno = TSDB_CODE_THIRDPARTY_ERROR; + code = TSDB_CODE_THIRDPARTY_ERROR; break; } } @@ -264,7 +268,9 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool = NULL; } - return pool; + *pPool = pool; + return 0; + // return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { @@ -289,7 +295,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool) { } int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { - return -1; + return TSDB_CODE_RPC_ASYNC_MODULE_QUIT; } int idx = pool->index % pool->nAsync; @@ -303,7 +309,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); taosThreadMutexUnlock(&item->mtx); - return uv_async_send(async); + int ret = uv_async_send(async); + if (ret != 0) { + tError("failed to send async,reason:%s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + return 0; } void transCtxInit(STransCtx* ctx) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 674bb86fb5..48fce78e79 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -181,8 +181,8 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); -static bool addHandleToAcceptloop(void* arg); +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); +static int32_t addHandleToAcceptloop(void* arg); #define SRV_RELEASE_UV(loop) \ do { \ @@ -221,8 +221,16 @@ static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) { } SIpWhiteListTab* uvWhiteListCreate() { SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab)); + if (pWhiteList == NULL) { + return NULL; + } pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + if (pWhiteList->pList == NULL) { + taosMemoryFree(pWhiteList); + return NULL; + } + pWhiteList->ver = -1; return pWhiteList; } @@ -1005,17 +1013,36 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { + int32_t code = 0; pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - if (0 != uv_loop_init(pThrd->loop)) { - return false; + if (pThrd->loop == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if ((code = uv_loop_init(pThrd->loop)) != 0) { + tError("failed to init loop since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } #if defined(WINDOWS) || defined(DARWIN) - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #else - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_pipe_open(pThrd->pipe, pThrd->fd); + if (code != 0) { + tError("failed to open pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif pThrd->pipe->data = pThrd; @@ -1023,50 +1050,90 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->msg); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); - uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (pThrd->prepare == NULL) { + tError("failed to init prepare"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to init prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (code != 0) { + tError("failed to start prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } pThrd->prepare->data = pThrd; // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb); + code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); + return code; + } #if defined(WINDOWS) || defined(DARWIN) - uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + code = uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + if (code != 0) { + tError("failed to start connect pipe:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + #else - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + if (code != 0) { + tError("failed to start read pipe:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif - return true; + return 0; } -static bool addHandleToAcceptloop(void* arg) { +static int32_t addHandleToAcceptloop(void* arg) { // impl later SServerObj* srv = arg; - int err = 0; - if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) { - tError("failed to init accept server:%s", uv_err_name(err)); - return false; + int code = 0; + if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) { + tError("failed to init accept server since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } // register an async here to quit server gracefully srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t)); - uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (srv->pAcceptAsync == NULL) { + tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (code != 0) { + tError("failed to init async since:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } srv->pAcceptAsync->data = srv; struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { - tError("failed to bind:%s", uv_err_name(err)); - return false; + if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) { + tError("failed to bind addr since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { - tError("failed to listen:%s", uv_err_name(err)); - terrno = TSDB_CODE_RPC_PORT_EADDRINUSE; - return false; + + if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { + tError("failed to bind since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - return true; + if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { + tError("failed to listen since %s", uv_err_name(code)); + return TSDB_CODE_RPC_PORT_EADDRINUSE; + } + return 0; } + void* transWorkerThread(void* arg) { setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; @@ -1079,6 +1146,9 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd) { SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); + if (pConn == NULL) { + return NULL; + } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); @@ -1204,24 +1274,41 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { } void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; + SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); + if (srv == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tError("failed to init server since: %s", tstrerror(code)); + return NULL; + } + + srv->ip = ip; + srv->port = port; srv->numOfThreads = numOfThreads; srv->workerIdx = 0; srv->numOfWorkerReady = 0; + srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); + if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } - char pipeName[PATH_MAX]; + code = uv_loop_init(srv->loop); + if (code != 0) { + tError("failed to init server since: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } if (false == taosValidIpAndPort(srv->ip, srv->port)) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + char pipeName[PATH_MAX]; #if defined(WINDOWS) || defined(DARWIN) int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); @@ -1259,7 +1346,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1276,27 +1363,53 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + if (thrd == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; thrd->pWhiteList = uvWhiteListCreate(); - - srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - srv->pThreadObj[i] = thrd; - - uv_os_sock_t fds[2]; - if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + if (thrd->pWhiteList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto End; } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + if (srv->pipe[i] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } + + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) { + tError("failed to create pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } thrd->pipe = &(srv->pipe[i][1]); // init read thrd->fd = fds[0]; - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1311,15 +1424,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } #endif - if (false == addHandleToAcceptloop(srv)) { + if ((code = addHandleToAcceptloop(srv)) != 0) { goto End; } - int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); - if (err == 0) { + code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); + if (code == 0) { tDebug("success to create accept-thread"); } else { - tError("failed to create accept-thread"); + code = TAOS_SYSTEM_ERROR(errno); + tError("failed to create accept-thread since %s", tstrerror(code)); + goto End; // clear all resource later } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4db1475fa9..69d6e6e131 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -55,8 +55,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")