Merge pull request #14407 from taosdata/feature/TD-16874
feat(stream): ignore close window
This commit is contained in:
commit
2f865a43ec
|
@ -1246,6 +1246,9 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
|
||||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
|
||||||
|
setBufPageDirty(bufPage, true);
|
||||||
|
releaseBufPage(pResultBuf, bufPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
|
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
|
||||||
|
@ -3171,6 +3174,10 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
|
||||||
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
|
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
|
||||||
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
|
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
|
||||||
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
|
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
|
||||||
|
if (pCurWin->pos.pageId == -1) {
|
||||||
|
// window has been closed.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
|
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
|
||||||
if (result) {
|
if (result) {
|
||||||
taosArrayPush(result, pCurWin);
|
taosArrayPush(result, pCurWin);
|
||||||
|
@ -3246,14 +3253,14 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
|
||||||
setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
|
setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
|
||||||
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
|
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
|
||||||
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo);
|
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo);
|
||||||
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
|
SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
|
||||||
setBufPageDirty(bufPage, true);
|
releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage);
|
||||||
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
} else if (!pChWin->isClosed) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
|
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
|
||||||
ASSERT(size > 0);
|
ASSERT(size > 0);
|
||||||
setBufPageDirty(bufPage, true);
|
setBufPageDirty(bufPage, true);
|
||||||
|
@ -3265,7 +3272,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
|
||||||
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
|
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
|
||||||
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
|
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
|
||||||
|
|
||||||
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) {
|
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
|
||||||
|
SArray* pClosed, __get_win_info_ fn, bool delete) {
|
||||||
// Todo(liuyao) save window to tdb
|
// Todo(liuyao) save window to tdb
|
||||||
void** pIte = NULL;
|
void** pIte = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
|
@ -3279,10 +3287,18 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
|
||||||
if (isCloseWindow(&pSeWin->win, pTwSup)) {
|
if (isCloseWindow(&pSeWin->win, pTwSup)) {
|
||||||
if (!pSeWin->isClosed) {
|
if (!pSeWin->isClosed) {
|
||||||
pSeWin->isClosed = true;
|
pSeWin->isClosed = true;
|
||||||
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
|
||||||
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
|
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pSeWin->isOutput = true;
|
pSeWin->isOutput = true;
|
||||||
}
|
}
|
||||||
|
if (delete) {
|
||||||
|
taosArrayRemove(pWins, i);
|
||||||
|
i--;
|
||||||
|
size = taosArrayGetSize(pWins);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -3292,6 +3308,16 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) {
|
||||||
|
int32_t size = taosArrayGetSize(pChildren);
|
||||||
|
for (int32_t i = 0; i < size; i++) {
|
||||||
|
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
|
||||||
|
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
|
||||||
|
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
|
||||||
|
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) {
|
int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) {
|
||||||
void** pIte = NULL;
|
void** pIte = NULL;
|
||||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||||
|
@ -3339,6 +3365,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv");
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
|
@ -3385,7 +3412,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
// restore the value
|
// restore the value
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession);
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||||
|
getResWinForSession, pInfo->ignoreCloseWindow);
|
||||||
|
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
|
||||||
copyUpdateResult(pStUpdated, pUpdated);
|
copyUpdateResult(pStUpdated, pUpdated);
|
||||||
taosHashCleanup(pStUpdated);
|
taosHashCleanup(pStUpdated);
|
||||||
|
|
||||||
|
@ -3437,10 +3466,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
if (pInfo->pDelRes->info.rows > 0) {
|
if (pInfo->pDelRes->info.rows > 0) {
|
||||||
|
printDataBlock(pInfo->pDelRes, "Semi Session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows == 0) {
|
if (pBInfo->pRes->info.rows == 0) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
if (pInfo->pUpdateRes->info.rows == 0) {
|
if (pInfo->pUpdateRes->info.rows == 0) {
|
||||||
// semi interval operator clear disk buffer
|
// semi interval operator clear disk buffer
|
||||||
|
@ -3449,9 +3479,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
|
printDataBlock(pInfo->pUpdateRes, "Semi Session");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
return pInfo->binfo.pRes;
|
printDataBlock(pBInfo->pRes, "Semi Session");
|
||||||
|
return pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -3495,21 +3527,24 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
|
||||||
pSup->rowEntryInfoOffset);
|
pSup->rowEntryInfoOffset);
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
if (pInfo->pDelRes->info.rows > 0) {
|
if (pInfo->pDelRes->info.rows > 0) {
|
||||||
|
printDataBlock(pInfo->pDelRes, "Semi Session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pInfo->binfo.pRes->info.rows == 0) {
|
if (pBInfo->pRes->info.rows == 0) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
if (pInfo->pUpdateRes->info.rows == 0) {
|
if (pInfo->pUpdateRes->info.rows == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
|
printDataBlock(pInfo->pUpdateRes, "Semi Session");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
printDataBlock(pBInfo->pRes, "Semi Session");
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3867,7 +3902,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
// restore the value
|
// restore the value
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState);
|
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||||
|
getResWinForState, pInfo->ignoreCloseWindow);
|
||||||
|
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
|
||||||
copyUpdateResult(pSeUpdated, pUpdated);
|
copyUpdateResult(pSeUpdated, pUpdated);
|
||||||
taosHashCleanup(pSeUpdated);
|
taosHashCleanup(pSeUpdated);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue