enh:[TD-30270]opti data struct in tmq

This commit is contained in:
wangmm0220 2024-09-10 15:11:43 +08:00
parent a1fbf769a8
commit 88f8f62f87
7 changed files with 24 additions and 15 deletions

View File

@ -69,6 +69,8 @@ extern int32_t tdbDebugFlag;
extern int32_t sndDebugFlag;
extern int32_t simDebugFlag;
extern int32_t tqClientDebug;
int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
void taosCloseLog();
void taosResetLog();

View File

@ -377,9 +377,12 @@ void taos_free_result(TAOS_RES *res) {
return;
}
SMqRspObj *pRsp = (SMqRspObj *)res;
if (TD_RES_TMQ_METADATA(res) || TD_RES_TMQ(res)) {
if (TD_RES_TMQ(res)) {
tDeleteMqDataRsp(&pRsp->dataRsp);
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_METADATA(res)) {
tDeleteSTaosxRsp(&pRsp->dataRsp);
doFreeReqResultInfo(&pRsp->resInfo);
} else if (TD_RES_TMQ_META(res)) {
tDeleteMqMetaRsp(&pRsp->metaRsp);
} else if (TD_RES_TMQ_BATCH_META(res)) {

View File

@ -24,12 +24,12 @@
#include "tref.h"
#include "ttimer.h"
#define tqFatalC(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqErrorC(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqWarnC(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqInfoC(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTraceC(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqFatalC(...) do { if (cDebugFlag & DEBUG_FATAL || tqClientDebug) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqErrorC(...) do { if (cDebugFlag & DEBUG_ERROR || tqClientDebug) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqWarnC(...) do { if (cDebugFlag & DEBUG_WARN || tqClientDebug) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqInfoC(...) do { if (cDebugFlag & DEBUG_INFO || tqClientDebug) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqDebugC(...) do { if (cDebugFlag & DEBUG_DEBUG || tqClientDebug) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTraceC(...) do { if (cDebugFlag & DEBUG_TRACE || tqClientDebug) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
@ -910,7 +910,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
(void)taosReleaseRef(tmqMgmt.rsetId, refId);
}
tqDebugFlag = rsp.debugFlag;
tqClientDebug = rsp.debugFlag;
tDestroySMqHbRsp(&rsp);
END:
@ -2004,11 +2004,12 @@ static void tmqGetRawDataRowsPrecisionFromRes(void* pRetrieve, void** rawData, i
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
SMqRspObj* pRspObj, SMqDataRsp* pDataRsp) {
SMqRspObj* pRspObj) {
pRspObj->resIter = -1;
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
SMqDataRsp* pDataRsp = &pRspObj->dataRsp;
bool needTransformSchema = !pDataRsp->withSchema;
if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
pDataRsp->withSchema = true;
@ -2276,12 +2277,12 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
}
pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP ? RES_TYPE__TMQ : RES_TYPE__TMQ_METADATA;
int64_t numOfRows = 0;
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj, &pRspObj->dataRsp);
tmqBuildRspFromWrapperInner(pollRspWrapper, pVg, &numOfRows, pRspObj);
tmq->totalRows += numOfRows;
pVg->emptyBlockReceiveTs = 0;
if (tmq->replayEnable) {
pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pollRspWrapper->dataRsp.sleepTime;
pVg->blockSleepForReplay = pRspObj->dataRsp.sleepTime;
if (pVg->blockSleepForReplay > 0) {
if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) {
tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64,
@ -2291,7 +2292,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
}
tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
pollRspWrapper->reqId);
}
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP || pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {

View File

@ -541,6 +541,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "tqClientDebug", tqClientDebug, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "udfDebugFlag", udfDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "smaDebugFlag", smaDebugFlag, 0, 255, CFG_SCOPE_SERVER, CFG_DYN_SERVER));
@ -1935,7 +1936,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"smaDebugFlag", &smaDebugFlag}, {"idxDebugFlag", &idxDebugFlag}, {"tdbDebugFlag", &tdbDebugFlag},
{"tmrDebugFlag", &tmrDebugFlag}, {"uDebugFlag", &uDebugFlag}, {"smaDebugFlag", &smaDebugFlag},
{"rpcDebugFlag", &rpcDebugFlag}, {"qDebugFlag", &qDebugFlag}, {"metaDebugFlag", &metaDebugFlag},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag},
{"stDebugFlag", &stDebugFlag}, {"sndDebugFlag", &sndDebugFlag}, {"tqClientDebug", &tqClientDebug},
};
static OptionNameAndVar options[] = {{"audit", &tsEnableAudit},

View File

@ -10780,7 +10780,7 @@ _exit:
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
TAOS_CHECK_RETURN(tDecodeMqDataRspCommon(pDecoder, pRsp));
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &((SMqDataRsp *)pRsp)->sleepTime));
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime));
}
return 0;

View File

@ -244,7 +244,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
}
storeOffsetRows(pMnode, &req, pConsumer);
rsp.debugFlag = tqDebugFlag;
rsp.debugFlag = tqClientDebug;
code = buildMqHbRsp(pMsg, &rsp);
END:

View File

@ -125,6 +125,8 @@ int32_t idxDebugFlag = 131;
int32_t sndDebugFlag = 131;
int32_t simDebugFlag = 131;
int32_t tqClientDebug = 0;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
int64_t dbgSmallWN = 0;