diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 04f73707c2..769be6268c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -110,7 +110,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo return TSDB_CODE_SUCCESS; } -static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { int64_t* ts = (int64_t*)pColData->pData; int64_t duration = pWin->ekey - pWin->skey + delta; @@ -419,7 +419,7 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SEx bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType) { if (pInterval->interval != pInterval->sliding && - ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart) )) { + ((pWin->ekey < calStart || pWin->skey > calEnd) || (blockType == STREAM_PULL_DATA && pWin->skey < calStart))) { return false; } @@ -904,8 +904,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } TSKEY ekey = ascScan ? win.ekey : win.skey; - int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); + int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, + pInfo->binfo.inputTsOrder); // prev time window not interpolation yet. if (pInfo->timeWindowInterpo) { @@ -932,7 +932,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, + pInfo->binfo.inputTsOrder); if (startPos < 0) { break; } @@ -944,8 +945,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->binfo.inputTsOrder); + forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, + pInfo->binfo.inputTsOrder); // window start(end) key interpolation doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); // TODO: add to open window? how to close the open windows after input blocks exhausted? @@ -1604,7 +1605,8 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); + pScanInfo->pUpdateInfo = + pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); } pScanInfo->interval = pInfo->interval; @@ -1632,8 +1634,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); - int32_t code = - initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); + int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, + pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1862,7 +1864,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi if (pStateNode->window.pExprs != NULL) { int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalarExpr); - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); + int32_t code = + initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2033,13 +2036,14 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { return pStore->streamStateCheck(pState, pKey); } +bool hasIntervalWindow(void* pState, SWinKey* pKey, SStateStore* pStore) { + return pStore->streamStateCheck(pState, pKey); +} int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SStateStore* pStore) { - - SWinKey key = { .ts = win->skey, .groupId = groupId }; + SWinKey key = {.ts = win->skey, .groupId = groupId}; char* value = NULL; int32_t size = pAggSup->resultRowSize; @@ -2056,7 +2060,8 @@ int32_t setIntervalOutputBuf(void* pState, STimeWindow* win, SRowBuffPos** pResu return TSDB_CODE_SUCCESS; } -bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, SStateStore* pStore) { +bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, void* pState, STimeWindowAggSupp* pTwSup, + SStateStore* pStore) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; if (!hasIntervalWindow(pState, &key, pStore)) { @@ -2126,7 +2131,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { +void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, + int32_t numOfCh, SOperatorInfo* pOperator) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); @@ -2149,13 +2155,13 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); - qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts); + qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); if (pFinalCh) { taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey)); doDeleteWindow(pOperator, winRes.ts, winRes.groupId); - STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); + STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); SPullWindowInfo pull = {.window = nextWin, .groupId = winRes.groupId, .calWin.skey = nextWin.skey, @@ -2194,7 +2200,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i } else { SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); - qDebug("===stream===check final retrive %" PRId64",chid:%d", winKey->ts, index); + qDebug("===stream===check final retrive %" PRId64 ",chid:%d", winKey->ts, index); if (index == -1) { qDebug("===stream===add final retrive %" PRId64, winKey->ts); taosHashPut(pInfo->pFinalPullDataMap, winKey, sizeof(SWinKey), NULL, 0); @@ -2215,8 +2221,8 @@ int32_t getOutputBuf(void* pState, SRowBuffPos* pPos, SResultRow** pResult, SSta int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; @@ -2347,7 +2353,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } while (1) { bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup); - if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { + if ((pInfo->ignoreExpiredData && isClosed && !IS_FINAL_OP(pInfo)) || + !inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { break; @@ -2362,7 +2369,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && !chIds) { + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && + !chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request @@ -2481,57 +2489,57 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { } } -int32_t encodeSWinKey(void **buf, SWinKey* key) { +int32_t encodeSWinKey(void** buf, SWinKey* key) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, key->ts); tlen += taosEncodeFixedU64(buf, key->groupId); return tlen; } -void* decodeSWinKey(void *buf, SWinKey* key) { +void* decodeSWinKey(void* buf, SWinKey* key) { buf = taosDecodeFixedI64(buf, &key->ts); buf = taosDecodeFixedU64(buf, &key->groupId); return buf; } -int32_t encodeSRowBuffPos(void **buf, SRowBuffPos* pos) { +int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) { int32_t tlen = 0; tlen += encodeSWinKey(buf, pos->pKey); return tlen; } -void* decodeSRowBuffPos(void *buf, SRowBuffPos* pos) { +void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) { buf = decodeSWinKey(buf, pos->pKey); return buf; } -int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup) { +int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs); tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs); return tlen; } -void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup) { +void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup) { buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs); buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs); return buf; } -int32_t encodeSTimeWindow(void **buf, STimeWindow* pWin) { +int32_t encodeSTimeWindow(void** buf, STimeWindow* pWin) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pWin->skey); tlen += taosEncodeFixedI64(buf, pWin->ekey); return tlen; } -void* decodeSTimeWindow(void *buf, STimeWindow* pWin) { +void* decodeSTimeWindow(void* buf, STimeWindow* pWin) { buf = taosDecodeFixedI64(buf, &pWin->skey); buf = taosDecodeFixedI64(buf, &pWin->ekey); return buf; } -int32_t encodeSPullWindowInfo(void **buf, SPullWindowInfo* pPullInfo) { +int32_t encodeSPullWindowInfo(void** buf, SPullWindowInfo* pPullInfo) { int32_t tlen = 0; tlen += encodeSTimeWindow(buf, &pPullInfo->calWin); tlen += taosEncodeFixedU64(buf, pPullInfo->groupId); @@ -2539,14 +2547,14 @@ int32_t encodeSPullWindowInfo(void **buf, SPullWindowInfo* pPullInfo) { return tlen; } -void* decodeSPullWindowInfo(void *buf, SPullWindowInfo* pPullInfo) { +void* decodeSPullWindowInfo(void* buf, SPullWindowInfo* pPullInfo) { buf = decodeSTimeWindow(buf, &pPullInfo->calWin); buf = taosDecodeFixedU64(buf, &pPullInfo->groupId); buf = decodeSTimeWindow(buf, &pPullInfo->window); return buf; } -int32_t encodeSPullWindowInfoArray(void **buf, SArray* pPullInfos) { +int32_t encodeSPullWindowInfoArray(void** buf, SArray* pPullInfos) { int32_t tlen = 0; int32_t size = taosArrayGetSize(pPullInfos); tlen += taosEncodeFixedI32(buf, size); @@ -2557,7 +2565,7 @@ int32_t encodeSPullWindowInfoArray(void **buf, SArray* pPullInfos) { return tlen; } -void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) { +void* decodeSPullWindowInfoArray(void* buf, SArray* pPullInfos) { int32_t size = 0; buf = taosDecodeFixedI32(buf, &size); for (int32_t i = 0; i < size; i++) { @@ -2568,7 +2576,7 @@ void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) { return buf; } -int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator) { +int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; @@ -2580,11 +2588,11 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable); tlen += taosEncodeFixedI32(buf, mapSize); - void *pIte = NULL; - size_t keyLen = 0; + void* pIte = NULL; + size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { - void *key = taosHashGetKey(pIte, &keyLen); + void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSWinKey(buf, key); SRowBuffPos* pPos = *(void**)pIte; tlen += encodeSRowBuffPos(buf, pPos); @@ -2599,7 +2607,7 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO pIte = NULL; keyLen = 0; while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) { - void *key = taosHashGetKey(pIte, &keyLen); + void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSWinKey(buf, key); SArray* pArray = (SArray*)pIte; int32_t chSize = taosArrayGetSize(pArray); @@ -2628,16 +2636,16 @@ int32_t doStreamIntervalEncodeOpState(void **buf, int32_t len, SOperatorInfo* pO } void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (!pInfo) { - return ; + return; } // 6.checksum int32_t dataLen = len - sizeof(uint32_t); - void* pCksum = POINTER_SHIFT(buf, dataLen); + void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug + ASSERT(0); // debug qError("stream interval state is invalid"); return; } @@ -2646,7 +2654,7 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SWinKey key = {0}; + SWinKey key = {0}; SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey)); buf = decodeSWinKey(buf, &key); @@ -2695,10 +2703,10 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - SExprSupp* pSup = &pOperator->exprSupp; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + SExprSupp* pSup = &pOperator->exprSupp; qDebug("interval status %d %s", pOperator->status, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi"); @@ -2937,8 +2945,8 @@ int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) { int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { int32_t size = 0; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); - size = TMAX(size, resSize); + int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); + size = TMAX(size, resSize); } return size; } @@ -2946,11 +2954,12 @@ int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { void streamIntervalReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t resSize = sizeof(TSKEY); - pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); + int32_t resSize = sizeof(TSKEY); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); } SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; pAPI->stateStore.streamStateCommit(pInfo->pState); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { @@ -2961,11 +2970,11 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) { void streamIntervalReloadState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - int32_t size = 0; - void* pBuf = NULL; + int32_t size = 0; + void* pBuf = NULL; int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); - TSKEY ts = *(TSKEY*)pBuf; + TSKEY ts = *(TSKEY*)pBuf; taosMemoryFree(pBuf); pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); } @@ -2976,7 +2985,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, + SReadHandle* pHandle) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3009,7 +3019,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); + int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3066,7 +3076,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); + int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); @@ -3094,7 +3104,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, // for stream void* buff = NULL; int32_t len = 0; - int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, len, pOperator); taosMemoryFree(buff); @@ -3181,13 +3192,15 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, pScanInfo->igCheckUpdate); + pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, + pScanInfo->igCheckUpdate); } pScanInfo->twAggSup = *pTwSup; } int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, int64_t gap, - SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, SStorageAPI* pApi) { + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, + SReadHandle* pHandle, SStorageAPI* pApi) { pSup->resultRowSize = keySize + getResultRowSize(pCtx, numOfOutput); pSup->pScanBlock = createSpecialDataBlock(STREAM_CLEAR); pSup->gap = gap; @@ -3297,7 +3310,8 @@ void setSessionOutputBuf(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT int32_t getSessionWinBuf(SStreamAggSupporter* pAggSup, SStreamStateCur* pCur, SResultWindowInfo* pWinInfo) { int32_t size = 0; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); + int32_t code = + pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pWinInfo->sessionWin, &pWinInfo->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3410,7 +3424,8 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* setSessionWinOutputInfo(pStUpdated, pNextWin); int32_t size = 0; pNextWin->sessionWin = pCurWin->sessionWin; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); + int32_t code = + pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pNextWin->pOutputBuf); SET_SESSION_WIN_INVALID(*pNextWin); @@ -3420,9 +3435,9 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SResultRow* pCurResult = NULL; @@ -3460,7 +3475,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { - saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); + saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, + &pAggSup->stateStore); return TSDB_CODE_SUCCESS; } @@ -3631,9 +3647,9 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo } static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pStUpdated) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; int32_t size = taosArrayGetSize(pWinArray); SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -3672,7 +3688,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS } } num++; - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, pChild->exprSupp.rowEntryInfoOffset); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); @@ -3747,8 +3763,7 @@ void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL pGroupResInfo->pBuf = NULL; } -void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, - SSDataBlock* pBlock) { +void doBuildSessionResult(SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, SSDataBlock* pBlock) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; // set output datablock version pBlock->info.version = pTaskInfo->version; @@ -3787,20 +3802,20 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { } } -int32_t encodeSSessionKey(void **buf, SSessionKey* key) { +int32_t encodeSSessionKey(void** buf, SSessionKey* key) { int32_t tlen = 0; tlen += encodeSTimeWindow(buf, &key->win); tlen += taosEncodeFixedU64(buf, key->groupId); return tlen; } -void* decodeSSessionKey(void *buf, SSessionKey* key) { +void* decodeSSessionKey(void* buf, SSessionKey* key) { buf = decodeSTimeWindow(buf, &key->win); buf = taosDecodeFixedU64(buf, &key->groupId); return buf; } -int32_t encodeSResultWindowInfo(void **buf, SResultWindowInfo* key, int32_t outLen) { +int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) { int32_t tlen = 0; tlen += taosEncodeFixedBool(buf, key->isOutput); tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen); @@ -3808,14 +3823,14 @@ int32_t encodeSResultWindowInfo(void **buf, SResultWindowInfo* key, int32_t outL return tlen; } -void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen) { +void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) { buf = taosDecodeFixedBool(buf, &key->isOutput); buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen); buf = decodeSSessionKey(buf, &key->sessionWin); return buf; } -int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { +int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; @@ -3827,11 +3842,11 @@ int32_t doStreamSessionEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOp int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); tlen += taosEncodeFixedI32(buf, mapSize); - void *pIte = NULL; - size_t keyLen = 0; + void* pIte = NULL; + size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void *key = taosHashGetKey(pIte, &keyLen); + void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } @@ -3872,9 +3887,9 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera // 5.checksum if (isParent) { int32_t dataLen = len - sizeof(uint32_t); - void* pCksum = POINTER_SHIFT(buf, dataLen); + void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug + ASSERT(0); // debug qError("stream interval state is invalid"); return buf; } @@ -3884,7 +3899,7 @@ void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; + SSessionKey key = {0}; SResultWindowInfo winfo = {0}; buf = decodeSSessionKey(buf, &key); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); @@ -3916,6 +3931,7 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); } static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { @@ -4028,7 +4044,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; - if(pInfo->isHistoryOp) { + if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); @@ -4066,8 +4082,10 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { void streamSessionReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { @@ -4082,16 +4100,16 @@ void resetWinRange(STimeWindow* winRange) { void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); SResultWindowInfo winInfo = {0}; - int32_t size = 0; - void* pBuf = NULL; - int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); - SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey)); for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; @@ -4103,7 +4121,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); - } + } } SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -4138,7 +4156,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh } code = initStreamAggSupporter(&pInfo->streamAggSup, pSup->pCtx, numOfCols, pSessionNode->gap, - pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pTaskInfo->storageAPI); + pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, + &pTaskInfo->storageAPI); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4183,7 +4202,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh // for stream void* buff = NULL; int32_t len = 0; - int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { doStreamSessionDecodeOpState(buff, len, pOperator, true); taosMemoryFree(buff); @@ -4346,14 +4367,15 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, + SReadHandle* pHandle) { int32_t code = TSDB_CODE_OUT_OF_MEMORY; SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle); if (pOperator == NULL) { goto _error; } - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION); @@ -4444,9 +4466,9 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pCurWin->winInfo.sessionWin.groupId = groupId; pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts; - int32_t code = - pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, pKeyData, pAggSup->stateKeySize, - compareStateKey, &pCurWin->winInfo.pOutputBuf, &size); + int32_t code = pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, + pKeyData, pAggSup->stateKeySize, compareStateKey, + &pCurWin->winInfo.pOutputBuf, &size); pCurWin->pStateKey = (SStateKeys*)((char*)pCurWin->winInfo.pOutputBuf + (pAggSup->resultRowSize - pAggSup->stateKeySize)); pCurWin->pStateKey->bytes = pAggSup->stateKeySize - sizeof(SStateKeys); @@ -4456,7 +4478,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &pCurWin->winInfo.sessionWin.win)) { code = TSDB_CODE_FAILED; - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)pCurWin->winInfo.pOutputBuf, + &pAggSup->pSessionAPI->stateStore); pCurWin->winInfo.pOutputBuf = taosMemoryMalloc(size); } @@ -4472,7 +4495,8 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pNextWin->winInfo.sessionWin = pCurWin->winInfo.sessionWin; pNextWin->winInfo.pOutputBuf = NULL; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); + SStreamStateCur* pCur = + pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &pNextWin->winInfo.sessionWin, NULL, 0); if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_INVALID(pNextWin->winInfo); @@ -4585,7 +4609,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } -int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { +int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; if (!pInfo) { return 0; @@ -4597,11 +4621,11 @@ int32_t doStreamStateEncodeOpState(void **buf, int32_t len, SOperatorInfo* pOper int32_t tlen = 0; int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); tlen += taosEncodeFixedI32(buf, mapSize); - void *pIte = NULL; - size_t keyLen = 0; + void* pIte = NULL; + size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void *key = taosHashGetKey(pIte, &keyLen); + void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSSessionKey(buf, key); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } @@ -4642,9 +4666,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato // 5.checksum if (isParent) { int32_t dataLen = len - sizeof(uint32_t); - void* pCksum = POINTER_SHIFT(buf, dataLen); + void* pCksum = POINTER_SHIFT(buf, dataLen); if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { - ASSERT(0); // debug + ASSERT(0); // debug qError("stream interval state is invalid"); return buf; } @@ -4654,7 +4678,7 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; + SSessionKey key = {0}; SResultWindowInfo winfo = {0}; buf = decodeSSessionKey(buf, &key); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); @@ -4768,7 +4792,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { tSimpleHashCleanup(pInfo->pSeUpdated); pInfo->pSeUpdated = NULL; - if(pInfo->isHistoryOp) { + if (pInfo->isHistoryOp) { getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); } @@ -4799,8 +4823,10 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { void streamStateReleaseState(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, resSize); + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_STATE_NAME, + strlen(STREAM_STATE_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -4809,14 +4835,14 @@ void streamStateReleaseState(SOperatorInfo* pOperator) { static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - SResultRow* pCurResult = NULL; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; initSessionOutputBuf(pCurWin, &pCurResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); SResultRow* pWinResult = NULL; initSessionOutputBuf(pNextWin, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); @@ -4835,22 +4861,22 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur void streamStateReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; resetWinRange(&pAggSup->winRange); - SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; - int32_t size = 0; - void* pBuf = NULL; - int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, - strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); - int32_t num = size / sizeof(SSessionKey); - SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, + strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey)); for (int32_t i = 0; i < num; i++) { SStateWindowInfo curInfo = {0}; SStateWindowInfo nextInfo = {0}; setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); - if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { + if (compareStateKey(curInfo.pStateKey, nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pStUpdated, pInfo->pStDeleted); } } @@ -4859,7 +4885,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { downstream->fpSet.reloadStreamStateFn(downstream); - } + } } SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, @@ -4936,7 +4962,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys // for stream void* buff = NULL; int32_t len = 0; - int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, + strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { doStreamStateDecodeOpState(buff, len, pOperator, true); taosMemoryFree(buff); @@ -5322,8 +5350,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } TSKEY ekey = ascScan ? win.ekey : win.skey; - int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); + int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, + iaInfo->binfo.inputTsOrder); ASSERT(forwardRows > 0); // prev time window not interpolation yet. @@ -5353,8 +5381,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = - getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->binfo.inputTsOrder); + startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, + iaInfo->binfo.inputTsOrder); if (startPos < 0) { break; } @@ -5368,8 +5396,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } ekey = ascScan ? nextWin.ekey : nextWin.skey; - forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->binfo.inputTsOrder); + forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, + iaInfo->binfo.inputTsOrder); // window start(end) key interpolation doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); @@ -5536,7 +5564,7 @@ _error: static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SExprSupp* pSup = &pOperator->exprSupp; @@ -5706,13 +5734,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; - pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pIntervalPhyNode->window.watermark, - .calTrigger = pIntervalPhyNode->window.triggerType, - .maxTs = INT64_MIN, - .minTs = INT64_MAX, - .deleteMark = getDeleteMark(pIntervalPhyNode) - }; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(pIntervalPhyNode)}; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -5735,8 +5761,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pInfo->pState, &pTaskInfo->storageAPI.functionStore); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, + &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -5770,7 +5796,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - int32_t funResSize= getMaxFunResSize(pSup, numOfCols); + int32_t funResSize = getMaxFunResSize(pSup, numOfCols); pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, @@ -5789,7 +5815,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys // for stream void* buff = NULL; int32_t len = 0; - int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { doStreamIntervalDecodeOpState(buff, len, pOperator); taosMemoryFree(buff);