enh:[TS-5441] cost too long in tmq write meta data by cache meta and vg info

This commit is contained in:
wangmm0220 2024-10-15 16:17:58 +08:00
parent 61531e7339
commit 10436cacff
4 changed files with 8 additions and 7 deletions

View File

@ -831,8 +831,8 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){
} }
code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet);
if (code != 0){ if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d", tqErrorC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
} }
} }
@ -857,7 +857,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
return; return;
} }
code = innerCommitAll(tmq, pParamSet); code = innerCommitAll(tmq, pParamSet);
if (code != 0){ if (code != 0 && code != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE){
tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code));
} }
@ -1270,7 +1270,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
}
goto END; goto END;
} }

View File

@ -69,7 +69,7 @@ typedef struct {
SVersionRange fillHistoryVer; SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow; STimeWindow fillHistoryWindow;
SStreamState* pState; SStreamState* pState;
int64_t suid; // for tmq // int64_t suid; // for tmq
} SStreamTaskInfo; } SStreamTaskInfo;
struct SExecTaskInfo { struct SExecTaskInfo {

View File

@ -1499,7 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; // pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;
tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema; pTaskInfo->streamInfo.schema = mtInfo.schema;

View File

@ -2877,7 +2877,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
return 0; return 0;
} }
blockDataFreeRes((SSDataBlock*)pBlock);
code = calBlockTbName(pInfo, pInfo->pRes, 0); code = calBlockTbName(pInfo, pInfo->pRes, 0);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);