From 10436cacffbf07ddcccca22b392f521d6bada41b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 15 Oct 2024 16:17:58 +0800 Subject: [PATCH] enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info --- source/client/src/clientTmq.c | 10 ++++++---- source/libs/executor/inc/querytask.h | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/scanoperator.c | 1 - 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 020bf25e40..ce701a755c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -831,8 +831,8 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ } code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); - if (code != 0){ - tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d", + if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){ + tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } @@ -857,7 +857,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us return; } code = innerCommitAll(tmq, pParamSet); - if (code != 0){ + if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){ tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); } @@ -1270,7 +1270,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { } if (code != TSDB_CODE_SUCCESS) { - tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){ + tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + } goto END; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 3c52f8080e..c9e65bacaf 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -69,7 +69,7 @@ typedef struct { SVersionRange fillHistoryVer; STimeWindow fillHistoryWindow; SStreamState* pState; - int64_t suid; // for tmq +// int64_t suid; // for tmq } SStreamTaskInfo; struct SExecTaskInfo { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 4e5a1d2e1e..4cfa12be5b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1499,7 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); - pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; +// pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3713a2c071..bae9926f63 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2877,7 +2877,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock return 0; } - blockDataFreeRes((SSDataBlock*)pBlock); code = calBlockTbName(pInfo, pInfo->pRes, 0); QUERY_CHECK_CODE(code, lino, _end);