opti: set default offset to latest in subscription & return error if use snapshot when consume column

parent c26d74355c
commit 9c470e9297
@@ -349,7 +349,7 @@ You configure the following parameters when creating a consumer:
 | `td.connect.port` | string | Port of the server side | |
 | `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups. |
 | `client.id` | string | Client ID | Maximum length: 192. |
-| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, this is the default behavior; `latest`: subscribe from the latest data; or `none`: can't subscribe without a committed offset |
+| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: subscribe from the earliest data, the default behavior in versions <= 3.1.1.0; `latest`: subscribe from the latest data, the default behavior in versions > 3.1.1.0; or `none`: can't subscribe without a committed offset |
 | `enable.auto.commit` | boolean | Commit automatically; true: the user application doesn't need to commit explicitly; false: the user application must handle commits itself | Default value is true |
 | `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | Default value: 5000 |
 | `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | Default value: false |
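Because the default flips from `earliest` to `latest` in versions after 3.1.1.0, an application upgraded across that boundary can silently stop replaying history. A minimal sketch of pinning the offset explicitly with the C client (the group ID, topic name, and buffer sizes are placeholder choices, not values mandated by the commit):

```c
#include <stdio.h>
#include "taos.h"  // TDengine C client; declares the tmq_* APIs used below

int main(void) {
  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "grp1");  // required; max length 192
  tmq_conf_set(conf, "client.id", "c1");
  tmq_conf_set(conf, "enable.auto.commit", "true");
  tmq_conf_set(conf, "auto.commit.interval.ms", "5000");
  // Pin the starting offset instead of relying on the version-dependent default.
  if (tmq_conf_set(conf, "auto.offset.reset", "earliest") != TMQ_CONF_OK) {
    fprintf(stderr, "invalid auto.offset.reset value\n");
    return 1;
  }

  char errstr[256] = {0};
  tmq_t* tmq = tmq_consumer_new(conf, errstr, sizeof(errstr));
  tmq_conf_destroy(conf);
  if (tmq == NULL) {
    fprintf(stderr, "consumer create failed: %s\n", errstr);
    return 1;
  }

  tmq_list_t* topics = tmq_list_new();
  tmq_list_append(topics, "topic_name");  // placeholder topic
  if (tmq_subscribe(tmq, topics) != 0) {
    fprintf(stderr, "subscribe failed\n");
  }
  tmq_list_destroy(topics);
  tmq_consumer_close(tmq);
  return 0;
}
```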
@@ -348,7 +348,7 @@ CREATE TOPIC topic_name [with meta] AS DATABASE db_name;
 | `td.connect.port` | integer | Port of the server side | |
 | `group.id` | string | Consumer group ID; consumers in the same group share consumption progress | **Required**. Maximum length: 192. Each topic can create up to 100 consumer groups |
 | `client.id` | string | Client ID | Maximum length: 192 |
-| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: default; subscribe from the earliest data; `latest`: subscribe only from the latest data; `none`: can't subscribe without a committed offset |
+| `auto.offset.reset` | enum | Initial offset for the consumer group | `earliest`: default in versions <= 3.1.1.0; subscribe from the earliest data; `latest`: default in versions > 3.1.1.0; subscribe only from the latest data; `none`: can't subscribe without a committed offset |
 | `enable.auto.commit` | boolean | Whether to commit offsets automatically; true: commit automatically, the client application doesn't need to commit; false: the client application must commit itself | Default value is true |
 | `auto.commit.interval.ms` | integer | Interval for automatically committing offsets, in milliseconds | Default value is 5000 |
 | `msg.with.table.name` | boolean | Whether to parse table names from messages; not applicable to column subscriptions (a column subscription can include tbname as a column in the subquery) | Disabled by default |
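A sketch of consuming with `msg.with.table.name` enabled, so each polled message can report which table its rows came from (the consumer construction is assumed to follow the table above; the 1000 ms timeout is an arbitrary choice):

```c
#include <stdio.h>
#include "taos.h"

// Assumes the consumer was created with:
//   tmq_conf_set(conf, "msg.with.table.name", "true");
void poll_loop(tmq_t* tmq) {
  while (1) {
    TAOS_RES* msg = tmq_consumer_poll(tmq, 1000);  // wait up to 1000 ms
    if (msg == NULL) continue;                     // nothing new yet

    TAOS_ROW row;
    while ((row = taos_fetch_row(msg)) != NULL) {
      // Non-NULL only because table names were serialized into the message.
      const char* tbname = tmq_get_table_name(msg);
      printf("row from table %s\n", tbname ? tbname : "(unknown)");
    }
    taos_free_result(msg);
  }
}
```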
@@ -26,8 +26,7 @@
 #define EMPTY_BLOCK_POLL_IDLE_DURATION 10
 #define DEFAULT_AUTO_COMMIT_INTERVAL   5000
-
-#define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0)
+#define DEFAULT_HEARTBEAT_INTERVAL     3000
 
 typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
 
@@ -63,8 +62,7 @@ struct tmq_conf_t {
   int8_t   resetOffset;
   int8_t   withTbName;
   int8_t   snapEnable;
-  int32_t  snapBatchSize;
-  bool     hbBgEnable;
+  //  int32_t snapBatchSize;
   uint16_t port;
   int32_t  autoCommitInterval;
   char*    ip;
@@ -84,7 +82,6 @@ struct tmq_t {
   int32_t        autoCommitInterval;
   int8_t         resetOffsetCfg;
   uint64_t       consumerId;
-  bool           hbBgEnable;
   tmq_commit_cb* commitCb;
   void*          commitCbUserParam;
 
@@ -276,8 +273,7 @@ tmq_conf_t* tmq_conf_new() {
   conf->withTbName = false;
   conf->autoCommit = true;
   conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
-  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
-  conf->hbBgEnable = true;
+  conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
 
   return conf;
 }
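After this change a freshly created `tmq_conf_t` starts from `TMQ_OFFSET__RESET_LATEST`, so a consumer that configures nothing sees only data written after it subscribes. A minimal sketch of opting back into the old behavior (the function name is illustrative):

```c
#include "taos.h"

// A conf that restores the pre-3.1.1.0 default of replaying from the start.
tmq_conf_t* make_conf_with_old_default(void) {
  tmq_conf_t* conf = tmq_conf_new();  // resetOffset now defaults to latest
  if (tmq_conf_set(conf, "auto.offset.reset", "earliest") != TMQ_CONF_OK) {
    tmq_conf_destroy(conf);
    return NULL;
  }
  return conf;
}
```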
@@ -367,10 +363,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
     }
   }
 
-  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
-    conf->snapBatchSize = taosStr2int64(value);
-    return TMQ_CONF_OK;
-  }
+  //  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
+  //    conf->snapBatchSize = taosStr2int64(value);
+  //    return TMQ_CONF_OK;
+  //  }
 
   //  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
   //    if (strcasecmp(value, "true") == 0) {
@@ -847,7 +843,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
 
 OVER:
   tDeatroySMqHbReq(&req);
-  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
+  taosTmrReset(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, param, tmqMgmt.timer, &tmq->hbLiveTimer);
   taosReleaseRef(tmqMgmt.rsetId, refId);
 }
 
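The heartbeat callback re-arms its own one-shot timer, now at `DEFAULT_HEARTBEAT_INTERVAL` (3000 ms) instead of a hard-coded 1000. A generic sketch of that self-rescheduling pattern using POSIX timers rather than the internal `taosTmr*` wrappers (all names here are illustrative, and the real callback sends an `SMqHbReq` where this prints):

```c
// Build with: cc hb.c -lrt
#include <signal.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

#define HEARTBEAT_INTERVAL_MS 3000  // mirrors DEFAULT_HEARTBEAT_INTERVAL

static timer_t hbTimer;

static void armTimer(void) {
  // One-shot timer; the callback re-arms it, like taosTmrReset in the patch.
  struct itimerspec its = {0};
  its.it_value.tv_sec = HEARTBEAT_INTERVAL_MS / 1000;
  its.it_value.tv_nsec = (HEARTBEAT_INTERVAL_MS % 1000) * 1000000L;
  timer_settime(hbTimer, 0, &its, NULL);
}

static void onHeartbeat(union sigval sv) {
  (void)sv;
  printf("send heartbeat\n");  // the real callback builds and sends the request
  armTimer();                  // reschedule; the heartbeat keeps itself alive
}

int main(void) {
  struct sigevent sev = {0};
  sev.sigev_notify = SIGEV_THREAD;  // run the callback on its own thread
  sev.sigev_notify_function = onHeartbeat;
  timer_create(CLOCK_MONOTONIC, &sev, &hbTimer);
  armTimer();
  sleep(10);  // let a few heartbeats fire
  return 0;
}
```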
@@ -1106,8 +1102,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
   pTmq->resetOffsetCfg = conf->resetOffset;
   taosInitRWLatch(&pTmq->lock);
 
-  pTmq->hbBgEnable = conf->hbBgEnable;
-
   // assign consumerId
   pTmq->consumerId = tGenIdPI64();
 
@@ -1131,19 +1125,16 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
     goto _failed;
   }
 
-  if (pTmq->hbBgEnable) {
-    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
-    *pRefId = pTmq->refId;
-    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
-  }
+  int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
+  *pRefId = pTmq->refId;
+  pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, DEFAULT_HEARTBEAT_INTERVAL, pRefId, tmqMgmt.timer);
 
   char buf[TSDB_OFFSET_LEN] = {0};
   STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
   tFormatOffset(buf, tListLen(buf), &offset);
-  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
-          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
-          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
-          buf, pTmq->hbBgEnable);
+  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
+          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s",
+          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf);
 
   return pTmq;
 
@@ -109,6 +109,11 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle
       tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey:%s, vgId:%d, (earliest) set offset to be snapshot",
               consumerId, pHandle->subKey, vgId);
 
+      if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
+        tqError("tmq poll column can not use snapshot");
+        terrno = TSDB_CODE_TQ_INVALID_CONFIG;
+        return -1;
+      }
       if (pHandle->fetchMeta) {
         tqOffsetResetToMeta(pOffsetVal, 0);
       } else {
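With this guard in place, enabling snapshot replay on a column (query) topic fails the poll instead of proceeding into an unsupported path. A hedged sketch of the client-side sequence that now triggers it (`topic_col` is a hypothetical topic created with `CREATE TOPIC ... AS SELECT ...`; how the error surfaces to the application may vary by version):

```c
#include <stdio.h>
#include "taos.h"

// Assumes a column subscription, e.g.:
//   CREATE TOPIC topic_col AS SELECT ts, c1 FROM db.stb;
void poll_snapshot_from_column_topic(void) {
  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "grp_snap");
  // Snapshot replay is exactly what the new check in extractResetOffsetVal
  // rejects for TOPIC_SUB_TYPE__COLUMN handles.
  tmq_conf_set(conf, "experimental.snapshot.enable", "true");

  char errstr[256] = {0};
  tmq_t* tmq = tmq_consumer_new(conf, errstr, sizeof(errstr));
  tmq_conf_destroy(conf);

  tmq_list_t* topics = tmq_list_new();
  tmq_list_append(topics, "topic_col");
  tmq_subscribe(tmq, topics);
  tmq_list_destroy(topics);

  TAOS_RES* msg = tmq_consumer_poll(tmq, 1000);
  if (msg == NULL) {
    // The vnode logs "tmq poll column can not use snapshot" and sets the
    // error to TSDB_CODE_TQ_INVALID_CONFIG instead of returning data.
    fprintf(stderr, "poll returned no data\n");
  }
  tmq_consumer_close(tmq);
}
```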
@@ -1831,54 +1831,29 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
     return NULL;
   }
 
-  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
-    SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
-    if (pResult && pResult->info.rows > 0) {
-      tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
-      return pResult;
-    }
-
-    STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
-    pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);
-
-    pTSInfo->base.dataReader = NULL;
-    int64_t validVer = pTaskInfo->streamInfo.snapshotVer + 1;
-    qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", validVer);
-    if (pAPI->tqReaderFn.tqReaderSeek(pInfo->tqReader, validVer, pTaskInfo->id.str) < 0) {
-      return NULL;
-    }
-
-    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, validVer);
-  }
-
-  if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
-
-    while (1) {
-      bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
-
-      SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
-      struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
-
-      // curVersion move to next
-      tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
-
-      if (hasResult) {
-        qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
-               pTaskInfo->streamInfo.currentOffset.version);
-        blockDataCleanup(pInfo->pRes);
-        STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
-        setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
-        if (pInfo->pRes->info.rows > 0) {
-          return pInfo->pRes;
-        }
-      } else {
-        qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
-        return NULL;
-      }
-    }
-  } else {
-    qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.currentOffset.type);
-    return NULL;
-  }
+  ASSERT(pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG);
+  while (1) {
+    bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id);
+
+    SSDataBlock* pRes = pAPI->tqReaderFn.tqGetResultBlock(pInfo->tqReader);
+    struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader);
+
+    // curVersion move to next
+    tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pWalReader->curVersion);
+
+    if (hasResult) {
+      qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
+             pTaskInfo->streamInfo.currentOffset.version);
+      blockDataCleanup(pInfo->pRes);
+      STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
+      setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
+      if (pInfo->pRes->info.rows > 0) {
+        return pInfo->pRes;
+      }
+    } else {
+      qDebug("doQueueScan get none from log, return, version:%" PRId64, pTaskInfo->streamInfo.currentOffset.version);
+      return NULL;
+    }
+  }
 }