[td-11818] create multiple tables.
This commit is contained in:
parent
f0044d826f
commit
5326356375
|
@ -119,11 +119,12 @@ typedef struct SSubplanId {
|
||||||
} SSubplanId;
|
} SSubplanId;
|
||||||
|
|
||||||
typedef struct SSubplan {
|
typedef struct SSubplan {
|
||||||
SSubplanId id; // unique id of the subplan
|
SSubplanId id; // unique id of the subplan
|
||||||
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
|
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
|
||||||
int32_t level; // the execution level of current subplan, starting from 0.
|
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
|
||||||
|
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
|
||||||
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node
|
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node
|
||||||
SArray *pChildern; // the datasource subplan,from which to fetch the result
|
SArray *pChildren; // the datasource subplan,from which to fetch the result
|
||||||
SArray *pParents; // the data destination subplan, get data from current subplan
|
SArray *pParents; // the data destination subplan, get data from current subplan
|
||||||
SPhyNode *pNode; // physical plan of current subplan
|
SPhyNode *pNode; // physical plan of current subplan
|
||||||
SDataSink *pDataSink; // data of the subplan flow into the datasink
|
SDataSink *pDataSink; // data of the subplan flow into the datasink
|
||||||
|
|
|
@ -140,7 +140,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
||||||
|
|
||||||
SRequestObj *pRequest = (SRequestObj *) pRes;
|
SRequestObj *pRequest = (SRequestObj *) pRes;
|
||||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
|
pRequest->type == TSDB_SQL_INSERT ||
|
||||||
|
pRequest->code != TSDB_CODE_SUCCESS ||
|
||||||
|
taos_num_fields(pRes) == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
// taos_close(pConn);
|
// taos_close(pConn);
|
||||||
//}
|
//}
|
||||||
//
|
|
||||||
//TEST(testCase, create_table_Test) {
|
//TEST(testCase, create_table_Test) {
|
||||||
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
// // assert(pConn != NULL);
|
// // assert(pConn != NULL);
|
||||||
|
@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) {
|
||||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
|
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
ASSERT_TRUE(false);
|
ASSERT_TRUE(false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
|
||||||
%type create_stable_args{SCreateTableSql*}
|
%type create_stable_args{SCreateTableSql*}
|
||||||
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
|
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
|
||||||
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
|
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
|
||||||
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
|
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
|
||||||
|
|
||||||
V.n += Z.n;
|
V.n += Z.n;
|
||||||
setCreatedTableName(pInfo, &V, &U);
|
setCreatedTableName(pInfo, &V, &U);
|
||||||
|
|
|
@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
|
||||||
taosArrayDestroy(pInfo->funcs);
|
taosArrayDestroy(pInfo->funcs);
|
||||||
if (pInfo->type == TSDB_SQL_SELECT) {
|
if (pInfo->type == TSDB_SQL_SELECT) {
|
||||||
destroyAllSqlNode(&pInfo->sub);
|
destroyAllSqlNode(&pInfo->sub);
|
||||||
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
|
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
|
||||||
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
|
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
|
||||||
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
|
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
|
||||||
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
|
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
|
||||||
|
|
|
@ -761,8 +761,9 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_SQL_CREATE_TABLE: {
|
case TSDB_SQL_CREATE_STABLE: {
|
||||||
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
||||||
|
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
|
||||||
|
|
||||||
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
|
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
|
||||||
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
|
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -772,13 +773,6 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
||||||
|
|
||||||
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||||
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
|
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
|
||||||
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
|
|
||||||
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
|
|
||||||
TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDcl->msgType = TDMT_VND_CREATE_TABLE;
|
|
||||||
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
|
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
|
||||||
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
|
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
|
||||||
// return code;
|
// return code;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -70,6 +70,11 @@ typedef struct SQueryPlanNode {
|
||||||
struct SQueryPlanNode *pParent;
|
struct SQueryPlanNode *pParent;
|
||||||
} SQueryPlanNode;
|
} SQueryPlanNode;
|
||||||
|
|
||||||
|
typedef struct SDataPayloadInfo {
|
||||||
|
int32_t msgType;
|
||||||
|
SArray *payload;
|
||||||
|
} SDataPayloadInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Optimize the query execution plan, currently not implement yet.
|
* Optimize the query execution plan, currently not implement yet.
|
||||||
* @param pQueryNode
|
* @param pQueryNode
|
||||||
|
|
|
@ -37,18 +37,28 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||||
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
||||||
|
|
||||||
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
||||||
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
||||||
if (NULL == *pQueryPlan || NULL == blocks) {
|
|
||||||
|
SDataPayloadInfo* pPayload = calloc(1, sizeof(SDataPayloadInfo));
|
||||||
|
if (NULL == *pQueryPlan || NULL == blocks || NULL == pPayload) {
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pQueryPlan)->info.type = QNODE_MODIFY;
|
(*pQueryPlan)->info.type = QNODE_MODIFY;
|
||||||
taosArrayAddAll(blocks, pInsert->pDataBlocks);
|
taosArrayAddAll(blocks, pInsert->pDataBlocks);
|
||||||
(*pQueryPlan)->pExtInfo = blocks;
|
|
||||||
|
if (pNode->type == TSDB_SQL_INSERT) {
|
||||||
|
pPayload->msgType = TDMT_VND_SUBMIT;
|
||||||
|
} else if (pNode->type == TSDB_SQL_CREATE_TABLE) {
|
||||||
|
pPayload->msgType = TDMT_VND_CREATE_TABLE;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPayload->payload = blocks;
|
||||||
|
(*pQueryPlan)->pExtInfo = pPayload;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -69,7 +79,7 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||||
|
|
||||||
case TSDB_SQL_INSERT:
|
case TSDB_SQL_INSERT:
|
||||||
case TSDB_SQL_CREATE_TABLE:
|
case TSDB_SQL_CREATE_TABLE:
|
||||||
return createInsertPlan(pNode, pQueryPlan);
|
return createModificationOpPlan(pNode, pQueryPlan);
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -191,13 +191,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||||
subplan->level = 0;
|
subplan->level = 0;
|
||||||
if (NULL != pCxt->pCurrentSubplan) {
|
if (NULL != pCxt->pCurrentSubplan) {
|
||||||
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
||||||
if (NULL == pCxt->pCurrentSubplan->pChildern) {
|
if (NULL == pCxt->pCurrentSubplan->pChildren) {
|
||||||
pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||||
}
|
}
|
||||||
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
|
|
||||||
|
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
|
||||||
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||||
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
|
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* currentLevel;
|
SArray* currentLevel;
|
||||||
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
|
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
|
||||||
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||||
|
@ -205,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||||
} else {
|
} else {
|
||||||
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
|
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(currentLevel, &subplan);
|
taosArrayPush(currentLevel, &subplan);
|
||||||
pCxt->pCurrentSubplan = subplan;
|
pCxt->pCurrentSubplan = subplan;
|
||||||
++(pCxt->pDag->numOfSubplans);
|
++(pCxt->pDag->numOfSubplans);
|
||||||
|
@ -290,24 +293,28 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
|
SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
|
||||||
size_t numOfVg = taosArrayGetSize(vgs);
|
|
||||||
for (int32_t i = 0; i < numOfVg; ++i) {
|
size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
|
||||||
|
for (int32_t i = 0; i < numOfVgroups; ++i) {
|
||||||
STORE_CURRENT_SUBPLAN(pCxt);
|
STORE_CURRENT_SUBPLAN(pCxt);
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
||||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
|
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
||||||
|
|
||||||
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
||||||
subplan->pNode = NULL;
|
|
||||||
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
||||||
subplan->type = QUERY_TYPE_MODIFY;
|
subplan->pNode = NULL;
|
||||||
|
subplan->type = QUERY_TYPE_MODIFY;
|
||||||
|
subplan->msgType = pPayload->msgType;
|
||||||
|
|
||||||
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
||||||
if (QNODE_MODIFY == pRoot->info.type) {
|
if (QNODE_MODIFY == pRoot->info.type) {
|
||||||
splitInsertSubplan(pCxt, pRoot);
|
splitModificationOpSubPlan(pCxt, pRoot);
|
||||||
} else {
|
} else {
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
||||||
++(pCxt->nextId.templateId);
|
++(pCxt->nextId.templateId);
|
||||||
|
@ -325,6 +332,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
||||||
.pCurrentSubplan = NULL,
|
.pCurrentSubplan = NULL,
|
||||||
.nextId = {0} // todo queryid
|
.nextId = {0} // todo queryid
|
||||||
};
|
};
|
||||||
|
|
||||||
*pDag = context.pDag;
|
*pDag = context.pDag;
|
||||||
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||||
createSubplanByLevel(&context, pQueryNode);
|
createSubplanByLevel(&context, pQueryNode);
|
||||||
|
|
|
@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized.
|
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
|
||||||
|
|
||||||
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
|
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
|
||||||
if (res) {
|
if (res) {
|
||||||
|
@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
||||||
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
|
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
|
||||||
*len = insert->size;
|
*len = insert->size;
|
||||||
*str = insert->pData;
|
*str = insert->pData;
|
||||||
insert->pData == NULL;
|
insert->pData = NULL;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
*str = cJSON_Print(json);
|
*str = cJSON_Print(json);
|
||||||
*len = strlen(*str) + 1;
|
*len = strlen(*str) + 1;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -31,17 +31,20 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
|
||||||
destroyQueryPlan(logicPlan);
|
destroyQueryPlan(logicPlan);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = optimizeQueryPlan(logicPlan);
|
code = optimizeQueryPlan(logicPlan);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
destroyQueryPlan(logicPlan);
|
destroyQueryPlan(logicPlan);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = createDag(logicPlan, NULL, pDag);
|
code = createDag(logicPlan, NULL, pDag);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
destroyQueryPlan(logicPlan);
|
destroyQueryPlan(logicPlan);
|
||||||
qDestroyQueryDag(*pDag);
|
qDestroyQueryDag(*pDag);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
destroyQueryPlan(logicPlan);
|
destroyQueryPlan(logicPlan);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
|
||||||
for (int32_t m = 0; m < level->taskNum; ++m) {
|
for (int32_t m = 0; m < level->taskNum; ++m) {
|
||||||
SSchTask *task = taosArrayGet(level->subTasks, m);
|
SSchTask *task = taosArrayGet(level->subTasks, m);
|
||||||
SSubplan *plan = task->plan;
|
SSubplan *plan = task->plan;
|
||||||
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
|
int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
|
||||||
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
|
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
|
||||||
|
|
||||||
if (childNum > 0) {
|
if (childNum > 0) {
|
||||||
|
@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t n = 0; n < childNum; ++n) {
|
for (int32_t n = 0; n < childNum; ++n) {
|
||||||
SSubplan **child = taosArrayGet(plan->pChildern, n);
|
SSubplan **child = taosArrayGet(plan->pChildren, n);
|
||||||
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
|
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
|
||||||
if (NULL == childTask || NULL == *childTask) {
|
if (NULL == childTask || NULL == *childTask) {
|
||||||
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||||
|
@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//??
|
||||||
job->attr.needFetch = true;
|
job->attr.needFetch = true;
|
||||||
|
|
||||||
job->levelNum = levelNum;
|
job->levelNum = levelNum;
|
||||||
|
@ -547,22 +548,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
tfree(param);
|
tfree(param);
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
||||||
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
|
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
|
||||||
|
@ -570,6 +578,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
|
||||||
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||||
switch (msgType) {
|
switch (msgType) {
|
||||||
|
case TDMT_VND_CREATE_TABLE:
|
||||||
|
*fp = schHandleCreateTableCallback;
|
||||||
|
break;
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
*fp = schHandleSubmitCallback;
|
*fp = schHandleSubmitCallback;
|
||||||
break;
|
break;
|
||||||
|
@ -640,6 +651,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
switch (msgType) {
|
switch (msgType) {
|
||||||
|
case TDMT_VND_CREATE_TABLE:
|
||||||
case TDMT_VND_SUBMIT: {
|
case TDMT_VND_SUBMIT: {
|
||||||
if (NULL == task->msg || task->msgLen <= 0) {
|
if (NULL == task->msg || task->msgLen <= 0) {
|
||||||
qError("submit msg is NULL");
|
qError("submit msg is NULL");
|
||||||
|
@ -746,19 +758,15 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
|
// int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
|
||||||
|
|
||||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
|
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
|
||||||
|
|
||||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||||
|
|
||||||
task->status = JOB_TASK_STATUS_EXECUTING;
|
task->status = JOB_TASK_STATUS_EXECUTING;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t schLaunchJob(SSchJob *job) {
|
int32_t schLaunchJob(SSchJob *job) {
|
||||||
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
|
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
|
||||||
for (int32_t i = 0; i < level->taskNum; ++i) {
|
for (int32_t i = 0; i < level->taskNum; ++i) {
|
||||||
|
|
|
@ -58,7 +58,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
||||||
scanPlan.execEpSet.numOfEps = 1;
|
scanPlan.execEpSet.numOfEps = 1;
|
||||||
scanPlan.execEpSet.port[0] = 6030;
|
scanPlan.execEpSet.port[0] = 6030;
|
||||||
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
|
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
|
||||||
scanPlan.pChildern = NULL;
|
scanPlan.pChildren = NULL;
|
||||||
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
||||||
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||||
|
|
||||||
|
@ -68,14 +68,14 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
||||||
mergePlan.type = QUERY_TYPE_MERGE;
|
mergePlan.type = QUERY_TYPE_MERGE;
|
||||||
mergePlan.level = 0;
|
mergePlan.level = 0;
|
||||||
mergePlan.execEpSet.numOfEps = 0;
|
mergePlan.execEpSet.numOfEps = 0;
|
||||||
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
|
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||||
mergePlan.pParents = NULL;
|
mergePlan.pParents = NULL;
|
||||||
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||||
|
|
||||||
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
|
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
|
||||||
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
|
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
|
||||||
|
|
||||||
taosArrayPush(mergePointer->pChildern, &scanPointer);
|
taosArrayPush(mergePointer->pChildren, &scanPointer);
|
||||||
taosArrayPush(scanPointer->pParents, &mergePointer);
|
taosArrayPush(scanPointer->pParents, &mergePointer);
|
||||||
|
|
||||||
taosArrayPush(dag->pSubplans, &merge);
|
taosArrayPush(dag->pSubplans, &merge);
|
||||||
|
@ -100,7 +100,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[0].execEpSet.numOfEps = 1;
|
insertPlan[0].execEpSet.numOfEps = 1;
|
||||||
insertPlan[0].execEpSet.port[0] = 6030;
|
insertPlan[0].execEpSet.port[0] = 6030;
|
||||||
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
|
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
|
||||||
insertPlan[0].pChildern = NULL;
|
insertPlan[0].pChildren = NULL;
|
||||||
insertPlan[0].pParents = NULL;
|
insertPlan[0].pParents = NULL;
|
||||||
insertPlan[0].pNode = NULL;
|
insertPlan[0].pNode = NULL;
|
||||||
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
@ -113,7 +113,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[1].execEpSet.numOfEps = 1;
|
insertPlan[1].execEpSet.numOfEps = 1;
|
||||||
insertPlan[1].execEpSet.port[0] = 6030;
|
insertPlan[1].execEpSet.port[0] = 6030;
|
||||||
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
|
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
|
||||||
insertPlan[1].pChildern = NULL;
|
insertPlan[1].pChildren = NULL;
|
||||||
insertPlan[1].pParents = NULL;
|
insertPlan[1].pParents = NULL;
|
||||||
insertPlan[1].pNode = NULL;
|
insertPlan[1].pNode = NULL;
|
||||||
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
|
Loading…
Reference in New Issue