diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index c2fc76453a..de04288b79 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -350,12 +350,13 @@ typedef struct SStateStore { int32_t (*streamStateGetInfo)(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen); int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); - int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); + int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, + int32_t* pWinCode); void (*streamStateFillDel)(SStreamState* pState, const SWinKey* key); int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); int32_t (*streamStateFillGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); void (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur); void (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 308dee14dd..57b1a7d692 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -208,7 +208,7 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; - char intervalUnit; + int8_t intervalUnit; int8_t precision; EFillMode fillMode; SNode* pFillValues; // SNodeListNode @@ -513,7 +513,7 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; - char intervalUnit; + int8_t intervalUnit; int8_t precision; EFillMode fillMode; SNode* pFillValues; // SNodeListNode diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index db2c3a6391..046812f396 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -75,12 +75,12 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch // fill int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); -int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); void streamStateFillDel(SStreamState* pState, const SWinKey* key); int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index b19a788990..bb0d49f1f1 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -124,14 +124,15 @@ int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyL int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); // time slice -int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen); +int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode); int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen); int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey); void clearSearchBuff(SStreamFileState* pFileState); int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, - int32_t* pVLen); + int32_t* pVLen, int32_t* pWinCode); int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey); diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 00986674fb..3ae7a968ff 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -777,8 +777,10 @@ typedef struct SStreamFillSupporter { int32_t type; // fill type SInterval interval; SResultRowData prev; + TSKEY prevOriginKey; SResultRowData cur; SResultRowData next; + TSKEY nextOriginKey; SResultRowData nextNext; SFillColInfo* pAllColInfo; // fill exprs and not fill exprs SExprSupp notFillExprSup; @@ -963,7 +965,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi, int32_t tsIndex); + SStorageAPI* pApi, int32_t tsIndex, int8_t stateType, int32_t ratio); int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic); int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); @@ -1004,6 +1006,7 @@ void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggS void destroyFlusedPos(void* pRes); bool isIrowtsPseudoColumn(SExprInfo* pExprInfo); bool isIsfilledPseudoColumn(SExprInfo* pExprInfo); +bool isInterpFunc(SExprInfo* pExprInfo); int32_t encodeSSessionKey(void** buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key); diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 8332fb3c0d..9b0e204f73 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -35,27 +35,28 @@ int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); int32_t copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins); bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo); -void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal); int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey, STimeWindow* pNextWin); int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap); -void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup); void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, SSDataBlock* pBlock); -SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index); int32_t initResultBuf(SStreamFillSupporter* pFillSup); -void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup); -void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol); -void resetFillWindow(SResultRowData* pRowData); -bool hasPrevWindow(SStreamFillSupporter* pFillSup); -bool hasNextWindow(SStreamFillSupporter* pFillSup); -void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo); -void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo); -int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); -bool hasRemainCalc(SStreamFillInfo* pFillInfo); -void destroySPoint(void* ptr); -void destroyStreamFillInfo(SStreamFillInfo* pFillInfo); +SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes); +SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index); + +void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup); +void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol); +bool hasCurWindow(SStreamFillSupporter* pFillSup); +bool hasPrevWindow(SStreamFillSupporter* pFillSup); +bool hasNextWindow(SStreamFillSupporter* pFillSup); +void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo); +int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); +bool hasRemainCalc(SStreamFillInfo* pFillInfo); +void destroySPoint(void* ptr); +void destroyStreamFillInfo(SStreamFillInfo* pFillInfo); +int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes); +void resetStreamFillSup(SStreamFillSupporter* pFillSup); int winPosCmprImpl(const void* pKey1, const void* pKey2); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 61eda480b3..bd0b9057e0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2065,9 +2065,11 @@ _end: return code; } -void getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, int64_t groupId, +int32_t getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, int64_t groupId, STimeWindow* pScanRange, STimeWindow* pDelRange) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t winCode = TSDB_CODE_SUCCESS; SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC); @@ -2079,20 +2081,28 @@ void getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TS pDelRange->ekey = sWin.ekey; SWinKey preKey = {.groupId = groupId}; - code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL); - if (code == TSDB_CODE_SUCCESS) { + code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode == TSDB_CODE_SUCCESS) { pScanRange->skey = preKey.ts; } else { pScanRange->skey = startKey.ts; } SWinKey nextKey = {.groupId = groupId}; - code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL); - if (code == TSDB_CODE_SUCCESS) { + code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL, &winCode); + QUERY_CHECK_CODE(code, lino, _end); + if (winCode == TSDB_CODE_SUCCESS) { pScanRange->ekey = nextKey.ts; } else { pScanRange->ekey = endKey.ts; } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, @@ -2150,7 +2160,9 @@ static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* p STimeWindow scanRange = {0}; STimeWindow delRange = {0}; ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA); - getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, &scanRange, &delRange); + code = getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, + &scanRange, &delRange); + QUERY_CHECK_CODE(code, lino, _end); code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false); QUERY_CHECK_CODE(code, lino, _end); diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 9b9e9e2bf9..69ff56c530 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -803,8 +803,8 @@ _end: } } -int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode; @@ -845,7 +845,8 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex, + STREAM_STATE_BUFF_SORT, 1); QUERY_CHECK_CODE(code, lino, _error); pInfo->streamAggSup.windowCount = pCountNode->windowCount; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 86fc323991..3efaf0c002 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -518,8 +518,8 @@ int32_t doStreamEventDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera QUERY_CHECK_CODE(code, lino, _end); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); - code = - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo, + sizeof(SResultWindowInfo)); QUERY_CHECK_CODE(code, lino, _end); } @@ -892,7 +892,8 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->primaryTsIndex = tsSlotId; code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, sizeof(bool) + sizeof(bool), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex, + STREAM_STATE_BUFF_SORT, 1); QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index c8607267c7..e08101fe7a 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -32,9 +32,6 @@ #include "querytask.h" #include "tdatablock.h" #include "tfill.h" -#include "operator.h" -#include "querytask.h" - #define FILL_POS_INVALID 0 #define FILL_POS_START 1 @@ -81,7 +78,9 @@ void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) destroyExprInfo(pFillCol[i].pExpr, 1); taosVariantDestroy(&pFillCol[i].fillVal); } - taosMemoryFreeClear(pFillCol[start].pExpr); + if (start < end) { + taosMemoryFreeClear(pFillCol[start].pExpr); + } taosMemoryFree(pFillCol); return NULL; } @@ -140,12 +139,12 @@ static void destroyStreamFillOperatorInfo(void* param) { taosMemoryFree(pInfo); } -void resetFillWindow(SResultRowData* pRowData) { +static void resetFillWindow(SResultRowData* pRowData) { pRowData->key = INT64_MIN; taosMemoryFreeClear(pRowData->pRowVal); } -void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { +static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) { resetFillWindow(&pFillSup->cur); } else { @@ -166,7 +165,7 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI SWinKey key = {.ts = ts, .groupId = groupId}; int32_t curVLen = 0; - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen); + int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&pFillSup->cur.pRowVal, &curVLen, NULL); ASSERT(code == TSDB_CODE_SUCCESS); pFillSup->cur.key = key.ts; } @@ -180,7 +179,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, void* curVal = NULL; int32_t curVLen = 0; bool hasCurKey = true; - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen); + int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen, NULL); if (code == TSDB_CODE_SUCCESS) { pFillSup->cur.key = key.ts; pFillSup->cur.pRowVal = curVal; @@ -233,14 +232,12 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, pAPI->stateStore.streamStateFreeCur(pCur); } -bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; } -bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; } -static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { - return pFillSup->nextNext.key != INT64_MIN; - return false; -} +bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; } +bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; } +bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; } +static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; } -void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) { +static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); @@ -262,8 +259,7 @@ void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, S pRowVal->key = ts; } -void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, - int32_t numOfCol) { +void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) { for (int32_t i = 0; i < numOfCol; i++) { if (!pFillCol[i].notFillCol) { int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); @@ -285,7 +281,7 @@ static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFil pFillInfo->end = ts; } -void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) { +static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) { setFillInfoStart(start, pInterval, pFillInfo); pFillInfo->current = pFillInfo->start; setFillInfoEnd(end, pInterval, pFillInfo); @@ -460,12 +456,13 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS ASSERT(pFillInfo->pos != FILL_POS_INVALID); } -static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) { +int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SWinKey key = {.groupId = groupId, .ts = ts}; if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) { (*pRes) = false; + goto _end; } code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0); QUERY_CHECK_CODE(code, lino, _end); @@ -721,7 +718,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { } while (pInfo->srcRowIndex < pBlock->info.rows) { - TSKEY ts = tsCol[pInfo->srcRowIndex]; keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes); if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) { @@ -859,7 +855,7 @@ static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_ SWinKey key = {.ts = ts, .groupId = groupId}; void* val = NULL; int32_t len = 0; - int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len); + int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len, NULL); if (code != TSDB_CODE_SUCCESS) { qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId); @@ -971,9 +967,12 @@ _end: return code; } -static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) { - tSimpleHashClear(pInfo->pFillSup->pResMap); - pInfo->pFillSup->hasDelete = false; +void resetStreamFillSup(SStreamFillSupporter* pFillSup) { + tSimpleHashClear(pFillSup->pResMap); + pFillSup->hasDelete = false; +} +void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) { + resetStreamFillSup(pInfo->pFillSup); taosArrayClear(pInfo->pFillInfo->delRanges); pInfo->pFillInfo->delIndex = 0; } @@ -1323,8 +1322,41 @@ _end: return NULL; } +static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { + if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) { + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t slotId = GET_DEST_SLOT_ID(pFillCol); + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + SVariant* pVar = &(pFillCol->fillVal); + if (pCell->type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else if (IS_FLOAT_TYPE(pCell->type)) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else if (IS_INTEGER_TYPE(pCell->type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else { + pCell->isNull = true; + } + } + } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) { + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t slotId = GET_DEST_SLOT_ID(pFillCol); + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + pCell->isNull = true; + } + } +} + int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -1359,36 +1391,7 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi goto _error; } - if (pInfo->pFillInfo->type == TSDB_FILL_SET_VALUE || pInfo->pFillInfo->type == TSDB_FILL_SET_VALUE_F) { - for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) { - SFillColInfo* pFillCol = pInfo->pFillSup->pAllColInfo + i; - int32_t slotId = GET_DEST_SLOT_ID(pFillCol); - SResultCellData* pCell = getResultCell(pInfo->pFillInfo->pResRow, slotId); - SVariant* pVar = &(pFillCol->fillVal); - if (pCell->type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - SET_TYPED_DATA(pCell->pData, pCell->type, v); - } else if (IS_FLOAT_TYPE(pCell->type)) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - SET_TYPED_DATA(pCell->pData, pCell->type, v); - } else if (IS_INTEGER_TYPE(pCell->type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - SET_TYPED_DATA(pCell->pData, pCell->type, v); - } else { - pCell->isNull = true; - } - } - } else if (pInfo->pFillInfo->type == TSDB_FILL_NULL || pInfo->pFillInfo->type == TSDB_FILL_NULL_F) { - for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) { - SFillColInfo* pFillCol = pInfo->pFillSup->pAllColInfo + i; - int32_t slotId = GET_DEST_SLOT_ID(pFillCol); - SResultCellData* pCell = getResultCell(pInfo->pFillInfo->pResRow, slotId); - pCell->isNull = true; - } - } + setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo); code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes); QUERY_CHECK_CODE(code, lino, _error); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index f49e45457c..a58c80d986 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -29,13 +29,21 @@ #define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState" #define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint" -#define HAS_NON_ROW_DATA(pRowData) (pRowData == NULL || pRowData->key == INT64_MIN) +#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN) +#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN) +#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN) +#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN) + +typedef struct SSliceRowData { + TSKEY key; + SResultCellData pRowVal[]; +} SSliceRowData; typedef struct SSlicePoint { - SWinKey key; - SResultRowData* pLeftRow; - SResultRowData* pRightRow; - SRowBuffPos* pResPos; + SWinKey key; + SSliceRowData* pLeftRow; + SSliceRowData* pRightRow; + SRowBuffPos* pResPos; } SSlicePoint; int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) { @@ -121,10 +129,23 @@ _end: } } +static void resetFillWindow(SResultRowData* pRowData) { + pRowData->key = INT64_MIN; + pRowData->pRowVal = NULL; +} + +static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) { + resetFillWindow(&pFillSup->cur); + resetFillWindow(&pFillSup->prev); + resetFillWindow(&pFillSup->next); + resetFillWindow(&pFillSup->nextNext); +} + void destroyStreamTimeSliceOperatorInfo(void* param) { SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param; colDataDestroy(&pInfo->twAggSup.timeWindowData); destroyStreamAggSupporter(&pInfo->streamAggSup); + resetPrevAndNextWindow(pInfo->pFillSup); destroyStreamFillSupporter(pInfo->pFillSup); destroyStreamFillInfo(pInfo->pFillInfo); blockDataDestroy(pInfo->pRes); @@ -168,8 +189,8 @@ int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* p size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { - void* key = tSimpleHashGetKey(pIte, &keyLen); - tlen += encodeSSessionKey(buf, key); + void* pKey = tSimpleHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, pKey); tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); } @@ -258,11 +279,13 @@ static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SE pFillSup->interval.sliding = pPhyFillNode->interval; pFillSup->interval.slidingUnit = pPhyFillNode->intervalUnit; pFillSup->pAPI = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pFillSup->pResMap = tSimpleHashInit(16, hashFn); + QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno); code = initResultBuf(pFillSup); QUERY_CHECK_CODE(code, lino, _end); - pFillSup->pResMap = NULL; pFillSup->hasDelete = false; (*ppResFillSup) = pFillSup; @@ -303,6 +326,19 @@ _end: } } +static SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) { + if (!pRowVal) { + return NULL; + } + char* pData = (char*)pRowVal; + SResultCellData* pCell = pRowVal; + for (int32_t i = 0; i < index; i++) { + pData += (pCell->bytes + sizeof(SResultCellData)); + pCell = (SResultCellData*)pData; + } + return pCell; +} + static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock, bool* pRes) { int32_t code = TSDB_CODE_SUCCESS; @@ -311,6 +347,18 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p (*pRes) = false; goto _end; } + if (IS_INVALID_WIN_KEY(ts)) { + (*pRes) = true; + goto _end; + } + bool ckRes = true; + code = checkResult(pFillSup, ts, pBlock->info.id.groupId, &ckRes); + QUERY_CHECK_CODE(code, lino, _end); + if (!ckRes) { + (*pRes) = true; + goto _end; + } + for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol); @@ -325,13 +373,13 @@ static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* p QUERY_CHECK_CODE(code, lino, _end); } else { int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; - SResultCellData* pCell = getResultCell(pResRow, srcSlot); + SResultCellData* pCell = getSliceResultCell(pResRow->pRowVal, srcSlot); code = setRowCell(pDstCol, pBlock->info.rows, pCell); QUERY_CHECK_CODE(code, lino, _end); } } - pBlock->info.rows += 1; + pBlock->info.rows++; (*pRes) = true; _end: @@ -365,13 +413,14 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { - for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + bool ckRes = true; + code = checkResult(pFillSup, pFillInfo->current, pBlock->info.id.groupId, &ckRes); + QUERY_CHECK_CODE(code, lino, _end); + for (int32_t i = 0; i < pFillSup->numOfAllCols && ckRes; ++i) { SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol); - int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); int16_t type = pDstCol->info.type; - SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); int32_t index = pBlock->info.rows; if (isIrowtsPseudoColumn(pFillCol->pExpr)) { code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false); @@ -381,6 +430,8 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false); QUERY_CHECK_CODE(code, lino, _end); } else { + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pCell = getSliceResultCell(pFillInfo->pResRow->pRowVal, srcSlot); if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) { colDataSetNULL(pDstCol, index); continue; @@ -405,7 +456,9 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillSup->interval.precision); - pBlock->info.rows++; + if (ckRes) { + pBlock->info.rows++; + } } _end: @@ -414,6 +467,12 @@ _end: } } +static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) { + pFillInfo->start = start; + pFillInfo->current = pFillInfo->start; + pFillInfo->end = end; +} + static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; @@ -467,6 +526,10 @@ _end: } static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) { + if (rowId >= pBlock->info.rows) { + return -1; + } + if (!ignoreNull) { return rowId; } @@ -500,9 +563,37 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, return resRow; } -static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, - int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, - SSlicePoint* pNextPoint, int32_t* pWinCode) { +static void setResultRowData(SSliceRowData** ppRowData, void* pBuff) { (*ppRowData) = (SSliceRowData*)pBuff; } + +static void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup) { + if (pFillSup->type != TSDB_FILL_LINEAR) { + setResultRowData(&pPoint->pRightRow, pPoint->pResPos->pRowBuff); + pPoint->pLeftRow = pPoint->pRightRow; + } else { + setResultRowData(&pPoint->pLeftRow, pPoint->pResPos->pRowBuff); + void* pBuff = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize + sizeof(TSKEY)); + setResultRowData(&pPoint->pRightRow, pBuff); + } +} + +static TSKEY adustPrevTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { + if (rowTs >= pointTs) { + pointTs = taosTimeAdd(pointTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); + } + return pointTs; +} + +static TSKEY adustEndTsKey(TSKEY pointTs, TSKEY rowTs, SInterval* pInterval) { + if (rowTs <= pointTs) { + pointTs = taosTimeAdd(pointTs, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision); + } + return pointTs; +} +static int32_t getLinearResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, + int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, + SSlicePoint* pNextPoint, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t tmpRes = TSDB_CODE_SUCCESS; void* pState = pAggSup->pState; resetPrevAndNextWindow(pFillSup); @@ -513,55 +604,252 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo pCurPoint->key.groupId = groupId; pCurPoint->key.ts = ts; int32_t curVLen = 0; - (*pWinCode) = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen); - pCurPoint->pLeftRow = pCurPoint->pResPos->pRowBuff; - if (pFillSup->type == TSDB_FILL_LINEAR) { - pCurPoint->pRightRow = POINTER_SHIFT(pCurPoint->pResPos->pRowBuff, pFillSup->rowSize); - } else { - pCurPoint->pRightRow = NULL; - } + code = + pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); - if (pCurPoint->pLeftRow->key == pCurPoint->key.ts) { - pFillSup->cur.key = pCurPoint->key.ts; - pFillSup->cur.pRowVal = pCurPoint->pResPos->pRowBuff; - } + setPointBuff(pCurPoint, pFillSup); - if (pPrevPoint) { + if (HAS_ROW_DATA(pCurPoint->pRightRow)) { + pFillSup->cur.key = pCurPoint->pRightRow->key; + pFillSup->cur.pRowVal = pCurPoint->pRightRow->pRowVal; if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) { pPrevPoint->key.groupId = groupId; int32_t preVLen = 0; - tmpRes = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, - (void**)&pPrevPoint->pResPos, &preVLen); + code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, + (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); if (tmpRes == TSDB_CODE_SUCCESS) { - pFillSup->prev.key = pPrevPoint->key.ts; - pFillSup->prev.pRowVal = pPrevPoint->pResPos->pRowBuff; + ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); + setPointBuff(pPrevPoint, pFillSup); + if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { + pFillSup->prev.key = pPrevPoint->pRightRow->key; + pFillSup->prev.pRowVal = pPrevPoint->pRightRow->pRowVal; + } else { + pFillSup->prev.key = pPrevPoint->pLeftRow->key; + pFillSup->prev.pRowVal = pPrevPoint->pLeftRow->pRowVal; + } + pFillSup->prevOriginKey = pFillSup->prev.key; + pFillSup->prev.key = adustPrevTsKey(pPrevPoint->key.ts, pFillSup->prev.key, &pFillSup->interval); } + goto _end; + } + } + + if (HAS_ROW_DATA(pCurPoint->pLeftRow)) { + pFillSup->prev.key = pCurPoint->pLeftRow->key; + pFillSup->prev.pRowVal = pCurPoint->pLeftRow->pRowVal; + pFillSup->prevOriginKey = pFillSup->prev.key; + pFillSup->prev.key = adustPrevTsKey(pCurPoint->key.ts, pFillSup->prev.key, &pFillSup->interval); + if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) { + pNextPoint->key.groupId = groupId; + int32_t nextVLen = 0; + code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, + (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpRes == TSDB_CODE_SUCCESS) { + ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); + setPointBuff(pNextPoint, pFillSup); + if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { + pFillSup->next.key = pNextPoint->pLeftRow->key; + pFillSup->next.pRowVal = pNextPoint->pLeftRow->pRowVal; + } else { + pFillSup->next.key = pNextPoint->pRightRow->key; + pFillSup->next.pRowVal = pNextPoint->pRightRow->pRowVal; + } + pFillSup->nextOriginKey = pFillSup->next.key; + pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval); + } else { + resetFillWindow(&pFillSup->prev); + } + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t getResultInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, + int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, + SSlicePoint* pNextPoint, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t tmpRes = TSDB_CODE_SUCCESS; + void* pState = pAggSup->pState; + resetPrevAndNextWindow(pFillSup); + pCurPoint->pResPos = NULL; + pPrevPoint->pResPos = NULL; + pNextPoint->pResPos = NULL; + + pCurPoint->key.groupId = groupId; + pCurPoint->key.ts = ts; + int32_t curVLen = 0; + code = + pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + setPointBuff(pCurPoint, pFillSup); + + pFillSup->cur.key = pCurPoint->pRightRow->key; + pFillSup->cur.pRowVal = pCurPoint->pRightRow->pRowVal; + + pPrevPoint->key.groupId = groupId; + int32_t preVLen = 0; + code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, + (void**)&pPrevPoint->pResPos, &preVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpRes == TSDB_CODE_SUCCESS) { + ASSERT(!IS_INVALID_WIN_KEY(pPrevPoint->key.ts)); + setPointBuff(pPrevPoint, pFillSup); + if (HAS_ROW_DATA(pPrevPoint->pRightRow)) { + pFillSup->prev.key = pPrevPoint->pRightRow->key; + pFillSup->prev.pRowVal = pPrevPoint->pRightRow->pRowVal; } else { pFillSup->prev.key = pPrevPoint->pLeftRow->key; pFillSup->prev.pRowVal = pPrevPoint->pLeftRow->pRowVal; } + pFillSup->prev.key = adustPrevTsKey(pPrevPoint->key.ts, pFillSup->prev.key, &pFillSup->interval); } - if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) { - pNextPoint->key.groupId = groupId; - int32_t nextVLen = 0; - tmpRes = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, - (void**)&pNextPoint->pResPos, &nextVLen); - if (tmpRes == TSDB_CODE_SUCCESS) { - pFillSup->next.key = pNextPoint->key.ts; - pFillSup->next.pRowVal = pNextPoint->pResPos->pRowBuff; + pNextPoint->key.groupId = groupId; + int32_t nextVLen = 0; + code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, + (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpRes == TSDB_CODE_SUCCESS) { + ASSERT(!IS_INVALID_WIN_KEY(pNextPoint->key.ts)); + setPointBuff(pNextPoint, pFillSup); + if (HAS_ROW_DATA(pNextPoint->pLeftRow)) { + pFillSup->next.key = pNextPoint->pLeftRow->key; + pFillSup->next.pRowVal = pNextPoint->pLeftRow->pRowVal; + } else { + pFillSup->next.key = pNextPoint->pRightRow->key; + pFillSup->next.pRowVal = pNextPoint->pRightRow->pRowVal; } - } else { - pFillSup->next.key = pCurPoint->pRightRow->key; - pFillSup->next.pRowVal = pCurPoint->pRightRow->pRowVal; + pFillSup->next.key = adustEndTsKey(pNextPoint->key.ts, pFillSup->next.key, &pFillSup->interval); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -static void setTimeSliceFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { +static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, + int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pNextPoint, + int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t tmpRes = TSDB_CODE_SUCCESS; + void* pState = pAggSup->pState; + pCurPoint->pResPos = NULL; + pNextPoint->pResPos = NULL; + + pNextPoint->key.groupId = groupId; + STimeWindow stw = {.skey = ts, .ekey = ts}; + getNextTimeWindow(&pFillSup->interval, &stw, TSDB_ORDER_ASC); + pNextPoint->key.ts = stw.skey; + + int32_t curVLen = 0; + code = pAggSup->stateStore.streamStateFillGet(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos, &curVLen, + pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + setPointBuff(pNextPoint, pFillSup); + + if (*pWinCode != TSDB_CODE_SUCCESS) { + if (pNextPoint->pLeftRow) { + SET_WIN_KEY_INVALID(pNextPoint->pLeftRow->key); + } + if (pNextPoint->pRightRow) { + SET_WIN_KEY_INVALID(pNextPoint->pRightRow->key); + } + } + + SET_WIN_KEY_INVALID(pCurPoint->key.ts); + pCurPoint->key.groupId = groupId; + int32_t nextVLen = 0; + code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pNextPoint->key, &pCurPoint->key, + (void**)&pCurPoint->pResPos, &nextVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpRes == TSDB_CODE_SUCCESS) { + setPointBuff(pCurPoint, pFillSup); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, + int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pNextPoint, + int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t tmpRes = TSDB_CODE_SUCCESS; + void* pState = pAggSup->pState; + pCurPoint->pResPos = NULL; + pNextPoint->pResPos = NULL; + pCurPoint->key.groupId = groupId; + pCurPoint->key.ts = ts; + + int32_t curVLen = 0; + code = + pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); + + setPointBuff(pCurPoint, pFillSup); + + if (*pWinCode != TSDB_CODE_SUCCESS) { + if (pCurPoint->pLeftRow) { + SET_WIN_KEY_INVALID(pCurPoint->pLeftRow->key); + } + if (pCurPoint->pRightRow) { + SET_WIN_KEY_INVALID(pCurPoint->pRightRow->key); + } + } + + int32_t nextVLen = 0; + pNextPoint->key.groupId = groupId; + if (pFillSup->type != TSDB_FILL_LINEAR) { + SET_WIN_KEY_INVALID(pNextPoint->key.ts); + code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, + (void**)&pNextPoint->pResPos, &nextVLen, &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + if (tmpRes == TSDB_CODE_SUCCESS) { + setPointBuff(pNextPoint, pFillSup); + } + } else { + pNextPoint->key.ts = taosTimeAdd(pCurPoint->key.ts, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, + pFillSup->interval.precision); + code = pAggSup->stateStore.streamStateFillGet(pState, &pNextPoint->key, (void**)&pNextPoint->pResPos, &nextVLen, + &tmpRes); + QUERY_CHECK_CODE(code, lino, _end); + setPointBuff(pNextPoint, pFillSup); + if (tmpRes != TSDB_CODE_SUCCESS) { + SET_WIN_KEY_INVALID(pNextPoint->pLeftRow->key); + SET_WIN_KEY_INVALID(pNextPoint->pRightRow->key); + } + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + +static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) { pFillInfo->needFill = false; pFillInfo->pos = FILL_POS_START; - return; + goto _end; } TSKEY prevWKey = INT64_MIN; TSKEY nextWKey = INT64_MIN; @@ -579,73 +867,79 @@ static void setTimeSliceFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) { - pFillInfo->needFill = false; - pFillInfo->pos = FILL_POS_START; - } else if (hasPrevWindow(pFillSup)) { - setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + if (hasPrevWindow(pFillSup)) { + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; } else { - setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; } copyNotFillExpData(pFillSup, pFillInfo); + pFillInfo->pResRow->key = ts; } break; case TSDB_FILL_PREV: { if (hasNextWindow(pFillSup)) { - setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; resetFillWindow(&pFillSup->prev); - pFillSup->prev.key = pFillSup->cur.key; + pFillSup->prev.key = ts; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; } else { ASSERT(hasPrevWindow(pFillSup)); - setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; - pFillInfo->preRowKey = INT64_MIN; } pFillInfo->pResRow = &pFillSup->prev; } break; case TSDB_FILL_NEXT: { if (hasPrevWindow(pFillSup)) { - setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; resetFillWindow(&pFillSup->next); - pFillSup->next.key = pFillSup->cur.key; + pFillSup->next.key = ts; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; - pFillInfo->preRowKey = INT64_MIN; } else { ASSERT(hasNextWindow(pFillSup)); - setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; + resetFillWindow(&pFillSup->prev); } pFillInfo->pResRow = &pFillSup->next; } break; case TSDB_FILL_LINEAR: { if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) { - setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); - pFillInfo->pos = FILL_POS_MID; - pFillInfo->pLinearInfo->nextEnd = nextWKey; - calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + setFillKeyInfo(prevWKey, nextWKey, &pFillSup->interval, pFillInfo); + pFillInfo->pos = FILL_POS_INVALID; + SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); + pFillSup->next.key = pFillSup->nextOriginKey; + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); + pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; - - calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo, - pFillSup->numOfAllCols); - pFillInfo->pLinearInfo->hasNext = true; + pFillInfo->pLinearInfo->hasNext = false; } else if (hasPrevWindow(pFillSup)) { - setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); + TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; - pFillInfo->pLinearInfo->nextEnd = INT64_MIN; + SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); + pFillSup->prev.key = pFillSup->prevOriginKey; pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else { ASSERT(hasNextWindow(pFillSup)); - setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); + TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); + setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; - pFillInfo->pLinearInfo->nextEnd = INT64_MIN; + SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); + pFillSup->next.key = pFillSup->nextOriginKey; calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->cur; @@ -656,33 +950,46 @@ static void setTimeSliceFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo ASSERT(0); break; } - ASSERT(pFillInfo->pos != FILL_POS_INVALID); + +_end: + if (ts != pFillSup->cur.key) { + SET_WIN_KEY_INVALID(pFillSup->cur.key); + pFillInfo->pos = FILL_POS_INVALID; + } } -static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { +static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) { + if (IS_INVALID_WIN_KEY(pPoint->key.ts)) { + return false; + } + switch (fillType) { case TSDB_FILL_NULL: case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (HAS_NON_ROW_DATA(pPoint->pRightRow) && HAS_NON_ROW_DATA(pPoint->pLeftRow)) { + if (!isLeft && HAS_NON_ROW_DATA(pPoint->pRightRow)) { return true; } } break; case TSDB_FILL_PREV: { - if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) { + if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key <= ts)) { + return true; + } + + if (!isLeft && pPoint->key.ts == ts) { return true; } } break; case TSDB_FILL_NEXT: { - if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) { + if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key >= ts)) { return true; } } break; case TSDB_FILL_LINEAR: { - if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) { + if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key <= ts)) { return true; - } else if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) { + } else if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key >= ts)) { return true; } } break; @@ -692,7 +999,31 @@ static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fil return false; } +static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + SResultCellData* pCell = getSliceResultCell(pRowVal->pRowVal, i); + if (!colDataIsNull_s(pColData, rowId)) { + pCell->isNull = false; + pCell->type = pColData->info.type; + pCell->bytes = pColData->info.bytes; + char* val = colDataGetData(pColData, rowId); + if (IS_VAR_DATA_TYPE(pCell->type)) { + memcpy(pCell->pData, val, varDataTLen(val)); + } else { + memcpy(pCell->pData, val, pCell->bytes); + } + } else { + pCell->isNull = true; + } + } + pRowVal->key = ts; +} + static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -713,7 +1044,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; if (pFillSup->winRange.ekey <= 0) { - pFillSup->winRange.ekey = INT64_MIN; + pFillSup->winRange.ekey = INT64_MAX; } int32_t startPos = 0; @@ -733,6 +1064,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (checkNullRow(pExprSup, pBlock, startPos, pInfo->ignoreNull)) { continue; } + break; } if (startPos >= pBlock->info.rows) { @@ -742,13 +1074,18 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); - SSlicePoint curPoint = {.key.ts = curWin.skey, .key.groupId = groupId}; - SSlicePoint prevPoint = {0}; + SSlicePoint curPoint = {0}; SSlicePoint nextPoint = {0}; bool left = false; bool right = false; - getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode); - right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); + if (pFillSup->type != TSDB_FILL_PREV || curWin.skey == tsCols[startPos]) { + code = getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode); + } else { + code = getPointInfoFromStateRight(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode); + } + QUERY_CHECK_CODE(code, lino, _end); + + right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); @@ -763,7 +1100,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) TSDB_ORDER_ASC); startPos += numOfWin; int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull); - left = needAdjValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); + left = needAdjustValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type); if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap); @@ -778,9 +1115,14 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) break; } curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); - getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode); + if (pFillSup->type != TSDB_FILL_PREV || curWin.skey == tsCols[startPos]) { + code = getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode); + } else { + code = getPointInfoFromStateRight(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &nextPoint, &winCode); + } + QUERY_CHECK_CODE(code, lino, _end); - right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); + right = needAdjustValue(&curPoint, tsCols[startPos], false, pFillSup->type); if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); @@ -790,10 +1132,17 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) } releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; blockDataCleanup(pBlock); if (!hasRemainResults(pGroupResInfo)) { return; @@ -803,8 +1152,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor pBlock->info.id.groupId = 0; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) { - SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); - SWinKey* pKey = (SWinKey*)pPos->pKey; + SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index); if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = pKey->groupId; } else if (pBlock->info.id.groupId != pKey->groupId) { @@ -814,14 +1162,27 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; - int32_t winCode = TSDB_CODE_SUCCESS; - getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, &winCode); - setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts); + int32_t winCode = TSDB_CODE_SUCCESS; + if (pFillSup->type != TSDB_FILL_LINEAR) { + code = getResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, + &winCode); + } else { + code = getLinearResultInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, + &winCode); + } + QUERY_CHECK_CODE(code, lino, _end); + + setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); doStreamFillRange(pFillSup, pFillInfo, pBlock); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -906,7 +1267,8 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock uint64_t groupId = groupIds[i]; SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey nextKey = {.groupId = groupId}; - winCode = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL); + code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode); + QUERY_CHECK_CODE(code, lino, _end); if (key.ts > endTs) { break; } @@ -970,6 +1332,7 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR } setStreamOperatorCompleted(pOperator); + resetStreamFillSup(pInfo->pFillSup); (*ppRes) = NULL; goto _end; } @@ -1029,7 +1392,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR void* pIte = NULL; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) { - void* tmp = taosArrayPush(pInfo->pUpdated, pIte); + SWinKey* pKey = (SWinKey*)tSimpleHashGetKey(pIte, NULL); + void* tmp = taosArrayPush(pInfo->pUpdated, pKey); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } taosArraySort(pInfo->pUpdated, winKeyCmprImpl); @@ -1040,16 +1404,24 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR } initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); - pInfo->pUpdated = NULL; + pInfo->pUpdated = taosArrayInit(16, sizeof(SWinKey)); + QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno); + code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); tSimpleHashCleanup(pInfo->pUpdatedMap); - pInfo->pUpdatedMap = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); code = buildTimeSliceResult(pOperator, ppRes); QUERY_CHECK_CODE(code, lino, _end); + if (!(*ppRes)) { + setStreamOperatorCompleted(pOperator); + resetStreamFillSup(pInfo->pFillSup); + } + _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); @@ -1063,6 +1435,55 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) { return pRes; } +static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) { + if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) { + int32_t valueIndex = 0; + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + if (!isInterpFunc(pFillCol->pExpr)) { + continue; + } + int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId; + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot); + SFillColInfo* pValueCol = pFillSup->pAllColInfo + valueIndex; + SVariant* pVar = &(pValueCol->fillVal); + if (pCell->type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else if (IS_FLOAT_TYPE(pCell->type)) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else if (IS_INTEGER_TYPE(pCell->type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + SET_TYPED_DATA(pCell->pData, pCell->type, v); + } else { + pCell->isNull = true; + } + valueIndex++; + } + } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) { + for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) { + SFillColInfo* pFillCol = pFillSup->pAllColInfo + i; + int32_t slotId = GET_DEST_SLOT_ID(pFillCol); + SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId); + pCell->isNull = true; + } + } +} + +int32_t getDownstreamRes(SOperatorInfo* downstream, SSDataBlock** ppRes) { + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pInfo = (SStreamScanInfo*)downstream->info; + *ppRes = pInfo->pRes; + return TSDB_CODE_SUCCESS; + } + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_FAILED)); + return TSDB_CODE_FAILED; +} + int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** ppOptInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1101,9 +1522,20 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* }; pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; - code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, - sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + + pInfo->pFillSup = NULL; + code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup); + QUERY_CHECK_CODE(code, lino, _error); + + int32_t ratio = 1; + if (pInfo->pFillSup->type == TSDB_FILL_LINEAR) { + ratio = 2; + } + + code = + initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, sizeof(TSKEY), + 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), + &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SORT, ratio); QUERY_CHECK_CODE(code, lino, _error); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); @@ -1123,8 +1555,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired; pInfo->ignoreExpiredDataSaved = false; - pInfo->pUpdated = NULL; - pInfo->pUpdatedMap = NULL; + pInfo->pUpdated = taosArrayInit(64, sizeof(SWinKey)); + pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn); pInfo->historyPoints = taosArrayInit(4, sizeof(SWinKey)); QUERY_CHECK_NULL(pInfo->historyPoints, code, lino, _error, terrno); @@ -1135,10 +1567,14 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey; pInfo->numOfDatapack = 0; - pInfo->pFillSup = NULL; - code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup); + + SSDataBlock* pDownRes = NULL; + code = getDownstreamRes(downstream, &pDownRes); QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pDownRes); + setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo); + if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } @@ -1162,7 +1598,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState); if (downstream) { - if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->igCheckUpdate = true; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index af2b45bf13..fc9c8f7513 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -394,7 +394,8 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin doBuildDeleteResultImpl(&pInfo->stateStore, pInfo->pState, pWins, index, pBlock); } -void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, SSDataBlock* pBlock) { +void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, + SSDataBlock* pBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; blockDataCleanup(pBlock); @@ -1852,9 +1853,8 @@ _end: } } -int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, - SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + int32_t numOfChild, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -2121,8 +2121,8 @@ static TSKEY sesionTs(void* pKey) { int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, - SStorageAPI* pApi, int32_t tsIndex) { - pSup->resultRowSize = keySize + getResultRowSize(pExpSup->pCtx, numOfOutput); + SStorageAPI* pApi, int32_t tsIndex, int8_t stateType, int32_t ratio) { + pSup->resultRowSize = (keySize + getResultRowSize(pExpSup->pCtx, numOfOutput)) * ratio; int32_t code = createSpecialDataBlock(STREAM_CLEAR, &pSup->pScanBlock); if (code) { @@ -2144,9 +2144,15 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); - pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, - pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); + if (stateType == STREAM_STATE_BUFF_SORT) { + pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, + pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, stateType); + } else if (stateType == STREAM_STATE_BUFF_HASH_SORT) { + pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SWinKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState, + pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, stateType); + } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pSup->pResultRows = tSimpleHashInit(32, hashFn); @@ -3682,8 +3688,8 @@ _end: } } -int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { +int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, + SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -3726,9 +3732,10 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode }; pInfo->primaryTsIndex = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, - pTaskInfo->streamInfo.pState, 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + code = + initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, pSessionNode->gap, pTaskInfo->streamInfo.pState, + 0, 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), + &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_SORT, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3993,8 +4000,8 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild, - SReadHandle* pHandle, SOperatorInfo** pOptrInfo) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, SReadHandle* pHandle, + SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); int32_t code = TSDB_CODE_SUCCESS; @@ -4870,9 +4877,10 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* int32_t keySize = sizeof(SStateKeys) + pColNode->node.resType.bytes; int16_t type = pColNode->node.resType.type; pInfo->primaryTsIndex = tsSlotId; - code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, - type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, - GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + code = + initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, + &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), + &pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_SORT, 1); QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -5229,7 +5237,7 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->recvGetAll = false; code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); - QUERY_CHECK_CODE(code, lino, _error); + QUERY_CHECK_CODE(code, lino, _error); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); @@ -5643,6 +5651,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } void setStreamOperatorCompleted(SOperatorInfo* pOperator) { - qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); + qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), + getStreamOpName(pOperator->operatorType), pOperator->status); setOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index e55a8f028f..00ec1203ba 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -227,7 +227,7 @@ static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumn return false; } -static bool isInterpFunc(SExprInfo* pExprInfo) { +bool isInterpFunc(SExprInfo* pExprInfo) { int32_t functionType = pExprInfo->pExpr->_function.functionType; return (functionType == FUNCTION_TYPE_INTERP); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index f0a6489a8f..18d17cc702 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -657,6 +657,8 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(intervalUnit); + COPY_SCALAR_FIELD(precision); COPY_SCALAR_FIELD(fillMode); CLONE_NODE_FIELD(pFillValues); CLONE_NODE_FIELD(pTimeSeries); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 308186f2c7..da8168beb1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1272,6 +1272,8 @@ static const char* jkInterpFuncLogicPlanFuncs = "Funcs"; static const char* jkInterpFuncLogicPlanStartTime = "StartTime"; static const char* jkInterpFuncLogicPlanEndTime = "EndTime"; static const char* jkInterpFuncLogicPlanInterval = "Interval"; +static const char* jkInterpFuncLogicPlanIntervalUnit = "IntervalUnit"; +static const char* jkInterpFuncLogicPlanPrecision = "Precision"; static const char* jkInterpFuncLogicPlanFillMode = "fillMode"; static const char* jkInterpFuncLogicPlanFillValues = "FillValues"; static const char* jkInterpFuncLogicPlanTimeSeries = "TimeSeries"; @@ -1293,6 +1295,12 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanIntervalUnit, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanPrecision, pNode->precision); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanFillMode, pNode->fillMode); } @@ -1325,6 +1333,12 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanIntervalUnit, &pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanPrecision, &pNode->precision); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanFillMode, (int8_t*)&pNode->fillMode); } @@ -3211,6 +3225,8 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanIntervalUnit = "intervalUnit"; +static const char* jkInterpFuncPhysiPlanPrecision = "precision"; static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; @@ -3235,6 +3251,12 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanIntervalUnit, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanPrecision, pNode->precision); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); } @@ -3270,6 +3292,12 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncPhysiPlanIntervalUnit, &pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncPhysiPlanPrecision, &pNode->precision); + } if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); } @@ -5708,6 +5736,8 @@ static const char* jkSelectStmtSlimit = "Slimit"; static const char* jkSelectStmtStmtName = "StmtName"; static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs"; static const char* jkSelectStmtInterpFuncs = "HasInterpFuncs"; +static const char* jkSelectStmtInterpFill = "InterpFill"; +static const char* jkSelectStmtInterpEvery = "InterpEvery"; static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { const SSelectStmt* pNode = (const SSelectStmt*)pObj; @@ -5758,6 +5788,12 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkSelectStmtInterpFuncs, pNode->hasInterpFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSelectStmtInterpFill, nodeToJson, pNode->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSelectStmtInterpEvery, nodeToJson, pNode->pEvery); + } return code; } @@ -5811,6 +5847,12 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkSelectStmtInterpFuncs, &pNode->hasInterpFunc); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtInterpFill, &pNode->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkSelectStmtInterpEvery, &pNode->pEvery); + } return code; } diff --git a/source/libs/stream/src/streamSliceState.c b/source/libs/stream/src/streamSliceState.c index 3a843a881f..5dfc6e03e1 100644 --- a/source/libs/stream/src/streamSliceState.c +++ b/source/libs/stream/src/streamSliceState.c @@ -27,9 +27,13 @@ int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) { return winKeyCmprImpl((SWinKey*)pWin1, pWin2); } -int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) { - int32_t winCode = TSDB_CODE_SUCCESS; - int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, &winCode); +int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen, + int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode); + QUERY_CHECK_CODE(code, lino, _end); SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); @@ -38,23 +42,26 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo pWinStates = (SArray*)(*ppBuff); } else { pWinStates = taosArrayInit(16, sizeof(SWinKey)); - tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno); + + code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); + QUERY_CHECK_CODE(code, lino, _end); } - //recover + // recover if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) { - TSKEY ts = getFlushMark(pFileState); - SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; - void* pState = getStateFileStore(pFileState); + TSKEY ts = getFlushMark(pFileState); + SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX}; + void* pState = getStateFileStore(pFileState); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start); - int32_t winCode = TSDB_CODE_SUCCESS; - for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && winCode == TSDB_CODE_SUCCESS; i++) { - SWinKey tmp = {.groupId = pKey->groupId}; - winCode = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0); - if (winCode != TSDB_CODE_SUCCESS) { + for (int32_t i = 0; i < NUM_OF_FLUSED_WIN; i++) { + SWinKey tmpKey = {.groupId = pKey->groupId}; + int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0); + if (tmpRes != TSDB_CODE_SUCCESS) { break; } - taosArrayPush(pWinStates, &tmp); + void* tmp = taosArrayPush(pWinStates, &tmp); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); streamStateCurPrev_rocksdb(pCur); } taosArraySort(pWinStates, winKeyCmprImpl); @@ -65,13 +72,20 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo if (!isFlushedState(pFileState, pKey->ts, 0)) { // find the first position which is smaller than the pKey int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); - if (index == -1) { - index = 0; - } - SWinKey* pTmpKey = taosArrayGet(pWinStates, index); - if (winKeyCmprImpl(pTmpKey, pKey) != 0) { - taosArrayInsert(pWinStates, index, pKey); + if (index >= 0) { + SWinKey* pTmpKey = taosArrayGet(pWinStates, index); + if (winKeyCmprImpl(pTmpKey, pKey) == 0) { + goto _end; + } } + index++; + void* tmp = taosArrayInsert(pWinStates, index, pKey); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; } @@ -91,16 +105,15 @@ void clearSearchBuff(SStreamFileState* pFileState) { if (!pSearchBuff) { return; } + TSKEY flushMark = getFlushMark(pFileState); void* pIte = NULL; int32_t iter = 0; - void* pBuff = getRowStateBuff(pFileState); - while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { + while ((pIte = tSimpleHashIterate(pSearchBuff, pIte, &iter)) != NULL) { SArray* pWinStates = *((void**)pIte); int32_t size = taosArrayGetSize(pWinStates); if (size > 0) { - TSKEY ts = getFlushMark(pFileState); - SWinKey key = *(SWinKey*)taosArrayGet(pWinStates, 0); - key.ts = ts; + int64_t gpId = *(int64_t*)tSimpleHashGetKey(pIte, NULL); + SWinKey key = {.ts = flushMark, .groupId = gpId}; int32_t num = binarySearch(pWinStates, size, &key, fillStateKeyCompare); if (size > NUM_OF_FLUSED_WIN) { num = TMIN(num, size - NUM_OF_FLUSED_WIN); @@ -110,7 +123,9 @@ void clearSearchBuff(SStreamFileState* pFileState) { } } -int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { +int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, + int32_t* pVLen, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); @@ -119,7 +134,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW pWinStates = (SArray*)(*ppBuff); } else { SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen); + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); streamStateFreeCur(pCur); return code; } @@ -127,21 +142,25 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index == -1) { SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen); + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); streamStateFreeCur(pCur); return code; } else { if (index == size - 1) { - return TSDB_CODE_FAILED; + (*pWinCode) = TSDB_CODE_FAILED; + return code; } SWinKey* pNext = taosArrayGet(pWinStates, index + 1); *pResKey = *pNext; - return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen); + return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); } - return TSDB_CODE_FAILED; + (*pWinCode) = TSDB_CODE_FAILED; + return code; } -int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { +int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, + int32_t* pVLen, int32_t* pWinCode) { + int32_t code = TSDB_CODE_SUCCESS; SArray* pWinStates = NULL; SSHashObj* pSearchBuff = getSearchBuff(pFileState); void* pState = getStateFileStore(pFileState); @@ -150,7 +169,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW pWinStates = (SArray*)(*ppBuff); } else { SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); streamStateFreeCur(pCur); return code; } @@ -158,15 +177,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); if (index == -1 || index == 0) { SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); - int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); + (*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); streamStateFreeCur(pCur); return code; } else { SWinKey* pNext = taosArrayGet(pWinStates, index - 1); *pResKey = *pNext; - return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen); + return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); } - return TSDB_CODE_FAILED; + (*pWinCode) = TSDB_CODE_FAILED; + return code; } void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) { @@ -190,4 +210,3 @@ void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) { } } } - diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 8139520e97..7706a6507f 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -232,19 +232,19 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* } // todo refactor -int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { +int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) { if (pState->pFileState) { - return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen); + return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen, pWinCode); } return streamStateFillGet_rocksdb(pState, key, pVal, pVLen); } -int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { - return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen); +int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { + return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); } -int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) { - return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen); +int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen, int32_t* pWinCode) { + return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode); } // todo refactor diff --git a/tests/script/tsim/stream/streamInterpLinear0.sim b/tests/script/tsim/stream/streamInterpLinear0.sim new file mode 100644 index 0000000000..5601213483 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpLinear0.sim @@ -0,0 +1,216 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(linear); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,1,1,1,1.1); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + + +sql insert into t1 values(1648791213001,2,2,2,2.1); +sql insert into t1 values(1648791213009,3,3,3,3.1); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791217001,14,14,14,14.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 5 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 8 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 11 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 13 then + print ======data41=$data41 + goto loop2 +endi + + +sql insert into t1 values(1648791215001,7,7,7,7.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(linear); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(linear); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 3 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop3 +endi + +if $data21 != 6 then + print ======data21=$data21 + goto loop3 +endi + +if $data31 != 10 then + print ======data31=$data31 + goto loop3 +endi + +if $data41 != 13 then + print ======data41=$data41 + goto loop3 +endi + +print count0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpNext0.sim b/tests/script/tsim/stream/streamInterpNext0.sim new file mode 100644 index 0000000000..4bf2f27816 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpNext0.sim @@ -0,0 +1,216 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(next); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,1,1,1,1.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + + +sql insert into t1 values(1648791213001,2,2,2,1.1); +sql insert into t1 values(1648791213009,3,3,3,1.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 4 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 4 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop2 +endi + + +sql insert into t1 values(1648791215001,5,5,5,5.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(next); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(next); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 3 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3 +endi + +if $data11 != 5 then + print ======data11=$data11 + goto loop3 +endi + +if $data21 != 5 then + print ======data21=$data21 + goto loop3 +endi + +if $data31 != 4 then + print ======data31=$data31 + goto loop3 +endi + +if $data41 != 4 then + print ======data41=$data41 + goto loop3 +endi + +print count0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpPrev0.sim b/tests/script/tsim/stream/streamInterpPrev0.sim new file mode 100644 index 0000000000..962e2080fc --- /dev/null +++ b/tests/script/tsim/stream/streamInterpPrev0.sim @@ -0,0 +1,216 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(prev); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,1,1,1,1.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + + +sql insert into t1 values(1648791213001,2,2,2,1.1); +sql insert into t1 values(1648791213009,3,3,3,1.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != 3 then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != 3 then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != 3 then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != 3 then + print ======data41=$data41 + goto loop2 +endi + + +sql insert into t1 values(1648791215001,5,5,5,5.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(prev); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(prev); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 3 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3 +endi + +if $data11 != 3 then + print ======data11=$data11 + goto loop3 +endi + +if $data21 != 3 then + print ======data21=$data21 + goto loop3 +endi + +if $data31 != 5 then + print ======data31=$data31 + goto loop3 +endi + +if $data41 != 5 then + print ======data41=$data41 + goto loop3 +endi + +print count0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/streamInterpValue0.sim b/tests/script/tsim/stream/streamInterpValue0.sim new file mode 100644 index 0000000000..dedcf9a1d9 --- /dev/null +++ b/tests/script/tsim/stream/streamInterpValue0.sim @@ -0,0 +1,361 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print step1 +print =============== create database +sql create database test vgroups 1; +sql use test; + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(NULL); +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 every(1s) fill(value, 10,20,30,40); + +run tsim/stream/checkTaskStatus.sim + + +sql insert into t1 values(1648791213000,1,1,1,1.0); + +$loop_count = 0 + +loop0: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 0 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop0 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop0 +endi + + +sql insert into t1 values(1648791213001,2,2,2,1.1); +sql insert into t1 values(1648791213009,3,3,3,1.0); + +$loop_count = 0 + +loop1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 1 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 + +# row 0 +if $rows != 1 then + print ======rows=$rows + goto loop1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop1 +endi + + +sql insert into t1 values(1648791217001,4,4,4,4.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +$loop_count = 0 +loop2: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop2 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop2 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop2 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop2 +endi + + + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(value, 10,20,30,40); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(value, 10,20,30,40); + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +$loop_count = 0 +loop2_1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 2 sql select * from streamt2; +sql select * from streamt2; + +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data30 $data31 $data32 $data33 +print $data40 $data41 $data42 $data43 +print $data50 $data51 $data52 $data53 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop2_1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop2_1 +endi + +if $data11 != 10 then + print ======data11=$data11 + goto loop2_1 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop2_1 +endi + +if $data31 != 10 then + print ======data31=$data31 + goto loop2_1 +endi + +if $data41 != 10 then + print ======data41=$data41 + goto loop2_1 +endi + +sql insert into t1 values(1648791215001,5,5,5,5.1); + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(NULL); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + +$loop_count = 0 +loop3: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 3 sql select * from streamt; +sql select * from streamt; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop3 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3 +endi + +if $data11 != NULL then + print ======data11=$data11 + goto loop3 +endi + +if $data21 != NULL then + print ======data21=$data21 + goto loop3 +endi + +if $data31 != NULL then + print ======data31=$data31 + goto loop3 +endi + +if $data41 != NULL then + print ======data41=$data41 + goto loop3 +endi + +print sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(value, 10,20,30,40); +sql select _irowts, interp(a), interp(b), interp(c), interp(d) from t1 range(1648791213000, 1648791217001) every(1s) fill(value, 10,20,30,40); + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +$loop_count = 0 +loop3_1: + +sleep 300 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +print 3 sql select * from streamt2; +sql select * from streamt2; + +print $data00 $data01 $data02 $data03 $data04 +print $data10 $data11 $data12 $data13 $data14 +print $data20 $data21 $data22 $data23 $data24 +print $data30 $data31 $data32 $data33 $data34 +print $data40 $data41 $data42 $data43 $data44 +print $data50 $data51 $data52 $data53 $data54 + + +# row 0 +if $rows != 5 then + print ======rows=$rows + goto loop3_1 +endi + +# row 0 +if $data01 != 1 then + print ======data01=$data01 + goto loop3_1 +endi + +if $data11 != 10 then + print ======data11=$data11 + goto loop3_1 +endi + +if $data21 != 10 then + print ======data21=$data21 + goto loop3_1 +endi + +if $data31 != 10 then + print ======data31=$data31 + goto loop3_1 +endi + +if $data41 != 10 then + print ======data41=$data41 + goto loop3_1 +endi + +if $data12 != 20 then + print ======data12=$data12 + goto loop3_1 +endi + +if $data13 != 30 then + print ======data13=$data13 + goto loop3_1 +endi + +if $data14 != 40.000000000 then + print ======data14=$data14 + goto loop3_1 +endi + +print count0 end +system sh/exec.sh -n dnode1 -s stop -x SIGINT