From 9c470e92971131a643e3e42fef37f0c3b8da1dce Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 7 Oct 2023 15:25:24 +0800 Subject: [PATCH 1/4] opti:set default offset to latest in subscription & return error if use snapshot when consume column --- docs/en/07-develop/07-tmq.mdx | 2 +- docs/zh/07-develop/07-tmq.md | 2 +- source/client/src/clientTmq.c | 35 +++++--------- source/dnode/vnode/src/tq/tqUtil.c | 5 ++ source/libs/executor/src/scanoperator.c | 63 ++++++++----------------- 5 files changed, 39 insertions(+), 68 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index d3a4b79d0b..6171edad70 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -349,7 +349,7 @@ You configure the following parameters when creating a consumer: | `td.connect.port` | string | Port of the server side | | | `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. | | `client.id` | string | Client ID | Maximum length: 192. | -| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without committed offset| +| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior(version <= 3.1.1.0); `latest`: subscribe from the latest data, this is the default behavior(version > 3.1.1.0); or `none`: can't subscribe without committed offset| | `enable.auto.commit` | boolean | Commit automatically; true: user application doesn't need to explicitly commit; false: user application need to handle commit by itself | Default value is true | | `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | | `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | default value: false diff --git a/docs/zh/07-develop/07-tmq.md b/docs/zh/07-develop/07-tmq.md index f0921af8d8..44c5dded70 100644 --- a/docs/zh/07-develop/07-tmq.md +++ b/docs/zh/07-develop/07-tmq.md @@ -348,7 +348,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name; | `td.connect.port` | integer | 服务端的端口号 | | | `group.id` | string | 消费组 ID,同一消费组共享消费进度 |
**必填项**。最大长度:192。
每个topic最多可建立100个 consumer group | | `client.id` | string | 客户端 ID | 最大长度:192。 | -| `auto.offset.reset` | enum | 消费组订阅的初始位置 |
`earliest`: default;从头开始订阅;
`latest`: 仅从最新数据开始订阅;
`none`: 没有提交的 offset 无法订阅 | +| `auto.offset.reset` | enum | 消费组订阅的初始位置 |
`earliest`: default(version <= 3.1.1.0);从头开始订阅;
`latest`: default(version > 3.1.1.0);仅从最新数据开始订阅;
`none`: 没有提交的 offset 无法订阅 | | `enable.auto.commit` | boolean | 是否启用消费位点自动提交,true: 自动提交,客户端应用无需commit;false:客户端应用需要自行commit | 默认值为 true | | `auto.commit.interval.ms` | integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) |默认关闭 | diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e861bd4b92..43ce8a68f9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -26,8 +26,7 @@ #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 - -#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0) +#define DEFAULT_HEARTBEAT_INTERVAL 3000 typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam); @@ -63,8 +62,7 @@ struct tmq_conf_t { int8_t resetOffset; int8_t withTbName; int8_t snapEnable; - int32_t snapBatchSize; - bool hbBgEnable; +// int32_t snapBatchSize; uint16_t port; int32_t autoCommitInterval; char* ip; @@ -84,7 +82,6 @@ struct tmq_t { int32_t autoCommitInterval; int8_t resetOffsetCfg; uint64_t consumerId; - bool hbBgEnable; tmq_commit_cb* commitCb; void* commitCbUserParam; @@ -276,8 +273,7 @@ tmq_conf_t* tmq_conf_new() { conf->withTbName = false; conf->autoCommit = true; conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL; - conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST; - conf->hbBgEnable = true; + conf->resetOffset = TMQ_OFFSET__RESET_LATEST; return conf; } @@ -367,10 +363,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { - conf->snapBatchSize = taosStr2int64(value); - return TMQ_CONF_OK; - } +// if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) { +// conf->snapBatchSize = taosStr2int64(value); +// return TMQ_CONF_OK; +// } // if (strcasecmp(key, "enable.heartbeat.background") == 0) { // if (strcasecmp(value, "true") == 0) { @@ -847,7 +843,7 @@ void tmqSendHbReq(void* param, void* tmrId) { OVER: tDeatroySMqHbReq(&req); - taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer); + taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer); taosReleaseRef(tmqMgmt.rsetId, refId); } @@ -1106,8 +1102,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; taosInitRWLatch(&pTmq->lock); - pTmq->hbBgEnable = conf->hbBgEnable; - // assign consumerId pTmq->consumerId = tGenIdPI64(); @@ -1131,19 +1125,16 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { goto _failed; } - if (pTmq->hbBgEnable) { - int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); - *pRefId = pTmq->refId; - pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer); - } + int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); + *pRefId = pTmq->refId; + pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer); char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; tFormatOffset(buf, tListLen(buf), &offset); tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 - ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d", - pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, - buf, pTmq->hbBgEnable); + ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", + pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf); return pTmq; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b77424d4a5..828681e2cf 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -109,6 +109,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", consumerId, pHandle->subKey, vgId); + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ + tqError("tmq poll column can not use snapshot"); + terrno = TSDB_CODE_TQ_INVALID_CONFIG; + return -1; + } if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 805635f603..297c0f8aff 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1831,54 +1831,29 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } - if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); - return pResult; - } + ASSERT(pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG); + while (1) { + bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); + SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); + struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); - pTSInfo->base.dataReader = NULL; - int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1; - qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer); - if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) { + // curVersion move to next + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); + + if (hasResult) { + qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); + blockDataCleanup(pInfo->pRes); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); + if (pInfo->pRes->info.rows > 0) { + return pInfo->pRes; + } + } else { + qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); return NULL; } - - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer); - } - - if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { - - while (1) { - bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); - - SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); - struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); - - // curVersion move to next - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); - - if (hasResult) { - qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, - pTaskInfo->streamInfo.currentOffset.version); - blockDataCleanup(pInfo->pRes); - STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; - setBlockIntoRes(pInfo, pRes, &defaultWindow, true); - if (pInfo->pRes->info.rows > 0) { - return pInfo->pRes; - } - } else { - qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); - return NULL; - } - } - } else { - qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type); - return NULL; } } From fb3197ea89ca820c6cbe2e0892f6dec3b0774da1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 10:55:35 +0800 Subject: [PATCH 2/4] fix:test case error for reset=latest --- source/dnode/vnode/src/tq/tqUtil.c | 6 -- source/libs/executor/src/scanoperator.c | 64 +++++++++++++------ tests/script/tsim/tmq/basic1.sim | 2 +- tests/script/tsim/tmq/basic1Of2Cons.sim | 2 +- tests/script/tsim/tmq/basic2.sim | 2 +- tests/script/tsim/tmq/basic2Of2Cons.sim | 2 +- .../script/tsim/tmq/basic2Of2ConsOverlap.sim | 2 +- tests/script/tsim/tmq/basic3.sim | 2 +- tests/script/tsim/tmq/basic3Of2Cons.sim | 2 +- tests/script/tsim/tmq/basic4.sim | 2 +- tests/script/tsim/tmq/basic4Of2Cons.sim | 2 +- 11 files changed, 54 insertions(+), 34 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 828681e2cf..d122a9b0b5 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -108,12 +108,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pRequest->useSnapshot) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot", consumerId, pHandle->subKey, vgId); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ - tqError("tmq poll column can not use snapshot"); - terrno = TSDB_CODE_TQ_INVALID_CONFIG; - return -1; - } if (pHandle->fetchMeta) { tqOffsetResetToMeta(pOffsetVal, 0); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 297c0f8aff..3e085d71de 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1831,30 +1831,56 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } - ASSERT(pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG); - while (1) { - bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + if (pResult && pResult->info.rows > 0) { + tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); + return pResult; + } - SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); - struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); - // curVersion move to next - tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); - - if (hasResult) { - qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, - pTaskInfo->streamInfo.currentOffset.version); - blockDataCleanup(pInfo->pRes); - STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; - setBlockIntoRes(pInfo, pRes, &defaultWindow, true); - if (pInfo->pRes->info.rows > 0) { - return pInfo->pRes; - } - } else { - qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); + pTSInfo->base.dataReader = NULL; + int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1; + qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer); + if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) { return NULL; } + + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer); } + + if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { + + while (1) { + bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); + + SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader); + struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); + + // curVersion move to next + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion); + + if (hasResult) { + qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, + pTaskInfo->streamInfo.currentOffset.version); + blockDataCleanup(pInfo->pRes); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); + if (pInfo->pRes->info.rows > 0) { + return pInfo->pRes; + } + } else { + qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version); + return NULL; + } + } + } else { + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type); + return NULL; + } +} } static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) { diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index fe6ec04a20..80e048a9e4 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic1Of2Cons.sim b/tests/script/tsim/tmq/basic1Of2Cons.sim index c12351cbe8..76e9775fc9 100644 --- a/tests/script/tsim/tmq/basic1Of2Cons.sim +++ b/tests/script/tsim/tmq/basic1Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim index 5c7528ea5d..57bcb40921 100644 --- a/tests/script/tsim/tmq/basic2.sim +++ b/tests/script/tsim/tmq/basic2.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2Cons.sim b/tests/script/tsim/tmq/basic2Of2Cons.sim index 23598c17a4..44b6a4b591 100644 --- a/tests/script/tsim/tmq/basic2Of2Cons.sim +++ b/tests/script/tsim/tmq/basic2Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index 1223a94fa7..b279a13826 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim index 8bb34cefa2..b66917623e 100644 --- a/tests/script/tsim/tmq/basic3.sim +++ b/tests/script/tsim/tmq/basic3.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3Of2Cons.sim b/tests/script/tsim/tmq/basic3Of2Cons.sim index 75d762c44b..6284356e91 100644 --- a/tests/script/tsim/tmq/basic3Of2Cons.sim +++ b/tests/script/tsim/tmq/basic3Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim index c72d8ff412..290d611cbc 100644 --- a/tests/script/tsim/tmq/basic4.sim +++ b/tests/script/tsim/tmq/basic4.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4Of2Cons.sim b/tests/script/tsim/tmq/basic4Of2Cons.sim index bb006a354c..0a3b4d2ee0 100644 --- a/tests/script/tsim/tmq/basic4Of2Cons.sim +++ b/tests/script/tsim/tmq/basic4Of2Cons.sim @@ -63,7 +63,7 @@ $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 #$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList From 450d079839d3da74d8bc36572f6fd5edf51c5533 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 11:04:30 +0800 Subject: [PATCH 3/4] fix:compile error --- source/libs/executor/src/scanoperator.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3e085d71de..805635f603 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1881,7 +1881,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; } } -} static int32_t filterDelBlockByUid(SSDataBlock* pDst, const SSDataBlock* pSrc, SStreamScanInfo* pInfo) { STqReader* pReader = pInfo->tqReader; From be27e13ed9a3b66bd74f2c849e84fc957d6b2b21 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 15:29:02 +0800 Subject: [PATCH 4/4] fix:test case error for reset=latest --- tests/script/tsim/tmq/basic1.sim | 2 +- tests/script/tsim/tmq/basic1Of2Cons.sim | 2 +- tests/script/tsim/tmq/basic2.sim | 2 +- tests/script/tsim/tmq/basic2Of2Cons.sim | 2 +- tests/script/tsim/tmq/basic2Of2ConsOverlap.sim | 2 +- tests/script/tsim/tmq/basic3.sim | 2 +- tests/script/tsim/tmq/basic3Of2Cons.sim | 2 +- tests/script/tsim/tmq/basic4.sim | 2 +- tests/script/tsim/tmq/basic4Of2Cons.sim | 2 +- tests/script/tsim/tmq/snapshot.sim | 4 ++-- tests/script/tsim/tmq/snapshot1.sim | 4 ++-- tests/system-test/7-tmq/tmqParamsTest.py | 4 ++-- utils/test/c/tmq_taosx_ci.c | 1 + 13 files changed, 16 insertions(+), 15 deletions(-) diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index 80e048a9e4..4ef0c121f6 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic1Of2Cons.sim b/tests/script/tsim/tmq/basic1Of2Cons.sim index 76e9775fc9..d2906ec875 100644 --- a/tests/script/tsim/tmq/basic1Of2Cons.sim +++ b/tests/script/tsim/tmq/basic1Of2Cons.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2.sim b/tests/script/tsim/tmq/basic2.sim index 57bcb40921..4477101d0f 100644 --- a/tests/script/tsim/tmq/basic2.sim +++ b/tests/script/tsim/tmq/basic2.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2Cons.sim b/tests/script/tsim/tmq/basic2Of2Cons.sim index 44b6a4b591..951a1d52fd 100644 --- a/tests/script/tsim/tmq/basic2Of2Cons.sim +++ b/tests/script/tsim/tmq/basic2Of2Cons.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index b279a13826..8cc447f0c7 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3.sim b/tests/script/tsim/tmq/basic3.sim index b66917623e..da2bee4f6b 100644 --- a/tests/script/tsim/tmq/basic3.sim +++ b/tests/script/tsim/tmq/basic3.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic3Of2Cons.sim b/tests/script/tsim/tmq/basic3Of2Cons.sim index 6284356e91..21d691bd9c 100644 --- a/tests/script/tsim/tmq/basic3Of2Cons.sim +++ b/tests/script/tsim/tmq/basic3Of2Cons.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4.sim b/tests/script/tsim/tmq/basic4.sim index 290d611cbc..adeab58ff2 100644 --- a/tests/script/tsim/tmq/basic4.sim +++ b/tests/script/tsim/tmq/basic4.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/basic4Of2Cons.sim b/tests/script/tsim/tmq/basic4Of2Cons.sim index 0a3b4d2ee0..186005b231 100644 --- a/tests/script/tsim/tmq/basic4Of2Cons.sim +++ b/tests/script/tsim/tmq/basic4Of2Cons.sim @@ -62,7 +62,7 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , +$keyList = $keyList . , $keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/snapshot.sim b/tests/script/tsim/tmq/snapshot.sim index fbdaba7d28..c0194d98c8 100644 --- a/tests/script/tsim/tmq/snapshot.sim +++ b/tests/script/tsim/tmq/snapshot.sim @@ -62,8 +62,8 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . , +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/script/tsim/tmq/snapshot1.sim b/tests/script/tsim/tmq/snapshot1.sim index 5349981cc7..6121692d6c 100644 --- a/tests/script/tsim/tmq/snapshot1.sim +++ b/tests/script/tsim/tmq/snapshot1.sim @@ -62,8 +62,8 @@ $keyList = $keyList . , $keyList = $keyList . enable.auto.commit:false #$keyList = $keyList . , #$keyList = $keyList . auto.commit.interval.ms:6000 -#$keyList = $keyList . , -#$keyList = $keyList . auto.offset.reset:earliest +$keyList = $keyList . , +$keyList = $keyList . auto.offset.reset:earliest $keyList = $keyList . ' print ========== key list: $keyList diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index ff7c70bcd2..0e9e8f989f 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -19,7 +19,7 @@ class TDTestCase: self.wal_retention_period1 = 3600 self.wal_retention_period2 = 1 self.commit_value_list = ["true", "false"] - self.offset_value_list = ["", "earliest", "latest", "none"] + self.offset_value_list = ["earliest", "latest", "none"] self.tbname_value_list = ["true", "false"] self.snapshot_value_list = ["false"] @@ -92,7 +92,7 @@ class TDTestCase: } consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 - consumer_ret = "earliest" if offset_value == "" else offset_value + consumer_ret = "latest" if offset_value == "" else offset_value expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]}ms,reset:{consumer_ret}' if len(offset_value) == 0: del consumer_dict["auto.offset.reset"] diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 5d4d73c448..ff89bb1f75 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -547,6 +547,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true");