fix:timestampe is out of order
This commit is contained in:
parent
053f48e33a
commit
7c84ddd46d
|
@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo {
|
|||
SSDataBlock* pRes;
|
||||
SSDataBlock* pSrcBlock;
|
||||
int32_t srcRowIndex;
|
||||
SSDataBlock* pPrevSrcBlock;
|
||||
SSDataBlock* pSrcDelBlock;
|
||||
int32_t srcDelRowIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
|
|
|
@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) {
|
|||
pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock);
|
||||
pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock);
|
||||
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
|
||||
pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList);
|
||||
taosMemoryFree(pInfo);
|
||||
|
@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pInfo->srcRowIndex == 0) {
|
||||
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
|
||||
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
|
||||
if (preBlock->info.rows > 0) {
|
||||
int preRowId = preBlock->info.rows - 1;
|
||||
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
doFillResults(pOperator, pFillSup, pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId, pRes);
|
||||
}
|
||||
pInfo->srcRowIndex++;
|
||||
}
|
||||
|
||||
|
@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
|
|||
}
|
||||
pInfo->srcRowIndex++;
|
||||
}
|
||||
doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
|
||||
blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
|
||||
blockDataCleanup(pInfo->pPrevSrcBlock);
|
||||
copyDataBlock(pInfo->pPrevSrcBlock, pInfo->pSrcBlock);
|
||||
blockDataCleanup(pInfo->pSrcBlock);
|
||||
}
|
||||
|
||||
|
@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
|
||||
blockDataCleanup(pInfo->pPrevSrcBlock);
|
||||
tSimpleHashClear(pInfo->pFillSup->pResMap);
|
||||
pInfo->pFillSup->hasDelete = false;
|
||||
taosArrayClear(pInfo->pFillInfo->delRanges);
|
||||
|
@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
|
|||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
SSDataBlock* preBlock = pInfo->pPrevSrcBlock;
|
||||
if (preBlock->info.rows > 0) {
|
||||
int preRowId = preBlock->info.rows - 1;
|
||||
SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol);
|
||||
doFillResults(pOperator, pInfo->pFillSup, pInfo->pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId,
|
||||
pInfo->pRes);
|
||||
}
|
||||
pInfo->pFillInfo->preRowKey = INT64_MIN;
|
||||
if (pInfo->pRes->info.rows > 0) {
|
||||
printDataBlock(pInfo->pRes, "stream fill");
|
||||
|
@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
|
||||
blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
|
||||
if (!pInfo->pFillInfo) {
|
||||
|
|
|
@ -127,7 +127,114 @@ if $rows != 13 then
|
|||
goto loop3
|
||||
endi
|
||||
|
||||
sql insert into t2 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee') t1 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee');
|
||||
|
||||
$loop_count = 0
|
||||
|
||||
loop4:
|
||||
sleep 200
|
||||
|
||||
$loop_count = $loop_count + 1
|
||||
if $loop_count == 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from streamt1 order by group_id, ts;
|
||||
|
||||
if $rows != 20 then
|
||||
print ====streamt1=rows1=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data04 == 0 then
|
||||
print ====streamt1=data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select group_id,count(*) from streamt1 group by group_id;
|
||||
|
||||
if $rows != 2 then
|
||||
print ====streamt1=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select * from streamt2 order by group_id, ts;
|
||||
|
||||
if $rows != 20 then
|
||||
print ====streamt2=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data04 == 0 then
|
||||
print ====streamt2=data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select group_id,count(*) from streamt2 group by group_id;
|
||||
|
||||
if $rows != 2 then
|
||||
print ====streamt2=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select * from streamt3 order by group_id, ts;
|
||||
|
||||
if $rows != 20 then
|
||||
print ====streamt3=rows3=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data04 == 0 then
|
||||
print ====streamt3=data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select group_id,count(*) from streamt3 group by group_id;
|
||||
|
||||
if $rows != 2 then
|
||||
print ====streamt3=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
|
||||
sql select * from streamt4 order by group_id, ts;
|
||||
|
||||
if $rows != 20 then
|
||||
print ====streamt4=rows4=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data04 == 0 then
|
||||
print ====streamt4=data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select group_id,count(*) from streamt4 group by group_id;
|
||||
|
||||
if $rows != 2 then
|
||||
print ====streamt4=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
sql select * from streamt5 order by group_id, ts;
|
||||
|
||||
if $rows != 20 then
|
||||
print ====streamt5=rows5=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
if $data04 == 0 then
|
||||
print ====streamt5=data04=$data04
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select group_id,count(*) from streamt5 group by group_id;
|
||||
|
||||
if $rows != 2 then
|
||||
print ====streamt5=rows2=$rows
|
||||
goto loop4
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue