format code

This commit is contained in:
liuyao 2023-08-14 15:12:54 +08:00
parent 3043c7f09f
commit 4b435e2cbf
1 changed files with 148 additions and 114 deletions

View File

@ -26,12 +26,12 @@
#include "tlog.h"
#include "ttime.h"
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState"
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState"
#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState"
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
@ -456,13 +456,14 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
}
}
bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { return pStore->streamStateCheck(pState, pKey); }
bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) {
return pStore->streamStateCheck(pState, pKey);
}
int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset,
SAggSupporter* pAggSup, SStateStore* pStore) {
SWinKey key = { .ts = win->skey, .groupId = groupId };
SWinKey key = {.ts = win->skey, .groupId = groupId};
char* value = NULL;
int32_t size = pAggSup->resultRowSize;
@ -479,7 +480,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu
return TSDB_CODE_SUCCESS;
}
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore) {
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup,
SStateStore* pStore) {
if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) {
SWinKey key = {.ts = pWin->skey, .groupId = groupId};
if (!hasIntervalWindow(pState, &key, pStore)) {
@ -549,7 +551,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0);
}
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) {
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
int32_t numOfCh, SOperatorInfo* pOperator) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
@ -572,13 +575,13 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
// pull data is over
taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey));
qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts);
qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts);
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
if (pFinalCh) {
taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey));
doDeleteWindow(pOperator, winRes.ts, winRes.groupId);
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
SPullWindowInfo pull = {.window = nextWin,
.groupId = winRes.groupId,
.calWin.skey = nextWin.skey,
@ -617,7 +620,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i
} else {
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index);
qDebug("===stream===check final retrive %" PRId64 ",chid:%d", winKey->ts, index);
if (index == -1) {
qDebug("===stream===add final retrive %" PRId64, winKey->ts);
taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0);
@ -638,8 +641,8 @@ int32_t getOutputBuf(void* pState, SRowBuffPos* pPos, SResultRow** pResult, SSta
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
@ -775,7 +778,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
}
while (1) {
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_INTERVAL_OP(pOperator)) ||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
break;
@ -790,7 +794,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
.groupId = groupId,
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed && !chIds) {
if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed &&
!chIds) {
SPullWindowInfo pull = {
.window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey};
// add pull data request
@ -910,19 +915,18 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) {
}
static char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
default:
return "";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
default:
return "";
}
}
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){
static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
@ -953,10 +957,10 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator){
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SExprSupp* pSup = &pOperator->exprSupp;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SExprSupp* pSup = &pOperator->exprSupp;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
@ -1032,7 +1036,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
break;
}
pInfo->numOfDatapack++;
printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv", GET_TASKID(pTaskInfo));
printDataBlock(pBlock, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final recv" : "interval semi recv",
GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
@ -1056,7 +1061,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, IS_FINAL_INTERVAL_OP(pOperator) ? "interval final" : "interval semi",
GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_CLEAR) {
pInfo->pDelRes->info.type = STREAM_CLEAR;
} else {
@ -1077,7 +1083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_INTERVAL_OP(pOperator)) {
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator);
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins,
pInfo->numOfChild, pOperator);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
@ -1151,8 +1158,8 @@ static int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) {
static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
int32_t size = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
size = TMAX(size, resSize);
int32_t resSize = getSelectivityBufSize(pSup->pCtx + i);
size = TMAX(size, resSize);
}
return size;
}
@ -1160,11 +1167,12 @@ static int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) {
static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t resSize = sizeof(TSKEY);
pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
int32_t resSize = sizeof(TSKEY);
pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize);
}
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
pAPI->stateStore.streamStateCommit(pInfo->pState);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
@ -1175,11 +1183,11 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) {
void streamIntervalReloadState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
int32_t size = 0;
void* pBuf = NULL;
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME,
strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size);
TSKEY ts = *(TSKEY*)pBuf;
TSKEY ts = *(TSKEY*)pBuf;
taosMemoryFree(pBuf);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts);
@ -1227,7 +1235,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -1282,9 +1290,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
pInfo->pState->pFileState =
pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo));
pInfo->dataVersion = 0;
pInfo->statestore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false;
@ -1392,7 +1401,8 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) {
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, SStorageAPI* pApi) {
pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput);
pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR);
pSup->gap = gap;
@ -1495,7 +1505,8 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) {
int32_t size = 0;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1608,7 +1619,8 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
setSessionWinOutputInfo(pStUpdated, pNextWin);
int32_t size = 0;
pNextWin->sessionWin = pCurWin->sessionWin;
int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
int32_t code =
pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pNextWin->pOutputBuf);
SET_SESSION_WIN_INVALID(*pNextWin);
@ -1617,7 +1629,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
}
static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool addGap) {
SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
@ -1661,7 +1673,8 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
}
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS;
}
@ -1693,7 +1706,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
pEndTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
}
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
for (int32_t i = 0; i < rows;) {
if (pInfo->ignoreExpiredData && isOverdue(endTsCols[i], &pInfo->twAggSup)) {
i++;
@ -1837,9 +1850,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
}
static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
int32_t size = taosArrayGetSize(pWinArray);
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -1878,7 +1891,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
}
}
num++;
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
@ -1953,8 +1966,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
pGroupResInfo->pBuf = NULL;
}
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
SSDataBlock* pBlock) {
void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// set output datablock version
pBlock->info.version = pTaskInfo->version;
@ -2005,12 +2017,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
@ -2031,7 +2045,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv", GET_TASKID(pTaskInfo));
printDataBlock(pBlock, IS_FINAL_SESSION_OP(pOperator) ? "final session recv" : "single session recv",
GET_TASKID(pTaskInfo));
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
@ -2065,7 +2080,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator), true);
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_SESSION_OP(pOperator),
true);
if (IS_FINAL_SESSION_OP(pOperator)) {
int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren);
@ -2094,7 +2110,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
tSimpleHashCleanup(pInfo->pStUpdated);
pInfo->pStUpdated = NULL;
if(pInfo->isHistoryOp) {
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
@ -2109,13 +2125,15 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
printDataBlock(pInfo->pDelRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildSessionResult(pOperator, pAggSup->pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session", GET_TASKID(pTaskInfo));
printDataBlock(pBInfo->pRes, IS_FINAL_SESSION_OP(pOperator) ? "final session" : "single session",
GET_TASKID(pTaskInfo));
return pBInfo->pRes;
}
@ -2126,8 +2144,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
void streamSessionReleaseState(SOperatorInfo* pOperator) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
@ -2142,16 +2162,16 @@ void resetWinRange(STimeWindow* winRange) {
void streamSessionReloadState(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SResultWindowInfo winInfo = {0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pStUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -2162,7 +2182,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
if (winNum > 0) {
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey, winInfo.sessionWin.groupId);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, winInfo.sessionWin.win.skey,
winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(winInfo, pInfo->pStUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -2216,7 +2237,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
}
code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap,
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI);
pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pTaskInfo->storageAPI);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -2399,7 +2421,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) {
SExecTaskInfo* pTaskInfo, int32_t numOfChild,
SReadHandle* pHandle) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle);
if (pOperator == NULL) {
@ -2503,9 +2526,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
int32_t code =
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize,
compareStateKey, &pCurWin->winInfo.pOutputBuf, &size);
int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pKeyData, pAggSup->stateKeySize, compareStateKey,
&pCurWin->winInfo.pOutputBuf, &size);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
@ -2515,10 +2538,11 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) {
code = TSDB_CODE_FAILED;
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf,
&pAggSup->pSessionAPI->stateStore);
pCurWin->winInfo.pOutputBuf = taosMemoryCalloc(1, size);
pCurWin->pStateKey =
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
(SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pCurWin->pStateKey->type = pAggSup->stateKeyType;
pCurWin->pStateKey->pData = (char*)pCurWin->pStateKey + sizeof(SStateKeys);
@ -2526,7 +2550,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, pCurWin->winInfo.sessionWin.win.ekey);
qDebug("===stream===reset state win key. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
pCurWin->winInfo.sessionWin.win.ekey);
}
if (code == TSDB_CODE_SUCCESS) {
@ -2541,14 +2566,16 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
}
pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pNextWin->winInfo.sessionWin);
int32_t nextSize = pAggSup->resultRowSize;
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, &pNextWin->winInfo.pOutputBuf, &nextSize);
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin,
&pNextWin->winInfo.pOutputBuf, &nextSize);
if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_INVALID(pNextWin->winInfo);
} else {
pNextWin->pStateKey =
(SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
(SStateKeys*)((char*)pNextWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize));
pNextWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys);
pNextWin->pStateKey->type = pAggSup->stateKeyType;
pNextWin->pStateKey->pData = (char*)pNextWin->pStateKey + sizeof(SStateKeys);
@ -2619,7 +2646,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
return;
}
int32_t rows = pSDataBlock->info.rows;
int32_t rows = pSDataBlock->info.rows;
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
for (int32_t i = 0; i < rows; i += winRows) {
@ -2747,7 +2774,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
tSimpleHashCleanup(pInfo->pSeUpdated);
pInfo->pSeUpdated = NULL;
if(pInfo->isHistoryOp) {
if (pInfo->isHistoryOp) {
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
}
@ -2778,9 +2805,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
void streamStateReleaseState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
qDebug("===stream=== relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData,
resSize);
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
@ -2789,14 +2818,14 @@ void streamStateReleaseState(SOperatorInfo* pOperator) {
static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
SSHashObj* pStUpdated, SSHashObj* pStDeleted) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
SResultRow* pWinResult = NULL;
initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
@ -2806,7 +2835,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &pNextWin->sessionWin, sizeof(SSessionKey));
if (pNextWin->isOutput && pStDeleted) {
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey, pNextWin->sessionWin.groupId);
qDebug("===stream=== save delete window info %" PRId64 ", %" PRIu64, pNextWin->sessionWin.win.skey,
pNextWin->sessionWin.groupId);
saveDeleteRes(pStDeleted, pNextWin->sessionWin);
}
removeSessionResult(pStUpdated, pAggSup->pResultRows, pNextWin->sessionWin);
@ -2816,17 +2846,17 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange);
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
int32_t size = 0;
void* pBuf = NULL;
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME,
strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size);
int32_t num = size / sizeof(SSessionKey);
qDebug("===stream=== reload state. get result count:%d", num);
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf;
ASSERT(size == num * sizeof(SSessionKey));
if (!pInfo->pSeUpdated && num > 0) {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -2840,13 +2870,16 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
SStateWindowInfo curInfo = {0};
SStateWindowInfo nextInfo = {0};
SStateWindowInfo dummy = {0};
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i);
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey,
pSeKeyBuf[i].groupId, i);
setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo);
bool cpRes = compareWinStateKey(curInfo.pStateKey,nextInfo.pStateKey);
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d", nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);
bool cpRes = compareWinStateKey(curInfo.pStateKey, nextInfo.pStateKey);
qDebug("===stream=== reload state. next window info %" PRId64 ", %" PRIu64 ", compare:%d",
nextInfo.winInfo.sessionWin.win.skey, nextInfo.winInfo.sessionWin.groupId, cpRes);
if (cpRes) {
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeDeleted);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, curInfo.winInfo.sessionWin.groupId);
qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey,
curInfo.winInfo.sessionWin.groupId);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
@ -2858,7 +2891,8 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
}
} else if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) {
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore);
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)nextInfo.winInfo.pOutputBuf,
&pAggSup->pSessionAPI->stateStore);
}
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
@ -2870,7 +2904,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
}
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
@ -2980,7 +3014,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExprSupp* pSup = &pOperator->exprSupp;
@ -3164,8 +3198,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1);
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str,
pInfo->pState, &pTaskInfo->storageAPI.functionStore);
code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState,
&pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -3197,7 +3231,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->numOfDatapack = 0;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
int32_t funResSize= getMaxFunResSize(pSup, numOfCols);
int32_t funResSize = getMaxFunResSize(pSup, numOfCols);
pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,