feat:add encode/decode for operator

This commit is contained in:
wangmm0220 2022-05-27 23:21:41 +08:00
parent 708805b904
commit 780644c135
2 changed files with 63 additions and 0 deletions

View File

@ -778,6 +778,9 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** data);
int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* data);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model); EOPTR_EXEC_MODEL model);

View File

@ -4954,6 +4954,66 @@ _error:
return NULL; return NULL;
} }
int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** result){
uint8_t operatorType = ops->operatorType;
int32_t num = 0;
int32_t size = ops->numOfDownstream;
for (int32_t i = 0; i < size; ++i) {
encodeExecTaskInfo(ops->pDownstream[i], result);
}
SOperatorInfo* pOptr = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) {
if(ops->fpSet.encodeResultRow){
int32_t length = 0;
SAggOperatorInfo* pAggInfo = ops->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SAggSupporter *pSup = &pAggInfo->aggSup;
ops->fpSet.encodeResultRow(ops, pSup, pInfo, result, &length);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) {
}
return 0;
}
int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* result){
uint8_t operatorType = ops->operatorType;
int32_t num = 0;
int32_t size = ops->numOfDownstream;
for (int32_t i = 0; i < size; ++i) {
decodeExecTaskInfo(ops->pDownstream[i], result);
}
SOperatorInfo* pOptr = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) {
if(ops->fpSet.decodeResultRow){
int32_t length = 0;
SAggOperatorInfo* pAggInfo = ops->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SAggSupporter *pSup = &pAggInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
ops->fpSet.decodeResultRow(ops, pSup, pInfo, result, length);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) {
}
return 0;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) { EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;