diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f1103ad48a..3801a25d6d 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -488,9 +488,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* }; void* pData = colDataGetData(pTagData, rowId); if (colDataIsNull_s(pTagData, rowId)) { - tagVal.type = TSDB_DATA_TYPE_NULL; - tagVal.pData = NULL; - tagVal.nData = 0; + continue; } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { tagVal.nData = varDataLen(pData); tagVal.pData = varDataVal(pData); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fb122b077f..9c8137db34 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -971,7 +971,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, int64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock) { void* pValue = NULL; - if (groupId != 0 && streamStateGetParName(pState, groupId, &pValue) != 0) { + if (streamStateGetParName(pState, groupId, &pValue) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37c33c44e2..25de627127 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1786,7 +1786,7 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pPacked = taosArrayGet(pInfo->pBlockLists, current); SSDataBlock* pBlock = pPacked->pDataBlock; - if (pBlock->info.id.groupId && pBlock->info.parTbName[0]) { + if (pBlock->info.parTbName[0]) { streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } // TODO move into scan diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c0d89aa8c7..648580b913 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1614,7 +1614,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); - cleanupGroupResInfo(&pInfo->groupResInfo); + pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows); cleanupExprSupp(&pInfo->scalarSupp); taosMemoryFreeClear(param); diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim index 2ed6958196..b60ab0ac05 100644 --- a/tests/script/tsim/stream/checkStreamSTable.sim +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -19,9 +19,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb int,tc int); +sql create stable result.streamt0(ts timestamp,a int,b int) tags(ta int,tb varchar(100),tc int); -sql create stream streams0 trigger at_once into result.streamt0 as select _wstart, count(*) c1, max(a) c2 from st partition by tbname interval(10s); +sql create stream streams0 trigger at_once into result.streamt0 tags(tb) as select _wstart, count(*) c1, max(a) c2 from st partition by tbname tb interval(10s); sql insert into t1 values(1648791213000,1,2,3); sql insert into t2 values(1648791213000,2,2,3); @@ -61,6 +61,16 @@ if $data02 != 1 then goto loop0 endi +if $data03 != NULL then + print =====data03=$data03 + goto loop0 +endi + +if $data04 != t1 then + print =====data04=$data04 + goto loop0 +endi + if $data11 != 1 then print =====data11=$data11 goto loop0 @@ -71,6 +81,16 @@ if $data12 != 2 then goto loop0 endi +if $data13 != NULL then + print =====data13=$data13 + goto loop0 +endi + +if $data14 != t2 then + print =====data14=$data14 + goto loop0 +endi + print ===== step3 sql create database result1 vgroups 1; @@ -83,9 +103,9 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,1,1); sql create table t2 using st tags(2,2,2); -sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta bigint unsigned,tb int,tc int); +sql create stable result1.streamt1(ts timestamp,a int,b int,c int) tags(ta varchar(100),tb int,tc int); -sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname interval(10s); +sql create stream streams1 trigger at_once into result1.streamt1(ts,c,a,b) tags(ta) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by tbname as ta interval(10s); sql insert into t1 values(1648791213000,10,20,30); sql insert into t2 values(1648791213000,40,50,60); @@ -161,7 +181,7 @@ sql create table t2 using st tags(2,2,2); sql create stable result2.streamt2(ts timestamp, a int , b int) tags(ta varchar(20)); # tag dest 1, source 2 -##sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s); +sql_error create stream streams2 trigger at_once into result2.streamt2 TAGS(aa varchar(100), ta int) as select _wstart, count(*) c1, max(a) from st partition by tbname as aa, ta interval(10s); # column dest 3, source 4 sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1, max(a), max(b) from st partition by tbname interval(10s); @@ -173,7 +193,7 @@ sql_error create stream streams2 trigger at_once into result2.streamt2(ts, a, b sql_error create stream streams2 trigger at_once into result2.streamt2 as select _wstart, count(*) c1 from st partition by tbname interval(10s); # column dest 3, source 2 -sql create stream streams2 trigger at_once into result2.streamt2(ts, a) as select _wstart, count(*) c1 from st partition by tbname interval(10s); +sql create stream streams2 trigger at_once into result2.streamt2(ts, a) tags(ta) as select _wstart, count(*) c1 from st partition by tbname as ta interval(10s); print ===== step5 @@ -252,16 +272,16 @@ sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); sql create table t1 using st tags(1,2,3); sql create table t2 using st tags(4,5,6); -sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(ta int,tb int,tc int); +sql create stable result4.streamt4(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int); -sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2 int, tg3 varchar(100), tg1 bigint) subtable(concat("tbl-", tg1)) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); +sql create stream streams4 trigger at_once into result4.streamt4(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg1 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); sql insert into t1 values(1648791213000,10,20,30); sql insert into t2 values(1648791213000,40,50,60); $loop_count = 0 -sql select _wstart, count(*) c1, max(a),min(b) c2 from st interval(10s); +sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, tc as tg3 interval(10s); print $data00, $data01, $data02, $data03 print $data10, $data11, $data12, $data13 print $data20, $data21, $data22, $data23 @@ -275,7 +295,7 @@ if $loop_count == 10 then return -1 endi -sql select * from result4.streamt4; +sql select * from result4.streamt4 order by tg1; if $rows != 2 then print =====rows=$rows @@ -285,7 +305,7 @@ if $rows != 2 then goto loop2 endi -if $data01 != 40 then +if $data01 != 10 then print =====data01=$data01 goto loop2 endi @@ -295,7 +315,7 @@ if $data02 != 20 then goto loop2 endi -if $data03 != 2 then +if $data03 != 1 then print =====data03=$data03 goto loop2 endi @@ -305,6 +325,26 @@ if $data04 != NULL then goto loop2 endi +if $data11 != 40 then + print =====data11=$data11 + goto loop2 +endi + +if $data12 != 50 then + print =====data12=$data12 + goto loop2 +endi + +if $data13 != 1 then + print =====data13=$data13 + goto loop2 +endi + +if $data14 != NULL then + print =====data14=$data14 + goto loop2 +endi + print ======over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim index 5cb5c2dd8b..bfc299df0f 100644 --- a/tests/script/tsim/stream/udTableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -367,6 +367,77 @@ if $data22 != tag-t3 then goto loop8 endi +print ===== step6 +print ===== transform tag value + +sql drop stream if exists streams1; +sql drop stream if exists streams2; +sql drop stream if exists streams3; +sql drop stream if exists streams4; +sql drop stream if exists streams5; + +sql drop database if exists test1; +sql drop database if exists test2; +sql drop database if exists test3; +sql drop database if exists test4; +sql drop database if exists test5; + +sql drop database if exists result1; +sql drop database if exists result2; +sql drop database if exists result3; +sql drop database if exists result4; +sql drop database if exists result5; + + + +sql create database result6 vgroups 1; + +sql create database test6 vgroups 4; +sql use test6; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta varchar(20), tb int, tc int); +sql create table t1 using st tags("1",1,1); +sql create table t2 using st tags("2",2,2); +sql create table t3 using st tags("3",3,3); + +sql create stream streams6 trigger at_once into result6.streamt6 TAGS(dd int) as select _wstart, count(*) c1 from st partition by concat(ta, "0") as dd, tbname interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop9: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result6.streamt6 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop9 +endi + +if $data02 != 10 then + print =====data02=$data02 + goto loop9 +endi + +if $data12 != 20 then + print =====data12=$data12 + goto loop9 +endi + +if $data22 != 30 then + print =====data22=$data22 + goto loop8 +endi + print ======over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/udTableAndTag2.sim b/tests/script/tsim/stream/udTableAndTag2.sim new file mode 100644 index 0000000000..c42ac21dcd --- /dev/null +++ b/tests/script/tsim/stream/udTableAndTag2.sim @@ -0,0 +1,371 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +print ===== step1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step2 +print ===== table name + +sql create database result vgroups 1; + +sql create database test vgroups 4; +sql use test; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams1 trigger at_once into result.streamt SUBTABLE("aaa") as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 1 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + +$loop_count = 0 +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop0 +endi + +if $data00 != aaa then + print =====data00=$data00 + goto loop0 +endi + + +$loop_count = 0 +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result.streamt; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data10 + goto loop1 +endi + + +print ===== step3 +print ===== column name + +sql create database result2 vgroups 1; + +sql create database test2 vgroups 4; +sql use test2; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams2 trigger at_once into result2.streamt2 TAGS(cc varchar(100)) as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 2 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +sql select tag_name from information_schema.ins_tags where db_name="result2" and stable_name = "streamt2" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop2 +endi + +if $data00 != cc then + print =====data00=$data00 + goto loop2 +endi + +print sql select cc from result2.streamt2 order by 1; + +$loop_count = 0 +loop21: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select cc from result2.streamt2 order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop21 +endi + +if $data00 != NULL then + print =====data00=$data00 + goto loop21 +endi + + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result2.streamt2; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data10 + goto loop3 +endi + + +print ===== step4 +print ===== column name + table name + +sql create database result3 vgroups 1; + +sql create database test3 vgroups 4; +sql use test3; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql create stream streams3 trigger at_once into result3.streamt3 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", 1 ) ) as select _wstart, count(*) c1 from st interval(10s); +print ===== insert into 3 +sql insert into t1 values(1648791213000,1,2,3); +sql insert into t2 values(1648791213000,2,2,3); + + +$loop_count = 0 +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select tag_name from information_schema.ins_tags where db_name="result3" and stable_name = "streamt3" order by 1; + +if $rows != 1 then + print =====rows=$rows + print $data00 + print $data10 + goto loop4 +endi + +if $data00 != NULL then + print =====data00=$data00 + goto loop4 +endi + +sql select dd from result3.streamt3 order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop4 +endi + +if $data00 != col-1 then + print =====data00=$data00 + goto loop4 +endi + +if $data10 != col-2 then + print =====data10=$data10 + goto loop4 +endi + +$loop_count = 0 +loop5: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result3.streamt3; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop5 +endi + +$loop_count = 0 +loop6: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result3" order by 1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data10 + goto loop6 +endi + +if $data00 != tbn-1 then + print =====data00=$data00 + goto loop6 +endi + +if $data10 != tbn-2 then + print =====data10=$data10 + goto loop6 +endi + +print ===== step5 +print ===== tag name + table name + +sql create database result4 vgroups 1; + +sql create database test4 vgroups 4; +sql use test4; + + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); + +sql create stream streams4 trigger at_once into result4.streamt4 TAGS(dd varchar(100)) SUBTABLE(concat("tbn-", dd)) as select _wstart, count(*) c1 from st interval(10s); +sql insert into t1 values(1648791213000,1,1,1) t2 values(1648791213000,2,2,2) t3 values(1648791213000,3,3,3); + + +$loop_count = 0 +loop7: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select table_name from information_schema.ins_tables where db_name="result4" order by 1; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop7 +endi + +if $data00 != tbn-t1 then + print =====data00=$data00 + goto loop7 +endi + +if $data10 != tbn-t2 then + print =====data10=$data10 + goto loop7 +endi + +if $data20 != tbn-t3 then + print =====data20=$data20 + goto loop7 +endi + +$loop_count = 0 +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from result4.streamt4 order by 3; + +if $rows != 3 then + print =====rows=$rows + print $data00 $data10 + goto loop8 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != t1 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 1 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != t2 then + print =====data12=$data12 + goto loop8 +endi + +if $data21 != 1 then + print =====data21=$data21 + goto loop8 +endi + +if $data22 != t3 then + print =====data22=$data22 + goto loop8 +endi + +print ======over + +system sh/stop_dnodes.sh