fix issue

This commit is contained in:
54liuyao 2024-08-13 14:05:16 +08:00
parent 4fa67bd6cc
commit 5a2b89c357
4 changed files with 546 additions and 7 deletions

View File

@ -41,7 +41,6 @@ int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap);
void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index,
SSDataBlock* pBlock);
int32_t initResultBuf(SStreamFillSupporter* pFillSup);
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes);
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index);

View File

@ -1166,7 +1166,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
return pRes;
}
int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols;
for (int i = 0; i < pFillSup->numOfAllCols; i++) {
SFillColInfo* pCol = &pFillSup->pAllColInfo[i];

View File

@ -256,7 +256,21 @@ _end:
return code;
}
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, int32_t numOfExprs,
static int32_t initTimeSliceResultBuf(SStreamFillSupporter* pFillSup, SExprSupp* pExpSup) {
pFillSup->rowSize = sizeof(TSKEY) + getResultRowSize(pExpSup->pCtx, pFillSup->numOfAllCols);
pFillSup->next.key = INT64_MIN;
pFillSup->nextNext.key = INT64_MIN;
pFillSup->prev.key = INT64_MIN;
pFillSup->cur.key = INT64_MIN;
pFillSup->next.pRowVal = NULL;
pFillSup->nextNext.pRowVal = NULL;
pFillSup->prev.pRowVal = NULL;
pFillSup->cur.pRowVal = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprSupp* pExprSup, int32_t numOfExprs,
SStreamFillSupporter** ppResFillSup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
@ -265,7 +279,7 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE
pFillSup->numOfFillCols = numOfExprs;
int32_t numOfNotFillCols = 0;
pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols,
pFillSup->pAllColInfo = createFillColInfo(pExprSup->pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols,
(const SNodeListNode*)(pPhyFillNode->pFillValues));
QUERY_CHECK_NULL(pFillSup->pAllColInfo, code, lino, _end, terrno);
@ -283,7 +297,7 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE
pFillSup->pResMap = tSimpleHashInit(16, hashFn);
QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
code = initResultBuf(pFillSup);
code = initTimeSliceResultBuf(pFillSup, pExprSup);
QUERY_CHECK_CODE(code, lino, _end);
pFillSup->hasDelete = false;
@ -572,7 +586,7 @@ static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) {
pPoint->pLeftRow = pPoint->pRightRow;
} else {
setResultRowData(&pPoint->pLeftRow, pPoint->pResPos->pRowBuff);
void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize + sizeof(TSKEY));
void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize);
setResultRowData(&pPoint->pRightRow, pBuff);
}
}
@ -1609,7 +1623,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId;
pInfo->pFillSup = NULL;
code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup);
code = initTimeSliceFillSup(pInterpPhyNode, pExpSup, numOfExprs, &pInfo->pFillSup);
QUERY_CHECK_CODE(code, lino, _error);
int32_t ratio = 1;
@ -1659,6 +1673,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes);
copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo);
pInfo->ignoreNull = getIgoreNullRes(pExpSup);
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;

View File

@ -0,0 +1,525 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step prev
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(prev);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,0,0,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3);
sleep 500
sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3);
print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev);
sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(prev);
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt1;
sql select * from streamt1;
if $rows != 6 then
print ======rows=$rows
goto loop0
endi
if $data01 != 0 then
print ======data01=$data01
goto loop0
endi
if $data02 != 10 then
print ======data02=$data02
goto loop0
endi
if $data11 != 1 then
print ======data11=$data11
goto loop0
endi
if $data12 != 10 then
print ======data12=$data12
goto loop0
endi
if $data21 != 1 then
print ======data21=$data21
goto loop0
endi
if $data22 != 10 then
print ======data22=$data22
goto loop0
endi
if $data31 != 1 then
print ======data31=$data31
goto loop0
endi
if $data32 != 10 then
print ======data32=$data32
goto loop0
endi
if $data41 != 1 then
print ======data41=$data41
goto loop0
endi
if $data42 != 10 then
print ======data42=$data42
goto loop0
endi
if $data51 != 0 then
print ======data51=$data51
goto loop0
endi
if $data52 != 0 then
print ======data52=$data52
goto loop0
endi
print step next
sql create database test2 vgroups 4;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(next);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,11,11,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3);
sleep 500
sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3);
print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(next);
sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(next);
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop1:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt2;
sql select * from streamt2;
if $rows != 6 then
print ======rows=$rows
goto loop1
endi
if $data01 != 0 then
print ======data01=$data01
goto loop1
endi
if $data02 != 10 then
print ======data02=$data02
goto loop1
endi
if $data11 != 1 then
print ======data11=$data11
goto loop1
endi
if $data12 != 11 then
print ======data12=$data12
goto loop1
endi
if $data21 != 1 then
print ======data21=$data21
goto loop1
endi
if $data22 != 11 then
print ======data22=$data22
goto loop1
endi
if $data31 != 1 then
print ======data31=$data31
goto loop1
endi
if $data32 != 11 then
print ======data32=$data32
goto loop1
endi
if $data41 != 1 then
print ======data41=$data41
goto loop1
endi
if $data42 != 11 then
print ======data42=$data42
goto loop1
endi
if $data51 != 0 then
print ======data51=$data51
goto loop1
endi
if $data52 != 11 then
print ======data52=$data52
goto loop1
endi
print step value
sql create database test3 vgroups 4;
sql use test3;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(NULL);
sql create stream streams3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(value, 110);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,11,11,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3);
sleep 500
sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3);
print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(NULL);
sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(NULL);
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt3_1;
sql select * from streamt3_1;
if $rows != 6 then
print ======rows=$rows
goto loop3
endi
if $data01 != 0 then
print ======data01=$data01
goto loop3
endi
if $data02 != 10 then
print ======data02=$data02
goto loop3
endi
if $data11 != 1 then
print ======data11=$data11
goto loop3
endi
if $data12 != NULL then
print ======data12=$data12
goto loop3
endi
if $data21 != 1 then
print ======data21=$data21
goto loop3
endi
if $data22 != NULL then
print ======data22=$data22
goto loop3
endi
if $data31 != 1 then
print ======data31=$data31
goto loop3
endi
if $data32 != NULL then
print ======data32=$data32
goto loop3
endi
if $data41 != 1 then
print ======data41=$data41
goto loop3
endi
if $data42 != NULL then
print ======data42=$data42
goto loop3
endi
if $data51 != 0 then
print ======data51=$data51
goto loop3
endi
if $data52 != 11 then
print ======data52=$data52
goto loop3
endi
print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(value, 110);
sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(value, 110);
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop3_2:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt3_2;
sql select * from streamt3_2;
if $rows != 6 then
print ======rows=$rows
goto loop3_2
endi
if $data01 != 0 then
print ======data01=$data01
goto loop3_2
endi
if $data02 != 10 then
print ======data02=$data02
goto loop3_2
endi
if $data11 != 1 then
print ======data11=$data11
goto loop3_2
endi
if $data12 != 110 then
print ======data12=$data12
goto loop3_2
endi
if $data21 != 1 then
print ======data21=$data21
goto loop3_2
endi
if $data22 != 110 then
print ======data22=$data22
goto loop3_2
endi
if $data31 != 1 then
print ======data31=$data31
goto loop3_2
endi
if $data32 != 110 then
print ======data32=$data32
goto loop3_2
endi
if $data41 != 1 then
print ======data41=$data41
goto loop3_2
endi
if $data42 != 110 then
print ======data42=$data42
goto loop3_2
endi
if $data51 != 0 then
print ======data51=$data51
goto loop3_2
endi
if $data52 != 11 then
print ======data52=$data52
goto loop3_2
endi
print step linear
sql create database test4 vgroups 4;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname every(1s) fill(linear);
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791217000,11,55,3) (1648791212000,10,10,3) (1648791212001,11,NULL,3);
sleep 500
sql insert into t1 values(1648791214001,20,NULL,3) (1648791213000,22,NULL,3);
print sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(linear);
sql select _irowts, _isfilled as a1, interp(b, 1) from st partition by tbname range(1648791212000, 1648791217000) every(1s) fill(linear);
print $data00 $data01 $data02 $data03 $data04 $data05
print $data10 $data11 $data12 $data13 $data14 $data15
print $data20 $data21 $data22 $data23 $data24 $data25
print $data30 $data31 $data32 $data33 $data34 $data35
print $data40 $data41 $data42 $data43 $data44 $data45
print $data50 $data51 $data52 $data53 $data54 $data55
$loop_count = 0
loop4:
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sleep 300
print sql select * from streamt4;
sql select * from streamt4;
if $rows != 6 then
print ======rows=$rows
goto loop4
endi
if $data01 != 0 then
print ======data01=$data01
goto loop4
endi
if $data02 != 10 then
print ======data02=$data02
goto loop4
endi
if $data11 != 1 then
print ======data11=$data11
goto loop4
endi
if $data12 != 19 then
print ======data12=$data12
goto loop4
endi
if $data21 != 1 then
print ======data21=$data21
goto loop4
endi
if $data22 != 28 then
print ======data22=$data22
goto loop4
endi
if $data31 != 1 then
print ======data31=$data31
goto loop4
endi
if $data32 != 37 then
print ======data32=$data32
goto loop4
endi
if $data41 != 1 then
print ======data41=$data41
goto loop4
endi
if $data42 != 46 then
print ======data42=$data42
goto loop4
endi
if $data51 != 0 then
print ======data51=$data51
goto loop4
endi
if $data52 != 55 then
print ======data52=$data52
goto loop4
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT