diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ff9003b8fc..657c21c4f9 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -86,7 +86,7 @@ void closeTransporter(STscObj *pTscObj) { static bool clientRpcRfp(int32_t code) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER) { + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { return true; } else { return false; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 63d2a65df1..a4745abd5b 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -70,9 +70,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; + SDnodeTrans * pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg *pMsg = NULL; + SRpcMsg * pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -194,11 +194,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray *pArray = (*pWrapper->func.getHandlesFp)(); + SArray * pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle *pMgmt = taosArrayGet(pArray, i); + SMgmtHandle * pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; @@ -248,7 +248,14 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { } } -static bool rpcRfp(int32_t code) { return code == TSDB_CODE_RPC_REDIRECT; } +static bool rpcRfp(int32_t code) { + if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { + return true; + } else { + return false; + } +} int32_t dmInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 838071dbf1..983cffe9dc 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +// clang-format off #include "uv.h" #include "os.h" #include "fnLog.h" @@ -25,6 +27,7 @@ #include "tglobal.h" #include "tmsg.h" #include "trpc.h" +// clang-foramt on typedef struct SUdfdContext { uv_loop_t * loop; @@ -103,12 +106,12 @@ typedef struct SUdfdRpcSendRecvInfo { uv_sem_t resultSem; } SUdfdRpcSendRecvInfo; -static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf); static int32_t udfdConnectToMnode(); static int32_t udfdLoadUdf(char *udfName, SUdf *udf); -static bool udfdRpcRfp(int32_t code); -static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); +static bool udfdRpcRfp(int32_t code); +static int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet); static int32_t udfdOpenClientRpc(); static int32_t udfdCloseClientRpc(); @@ -126,19 +129,19 @@ static void udfdUvHandleError(SUdfdUvConn *conn) { uv_close((uv_handle_t *)conn- static void udfdPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); static void udfdOnNewConnection(uv_stream_t *server, int status); -static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); +static void udfdIntrSignalHandler(uv_signal_t *handle, int signum); static int32_t removeListeningPipe(); -static void udfdPrintVersion(); +static void udfdPrintVersion(); static int32_t udfdParseArgs(int32_t argc, char *argv[]); static int32_t udfdInitLog(); -static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); -static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); +static void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); +static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf); static int32_t udfdUvInit(); -static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); +static void udfdCloseWalkCb(uv_handle_t *handle, void *arg); static int32_t udfdRun(); -static void udfdConnectMnodeThreadFunc(void* args); +static void udfdConnectMnodeThreadFunc(void *args); void udfdProcessRequest(uv_work_t *req) { SUvUdfWork *uvUdf = (SUvUdfWork *)(req->data); @@ -401,11 +404,11 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { udf->bufSize = pFuncInfo->bufSize; char path[PATH_MAX] = {0}; - #ifdef WINDOWS +#ifdef WINDOWS snprintf(path, sizeof(path), "%s%s.dll", TD_TMP_DIR_PATH, pFuncInfo->name); - #else +#else snprintf(path, sizeof(path), "%s/lib%s.so", TD_TMP_DIR_PATH, pFuncInfo->name); - #endif +#endif TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (file == NULL) { @@ -544,7 +547,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { return 0; } static bool udfdRpcRfp(int32_t code) { - if (code == TSDB_CODE_RPC_REDIRECT) { + if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY) { return true; } else { return false; @@ -652,8 +656,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->base = ctx->inputBuf; buf->len = ctx->inputCap; } else { - fnError("udfd can not allocate enough memory") - buf->base = NULL; + fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } else { @@ -664,8 +667,7 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { buf->base = ctx->inputBuf + ctx->inputLen; buf->len = ctx->inputCap - ctx->inputLen; } else { - fnError("udfd can not allocate enough memory") - buf->base = NULL; + fnError("udfd can not allocate enough memory") buf->base = NULL; buf->len = 0; } } @@ -881,7 +883,7 @@ static int32_t udfdRun() { return 0; } -void udfdConnectMnodeThreadFunc(void* args) { +void udfdConnectMnodeThreadFunc(void *args) { int32_t retryMnodeTimes = 0; int32_t code = 0; while (retryMnodeTimes++ <= TSDB_MAX_REPLICA) { @@ -939,7 +941,7 @@ int main(int argc, char *argv[]) { uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); udfdRun(); - + removeListeningPipe(); udfdCloseClientRpc(); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 0671aa39d1..a21b753d6e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -758,13 +758,16 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd) { +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* conn = NULL; int64_t refId = (int64_t)(pMsg->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(refMgt, refId); if (exh == NULL) { - assert(0); + *ignore = true; + destroyCmsg(pMsg); + return NULL; + // assert(0); } else { conn = exh->handle; transReleaseExHandle(refMgt, refId); @@ -799,7 +802,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); // transPrintEpSet(&pCtx->epSet); - SCliConn* conn = cliGetConn(pMsg, pThrd); + bool ignore = false; + SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); + if (ignore == true) { + return; + } if (conn != NULL) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7651860224..892d32696e 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -422,7 +422,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { transUnrefSrvHandle(pConn); } else { pHead->msgType = pMsg->msgType; - if (pHead->msgType == 0) pHead->msgType = pConn->inType + 1; + if (pHead->msgType == 0 && transMsgLenFromCont(pMsg->contLen) == sizeof(STransMsgHead)) + pHead->msgType = pConn->inType + 1; } }