From 780644c135a2ac9c05c3693363f8eade63a46536 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 27 May 2022 23:21:41 +0800 Subject: [PATCH] feat:add encode/decode for operator --- source/libs/executor/inc/executorimpl.h | 3 ++ source/libs/executor/src/executorimpl.c | 60 +++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 300149a22d..261e482048 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -778,6 +778,9 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); +int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** data); +int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* data); + void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 684c657d17..e65ba96b53 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4954,6 +4954,66 @@ _error: return NULL; } +int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** result){ + uint8_t operatorType = ops->operatorType; + + int32_t num = 0; + int32_t size = ops->numOfDownstream; + + for (int32_t i = 0; i < size; ++i) { + encodeExecTaskInfo(ops->pDownstream[i], result); + } + + SOperatorInfo* pOptr = NULL; + if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) { + if(ops->fpSet.encodeResultRow){ + int32_t length = 0; + SAggOperatorInfo* pAggInfo = ops->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SAggSupporter *pSup = &pAggInfo->aggSup; + ops->fpSet.encodeResultRow(ops, pSup, pInfo, result, &length); + } + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) { + + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) { + } + + return 0; +} + +int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* result){ + uint8_t operatorType = ops->operatorType; + + int32_t num = 0; + int32_t size = ops->numOfDownstream; + + for (int32_t i = 0; i < size; ++i) { + decodeExecTaskInfo(ops->pDownstream[i], result); + } + + SOperatorInfo* pOptr = NULL; + if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) { + if(ops->fpSet.decodeResultRow){ + int32_t length = 0; + SAggOperatorInfo* pAggInfo = ops->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SAggSupporter *pSup = &pAggInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pInfo->resultRowInfo.size = 0; + ops->fpSet.decodeResultRow(ops, pSup, pInfo, result, length); + } + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) { + + } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) { + } else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) { + } + + return 0; +} + int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model) { uint64_t queryId = pPlan->id.queryId;