From e42d5ba3dcffde45427d6a751847891b716f5e28 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 21 Feb 2023 15:59:53 +0800 Subject: [PATCH 1/2] fix:table name is null --- source/dnode/vnode/src/tq/tqSink.c | 9 +- source/libs/executor/src/groupoperator.c | 18 +++- .../script/tsim/stream/checkStreamSTable.sim | 100 ++++++++++++++++-- tests/script/tsim/stream/udTableAndTag0.sim | 5 +- 4 files changed, 110 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7a8d899a19..8c478c52a7 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -497,7 +497,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* taosArrayPush(tagArray, &tagVal); } } - pCreateTbReq->ctb.tagNum = size; + pCreateTbReq->ctb.tagNum = TMAX(size - UD_TAG_COLUMN_INDEX, 1); STag* pTag = NULL; tTagNew(tagArray, 1, false, &pTag); @@ -510,15 +510,12 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* pCreateTbReq->ctb.pTag = (uint8_t*)pTag; // set table name - SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); - if (colDataIsNull_s(pTbColInfo, rowId)) { + if (!pDataBlock->info.parTbName[0]) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); } else { - void* pTbData = colDataGetData(pTbColInfo, rowId); - pCreateTbReq->name = taosMemoryCalloc(1, varDataLen(pTbData) + 1); - memcpy(pCreateTbReq->name, varDataVal(pTbData), varDataLen(pTbData)); + pCreateTbReq->name = strdup(pDataBlock->info.parTbName); } taosArrayPush(reqs.pArray, pCreateTbReq); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9fd8f7d3a2..4afd254756 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -992,26 +992,34 @@ void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; + char* tbName = pSrcBlock->info.parTbName; if (pTableSup->numOfExprs > 0) { projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); - void* pData = colDataGetVarData(pTbCol, pDestBlock->info.rows - 1); - char* tbName = pSrcBlock->info.parTbName; memset(tbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); - memcpy(tbName, varDataVal(pData), len); + int32_t len = 0; + if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) { + len = TMIN(sizeof(TSDB_DATA_NULL_STR), TSDB_TABLE_NAME_LEN - 1); + memcpy(tbName, TSDB_DATA_NULL_STR, len); + } else { + void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1); + len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); + memcpy(tbName, varDataVal(pData), len); + } streamStatePutParName(pState, groupId, tbName); memcpy(pTmpBlock->info.parTbName, tbName, len); pDestBlock->info.rows--; } else { void* pTbNameCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); colDataAppendNULL(pTbNameCol, pDestBlock->info.rows); - pSrcBlock->info.parTbName[0] = 0; + tbName[0] = 0; } if (pTagSup->numOfExprs > 0) { projectApplyFunctions(pTagSup->pExprInfo, pDestBlock, pTmpBlock, pTagSup->pCtx, pTagSup->numOfExprs, NULL); pDestBlock->info.rows--; + } else { + memcpy(pDestBlock->info.parTbName, pTmpBlock->info.parTbName, TSDB_TABLE_NAME_LEN); } void* pGpIdCol = taosArrayGet(pDestBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim index fda78af621..755b3ee7eb 100644 --- a/tests/script/tsim/stream/checkStreamSTable.sim +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -301,7 +301,7 @@ print $data00, $data01, $data02, $data03 print $data10, $data11, $data12, $data13 print $data20, $data21, $data22, $data23 -loop2: +loop3: sleep 300 @@ -317,47 +317,127 @@ if $rows != 2 then print $data00, $data01, $data02, $data03 print $data10, $data11, $data12, $data13 print $data20, $data21, $data22, $data23 - goto loop2 + goto loop3 endi if $data01 != 10 then print =====data01=$data01 - goto loop2 + goto loop3 endi if $data02 != 20 then print =====data02=$data02 - goto loop2 + goto loop3 endi if $data03 != 1 then print =====data03=$data03 - goto loop2 + goto loop3 endi if $data04 != NULL then print =====data04=$data04 - goto loop2 + goto loop3 endi if $data11 != 40 then print =====data11=$data11 - goto loop2 + goto loop3 endi if $data12 != 50 then print =====data12=$data12 - goto loop2 + goto loop3 endi if $data13 != 1 then print =====data13=$data13 - goto loop2 + goto loop3 endi if $data14 != NULL then print =====data14=$data14 - goto loop2 + goto loop3 +endi + +print ===== step7 + +sql create database result5 vgroups 1; + +sql create database test5 vgroups 4; +sql use test5; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,2,3); +sql create table t2 using st tags(4,5,6); + +sql create stable result5.streamt5(ts timestamp,a int,b int,c int, d int) tags(tg1 int,tg2 int,tg3 int); + +sql create stream streams5 trigger at_once into result5.streamt5(ts,c,a,b) tags(tg2, tg3, tg1) subtable( concat("tbl-", cast(tg3 as varchar(10)) ) ) as select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s); + +sql insert into t1 values(1648791213000,NULL,NULL,NULL); + +$loop_count = 0 + +print select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s); +sql select _wstart, count(*) c1, max(a),min(b) c2 from st partition by ta+1 as tg1, cast(tb as bigint) as tg2, a as tg3 session(ts, 10s); +print $data00, $data01, $data02, $data03 +print $data10, $data11, $data12, $data13 +print $data20, $data21, $data22, $data23 + +loop4: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print sql select * from result5.streamt5 order by tg1; +sql select * from result5.streamt5 order by tg1; +print $data00, $data01, $data02, $data03 $data04 $data05 $data06 $data07 +print $data10, $data11, $data12, $data13 +print $data20, $data21, $data22, $data23 + +if $rows != 1 then + print =====rows=$rows + goto loop4 +endi + +if $data01 != NULL then + print =====data01=$data01 + goto loop4 +endi + +if $data02 != NULL then + print =====data02=$data02 + goto loop4 +endi + +if $data03 != 1 then + print =====data03=$data03 + goto loop4 +endi + +if $data04 != NULL then + print =====data04=$data04 + goto loop4 +endi + +if $data05 != 2 then + print =====data05=$data05 + goto loop4 +endi + +if $data06 != 2 then + print =====data06=$data06 + goto loop4 +endi + +if $data07 != NULL then + print =====data07=$data07 + goto loop4 endi print ======over diff --git a/tests/script/tsim/stream/udTableAndTag0.sim b/tests/script/tsim/stream/udTableAndTag0.sim index bfc299df0f..8bf34dc54c 100644 --- a/tests/script/tsim/stream/udTableAndTag0.sim +++ b/tests/script/tsim/stream/udTableAndTag0.sim @@ -39,7 +39,10 @@ sql select table_name from information_schema.ins_tables where db_name="result" if $rows != 2 then print =====rows=$rows - print $data00 $data10 + print $data00 + print $data10 + print $data20 + print $data30 goto loop0 endi From d43355e77f3d22d76b65d5d97c61d711b6229869 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 21 Feb 2023 18:27:50 +0800 Subject: [PATCH 2/2] fix:check null column --- source/dnode/mnode/impl/src/mndStream.c | 2 +- .../script/tsim/stream/checkStreamSTable.sim | 55 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 47ebdd706d..de39df4bc9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -354,7 +354,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, int32_t dataIndex = 0; for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); - if (i < pos->slotId) { + if (nullIndex >= numOfNULL || i < pos->slotId) { pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; diff --git a/tests/script/tsim/stream/checkStreamSTable.sim b/tests/script/tsim/stream/checkStreamSTable.sim index 755b3ee7eb..288dd35cfe 100644 --- a/tests/script/tsim/stream/checkStreamSTable.sim +++ b/tests/script/tsim/stream/checkStreamSTable.sim @@ -440,6 +440,61 @@ if $data07 != NULL then goto loop4 endi +sql drop stream if exists streams4; +sql drop stream if exists streams5; +sql drop database if exists test4; +sql drop database if exists test5; +sql drop database if exists result4; +sql drop database if exists result5; + +print ===== step8 + +sql drop stream if exists streams8; +sql drop database if exists test8; +sql create database test8 vgroups 1; +sql use test8; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams8 trigger at_once into streamt8 as select _wstart as ts, count(*) c1, count(d) c2, count(c) c3 from t1 partition by tbname interval(10s) ; + +sql drop stream streams8; +sql create stream streams71 trigger at_once into streamt8(ts, c2) tags(group_id)as select _wstart, count(*) from t1 partition by tbname as group_id interval(10s); + +sql insert into t1 values(1648791233000,1,2,3,1.0); + +loop8: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt8; +print $data00, $data01, $data02, $data03 +print $data10, $data11, $data12, $data13 +print $data20, $data21, $data22, $data23 + +if $rows != 1 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != NULL then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 1 then + print =====data02=$data02 + goto loop8 +endi + +if $data03 != NULL then + print =====data03=$data03 + goto loop8 +endi + print ======over system sh/stop_dnodes.sh