From 3ff02561454c6a207df3b504eaae5632724ea257 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 30 Jul 2024 16:06:40 +0800 Subject: [PATCH] steam interp --- include/libs/nodes/plannodes.h | 5 +- include/libs/stream/tstreamFileState.h | 3 +- source/libs/executor/inc/executorInt.h | 102 +-- source/libs/executor/src/scanoperator.c | 119 +++- .../executor/src/streamtimesliceoperator.c | 640 ++++++++++++++---- source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/planner/src/planLogicCreater.c | 2 + source/libs/planner/src/planPhysiCreater.c | 2 + source/libs/stream/src/streamSliceState.c | 38 +- source/libs/stream/src/streamState.c | 4 + source/libs/stream/src/tstreamFileState.c | 2 +- 11 files changed, 723 insertions(+), 196 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 509710707d..308dee14dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -208,6 +208,8 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + char intervalUnit; + int8_t precision; EFillMode fillMode; SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode @@ -511,7 +513,8 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; - int8_t intervalUnit; + char intervalUnit; + int8_t precision; EFillMode fillMode; SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 67e11e67f4..b19a788990 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -123,7 +123,7 @@ int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyL int32_t* pWinCode); int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); -// fill +// time slice int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen); int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen); int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey); @@ -133,6 +133,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen); int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); +void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 7e0710b05a..00986674fb 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -828,6 +828,7 @@ typedef struct SStreamTimeSliceOperatorInfo { bool destHasPrimaryKey; SArray* historyPoints; SArray* pUpdated; // SWinKey + SArray* historyWins; SSHashObj* pUpdatedMap; int32_t delIndex; SArray* pDelWins; // SWinKey @@ -835,6 +836,7 @@ typedef struct SStreamTimeSliceOperatorInfo { uint64_t numOfDatapack; SGroupResInfo groupResInfo; bool ignoreNull; + bool isHistoryOp; } SStreamTimeSliceOperatorInfo; #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) @@ -952,56 +954,56 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); void doClearBufferedBlocks(SStreamScanInfo* pInfo); -void streamOpReleaseState(struct SOperatorInfo* pOperator); -void streamOpReloadState(struct SOperatorInfo* pOperator); -void destroyStreamAggSupporter(SStreamAggSupporter* pSup); -void clearGroupResInfo(SGroupResInfo* pGroupResInfo); -int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SFunctionStateStore* pStore); -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, - SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, - SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi, int32_t tsIndex); -int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, - int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic); -int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); -void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); -void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey); -int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, - SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd); -int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated); -int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed); -int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar); -int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2); -void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins); -int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, - int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, - struct SOperatorInfo* pOperator, int64_t winDelta); -void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo); -int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo); -int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated); -int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key); -void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey); -void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite); -void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, - SSDataBlock* pBlock); -int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); -void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, - SResultWindowInfo* pNextWin); -int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, - SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, - SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap); -void releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); -void resetWinRange(STimeWindow* winRange); -bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, - TSKEY ts, void* pPkVal, int32_t len); -int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); -void resetUnCloseSessionWinInfo(SSHashObj* winMap); -void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); -void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); -void destroyFlusedPos(void* pRes); -bool isIrowtsPseudoColumn(SExprInfo* pExprInfo); -bool isIsfilledPseudoColumn(SExprInfo* pExprInfo); +void streamOpReleaseState(struct SOperatorInfo* pOperator); +void streamOpReloadState(struct SOperatorInfo* pOperator); +void destroyStreamAggSupporter(SStreamAggSupporter* pSup); +void clearGroupResInfo(SGroupResInfo* pGroupResInfo); +int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SFunctionStateStore* pStore); +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, + SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, + SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, + SStorageAPI* pApi, int32_t tsIndex); +int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, + int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic); +int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); +void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); +void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey); +int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate, + SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd); +int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated); +int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed); +int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar); +int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2); +void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins); +int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, + int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, + struct SOperatorInfo* pOperator, int64_t winDelta); +void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo); +int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo); +int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated); +int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key); +void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey); +void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite); +void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo, + SSDataBlock* pBlock); +int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo); +void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin, + SResultWindowInfo* pNextWin); +int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup, + SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin, + SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap); +void releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI); +void resetWinRange(STimeWindow* winRange); +bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, + TSKEY ts, void* pPkVal, int32_t len); +int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); +void resetUnCloseSessionWinInfo(SSHashObj* winMap); +void setStreamOperatorCompleted(struct SOperatorInfo* pOperator); +void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup); +void destroyFlusedPos(void* pRes); +bool isIrowtsPseudoColumn(SExprInfo* pExprInfo); +bool isIsfilledPseudoColumn(SExprInfo* pExprInfo); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a7ddd8f97a..61eda480b3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2065,9 +2065,117 @@ _end: return code; } -static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) { - // todo(liuyao) add code 获取delete range的左邻居和右邻居,作为range - return TSDB_CODE_SUCCESS; +void getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, int64_t groupId, + STimeWindow* pScanRange, STimeWindow* pDelRange) { + int32_t code = TSDB_CODE_SUCCESS; + SResultRowInfo dumyInfo = {0}; + dumyInfo.cur.pageId = -1; + STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC); + SWinKey startKey = {.groupId = groupId, .ts = sWin.skey}; + pDelRange->skey = sWin.skey; + + sWin = getActiveTimeWindow(NULL, &dumyInfo, end, pInterval, TSDB_ORDER_ASC); + SWinKey endKey = {.groupId = groupId, .ts = sWin.ekey}; + pDelRange->ekey = sWin.ekey; + + SWinKey preKey = {.groupId = groupId}; + code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL); + if (code == TSDB_CODE_SUCCESS) { + pScanRange->skey = preKey.ts; + } else { + pScanRange->skey = startKey.ts; + } + + SWinKey nextKey = {.groupId = groupId}; + code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL); + if (code == TSDB_CODE_SUCCESS) { + pScanRange->ekey = nextKey.ts; + } else { + pScanRange->ekey = endKey.ts; + } +} + +static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, + EStreamType mode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + blockDataCleanup(pDestBlock); + if (pSrcBlock->info.rows == 0) { + return TSDB_CODE_SUCCESS; + } + SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; + SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startData = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endData = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* uidCol = (uint64_t*)pUidCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* pSrcGp = (uint64_t*)pGpCol->pData; + SColumnInfoData* pSrcPkCol = NULL; + if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) { + pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX); + } + int64_t ver = pSrcBlock->info.version - 1; + + if (pInfo->partitionSup.needCalc && + (startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA))) { + code = getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock); + QUERY_CHECK_CODE(code, lino, _end); + startData = (TSKEY*)pStartTsCol->pData; + endData = (TSKEY*)pEndTsCol->pData; + uidCol = (uint64_t*)pUidCol->pData; + pSrcGp = (uint64_t*)pGpCol->pData; + } + + code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows); + QUERY_CHECK_CODE(code, lino, _end); + + SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { + uint64_t groupId = pSrcGp[i]; + if (groupId == 0) { + void* pVal = NULL; + if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) { + pVal = colDataGetData(pSrcPkCol, i); + } + groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal); + } + + STimeWindow scanRange = {0}; + STimeWindow delRange = {0}; + ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA); + getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, &scanRange, &delRange); + + code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false); + QUERY_CHECK_CODE(code, lino, _end); + + code = colDataSetVal(pDestEndCol, i, (const char*)&scanRange.ekey, false); + QUERY_CHECK_CODE(code, lino, _end); + + colDataSetNULL(pDestUidCol, i); + code = colDataSetVal(pDestGpCol, i, (const char*)&groupId, false); + QUERY_CHECK_CODE(code, lino, _end); + + code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&delRange.skey, false); + QUERY_CHECK_CODE(code, lino, _end); + + code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&delRange.ekey, false); + QUERY_CHECK_CODE(code, lino, _end); + + pDestBlock->info.rows++; + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, @@ -3033,7 +3141,8 @@ static bool hasScanRange(SStreamScanInfo* pInfo) { } static bool isStreamWindow(SStreamScanInfo* pInfo) { - return isIntervalWindow(pInfo) || isSessionWindow(pInfo) || isStateWindow(pInfo) || isCountWindow(pInfo); + return isIntervalWindow(pInfo) || isSessionWindow(pInfo) || isStateWindow(pInfo) || isCountWindow(pInfo) || + isTimeSlice(pInfo); } static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -4596,7 +4705,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx); } } - //TODO wjm check pInfo->filterCtx.code + // TODO wjm check pInfo->filterCtx.code __optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 40ac0da332..883ab4609f 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -20,6 +20,7 @@ #include "querytask.h" #include "storageapi.h" #include "streamexecutorInt.h" +#include "tchecksum.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" @@ -28,7 +29,7 @@ #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" -#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) +#define HAS_NON_ROW_DATA(pRowData) (pRowData == NULL || pRowData->key == INT64_MIN) typedef struct SSlicePoint { SWinKey key; @@ -37,12 +38,91 @@ typedef struct SSlicePoint { SRowBuffPos* pResPos; } SSlicePoint; +int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { + return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0); +} + void streamTimeSliceReleaseState(SOperatorInfo* pOperator) { - // todo(liuyao) add + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SWinKey); + int32_t resSize = winSize + sizeof(TSKEY); + char* pBuff = taosMemoryCalloc(1, resSize); + QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); + + memcpy(pBuff, pInfo->historyWins->pData, winSize); + memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); + qDebug("===stream=== time slice operator relase state. save result count:%d", + (int32_t)taosArrayGetSize(pInfo->historyWins)); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_STATE_NAME, + strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pBuff, resSize); + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + taosMemoryFreeClear(pBuff); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.releaseStreamStateFn) { + downstream->fpSet.releaseStreamStateFn(downstream); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } void streamTimeSliceReloadState(SOperatorInfo* pOperator) { - // todo(liuyao) add + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + resetWinRange(&pAggSup->winRange); + + int32_t size = 0; + void* pBuf = NULL; + code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_TIME_SLICE_OP_STATE_NAME, + strlen(STREAM_TIME_SLICE_OP_STATE_NAME), &pBuf, &size); + QUERY_CHECK_CODE(code, lino, _end); + + int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey); + qDebug("===stream=== time slice operator reload state. get result count:%d", num); + SWinKey* pKeyBuf = (SWinKey*)pBuf; + ASSERT(size == num * sizeof(SWinKey) + sizeof(TSKEY)); + + TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY)); + pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); + pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts); + + if (!pInfo->pUpdatedMap && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); + } + if (!pInfo->pDeletedMap && num > 0) { + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pDeletedMap = tSimpleHashInit(64, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _end, terrno); + } + for (int32_t i = 0; i < num; i++) { + SWinKey* pKey = pKeyBuf + i; + qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId, + i); + code = saveTimeSliceWinResult(pKey, pInfo->pUpdatedMap); + QUERY_CHECK_CODE(code, lino, _end); + } + taosMemoryFree(pBuf); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } + reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } } void destroyStreamTimeSliceOperatorInfo(void* param) { @@ -75,13 +155,165 @@ void destroyStreamTimeSliceOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) { - // todo(liuyao) add +int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, int32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return TSDB_CODE_FAILED; + } + + void* pData = (buf == NULL) ? NULL : *buf; + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.checksum + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + + (*pLen) = tlen; + return code; } -static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock) { +int32_t doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (!pInfo) { + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + + // 3.checksum + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + qError("stream event state is invalid"); + code = TSDB_CODE_FAILED; + QUERY_CHECK_CODE(code, lino, _end); + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &winfo.sessionWin); + int32_t winCode = TSDB_CODE_SUCCESS; + code = pAggSup->stateStore.streamStateSessionAddIfNotExist( + pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, + sizeof(SResultWindowInfo)); + QUERY_CHECK_CODE(code, lino, _end); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + } + return code; +} + +static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, int32_t numOfExprs, + SStreamFillSupporter** ppResFillSup) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); + QUERY_CHECK_NULL(pFillSup, code, lino, _end, terrno); + + pFillSup->numOfFillCols = numOfExprs; + int32_t numOfNotFillCols = 0; + pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, + (const SNodeListNode*)(pPhyFillNode->pFillValues)); + QUERY_CHECK_NULL(pFillSup->pAllColInfo, code, lino, _end, terrno); + + pFillSup->type = convertFillType(pPhyFillNode->fillMode); + pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols; + pFillSup->interval.interval = pPhyFillNode->interval; + pFillSup->interval.intervalUnit = pPhyFillNode->intervalUnit; + pFillSup->interval.offset = 0; + pFillSup->interval.offsetUnit = pPhyFillNode->intervalUnit; + pFillSup->interval.precision = pPhyFillNode->precision; + pFillSup->interval.sliding = pPhyFillNode->interval; + pFillSup->interval.slidingUnit = pPhyFillNode->intervalUnit; + pFillSup->pAPI = NULL; + + code = initResultBuf(pFillSup); + QUERY_CHECK_CODE(code, lino, _end); + + pFillSup->pResMap = NULL; + pFillSup->hasDelete = false; + (*ppResFillSup) = pFillSup; + +_end: + if (code != TSDB_CODE_SUCCESS) { + destroyStreamFillSupporter(pFillSup); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + void* buf = NULL; + if (needSaveStreamOperatorInfo(&pInfo->basic)) { + int32_t len = 0; + code = doStreamTimeSliceEncodeOpState(NULL, 0, pOperator, &len); + QUERY_CHECK_CODE(code, lino, _end); + + buf = taosMemoryCalloc(1, len); + QUERY_CHECK_NULL(buf, code, lino, _end, terrno); + + void* pBuf = buf; + code = doStreamTimeSliceEncodeOpState(&pBuf, len, pOperator, &len); + QUERY_CHECK_CODE(code, lino, _end); + + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, + strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME), buf, len); + saveStreamOperatorStateComplete(&pInfo->basic); + } + +_end: + taosMemoryFreeClear(buf); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } +} + +static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock, + bool* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pBlock->info.rows >= pBlock->info.capacity) { - return false; + (*pRes) = false; + goto _end; } for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; @@ -89,33 +321,53 @@ static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pRes SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); if (isIrowtsPseudoColumn(pFillCol->pExpr)) { - colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false); + code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false); + QUERY_CHECK_CODE(code, lino, _end); } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) { bool isFilled = false; - colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + QUERY_CHECK_CODE(code, lino, _end); } else { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SResultCellData* pCell = getResultCell(pResRow, srcSlot); - setRowCell(pDstCol, pBlock->info.rows, pCell); + code = setRowCell(pDstCol, pBlock->info.rows, pCell); + QUERY_CHECK_CODE(code, lino, _end); } } pBlock->info.rows += 1; - return true; + (*pRes) = true; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; if (inWinRange(&pFillSup->winRange, &st)) { - fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock); + bool res = true; + code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res); + QUERY_CHECK_CODE(code, lino, _end); } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; @@ -126,10 +378,12 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); int32_t index = pBlock->info.rows; if (isIrowtsPseudoColumn(pFillCol->pExpr)) { - colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false); + code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false); + QUERY_CHECK_CODE(code, lino, _end); } else if (isIsfilledPseudoColumn(pFillCol->pExpr)) { bool isFilled = true; - colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); + QUERY_CHECK_CODE(code, lino, _end); } else { if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { colDataSetNULL(pDstCol, index); @@ -144,8 +398,12 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi SPoint cur = {0}; cur.key = pFillInfo->current; cur.val = taosMemoryCalloc(1, pCell->bytes); + QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno); + taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); - colDataSetVal(pDstCol, index, (const char*)cur.val, false); + code = colDataSetVal(pDstCol, index, (const char*)cur.val, false); + QUERY_CHECK_CODE(code, lino, _end); + destroySPoint(&cur); } } @@ -153,16 +411,27 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi pFillSup->interval.precision); pBlock->info.rows++; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + bool res = true; if (pFillInfo->needFill == false) { - fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes); + code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res); + QUERY_CHECK_CODE(code, lino, _end); return; } if (pFillInfo->pos == FILL_POS_START) { - if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res); + QUERY_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } @@ -172,7 +441,9 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p fillLinearRange(pFillSup, pFillInfo, pRes); if (pFillInfo->pos == FILL_POS_MID) { - if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res); + QUERY_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } @@ -186,10 +457,17 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p } } if (pFillInfo->pos == FILL_POS_END) { - if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) { + code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res); + QUERY_CHECK_CODE(code, lino, _end); + if (res) { pFillInfo->pos = FILL_POS_INVALID; } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) { @@ -226,53 +504,59 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, return resRow; } -static void getPointRowDataFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, - SSlicePoint* pPoint) { - int32_t curVLen = 0; - int32_t code = - pAggSup->stateStore.streamStateFillGet(pAggSup->pState, &pPoint->key, (void**)&pPoint->pResPos, &curVLen); - pPoint->pLeftRow = pPoint->pResPos->pRowBuff; - if (pFillSup->type == TSDB_FILL_LINEAR) { - pPoint->pRightRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize); - } else { - pPoint->pRightRow = NULL; - } -} - static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, - int64_t groupId) { + int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, + SSlicePoint* pNextPoint) { void* pState = pAggSup->pState; resetPrevAndNextWindow(pFillSup); + pCurPoint->pResPos = NULL; + pPrevPoint->pResPos = NULL; + pNextPoint->pResPos = NULL; - SWinKey key = {.ts = ts, .groupId = groupId}; - void* curVal = NULL; + pCurPoint->key.groupId = groupId; + pCurPoint->key.ts = ts; int32_t curVLen = 0; - int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen); - if (code == TSDB_CODE_SUCCESS) { - pFillSup->cur.key = key.ts; - pFillSup->cur.pRowVal = curVal; + int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen); + pCurPoint->pLeftRow = pCurPoint->pResPos->pRowBuff; + if (pFillSup->type == TSDB_FILL_LINEAR) { + pCurPoint->pRightRow = POINTER_SHIFT(pCurPoint->pResPos->pRowBuff, pFillSup->rowSize); } else { - qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); - pFillSup->cur.key = ts; - pFillSup->cur.pRowVal = NULL; + pCurPoint->pRightRow = NULL; } - SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId}; - void* preVal = NULL; - int32_t preVLen = 0; - code = pAggSup->stateStore.streamStateFillGetPrev(pState, &key, &preKey, &preVal, &preVLen); - if (code == TSDB_CODE_SUCCESS) { - pFillSup->prev.key = preKey.ts; - pFillSup->prev.pRowVal = preVal; + if (pCurPoint->pLeftRow->key == pCurPoint->key.ts) { + pFillSup->cur.key = pCurPoint->key.ts; + pFillSup->cur.pRowVal = pCurPoint->pResPos->pRowBuff; } - SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId}; - void* nextVal = NULL; - int32_t nextVLen = 0; - code = pAggSup->stateStore.streamStateFillGetNext(pState, &key, &nextKey, &nextVal, &nextVLen); - if (code == TSDB_CODE_SUCCESS) { - pFillSup->next.key = nextKey.ts; - pFillSup->next.pRowVal = nextVal; + if (pPrevPoint) { + if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) { + pPrevPoint->key.groupId = groupId; + int32_t preVLen = 0; + code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, + (void**)&pPrevPoint->pResPos, &preVLen); + if (code == TSDB_CODE_SUCCESS) { + pFillSup->prev.key = pPrevPoint->key.ts; + pFillSup->prev.pRowVal = pPrevPoint->pResPos->pRowBuff; + } + } else { + pFillSup->prev.key = pPrevPoint->pLeftRow->key; + pFillSup->prev.pRowVal = pPrevPoint->pLeftRow->pRowVal; + } + } + + if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) { + pNextPoint->key.groupId = groupId; + int32_t nextVLen = 0; + code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, + (void**)&pNextPoint->pResPos, &nextVLen); + if (code == TSDB_CODE_SUCCESS) { + pFillSup->next.key = pNextPoint->key.ts; + pFillSup->next.pRowVal = pNextPoint->pResPos->pRowBuff; + } + } else { + pFillSup->next.key = pCurPoint->pRightRow->key; + pFillSup->next.pRowVal = pCurPoint->pRightRow->pRowVal; } } @@ -460,36 +744,44 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); - SSlicePoint point = {.key.ts = curWin.skey, .key.groupId = groupId}; - getPointRowDataFromState(pAggSup, pFillSup, &point); - if (needAdjValue(&point, tsCols[startPos], true, pFillSup->type)) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pLeftRow); - saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap); + SSlicePoint curPoint = {.key.ts = curWin.skey, .key.groupId = groupId}; + SSlicePoint prevPoint = {0}; + SSlicePoint nextPoint = {0}; + bool left = false; + bool right = false; + getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint); + right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); + if (right) { + transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); + saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); } + releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); while (startPos < pBlock->info.rows) { int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); startPos += numOfWin; int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); + left = needAdjValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); + if (left) { + transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); + saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap); + } + releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); + startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull); if (startPos < 0) { break; } curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); - getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId); - bool left = needAdjValue(&point, tsCols[leftRowId], true, pFillSup->type); - if (left) { - transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], point.pLeftRow); - } - bool right = needAdjValue(&point, tsCols[startPos], false, pFillSup->type); - if (right) { - transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pRightRow); - } + getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint); - if (left || right) { - saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap); + right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); + if (right) { + transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); + saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); } + releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); } } @@ -505,21 +797,28 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); - // todo(liuyao) fill 增加接口,get buff from pos设置pFillSup->cur - SWinKey* pKey = (SWinKey*)pPos->pKey; + SWinKey* pKey = (SWinKey*)pPos->pKey; if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; } else if (pBlock->info.id.groupId != pKey->groupId) { pGroupResInfo->index--; break; } - getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId); + SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; + SSlicePoint prevPoint = {0}; + SSlicePoint nextPoint = {0}; + getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts); doStreamFillRange(pFillSup, pFillInfo, pBlock); + releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); + releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); } } -static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) { +static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; uint16_t opType = pOperator->operatorType; @@ -529,25 +828,97 @@ static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) { if (pInfo->pDelRes->info.rows != 0) { // process the rest of the data printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + (*ppRes) = pInfo->pDelRes; + goto _end; } doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo); if (pInfo->pRes->info.rows != 0) { printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); - return pInfo->pRes; + (*ppRes) = pInfo->pRes; + goto _end; } - return NULL; + (*ppRes) = NULL; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { +int32_t getSliceMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t size = taosArrayGetSize(pAllWins); + if (size == 0) { + goto _end; + } + SWinKey* pKey = taosArrayGet(pAllWins, size - 1); + void* tmp = taosArrayPush(pMaxWins, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + + if (pKey->groupId == 0) { + goto _end; + } + uint64_t preGpId = pKey->groupId; + for (int32_t i = size - 2; i >= 0; i--) { + pKey = taosArrayGet(pAllWins, i); + if (preGpId != pKey->groupId) { + void* tmp = taosArrayPush(pMaxWins, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + preGpId = pKey->groupId; + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) { + int32_t winCode = TSDB_CODE_SUCCESS; + + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* groupIds = (uint64_t*)pGroupCol->pData; + SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + TSKEY* tsCalStarts = (TSKEY*)pCalStartCol->pData; + SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + TSKEY* tsCalEnds = (TSKEY*)pCalEndCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + while (1) { + TSKEY ts = tsCalStarts[i]; + TSKEY endTs = tsCalEnds[i]; + uint64_t groupId = groupIds[i]; + SWinKey key = {.ts = ts, .groupId = groupId}; + SWinKey nextKey = {.groupId = groupId}; + winCode = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL); + if (key.ts > endTs) { + break; + } + (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); + pAggSup->stateStore.streamStateDel(pAggSup->pState, &key); + if (winCode != TSDB_CODE_SUCCESS) { + break; + } + key = nextKey; + } + } +} + +static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; if (pOperator->status == OP_EXEC_DONE) { - return NULL; + (*ppRes) = NULL; + goto _end; } if (pOperator->status == OP_RES_TO_RETURN) { @@ -557,23 +928,30 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { doStreamFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes); if (pInfo->pRes->info.rows > 0) { printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pRes; + (*ppRes) = pInfo->pRes; + goto _end; } } - SSDataBlock* resBlock = buildTimeSliceResult(pOperator); + SSDataBlock* resBlock = NULL; + code = buildTimeSliceResult(pOperator, &resBlock); + QUERY_CHECK_CODE(code, lino, _end); + if (resBlock != NULL) { - return resBlock; + (*ppRes) = resBlock; + goto _end; } if (pInfo->recvCkBlock) { pInfo->recvCkBlock = false; printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pCheckpointRes; + (*ppRes) = pInfo->pCheckpointRes; + goto _end; } setStreamOperatorCompleted(pOperator); - return NULL; + (*ppRes) = NULL; + goto _end; } SSDataBlock* fillResult = NULL; @@ -589,10 +967,11 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { } pInfo->numOfDatapack++; printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); + setStreamOperatorState(&pInfo->basic, pBlock->info.type); switch (pBlock->info.type) { case STREAM_DELETE_RESULT: { - // todo(liuyao) add + doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap); } break; case STREAM_NORMAL: case STREAM_INVALID: { @@ -610,7 +989,8 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { continue; } break; case STREAM_CREATE_CHILD_TABLE: { - return pBlock; + (*ppRes) = pBlock; + goto _end; } break; default: ASSERTS(false, "invalid SSDataBlock type"); @@ -628,53 +1008,38 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { - taosArrayPush(pInfo->pUpdated, pIte); + void* tmp = taosArrayPush(pInfo->pUpdated, pIte); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + } + taosArraySort(pInfo->pUpdated, winKeyCmprImpl); + + if (pInfo->isHistoryOp) { + code = getSliceMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + QUERY_CHECK_CODE(code, lino, _end); } - taosArraySort(pInfo->pUpdated, winPosCmprImpl); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _end); + tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - return buildTimeSliceResult(pOperator); -} + code = buildTimeSliceResult(pOperator, ppRes); + QUERY_CHECK_CODE(code, lino, _end); -int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) { - // todo(liuyao) add - return 0; -} - -void* doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { - // todo(liuyao) add - return NULL; -} - -static SStreamFillSupporter* initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, - int32_t numOfExprs) { - SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); - if (!pFillSup) { - return NULL; - } - pFillSup->numOfFillCols = numOfExprs; - int32_t numOfNotFillCols = 0; - pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols, - (const SNodeListNode*)(pPhyFillNode->pFillValues)); - pFillSup->type = convertFillType(pPhyFillNode->fillMode); - pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols; - pFillSup->interval.interval = pPhyFillNode->interval; - // todo(liuyao) 初始化 pFillSup->interval其他属性 - pFillSup->pAPI = NULL; - - int32_t code = initResultBuf(pFillSup); +_end: if (code != TSDB_CODE_SUCCESS) { - destroyStreamFillSupporter(pFillSup); - return NULL; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } - pFillSup->pResMap = NULL; - pFillSup->hasDelete = false; - return pFillSup; + return code; +} + +static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { + SSDataBlock* pRes = NULL; + (void)doStreamTimeSliceNext(pOperator, &pRes); + return pRes; } int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, @@ -682,10 +1047,11 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pOperator == NULL || pInfo == NULL) { - goto _error; - } + QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno); + + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno); + SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode; pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(&pOperator->resultInfo, 4096); @@ -724,12 +1090,15 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno); + pInfo->pDelRes = NULL; code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); + pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn); + QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno); pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired; pInfo->ignoreExpiredDataSaved = false; @@ -745,7 +1114,13 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey; pInfo->numOfDatapack = 0; - pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs); + pInfo->pFillSup = NULL; + code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup); + QUERY_CHECK_CODE(code, lino, _error); + + if (pHandle) { + pInfo->isHistoryOp = pHandle->fillHistory; + } pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC; setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC, @@ -757,8 +1132,9 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME), &buff, &len); if (res == TSDB_CODE_SUCCESS) { - doStreamTimeSliceDecodeOpState(buff, len, pOperator); + code = doStreamTimeSliceDecodeOpState(buff, len, pOperator); taosMemoryFree(buff); + QUERY_CHECK_CODE(code, lino, _error); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamTimeSlice, NULL, destroyStreamTimeSliceOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0b3dcdea8d..308186f2c7 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1332,7 +1332,7 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { code = tjsonToObject(pJson, jkInterpFuncLogicPlanFillValues, jsonToNode, pNode->pFillValues); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonToObject(pJson, jkInterpFuncLogicPlanTimeSeries, jsonToNode, pNode->pTimeSeries); + code = jsonToNodeObject(pJson, jkInterpFuncLogicPlanTimeSeries, &pNode->pTimeSeries); } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 23e5d9c902..42609c9a13 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -961,6 +961,8 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { pInterpFunc->interval = ((SValueNode*)pSelect->pEvery)->datum.i; + pInterpFunc->intervalUnit = ((SValueNode*)pSelect->pEvery)->unit; + pInterpFunc->precision = pSelect->precision; } // set the output diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 26c9710876..9ab10208fc 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1851,6 +1851,8 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit; + pInterpFunc->precision = pFuncLogicNode->node.precision; pInterpFunc->pFillValues = NULL; code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues); if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index a5de73095e..3a843a881f 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -30,6 +30,7 @@ int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) { int32_t winCode = TSDB_CODE_SUCCESS; int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, &winCode); + SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); @@ -39,16 +40,18 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo pWinStates = taosArrayInit(16, sizeof(SWinKey)); tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); } + + //recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { TSKEY ts = getFlushMark(pFileState); SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; void* pState = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); - int32_t code = TSDB_CODE_SUCCESS; - for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && code == TSDB_CODE_SUCCESS; i++) { + int32_t winCode = TSDB_CODE_SUCCESS; + for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && winCode == TSDB_CODE_SUCCESS; i++) { SWinKey tmp = {.groupId = pKey->groupId}; - code = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0); - if (code != TSDB_CODE_SUCCESS) { + winCode = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0); + if (winCode != TSDB_CODE_SUCCESS) { break; } taosArrayPush(pWinStates, &tmp); @@ -65,7 +68,10 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo if (index == -1) { index = 0; } - taosArrayInsert(pWinStates, index, pKey); + SWinKey* pTmpKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmpKey, pKey) != 0) { + taosArrayInsert(pWinStates, index, pKey); + } } return code; } @@ -163,3 +169,25 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW return TSDB_CODE_FAILED; } +void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) { + deleteRowBuff(pFileState, pKey, sizeof(SWinKey)); + SSHashObj* pSearchBuff = getSearchBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t)); + if (!ppBuff) { + return; + } + SArray* pWinStates = *ppBuff; + int32_t size = taosArrayGetSize(pWinStates); + if (!isFlushedState(pFileState, pKey->ts, 0)) { + // find the first position which is smaller than the pKey + int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); + if (index == -1) { + index = 0; + } + SWinKey* pTmpKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmpKey, pKey) == 0) { + taosArrayRemove(pWinStates, index); + } + } +} + diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6b20bd346a..8139520e97 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -249,6 +249,10 @@ int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKe // todo refactor void streamStateFillDel(SStreamState* pState, const SWinKey* key) { + if (pState->pFileState) { + deleteHashSortRowBuff(pState->pFileState, key); + return; + } int32_t code = streamStateFillDel_rocksdb(pState, key); qTrace("%s at line %d res %d", __func__, __LINE__, code); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 92684a0e36..8298f90736 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -574,7 +574,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) { int32_t len = 0; void* p = NULL; - (*pWinCode) = pFileState->stateFileGetFn(pFileState->pFileStore, pKey, &p, &len); + (*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len); qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode)); if ((*pWinCode) == TSDB_CODE_SUCCESS) { memcpy(pNewPos->pRowBuff, p, len);