Merge branch 'main' of https://github.com/taosdata/TDengine into fix/TS-3039
This commit is contained in:
commit
2dcf68fc5e
|
@ -2,7 +2,7 @@
|
||||||
# taosadapter
|
# taosadapter
|
||||||
ExternalProject_Add(taosadapter
|
ExternalProject_Add(taosadapter
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
|
||||||
GIT_TAG d8059ff
|
GIT_TAG cb1e89c
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
# taos-tools
|
# taos-tools
|
||||||
ExternalProject_Add(taos-tools
|
ExternalProject_Add(taos-tools
|
||||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||||
GIT_TAG e82b9fc
|
GIT_TAG d194dc9
|
||||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||||
BINARY_DIR ""
|
BINARY_DIR ""
|
||||||
#BUILD_IN_SOURCE TRUE
|
#BUILD_IN_SOURCE TRUE
|
||||||
|
|
|
@ -293,7 +293,6 @@ You configure the following parameters when creating a consumer:
|
||||||
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
|
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
|
||||||
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
|
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
|
||||||
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
|
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
|
||||||
| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
|
|
||||||
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | |
|
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | |
|
||||||
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages |
|
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages |
|
||||||
|
|
||||||
|
@ -368,7 +367,6 @@ conf := &tmq.ConfigMap{
|
||||||
"td.connect.port": "6030",
|
"td.connect.port": "6030",
|
||||||
"client.id": "test_tmq_c",
|
"client.id": "test_tmq_c",
|
||||||
"enable.auto.commit": "false",
|
"enable.auto.commit": "false",
|
||||||
"enable.heartbeat.background": "true",
|
|
||||||
"experimental.snapshot.enable": "true",
|
"experimental.snapshot.enable": "true",
|
||||||
"msg.with.table.name": "true",
|
"msg.with.table.name": "true",
|
||||||
}
|
}
|
||||||
|
@ -418,7 +416,6 @@ Python programs use the following parameters:
|
||||||
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
|
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
|
||||||
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
|
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
|
||||||
| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
|
| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
|
||||||
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
|
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,6 @@ func main() {
|
||||||
"td.connect.port": "6030",
|
"td.connect.port": "6030",
|
||||||
"client.id": "test_tmq_client",
|
"client.id": "test_tmq_client",
|
||||||
"enable.auto.commit": "false",
|
"enable.auto.commit": "false",
|
||||||
"enable.heartbeat.background": "true",
|
|
||||||
"experimental.snapshot.enable": "true",
|
"experimental.snapshot.enable": "true",
|
||||||
"msg.with.table.name": "true",
|
"msg.with.table.name": "true",
|
||||||
})
|
})
|
||||||
|
|
|
@ -291,7 +291,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
|
||||||
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
||||||
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 |
|
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 |
|
||||||
| `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m |
|
| `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m |
|
||||||
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 默认开启 |
|
|
||||||
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 |
|
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 |
|
||||||
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | |
|
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | |
|
||||||
|
|
||||||
|
@ -366,7 +365,6 @@ conf := &tmq.ConfigMap{
|
||||||
"td.connect.port": "6030",
|
"td.connect.port": "6030",
|
||||||
"client.id": "test_tmq_c",
|
"client.id": "test_tmq_c",
|
||||||
"enable.auto.commit": "false",
|
"enable.auto.commit": "false",
|
||||||
"enable.heartbeat.background": "true",
|
|
||||||
"experimental.snapshot.enable": "true",
|
"experimental.snapshot.enable": "true",
|
||||||
"msg.with.table.name": "true",
|
"msg.with.table.name": "true",
|
||||||
}
|
}
|
||||||
|
@ -418,7 +416,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
|
||||||
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
|
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms |
|
||||||
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
|
||||||
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
|
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
|
||||||
| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
|
|
||||||
|
|
||||||
</TabItem>
|
</TabItem>
|
||||||
|
|
||||||
|
|
|
@ -320,15 +320,16 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
|
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
|
||||||
if (strcasecmp(value, "true") == 0) {
|
// if (strcasecmp(value, "true") == 0) {
|
||||||
conf->hbBgEnable = true;
|
// conf->hbBgEnable = true;
|
||||||
return TMQ_CONF_OK;
|
// return TMQ_CONF_OK;
|
||||||
} else if (strcasecmp(value, "false") == 0) {
|
// } else if (strcasecmp(value, "false") == 0) {
|
||||||
conf->hbBgEnable = false;
|
// conf->hbBgEnable = false;
|
||||||
return TMQ_CONF_OK;
|
// return TMQ_CONF_OK;
|
||||||
} else {
|
// } else {
|
||||||
|
tscError("the default value of enable.heartbeat.background is true, can not be seted");
|
||||||
return TMQ_CONF_INVALID;
|
return TMQ_CONF_INVALID;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcasecmp(key, "td.connect.ip") == 0) {
|
if (strcasecmp(key, "td.connect.ip") == 0) {
|
||||||
|
|
|
@ -1835,6 +1835,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pUpdateRes, "recover update");
|
printDataBlock(pInfo->pUpdateRes, "recover update");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
} break;
|
} break;
|
||||||
|
case STREAM_SCAN_FROM_DELETE_DATA: {
|
||||||
|
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||||
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
|
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||||
|
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
|
||||||
|
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
|
||||||
|
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
|
||||||
|
return pInfo->pDeleteDataRes;
|
||||||
|
} break;
|
||||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||||
if (pSDB) {
|
if (pSDB) {
|
||||||
|
@ -2027,6 +2036,7 @@ FETCH_NEXT_BLOCK:
|
||||||
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
|
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
|
||||||
blockDataCleanup(pSup->pScanBlock);
|
blockDataCleanup(pSup->pScanBlock);
|
||||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||||
|
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2457,7 +2457,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.name = "interp",
|
.name = "interp",
|
||||||
.type = FUNCTION_TYPE_INTERP,
|
.type = FUNCTION_TYPE_INTERP,
|
||||||
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
|
||||||
FUNC_MGT_FORBID_STREAM_FUNC,
|
FUNC_MGT_FORBID_STREAM_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
|
||||||
.translateFunc = translateInterp,
|
.translateFunc = translateInterp,
|
||||||
.getEnvFunc = getSelectivityFuncEnv,
|
.getEnvFunc = getSelectivityFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -3278,7 +3278,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "_irowts",
|
.name = "_irowts",
|
||||||
.type = FUNCTION_TYPE_IROWTS,
|
.type = FUNCTION_TYPE_IROWTS,
|
||||||
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC,
|
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
|
||||||
.translateFunc = translateTimePseudoColumn,
|
.translateFunc = translateTimePseudoColumn,
|
||||||
.getEnvFunc = getTimePseudoFuncEnv,
|
.getEnvFunc = getTimePseudoFuncEnv,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
|
|
|
@ -871,6 +871,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
|
||||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||||
|
|
||||||
|
// group_key function has its own process function
|
||||||
|
// do not process there
|
||||||
|
if (fmIsGroupKeyFunc(pc->functionId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
if (nullList[j]) {
|
if (nullList[j]) {
|
||||||
colDataSetNULL(pDstCol, rowIndex);
|
colDataSetNULL(pDstCol, rowIndex);
|
||||||
|
@ -3091,6 +3097,12 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
|
||||||
for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
|
for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
|
||||||
SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
|
SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
|
||||||
|
|
||||||
|
// group_key function has its own process function
|
||||||
|
// do not process there
|
||||||
|
if (fmIsGroupKeyFunc(pc->functionId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||||
|
|
||||||
|
|
|
@ -757,7 +757,7 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
|
||||||
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
|
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
|
||||||
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
|
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
|
||||||
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
|
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
|
||||||
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
|
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start
|
||||||
sleep 50
|
sleep 50
|
||||||
sql connect
|
sql connect
|
||||||
|
|
||||||
|
print step 1
|
||||||
print =============== create database
|
print =============== create database
|
||||||
sql create database test vgroups 4;
|
sql create database test vgroups 4;
|
||||||
sql select * from information_schema.ins_databases;
|
sql select * from information_schema.ins_databases;
|
||||||
|
@ -33,8 +34,8 @@ if $loop_count == 10 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
print data00 data01
|
print $data00 $data01
|
||||||
print data10 data11
|
print $data10 $data11
|
||||||
|
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
|
@ -52,8 +53,8 @@ if $loop_count == 10 then
|
||||||
endi
|
endi
|
||||||
|
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
print data00 data01
|
print $data00 $data01
|
||||||
print data10 data11
|
print $data10 $data11
|
||||||
|
|
||||||
if $rows != 1 then
|
if $rows != 1 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
|
@ -92,9 +93,64 @@ endi
|
||||||
sql select * from streamt1;
|
sql select * from streamt1;
|
||||||
if $rows != 2 then
|
if $rows != 2 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
goto loop2
|
goto loop3
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print step 1 over
|
||||||
|
print step 2
|
||||||
|
|
||||||
|
sql create database test2 vgroups 1;
|
||||||
|
sql use test2;
|
||||||
|
sql create table t1(ts timestamp, a int, b int , c int, d double);
|
||||||
|
print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
|
||||||
|
sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
|
||||||
|
|
||||||
|
sql insert into t1 values(1648791213000,1,2,3,1.0);
|
||||||
|
sql insert into t1 values(1648791213010,1,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop4:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql select * from streamt2;
|
||||||
|
print $data00 $data01
|
||||||
|
print $data10 $data11
|
||||||
|
|
||||||
|
if $rows != 1 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
print insert into t1 values(1648791213005,2,2,3,1.1)
|
||||||
|
sql insert into t1 values(1648791213005,2,2,3,1.1);
|
||||||
|
|
||||||
|
$loop_count = 0
|
||||||
|
loop5:
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
$loop_count = $loop_count + 1
|
||||||
|
if $loop_count == 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print select * from streamt2
|
||||||
|
sql select * from streamt2;
|
||||||
|
print $data00 $data01
|
||||||
|
print $data10 $data11
|
||||||
|
print $data20 $data21
|
||||||
|
print $data30 $data31
|
||||||
|
|
||||||
|
if $rows != 3 then
|
||||||
|
print =====rows=$rows
|
||||||
|
goto loop5
|
||||||
|
endi
|
||||||
|
|
||||||
|
print step 2 over
|
||||||
|
|
||||||
print state1 end
|
print state1 end
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
"auto.offset.reset": "earliest",
|
"auto.offset.reset": "earliest",
|
||||||
"enable.auto.commit": "true",
|
"enable.auto.commit": "true",
|
||||||
"auto.commit.interval.ms": 1000,
|
"auto.commit.interval.ms": 1000,
|
||||||
"enable.heartbeat.background": "true",
|
|
||||||
"experimental.snapshot.enable": "true",
|
"experimental.snapshot.enable": "true",
|
||||||
"msg.with.table.name": "false",
|
"msg.with.table.name": "false",
|
||||||
"topic_list": [
|
"topic_list": [
|
||||||
|
|
|
@ -542,7 +542,6 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "enable.heartbeat.background", "true");
|
|
||||||
|
|
||||||
if (g_conf.snapShot) {
|
if (g_conf.snapShot) {
|
||||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
|
Loading…
Reference in New Issue