Merge pull request #16322 from taosdata/feature/TD-18530
fix(stream): state window update
This commit is contained in:
commit
3e2f1d2cb7
|
@ -1016,7 +1016,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
||||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||||
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
||||||
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
|
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID);
|
||||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||||
|
|
||||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
|
|
|
@ -1086,7 +1086,10 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
||||||
|
|
||||||
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
int32_t dummy = 0;
|
int32_t dummy = 0;
|
||||||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||||
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]);
|
uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]);
|
||||||
|
@ -1100,9 +1103,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
||||||
SResultWindowInfo* pEndWin =
|
SResultWindowInfo* pEndWin =
|
||||||
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
|
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
|
||||||
ASSERT(pEndWin);
|
ASSERT(pEndWin);
|
||||||
|
TSKEY ts = INT64_MIN;
|
||||||
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
|
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
|
||||||
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
|
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
|
||||||
|
colDataAppendNULL(pDestUidCol, i);
|
||||||
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
colDataAppend(pDestGpCol, i, (const char*)&groupId, false);
|
||||||
|
colDataAppendNULL(pDestCalStartTsCol, i);
|
||||||
|
colDataAppendNULL(pDestCalEndTsCol, i);
|
||||||
pDestBlock->info.rows++;
|
pDestBlock->info.rows++;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1157,13 +1164,13 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid) {
|
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID) {
|
||||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, uidCol);
|
||||||
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
|
colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false);
|
||||||
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false);
|
||||||
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false);
|
colDataAppend(pUidCol, pBlock->info.rows, (const char*)pID, false);
|
||||||
pBlock->info.rows++;
|
pBlock->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1190,7 +1197,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
|
bool closedWin = isClosed && isSignleIntervalWindow(pInfo) &&
|
||||||
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
|
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup);
|
||||||
if ((update || closedWin) && out) {
|
if ((update || closedWin) && out) {
|
||||||
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, UID_COLUMN_INDEX, &pBlock->info.uid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (out) {
|
if (out) {
|
||||||
|
|
|
@ -3951,11 +3951,13 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
|
||||||
int32_t numOfOutput, int64_t gap, SArray* result) {
|
int32_t numOfOutput, int64_t gap, SArray* result) {
|
||||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||||
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
TSKEY* tsCols = (TSKEY*)pColDataInfo->pData;
|
||||||
|
SColumnInfoData* pGpDataInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
uint64_t* gpCols = (uint64_t*)pGpDataInfo->pData;
|
||||||
int32_t step = 0;
|
int32_t step = 0;
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||||
int32_t winIndex = 0;
|
int32_t winIndex = 0;
|
||||||
SResultWindowInfo* pCurWin =
|
SResultWindowInfo* pCurWin =
|
||||||
getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
|
getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
|
||||||
if (!pCurWin || pCurWin->pos.pageId == -1) {
|
if (!pCurWin || pCurWin->pos.pageId == -1) {
|
||||||
// window has been closed.
|
// window has been closed.
|
||||||
step = 1;
|
step = 1;
|
||||||
|
@ -4168,13 +4170,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, 0,
|
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0,
|
||||||
pWins);
|
pWins);
|
||||||
if (IS_FINAL_OP(pInfo)) {
|
if (IS_FINAL_OP(pInfo)) {
|
||||||
int32_t childIndex = getChildIndex(pBlock);
|
int32_t childIndex = getChildIndex(pBlock);
|
||||||
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
|
||||||
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
|
||||||
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs,
|
doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs,
|
||||||
0, NULL);
|
0, NULL);
|
||||||
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
|
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
|
||||||
}
|
}
|
||||||
|
@ -4285,21 +4287,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "sems session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
|
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
|
||||||
pInfo->returnDelete = true;
|
pInfo->returnDelete = true;
|
||||||
printDataBlock(pInfo->pDelRes, "sems session");
|
printDataBlock(pInfo->pDelRes, "semi session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
printDataBlock(pInfo->pUpdateRes, "sems session");
|
printDataBlock(pInfo->pUpdateRes, "semi session");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
// semi interval operator clear disk buffer
|
// semi interval operator clear disk buffer
|
||||||
|
@ -4318,13 +4320,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
clearSpecialDataBlock(pInfo->pUpdateRes);
|
clearSpecialDataBlock(pInfo->pUpdateRes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
printDataBlock(pBlock, "semi session recv");
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, 0, pWins);
|
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
|
||||||
removeSessionResults(pStUpdated, pWins);
|
removeSessionResults(pStUpdated, pWins);
|
||||||
taosArrayDestroy(pWins);
|
taosArrayDestroy(pWins);
|
||||||
copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex);
|
copyDataBlock(pInfo->pUpdateRes, pBlock);
|
||||||
break;
|
break;
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) {
|
||||||
// gap must be 0
|
// gap must be 0
|
||||||
|
@ -4364,21 +4367,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "sems session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
|
||||||
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
|
if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) {
|
||||||
pInfo->returnDelete = true;
|
pInfo->returnDelete = true;
|
||||||
printDataBlock(pInfo->pDelRes, "sems session");
|
printDataBlock(pInfo->pDelRes, "semi session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateRes->info.rows > 0) {
|
if (pInfo->pUpdateRes->info.rows > 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
printDataBlock(pInfo->pUpdateRes, "sems session");
|
printDataBlock(pInfo->pUpdateRes, "semi session");
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4400,8 +4403,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
pOperator->name = "StreamSessionFinalAggOperator";
|
pOperator->name = "StreamSessionFinalAggOperator";
|
||||||
} else {
|
} else {
|
||||||
pInfo->isFinal = false;
|
pInfo->isFinal = false;
|
||||||
pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||||
pInfo->pUpdateRes->info.type = STREAM_CLEAR;
|
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||||
pOperator->name = "StreamSessionSemiAggOperator";
|
pOperator->name = "StreamSessionSemiAggOperator";
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
|
@ -4616,23 +4618,20 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
|
static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
|
||||||
int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
|
SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
|
||||||
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex);
|
SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
|
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
|
||||||
bool allEqual = false;
|
bool allEqual = false;
|
||||||
int32_t step = 1;
|
int32_t step = 1;
|
||||||
uint64_t groupId = pBlock->info.groupId;
|
uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData;
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||||
char* pKeyData = colDataGetData(pKeyColInfo, i);
|
|
||||||
int32_t winIndex = 0;
|
int32_t winIndex = 0;
|
||||||
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex);
|
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex);
|
||||||
if (!pCurWin) {
|
if (!pCurWin) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo,
|
updateSessionWindowInfo(&pCurWin->winInfo, tsCol, NULL, 0, pBlock->info.rows, i, 0, NULL);
|
||||||
pBlock->info.rows, i, &allEqual, pSeDeleted);
|
|
||||||
ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData));
|
|
||||||
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
|
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
|
||||||
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
|
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
|
||||||
}
|
}
|
||||||
|
@ -4675,7 +4674,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
pSDataBlock->info.rows, i, &allEqual, pStDeleted);
|
pSDataBlock->info.rows, i, &allEqual, pStDeleted);
|
||||||
if (!allEqual) {
|
if (!allEqual) {
|
||||||
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
|
appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
|
||||||
&groupId);
|
GROUPID_COLUMN_INDEX, &groupId);
|
||||||
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
|
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
|
||||||
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
|
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
|
||||||
continue;
|
continue;
|
||||||
|
@ -4730,8 +4729,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pBlock, "single state recv");
|
printDataBlock(pBlock, "single state recv");
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId,
|
doClearStateWindows(&pInfo->streamAggSup, pBlock, pSeUpdated, pInfo->pSeDeleted);
|
||||||
pSeUpdated, pInfo->pSeDeleted);
|
|
||||||
continue;
|
continue;
|
||||||
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
|
} else if (pBlock->info.type == STREAM_DELETE_DATA) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
|
|
Loading…
Reference in New Issue