From fe9d61b243fcefd61db0fd7b69a187e80a9abfc4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 29 Jul 2024 14:14:34 +0800 Subject: [PATCH] add node to json --- include/libs/nodes/plannodes.h | 9 +- source/libs/executor/inc/streamexecutorInt.h | 3 +- .../executor/src/streamtimesliceoperator.c | 38 +++++--- .../executor/src/streamtimewindowoperator.c | 2 +- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 86 +++++++++++++++++++ source/libs/planner/src/planLogicCreater.c | 4 +- source/libs/planner/src/planPhysiCreater.c | 2 +- 8 files changed, 121 insertions(+), 24 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index cc09ceb858..509710707d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -194,14 +194,14 @@ typedef struct SIndefRowsFuncLogicNode { bool isTimeLineFunc; } SIndefRowsFuncLogicNode; -typedef struct SStreamOption { +typedef struct SStreamNodeOption { int8_t triggerType; int64_t watermark; int64_t deleteMark; int8_t igExpired; int8_t igCheckUpdate; int8_t destHasPrimaryKey; -} SStreamOption; +} SStreamNodeOption; typedef struct SInterpFuncLogicNode { SLogicNode node; @@ -211,8 +211,7 @@ typedef struct SInterpFuncLogicNode { EFillMode fillMode; SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode - //todo(liuyao) 补充clone和json等 - SStreamOption streamOption; + SStreamNodeOption streamNodeOption; } SInterpFuncLogicNode; typedef struct SGroupCacheLogicNode { @@ -516,7 +515,7 @@ typedef struct SInterpFuncPhysiNode { EFillMode fillMode; SNode* pFillValues; // SNodeListNode SNode* pTimeSeries; // SColumnNode - SStreamOption streamOption; //todo(liuyao) 补充clone和json等 + SStreamNodeOption streamNodeOption; } SInterpFuncPhysiNode; typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode; diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 0ab32cf838..8332fb3c0d 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -31,7 +31,7 @@ void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); -int64_t getDeleteMarkFromOption(SStreamOption* pOption); +int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); int32_t copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins); bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo); @@ -55,6 +55,7 @@ void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SS int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); bool hasRemainCalc(SStreamFillInfo* pFillInfo); void destroySPoint(void* ptr); +void destroyStreamFillInfo(SStreamFillInfo* pFillInfo); int winPosCmprImpl(const void* pKey1, const void* pKey2); diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index 695bec7fff..40ac0da332 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -47,21 +47,31 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) { void destroyStreamTimeSliceOperatorInfo(void* param) { SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param; - clearGroupResInfo(&pInfo->groupResInfo); + colDataDestroy(&pInfo->twAggSup.timeWindowData); + destroyStreamAggSupporter(&pInfo->streamAggSup); + destroyStreamFillSupporter(pInfo->pFillSup); + destroyStreamFillInfo(pInfo->pFillInfo); + blockDataDestroy(pInfo->pRes); + blockDataDestroy(pInfo->pDelRes); + blockDataDestroy(pInfo->pCheckpointRes); + + taosMemoryFreeClear(pInfo->leftRow.pRowVal); + taosMemoryFreeClear(pInfo->valueRow.pRowVal); + taosMemoryFreeClear(pInfo->rightRow.pRowVal); + + cleanupExprSupp(&pInfo->scalarSup); + taosArrayDestroy(pInfo->historyPoints); + taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); pInfo->pUpdated = NULL; - taosArrayDestroy(pInfo->pDelWins); - blockDataDestroy(pInfo->pDelRes); - destroyStreamAggSupporter(&pInfo->streamAggSup); - - colDataDestroy(&pInfo->twAggSup.timeWindowData); tSimpleHashCleanup(pInfo->pUpdatedMap); pInfo->pUpdatedMap = NULL; - tSimpleHashCleanup(pInfo->pDeletedMap); - blockDataDestroy(pInfo->pCheckpointRes); - // todo(liuyao) 看是否有遗漏 + taosArrayDestroy(pInfo->pDelWins); + tSimpleHashCleanup(pInfo->pDeletedMap); + clearGroupResInfo(&pInfo->groupResInfo); + taosMemoryFreeClear(param); } @@ -696,11 +706,11 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pInterpPhyNode->streamOption.watermark, - .calTrigger = pInterpPhyNode->streamOption.triggerType, + .waterMark = pInterpPhyNode->streamNodeOption.watermark, + .calTrigger = pInterpPhyNode->streamNodeOption.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - .deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamOption), + .deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamNodeOption), }; pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; @@ -721,7 +731,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); - pInfo->ignoreExpiredData = pInterpPhyNode->streamOption.igExpired; + pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired; pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; @@ -733,7 +743,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); QUERY_CHECK_CODE(code, lino, _error); - pInfo->destHasPrimaryKey = pInterpPhyNode->streamOption.destHasPrimaryKey; + pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey; pInfo->numOfDatapack = 0; pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index b0609f4d0d..42d6e50e2a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1769,7 +1769,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { return deleteMark; } -int64_t getDeleteMarkFromOption(SStreamOption* pOption) { +int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) { if (pOption->deleteMark <= 0) { return DEAULT_DELETE_MARK; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 83947a70de..f0a6489a8f 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -660,6 +660,7 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc COPY_SCALAR_FIELD(fillMode); CLONE_NODE_FIELD(pFillValues); CLONE_NODE_FIELD(pTimeSeries); + COPY_OBJECT_FIELD(streamNodeOption, sizeof(SStreamNodeOption)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 1149eef653..0b3dcdea8d 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1217,10 +1217,65 @@ static int32_t jsonToLogicIndefRowsFuncNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkStreamOption_triggerType = "StreamOptionTriggerType"; +static const char* jkStreamOption_watermark = "StreamOptionWatermark"; +static const char* jkStreamOption_deleteMark = "StreamOptionDeleteMark"; +static const char* jkStreamOption_igExpired = "StreamOptionIgExpired"; +static const char* jkStreamOption_igCheckUpdate = "StreamOption_igCheckUpdate"; +static const char* jkStreamOption_destHasPrimaryKey = "StreamOptionDestHasPrimaryKey"; + +static int32_t streamNodeOptionToJson(const void* pObj, SJson* pJson) { + const SStreamNodeOption* pNode = (const SStreamNodeOption*)pObj; + int32_t code = tjsonAddIntegerToObject(pJson, jkStreamOption_triggerType, pNode->triggerType); + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStreamOption_watermark, pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStreamOption_deleteMark, pNode->deleteMark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStreamOption_igExpired, pNode->igExpired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStreamOption_igCheckUpdate, pNode->igCheckUpdate); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkStreamOption_destHasPrimaryKey, pNode->destHasPrimaryKey); + } + return code; +} + +static int32_t jsonToStreamNodeOption(const SJson* pJson, void* pObj) { + SStreamNodeOption* pNode = (SStreamNodeOption*)pObj; + int32_t code = tjsonGetTinyIntValue(pJson, jkStreamOption_triggerType, &pNode->triggerType); + + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkStreamOption_watermark, &pNode->watermark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkStreamOption_deleteMark, &pNode->deleteMark); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkStreamOption_igExpired, &pNode->igExpired); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkStreamOption_igCheckUpdate, &pNode->igCheckUpdate); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkStreamOption_destHasPrimaryKey, &pNode->destHasPrimaryKey); + } + return code; +} + static const char* jkInterpFuncLogicPlanFuncs = "Funcs"; static const char* jkInterpFuncLogicPlanStartTime = "StartTime"; static const char* jkInterpFuncLogicPlanEndTime = "EndTime"; static const char* jkInterpFuncLogicPlanInterval = "Interval"; +static const char* jkInterpFuncLogicPlanFillMode = "fillMode"; +static const char* jkInterpFuncLogicPlanFillValues = "FillValues"; +static const char* jkInterpFuncLogicPlanTimeSeries = "TimeSeries"; +static const char* jkInterpFuncLogicPlanStreamNodeOption = "StreamNodeOption"; static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncLogicNode* pNode = (const SInterpFuncLogicNode*)pObj; @@ -1238,6 +1293,18 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncLogicPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncLogicPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption); + } return code; } @@ -1258,6 +1325,18 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanFillMode, (int8_t*)&pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkInterpFuncLogicPlanFillValues, jsonToNode, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkInterpFuncLogicPlanTimeSeries, jsonToNode, pNode->pTimeSeries); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption); + } return code; } @@ -3135,6 +3214,7 @@ static const char* jkInterpFuncPhysiPlanInterval = "Interval"; static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; +static const char* jkInterpFuncPhysiPlanStreamNodeOption = "StreamNodeOption"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -3164,6 +3244,9 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption); + } return code; } @@ -3196,6 +3279,9 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption); + } return code; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index d3f328943c..23e5d9c902 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -918,7 +918,7 @@ static bool isInterpFunc(int32_t funcId) { return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId); } -static void initStreamOption(SLogicPlanContext* pCxt, SStreamOption* pOption) { +static void initStreamOption(SLogicPlanContext* pCxt, SStreamNodeOption* pOption) { pOption->triggerType = pCxt->pPlanCxt->triggerType; pOption->watermark = pCxt->pPlanCxt->watermark; pOption->deleteMark = pCxt->pPlanCxt->deleteMark; @@ -969,7 +969,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p } if (TSDB_CODE_SUCCESS == code) { - initStreamOption(pCxt, &pInterpFunc->streamOption); + initStreamOption(pCxt, &pInterpFunc->streamNodeOption); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ae28d44f1d..26c9710876 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1867,7 +1867,7 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh } if (pCxt->pPlanCxt->streamQuery) { - pInterpFunc->streamOption = pFuncLogicNode->streamOption; + pInterpFunc->streamNodeOption = pFuncLogicNode->streamNodeOption; } if (TSDB_CODE_SUCCESS == code) {