From 5acf665b7ef046d3ea8f183d36a126581f52bebc Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 9 Mar 2023 19:36:10 +0800 Subject: [PATCH] fix:get dest table version --- source/dnode/vnode/src/tq/tq.c | 10 ++- .../script/tsim/stream/checkStreamSTable1.sim | 71 +++++++++++++++++++ 2 files changed, 78 insertions(+), 3 deletions(-) create mode 100644 tests/script/tsim/stream/checkStreamSTable1.sim diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d21a2e7f3..5d3350a69a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -983,11 +983,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; - /*A(pTask->tbSink.pSchemaWrapper);*/ - /*A(pTask->tbSink.pSchemaWrapper->pSchema);*/ + int32_t version = 1; + SMetaInfo info = {0}; + int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); + if (code == TSDB_CODE_SUCCESS) { + version = info.skmVer; + } pTask->tbSink.pTSchema = - tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, 1); + tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version); ASSERT(pTask->tbSink.pTSchema); } diff --git a/tests/script/tsim/stream/checkStreamSTable1.sim b/tests/script/tsim/stream/checkStreamSTable1.sim new file mode 100644 index 0000000000..495e1cf358 --- /dev/null +++ b/tests/script/tsim/stream/checkStreamSTable1.sim @@ -0,0 +1,71 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2 from st interval(1s) ; +sql insert into t1 values(1648791211000,1,2,3); +sql insert into t1 values(1648791212000,2,2,3); + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 2 then + print rows=$rows + goto loop0 +endi + +print drop stream streams1 +sql drop stream streams1; + +print alter table streamt1 add column c3 double +sql alter table streamt1 add column c3 double; + +print create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ; +sql create stream streams1 trigger at_once into streamt1 as select _wstart, count(*) c1, count(a) c2, avg(b) c3 from st interval(1s) ; + +sql insert into t2 values(1648791213000,1,2,3); +sql insert into t1 values(1648791214000,1,2,3); + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 select * from streamt1; +sql select * from streamt1; + +if $rows != 4 then + print rows=$rows + goto loop1 +endi + +print ======over + +system sh/stop_dnodes.sh