diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c0f5ddde7c..8d93cbb001 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1595,16 +1595,29 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, return code; } +static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr, + SStreamScanInfo* pInfo, SSDataBlock* pBlock) { + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version); + printDataBlock(pPreRes, "pre res", taskIdStr); + blockDataCleanup(pBlock); + int32_t code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return ; + } + + SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex); + for (int32_t i = 0; i < pPreRes->info.rows; i++) { + uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i); + appendDataToSpecialBlock(pBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &uid, &groupId, NULL); + } + printDataBlock(pBlock, "new delete", taskIdStr); +} + static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { - blockDataCleanup(pDestBlock); if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } - int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3); + SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1612,9 +1625,22 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); uint64_t* uidCol = (uint64_t*)pUidCol->pData; SColumnInfoData* pSrcPkCol = NULL; - if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) { + if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); } + int64_t ver = pSrcBlock->info.version - 1; + + if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); + startData = (TSKEY*)pStartTsCol->pData; + endData = (TSKEY*)pEndTsCol->pData; + uidCol = (uint64_t*)pUidCol->pData; + } + blockDataCleanup(pDestBlock); + int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1622,7 +1648,6 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { void* pVal = NULL; if (hasPrimaryKey(pInfo)) { @@ -1663,11 +1688,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } - int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3); + SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startData = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1678,6 +1699,19 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) { pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); } + int64_t ver = pSrcBlock->info.version - 1; + + if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) { + getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); + startData = (TSKEY*)pStartTsCol->pData; + endData = (TSKEY*)pEndTsCol->pData; + uidCol = (uint64_t*)pUidCol->pData; + } + + int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); + if (code != TSDB_CODE_SUCCESS) { + return code; + } SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -1685,7 +1719,6 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - int64_t ver = pSrcBlock->info.version - 1; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { void* pVal = NULL; if (hasPrimaryKey(pInfo)) { @@ -1709,8 +1742,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { blockDataCleanup(pDestBlock); - int32_t rows = pSrcBlock->info.rows; - if (rows == 0) { + if (pSrcBlock->info.rows == 0) { return TSDB_CODE_SUCCESS; } SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; @@ -1729,34 +1761,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; int64_t ver = pSrcBlock->info.version - 1; - if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) { - uint64_t srcUid = srcUidData[0]; - TSKEY startTs = srcStartTsCol[0]; - TSKEY endTs = srcEndTsCol[0]; - SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver); - printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo)); - blockDataCleanup(pSrcBlock); - int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex); - rows = pPreRes->info.rows; - - for (int32_t i = 0; i < rows; i++) { - uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i); - appendDataToSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid, - &groupId, NULL); - } - printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo)); + if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) { + getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); + srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; + srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; + srcUidData = (uint64_t*)pSrcUidCol->pData; } - uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; - srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData; - srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData; - srcUidData = (uint64_t*)pSrcUidCol->pData; - int32_t code = blockDataEnsureCapacity(pDestBlock, rows); + uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData; + int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1767,7 +1780,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - for (int32_t i = 0; i < rows;) { + for (int32_t i = 0; i < pSrcBlock->info.rows;) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; if (groupId == 0) { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 96c48c738d..aaa0ffc7e7 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1246,6 +1246,7 @@ ,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim ,,y,script,./test.sh -f tsim/stream/partitionby1.sim ,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnOther.sim ,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim ,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim ,,y,script,./test.sh -f tsim/stream/partitionby.sim diff --git a/tests/script/tsim/stream/partitionbyColumnInterval.sim b/tests/script/tsim/stream/partitionbyColumnInterval.sim index d5f815d533..cc70242172 100644 --- a/tests/script/tsim/stream/partitionbyColumnInterval.sim +++ b/tests/script/tsim/stream/partitionbyColumnInterval.sim @@ -1,6 +1,3 @@ -$loop_all = 0 -looptest: - system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -i 1 system sh/exec.sh -n dnode1 -s start @@ -660,8 +657,65 @@ if $rows != 4 then #goto loop17 endi -$loop_all = $loop_all + 1 -print ============loop_all=$loop_all +print ================step2 +sql drop database if exists test1; +sql create database test6 vgroups 4; +sql use test6; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a interval(10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop18: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt6 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 + +if $rows != 3 then + print ======rows=$rows + goto loop18 +endi + +if $data01 != 1 then + print ======data01=$data01 + goto loop18 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop18 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop18 +endi + +print ========over system sh/stop_dnodes.sh diff --git a/tests/script/tsim/stream/partitionbyColumnOther.sim b/tests/script/tsim/stream/partitionbyColumnOther.sim new file mode 100644 index 0000000000..8e6c0c1f23 --- /dev/null +++ b/tests/script/tsim/stream/partitionbyColumnOther.sim @@ -0,0 +1,130 @@ +$loop_all = 0 +looptest: + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ================step1 +sql drop database if exists test1; +sql create database test0 vgroups 4; +sql use test0; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams0 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 watermark 100s into streamt0 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a count_window(10); + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop0: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt0 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 + +if $rows != 3 then + print ======rows=$rows + goto loop0 +endi + +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop0 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop0 +endi + +print ================step1 +sql drop database if exists test1; +sql create database test1 vgroups 4; +sql use test1; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a event_window start with b = 2 end with b = 2; + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop1: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt1 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 + +if $rows != 3 then + print ======rows=$rows + goto loop1 +endi + +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop1 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop1 +endi + +print ========over + +system sh/stop_dnodes.sh + +#goto looptest diff --git a/tests/script/tsim/stream/partitionbyColumnSession.sim b/tests/script/tsim/stream/partitionbyColumnSession.sim index a22e36e499..1daa033399 100644 --- a/tests/script/tsim/stream/partitionbyColumnSession.sim +++ b/tests/script/tsim/stream/partitionbyColumnSession.sim @@ -561,9 +561,66 @@ if $data21 != 1 then goto loop14 endi +print ================step2 +sql drop database if exists test5; +sql create database test5 vgroups 4; +sql use test5; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a session(ts, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop15: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt6 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 + +if $rows != 3 then + print ======rows=$rows + goto loop15 +endi + +if $data01 != 1 then + print ======data01=$data01 + goto loop15 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop15 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop15 +endi + +print ========over + system sh/stop_dnodes.sh -$loop_all = $loop_all + 1 -print ============loop_all=$loop_all - #goto looptest \ No newline at end of file diff --git a/tests/script/tsim/stream/partitionbyColumnState.sim b/tests/script/tsim/stream/partitionbyColumnState.sim index 8435e753d4..d741426786 100644 --- a/tests/script/tsim/stream/partitionbyColumnState.sim +++ b/tests/script/tsim/stream/partitionbyColumnState.sim @@ -266,9 +266,66 @@ if $data32 != 8 then goto loop6 endi +print ================step2 +sql drop database if exists test2; +sql create database test2 vgroups 4; +sql use test2; +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a session(ts, 10s); + +sleep 1000 + +sql insert into t1 values(1648791213000,0,2,3,1.0); +sql insert into t1 values(1648791213001,1,2,3,1.0); +sql insert into t1 values(1648791213002,2,2,3,1.0); + +sql insert into t1 values(1648791213003,0,2,3,1.0); +sql insert into t1 values(1648791213004,1,2,3,1.0); +sql insert into t1 values(1648791213005,2,2,3,1.0); + +print delete from t1 where ts <= 1648791213002; +sql delete from t1 where ts <= 1648791213002; + +$loop_count = 0 + +loop7: +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +sql select * from streamt6 order by 1; + +print $data00 $data01 $data02 +print $data10 $data11 $data12 +print $data20 $data21 $data22 +print $data30 $data31 $data32 +print $data40 $data41 $data42 + +if $rows != 3 then + print ======rows=$rows + goto loop7 +endi + +if $data01 != 1 then + print ======data01=$data01 + goto loop7 +endi + +if $data11 != 1 then + print ======data11=$data11 + goto loop7 +endi + +if $data21 != 1 then + print ======data21=$data21 + goto loop7 +endi + +print ========over + system sh/stop_dnodes.sh -$loop_all = $loop_all + 1 -print ============loop_all=$loop_all - #goto looptest