diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7e9ce34f9f..de1a61567f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -127,7 +127,7 @@ int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; - if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN) < 0) return -1; } return 0; } @@ -3054,7 +3054,7 @@ int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->user) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->passwd) < 0) return -1; + if (tEncodeCStrWithLen(&encoder, pReq->passwd, TSDB_PASSWORD_LEN) < 0) return -1; if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1; tEndEncode(&encoder); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 89349d2de9..3c571a025f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -73,6 +73,7 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { + const STraceId *trace = &pMsg->info.traceId; SDCreateMnodeReq createReq = {0}; if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -81,7 +82,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { if (createReq.replica != 1) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to create mnode since %s", terrstr()); + dGError("failed to create mnode since %s", terrstr()); return -1; } @@ -91,7 +92,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { mgmt.path = pInput->path; mgmt.name = pInput->name; if (mmWriteFile(&mgmt, &createReq.replicas[0], deployed) != 0) { - dError("failed to write mnode file since %s", terrstr()); + dGError("failed to write mnode file since %s", terrstr()); return -1; } @@ -99,7 +100,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SDDropMnodeReq dropReq = {0}; + const STraceId *trace = &pMsg->info.traceId; + SDDropMnodeReq dropReq = {0}; if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -107,7 +109,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) { terrno = TSDB_CODE_INVALID_OPTION; - dError("failed to drop mnode since %s", terrstr()); + dGError("failed to drop mnode since %s", terrstr()); return -1; } @@ -117,7 +119,7 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { mgmt.path = pInput->path; mgmt.name = pInput->name; if (mmWriteFile(&mgmt, NULL, deployed) != 0) { - dError("failed to write mnode file since %s", terrstr()); + dGError("failed to write mnode file since %s", terrstr()); return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 32477febeb..7f3f76b4b6 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -18,7 +18,6 @@ static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) { int32_t code = 0; - taosThreadRwlockRdlock(&pMgmt->lock); if (pMgmt->stopped) { code = -1; @@ -48,7 +47,8 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) { static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; - STraceId * trace = &pMsg->info.traceId; + + const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, get from mnode queue", pMsg); switch (pMsg->msgType) { @@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { mndPostProcessQueryMsg(pMsg); } - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } @@ -80,7 +80,9 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { SMnodeMgmt *pMgmt = pInfo->ahandle; pMsg->info.node = pMgmt->pMnode; - dTrace("msg:%p, get from mnode-sync queue", pMsg); + + const STraceId *trace = &pMsg->info.traceId; + dGTrace("msg:%p, get from mnode-sync queue", pMsg); SMsgHead *pHead = pMsg->pCont; pHead->contLen = ntohl(pHead->contLen); @@ -88,20 +90,22 @@ static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = mndProcessSyncMsg(pMsg); - dTrace("msg:%p, is freed, code:0x%x", pMsg, code); + dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } static inline int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pMsg) { + const STraceId *trace = &pMsg->info.traceId; + if (mmAcquire(pMgmt) == 0) { - dTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); + dGTrace("msg:%p, put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pMsg->msgType)); taosWriteQitem(pWorker->queue, pMsg); mmRelease(pMgmt); return 0; } else { - dTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), - TMSG_INFO(pMsg->msgType)); + dGTrace("msg:%p, failed to put into %s queue since %s, type:%s", pMsg, pWorker->name, terrstr(), + TMSG_INFO(pMsg->msgType)); return -1; } } @@ -121,19 +125,17 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { pMsg->info.node = pMgmt->pMnode; if (mndPreProcessQueryMsg(pMsg) != 0) { - dError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + dGError("msg:%p, failed to pre-process in mnode since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return -1; } return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg); } int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - pMsg->info.node = pMgmt->pMnode; - return mmPutMsgToWorker(pMgmt, &pMgmt->fetchWorker, pMsg); } - int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->monitorWorker, pMsg); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 660f512fc5..4761e3dc36 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -17,9 +17,44 @@ #include "dmMgmt.h" #include "qworker.h" -static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet); -static void dmSendRsp(SRpcMsg *pMsg); -static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); +static inline void dmSendRsp(SRpcMsg *pMsg) { + SMgmtWrapper *pWrapper = pMsg->info.wrapper; + if (InChildProc(pWrapper)) { + dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); + } else { + rpcSendResponse(pMsg); + } +} + +static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { + SEpSet epSet = {0}; + dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet); + + const int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); + pMsg->pCont = rpcMallocCont(contLen); + if (pMsg->pCont == NULL) { + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + tSerializeSEpSet(pMsg->pCont, contLen, &epSet); + pMsg->contLen = contLen; + } +} + +static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { + SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; + int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); + + rsp.pCont = rpcMallocCont(contLen); + if (rsp.pCont == NULL) { + pMsg->code = TSDB_CODE_OUT_OF_MEMORY; + } else { + tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet); + rsp.contLen = contLen; + } + dmSendRsp(&rsp); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; +} int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; @@ -28,31 +63,38 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { return -1; } - dTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); + const STraceId *trace = &pMsg->info.traceId; + dGTrace("msg:%p, will be processed by %s", pMsg, pWrapper->name); pMsg->info.wrapper = pWrapper; return (*msgFp)(pWrapper->pMgmt, pMsg); } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; - STraceId *trace = &pRpc->info.traceId; + const STraceId *trace = &pRpc->info.traceId; dGTrace("msg:%s is received, handle:%p len:%d code:0x%x app:%p refId:%" PRId64, TMSG_INFO(pRpc->msgType), pRpc->info.handle, pRpc->contLen, pRpc->code, pRpc->info.ahandle, pRpc->info.refId); - if (pRpc->msgType == TDMT_DND_NET_TEST) { - dmProcessNetTestReq(pDnode, pRpc); - return; - } else if (pRpc->msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || pRpc->msgType == TDMT_VND_FETCH_RSP) { - qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); - return; - } else if (pRpc->msgType == TDMT_MND_STATUS_RSP && pEpSet != NULL) { - dmSetMnodeEpSet(&pDnode->data, pEpSet); - } else { + switch (pRpc->msgType) { + case TDMT_DND_NET_TEST: + dmProcessNetTestReq(pDnode, pRpc); + return; + case TDMT_MND_SYSTABLE_RETRIEVE_RSP: + case TDMT_VND_FETCH_RSP: + qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); + return; + case TDMT_MND_STATUS_RSP: + if (pEpSet != NULL) { + dmSetMnodeEpSet(&pDnode->data, pEpSet); + } + break; + default: + break; } if (pDnode->status != DND_STAT_RUNNING) { @@ -73,39 +115,43 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pHandle->defaultNtype == NODE_END) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; goto _OVER; - } else { - pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; - if (pHandle->needCheckVgId) { - if (pRpc->contLen > 0) { - SMsgHead *pHead = pRpc->pCont; - int32_t vgId = ntohl(pHead->vgId); - if (vgId == QNODE_HANDLE) { + } + + pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; + if (pHandle->needCheckVgId) { + if (pRpc->contLen > 0) { + const SMsgHead *pHead = pRpc->pCont; + const int32_t vgId = ntohl(pHead->vgId); + switch (vgId) { + case QNODE_HANDLE: pWrapper = &pDnode->wrappers[QNODE]; - } else if (vgId == SNODE_HANDLE) { + break; + case SNODE_HANDLE: pWrapper = &pDnode->wrappers[SNODE]; - } else if (vgId == MNODE_HANDLE) { + break; + case MNODE_HANDLE: pWrapper = &pDnode->wrappers[MNODE]; - } else { - } - } else { - terrno = TSDB_CODE_INVALID_MSG_LEN; - goto _OVER; + break; + default: + break; } + } else { + terrno = TSDB_CODE_INVALID_MSG_LEN; + goto _OVER; } } if (dmMarkWrapper(pWrapper) != 0) { pWrapper = NULL; goto _OVER; - } else { - pRpc->info.wrapper = pWrapper; } + pRpc->info.wrapper = pWrapper; pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) goto _OVER; - memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); + memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + dGTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle); if (InParentProc(pWrapper)) { code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ); @@ -115,13 +161,11 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { - dTrace("failed to process msg:%p since %s, handle:%p", pMsg, terrstr(), pRpc->info.handle); - if (terrno != 0) code = terrno; + dGTrace("msg:%p, failed to process since %s", pMsg, terrstr()); if (IsReq(pRpc)) { SRpcMsg rsp = {.code = code, .info = pRpc->info}; - if ((code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_APP_NOT_READY) && pRpc->msgType > TDMT_MND_MSG && pRpc->msgType < TDMT_VND_MSG) { dmBuildMnodeRedirectRsp(pDnode, &rsp); @@ -135,7 +179,7 @@ _OVER: } if (pMsg != NULL) { - dTrace("msg:%p, is freed", pMsg); + dGTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); } rpcFreeCont(pRpc->pCont); @@ -149,11 +193,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray * pArray = (*pWrapper->func.getHandlesFp)(); + SArray *pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle * pMgmt = taosArrayGet(pArray, i); + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; @@ -184,45 +228,6 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { } } -static inline void dmSendRsp(SRpcMsg *pMsg) { - SMgmtWrapper *pWrapper = pMsg->info.wrapper; - if (InChildProc(pWrapper)) { - dmPutToProcPQueue(&pWrapper->proc, pMsg, DND_FUNC_RSP); - } else { - rpcSendResponse(pMsg); - } -} - -static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) { - SEpSet epSet = {0}; - dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet); - - int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); - pMsg->pCont = rpcMallocCont(contLen); - if (pMsg->pCont == NULL) { - pMsg->code = TSDB_CODE_OUT_OF_MEMORY; - } else { - tSerializeSEpSet(pMsg->pCont, contLen, &epSet); - pMsg->contLen = contLen; - } -} - -static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) { - SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; - int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet); - - rsp.pCont = rpcMallocCont(contLen); - if (rsp.pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } else { - tSerializeSEpSet(rsp.pCont, contLen, pNewEpSet); - rsp.contLen = contLen; - } - dmSendRsp(&rsp); - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; -} - static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { SMgmtWrapper *pWrapper = pMsg->info.wrapper; if (InChildProc(pWrapper)) { diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 7897f62f62..f599de384c 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -40,18 +40,27 @@ #include "wal.h" #include "libs/function/function.h" -// clang-format off #ifdef __cplusplus extern "C" { #endif -#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} -#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0) +// clang-format off + +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} + +#define dGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", gtid:%s", __VA_ARGS__, buf);} +#define dGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", gtid:%s", __VA_ARGS__, buf);} +#define dGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn (param ", gtid:%s", __VA_ARGS__, buf);} +#define dGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo (param ", gtid:%s", __VA_ARGS__, buf);} +#define dGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", gtid:%s", __VA_ARGS__, buf);} +#define dGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", gtid:%s", __VA_ARGS__, buf);} + +// clang-format on typedef enum { DNODE = 0, @@ -185,4 +194,3 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); #endif #endif /*_TD_DM_INT_H_*/ -// clang-format on diff --git a/source/dnode/mgmt/test/sut/src/sut.cpp b/source/dnode/mgmt/test/sut/src/sut.cpp index 6ef94481ea..21d9351ceb 100644 --- a/source/dnode/mgmt/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/test/sut/src/sut.cpp @@ -102,6 +102,7 @@ int32_t Testbase::SendShowReq(int8_t showType, const char* tb, const char* db) { ASSERT(pRsp->pCont != nullptr); if (pRsp->contLen == 0) return -1; + if (pRsp->code != 0) return -1; showRsp = (SRetrieveMetaTableRsp*)pRsp->pCont; showRsp->handle = htobe64(showRsp->handle); // show Id diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index c810a0cbc7..958d82a75d 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -34,13 +34,20 @@ extern "C" { #endif // clang-format off -#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} -#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} -#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} -#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} -#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} -#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) + +#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} +#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} + +#define mGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", gtid:%s", __VA_ARGS__, buf);} +#define mGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", gtid:%s", __VA_ARGS__, buf);} +#define mGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", gtid:%s", __VA_ARGS__, buf);} +#define mGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", gtid:%s", __VA_ARGS__, buf);} +#define mGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", gtid:%s", __VA_ARGS__, buf);} +#define mGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} // clang-format on diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index ef82d1131c..96ca70bef1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -59,22 +59,28 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { static void mndPullupTrans(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + } } static void mndPullupTelem(SMnode *pMnode) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; - tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); + } } static void mndPushTtlTime(SMnode *pMnode) { @@ -89,10 +95,11 @@ static void mndPushTtlTime(SMnode *pMnode) { int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t); SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) { - mError("ttl time malloc err. contLen:%d", contLen); + sdbCancelFetch(pSdb, pIter); sdbRelease(pSdb, pVgroup); continue; } + pHead->contLen = htonl(contLen); pHead->vgId = htonl(pVgroup->vgId); @@ -100,13 +107,13 @@ static void mndPushTtlTime(SMnode *pMnode) { *(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t); SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; - SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t code = tmsgSendReq(&epSet, &rpcMsg); if (code != 0) { - mError("ttl time seed err. code:%d", code); + mError("failed to send ttl time seed msg, code:0x%x", code); + } else { + mInfo("send ttl time seed msg, time:%d", t); } - mError("ttl time seed succ. time:%d", t); sdbRelease(pSdb, pVgroup); } } @@ -117,7 +124,7 @@ static void *mndThreadFp(void *param) { setThreadName("mnode-timer"); while (1) { - if (lastTime % (864000) == 0) { // sleep 1 day for ttl + if (lastTime % 864000 == 0) { mndPushTtlTime(pMnode); } @@ -549,23 +556,25 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; + if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || + pMsg->msgType == TDMT_MND_TRANS_TIMER) { + return -1; + } - if (pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && - pMsg->msgType != TDMT_MND_TRANS_TIMER) { - mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + mGError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); - SEpSet epSet = {0}; - mndGetMnodeEpSet(pMsg->info.node, &epSet); + SEpSet epSet = {0}; + mndGetMnodeEpSet(pMsg->info.node, &epSet); - int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); - pMsg->info.rsp = rpcMallocCont(contLen); - if (pMsg->info.rsp != NULL) { - tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); - pMsg->info.rspLen = contLen; - terrno = TSDB_CODE_RPC_REDIRECT; - } else { - terrno = TSDB_CODE_OUT_OF_MEMORY; - } + int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet); + pMsg->info.rsp = rpcMallocCont(contLen); + if (pMsg->info.rsp != NULL) { + tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet); + pMsg->info.rspLen = contLen; + terrno = TSDB_CODE_RPC_REDIRECT; + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; } return -1; @@ -575,17 +584,20 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; - mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, + const STraceId *trace = &pMsg->info.traceId; + mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; return -1; } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode *pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; + const STraceId *trace = &pMsg->info.traceId; + MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; if (fp == NULL) { - mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; } @@ -593,18 +605,17 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { if (mndCheckMsgContent(pMsg) != 0) return -1; if (mndCheckMnodeState(pMsg) != 0) return -1; - STraceId *trace = &pMsg->info.traceId; mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); int32_t code = (*fp)(pMsg); mndReleaseRpcRef(pMnode); if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - mTrace("msg:%p, won't response immediately since in progress", pMsg); + mGTrace("msg:%p, won't response immediately since in progress", pMsg); } else if (code == 0) { - mTrace("msg:%p, successfully processed", pMsg); + mGTrace("msg:%p, successfully processed", pMsg); } else { - mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, - TMSG_INFO(pMsg->msgType)); + mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + TMSG_INFO(pMsg->msgType)); } return code; @@ -620,7 +631,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { // Note: uid 0 is reserved int64_t mndGenerateUid(char *name, int32_t len) { int32_t hashval = MurmurHash3_32(name, len); - do { int64_t us = taosGetTimestampUs(); int64_t x = (us & 0x000000FFFFFFFFFF) << 24; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index fd80679316..01e3bdcf13 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -122,30 +122,33 @@ static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType int32_t pid, const char *app, int64_t startTime) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; - char connStr[255] = {0}; - int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); + char connStr[255] = {0}; + int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app); uint32_t connId = mndGenerateUid(connStr, len); if (startTime == 0) startTime = taosGetTimestampMs(); - SConnObj connObj = {.id = connId, - .connType = connType, - .appStartTimeMs = startTime, - .pid = pid, - .ip = ip, - .port = port, - .killed = 0, - .loginTimeMs = taosGetTimestampMs(), - .lastAccessTimeMs = 0, - .killId = 0, - .numOfQueries = 0, - .pQueries = NULL}; + SConnObj connObj = { + .id = connId, + .connType = connType, + .appStartTimeMs = startTime, + .pid = pid, + .ip = ip, + .port = port, + .killed = 0, + .loginTimeMs = taosGetTimestampMs(), + .lastAccessTimeMs = 0, + .killId = 0, + .numOfQueries = 0, + .pQueries = NULL, + }; connObj.lastAccessTimeMs = connObj.loginTimeMs; tstrncpy(connObj.user, user, TSDB_USER_LEN); tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); int32_t keepTime = tsShellActivityTimer * 3; - SConnObj *pConn = taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000); + SConnObj *pConn = + taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000); if (pConn == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); @@ -174,7 +177,6 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) { } pConn->lastAccessTimeMs = taosGetTimestampMs(); - mTrace("conn:%u, acquired from cache, data:%p", pConn->id, pConn); return pConn; } @@ -207,13 +209,14 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } static int32_t mndProcessConnectReq(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SUserObj *pUser = NULL; - SDbObj *pDb = NULL; - SConnObj *pConn = NULL; - int32_t code = -1; - SConnectReq connReq = {0}; - char ip[30] = {0}; + SMnode *pMnode = pReq->info.node; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SConnObj *pConn = NULL; + int32_t code = -1; + SConnectReq connReq = {0}; + char ip[30] = {0}; + const STraceId *trace = &pReq->info.traceId; if (tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; @@ -224,11 +227,11 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pUser = mndAcquireUser(pMnode, pReq->info.conn.user); if (pUser == NULL) { - mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr()); + mGError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr()); goto CONN_OVER; } if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) { - mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd); + mGError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd); code = TSDB_CODE_RPC_AUTH_FAILURE; goto CONN_OVER; } @@ -239,8 +242,8 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pDb = mndAcquireDb(pMnode, db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; - mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, - terrstr()); + mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db, + terrstr()); goto CONN_OVER; } } @@ -248,7 +251,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp, pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { - mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr()); + mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr()); goto CONN_OVER; } @@ -273,7 +276,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { pReq->info.rspLen = contLen; pReq->info.rsp = pRsp; - mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app); + mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app); code = 0; @@ -302,7 +305,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) { return TSDB_CODE_SUCCESS; } -static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) { +static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; SAppObj app; @@ -314,22 +317,19 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq* pReq) memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary)); app.lastAccessTimeMs = taosGetTimestampMs(); - int32_t keepTime = tsShellActivityTimer * 3; + const int32_t keepTime = tsShellActivityTimer * 3; SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000); if (pApp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to app %" PRIx64 " into cache since %s", pReq->appId, terrstr()); return NULL; } - + mTrace("app %" PRIx64 " is put into cache", pReq->appId); return pApp; } -static void mndFreeApp(SAppObj *pApp) { - mTrace("app %" PRIx64 " is destroyed", pApp->appId); -} - +static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); } static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; @@ -356,7 +356,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) { void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) { SAppObj *pApp = NULL; - bool hasNext = taosCacheIterNext(pIter); + bool hasNext = taosCacheIterNext(pIter); if (hasNext) { size_t dataLen = 0; pApp = taosCacheIterGetData(pIter, &dataLen); @@ -439,8 +439,8 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { } static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) { - SAppHbReq* pReq = &pHbReq->app; - SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId); + SAppHbReq *pReq = &pHbReq->app; + SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId); if (pApp == NULL) { pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq); if (pApp == NULL) { @@ -448,7 +448,7 @@ static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnIn return -1; } else { mDebug("a new app %" PRIx64 "created", pReq->appId); - mndReleaseApp(pMnode, pApp); + mndReleaseApp(pMnode, pApp); return TSDB_CODE_SUCCESS; } } @@ -464,7 +464,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SClientHbBatchRsp *pBatchRsp) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; - SRpcConnInfo connInfo = pMsg->info.conn; + SRpcConnInfo connInfo = pMsg->info.conn; mndUpdateAppInfo(pMnode, pHbReq, &connInfo); @@ -637,9 +637,9 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { } mInfo("kill query msg is received, queryId:%s", killReq.queryStrId); - int32_t connId = 0; + int32_t connId = 0; uint64_t queryId = 0; - char* p = strchr(killReq.queryStrId, ':'); + char *p = strchr(killReq.queryStrId, ':'); if (NULL == p) { mError("invalid query id %s", killReq.queryStrId); terrno = TSDB_CODE_MND_INVALID_QUERY_ID; @@ -853,12 +853,12 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p } static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; - int32_t cols = 0; - SAppObj *pApp = NULL; - + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SAppObj *pApp = NULL; + if (pShow->pIter == NULL) { SProfileMgmt *pMgmt = &pMnode->profileMgmt; pShow->pIter = taosCacheCreateIter(pMgmt->appCache); @@ -931,7 +931,6 @@ static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo return numOfRows; } - static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { if (pIter != NULL) { taosCacheDestroyIter(pIter); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 9339402d43..19c46d2f6e 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -26,13 +26,21 @@ extern "C" { #endif // clang-format off -#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0) -#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) -#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do + +#define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define vWarn(...) { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define vInfo(...) { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define vDebug(...) { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} +#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} + +#define vGFatal(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ", gtid:%s", __VA_ARGS__, buf);} +#define vGError(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ", gtid:%s", __VA_ARGS__, buf);} +#define vGWarn(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn (param ", gtid:%s", __VA_ARGS__, buf);} +#define vGInfo(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo (param ", gtid:%s", __VA_ARGS__, buf);} +#define vGDebug(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ", gtid:%s", __VA_ARGS__, buf);} +#define vGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ", gtid:%s", __VA_ARGS__, buf);} + // clang-format on // vnodeCfg.c diff --git a/source/libs/transport/inc/transLog.h b/source/libs/transport/inc/transLog.h index 9947ba803f..81099941c6 100644 --- a/source/libs/transport/inc/transLog.h +++ b/source/libs/transport/inc/transLog.h @@ -24,16 +24,16 @@ extern "C" { #include "tlog.h" #include "ttrace.h" -#define tFatal(...) do {if (rpcDebugFlag & DEBUG_FATAL){ taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} while (0) -#define tError(...)do { if (rpcDebugFlag & DEBUG_ERROR){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); } } while(0) -#define tWarn(...) do { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} while(0) -#define tInfo(...) do { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} while(0) -#define tDebug(...) do {if (rpcDebugFlag & DEBUG_DEBUG){ taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} while(0) -#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) -#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) +#define tFatal(...) { if (rpcDebugFlag & DEBUG_FATAL) { taosPrintLog("RPC FATAL ", DEBUG_FATAL, rpcDebugFlag, __VA_ARGS__); }} +#define tError(...) { if (rpcDebugFlag & DEBUG_ERROR ){ taosPrintLog("RPC ERROR ", DEBUG_ERROR, rpcDebugFlag, __VA_ARGS__); }} +#define tWarn(...) { if (rpcDebugFlag & DEBUG_WARN) { taosPrintLog("RPC WARN ", DEBUG_WARN, rpcDebugFlag, __VA_ARGS__); }} +#define tInfo(...) { if (rpcDebugFlag & DEBUG_INFO) { taosPrintLog("RPC ", DEBUG_INFO, rpcDebugFlag, __VA_ARGS__); }} +#define tDebug(...) { if (rpcDebugFlag & DEBUG_DEBUG) { taosPrintLog("RPC ", DEBUG_DEBUG, rpcDebugFlag, __VA_ARGS__); }} +#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} +#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } //#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0) -#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) +#define tGTrace(param, ...) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", gtid:%s", __VA_ARGS__, buf);} // clang-format on #ifdef __cplusplus