diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 153cf0bb3e..d4469be5e3 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -119,9 +119,9 @@ typedef struct SSubplanId { typedef struct SSubplan { SSubplanId id; // unique id of the subplan - int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN + int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY int32_t level; // the execution level of current subplan, starting from 0. - SEpSet execEpSet; // for the scan sub plan, the optional execution node + SEpSet execEpSet; // for the scan/modify subplan, the optional execution node SArray *pChildern; // the datasource subplan,from which to fetch the result SArray *pParents; // the data destination subplan, get data from current subplan SPhyNode *pNode; // physical plan of current subplan @@ -152,7 +152,7 @@ int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode /** * Convert to subplan to string for the scheduler to send to the executor */ -int32_t qSubPlanToString(const SSubplan* subplan, char** str); +int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len); int32_t qStringToSubplan(const char* str, SSubplan** subplan); diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 1bee95b8e5..ed29839905 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -102,7 +102,7 @@ int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag); int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); -int32_t subPlanToString(const SSubplan *pPhyNode, char** str); +int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t stringToSubplan(const char* str, SSubplan** subplan); /** @@ -121,6 +121,9 @@ void* destroyQueryPhyPlan(struct SPhyNode* pQueryPhyNode); const char* opTypeToOpName(int32_t type); int32_t opNameToOpType(const char* name); +const char* dsinkTypeToDsinkName(int32_t type); +int32_t dsinkNameToDsinkType(const char* name); + #ifdef __cplusplus } #endif diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 9db9e059b3..8388458b4c 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -47,16 +47,38 @@ const char* opTypeToOpName(int32_t type) { int32_t opNameToOpType(const char* name) { for (int32_t i = 1; i < sizeof(gOpName) / sizeof(gOpName[0]); ++i) { - if (strcmp(name, gOpName[i])) { + if (0 == strcmp(name, gOpName[i])) { return i; } } return OP_Unknown; } +const char* dsinkTypeToDsinkName(int32_t type) { + switch (type) { + case DSINK_Dispatch: + return "Dispatch"; + case DSINK_Insert: + return "Insert"; + default: + break; + } + return "Unknown"; +} + +int32_t dsinkNameToDsinkType(const char* name) { + if (0 == strcmp(name, "Dispatch")) { + return DSINK_Dispatch; + } else if (0 == strcmp(name, "Insert")) { + return DSINK_Insert; + } + return DSINK_Unknown; +} + static SDataSink* initDataSink(int32_t type, int32_t size) { SDataSink* sink = (SDataSink*)vailidPointer(calloc(1, size)); sink->info.type = type; + sink->info.name = dsinkTypeToDsinkName(type); return sink; } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 15c0e632a7..bf052f34b4 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -695,6 +695,70 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) { return res; } +static const char* jkInserterNumOfTables = "NumOfTables"; +static const char* jkInserterDataSize = "DataSize"; + +static bool inserterToJson(const void* obj, cJSON* json) { + const SDataInserter* inserter = (const SDataInserter*)obj; + bool res = cJSON_AddNumberToObject(json, jkInserterNumOfTables, inserter->numOfTables); + if (res) { + res = cJSON_AddNumberToObject(json, jkInserterDataSize, inserter->size); + } + // todo pData + return res; +} + +static bool inserterFromJson(const cJSON* json, void* obj) { + SDataInserter* inserter = (SDataInserter*)obj; + inserter->numOfTables = getNumber(json, jkInserterNumOfTables); + inserter->size = getNumber(json, jkInserterDataSize); + // todo pData +} + +static bool specificDataSinkToJson(const void* obj, cJSON* json) { + const SDataSink* dsink = (const SDataSink*)obj; + switch (dsink->info.type) { + case DSINK_Dispatch: + return true; + case DSINK_Insert: + return inserterToJson(obj, json); + default: + break; + } + return false; +} + +static bool specificDataSinkFromJson(const cJSON* json, void* obj) { + SDataSink* dsink = (SDataSink*)obj; + switch (dsink->info.type) { + case DSINK_Dispatch: + return true; + case DSINK_Insert: + return inserterFromJson(json, obj); + default: + break; + } + return false; +} + +static const char* jkDataSinkName = "Name"; + +static bool dataSinkToJson(const void* obj, cJSON* json) { + const SDataSink* dsink = (const SDataSink*)obj; + bool res = cJSON_AddStringToObject(json, jkDataSinkName, dsink->info.name); + if (res) { + res = addObject(json, dsink->info.name, specificDataSinkToJson, dsink); + } + return res; +} + +static bool dataSinkFromJson(const cJSON* json, void* obj) { + SDataSink* dsink = (SDataSink*)obj; + dsink->info.name = getString(json, jkDataSinkName); + dsink->info.type = dsinkNameToDsinkType(dsink->info.name); + return fromObject(json, dsink->info.name, specificDataSinkFromJson, dsink, true); +} + static const char* jkIdQueryId = "QueryId"; static const char* jkIdTemplateId = "TemplateId"; static const char* jkIdSubplanId = "SubplanId"; @@ -721,6 +785,7 @@ static bool subplanIdFromJson(const cJSON* json, void* obj) { static const char* jkSubplanId = "Id"; static const char* jkSubplanNode = "Node"; +static const char* jkSubplanDataSink = "DataSink"; static cJSON* subplanToJson(const SSubplan* subplan) { cJSON* jSubplan = cJSON_CreateObject(); @@ -734,6 +799,9 @@ static cJSON* subplanToJson(const SSubplan* subplan) { if (res) { res = addObject(jSubplan, jkSubplanNode, phyNodeToJson, subplan->pNode); } + if (res) { + res = addObject(jSubplan, jkSubplanDataSink, dataSinkToJson, subplan->pDataSink); + } if (!res) { cJSON_Delete(jSubplan); @@ -751,6 +819,9 @@ static SSubplan* subplanFromJson(const cJSON* json) { if (res) { res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, sizeof(SPhyNode), false); } + if (res) { + res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false); + } if (!res) { qDestroySubplan(subplan); @@ -759,13 +830,22 @@ static SSubplan* subplanFromJson(const cJSON* json) { return subplan; } -int32_t subPlanToString(const SSubplan* subplan, char** str) { +int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { + if (QUERY_TYPE_MODIFY == subplan->type) { + SDataInserter* insert = (SDataInserter*)(subplan->pDataSink); + *len = insert->size; + *str = insert->pData; + insert->pData == NULL; + return TSDB_CODE_SUCCESS; + } + cJSON* json = subplanToJson(subplan); if (NULL == json) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } *str = cJSON_Print(json); + *len = strlen(*str); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 7722a7d363..e8523249e4 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -50,8 +50,8 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr return setSubplanExecutionNode(subplan, templateId, ep); } -int32_t qSubPlanToString(const SSubplan *subplan, char** str) { - return subPlanToString(subplan, str); +int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) { + return subPlanToString(subplan, str, len); } int32_t qStringToSubplan(const char* str, SSubplan** subplan) { diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index 3be3337304..ddd1151742 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -62,8 +62,9 @@ protected: size_t num = taosArrayGetSize(subplans); for (size_t j = 0; j < num; ++j) { std::cout << "no " << j << ":" << std::endl; + int32_t len = 0; char* str = nullptr; - ASSERT_EQ (TSDB_CODE_SUCCESS, qSubPlanToString((const SSubplan*)taosArrayGetP(subplans, j), &str)); + ASSERT_EQ (TSDB_CODE_SUCCESS, qSubPlanToString((const SSubplan*)taosArrayGetP(subplans, j), &str, &len)); std::cout << str << std::endl; free(str); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 99a9b06fe4..5671a0d747 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -521,8 +521,8 @@ _return: int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; - - SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); + int32_t len = 0; + SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len)); if (plan->execEpSet.numOfEps <= 0) { SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); }