feat(stream): stream state support delete
This commit is contained in:
parent
f9c89fdd7f
commit
ce6e505084
|
@ -3189,7 +3189,7 @@ SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY start
|
||||||
*pIndex = index + 1;
|
*pIndex = index + 1;
|
||||||
return pWin;
|
return pWin;
|
||||||
} else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) {
|
} else if (endTs != INT64_MIN && isInWindow(pWin, endTs, gap)) {
|
||||||
*pIndex = index;
|
*pIndex = index + 1;
|
||||||
return pWin;
|
return pWin;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3442,7 +3442,7 @@ void deleteWindow(SArray* pWinInfos, int32_t index) {
|
||||||
taosArrayRemove(pWinInfos, index);
|
taosArrayRemove(pWinInfos, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doDeleteSessionWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) {
|
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result) {
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
@ -3700,13 +3700,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
|
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
|
||||||
if (IS_FINAL_OP(pInfo)) {
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
int32_t childIndex = getChildIndex(pBlock);
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||||
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
doDeleteSessionWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL);
|
doDeleteTimeWindows(&pChildInfo->streamAggSup, pBlock, 0, NULL);
|
||||||
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
|
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
|
||||||
}
|
}
|
||||||
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
copyDeleteWindowInfo(pWins, pInfo->pStDeleted);
|
||||||
|
@ -3840,7 +3840,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
doDeleteSessionWindows(&pInfo->streamAggSup, pBlock, 0, NULL);
|
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, NULL);
|
||||||
copyDataBlock(pInfo->pDelRes, pBlock);
|
copyDataBlock(pInfo->pDelRes, pBlock);
|
||||||
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||||
break;
|
break;
|
||||||
|
@ -4232,6 +4232,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
|
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
|
||||||
pSeUpdated, pInfo->pSeDeleted);
|
pSeUpdated, pInfo->pSeDeleted);
|
||||||
continue;
|
continue;
|
||||||
|
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
|
||||||
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
|
doDeleteTimeWindows(&pInfo->streamAggSup, pBlock, 0, pWins);
|
||||||
|
copyDeleteWindowInfo(pWins, pInfo->pSeDeleted);
|
||||||
|
taosArrayDestroy(pWins);
|
||||||
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
} else if (pBlock->info.type == STREAM_GET_ALL) {
|
||||||
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForState);
|
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pUpdated, getResWinForState);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -518,8 +518,8 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
|
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%"PRId64, pi->pageId, pi->used, pi->offset);
|
||||||
#endif
|
#endif
|
||||||
assert(pi->pData != NULL && pi->used == true);
|
// assert(pi->pData != NULL && pi->used == true);
|
||||||
// assert(pi->pData != NULL);
|
assert(pi->pData != NULL);
|
||||||
pi->used = false;
|
pi->used = false;
|
||||||
pBuf->statis.releasePages += 1;
|
pBuf->statis.releasePages += 1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue