diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 75a67ea484..f77a75b3f6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1075,10 +1075,10 @@ typedef struct { SUpdateUserIpWhite* pUserIpWhite; } SUpdateIpWhite; -int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); -void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); -SUpdateIpWhite* cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t tSerializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +int32_t tDeserializeSUpdateIpWhite(void* buf, int32_t bufLen, SUpdateIpWhite* pReq); +void tFreeSUpdateIpWhiteReq(SUpdateIpWhite* pReq); +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite* pReq, SUpdateIpWhite** pUpdate); typedef struct { int64_t ipWhiteVer; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index b7a459f957..6c0d04354a 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -164,13 +164,13 @@ int rpcRegisterBrokenLinkArg(SRpcMsg *msg); int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock // These functions will not be called in the child process -int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); -int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, - int32_t timeoutMs); -int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -void *rpcAllocHandle(); -void rpcSetIpWhite(void *thandl, void *arg); +int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); +int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, + int32_t timeoutMs); +int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); +void *rpcAllocHandle(); +int32_t rpcSetIpWhite(void *thandl, void *arg); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 5ec18daab7..7ff3e500cf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -90,6 +90,8 @@ int32_t taosGetErrSize(); #define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) #define TSDB_CODE_RPC_NETWORK_BUSY TAOS_DEF_ERROR_CODE(0, 0x0024) #define TSDB_CODE_HTTP_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0025) +#define TSDB_CODE_RPC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0026) +#define TSDB_CODE_RPC_ASYNC_MODULE_QUIT TAOS_DEF_ERROR_CODE(0, 0x0027) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c17b8ef526..a4f854a59e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1764,23 +1764,33 @@ int32_t tDeserializeSUpdateIpWhite(void *buf, int32_t bufLen, SUpdateIpWhite *pR return 0; } void tFreeSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { - for (int i = 0; i < pReq->numOfUser; i++) { - SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; - taosMemoryFree(pUserWhite->pIpRanges); + if (pReq == NULL) return; + + if (pReq->pUserIpWhite) { + for (int i = 0; i < pReq->numOfUser; i++) { + SUpdateUserIpWhite *pUserWhite = &pReq->pUserIpWhite[i]; + taosMemoryFree(pUserWhite->pIpRanges); + } } taosMemoryFree(pReq->pUserIpWhite); - // impl later return; } -SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { +int32_t cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq, SUpdateIpWhite **pUpdateMsg) { + int32_t code = 0; + if (pReq == NULL) { + return 0; + } SUpdateIpWhite *pClone = taosMemoryCalloc(1, sizeof(SUpdateIpWhite)); - if (pClone == NULL) return NULL; + if (pClone == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } pClone->numOfUser = pReq->numOfUser; pClone->ver = pReq->ver; - if ((pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser)) == NULL) { + pClone->pUserIpWhite = taosMemoryCalloc(1, sizeof(SUpdateUserIpWhite) * pReq->numOfUser); + if (pClone->pUserIpWhite == NULL) { taosMemoryFree(pClone); - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } for (int i = 0; i < pReq->numOfUser; i++) { @@ -1792,17 +1802,21 @@ SUpdateIpWhite *cloneSUpdateIpWhiteReq(SUpdateIpWhite *pReq) { pNew->numOfRange = pOld->numOfRange; int32_t sz = pOld->numOfRange * sizeof(SIpV4Range); - if ((pNew->pIpRanges = taosMemoryCalloc(1, sz)) == NULL) { - for (int j = 0; j < i; j++) { - taosMemoryFree(pClone->pUserIpWhite[j].pIpRanges); - } - taosMemoryFree(pClone->pUserIpWhite); - taosMemoryFree(pClone); - return NULL; + pNew->pIpRanges = taosMemoryCalloc(1, sz); + if (pNew->pIpRanges == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; } memcpy(pNew->pIpRanges, pOld->pIpRanges, sz); } - return pClone; +_return: + if (code < 0) { + tFreeSUpdateIpWhiteReq(pClone); + taosMemoryFree(pClone); + } else { + *pUpdateMsg = pClone; + } + return code; } int32_t tSerializeRetrieveIpWhite(void *buf, int32_t bufLen, SRetrieveIpWhiteReq *pReq) { SEncoder encoder = {0}; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f0fa497eec..0c04d50927 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -65,10 +65,11 @@ static int32_t dmConvertErrCode(tmsg_t msgType, int32_t code) { return code; } static void dmUpdateRpcIpWhite(SDnodeData *pData, void *pTrans, SRpcMsg *pRpc) { + int32_t code = 0; SUpdateIpWhite ipWhite = {0}; // aosMemoryCalloc(1, sizeof(SUpdateIpWhite)); tDeserializeSUpdateIpWhite(pRpc->pCont, pRpc->contLen, &ipWhite); - rpcSetIpWhite(pTrans, &ipWhite); + code = rpcSetIpWhite(pTrans, &ipWhite); pData->ipWhiteVer = ipWhite.ver; tFreeSUpdateIpWhiteReq(&ipWhite); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index acb9bd20f3..e66941244c 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,11 +103,11 @@ typedef void* queue[2]; #define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) -typedef struct SRpcMsg STransMsg; -typedef SRpcCtx STransCtx; -typedef SRpcCtxVal STransCtxVal; -typedef SRpcInfo STrans; -typedef SRpcConnInfo STransHandleInfo; +typedef struct SRpcMsg STransMsg; +typedef SRpcCtx STransCtx; +typedef SRpcCtxVal STransCtxVal; +typedef SRpcInfo STrans; +typedef SRpcConnInfo STransHandleInfo; // ref mgt handle typedef struct SExHandle { @@ -250,10 +250,10 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transAsyncPoolDestroy(SAsyncPool* pool); -int transAsyncSend(SAsyncPool* pool, queue* mq); -bool transAsyncPoolIsEmpty(SAsyncPool* pool); +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool); +void transAsyncPoolDestroy(SAsyncPool* pool); +int transAsyncSend(SAsyncPool* pool, queue* mq); +bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \ do { \ @@ -279,6 +279,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \ exh2 ? exh2->refId : 0, id); \ + code = terrno; \ goto _return1; \ } \ } else if (id == 0) { \ @@ -287,6 +288,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); if (exh2 == NULL || id == exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, id, \ exh2 ? exh2->refId : 0); \ + code = terrno; \ goto _return1; \ } else { \ id = exh1->refId; \ @@ -297,13 +299,13 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); } \ } while (0) -int transInitBuffer(SConnBuffer* buf); -int transClearBuffer(SConnBuffer* buf); -int transDestroyBuffer(SConnBuffer* buf); -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); -bool transReadComplete(SConnBuffer* connBuf); -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); +int32_t transInitBuffer(SConnBuffer* buf); +int32_t transClearBuffer(SConnBuffer* buf); +int32_t transDestroyBuffer(SConnBuffer* buf); +int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf); +bool transReadComplete(SConnBuffer* connBuf); +int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf); +int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf); int transSetConnOption(uv_tcp_t* stream, int keepalive); @@ -316,14 +318,14 @@ void transUnrefCliHandle(void* handle); int transReleaseCliHandle(void* handle); int transReleaseSrvHandle(void* handle); -int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); -int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, - int32_t timeoutMs); -int transSendResponse(const STransMsg* msg); -int transRegisterMsg(const STransMsg* msg); -int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); -void transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); +int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); +int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); +int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs); +int transSendResponse(const STransMsg* msg); +int transRegisterMsg(const STransMsg* msg); +int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); +int32_t transSetIpWhiteList(void* shandle, void* arg, FilteFunc* func); int transSockInfo2Str(struct sockaddr* sockname, char* dst); @@ -363,7 +365,7 @@ typedef struct { * init queue * note: queue'size is small, default 1 */ -void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); +int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)); /* * put arg into queue @@ -420,7 +422,7 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); +int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); void transDQCancel(SDelayQueue* queue, SDelayTask* task); @@ -433,9 +435,9 @@ bool transEpSetIsEqual2(SEpSet* a, SEpSet* b); */ void transThreadOnce(); -void transInit(); -void transCleanup(); -void transPrintEpSet(SEpSet* pEpSet); +int32_t transInit(); +void transCleanup(); +void transPrintEpSet(SEpSet* pEpSet); void transFreeMsg(void* msg); int32_t transCompressMsg(char* msg, int32_t len); diff --git a/source/libs/transport/src/thttp.c b/source/libs/transport/src/thttp.c index 28c1c15ea3..d3fe42d2ec 100644 --- a/source/libs/transport/src/thttp.c +++ b/source/libs/transport/src/thttp.c @@ -719,9 +719,8 @@ int64_t transInitHttpChanImpl() { goto _ERROR; } - http->asyncPool = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb); - if (http->asyncPool == NULL) { - code = terrno; + code = transAsyncPoolCreate(http->loop, 1, http, httpAsyncCb, &http->asyncPool); + if (code != 0) { goto _ERROR; } diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c35f147fc5..f86c010828 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -27,18 +27,20 @@ int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transRelease static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { int32_t code = taosGetIpv4FromFqdn(localFqdn, ip); - if (code) { - terrno = TSDB_CODE_RPC_FQDN_ERROR; - return -1; + if (code != 0) { + return TSDB_CODE_RPC_FQDN_ERROR; } return 0; } void* rpcOpen(const SRpcInit* pInit) { - rpcInit(); + int32_t code = rpcInit(); + if (code != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { - return NULL; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } if (pInit->label) { int len = strlen(pInit->label) > sizeof(pRpc->label) ? sizeof(pRpc->label) : strlen(pInit->label); @@ -84,10 +86,9 @@ void* rpcOpen(const SRpcInit* pInit) { uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { - if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) { - tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, terrstr()); - taosMemoryFree(pRpc); - return NULL; + if ((code = transValidLocalFqdn(pInit->localFqdn, &ip)) != 0) { + tError("invalid fqdn:%s, errmsg:%s", pInit->localFqdn, tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); } } @@ -105,14 +106,19 @@ void* rpcOpen(const SRpcInit* pInit) { (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); if (pRpc->tcphandle == NULL) { - taosMemoryFree(pRpc); - return NULL; + tError("failed to init rpc handle"); + TAOS_CHECK_GOTO(terrno, NULL, _end); } int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); transAcquireExHandle(transGetInstMgt(), refId); pRpc->refId = refId; return (void*)refId; +_end: + taosMemoryFree(pRpc); + terrno = code; + + return NULL; } void rpcClose(void* arg) { tInfo("start to close rpc"); @@ -186,7 +192,7 @@ int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { return transSetDefaultAddr(thandle, ip, fqdn); } // server only -void rpcSetIpWhite(void* thandle, void* arg) { transSetIpWhiteList(thandle, arg, NULL); } +int32_t rpcSetIpWhite(void* thandle, void* arg) { return transSetIpWhiteList(thandle, arg, NULL); } void* rpcAllocHandle() { return (void*)transAllocHandle(); } @@ -202,10 +208,8 @@ int32_t rpcCvtErrCode(int32_t code) { return code; } -int32_t rpcInit() { - transInit(); - return 0; -} +int32_t rpcInit() { return transInit(); } + void rpcCleanup(void) { transCleanup(); transHttpEnvDestroy(); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 19af63b24f..b51964083f 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -114,7 +114,7 @@ typedef struct SCliThrd { SDelayQueue* timeoutQueue; SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout - void* pTransInst; // + STrans* pTransInst; // int connCount; void (*destroyAhandleFp)(void* ahandle); @@ -180,12 +180,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); -static SCliConn* cliCreateConn(SCliThrd* thrd); -static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); -static void cliDestroy(uv_handle_t* handle); -static void cliSend(SCliConn* pConn); -static void cliSendBatch(SCliConn* pConn); -static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); +static int32_t cliCreateConn(SCliThrd* thrd, SCliConn** pCliConn); +static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void cliDestroy(uv_handle_t* handle); +static void cliSend(SCliConn* pConn); +static void cliSendBatch(SCliConn* pConn); +static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); static void doFreeTimeoutMsg(void* param); static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); @@ -196,8 +196,8 @@ static FORCE_INLINE void cliMayCvtFqdnToIp(SEpSet* pEpSet, SCvtAddr* pCvtAddr); static FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* resp); -static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip); -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn); +static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ipaddr); +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn); static FORCE_INLINE void cliMayUpdateFqdnCache(SHashObj* cache, char* dst); // process data read from server, add decompress etc later @@ -226,10 +226,9 @@ static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); // thread obj -static SCliThrd* createThrdObj(void* trans); -static void destroyThrdObj(SCliThrd* pThrd); - -static void cliWalkCb(uv_handle_t* handle, void* arg); +static int32_t createThrdObj(void* trans, SCliThrd** pThrd); +static void destroyThrdObj(SCliThrd* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); #define CLI_RELEASE_UV(loop) \ do { \ @@ -714,10 +713,22 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { } STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); *pMsg = NULL; @@ -725,9 +736,23 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { // send msg in delay queue if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + if (arg == NULL) { + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } arg->param1 = *pMsg; arg->param2 = pThrd; - (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + SDelayTask* task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + if (task == NULL) { + taosMemoryFree(arg); + doNotifyApp(*pMsg, pThrd, TSDB_CODE_OUT_OF_MEMORY); + *pMsg = NULL; + return NULL; + } + + (*pMsg)->ctx->task = task; tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); @@ -767,7 +792,11 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } - allocConnRef(conn, true); + int32_t code = allocConnRef(conn, true); + if (code != 0) { + cliDestroyConn(conn, true); + return; + } SCliThrd* thrd = conn->hostThrd; if (conn->timer != NULL) { @@ -812,6 +841,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 10) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + if (arg == NULL) return; arg->param1 = conn; arg->param2 = thrd; @@ -827,9 +857,12 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->refId = transAddExHandle(transGetRefMgt(), exh); - SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - if (self != exh) { + if (exh->refId < 0) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; } @@ -839,8 +872,14 @@ static int32_t allocConnRef(SCliConn* conn, bool update) { exh->handle = conn; exh->pThrd = conn->hostThrd; + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (self != exh) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; - if (conn->refId == -1) { + if (conn->refId < 0) { taosMemoryFree(exh); } return 0; @@ -871,7 +910,11 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - transAllocBuffer(pBuf, buf); + int32_t code = transAllocBuffer(pBuf, buf); + if (code < 0) { + tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); + // cliDestroyConn(conn, true); + } } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later @@ -909,16 +952,35 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { } } -static SCliConn* cliCreateConn(SCliThrd* pThrd) { +static int32_t cliCreateConn(SCliThrd* pThrd, SCliConn** pCliConn) { + int32_t code = 0; SCliConn* conn = taosMemoryCalloc(1, sizeof(SCliConn)); + if (conn == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + // read/write stream handle conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + if (conn->stream == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, NULL, _failed); + } + + code = uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + if (code != 0) { + tError("failed to init tcp handle, code:%d, %s", code, uv_strerror(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + TAOS_CHECK_GOTO(code, NULL, _failed); + } conn->stream->data = conn; uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _failed); + } + tDebug("no available timer, create a timer %p", timer); uv_timer_init(pThrd->loop, timer); } @@ -927,8 +989,11 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->timer = timer; conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); - transQueueInit(&conn->cliMsgs, NULL); - transInitBuffer(&conn->readBuf); + + TAOS_CHECK_GOTO(transQueueInit(&conn->cliMsgs, NULL), NULL, _failed); + + TAOS_CHECK_GOTO(transInitBuffer(&conn->readBuf), NULL, _failed); + QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; @@ -936,9 +1001,20 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transRefCliHandle(conn); atomic_add_fetch_32(&pThrd->connCount, 1); - allocConnRef(conn, false); - return conn; + TAOS_CHECK_GOTO(allocConnRef(conn, false), NULL, _failed); + + *pCliConn = conn; + return code; +_failed: + if (conn) { + taosMemoryFree(conn->stream); + transReqQueueClear(&conn->wreqQueue); + transDestroyBuffer(&conn->readBuf); + transQueueDestroy(&conn->cliMsgs); + } + taosMemoryFree(conn); + return code; } static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; @@ -1225,6 +1301,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { + int32_t code = 0; if (pThrd->quit == true) { cliDestroyBatch(pBatch); return; @@ -1249,22 +1326,33 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { return; } if (conn == NULL) { - conn = cliCreateConn(pThrd); + code = cliCreateConn(pThrd, &conn); + if (code != 0) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d, reason:%s", pTransInst->label, + pBatch->wLen, pBatch->batchSize, pTransInst->connLimitNum, tstrerror(code)); + cliDestroyBatch(pBatch); + return; + } + conn->pBatch = pBatch; conn->dstAddr = taosStrdup(pList->dst); + if (conn->dstAddr == NULL) { + tError("%s conn %p failed to send batch msg, reason:%s", transLabel(pTransInst), conn, + tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + cliHandleFastFail(conn, -1); + return; + } uint32_t ipaddr = 0; - int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr); - if (code) { + if ((code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip, &ipaddr)) != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - - cliHandleFastFail(conn, terrno); - terrno = 0; + cliHandleFastFail(conn, code); return; } + struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; @@ -1553,37 +1641,45 @@ FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx) { FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { if (pMsg == NULL) return -1; - memset(pResp, 0, sizeof(STransMsg)); + // memset(pResp, 0, sizeof(STransMsg)); - pResp->code = TSDB_CODE_RPC_BROKEN_LINK; + if (pResp->code == 0) { + pResp->code = TSDB_CODE_RPC_BROKEN_LINK; + } pResp->msgType = pMsg->msg.msgType + 1; pResp->info.ahandle = pMsg->ctx ? pMsg->ctx->ahandle : NULL; pResp->info.traceId = pMsg->msg.info.traceId; return 0; } + static FORCE_INLINE int32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn, uint32_t* ip) { + int32_t code = 0; + uint32_t addr = 0; size_t len = strlen(fqdn); uint32_t* v = taosHashGet(cache, fqdn, len); if (v == NULL) { - int32_t code = taosGetIpv4FromFqdn(fqdn, ip); - if (code) { - tError("failed to get ip from fqdn:%s since %s", fqdn, terrstr()); + code = taosGetIpv4FromFqdn(fqdn, &addr); + if (code != 0) { + code = TSDB_CODE_RPC_FQDN_ERROR; + tError("failed to get ip from fqdn:%s since %s", fqdn, tstrerror(code)); return code; } - taosHashPut(cache, fqdn, len, ip, sizeof(*ip)); + if ((code = taosHashPut(cache, fqdn, len, &addr, sizeof(addr)) != 0)) { + return code; + } + *ip = addr; } else { *ip = *v; } - - return TSDB_CODE_SUCCESS; + return 0; } -static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { +static FORCE_INLINE int32_t cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { // impl later uint32_t addr = 0; - int32_t code = taosGetIpv4FromFqdn(fqdn, &addr); - if (TSDB_CODE_SUCCESS == code) { + int32_t code = taosGetIpv4FromFqdn(fqdn, &addr); + if (code == 0) { size_t len = strlen(fqdn); uint32_t* v = taosHashGet(cache, fqdn, len); if (addr != *v) { @@ -1591,10 +1687,12 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { tinet_ntoa(old, *v); tinet_ntoa(new, addr); tWarn("update ip of fqdn:%s, old: %s, new: %s", fqdn, old, new); - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + code = taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); } + } else { + code = TSDB_CODE_RPC_FQDN_ERROR; // TSDB_CODE_RPC_INVALID_FQDN; } - return; + return code; } static void cliMayUpdateFqdnCache(SHashObj* cache, char* dst) { @@ -1623,7 +1721,9 @@ static void doFreeTimeoutMsg(void* param) { doNotifyApp(pMsg, pThrd, code); taosMemoryFree(arg); } + void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { + int32_t code = 0; STrans* pTransInst = pThrd->pTransInst; cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); @@ -1641,7 +1741,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server - STransMsg resp; + STransMsg resp = {0}; cliBuildExceptResp(pMsg, &resp); // refactorr later resp.info.cliVer = pTransInst->compatibilityVer; @@ -1662,7 +1762,19 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { - conn = cliCreateConn(pThrd); + code = cliCreateConn(pThrd, &conn); + if (code != 0) { + tError("%s failed to create conn, reason:%s", pTransInst->label, tstrerror(code)); + STransMsg resp = {.code = code}; + cliBuildExceptResp(pMsg, &resp); + + resp.info.cliVer = pTransInst->compatibilityVer; + if (pMsg->type != Release) { + pTransInst->cfp(pTransInst->parent, &resp, NULL); + } + destroyCmsg(pMsg); + return; + } int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); @@ -1672,16 +1784,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->dstAddr = taosStrdup(addr); - uint32_t ipaddr = 0; - int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); - if (code) { + uint32_t ipaddr; + int32_t code = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn, &ipaddr); + if (code != 0) { uv_timer_stop(conn->timer); conn->timer->data = NULL; taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; - cliHandleExcept(conn, terrno); - terrno = 0; + cliHandleExcept(conn, code); return; } @@ -1699,12 +1810,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { errno = 0; return; } + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tGError("%s conn %p failed to set stream, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); cliHandleExcept(conn, -1); return; } + ret = transSetConnOption((uv_tcp_t*)conn->stream, tsKeepAliveIdle); if (ret != 0) { tGError("%s conn %p failed to set socket opt, reason:%s", transLabel(pTransInst), conn, uv_err_name(ret)); @@ -1981,12 +2094,12 @@ bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead) { } static void* cliWorkThread(void* arg) { + char threadName[TSDB_LABEL_LEN] = {0}; + SCliThrd* pThrd = (SCliThrd*)arg; pThrd->pid = taosGetSelfPthreadId(); - char threadName[TSDB_LABEL_LEN] = {0}; - STrans* pInst = pThrd->pTransInst; - strtolower(threadName, pInst->label); + strtolower(threadName, pThrd->pTransInst->label); setThreadName(threadName); uv_run(pThrd->loop, UV_RUN_DEFAULT); @@ -1996,22 +2109,32 @@ static void* cliWorkThread(void* arg) { } void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; SCliObj* cli = taosMemoryCalloc(1, sizeof(SCliObj)); + if (cli == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } STrans* pTransInst = shandle; memcpy(cli->label, label, TSDB_LABEL_LEN); cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SCliThrd**)taosMemoryCalloc(cli->numOfThreads, sizeof(SCliThrd*)); + if (cli->pThreadObj == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _err); + } for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrd* pThrd = createThrdObj(shandle); - if (pThrd == NULL) { + SCliThrd* pThrd = NULL; + code = createThrdObj(shandle, &pThrd); + if (code != 0) { goto _err; } int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err != 0) { - goto _err; + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _err); } else { tDebug("success to create tranport-cli thread:%d", i); } @@ -2020,8 +2143,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; _err: - taosMemoryFree(cli->pThreadObj); - taosMemoryFree(cli); + if (cli) { + taosMemoryFree(cli->pThreadObj); + taosMemoryFree(cli); + } + terrno = code; return NULL; } @@ -2037,11 +2163,9 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } static FORCE_INLINE void destroyCmsgWrapper(void* arg, void* param) { - SCliMsg* pMsg = arg; - if (pMsg == NULL) { - return; - } + if (arg == NULL) return; + SCliMsg* pMsg = arg; SCliThrd* pThrd = param; if (pMsg->msg.info.notFreeAhandle == 0 && pThrd != NULL) { if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle); @@ -2064,67 +2188,124 @@ static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { taosMemoryFree(pMsg); } -static SCliThrd* createThrdObj(void* trans) { +static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) { + int32_t code = 0; STrans* pTransInst = trans; SCliThrd* pThrd = (SCliThrd*)taosMemoryCalloc(1, sizeof(SCliThrd)); + if (pThrd == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - int err = uv_loop_init(pThrd->loop); - if (err != 0) { - tError("failed to init uv_loop, reason:%s", uv_err_name(err)); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + if (pThrd->loop == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); } - int32_t nSync = pTransInst->supportBatch ? 4 : 8; - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb); - if (pThrd->asyncPool == NULL) { - tError("failed to init async pool"); - uv_loop_close(pThrd->loop); - taosMemoryFree(pThrd->loop); - taosThreadMutexDestroy(&pThrd->msgMtx); - taosMemoryFree(pThrd); - return NULL; + code = uv_loop_init(pThrd->loop); + if (code != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + } + + int32_t nSync = pTransInst->supportBatch ? 4 : 8; + code = transAsyncPoolCreate(pThrd->loop, nSync, pThrd, cliAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); + if (pThrd->prepare == NULL) { + tError("failed to create prepre since:%s", tstrerror(code)); + TAOS_CHECK_GOTO(code, NULL, _end); + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to create prepre since:%s", uv_err_name(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + } pThrd->prepare->data = pThrd; - // uv_prepare_start(pThrd->prepare, cliPrepareCb); int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); + if (pThrd->timerList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } uv_timer_init(pThrd->loop, timer); taosArrayPush(pThrd->timerList, &timer); } pThrd->pool = createConnPool(4); - transDQCreate(pThrd->loop, &pThrd->delayQueue); + if (pThrd->pool == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + if ((code = transDQCreate(pThrd->loop, &pThrd->delayQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + if ((code = transDQCreate(pThrd->loop, &pThrd->timeoutQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } - transDQCreate(pThrd->loop, &pThrd->waitConnQueue); - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - pThrd->pTransInst = trans; + if ((code = transDQCreate(pThrd->loop, &pThrd->waitConnQueue)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->fqdn2ipCache == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->failFastCache == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (pThrd->batchCache == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); + pThrd->pTransInst = trans; pThrd->quit = false; - return pThrd; + *ppThrd = pThrd; + return code; + +_end: + if (pThrd) { + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosMemoryFree(pThrd->prepare); + taosThreadMutexDestroy(&pThrd->msgMtx); + transAsyncPoolDestroy(pThrd->asyncPool); + for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { + uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); + taosMemoryFree(timer); + } + taosArrayDestroy(pThrd->timerList); + taosMemoryFree(pThrd->prepare); + taosHashCleanup(pThrd->fqdn2ipCache); + taosHashCleanup(pThrd->failFastCache); + taosHashCleanup(pThrd->batchCache); + + taosMemoryFree(pThrd); + } + return code; } static void destroyThrdObj(SCliThrd* pThrd) { if (pThrd == NULL) { @@ -2177,12 +2358,23 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx) { taosMemoryFree(ctx); } -void cliSendQuit(SCliThrd* thrd) { +int32_t cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully + int32_t code = 0; SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (msg == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + msg->type = Quit; - transAsyncSend(thrd->asyncPool, &msg->q); + if ((code = transAsyncSend(thrd->asyncPool, &msg->q)) != 0) { + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + taosMemoryFree(msg); + return code; + } + atomic_store_8(&thrd->asyncPool->stop, 1); + return 0; } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -2506,9 +2698,14 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { } void transCloseClient(void* arg) { + int32_t code = 0; SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { - cliSendQuit(cli->pThreadObj[i]); + code = cliSendQuit(cli->pThreadObj[i]); + if (code != 0) { + tError("failed to send quit to thread:%d, reason:%s", i, tstrerror(code)); + } + destroyThrdObj(cli->pThreadObj[i]); } taosMemoryFree(cli->pThreadObj); @@ -2563,9 +2760,7 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { return pThrd; } int transReleaseCliHandle(void* handle) { - int idx = -1; - bool valid = false; - + int32_t code = 0; SCliThrd* pThrd = transGetWorkThrdFromHandle(NULL, (int64_t)handle); if (pThrd == NULL) { return TSDB_CODE_RPC_BROKEN_LINK; @@ -2575,9 +2770,17 @@ int transReleaseCliHandle(void* handle) { TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCtx->ahandle = tmsg.info.ahandle; SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cmsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } cmsg->msg = tmsg; cmsg->st = taosGetTimestampUs(); cmsg->type = Release; @@ -2586,15 +2789,20 @@ int transReleaseCliHandle(void* handle) { STraceId* trace = &tmsg.info.traceId; tGDebug("send release request at thread:%08" PRId64 ", malloc memory:%p", pThrd->pid, cmsg); - if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &cmsg->q)) != 0) { destroyCmsg(cmsg); - return TSDB_CODE_RPC_BROKEN_LINK; + return code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code; } - return 0; + return code; } -static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { + +static int32_t transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx, SCliMsg** pCliMsg) { TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); @@ -2604,13 +2812,20 @@ static SCliMsg* transInitMsg(void* shandle, const SEpSet* pEpSet, STransMsg* pRe if (ctx != NULL) pCtx->appCtx = *ctx; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + return TSDB_CODE_OUT_OF_MEMORY; + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; cliMsg->refId = (int64_t)shandle; QUEUE_INIT(&cliMsg->seqq); - return cliMsg; + *pCliMsg = cliMsg; + + return 0; } int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { @@ -2619,7 +2834,7 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transFreeMsg(pReq->pCont); return TSDB_CODE_RPC_BROKEN_LINK; } - + int32_t code = 0; int64_t handle = (int64_t)pReq->info.handle; SCliThrd* pThrd = transGetWorkThrd(pTransInst, handle); if (pThrd == NULL) { @@ -2632,7 +2847,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran if (exh != NULL) { taosWLockLatch(&exh->latch); if (exh->handle == NULL && exh->inited != 0) { - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + ASSERT(code == 0); + QUEUE_PUSH(&exh->q, &pCliMsg->seqq); taosWUnLockLatch(&exh->latch); tDebug("msg refId: %" PRId64 "", handle); @@ -2644,43 +2862,65 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran transReleaseExHandle(transGetRefMgt(), handle); } } - SCliMsg* pCliMsg = transInitMsg(shandle, pEpSet, pReq, ctx); + + SCliMsg* pCliMsg = NULL; + code = transInitMsg(shandle, pEpSet, pReq, ctx, &pCliMsg); + if (code != 0) { + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; + } STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(pEpSet), EPSET_GET_INUSE_PORT(pEpSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) { + if ((code = transAsyncSend(pThrd->asyncPool, &(pCliMsg->q))) != 0) { destroyCmsg(pCliMsg); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + return (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); return TSDB_CODE_RPC_BROKEN_LINK; } + int32_t code = 0; + + STransMsg* pTransRsp = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransRsp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransRsp); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN1); } tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); - tsem_init(sem, 0, 0); + if (sem == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + + code = tsem_init(sem, 0, 0); + if (code != 0) { + taosMemoryFree(sem); + code = TAOS_SYSTEM_ERROR(errno); + TAOS_CHECK_GOTO(code, NULL, _RETURN1); + } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + tsem_destroy(sem); + taosMemoryFree(sem); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; @@ -2689,6 +2929,13 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs pCtx->pRsp = pTransRsp; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + tsem_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN1); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2699,11 +2946,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; - goto _RETURN; + TAOS_CHECK_GOTO((code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code), NULL, _RETURN); } tsem_wait(sem); @@ -2714,13 +2960,28 @@ _RETURN: taosMemoryFree(sem); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosMemoryFree(pTransRsp); - return ret; + return code; +_RETURN1: + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + taosMemoryFree(pTransRsp); + taosMemoryFree(pReq->pCont); + return code; } -int64_t transCreateSyncMsg(STransMsg* pTransMsg) { +int32_t transCreateSyncMsg(STransMsg* pTransMsg, int64_t* refId) { + int32_t code = 0; tsem2_t* sem = taosMemoryCalloc(1, sizeof(tsem2_t)); - tsem2_init(sem, 0, 0); + if (sem == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (tsem2_init(sem, 0, 0) != 0) { + TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), NULL, _EXIT); + } STransSyncMsg* pSyncMsg = taosMemoryCalloc(1, sizeof(STransSyncMsg)); + if (pSyncMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _EXIT); + } taosInitRWLatch(&pSyncMsg->latch); pSyncMsg->inited = 0; @@ -2728,39 +2989,69 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) { pSyncMsg->pSem = sem; pSyncMsg->hasEpSet = 0; - return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + int64_t id = taosAddRef(transGetSyncMsgMgt(), pSyncMsg); + if (id < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _EXIT); + } else { + *refId = id; + } + return 0; + +_EXIT: + tsem2_destroy(sem); + taosMemoryFree(sem); + taosMemoryFree(pSyncMsg); + return code; } int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, int32_t timeoutMs) { - STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); - STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + int32_t code = 0; + STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); if (pTransInst == NULL) { transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); return TSDB_CODE_RPC_BROKEN_LINK; } + STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); + if (pTransMsg == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); if (pThrd == NULL) { - transFreeMsg(pReq->pCont); - taosMemoryFree(pTransMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(TSDB_CODE_RPC_BROKEN_LINK, NULL, _RETURN2); } TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + epsetAssign(&pCtx->epSet, pEpSet); epsetAssign(&pCtx->origEpSet, pEpSet); pCtx->ahandle = pReq->info.ahandle; pCtx->msgType = pReq->msgType; - pCtx->syncMsgRef = transCreateSyncMsg(pTransMsg); + + if ((code = transCreateSyncMsg(pTransMsg, &pCtx->syncMsgRef)) != 0) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(code, NULL, _RETURN2); + } int64_t ref = pCtx->syncMsgRef; STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), ref); + if (pSyncMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _RETURN2); + } SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _RETURN2); + } + cliMsg->ctx = pCtx; cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); @@ -2771,17 +3062,17 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); - if (ret != 0) { + code = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (code != 0) { destroyCmsg(cliMsg); - ret = TSDB_CODE_RPC_BROKEN_LINK; + TAOS_CHECK_GOTO(code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code, NULL, _RETURN); goto _RETURN; } - ret = tsem2_timewait(pSyncMsg->pSem, timeoutMs); - if (ret < 0) { + code = tsem2_timewait(pSyncMsg->pSem, timeoutMs); + if (code < 0) { pRsp->code = TSDB_CODE_TIMEOUT_ERROR; - ret = TSDB_CODE_TIMEOUT_ERROR; + code = TSDB_CODE_TIMEOUT_ERROR; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); pSyncMsg->pRsp->pCont = NULL; @@ -2789,13 +3080,18 @@ int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STr epsetAssign(pEpSet, &pSyncMsg->epSet); *epUpdated = 1; } - ret = 0; + code = 0; } _RETURN: transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); taosReleaseRef(transGetSyncMsgMgt(), ref); taosRemoveRef(transGetSyncMsgMgt(), ref); - return ret; + return code; +_RETURN2: + transFreeMsg(pReq->pCont); + taosMemoryFree(pTransMsg); + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + return code; } /* * @@ -2812,11 +3108,25 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { tstrncpy(cvtAddr.fqdn, fqdn, sizeof(cvtAddr.fqdn)); cvtAddr.cvt = true; } - for (int i = 0; i < pTransInst->numOfThreads; i++) { + + int32_t code = 0; + int8_t i = 0; + for (i = 0; i < pTransInst->numOfThreads; i++) { STransConnCtx* pCtx = taosMemoryCalloc(1, sizeof(STransConnCtx)); + if (pCtx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pCtx->cvtAddr = cvtAddr; SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + if (cliMsg == NULL) { + taosMemoryFree(pCtx); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + cliMsg->ctx = pCtx; cliMsg->type = Update; cliMsg->refId = (int64_t)shandle; @@ -2824,21 +3134,30 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); - if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { + if ((code = transAsyncSend(thrd->asyncPool, &(cliMsg->q))) != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + break; } } + transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return code; } int64_t transAllocHandle() { SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + SExHandle* self = transAcquireExHandle(transGetRefMgt(), exh->refId); - ASSERT(exh == self); if (exh != self) { taosMemoryFree(exh); return TSDB_CODE_REF_INVALID_ID; @@ -2847,6 +3166,5 @@ int64_t transAllocHandle() { QUEUE_INIT(&exh->q); taosInitRWLatch(&exh->latch); tDebug("pre alloc refId %" PRId64 "", exh->refId); - return exh->refId; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index b9223e7b39..aa6b377423 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -67,7 +67,11 @@ int32_t transDecompressMsg(char** msg, int32_t len) { STransCompMsg* pComp = (STransCompMsg*)pCont; int32_t oriLen = htonl(pComp->contLen); - char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + char* buf = taosMemoryCalloc(1, oriLen + sizeof(STransMsgHead)); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + STransMsgHead* pNewHead = (STransMsgHead*)buf; int32_t decompLen = LZ4_decompress_safe(pCont + sizeof(STransCompMsg), (char*)pNewHead->content, len - sizeof(STransMsgHead) - sizeof(STransCompMsg), oriLen); @@ -78,7 +82,7 @@ int32_t transDecompressMsg(char** msg, int32_t len) { taosMemoryFree(pHead); *msg = buf; if (decompLen != oriLen) { - return -1; + return TSDB_CODE_INVALID_MSG; } return 0; } @@ -98,26 +102,33 @@ int transSockInfo2Str(struct sockaddr* sockname, char* dst) { sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; } -int transInitBuffer(SConnBuffer* buf) { - buf->cap = BUFFER_CAP; +int32_t transInitBuffer(SConnBuffer* buf) { buf->buf = taosMemoryCalloc(1, BUFFER_CAP); + if (buf->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + buf->cap = BUFFER_CAP; buf->left = -1; buf->len = 0; buf->total = 0; buf->invalid = 0; return 0; } -int transDestroyBuffer(SConnBuffer* p) { +int32_t transDestroyBuffer(SConnBuffer* p) { taosMemoryFree(p->buf); p->buf = NULL; return 0; } -int transClearBuffer(SConnBuffer* buf) { +int32_t transClearBuffer(SConnBuffer* buf) { SConnBuffer* p = buf; if (p->cap > BUFFER_CAP) { p->cap = BUFFER_CAP; p->buf = taosMemoryRealloc(p->buf, BUFFER_CAP); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } p->left = -1; p->len = 0; @@ -126,27 +137,31 @@ int transClearBuffer(SConnBuffer* buf) { return 0; } -int transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { +int32_t transDumpFromBuffer(SConnBuffer* connBuf, char** buf, int8_t resetBuf) { static const int HEADSIZE = sizeof(STransMsgHead); - - SConnBuffer* p = connBuf; + int32_t code = 0; + SConnBuffer* p = connBuf; if (p->left != 0 || p->total <= 0) { - return -1; + return TSDB_CODE_INVALID_MSG; } int total = p->total; if (total >= HEADSIZE && !p->invalid) { *buf = taosMemoryCalloc(1, total); + if (*buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } memcpy(*buf, p->buf, total); - if (transResetBuffer(connBuf, resetBuf) < 0) { - return -1; + if ((code = transResetBuffer(connBuf, resetBuf)) < 0) { + return code; } } else { total = -1; + return TSDB_CODE_INVALID_MSG; } return total; } -int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { +int32_t transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { SConnBuffer* p = connBuf; if (p->total < p->len) { int left = p->len - p->total; @@ -162,15 +177,18 @@ int transResetBuffer(SConnBuffer* connBuf, int8_t resetBuf) { if (resetBuf) { p->cap = BUFFER_CAP; p->buf = taosMemoryRealloc(p->buf, p->cap); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } } } else { ASSERTS(0, "invalid read from sock buf"); - return -1; + return TSDB_CODE_INVALID_MSG; } return 0; } -int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { +int32_t transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { /* * formate of data buffer: * |<--------------------------data from socket------------------------------->| @@ -187,6 +205,9 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { } else { p->cap = p->left + p->len; p->buf = taosMemoryRealloc(p->buf, p->cap); + if (p->buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } uvBuf->base = p->buf + p->len; uvBuf->len = p->left; } @@ -222,19 +243,19 @@ int transSetConnOption(uv_tcp_t* stream, int keepalive) { // int ret = uv_tcp_keepalive(stream, 5, 60); } -SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +int32_t transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb, SAsyncPool** pPool) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); if (pool == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; + // return NULL; } + int32_t code = 0; pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); if (pool->asyncs == NULL) { taosMemoryFree(pool); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } int i = 0, err = 0; @@ -243,7 +264,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); if (item == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; break; } item->pThrd = arg; @@ -254,7 +275,7 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) err = uv_async_init(loop, async, cb); if (err != 0) { tError("failed to init async, reason:%s", uv_err_name(err)); - terrno = TSDB_CODE_THIRDPARTY_ERROR; + code = TSDB_CODE_THIRDPARTY_ERROR; break; } } @@ -264,10 +285,14 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool = NULL; } - return pool; + *pPool = pool; + return 0; + // return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { + if (pool == NULL) return; + for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); SAsyncItem* item = async->data; @@ -289,7 +314,7 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool) { } int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { - return -1; + return TSDB_CODE_RPC_ASYNC_MODULE_QUIT; } int idx = pool->index % pool->nAsync; @@ -303,7 +328,12 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { taosThreadMutexLock(&item->mtx); QUEUE_PUSH(&item->qmsg, q); taosThreadMutexUnlock(&item->mtx); - return uv_async_send(async); + int ret = uv_async_send(async); + if (ret != 0) { + tError("failed to send async,reason:%s", uv_err_name(ret)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + return 0; } void transCtxInit(STransCtx* ctx) { @@ -408,9 +438,14 @@ void transReqQueueClear(queue* q) { } } -void transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { +int32_t transQueueInit(STransQueue* queue, void (*freeFunc)(const void* arg)) { queue->q = taosArrayInit(2, sizeof(void*)); queue->freeFunc = (void (*)(const void*))freeFunc; + + if (queue->q == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return 0; } bool transQueuePush(STransQueue* queue, void* arg) { if (queue->q == NULL) { @@ -513,20 +548,44 @@ static void transDQTimeout(uv_timer_t* timer) { uv_timer_start(queue->timer, transDQTimeout, timeout, 0); } } -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { - uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); - uv_timer_init(loop, timer); +int32_t transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { + int32_t code = 0; + Heap* heap = NULL; + uv_timer_t* timer = NULL; + SDelayQueue* q = NULL; - Heap* heap = heapCreate(timeCompare); + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + if (timer == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - SDelayQueue* q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + heap = heapCreate(timeCompare); + if (heap == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1); + } + + q = taosMemoryCalloc(1, sizeof(SDelayQueue)); + if (q == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _return1); + } q->heap = heap; q->timer = timer; q->loop = loop; q->timer->data = q; + int err = uv_timer_init(loop, timer); + if (err != 0) { + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _return1); + } + *queue = q; return 0; + +_return1: + taosMemoryFree(timer); + heapDestroy(heap); + taosMemoryFree(q); + return TSDB_CODE_OUT_OF_MEMORY; } void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { @@ -578,6 +637,10 @@ void transDQCancel(SDelayQueue* queue, SDelayTask* task) { SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); + if (task == NULL) { + return NULL; + } + task->func = func; task->arg = arg; task->execTime = now + timeoutMs; @@ -648,9 +711,13 @@ static void transDestroyEnv() { transCloseRefMgt(transSyncMsgMgt); } -void transInit() { +int32_t transInit() { // init env - taosThreadOnce(&transModuleInit, transInitEnv); + int32_t code = taosThreadOnce(&transModuleInit, transInitEnv); + if (code != 0) { + code = TAOS_SYSTEM_ERROR(errno); + } + return code; } int32_t transGetRefMgt() { return refMgt; } @@ -709,29 +776,6 @@ void transDestroySyncMsg(void* msg) { taosMemoryFree(pSyncMsg); } -// void subnetIp2int(const char* const ip_addr, uint8_t* dst) { -// char ip_addr_cpy[20]; -// char ip[5]; - -// tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); - -// char *s_start, *s_end; -// s_start = ip_addr_cpy; -// s_end = ip_addr_cpy; - -// int32_t k = 0; - -// for (k = 0; *s_start != '\0'; s_start = s_end) { -// for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { -// } -// if (*s_end == '.') { -// *s_end = '\0'; -// s_end++; -// } -// dst[k++] = (char)atoi(s_start); -// } -// } - uint32_t subnetIpRang2Int(SIpV4Range* pRange) { uint32_t ip = pRange->ip; return ((ip & 0xFF) << 24) | ((ip & 0xFF00) << 8) | ((ip & 0xFF0000) >> 8) | ((ip >> 24) & 0xFF); @@ -778,7 +822,11 @@ int32_t transUtilSIpRangeToStr(SIpV4Range* pRange, char* buf) { struct in_addr addr; addr.s_addr = pRange->ip; - uv_inet_ntop(AF_INET, &addr, buf, 32); + int32_t err = uv_inet_ntop(AF_INET, &addr, buf, 32); + if (err != 0) { + tError("failed to convert ip to string, reason:%s", uv_strerror(err)); + return TSDB_CODE_THIRDPARTY_ERROR; + } len = strlen(buf); @@ -794,14 +842,23 @@ int32_t transUtilSWhiteListToStr(SIpWhiteList* pList, char** ppBuf) { *ppBuf = NULL; return 0; } + int32_t len = 0; char* pBuf = taosMemoryCalloc(1, pList->num * 36); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } for (int i = 0; i < pList->num; i++) { SIpV4Range* pRange = &pList->pIpRange[i]; char tbuf[32] = {0}; int tlen = transUtilSIpRangeToStr(pRange, tbuf); + if (tlen < 0) { + taosMemoryFree(pBuf); + return tlen; + } + len += sprintf(pBuf + len, "%s,", tbuf); } if (len > 0) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 674bb86fb5..2f6c687df2 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -122,7 +122,7 @@ typedef struct SServerObj { SIpWhiteListTab* uvWhiteListCreate(); void uvWhiteListDestroy(SIpWhiteListTab* pWhite); -void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver); +int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* pList, int64_t ver); void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable); bool uvWhiteListCheckConn(SIpWhiteListTab* pWhite, SSvrConn* pConn); bool uvWhiteListFilte(SIpWhiteListTab* pWhite, char* user, uint32_t ip, int64_t ver); @@ -164,7 +164,7 @@ static FORCE_INLINE SSvrConn* createConn(void* hThrd); static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn); -static int reallocConnRef(SSvrConn* conn); +static int32_t reallocConnRef(SSvrConn* conn); static void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd); static void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd); @@ -181,8 +181,8 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); -static bool addHandleToAcceptloop(void* arg); +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); +static int32_t addHandleToAcceptloop(void* arg); #define SRV_RELEASE_UV(loop) \ do { \ @@ -202,7 +202,11 @@ static bool addHandleToAcceptloop(void* arg); void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSvrConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - transAllocBuffer(pBuf, buf); + int32_t code = transAllocBuffer(pBuf, buf); + if (code < 0) { + tError("conn %p failed to alloc buffer, since %s", conn, tstrerror(code)); + // destroyConn(conn, true); + } } // refers specifically to query or insert timeout @@ -221,8 +225,16 @@ static bool uvCheckIp(SIpV4Range* pRange, int32_t ip) { } SIpWhiteListTab* uvWhiteListCreate() { SIpWhiteListTab* pWhiteList = taosMemoryCalloc(1, sizeof(SIpWhiteListTab)); + if (pWhiteList == NULL) { + return NULL; + } pWhiteList->pList = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), 0, HASH_NO_LOCK); + if (pWhiteList->pList == NULL) { + taosMemoryFree(pWhiteList); + return NULL; + } + pWhiteList->ver = -1; return pWhiteList; } @@ -240,17 +252,26 @@ void uvWhiteListDestroy(SIpWhiteListTab* pWhite) { taosMemoryFree(pWhite); } -void uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) { +int32_t uvWhiteListToStr(SWhiteUserList* plist, char* user, char** ppBuf) { char* tmp = NULL; int32_t tlen = transUtilSWhiteListToStr(plist->pList, &tmp); + if (tlen < 0) { + return tlen; + } + + char* pBuf = taosMemoryCalloc(1, tlen + 64); + if (pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } - char* pBuf = taosMemoryCalloc(1, tlen + 64); int32_t len = sprintf(pBuf, "user: %s, ver: %" PRId64 ", ip: {%s}", user, plist->ver, tmp); taosMemoryFree(tmp); *ppBuf = pBuf; + return len; } void uvWhiteListDebug(SIpWhiteListTab* pWrite) { + int32_t code = 0; SHashObj* pWhiteList = pWrite->pList; void* pIter = taosHashIterate(pWhiteList, NULL); while (pIter) { @@ -262,23 +283,35 @@ void uvWhiteListDebug(SIpWhiteListTab* pWrite) { SWhiteUserList* pUserList = *(SWhiteUserList**)pIter; char* buf = NULL; - uvWhiteListToStr(pUserList, user, &buf); - tDebug("ip-white-list %s", buf); + + code = uvWhiteListToStr(pUserList, user, &buf); + if (code != 0) { + tDebug("ip-white-list failed to debug to str since %s", buf); + } taosMemoryFree(buf); pIter = taosHashIterate(pWhiteList, pIter); } } -void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) { +int32_t uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, int64_t ver) { + int32_t code = 0; SHashObj* pWhiteList = pWhite->pList; SWhiteUserList** ppUserList = taosHashGet(pWhiteList, user, strlen(user)); if (ppUserList == NULL || *ppUserList == NULL) { SWhiteUserList* pUserList = taosMemoryCalloc(1, sizeof(SWhiteUserList)); + if (pUserList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pUserList->ver = ver; pUserList->pList = plist; - taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*)); + code = taosHashPut(pWhiteList, user, strlen(user), &pUserList, sizeof(void*)); + if (code != 0) { + taosMemoryFree(pUserList); + return code; + } } else { SWhiteUserList* pUserList = *ppUserList; @@ -287,6 +320,7 @@ void uvWhiteListAdd(SIpWhiteListTab* pWhite, char* user, SIpWhiteList* plist, in pUserList->pList = plist; } uvWhiteListDebug(pWhite); + return 0; } void uvWhiteListUpdate(SIpWhiteListTab* pWhite, SHashObj* pTable) { @@ -584,6 +618,10 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); + if (pMsg->pCont == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMsg->contLen = 0; } STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); @@ -598,7 +636,7 @@ static int uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { if (pConn->inType == TDMT_SCH_DROP_TASK && pMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { transQueuePop(&pConn->srvMsgs); destroySmsg(smsg); - return -1; + return TSDB_CODE_INVALID_MSG; } if (pConn->status == ConnNormal) { @@ -764,7 +802,11 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { } static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) { if ((pHead)->release == 1 && (pHead->msgLen) == sizeof(*pHead)) { - reallocConnRef(pConn); + int32_t code = reallocConnRef(pConn); + if (code != 0) { + destroyConn(pConn, true); + return true; + } tTrace("conn %p received release request", pConn); STraceId traceId = pHead->traceId; @@ -935,23 +977,21 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - // uv_handle_type pending = uv_pipe_pending_type(pipe); - SSvrConn* pConn = createConn(pThrd); + if (pConn == NULL) { + uv_close((uv_handle_t*)q, NULL); + return; + } - pConn->pTransInst = pThrd->pTransInst; - /* init conn timer*/ - // uv_timer_init(pThrd->loop, &pConn->pTimer); - // pConn->pTimer.data = pConn; - - pConn->hostThrd = pThrd; - - // init client handle - pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, pConn->pTcp); - pConn->pTcp->data = pConn; - - // transSetConnOption((uv_tcp_t*)pConn->pTcp); + // pConn->pTransInst = pThrd->pTransInst; + // /* init conn timer*/ + // // uv_timer_init(pThrd->loop, &pConn->pTimer); + // // pConn->pTimer.data = pConn; + // pConn->hostThrd = pThrd; + // // init client handle + // pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + // uv_tcp_init(pThrd->loop, pConn->pTcp); + // pConn->pTcp->data = pConn; if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; @@ -1005,17 +1045,36 @@ void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { SWorkThrd* pThrd = container_of(connect, SWorkThrd, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { +static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { + int32_t code = 0; pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - if (0 != uv_loop_init(pThrd->loop)) { - return false; + if (pThrd->loop == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if ((code = uv_loop_init(pThrd->loop)) != 0) { + tError("failed to init loop since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } #if defined(WINDOWS) || defined(DARWIN) - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #else - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); + code = uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + if (code != 0) { + tError("failed to init pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_pipe_open(pThrd->pipe, pThrd->fd); + if (code != 0) { + tError("failed to open pip since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif pThrd->pipe->data = pThrd; @@ -1023,50 +1082,86 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { QUEUE_INIT(&pThrd->msg); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); - uv_prepare_init(pThrd->loop, pThrd->prepare); - uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (pThrd->prepare == NULL) { + tError("failed to init prepare"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_prepare_init(pThrd->loop, pThrd->prepare); + if (code != 0) { + tError("failed to init prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } + + code = uv_prepare_start(pThrd->prepare, uvPrepareCb); + if (code != 0) { + tError("failed to start prepare since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } pThrd->prepare->data = pThrd; // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb); + code = transAsyncPoolCreate(pThrd->loop, 8, pThrd, uvWorkerAsyncCb, &pThrd->asyncPool); + if (code != 0) { + tError("failed to init async pool since:%s", tstrerror(code)); + return code; + } #if defined(WINDOWS) || defined(DARWIN) uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); + #else - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + code = uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + if (code != 0) { + tError("failed to start read pipe:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } #endif - return true; + return 0; } -static bool addHandleToAcceptloop(void* arg) { +static int32_t addHandleToAcceptloop(void* arg) { // impl later SServerObj* srv = arg; - int err = 0; - if ((err = uv_tcp_init(srv->loop, &srv->server)) != 0) { - tError("failed to init accept server:%s", uv_err_name(err)); - return false; + int code = 0; + if ((code = uv_tcp_init(srv->loop, &srv->server)) != 0) { + tError("failed to init accept server since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } // register an async here to quit server gracefully srv->pAcceptAsync = taosMemoryCalloc(1, sizeof(uv_async_t)); - uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (srv->pAcceptAsync == NULL) { + tError("failed to create async since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return TSDB_CODE_OUT_OF_MEMORY; + } + + code = uv_async_init(srv->loop, srv->pAcceptAsync, uvAcceptAsyncCb); + if (code != 0) { + tError("failed to init async since:%s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; + } srv->pAcceptAsync->data = srv; struct sockaddr_in bind_addr; - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - if ((err = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { - tError("failed to bind:%s", uv_err_name(err)); - return false; + if ((code = uv_ip4_addr("0.0.0.0", srv->port, &bind_addr)) != 0) { + tError("failed to bind addr since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - if ((err = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { - tError("failed to listen:%s", uv_err_name(err)); - terrno = TSDB_CODE_RPC_PORT_EADDRINUSE; - return false; + + if ((code = uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0)) != 0) { + tError("failed to bind since %s", uv_err_name(code)); + return TSDB_CODE_THIRDPARTY_ERROR; } - return true; + if ((code = uv_listen((uv_stream_t*)&srv->server, 4096 * 2, uvOnAcceptCb)) != 0) { + tError("failed to listen since %s", uv_err_name(code)); + return TSDB_CODE_RPC_PORT_EADDRINUSE; + } + return 0; } + void* transWorkerThread(void* arg) { setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; @@ -1076,35 +1171,84 @@ void* transWorkerThread(void* arg) { } static FORCE_INLINE SSvrConn* createConn(void* hThrd) { + int32_t code = 0; SWorkThrd* pThrd = hThrd; SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); + if (pConn == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } transReqQueueInit(&pConn->wreqQueue); QUEUE_INIT(&pConn->queue); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - transQueueInit(&pConn->srvMsgs, NULL); + if ((code = transQueueInit(&pConn->srvMsgs, NULL)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } + + if ((code = transInitBuffer(&pConn->readBuf)) != 0) { + TAOS_CHECK_GOTO(code, NULL, _end); + } memset(&pConn->regArg, 0, sizeof(pConn->regArg)); pConn->broken = false; pConn->status = ConnNormal; - transInitBuffer(&pConn->readBuf); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + exh->handle = pConn; exh->pThrd = pThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + TAOS_CHECK_GOTO(TSDB_CODE_REF_INVALID_ID, NULL, _end); + } STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; QUEUE_INIT(&exh->q); transRefSrvHandle(pConn); tTrace("%s handle %p, conn %p created, refId:%" PRId64, transLabel(pTransInst), exh, pConn, pConn->refId); + + pConn->pTransInst = pThrd->pTransInst; + /* init conn timer*/ + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; + pConn->hostThrd = pThrd; + // init client handle + pConn->pTcp = (uv_tcp_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); + if (pConn->pTcp == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end); + } + + code = uv_tcp_init(pThrd->loop, pConn->pTcp); + if (code != 0) { + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), uv_strerror(code)); + TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end); + } + pConn->pTcp->data = pConn; + return pConn; +_end: + if (pConn) { + transQueueDestroy(&pConn->srvMsgs); + transDestroyBuffer(&pConn->readBuf); + taosMemoryFree(pConn->pTcp); + taosMemoryFree(pConn); + pConn = NULL; + } + tError("%s failed to create conn since %s" PRId64, transLabel(pTransInst), tstrerror(code)); + return NULL; } static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear) { @@ -1125,16 +1269,33 @@ static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn) { conn->regArg.init = 0; } } -static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(transGetRefMgt(), conn->refId); - transRemoveExHandle(transGetRefMgt(), conn->refId); +static int32_t reallocConnRef(SSvrConn* conn) { + if (conn->refId > 0) { + transReleaseExHandle(transGetRefMgt(), conn->refId); + transRemoveExHandle(transGetRefMgt(), conn->refId); + } // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + if (exh == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + exh->handle = conn; exh->pThrd = conn->hostThrd; exh->refId = transAddExHandle(transGetRefMgt(), exh); + if (exh->refId < 0) { + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + QUEUE_INIT(&exh->q); - transAcquireExHandle(transGetRefMgt(), exh->refId); + SExHandle* pSelf = transAcquireExHandle(transGetRefMgt(), exh->refId); + if (pSelf != exh) { + tError("conn %p failed to acquire handle", conn); + taosMemoryFree(exh); + return TSDB_CODE_REF_INVALID_ID; + } + conn->refId = exh->refId; return 0; @@ -1204,24 +1365,41 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { } void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + int32_t code = 0; + SServerObj* srv = taosMemoryCalloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); + if (srv == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + tError("failed to init server since: %s", tstrerror(code)); + return NULL; + } + + srv->ip = ip; + srv->port = port; srv->numOfThreads = numOfThreads; srv->workerIdx = 0; srv->numOfWorkerReady = 0; + srv->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); srv->pThreadObj = (SWorkThrd**)taosMemoryCalloc(srv->numOfThreads, sizeof(SWorkThrd*)); srv->pipe = (uv_pipe_t**)taosMemoryCalloc(srv->numOfThreads, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); + if (srv->loop == NULL || srv->pThreadObj == NULL || srv->pipe == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } - char pipeName[PATH_MAX]; + code = uv_loop_init(srv->loop); + if (code != 0) { + tError("failed to init server since: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } if (false == taosValidIpAndPort(srv->ip, srv->port)) { - terrno = TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); goto End; } + char pipeName[PATH_MAX]; #if defined(WINDOWS) || defined(DARWIN) int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); @@ -1259,7 +1437,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1276,27 +1454,53 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd)); + if (thrd == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } thrd->pTransInst = shandle; thrd->quit = false; thrd->pTransInst = shandle; thrd->pWhiteList = uvWhiteListCreate(); - - srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); - srv->pThreadObj[i] = thrd; - - uv_os_sock_t fds[2]; - if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + if (thrd->pWhiteList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto End; } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + if (srv->pipe[i] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto End; + } + + srv->pThreadObj[i] = thrd; + + uv_os_sock_t fds[2]; + if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) { + tError("failed to create pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } + + code = uv_pipe_open(&(srv->pipe[i][0]), fds[1]); + if (code != 0) { + tError("failed to init pipe, errmsg: %s", uv_err_name(code)); + code = TSDB_CODE_THIRDPARTY_ERROR; + goto End; + } thrd->pipe = &(srv->pipe[i][1]); // init read thrd->fd = fds[0]; - if (false == addHandleToWorkloop(thrd, pipeName)) { + if ((code = addHandleToWorkloop(thrd, pipeName)) != 0) { goto End; } @@ -1311,15 +1515,17 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, } #endif - if (false == addHandleToAcceptloop(srv)) { + if ((code = addHandleToAcceptloop(srv)) != 0) { goto End; } - int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); - if (err == 0) { + code = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); + if (code == 0) { tDebug("success to create accept-thread"); } else { - tError("failed to create accept-thread"); + code = TAOS_SYSTEM_ERROR(errno); + tError("failed to create accept-thread since %s", tstrerror(code)); + goto End; // clear all resource later } @@ -1328,6 +1534,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; End: transCloseServer(srv); + terrno = code; return NULL; } @@ -1341,9 +1548,14 @@ void uvHandleQuit(SSvrMsg* msg, SWorkThrd* thrd) { taosMemoryFree(msg); } void uvHandleRelease(SSvrMsg* msg, SWorkThrd* thrd) { + int32_t code = 0; SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - reallocConnRef(conn); + code = reallocConnRef(conn); + if (code != 0) { + destroyConn(conn, true); + return; + } if (!transQueuePush(&conn->srvMsgs, msg)) { return; } @@ -1387,30 +1599,43 @@ void uvHandleRegister(SSvrMsg* msg, SWorkThrd* thrd) { } void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) { SUpdateIpWhite* req = msg->arg; - if (req != NULL) { - for (int i = 0; i < req->numOfUser; i++) { - SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; - - int32_t sz = pUser->numOfRange * sizeof(SIpV4Range); - SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList)); - pList->num = pUser->numOfRange; - - memcpy(pList->pIpRange, pUser->pIpRanges, sz); - uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver); - } - - thrd->pWhiteList->ver = req->ver; - thrd->enableIpWhiteList = 1; - - tFreeSUpdateIpWhiteReq(req); - taosMemoryFree(req); - } else { + if (req == NULL) { tDebug("ip-white-list disable on trans"); thrd->enableIpWhiteList = 0; + taosMemoryFree(msg); + return; } + int32_t code = 0; + for (int i = 0; i < req->numOfUser; i++) { + SUpdateUserIpWhite* pUser = &req->pUserIpWhite[i]; + + int32_t sz = pUser->numOfRange * sizeof(SIpV4Range); + + SIpWhiteList* pList = taosMemoryCalloc(1, sz + sizeof(SIpWhiteList)); + if (pList == NULL) { + tError("failed to create ip-white-list since %s", tstrerror(code)); + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + pList->num = pUser->numOfRange; + memcpy(pList->pIpRange, pUser->pIpRanges, sz); + code = uvWhiteListAdd(thrd->pWhiteList, pUser->user, pList, pUser->ver); + if (code != 0) { + break; + } + } + + if (code == 0) { + thrd->pWhiteList->ver = req->ver; + thrd->enableIpWhiteList = 1; + } else { + tError("failed to update ip-white-list since %s", tstrerror(code)); + } + tFreeSUpdateIpWhiteReq(req); + taosMemoryFree(req); taosMemoryFree(msg); - return; } + void destroyWorkThrd(SWorkThrd* pThrd) { if (pThrd == NULL) { return; @@ -1483,6 +1708,7 @@ void transUnrefSrvHandle(void* handle) { } int transReleaseSrvHandle(void* handle) { + int32_t code = 0; SRpcHandleInfo* info = handle; SExHandle* exh = info->handle; int64_t refId = info->refId; @@ -1495,12 +1721,19 @@ int transReleaseSrvHandle(void* handle) { STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Release; tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1508,13 +1741,15 @@ int transReleaseSrvHandle(void* handle) { _return1: tDebug("handle %p failed to send to release handle", exh); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to send to release handle", exh); - return -1; + return code; } int transSendResponse(const STransMsg* msg) { + int32_t code = 0; + if (msg->info.noResp) { rpcFreeCont(msg->pCont); tTrace("no need send resp"); @@ -1536,13 +1771,20 @@ int transSendResponse(const STransMsg* msg) { ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Normal; STraceId* trace = (STraceId*)&msg->info.traceId; tGDebug("conn %p start to send resp (1/2)", exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1552,13 +1794,15 @@ _return1: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - return -1; + return code; } int transRegisterMsg(const STransMsg* msg) { + int32_t code = 0; + SExHandle* exh = msg->info.handle; int64_t refId = msg->info.refId; ASYNC_CHECK_HANDLE(exh, refId); @@ -1572,13 +1816,20 @@ int transRegisterMsg(const STransMsg* msg) { ASYNC_ERR_JRET(pThrd); SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (m == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return1; + } + m->msg = tmsg; m->type = Register; STrans* pTransInst = pThrd->pTransInst; tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); - if (0 != transAsyncSend(pThrd->asyncPool, &m->q)) { + if ((code = transAsyncSend(pThrd->asyncPool, &m->q)) != 0) { destroySmsg(m); + transReleaseExHandle(transGetRefMgt(), refId); + return code; } transReleaseExHandle(transGetRefMgt(), refId); @@ -1588,29 +1839,56 @@ _return1: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); transReleaseExHandle(transGetRefMgt(), refId); - return -1; + return code; _return2: tDebug("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - return -1; + return code; } -void transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { + +int32_t transSetIpWhiteList(void* thandle, void* arg, FilteFunc* func) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)thandle); + if (pTransInst == NULL) { + return TSDB_CODE_RPC_MODULE_QUIT; + } + + int32_t code = 0; tDebug("ip-white-list update on rpc"); SServerObj* svrObj = pTransInst->tcphandle; for (int i = 0; i < svrObj->numOfThreads; i++) { SWorkThrd* pThrd = svrObj->pThreadObj[i]; - SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); - SUpdateIpWhite* pReq = (arg != NULL ? cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg) : NULL); + SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); + if (msg == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + + SUpdateIpWhite* pReq = NULL; + code = cloneSUpdateIpWhiteReq((SUpdateIpWhite*)arg, &pReq); + if (code != 0) { + taosMemoryFree(msg); + break; + } msg->type = Update; msg->arg = pReq; - transAsyncSend(pThrd->asyncPool, &msg->q); + if ((code = transAsyncSend(pThrd->asyncPool, &msg->q)) != 0) { + code = (code == TSDB_CODE_RPC_ASYNC_MODULE_QUIT ? TSDB_CODE_RPC_MODULE_QUIT : code); + tFreeSUpdateIpWhiteReq(pReq); + taosMemoryFree(pReq); + taosMemoryFree(msg); + break; + } } transReleaseExHandle(transGetInstMgt(), (int64_t)thandle); + + if (code != 0) { + tError("ip-white-list update failed since %s", tstrerror(code)); + } + return code; } int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 1200cbfb5a..565a27f86a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -55,8 +55,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") -TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") -TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_BUSY, "rpc network busy") +TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_MODULE_QUIT, "http-report already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MODULE_QUIT, "rpc module already quit") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_ASYNC_MODULE_QUIT, "rpc async module already quit") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")