From a5c8713517f751a0999527c0d08499308d9cf8a4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Mar 2023 09:40:16 +0800 Subject: [PATCH 1/7] opti:disable set enable.heartbeat.background --- source/client/src/clientTmq.c | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 111ca28cdc..9e585f2957 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -320,15 +320,16 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } if (strcasecmp(key, "enable.heartbeat.background") == 0) { - if (strcasecmp(value, "true") == 0) { - conf->hbBgEnable = true; - return TMQ_CONF_OK; - } else if (strcasecmp(value, "false") == 0) { - conf->hbBgEnable = false; - return TMQ_CONF_OK; - } else { +// if (strcasecmp(value, "true") == 0) { +// conf->hbBgEnable = true; +// return TMQ_CONF_OK; +// } else if (strcasecmp(value, "false") == 0) { +// conf->hbBgEnable = false; +// return TMQ_CONF_OK; +// } else { + tscError("the default value of enable.heartbeat.background is true, can not be seted"); return TMQ_CONF_INVALID; - } +// } } if (strcasecmp(key, "td.connect.ip") == 0) { From b7e6e4197dbc125093759eaea41afa6095803e6c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 30 Mar 2023 11:18:35 +0800 Subject: [PATCH 2/7] opti:disable set enable.heartbeat.background --- docs/en/07-develop/07-tmq.mdx | 3 --- docs/examples/go/sub/main.go | 1 - docs/zh/07-develop/07-tmq.mdx | 3 --- tests/system-test/0-others/tmqBasic.json | 1 - utils/test/c/tmq_taosx_ci.c | 1 - 5 files changed, 9 deletions(-) diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index c85109d3c5..dc0f07d3d5 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -293,7 +293,6 @@ You configure the following parameters when creating a consumer: | `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | | `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. | | `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | -| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | | | `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | | | `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | @@ -368,7 +367,6 @@ conf := &tmq.ConfigMap{ "td.connect.port": "6030", "client.id": "test_tmq_c", "enable.auto.commit": "false", - "enable.heartbeat.background": "true", "experimental.snapshot.enable": "true", "msg.with.table.name": "true", } @@ -418,7 +416,6 @@ Python programs use the following parameters: | `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | | | `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | | `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` | -| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` | diff --git a/docs/examples/go/sub/main.go b/docs/examples/go/sub/main.go index 1f7218936f..334b8f290b 100644 --- a/docs/examples/go/sub/main.go +++ b/docs/examples/go/sub/main.go @@ -35,7 +35,6 @@ func main() { "td.connect.port": "6030", "client.id": "test_tmq_client", "enable.auto.commit": "false", - "enable.heartbeat.background": "true", "experimental.snapshot.enable": "true", "msg.with.table.name": "true", }) diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index fb171042d9..5771e5053a 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -291,7 +291,6 @@ CREATE TOPIC topic_name AS DATABASE db_name; | `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | | `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 | | `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m | -| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 默认开启 | | `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 | | `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | | @@ -366,7 +365,6 @@ conf := &tmq.ConfigMap{ "td.connect.port": "6030", "client.id": "test_tmq_c", "enable.auto.commit": "false", - "enable.heartbeat.background": "true", "experimental.snapshot.enable": "true", "msg.with.table.name": "true", } @@ -418,7 +416,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) | `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值:5000 ms | | `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` | | `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` | -| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` | diff --git a/tests/system-test/0-others/tmqBasic.json b/tests/system-test/0-others/tmqBasic.json index 24e815708a..d716bff3ac 100644 --- a/tests/system-test/0-others/tmqBasic.json +++ b/tests/system-test/0-others/tmqBasic.json @@ -14,7 +14,6 @@ "auto.offset.reset": "earliest", "enable.auto.commit": "true", "auto.commit.interval.ms": 1000, - "enable.heartbeat.background": "true", "experimental.snapshot.enable": "true", "msg.with.table.name": "false", "topic_list": [ diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 1f25eae366..6661d6d14a 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -542,7 +542,6 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "enable.heartbeat.background", "true"); if (g_conf.snapShot) { tmq_conf_set(conf, "experimental.snapshot.enable", "true"); From ae98ad43a5b4ea5daf3ddf63e642fc612ce4a336 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 30 Mar 2023 16:35:56 +0800 Subject: [PATCH 3/7] fix:state window return wrong block type --- source/libs/executor/src/scanoperator.c | 10 ++++ tests/script/tsim/stream/state1.sim | 66 +++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 84317c825b..1e5c6c2168 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1829,6 +1829,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { printDataBlock(pInfo->pUpdateRes, "recover update"); return pInfo->pUpdateRes; } break; + case STREAM_SCAN_FROM_DELETE_DATA: { + generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); + prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes); + pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA; + printDataBlock(pInfo->pDeleteDataRes, "recover delete"); + return pInfo->pDeleteDataRes; + } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { @@ -2021,6 +2030,7 @@ FETCH_NEXT_BLOCK: copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock); blockDataCleanup(pSup->pScanBlock); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); + pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; return pInfo->pUpdateRes; } diff --git a/tests/script/tsim/stream/state1.sim b/tests/script/tsim/stream/state1.sim index 2ae5739642..67e02c0890 100644 --- a/tests/script/tsim/stream/state1.sim +++ b/tests/script/tsim/stream/state1.sim @@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect +print step 1 print =============== create database sql create database test vgroups 4; sql select * from information_schema.ins_databases; @@ -33,8 +34,8 @@ if $loop_count == 10 then endi sql select * from streamt1; -print data00 data01 -print data10 data11 +print $data00 $data01 +print $data10 $data11 if $rows != 0 then print =====rows=$rows @@ -52,8 +53,8 @@ if $loop_count == 10 then endi sql select * from streamt1; -print data00 data01 -print data10 data11 +print $data00 $data01 +print $data10 $data11 if $rows != 1 then print =====rows=$rows @@ -92,9 +93,64 @@ endi sql select * from streamt1; if $rows != 2 then print =====rows=$rows - goto loop2 + goto loop3 endi +print step 1 over +print step 2 + +sql create database test2 vgroups 1; +sql use test2; +sql create table t1(ts timestamp, a int, b int , c int, d double); +print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a) +sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a); + +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213010,1,2,3,1.1); + +$loop_count = 0 +loop4: + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt2; +print $data00 $data01 +print $data10 $data11 + +if $rows != 1 then + print =====rows=$rows + goto loop4 +endi + +print insert into t1 values(1648791213005,2,2,3,1.1) +sql insert into t1 values(1648791213005,2,2,3,1.1); + +$loop_count = 0 +loop5: + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print select * from streamt2 +sql select * from streamt2; +print $data00 $data01 +print $data10 $data11 +print $data20 $data21 +print $data30 $data31 + +if $rows != 3 then + print =====rows=$rows + goto loop5 +endi + +print step 2 over print state1 end From a8da9f31e70ec91cae7878501593f24d51bd4925 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 30 Mar 2023 18:04:32 +0800 Subject: [PATCH 4/7] fix: join query error --- source/libs/function/src/builtins.c | 4 ++-- source/libs/parser/src/parTranslater.c | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 629e846dae..1a039ebda6 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2457,7 +2457,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "interp", .type = FUNCTION_TYPE_INTERP, .classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | - FUNC_MGT_FORBID_STREAM_FUNC, + FUNC_MGT_FORBID_STREAM_FUNC|FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateInterp, .getEnvFunc = getSelectivityFuncEnv, .initFunc = functionSetup, @@ -3278,7 +3278,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "_irowts", .type = FUNCTION_TYPE_IROWTS, - .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC, + .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC|FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateTimePseudoColumn, .getEnvFunc = getTimePseudoFuncEnv, .initFunc = NULL, diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7fb5b0ba40..f58a66af18 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -757,7 +757,7 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); - } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) { + } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) { return true; } } From 627cfb230bc0313f1d7799983ccee474b6e7ebfb Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 30 Mar 2023 18:35:49 +0800 Subject: [PATCH 5/7] chore: update taos-tools d194dc9 for main (#20713) --- cmake/taostools_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 3f7a43ab2d..aef89a2d42 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG e82b9fc + GIT_TAG d194dc9 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE From 7e30f8619c9acb87c7aac837fdfbf5e83a3d3f43 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 30 Mar 2023 18:39:50 +0800 Subject: [PATCH 6/7] fix(query): fix group_key is processed in selectivity twice --- source/libs/function/src/builtinsimpl.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3c9f2fe8ca..f8714a53c0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -871,6 +871,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + // group_key function has its own process function + // do not process there + if (fmIsGroupKeyFunc(pc->functionId)) { + continue; + } + SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); if (nullList[j]) { colDataSetNULL(pDstCol, rowIndex); @@ -3091,6 +3097,12 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid for (int32_t i = 0; i < pSubsidiaryies->num; ++i) { SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i]; + // group_key function has its own process function + // do not process there + if (fmIsGroupKeyFunc(pc->functionId)) { + continue; + } + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; int32_t srcSlotId = pFuncParam->pCol->slotId; From 79784491139ea068bedfa9f9275ce796d5eb3f48 Mon Sep 17 00:00:00 2001 From: Xuefeng Tan <1172915550@qq.com> Date: Fri, 31 Mar 2023 09:41:20 +0800 Subject: [PATCH 7/7] enh(taosAdapter): TMQ parameter adjustment (#20710) --- cmake/taosadapter_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 1c401ae80e..b2f335e1f7 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG d8059ff + GIT_TAG cb1e89c SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE