This commit is contained in:
54liuyao 2024-03-29 13:52:56 +08:00
parent ffeabb132a
commit ea33f08d02
6 changed files with 366 additions and 54 deletions

View File

@ -1595,16 +1595,29 @@ static int32_t getPreSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
return code;
}
static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int64_t version, char* taskIdStr,
SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, uid, startTs, endTs, version);
printDataBlock(pPreRes, "pre res", taskIdStr);
blockDataCleanup(pBlock);
int32_t code = blockDataEnsureCapacity(pBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return ;
}
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
for (int32_t i = 0; i < pPreRes->info.rows; i++) {
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
appendDataToSpecialBlock(pBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &uid, &groupId, NULL);
}
printDataBlock(pBlock, "new delete", taskIdStr);
}
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -1612,9 +1625,22 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
SColumnInfoData* pSrcPkCol = NULL;
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) {
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
}
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
uidCol = (uint64_t*)pUidCol->pData;
}
blockDataCleanup(pDestBlock);
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -1622,7 +1648,6 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int64_t ver = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
void* pVal = NULL;
if (hasPrimaryKey(pInfo)) {
@ -1663,11 +1688,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
ASSERT(taosArrayGetSize(pSrcBlock->pDataBlock) >= 3);
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -1678,6 +1699,19 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX ) {
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
}
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && (startData[0] != endData[0] || hasPrimaryKey(pInfo))) {
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
startData = (TSKEY*)pStartTsCol->pData;
endData = (TSKEY*)pEndTsCol->pData;
uidCol = (uint64_t*)pUidCol->pData;
}
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
@ -1685,7 +1719,6 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
int64_t ver = pSrcBlock->info.version - 1;
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
void* pVal = NULL;
if (hasPrimaryKey(pInfo)) {
@ -1709,8 +1742,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
blockDataCleanup(pDestBlock);
int32_t rows = pSrcBlock->info.rows;
if (rows == 0) {
if (pSrcBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
@ -1729,34 +1761,15 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
int64_t ver = pSrcBlock->info.version - 1;
if (pInfo->partitionSup.needCalc && srcStartTsCol[0] != srcEndTsCol[0]) {
uint64_t srcUid = srcUidData[0];
TSKEY startTs = srcStartTsCol[0];
TSKEY endTs = srcEndTsCol[0];
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo));
blockDataCleanup(pSrcBlock);
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
rows = pPreRes->info.rows;
for (int32_t i = 0; i < rows; i++) {
uint64_t groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, i);
appendDataToSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
&groupId, NULL);
}
printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
if (pInfo->partitionSup.needCalc && (srcStartTsCol[0] != srcEndTsCol[0] || hasPrimaryKey(pInfo))) {
getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
srcUidData = (uint64_t*)pSrcUidCol->pData;
}
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
srcUidData = (uint64_t*)pSrcUidCol->pData;
int32_t code = blockDataEnsureCapacity(pDestBlock, rows);
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1767,7 +1780,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
SColumnInfoData* pGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
SColumnInfoData* pCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
SColumnInfoData* pCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
for (int32_t i = 0; i < rows;) {
for (int32_t i = 0; i < pSrcBlock->info.rows;) {
uint64_t srcUid = srcUidData[i];
uint64_t groupId = srcGp[i];
if (groupId == 0) {

View File

@ -1246,6 +1246,7 @@
,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim
,,y,script,./test.sh -f tsim/stream/partitionby1.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnOther.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim
,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim
,,y,script,./test.sh -f tsim/stream/partitionby.sim

View File

@ -1,6 +1,3 @@
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
@ -660,8 +657,65 @@ if $rows != 4 then
#goto loop17
endi
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
print ================step2
sql drop database if exists test1;
sql create database test6 vgroups 4;
sql use test6;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a interval(10s);
sleep 1000
sql insert into t1 values(1648791213000,0,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213003,0,2,3,1.0);
sql insert into t1 values(1648791213004,1,2,3,1.0);
sql insert into t1 values(1648791213005,2,2,3,1.0);
print delete from t1 where ts <= 1648791213002;
sql delete from t1 where ts <= 1648791213002;
$loop_count = 0
loop18:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt6 order by 1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 3 then
print ======rows=$rows
goto loop18
endi
if $data01 != 1 then
print ======data01=$data01
goto loop18
endi
if $data11 != 1 then
print ======data11=$data11
goto loop18
endi
if $data21 != 1 then
print ======data21=$data21
goto loop18
endi
print ========over
system sh/stop_dnodes.sh

View File

@ -0,0 +1,130 @@
$loop_all = 0
looptest:
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ================step1
sql drop database if exists test1;
sql create database test0 vgroups 4;
sql use test0;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams0 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 watermark 100s into streamt0 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a count_window(10);
sleep 1000
sql insert into t1 values(1648791213000,0,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213003,0,2,3,1.0);
sql insert into t1 values(1648791213004,1,2,3,1.0);
sql insert into t1 values(1648791213005,2,2,3,1.0);
print delete from t1 where ts <= 1648791213002;
sql delete from t1 where ts <= 1648791213002;
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt0 order by 1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 3 then
print ======rows=$rows
goto loop0
endi
if $data01 != 1 then
print ======data01=$data01
goto loop0
endi
if $data11 != 1 then
print ======data11=$data11
goto loop0
endi
if $data21 != 1 then
print ======data21=$data21
goto loop0
endi
print ================step1
sql drop database if exists test1;
sql create database test1 vgroups 4;
sql use test1;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a event_window start with b = 2 end with b = 2;
sleep 1000
sql insert into t1 values(1648791213000,0,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213003,0,2,3,1.0);
sql insert into t1 values(1648791213004,1,2,3,1.0);
sql insert into t1 values(1648791213005,2,2,3,1.0);
print delete from t1 where ts <= 1648791213002;
sql delete from t1 where ts <= 1648791213002;
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt1 order by 1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 3 then
print ======rows=$rows
goto loop1
endi
if $data01 != 1 then
print ======data01=$data01
goto loop1
endi
if $data11 != 1 then
print ======data11=$data11
goto loop1
endi
if $data21 != 1 then
print ======data21=$data21
goto loop1
endi
print ========over
system sh/stop_dnodes.sh
#goto looptest

View File

@ -561,9 +561,66 @@ if $data21 != 1 then
goto loop14
endi
print ================step2
sql drop database if exists test5;
sql create database test5 vgroups 4;
sql use test5;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a session(ts, 10s);
sleep 1000
sql insert into t1 values(1648791213000,0,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213003,0,2,3,1.0);
sql insert into t1 values(1648791213004,1,2,3,1.0);
sql insert into t1 values(1648791213005,2,2,3,1.0);
print delete from t1 where ts <= 1648791213002;
sql delete from t1 where ts <= 1648791213002;
$loop_count = 0
loop15:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt6 order by 1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 3 then
print ======rows=$rows
goto loop15
endi
if $data01 != 1 then
print ======data01=$data01
goto loop15
endi
if $data11 != 1 then
print ======data11=$data11
goto loop15
endi
if $data21 != 1 then
print ======data21=$data21
goto loop15
endi
print ========over
system sh/stop_dnodes.sh
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest

View File

@ -266,9 +266,66 @@ if $data32 != 8 then
goto loop6
endi
print ================step2
sql drop database if exists test2;
sql create database test2 vgroups 4;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt6 subtable("aaa-a") as select _wstart, count(*) from t1 partition by a session(ts, 10s);
sleep 1000
sql insert into t1 values(1648791213000,0,2,3,1.0);
sql insert into t1 values(1648791213001,1,2,3,1.0);
sql insert into t1 values(1648791213002,2,2,3,1.0);
sql insert into t1 values(1648791213003,0,2,3,1.0);
sql insert into t1 values(1648791213004,1,2,3,1.0);
sql insert into t1 values(1648791213005,2,2,3,1.0);
print delete from t1 where ts <= 1648791213002;
sql delete from t1 where ts <= 1648791213002;
$loop_count = 0
loop7:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt6 order by 1;
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
if $rows != 3 then
print ======rows=$rows
goto loop7
endi
if $data01 != 1 then
print ======data01=$data01
goto loop7
endi
if $data11 != 1 then
print ======data11=$data11
goto loop7
endi
if $data21 != 1 then
print ======data21=$data21
goto loop7
endi
print ========over
system sh/stop_dnodes.sh
$loop_all = $loop_all + 1
print ============loop_all=$loop_all
#goto looptest