Merge pull request #23277 from taosdata/fix/TD-26722

set fill history range
This commit is contained in:
Haojun Liao 2023-10-16 17:38:49 +08:00 committed by GitHub
commit 582bedd4bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 15 additions and 2 deletions

View File

@ -636,6 +636,7 @@ typedef struct SStreamFillSupporter {
SSHashObj* pResMap;
bool hasDelete;
SStorageAPI* pAPI;
STimeWindow winRange;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo {
@ -770,6 +771,7 @@ SSDataBlock* getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
bool compareVal(const char* v, const SStateKeys* pKey);
bool inWinRange(STimeWindow* range, STimeWindow* cur);
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);

View File

@ -950,7 +950,10 @@ static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if (inWinRange(&pFillSup->winRange, &st)) {
buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock);
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
}
@ -960,7 +963,8 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
uint64_t groupId = pBlock->info.id.groupId;
SWinKey key = {.groupId = groupId, .ts = pFillInfo->current};
if (pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId)) {
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if ( ( pFillSup->hasDelete && !checkResult(pFillSup, pFillInfo->current, groupId) ) || !inWinRange(&pFillSup->winRange, &st) ) {
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
pFillInfo->pLinearInfo->winIndex++;
@ -1345,6 +1349,11 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillInfo->preRowKey = INT64_MIN;
}
pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pInfo->pFillSup->winRange.ekey <= 0) {
pInfo->pFillSup->winRange.ekey = INT64_MAX;
}
switch (pBlock->info.type) {
case STREAM_RETRIEVE:
return pBlock;

View File

@ -206,6 +206,8 @@ print create stream streams3 trigger at_once ignore update 1 into streamt3 as se
sql create stream streams3 trigger at_once ignore update 1 into streamt3 as select _wstart c1, count(*) c2, max(b) c3 from st interval(10s);
sleep 2000
sql insert into t1 values(1648791213000,1,1,1);
sql insert into t1 values(1648791213000,2,2,2);