Merge pull request #14240 from taosdata/szhou/feature/merge-interval-2
fix: overlapping intervals problem
This commit is contained in:
commit
8e2b93f0e1
|
@ -3900,18 +3900,22 @@ _error:
|
||||||
// merge interval operator
|
// merge interval operator
|
||||||
typedef struct SMergeIntervalAggOperatorInfo {
|
typedef struct SMergeIntervalAggOperatorInfo {
|
||||||
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo intervalAggOperatorInfo;
|
||||||
|
SList* groupIntervals;
|
||||||
SHashObj* groupIntervalHash;
|
SListIter groupIntervalsIter;
|
||||||
void* groupIntervalIter;
|
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SSDataBlock* prefetchedBlock;
|
SSDataBlock* prefetchedBlock;
|
||||||
bool inputBlocksFinished;
|
bool inputBlocksFinished;
|
||||||
} SMergeIntervalAggOperatorInfo;
|
} SMergeIntervalAggOperatorInfo;
|
||||||
|
|
||||||
|
typedef struct SGroupTimeWindow {
|
||||||
|
uint64_t groupId;
|
||||||
|
STimeWindow window;
|
||||||
|
} SGroupTimeWindow;
|
||||||
|
|
||||||
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param;
|
||||||
taosHashCleanup(miaInfo->groupIntervalHash);
|
tdListFree(miaInfo->groupIntervals);
|
||||||
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3940,15 +3944,22 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
|
||||||
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
bool ascScan = (iaInfo->order == TSDB_ORDER_ASC);
|
||||||
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
|
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
|
||||||
|
|
||||||
STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId));
|
SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin};
|
||||||
if (prevWin == NULL) {
|
tdListAppend(miaInfo->groupIntervals, &groupTimeWindow);
|
||||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((ascScan && newWin->skey > prevWin->skey || (!ascScan) && newWin->skey < prevWin->skey)) {
|
SListIter iter = {0};
|
||||||
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
|
tdListInitIter(miaInfo->groupIntervals, &iter, TD_LIST_FORWARD);
|
||||||
taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow));
|
SListNode* listNode = NULL;
|
||||||
|
while ((listNode = tdListNext(&iter)) != NULL) {
|
||||||
|
SGroupTimeWindow* prevGrpWin = (SGroupTimeWindow*)listNode->data;
|
||||||
|
if (prevGrpWin->groupId != tableGroupId ) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
STimeWindow* prevWin = &prevGrpWin->window;
|
||||||
|
if ((ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey)) {
|
||||||
|
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
|
||||||
|
tdListPopNode(miaInfo->groupIntervals, listNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -4075,6 +4086,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
tdListInitIter(miaInfo->groupIntervals, &miaInfo->groupIntervalsIter, TD_LIST_FORWARD);
|
||||||
miaInfo->inputBlocksFinished = true;
|
miaInfo->inputBlocksFinished = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -4100,14 +4112,12 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (miaInfo->inputBlocksFinished) {
|
if (miaInfo->inputBlocksFinished) {
|
||||||
void* win = taosHashIterate(miaInfo->groupIntervalHash, miaInfo->groupIntervalIter);
|
SListNode* listNode = tdListNext(&miaInfo->groupIntervalsIter);
|
||||||
if (win != NULL) {
|
|
||||||
miaInfo->groupIntervalIter = win;
|
|
||||||
|
|
||||||
size_t len = 0;
|
if (listNode != NULL) {
|
||||||
uint64_t* pTableGroupId = taosHashGetKey(win, &len);
|
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
|
||||||
finalizeWindowResult(pOperator, *pTableGroupId, win, pRes);
|
finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
|
||||||
pRes->info.groupId = *pTableGroupId;
|
pRes->info.groupId = grpWin->groupId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4129,8 +4139,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow));
|
||||||
miaInfo->groupIntervalIter = NULL;
|
|
||||||
|
|
||||||
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue