add node to json

This commit is contained in:
54liuyao 2024-07-29 14:14:34 +08:00
parent 80003a4f90
commit fe9d61b243
8 changed files with 121 additions and 24 deletions

View File

@ -194,14 +194,14 @@ typedef struct SIndefRowsFuncLogicNode {
bool isTimeLineFunc; bool isTimeLineFunc;
} SIndefRowsFuncLogicNode; } SIndefRowsFuncLogicNode;
typedef struct SStreamOption { typedef struct SStreamNodeOption {
int8_t triggerType; int8_t triggerType;
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate; int8_t igCheckUpdate;
int8_t destHasPrimaryKey; int8_t destHasPrimaryKey;
} SStreamOption; } SStreamNodeOption;
typedef struct SInterpFuncLogicNode { typedef struct SInterpFuncLogicNode {
SLogicNode node; SLogicNode node;
@ -211,8 +211,7 @@ typedef struct SInterpFuncLogicNode {
EFillMode fillMode; EFillMode fillMode;
SNode* pFillValues; // SNodeListNode SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode SNode* pTimeSeries; // SColumnNode
//todo(liuyao) 补充clone和json等 SStreamNodeOption streamNodeOption;
SStreamOption streamOption;
} SInterpFuncLogicNode; } SInterpFuncLogicNode;
typedef struct SGroupCacheLogicNode { typedef struct SGroupCacheLogicNode {
@ -516,7 +515,7 @@ typedef struct SInterpFuncPhysiNode {
EFillMode fillMode; EFillMode fillMode;
SNode* pFillValues; // SNodeListNode SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode SNode* pTimeSeries; // SColumnNode
SStreamOption streamOption; //todo(liuyao) 补充clone和json等 SStreamNodeOption streamNodeOption;
} SInterpFuncPhysiNode; } SInterpFuncPhysiNode;
typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode; typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode;

View File

@ -31,7 +31,7 @@ void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo); void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
int64_t getDeleteMarkFromOption(SStreamOption* pOption); int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption);
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins); void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins);
int32_t copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins); int32_t copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins);
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo); bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo);
@ -55,6 +55,7 @@ void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SS
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell); int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell);
bool hasRemainCalc(SStreamFillInfo* pFillInfo); bool hasRemainCalc(SStreamFillInfo* pFillInfo);
void destroySPoint(void* ptr); void destroySPoint(void* ptr);
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo);
int winPosCmprImpl(const void* pKey1, const void* pKey2); int winPosCmprImpl(const void* pKey1, const void* pKey2);

View File

@ -47,21 +47,31 @@ void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
void destroyStreamTimeSliceOperatorInfo(void* param) { void destroyStreamTimeSliceOperatorInfo(void* param) {
SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param; SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param;
clearGroupResInfo(&pInfo->groupResInfo); colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyStreamAggSupporter(&pInfo->streamAggSup);
destroyStreamFillSupporter(pInfo->pFillSup);
destroyStreamFillInfo(pInfo->pFillInfo);
blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(pInfo->leftRow.pRowVal);
taosMemoryFreeClear(pInfo->valueRow.pRowVal);
taosMemoryFreeClear(pInfo->rightRow.pRowVal);
cleanupExprSupp(&pInfo->scalarSup);
taosArrayDestroy(pInfo->historyPoints);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos); taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
destroyStreamAggSupporter(&pInfo->streamAggSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
tSimpleHashCleanup(pInfo->pUpdatedMap); tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes); taosArrayDestroy(pInfo->pDelWins);
// todo(liuyao) 看是否有遗漏 tSimpleHashCleanup(pInfo->pDeletedMap);
clearGroupResInfo(&pInfo->groupResInfo);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -696,11 +706,11 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->twAggSup = (STimeWindowAggSupp){ pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pInterpPhyNode->streamOption.watermark, .waterMark = pInterpPhyNode->streamNodeOption.watermark,
.calTrigger = pInterpPhyNode->streamOption.triggerType, .calTrigger = pInterpPhyNode->streamNodeOption.triggerType,
.maxTs = INT64_MIN, .maxTs = INT64_MIN,
.minTs = INT64_MAX, .minTs = INT64_MAX,
.deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamOption), .deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamNodeOption),
}; };
pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId;
@ -721,7 +731,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn); pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->ignoreExpiredData = pInterpPhyNode->streamOption.igExpired; pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired;
pInfo->ignoreExpiredDataSaved = false; pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL; pInfo->pUpdatedMap = NULL;
@ -733,7 +743,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes); code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
pInfo->destHasPrimaryKey = pInterpPhyNode->streamOption.destHasPrimaryKey; pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey;
pInfo->numOfDatapack = 0; pInfo->numOfDatapack = 0;
pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs); pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs);

View File

@ -1769,7 +1769,7 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
return deleteMark; return deleteMark;
} }
int64_t getDeleteMarkFromOption(SStreamOption* pOption) { int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption) {
if (pOption->deleteMark <= 0) { if (pOption->deleteMark <= 0) {
return DEAULT_DELETE_MARK; return DEAULT_DELETE_MARK;
} }

View File

@ -660,6 +660,7 @@ static int32_t logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFunc
COPY_SCALAR_FIELD(fillMode); COPY_SCALAR_FIELD(fillMode);
CLONE_NODE_FIELD(pFillValues); CLONE_NODE_FIELD(pFillValues);
CLONE_NODE_FIELD(pTimeSeries); CLONE_NODE_FIELD(pTimeSeries);
COPY_OBJECT_FIELD(streamNodeOption, sizeof(SStreamNodeOption));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1217,10 +1217,65 @@ static int32_t jsonToLogicIndefRowsFuncNode(const SJson* pJson, void* pObj) {
return code; return code;
} }
static const char* jkStreamOption_triggerType = "StreamOptionTriggerType";
static const char* jkStreamOption_watermark = "StreamOptionWatermark";
static const char* jkStreamOption_deleteMark = "StreamOptionDeleteMark";
static const char* jkStreamOption_igExpired = "StreamOptionIgExpired";
static const char* jkStreamOption_igCheckUpdate = "StreamOption_igCheckUpdate";
static const char* jkStreamOption_destHasPrimaryKey = "StreamOptionDestHasPrimaryKey";
static int32_t streamNodeOptionToJson(const void* pObj, SJson* pJson) {
const SStreamNodeOption* pNode = (const SStreamNodeOption*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkStreamOption_triggerType, pNode->triggerType);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStreamOption_watermark, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStreamOption_deleteMark, pNode->deleteMark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStreamOption_igExpired, pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStreamOption_igCheckUpdate, pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkStreamOption_destHasPrimaryKey, pNode->destHasPrimaryKey);
}
return code;
}
static int32_t jsonToStreamNodeOption(const SJson* pJson, void* pObj) {
SStreamNodeOption* pNode = (SStreamNodeOption*)pObj;
int32_t code = tjsonGetTinyIntValue(pJson, jkStreamOption_triggerType, &pNode->triggerType);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkStreamOption_watermark, &pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkStreamOption_deleteMark, &pNode->deleteMark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkStreamOption_igExpired, &pNode->igExpired);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkStreamOption_igCheckUpdate, &pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkStreamOption_destHasPrimaryKey, &pNode->destHasPrimaryKey);
}
return code;
}
static const char* jkInterpFuncLogicPlanFuncs = "Funcs"; static const char* jkInterpFuncLogicPlanFuncs = "Funcs";
static const char* jkInterpFuncLogicPlanStartTime = "StartTime"; static const char* jkInterpFuncLogicPlanStartTime = "StartTime";
static const char* jkInterpFuncLogicPlanEndTime = "EndTime"; static const char* jkInterpFuncLogicPlanEndTime = "EndTime";
static const char* jkInterpFuncLogicPlanInterval = "Interval"; static const char* jkInterpFuncLogicPlanInterval = "Interval";
static const char* jkInterpFuncLogicPlanFillMode = "fillMode";
static const char* jkInterpFuncLogicPlanFillValues = "FillValues";
static const char* jkInterpFuncLogicPlanTimeSeries = "TimeSeries";
static const char* jkInterpFuncLogicPlanStreamNodeOption = "StreamNodeOption";
static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) { static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
const SInterpFuncLogicNode* pNode = (const SInterpFuncLogicNode*)pObj; const SInterpFuncLogicNode* pNode = (const SInterpFuncLogicNode*)pObj;
@ -1238,6 +1293,18 @@ static int32_t logicInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanInterval, pNode->interval); code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanInterval, pNode->interval);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkInterpFuncLogicPlanFillMode, pNode->fillMode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanFillValues, nodeToJson, pNode->pFillValues);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
return code; return code;
} }
@ -1258,6 +1325,18 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanInterval, &pNode->interval); code = tjsonGetBigIntValue(pJson, jkInterpFuncLogicPlanInterval, &pNode->interval);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkInterpFuncLogicPlanFillMode, (int8_t*)&pNode->fillMode);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncLogicPlanFillValues, jsonToNode, pNode->pFillValues);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncLogicPlanTimeSeries, jsonToNode, pNode->pTimeSeries);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
return code; return code;
} }
@ -3135,6 +3214,7 @@ static const char* jkInterpFuncPhysiPlanInterval = "Interval";
static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; static const char* jkInterpFuncPhysiPlanFillMode = "FillMode";
static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; static const char* jkInterpFuncPhysiPlanFillValues = "FillValues";
static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries";
static const char* jkInterpFuncPhysiPlanStreamNodeOption = "StreamNodeOption";
static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj;
@ -3164,6 +3244,9 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, streamNodeOptionToJson, &pNode->streamNodeOption);
}
return code; return code;
} }
@ -3196,6 +3279,9 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkInterpFuncPhysiPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
}
return code; return code;
} }

View File

@ -918,7 +918,7 @@ static bool isInterpFunc(int32_t funcId) {
return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId); return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId) || fmisSelectGroupConstValueFunc(funcId);
} }
static void initStreamOption(SLogicPlanContext* pCxt, SStreamOption* pOption) { static void initStreamOption(SLogicPlanContext* pCxt, SStreamNodeOption* pOption) {
pOption->triggerType = pCxt->pPlanCxt->triggerType; pOption->triggerType = pCxt->pPlanCxt->triggerType;
pOption->watermark = pCxt->pPlanCxt->watermark; pOption->watermark = pCxt->pPlanCxt->watermark;
pOption->deleteMark = pCxt->pPlanCxt->deleteMark; pOption->deleteMark = pCxt->pPlanCxt->deleteMark;
@ -969,7 +969,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
initStreamOption(pCxt, &pInterpFunc->streamOption); initStreamOption(pCxt, &pInterpFunc->streamNodeOption);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {

View File

@ -1867,7 +1867,7 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
} }
if (pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->streamQuery) {
pInterpFunc->streamOption = pFuncLogicNode->streamOption; pInterpFunc->streamNodeOption = pFuncLogicNode->streamNodeOption;
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {