This commit is contained in:
54liuyao 2024-08-08 19:30:18 +08:00
parent 0a7f3920bb
commit 3a14454fb5
5 changed files with 702 additions and 41 deletions

View File

@ -46,7 +46,6 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index);
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup);
void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol);
bool hasCurWindow(SStreamFillSupporter* pFillSup);
bool hasPrevWindow(SStreamFillSupporter* pFillSup);
bool hasNextWindow(SStreamFillSupporter* pFillSup);

View File

@ -259,7 +259,7 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE
pRowVal->key = ts;
}
void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
@ -1271,30 +1271,26 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
SPoint value = {0};
value.val = taosMemoryCalloc(1, pColData->info.bytes);
if (!value.val) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
if (pColData == NULL) {
SPoint dummy = {0};
dummy.val = taosMemoryCalloc(1, 1);
taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy);
dummy.val = taosMemoryCalloc(1, 1);
taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy);
continue;
}
SPoint value = {0};
value.val = taosMemoryCalloc(1, pColData->info.bytes);
QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
if (!tmpRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
value.val = taosMemoryCalloc(1, pColData->info.bytes);
if (!value.val) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
if (!tmpRes) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
}
}
pFillInfo->pLinearInfo->winIndex = 0;
@ -1318,6 +1314,11 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
if (pColData == NULL) {
pCell->bytes = 1;
pCell->type = 4;
continue;
}
pCell->bytes = pColData->info.bytes;
pCell->type = pColData->info.type;
}

View File

@ -347,10 +347,7 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p
(*pRes) = false;
goto _end;
}
if (IS_INVALID_WIN_KEY(ts)) {
(*pRes) = true;
goto _end;
}
bool ckRes = true;
code = checkResult(pFillSup, ts, pBlock->info.id.groupId, &ckRes);
QUERY_CHECK_CODE(code, lino, _end);
@ -428,7 +425,7 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
bool isFilled = true;
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
QUERY_CHECK_CODE(code, lino, _end);
} else {
} else if (isInterpFunc(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot);
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
@ -451,6 +448,11 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
QUERY_CHECK_CODE(code, lino, _end);
destroySPoint(&cur);
} else {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot);
code = setRowCell(pDstCol, pBlock->info.rows, pCell);
QUERY_CHECK_CODE(code, lino, _end);
}
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@ -476,7 +478,7 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool res = true;
if (pFillInfo->needFill == false) {
if (pFillInfo->needFill == false && pFillInfo->pos != FILL_POS_INVALID) {
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res, false);
QUERY_CHECK_CODE(code, lino, _end);
return;
@ -847,6 +849,34 @@ _end:
return code;
}
static void copyNonFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
if (!isInterpFunc(pFillCol->pExpr) && !isIrowtsPseudoColumn(pFillCol->pExpr) &&
!isIsfilledPseudoColumn(pFillCol->pExpr)) {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, srcSlot);
SResultCellData* pDestCell = getResultCell(pFillInfo->pResRow, srcSlot);
pDestCell->isNull = pSrcCell->isNull;
if (!pDestCell->isNull) {
memcpy(pDestCell->pData, pSrcCell->pData, pSrcCell->bytes);
}
}
}
}
static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) {
if (isInterpFunc(pFillCol[i].pExpr)) {
int32_t slotId = pFillCol[i].pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pECell = getResultCell(pEndRow, slotId);
SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
pPoint->key = pEndRow->key;
memcpy(pPoint->val, pECell->pData, pECell->bytes);
}
}
}
static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
pFillInfo->needFill = false;
@ -878,7 +908,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
}
copyNotFillExpData(pFillSup, pFillInfo);
copyNonFillValueInfo(pFillSup, pFillInfo);
pFillInfo->pResRow->key = ts;
} break;
case TSDB_FILL_PREV: {
@ -920,7 +950,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pos = FILL_POS_INVALID;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey;
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev;
@ -930,7 +960,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
copyCalcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillSup->prev.key = pFillSup->prevOriginKey;
pFillInfo->pResRow = &pFillSup->prev;
@ -942,7 +972,7 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pos = FILL_POS_START;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
pFillSup->next.key = pFillSup->nextOriginKey;
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
copyCalcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false;
@ -955,7 +985,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
_end:
if (ts != pFillSup->cur.key) {
SET_WIN_KEY_INVALID(pFillSup->cur.key);
pFillInfo->pos = FILL_POS_INVALID;
}
}
@ -1481,7 +1510,7 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
return pRes;
}
static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
static void copyFillValueInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
int32_t valueIndex = 0;
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
@ -1525,8 +1554,10 @@ int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) {
SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info;
*ppRes = pInfo->pRes;
return TSDB_CODE_SUCCESS;
} else if (downstream->pDownstream[0] != NULL) {
return getDownstreamRes(downstream->pDownstream[0], ppRes);
} else if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pInfo = (SStreamPartitionOperatorInfo*)downstream->info;
*ppRes = pInfo->binfo.pRes;
return TSDB_CODE_SUCCESS;
}
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED));
return TSDB_CODE_FAILED;
@ -1627,7 +1658,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes);
setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo);
copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo);
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;

View File

@ -22,8 +22,28 @@ sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (16
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
print sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c =1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c = 1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(prev) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
@ -290,14 +310,32 @@ sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1);
sql insert into t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1);
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1);
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
#sql select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
print sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 0 and c = 0 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c =1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 1 and c = 1 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 2 and c = 2 partition by tbname, b,c range(1648791212000, 1648791217001) every(1s) fill(next) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15

View File

@ -0,0 +1,592 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step NULL
print =============== create database
sql create database test vgroups 1;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(NULL);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,1,0,0,1.0) (1648791217001,2,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,1,1,1,1.0) (1648791217001,2,1,1,2.1);
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,1,2,2,1.0) (1648791217001,2,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(NULL) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop0:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt where b = 0 and c = 0 order by 1;
sql select * from streamt where b = 0 and c = 0 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != NULL then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != NULL then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != NULL then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != NULL then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != NULL then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print 1 sql select * from streamt where b = 1 and c = 1 order by 1;
sql select * from streamt where b = 1 and c = 1 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != NULL then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != NULL then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != NULL then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != NULL then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != NULL then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print 2 sql select * from streamt where b = 2 and c = 2 order by 1;
sql select * from streamt where b = 2 and c = 2 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 0 then
print ======data02=$data02
goto loop0
endi
if $data11 != NULL then
print ======data11=$data11
goto loop0
endi
if $data12 != 1 then
print ======data12=$data12
goto loop0
endi
if $data21 != NULL then
print ======data21=$data21
goto loop0
endi
if $data22 != 1 then
print ======data22=$data22
goto loop0
endi
if $data31 != NULL then
print ======data31=$data31
goto loop0
endi
if $data32 != 1 then
print ======data32=$data32
goto loop0
endi
if $data41 != NULL then
print ======data41=$data41
goto loop0
endi
if $data42 != 1 then
print ======data41=$data41
goto loop0
endi
if $data51 != NULL then
print ======data51=$data51
goto loop0
endi
if $data52 != 1 then
print ======data51=$data51
goto loop0
endi
print step linear
print =============== create database
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create table t3 using st tags(2,2,2);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, interp(a), _isfilled, tbname, b, c from st partition by tbname, b,c every(1s) fill(linear);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791212000,0,0,0,0.0) (1648791212001,10,0,0,1.0) (1648791217001,20,0,0,2.1) t2 values(1648791212000,0,1,1,0.0) (1648791212001,10,1,1,1.0) (1648791217001,20,1,1,2.1);
sql insert into t3 values(1648791212000,0,2,2,0.0) (1648791212001,10,2,2,1.0) (1648791217001,20,2,2,2.1);
print sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 0 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop1:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print 0 sql select * from streamt2 where b = 0 and c = 0 order by 1;
sql select * from streamt2 where b = 0 and c = 0 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 11 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 13 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 15 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 17 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 19 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
print sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 1 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print 1 sql select * from streamt2 where b = 1 and c = 1 order by 1;
sql select * from streamt2 where b = 1 and c = 1 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 11 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 13 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 15 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 17 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 19 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
print sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
sql select _irowts, interp(a), _isfilled, b from st where b = 2 partition by tbname, b, c range(1648791212000, 1648791217001) every(1s) fill(linear) order by b, 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
print 2 sql select * from streamt2 where b = 2 and c = 2 order by 1;
sql select * from streamt2 where b = 2 and c = 2 order by 1;
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
# row 0
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
# row 0
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 0 then
print ======data02=$data02
goto loop1
endi
if $data11 != 11 then
print ======data11=$data11
goto loop1
endi
if $data12 != 1 then
print ======data12=$data12
goto loop1
endi
if $data21 != 13 then
print ======data21=$data21
goto loop1
endi
if $data22 != 1 then
print ======data22=$data22
goto loop1
endi
if $data31 != 15 then
print ======data31=$data31
goto loop1
endi
if $data32 != 1 then
print ======data32=$data32
goto loop1
endi
if $data41 != 17 then
print ======data41=$data41
goto loop1
endi
if $data42 != 1 then
print ======data41=$data41
goto loop1
endi
if $data51 != 19 then
print ======data51=$data51
goto loop1
endi
if $data52 != 1 then
print ======data51=$data51
goto loop1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT