count window fill history

This commit is contained in:
54liuyao 2024-02-05 18:37:24 +08:00
parent 2bba011e4e
commit eaf10fcb43
3 changed files with 40 additions and 6 deletions

View File

@ -64,12 +64,26 @@ int32_t doCountWindowAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SWindowRowsSup* pRowSup = &pInfo->winSup;
int32_t rowIndex = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i++) {
for (int32_t i = 0; i < pBlock->info.rows;) {
// todo(liuyao) 1.如果group id发生变化获取新group id上一次的window的缓存并把旧group id的信息存入缓存。
// 没有sliding
// 只需要一个缓存即可
// 1.如果group id发生变化说明本group窗口全部结束输出上次的缓存这里需要判断缓存中是否有数据)
// 设置缓存
// 2.计算 当前需要合并的行数
// 3.做聚集计算。
// 4.达到行数将结果存入pInfo->res中。
// 有sliding
// 缓存是一个队列
// 1.如果group id发生变化说明本group窗口全部结束输出上次的缓存这里需要判断缓存中是否有数据可能输出多行)
// pInfo记录队列的起始位置
// 2.计算 当前需要合并的行数
// 3.做聚集计算。
// 4.达到行数pInfo记录队列的起始位置后移将结果存入pInfo->res中。
i += step;
}
return code;

View File

@ -117,6 +117,7 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
SSHashObj* pStDeleted, bool* pRebuild) {
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
int32_t num = 0;
for (int32_t i = start; i < rows; i++) {
if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) {
@ -148,7 +149,7 @@ int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pW
if (needDelState) {
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
if (pWinInfo->winInfo.pStatePos->needFree) {
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pWinInfo->winInfo.sessionWin);
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
}
}
@ -576,7 +577,15 @@ static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) {
}
void streamCountReleaseState(SOperatorInfo* pOperator) {
//nothing
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize);
memcpy(pBuff, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== count window operator relase state. ");
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_COUNT_OP_STATE_NAME,
strlen(STREAM_COUNT_OP_STATE_NAME), pBuff, resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
taosMemoryFreeClear(pBuff);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
@ -584,11 +593,22 @@ void streamCountReleaseState(SOperatorInfo* pOperator) {
}
void streamCountReloadState(SOperatorInfo* pOperator) {
// nothing
SStreamCountAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME,
strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size);
TSKEY ts = *(TSKEY*)pBuf;
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
taosMemoryFree(pBuf);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
}
SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,

View File

@ -786,10 +786,10 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
streamStateFreeCur(pCur);
goto _end;
} else {
taosMemoryFree(p);
}
taosMemoryFree(p);
streamStateFreeCur(pCur);
}
}