refact stream interval

This commit is contained in:
liuyao 2023-08-14 14:24:21 +08:00
parent 9d1e5a512d
commit 57a020edfc
7 changed files with 3267 additions and 3282 deletions

View File

@ -190,4 +190,7 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
#endif // TDENGINE_EXECUTIL_H

View File

@ -695,6 +695,13 @@ uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
bool compareVal(const char* v, const SStateKeys* pKey);
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
#ifdef __cplusplus
}
#endif

View File

@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId;
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
}

View File

@ -2187,3 +2187,14 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}

View File

@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
bool compareVal(const char* v, const SStateKeys* pKey) {
if (IS_VAR_DATA_TYPE(pKey->type)) {
if (varDataLen(v) != varDataLen(pKey->pData)) {
return false;
} else {
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}
} else {
return memcmp(pKey->pData, v, pKey->bytes) == 0;
}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff