fix: add multi-timeline support for session and state
This commit is contained in:
parent
9891f78961
commit
009777a727
|
@ -682,6 +682,7 @@ typedef struct SWindowRowsSup {
|
|||
TSKEY prevTs;
|
||||
int32_t startRowIndex;
|
||||
int32_t numOfRows;
|
||||
uint64_t groupId;
|
||||
} SWindowRowsSup;
|
||||
|
||||
typedef struct SSessionAggOperatorInfo {
|
||||
|
|
|
@ -90,16 +90,18 @@ static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, b
|
|||
ts[4] = pWin->ekey + delta; // window end key
|
||||
}
|
||||
|
||||
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts) {
|
||||
static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
|
||||
pRowSup->win.ekey = ts;
|
||||
pRowSup->prevTs = ts;
|
||||
pRowSup->numOfRows += 1;
|
||||
pRowSup->groupId = groupId;
|
||||
}
|
||||
|
||||
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex) {
|
||||
static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsList, int32_t rowIndex, uint64_t groupId) {
|
||||
pRowSup->startRowIndex = rowIndex;
|
||||
pRowSup->numOfRows = 0;
|
||||
pRowSup->win.skey = tsList[rowIndex];
|
||||
pRowSup->groupId = groupId;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
|
||||
|
@ -1142,7 +1144,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
|
||||
char* val = colDataGetData(pStateColInfoData, j);
|
||||
|
||||
if (!pInfo->hasKey) {
|
||||
if (gid != pRowSup->groupId || !pInfo->hasKey) {
|
||||
// todo extract method
|
||||
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
|
||||
varDataCopy(pInfo->stateKey.pData, val);
|
||||
|
@ -1152,10 +1154,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
|
||||
pInfo->hasKey = true;
|
||||
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j);
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
} else if (compareVal(val, &pInfo->stateKey)) {
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
if (j == 0 && pRowSup->startRowIndex != 0) {
|
||||
pRowSup->startRowIndex = 0;
|
||||
}
|
||||
|
@ -1177,8 +1179,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
// here we start a new session window
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j);
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
|
||||
// todo extract method
|
||||
if (IS_VAR_DATA_TYPE(pInfo->stateKey.type)) {
|
||||
|
@ -1911,7 +1913,7 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// todo handle multiple tables cases.
|
||||
// todo handle multiple timeline cases. assume no timeline interweaving
|
||||
static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
|
@ -1935,12 +1937,13 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
// In case of ascending or descending order scan data, only one time window needs to be kepted for each table.
|
||||
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
|
||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||
if (pInfo->winSup.prevTs == INT64_MIN) {
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j);
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
} else if (tsList[j] - pRowSup->prevTs <= gap && (tsList[j] - pRowSup->prevTs) >= 0) {
|
||||
if (gid != pRowSup->groupId || pInfo->winSup.prevTs == INT64_MIN) {
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
} else if ((tsList[j] - pRowSup->prevTs >= 0) && tsList[j] - pRowSup->prevTs <= gap ||
|
||||
(pRowSup->prevTs - tsList[j] >= 0 ) && (pRowSup->prevTs - tsList[j] <= gap)) {
|
||||
// The gap is less than the threshold, so it belongs to current session window that has been opened already.
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
if (j == 0 && pRowSup->startRowIndex != 0) {
|
||||
pRowSup->startRowIndex = 0;
|
||||
}
|
||||
|
@ -1963,8 +1966,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
|
||||
// here we start a new session window
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j);
|
||||
doKeepTuple(pRowSup, tsList[j]);
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
doKeepTuple(pRowSup, tsList[j], gid);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue