feat(stream): ignore close window

This commit is contained in:
54liuyao 2022-06-29 15:13:56 +08:00
parent c2822a6449
commit 5e0cbdf0b7
3 changed files with 89 additions and 39 deletions

View File

@ -1662,8 +1662,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
int32_t len = 0; int32_t len = 0;
len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|\n", flag, len += snprintf(dumpBuf + len, size - len, "\n%s |block type %d |child id %d|group id %lu|\n", flag,
(int32_t)pDataBlock->info.type, pDataBlock->info.childId); (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) { for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag); len += snprintf(dumpBuf + len, size - len, "%s |", flag);
for (int32_t k = 0; k < colNum; k++) { for (int32_t k = 0; k < colNum; k++) {

View File

@ -416,6 +416,7 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation. SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
bool ignoreCloseWindow;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo { typedef struct SStreamFinalIntervalOperatorInfo {
@ -437,6 +438,7 @@ typedef struct SStreamFinalIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex; int32_t pullIndex;
SSDataBlock* pPullDataRes; SSDataBlock* pPullDataRes;
bool ignoreCloseWindow;
} SStreamFinalIntervalOperatorInfo; } SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
@ -574,6 +576,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SArray* pChildren; // cache for children's result; final stream operator SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
bool ignoreCloseWindow;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo { typedef struct STimeSliceOperatorInfo {
@ -617,6 +620,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pScanWindow; SArray* pScanWindow;
SArray* pChildren; // cache for children's result; SArray* pChildren; // cache for children's result;
bool ignoreCloseWindow;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;
typedef struct SSortedMergeOperatorInfo { typedef struct SSortedMergeOperatorInfo {

View File

@ -813,6 +813,16 @@ static void removeResults(SArray* pWins, SArray* pUpdated) {
} }
} }
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark;
}
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
return isOverdue(pWin->ekey, pSup);
}
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag, SArray* pUpdated) { int32_t scanFlag, SArray* pUpdated) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
@ -830,15 +840,16 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win); pInfo->interval.precision, &pInfo->win);
int32_t ret = TSDB_CODE_SUCCESS;
if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
} }
} }
@ -864,9 +875,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); if (!pInfo->ignoreCloseWindow || !isCloseWindow(&win, &pInfo->twAggSup)) {
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
pBlock->info.rows, numOfOutput, pInfo->order); doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, pInfo->order);
}
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
@ -877,6 +890,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
if (pInfo->ignoreCloseWindow && isCloseWindow(&nextWin, &pInfo->twAggSup)) {
ekey = ascScan ? nextWin.ekey : nextWin.skey;
forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
continue;
}
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
@ -885,10 +904,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
}
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
@ -1292,11 +1310,6 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup) {
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
}
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) { SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins) {
void* pIte = NULL; void* pIte = NULL;
@ -1411,7 +1424,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
printDataBlock(pInfo->binfo.pRes, "single interval");
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
@ -1521,6 +1534,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreCloseWindow = false;
if (pPhyNode->window.pExprs != NULL) { if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;
@ -2276,7 +2290,15 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, NULL); pInfo->interval.precision, NULL);
while (1) { while (1) {
if (IS_FINAL_OP(pInfo) && isCloseWindow(&nextWin, &pInfo->twAggSup) && pInfo->pChildren) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if (pInfo->ignoreCloseWindow && isClosed) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
}
continue;
}
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true; bool ignore = true;
SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,}; SWinRes winRes = {.ts = nextWin.skey, .groupId = tableGroupId,};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
@ -2684,6 +2706,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createPullDataBlock(); pInfo->pPullDataRes = createPullDataBlock();
pInfo->ignoreCloseWindow = false;
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
pOperator->blocking = true; pOperator->blocking = true;
@ -2830,6 +2853,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreCloseWindow = false;
pOperator->name = "StreamSessionWindowAggOperator"; pOperator->name = "StreamSessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
@ -3007,6 +3031,9 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false); updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false);
doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols,
pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC); pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC);
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId);
setBufPageDirty(bufPage, true);
releaseBufPage(pAggSup->pResultBuf, bufPage);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3063,7 +3090,13 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
pWinInfo->isOutput = false; pWinInfo->isOutput = false;
} }
taosArrayRemove(pInfo->streamAggSup.pCurWins, i); taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
SFilePage* tmpPage = getBufPage(pInfo->streamAggSup.pResultBuf, pWinInfo->pos.pageId);
releaseBufPage(pInfo->streamAggSup.pResultBuf, tmpPage);
} }
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pCurWin->pos.pageId);
ASSERT(num > 0);
setBufPageDirty(bufPage, true);
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
} }
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SHashObj* pStUpdated,
@ -3083,22 +3116,23 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t winRows = 0; int32_t winRows = 0;
if (pSDataBlock->pDataBlock != NULL) { ASSERT(pSDataBlock->pDataBlock);
SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
startTsCols = (int64_t*)pStartTsCol->pData; startTsCols = (int64_t*)pStartTsCol->pData;
SColumnInfoData* pEndTsCol = NULL; SColumnInfoData* pEndTsCol = NULL;
if (hasEndTs) { if (hasEndTs) {
pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex); pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->endTsIndex);
} else {
pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
}
endTsCols = (int64_t*)pEndTsCol->pData;
} else { } else {
return; pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
} }
endTsCols = (int64_t*)pEndTsCol->pData;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
for (int32_t i = 0; i < pSDataBlock->info.rows;) { for (int32_t i = 0; i < pSDataBlock->info.rows;) {
if (pInfo->ignoreCloseWindow && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
int32_t winIndex = 0; int32_t winIndex = 0;
SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex); SResultWindowInfo* pCurWin = getSessionTimeWindow(pAggSup, startTsCols[i], endTsCols[i], groupId, gap, &winIndex);
winRows = winRows =
@ -3205,17 +3239,24 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
index = 0; index = 0;
} }
for (int32_t k = index; k < chWinSize; k++) { for (int32_t k = index; k < chWinSize; k++) {
SResultWindowInfo* pcw = taosArrayGet(pChWins, k); SResultWindowInfo* pChWin = taosArrayGet(pChWins, k);
if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { if (pParentWin->win.skey <= pChWin->win.skey && pChWin->win.ekey <= pParentWin->win.ekey) {
SResultRow* pChResult = NULL; SResultRow* pChResult = NULL;
setWindowOutputBuf(pcw, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo);
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
setBufPageDirty(bufPage, true);
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
continue; continue;
} }
break; break;
} }
} }
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
ASSERT(size > 0);
setBufPageDirty(bufPage, true);
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
} }
} }
@ -3234,7 +3275,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
void* pWin = taosArrayGet(pWins, i); void* pWin = taosArrayGet(pWins, i);
SResultWindowInfo* pSeWin = fn(pWin); SResultWindowInfo* pSeWin = fn(pWin);
if (pSeWin->win.ekey < pTwSup->maxTs - pTwSup->waterMark) { if (isCloseWindow(&pSeWin->win, pTwSup)) {
if (!pSeWin->isClosed) { if (!pSeWin->isClosed) {
pSeWin->isClosed = true; pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -3745,6 +3786,10 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId); SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) { for (int32_t i = 0; i < pSDataBlock->info.rows; i += winRows) {
if (pInfo->ignoreCloseWindow && isOverdue(tsCols[i], &pInfo->twAggSup)) {
i++;
continue;
}
char* pKeyData = colDataGetData(pKeyColInfo, i); char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0; int32_t winIndex = 0;
bool allEqual = true; bool allEqual = true;
@ -3895,6 +3940,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes->info.type = STREAM_DELETE; pInfo->pDelRes->info.type = STREAM_DELETE;
blockDataEnsureCapacity(pInfo->pDelRes, 64); blockDataEnsureCapacity(pInfo->pDelRes, 64);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreCloseWindow = false;
pOperator->name = "StreamStateAggOperator"; pOperator->name = "StreamStateAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;