diff --git a/include/util/tlog.h b/include/util/tlog.h index 832dc7dbc1..e80e94de32 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -69,6 +69,8 @@ extern int32_t tdbDebugFlag; extern int32_t sndDebugFlag; extern int32_t simDebugFlag; +extern int32_t tqClientDebug; + int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc); void taosCloseLog(); void taosResetLog(); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 78135b245f..0b50ac6ea0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -377,9 +377,12 @@ void taos_free_result(TAOS_RES *res) { return; } SMqRspObj *pRsp = (SMqRspObj *)res; - if (TD_RES_TMQ_METADATA(res) || TD_RES_TMQ(res)) { + if (TD_RES_TMQ(res)) { tDeleteMqDataRsp(&pRsp->dataRsp); doFreeReqResultInfo(&pRsp->resInfo); + } else if (TD_RES_TMQ_METADATA(res)) { + tDeleteSTaosxRsp(&pRsp->dataRsp); + doFreeReqResultInfo(&pRsp->resInfo); } else if (TD_RES_TMQ_META(res)) { tDeleteMqMetaRsp(&pRsp->metaRsp); } else if (TD_RES_TMQ_BATCH_META(res)) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 20798fbdeb..3803ac0c7a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,12 +24,12 @@ #include "tref.h" #include "ttimer.h" -#define tqFatalC(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqErrorC(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqWarnC(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqInfoC(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqDebugC(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) -#define tqTraceC(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqFatalC(...) do { if (cDebugFlag & DEBUG_FATAL || tqClientDebug) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebug) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqWarnC(...) do { if (cDebugFlag & DEBUG_WARN || tqClientDebug) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebug) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebug) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqTraceC(...) do { if (cDebugFlag & DEBUG_TRACE || tqClientDebug) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 @@ -910,7 +910,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { (void)taosReleaseRef(tmqMgmt.rsetId, refId); } - tqDebugFlag = rsp.debugFlag; + tqClientDebug = rsp.debugFlag; tDestroySMqHbRsp(&rsp); END: @@ -2004,11 +2004,12 @@ static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, i } static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, - SMqRspObj* pRspObj, SMqDataRsp* pDataRsp) { + SMqRspObj* pRspObj) { pRspObj->resIter = -1; pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; + SMqDataRsp* pDataRsp = &pRspObj->dataRsp; bool needTransformSchema = !pDataRsp->withSchema; if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable pDataRsp->withSchema = true; @@ -2276,12 +2277,12 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ } pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA; int64_t numOfRows = 0; - tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj, &pRspObj->dataRsp); + tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj); tmq->totalRows += numOfRows; pVg->emptyBlockReceiveTs = 0; if (tmq->replayEnable) { pVg->blockReceiveTs = taosGetTimestampMs(); - pVg->blockSleepForReplay = pollRspWrapper->dataRsp.sleepTime; + pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime; if (pVg->blockSleepForReplay > 0) { if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) { tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64, @@ -2291,7 +2292,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ } tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, + tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 5b67e1267b..6b0da468a2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -541,6 +541,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); + TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER)); @@ -1935,7 +1936,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag}, {"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag}, {"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"metaDebugFlag", &metaDebugFlag}, - {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, + {"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebug", &tqClientDebug}, }; static OptionNameAndVar options[] = {{"audit", &tsEnableAudit}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index bf4a91b224..38b793c0c9 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10780,7 +10780,7 @@ _exit: int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { TAOS_CHECK_RETURN(tDecodeMqDataRspCommon(pDecoder, pRsp)); if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &((SMqDataRsp *)pRsp)->sleepTime)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } return 0; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index d37ab090b5..1bffc016bd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -244,7 +244,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } storeOffsetRows(pMnode, &req, pConsumer); - rsp.debugFlag = tqDebugFlag; + rsp.debugFlag = tqClientDebug; code = buildMqHbRsp(pMsg, &rsp); END: diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 49e1d87d80..66c49d9ac5 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -125,6 +125,8 @@ int32_t idxDebugFlag = 131; int32_t sndDebugFlag = 131; int32_t simDebugFlag = 131; +int32_t tqClientDebug = 0; + int64_t dbgEmptyW = 0; int64_t dbgWN = 0; int64_t dbgSmallWN = 0;