fix:support scalar function with fill
This commit is contained in:
parent
6a1ec6946c
commit
4108d8f898
|
@ -762,12 +762,10 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI
|
||||||
resetPrevAndNextWindow(pFillSup, pState);
|
resetPrevAndNextWindow(pFillSup, pState);
|
||||||
|
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
// void* curVal = NULL;
|
|
||||||
int32_t curVLen = 0;
|
int32_t curVLen = 0;
|
||||||
int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen);
|
int32_t code = streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen);
|
||||||
ASSERT(code == TSDB_CODE_SUCCESS);
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
pFillSup->cur.key = key.ts;
|
pFillSup->cur.key = key.ts;
|
||||||
// pFillSup->cur.pRowVal = curVal;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
|
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
|
||||||
|
@ -952,6 +950,19 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
|
||||||
|
for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) {
|
||||||
|
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
||||||
|
int32_t slotId = GET_DEST_SLOT_ID(pFillCol);
|
||||||
|
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
|
||||||
|
SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId);
|
||||||
|
pCell->isNull = pCurCell->isNull;
|
||||||
|
if (!pCurCell->isNull) {
|
||||||
|
memcpy(pCell->pData, pCurCell->pData, pCell->bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
|
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
|
||||||
SStreamFillInfo* pFillInfo) {
|
SStreamFillInfo* pFillInfo) {
|
||||||
pFillInfo->preRowKey = pFillSup->cur.key;
|
pFillInfo->preRowKey = pFillSup->cur.key;
|
||||||
|
@ -993,6 +1004,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
|
||||||
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
|
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
|
||||||
pFillInfo->pos = FILL_POS_START;
|
pFillInfo->pos = FILL_POS_START;
|
||||||
}
|
}
|
||||||
|
copyNotFillExpData(pFillSup, pFillInfo);
|
||||||
} break;
|
} break;
|
||||||
case TSDB_FILL_PREV: {
|
case TSDB_FILL_PREV: {
|
||||||
if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
|
if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
|
||||||
|
|
|
@ -403,23 +403,46 @@ sql drop database if exists test4;
|
||||||
sql create database test4 vgroups 1;
|
sql create database test4 vgroups 1;
|
||||||
sql use test4;
|
sql use test4;
|
||||||
|
|
||||||
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));;
|
sql create stable st(ts timestamp,a int,b int,c int, d double, s varchar(20) ) tags(ta int,tb int,tc int);
|
||||||
sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1 from t1 where ts > 1648791210000 and ts < 1648791413000 interval(10s) fill(NULL);
|
sql create table t1 using st tags(1,1,1);
|
||||||
|
sql create table t2 using st tags(2,2,2);
|
||||||
|
|
||||||
|
sql create stream streams4 trigger at_once into streamt4 as select _wstart ts, count(*) c1, concat(tbname, 'aaa') as pname, timezone() from st where ts > 1648791000000 and ts < 1648793000000 partition by tbname interval(10s) fill(NULL);
|
||||||
sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa');
|
sql insert into t1 values(1648791213000,1,2,3,1.0,'aaa');
|
||||||
sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa');
|
sql insert into t1 values(1648791233000,1,2,3,1.0,'aaa');
|
||||||
|
sql insert into t1 values(1648791273000,1,2,3,1.0,'aaa');
|
||||||
|
|
||||||
|
sql insert into t2 values(1648791213000,1,2,3,1.0,'bbb');
|
||||||
|
sql insert into t2 values(1648791233000,1,2,3,1.0,'bbb');
|
||||||
|
sql insert into t2 values(1648791273000,1,2,3,1.0,'bbb');
|
||||||
|
|
||||||
$loop_count = 0
|
$loop_count = 0
|
||||||
|
|
||||||
loop4:
|
loop4:
|
||||||
sleep 200
|
sleep 200
|
||||||
sql select * from streamt4 order by ts;
|
sql select * from streamt4 order by pname, ts;
|
||||||
|
|
||||||
|
print ===> $data[0][0] , $data[0][1] , $data[0][2] , $data[0][3]
|
||||||
|
print ===> $data[1][0] , $data[1][1] , $data[1][2] , $data[1][3]
|
||||||
|
print ===> $data[2][0] , $data[2][1] , $data[2][2] , $data[2][3]
|
||||||
|
print ===> $data[3][0] , $data[3][1] , $data[3][2] , $data[3][3]
|
||||||
|
print ===> $data[4][0] , $data[4][1] , $data[4][2] , $data[4][3]
|
||||||
|
print ===> $data[5][0] , $data[5][1] , $data[5][2] , $data[5][3]
|
||||||
|
print ===> $data[6][0] , $data[6][1] , $data[6][2] , $data[6][3]
|
||||||
|
print ===> $data[7][0] , $data[7][1] , $data[7][2] , $data[7][3]
|
||||||
|
print ===> $data[8][0] , $data[8][1] , $data[8][2] , $data[8][3]
|
||||||
|
print ===> $data[9][0] , $data[9][1] , $data[9][2] , $data[9][3]
|
||||||
|
print ===> $data[10][0] , $data[10][1] , $data[10][2] , $data[10][3]
|
||||||
|
print ===> $data[11][0] , $data[11][1] , $data[11][2] , $data[11][3]
|
||||||
|
print ===> $data[12][0] , $data[12][1] , $data[12][2] , $data[12][3]
|
||||||
|
print ===> $data[13][0] , $data[13][1] , $data[13][2] , $data[13][3]
|
||||||
|
|
||||||
$loop_count = $loop_count + 1
|
$loop_count = $loop_count + 1
|
||||||
if $loop_count == 10 then
|
if $loop_count == 10 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $rows != 3 then
|
if $rows != 14 then
|
||||||
print =====rows=$rows
|
print =====rows=$rows
|
||||||
goto loop4
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
@ -429,6 +452,67 @@ if $data11 != NULL then
|
||||||
goto loop4
|
goto loop4
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
if $data12 != t1aaa then
|
||||||
|
print =====data12=$data12
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data13 == NULL then
|
||||||
|
print =====data13=$data13
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data32 != t1aaa then
|
||||||
|
print =====data32=$data32
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data42 != t1aaa then
|
||||||
|
print =====data42=$data42
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data52 != t1aaa then
|
||||||
|
print =====data52=$data52
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data81 != NULL then
|
||||||
|
print =====data81=$data81
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data82 != t2aaa then
|
||||||
|
print =====data82=$data82
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data83 == NULL then
|
||||||
|
print =====data83=$data83
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[10][2] != t2aaa then
|
||||||
|
print =====data[10][2]=$data[10][2]
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[11][2] != t2aaa then
|
||||||
|
print =====data[11][2]=$data[11][2]
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[12][2] != t2aaa then
|
||||||
|
print =====data[12][2]=$data[12][2]
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data[12][3] == NULL then
|
||||||
|
print =====data[12][3]=$data[12][3]
|
||||||
|
goto loop4
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue