Merge pull request #28915 from taosdata/fix/TD-32999

fix(stream):build create table request for force_window_close
This commit is contained in:
Shengliang Guan 2024-11-27 10:59:14 +08:00 committed by GitHub
commit 9d23131e04
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 233 additions and 4 deletions

View File

@ -538,6 +538,7 @@ typedef struct SStreamScanInfo {
int32_t pkColLen;
bool useGetResultRange;
STimeWindow lastScanRange;
SSDataBlock* pRangeScanRes; // update SSDataBlock
} SStreamScanInfo;
typedef struct {

View File

@ -3839,6 +3839,11 @@ FETCH_NEXT_BLOCK:
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE:
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
if (pInfo->pRangeScanRes != NULL) {
(*ppRes) = pInfo->pRangeScanRes;
pInfo->pRangeScanRes = NULL;
return code;
}
SSDataBlock* pSDB = NULL;
code = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex, &pSDB);
QUERY_CHECK_CODE(code, lino, _end);
@ -3852,6 +3857,15 @@ FETCH_NEXT_BLOCK:
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
code = calBlockTbName(pInfo, pSDB, 0);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->pCreateTbRes->info.rows > 0) {
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "update",
GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCreateTbRes;
pInfo->pRangeScanRes = pSDB;
return code;
}
(*ppRes) = pSDB;
return code;
}
@ -4629,6 +4643,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pFillSup = NULL;
pInfo->useGetResultRange = false;
pInfo->pRangeScanRes = NULL;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -748,7 +748,7 @@ _end:
static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
SSlicePoint* pNextPoint) {
SSlicePoint* pNextPoint, bool isFwc) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t tmpRes = TSDB_CODE_SUCCESS;
@ -769,6 +769,10 @@ static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillS
setPointBuff(pCurPoint, pFillSup);
pFillSup->cur.key = pCurPoint->pRightRow->key;
pFillSup->cur.pRowVal = (SResultCellData*)pCurPoint->pRightRow->pRowVal;
if (isFwc) {
qDebug("===stream=== only get current point state");
goto _end;
}
} else {
pFillSup->cur.key = pCurPoint->key.ts + 1;
}
@ -1466,6 +1470,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
return;
}
bool isFwc = (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE);
// clear the existed group id
pBlock->info.id.groupId = 0;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
@ -1481,11 +1486,23 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
pBlock->info.id.groupId = pKey->groupId;
}
}
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
code =
pAggSup->stateStore.streamStateGetParName(pAggSup->pState, pBlock->info.id.groupId, &tbname, false, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
}
pAggSup->stateStore.streamStateFreeVal(tbname);
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
SSlicePoint prevPoint = {0};
SSlicePoint nextPoint = {0};
if (pFillSup->type != TSDB_FILL_LINEAR) {
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, isFwc);
} else {
code =
getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
@ -1504,7 +1521,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
}
}
if (pTwSup->calTrigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (isFwc) {
setForceWindowCloseFillRule(pFillSup, pFillInfo, pKey->ts);
} else {
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
@ -1546,7 +1563,7 @@ static void doBuildTimeSliceDeleteResult(SStreamAggSupporter* pAggSup, SStreamFi
SSlicePoint nextPoint = {0};
STimeWindow tw = {0};
if (pFillSup->type != TSDB_FILL_LINEAR) {
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, false);
} else {
code =
getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);

View File

@ -1422,6 +1422,7 @@
,,y,script,./test.sh -f tsim/stream/streamTwaFwcFill.sim
,,y,script,./test.sh -f tsim/stream/streamTwaFwcFillPrimaryKey.sim
,,y,script,./test.sh -f tsim/stream/streamTwaFwcIntervalPrimaryKey.sim
,,y,script,./test.sh -f tsim/stream/streamTwaInterpFwc.sim
,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim
,,y,script,./test.sh -f tsim/stream/triggerSession0.sim
,,y,script,./test.sh -f tsim/stream/udTableAndCol0.sim

View File

@ -109,6 +109,201 @@ if $data01 != $query1_data01 then
return -1
endi
print step2
print =============== create database
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams6 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt6 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1 from st partition by tbname, b as cc every(2s) fill(prev);
sql create stream streams7 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt7 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a) from st partition by tbname, b as cc interval(2s) fill(NULL);
sql create stream streams8 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt8 TAGS(cc int) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a) from st partition by tbname, b as cc interval(2s);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3s,1,1,1);
$loop_count = 0
loop6:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt6;
sql select cc,* from streamt6;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop6
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt6";
sql select * from information_schema.ins_tables where stable_name = "streamt6";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%";
sql select * from information_schema.ins_tables where stable_name = "streamt6" and table_name like "tbn-t1_1%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop7:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt7;
sql select cc,* from streamt7;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop7
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt7";
sql select * from information_schema.ins_tables where stable_name = "streamt7";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%";
sql select * from information_schema.ins_tables where stable_name = "streamt7" and table_name like "tbn-t1_2%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop8:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,* from streamt8;
sql select cc,* from streamt8;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 1 then
print ======rows=$rows
goto loop8
endi
if $data00 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt8";
sql select * from information_schema.ins_tables where stable_name = "streamt8";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%";
sql select * from information_schema.ins_tables where stable_name = "streamt8" and table_name like "tbn-t1_3%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT