fix stream issue

This commit is contained in:
54liuyao 2024-11-27 16:24:46 +08:00
parent e63b8a0ccb
commit 9d668150d5
8 changed files with 487 additions and 113 deletions

View File

@ -1432,7 +1432,11 @@ static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppR
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = -1; pInfo->srcRowIndex = -1;
} break; } break;
case STREAM_CHECKPOINT: case STREAM_CHECKPOINT: {
pInfo->stateStore.streamStateCommit(pInfo->pState);
(*ppRes) = pBlock;
goto _end;
} break;
case STREAM_CREATE_CHILD_TABLE: { case STREAM_CREATE_CHILD_TABLE: {
(*ppRes) = pBlock; (*ppRes) = pBlock;
goto _end; goto _end;

View File

@ -360,6 +360,13 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
return code; return code;
} }
if (pInfo->recvCkBlock) {
pInfo->recvCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL; (*ppRes) = NULL;
@ -391,8 +398,6 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
case STREAM_CHECKPOINT: { case STREAM_CHECKPOINT: {
pInfo->recvCkBlock = true; pInfo->recvCkBlock = true;
pAggSup->stateStore.streamStateCommit(pAggSup->pState); pAggSup->stateStore.streamStateCommit(pAggSup->pState);
// doStreamIntervalSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
code = copyDataBlock(pInfo->pCheckpointRes, pBlock); code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
continue; continue;
@ -450,6 +455,12 @@ static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock**
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) { if ((*ppRes) == NULL) {
if (pInfo->recvCkBlock) {
pInfo->recvCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pCheckpointRes;
return code;
}
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState); pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
setStreamOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
} }

View File

@ -10931,10 +10931,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
if (pSelect->hasInterpFunc) { if (pSelect->hasInterpFunc) {
// Temporary code // Temporary code
if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { // if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, // return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp function only support force window close"); // "Stream interp function only support force window close");
} // }
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) { if (pStmt->pOptions->fillHistory) {

View File

@ -4130,7 +4130,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) { SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX}; SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX};
return streamStateFillSeekKeyNext_rocksdb(pState, &key); return streamStateFillSeekKeyPrev_rocksdb(pState, &key);
} }
#ifdef BUILD_NO_CALL #ifdef BUILD_NO_CALL

View File

@ -23,6 +23,33 @@
#define NUM_OF_CACHE_WIN 64 #define NUM_OF_CACHE_WIN 64
#define MAX_NUM_OF_CACHE_WIN 128 #define MAX_NUM_OF_CACHE_WIN 128
int32_t recoverSearchBuff(SStreamFileState* pFileState, SArray* pWinStates, uint64_t groupId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey start = {.groupId = groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = groupId};
int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
void* tmp = taosArrayPush(pWinStates, &tmpKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) { int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -38,22 +65,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
// recover // recover
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
TSKEY ts = getFlushMark(pFileState); recoverSearchBuff(pFileState, pWinStates, pKey->groupId);
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = pKey->groupId};
int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
void* tmp = taosArrayPush(pWinStates, &tmpKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
} }
code = addSearchItem(pFileState, pWinStates, pKey); code = addSearchItem(pFileState, pWinStates, pKey);
@ -203,29 +215,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SArray* pWinStates = NULL; SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState); SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState); void* pState = getStateFileStore(pFileState);
void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); // void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff); code = addArrayBuffIfNotExist(pSearchBuff, pKey->groupId, &pWinStates);
} else {
qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
// recover
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
recoverSearchBuff(pFileState, pWinStates, pKey->groupId);
} }
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur);
return code;
}
int32_t size = taosArrayGetSize(pWinStates); int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index >= 0) { if (index >= 0) {

View File

@ -1,67 +0,0 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a) from st partition by tbname interval(2s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a) from st interval(2s);
run tsim/stream/checkTaskStatus.sim
sleep 70000
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print select * from streamt2;
sql select * from streamt2;
print $data00 $data01 $data02
if $rows == 0 then
goto loop0
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -304,6 +304,253 @@ if $rows != 1 then
return -1 return -1
endi endi
print step3
print =============== create database
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1234567890t1 using st tags(1,1,1);
sql create table t1234567890t2 using st tags(2,2,2);
sql create stable streamt9(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stable streamt10(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stable streamt11(ts timestamp,a varchar(10),b tinyint,c tinyint) tags(ta varchar(3),cc int,tc int);
sql create stream streams9 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt9 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_1")) as select _irowts, interp(a), _isfilled as a1, interp(b) from st partition by tbname as ta, b as cc every(2s) fill(value, 100000,200000);
sql create stream streams10 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt10 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_2")) as select _wstart, twa(a), sum(b),max(c) from st partition by tbname as ta, b as cc interval(2s) fill(NULL);
sql create stream streams11 trigger FORCE_WINDOW_CLOSE IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt11 TAGS(cc,ta) SUBTABLE(concat(concat("tbn-", tbname), "_3")) as select _wstart, count(a),avg(c),min(b) from st partition by tbname as ta, b as cc interval(2s);
run tsim/stream/checkTaskStatus.sim
sql insert into t1234567890t1 values(now + 3s,100000,1,1);
$loop_count = 0
loop9:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta, * from streamt9;
sql select cc,ta, * from streamt9;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop9
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @100000@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 64 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt9";
sql select * from information_schema.ins_tables where stable_name = "streamt9";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%";
sql select * from information_schema.ins_tables where stable_name = "streamt9" and table_name like "tbn-t1234567890t1_1%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop10:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta, * from streamt10;
sql select cc,ta, * from streamt10;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 2 then
print ======rows=$rows
goto loop10
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @100000.000@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt10";
sql select * from information_schema.ins_tables where stable_name = "streamt10";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%";
sql select * from information_schema.ins_tables where stable_name = "streamt10" and table_name like "tbn-t1234567890t1_2%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
$loop_count = 0
loop11:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print 2 sql select cc,ta,* from streamt11;
sql select cc,ta,* from streamt11;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
# row 0
if $rows < 1 then
print ======rows=$rows
goto loop11
endi
if $data00 != 1 then
return -1
endi
if $data01 != @t12@ then
return -1
endi
if $data03 != @1@ then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 1 then
return -1
endi
print 3 sql select * from information_schema.ins_tables where stable_name = "streamt11";
sql select * from information_schema.ins_tables where stable_name = "streamt11";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print 4 sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%";
sql select * from information_schema.ins_tables where stable_name = "streamt11" and table_name like "tbn-t1234567890t1_3%";
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows != 1 then
return -1
endi
print end print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,180 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c checkpointInterval -v 60
system sh/cfg.sh -n dnode1 -c ratioOfVnodeStreamThreads -v 4
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step1
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int)tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt1 as select _wstart, count(a),max(b) from st partition by tbname interval(5s);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(a), max(b) from st interval(5s);
sql create stream streams3 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt3 as select _wstart, count(a), twa(b) from st partition by tbname interval(5s) fill(prev);
sql create stream streams4 trigger force_window_close IGNORE EXPIRED 1 IGNORE UPDATE 1 into streamt4 as select _irowts, interp(a), interp(b) from st partition by tbname every(5s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(now + 3000a,1,1,1);
$loop_count = 0
loop0:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt3;
sql select * from streamt3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows == 0 then
goto loop0
endi
print select * from streamt4;
sql select * from streamt4;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows == 0 then
goto loop0
endi
sleep 70000
$loop_count = 0
loop0_1:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print sql select * from information_schema.ins_stream_tasks where checkpoint_time is null;
sql select * from information_schema.ins_stream_tasks where checkpoint_time is null;
sleep 10000
if $rows > 0 then
print wait checkpoint.rows = $rows
goto loop0_1
endi
print restart taosd 01 ......
system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
print select * from streamt3;
sql select * from streamt3;
$streamt3_rows = $rows
print =====streamt3_rows=$streamt3_rows
print select * from streamt4;
sql select * from streamt4;
$streamt4_rows = $rows
print =====streamt4_rows=$streamt4_rows
$loop_count = 0
loop1:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt3;
sql select * from streamt3;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows <= $streamt3_rows then
print =====rows=$rows
print =====streamt3_rows=$streamt3_rows
goto loop1
endi
print select * from streamt4;
sql select * from streamt4;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $rows <= $streamt4_rows then
print =====rows=$rows
print =====streamt4_rows=$streamt4_rows
goto loop1
endi
sql insert into t1 values(now + 3000a,10,10,10);
$loop_count = 0
loop2:
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
print select * from streamt1;
sql select * from streamt1;
print $data00 $data01 $data02 $data03 $data04
print $data10 $data11 $data12 $data13 $data14
print $data20 $data21 $data22 $data23 $data24
print $data30 $data31 $data32 $data33 $data34
print $data40 $data41 $data42 $data43 $data44
print $data50 $data51 $data52 $data53 $data54
if $data02 != 10 then
goto loop2
endi
print end
system sh/exec.sh -n dnode1 -s stop -x SIGINT