From 745f14d140dab40b50a3352e921b79bc86f660e5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 31 Jul 2024 08:57:42 +0800 Subject: [PATCH] stream interp --- source/libs/executor/src/executor.c | 16 +++++ .../executor/src/streamtimesliceoperator.c | 59 +++++++++++++------ source/libs/parser/src/parTranslater.c | 28 ++++++--- 3 files changed, 75 insertions(+), 28 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3ee08d8fb0..bb2da00657 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1096,6 +1096,22 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + pSup->calTriggerSaved = pSup->calTrigger; + pSup->deleteMarkSaved = pSup->deleteMark; + pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; + pSup->deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; + qInfo("save stream task:%s, param for state: %d", GET_TASKID(pTaskInfo), pInfo->ignoreExpiredData); + } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC) { + SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; + STimeWindowAggSupp* pSup = &pInfo->twAggSup; + + ASSERT(pSup->calTrigger == STREAM_TRIGGER_AT_ONCE || pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); + ASSERT(pSup->calTriggerSaved == 0 && pSup->deleteMarkSaved == 0); + + qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); + pSup->calTriggerSaved = pSup->calTrigger; pSup->deleteMarkSaved = pSup->deleteMark; pSup->calTrigger = STREAM_TRIGGER_AT_ONCE; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 883ab4609f..f49e45457c 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -99,11 +99,7 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { pInfo->pUpdatedMap = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno); } - if (!pInfo->pDeletedMap && num > 0) { - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - pInfo->pDeletedMap = tSimpleHashInit(64, hashFn); - QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _end, terrno); - } + for (int32_t i = 0; i < num; i++) { SWinKey* pKey = pKeyBuf + i; qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId, @@ -506,8 +502,9 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts, int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint, - SSlicePoint* pNextPoint) { - void* pState = pAggSup->pState; + SSlicePoint* pNextPoint, int32_t* pWinCode) { + int32_t tmpRes = TSDB_CODE_SUCCESS; + void* pState = pAggSup->pState; resetPrevAndNextWindow(pFillSup); pCurPoint->pResPos = NULL; pPrevPoint->pResPos = NULL; @@ -516,7 +513,7 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo pCurPoint->key.groupId = groupId; pCurPoint->key.ts = ts; int32_t curVLen = 0; - int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen); + (*pWinCode) = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen); pCurPoint->pLeftRow = pCurPoint->pResPos->pRowBuff; if (pFillSup->type == TSDB_FILL_LINEAR) { pCurPoint->pRightRow = POINTER_SHIFT(pCurPoint->pResPos->pRowBuff, pFillSup->rowSize); @@ -533,9 +530,9 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) { pPrevPoint->key.groupId = groupId; int32_t preVLen = 0; - code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, - (void**)&pPrevPoint->pResPos, &preVLen); - if (code == TSDB_CODE_SUCCESS) { + tmpRes = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key, + (void**)&pPrevPoint->pResPos, &preVLen); + if (tmpRes == TSDB_CODE_SUCCESS) { pFillSup->prev.key = pPrevPoint->key.ts; pFillSup->prev.pRowVal = pPrevPoint->pResPos->pRowBuff; } @@ -548,9 +545,9 @@ static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSuppo if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) { pNextPoint->key.groupId = groupId; int32_t nextVLen = 0; - code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, - (void**)&pNextPoint->pResPos, &nextVLen); - if (code == TSDB_CODE_SUCCESS) { + tmpRes = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key, + (void**)&pNextPoint->pResPos, &nextVLen); + if (tmpRes == TSDB_CODE_SUCCESS) { pFillSup->next.key = pNextPoint->key.ts; pFillSup->next.pRowVal = pNextPoint->pResPos->pRowBuff; } @@ -696,6 +693,7 @@ static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fil } static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) { + int32_t winCode = TSDB_CODE_SUCCESS; SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -749,11 +747,14 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) SSlicePoint nextPoint = {0}; bool left = false; bool right = false; - getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint); + getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode); right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); + if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { + saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); + } } releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); @@ -766,6 +767,9 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) if (left) { transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow); saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap); + if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { + saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); + } } releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore); @@ -774,12 +778,15 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) break; } curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC); - getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint); + getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint, &winCode); right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type); if (right) { transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow); saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap); + if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) { + saveTimeSliceWinResult(&curPoint.key, pInfo->pDeletedMap); + } } releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); } @@ -807,7 +814,8 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId}; SSlicePoint prevPoint = {0}; SSlicePoint nextPoint = {0}; - getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint); + int32_t winCode = TSDB_CODE_SUCCESS; + getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint, &winCode); setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts); doStreamFillRange(pFillSup, pFillInfo, pBlock); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); @@ -879,7 +887,10 @@ _end: return code; } -static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) { +static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap, + SArray* pDelWins) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t winCode = TSDB_CODE_SUCCESS; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -900,6 +911,9 @@ static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* p break; } (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); + void* tmp = taosArrayPush(pDelWins, &key); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + pAggSup->stateStore.streamStateDel(pAggSup->pState, &key); if (winCode != TSDB_CODE_SUCCESS) { break; @@ -907,6 +921,12 @@ static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* p key = nextKey; } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { @@ -971,7 +991,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR switch (pBlock->info.type) { case STREAM_DELETE_RESULT: { - doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap); + code = doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap, pInfo->pDelWins); + QUERY_CHECK_CODE(code, lino, _end); } break; case STREAM_NORMAL: case STREAM_INVALID: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0fd1121aaa..bb770387f1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5736,15 +5736,20 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { } } - if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) { - if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) { - // single point interp every can be omitted - } else { - if (pCxt->createStream) { - if (NULL == pSelect->pEvery || NULL == pSelect->pFill) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, - "Missing EVERY clause or FILL clause"); - } + if (pCxt->createStream) { + if (NULL != pSelect->pRange) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream Unsupported RANGE clause"); + } + + if (NULL == pSelect->pEvery || NULL == pSelect->pFill) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Missing EVERY clause or FILL clause"); + } + } else { + if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) { + if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) { + // single point interp every can be omitted } else { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE, "Missing RANGE clause, EVERY clause or FILL clause"); @@ -10182,6 +10187,11 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm } } + if (pStmt->pOptions->fillHistory && pSelect->hasInterpFunc) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Stream interp unsupported Fill history"); + } + return TSDB_CODE_SUCCESS; }