Merge pull request #9562 from taosdata/feature/3.0_liaohj
Feature/3.0 liaohj
This commit is contained in:
commit
1786d6a56c
|
@ -119,11 +119,12 @@ typedef struct SSubplanId {
|
|||
} SSubplanId;
|
||||
|
||||
typedef struct SSubplan {
|
||||
SSubplanId id; // unique id of the subplan
|
||||
SSubplanId id; // unique id of the subplan
|
||||
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
|
||||
int32_t level; // the execution level of current subplan, starting from 0.
|
||||
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
|
||||
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
|
||||
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node
|
||||
SArray *pChildern; // the datasource subplan,from which to fetch the result
|
||||
SArray *pChildren; // the datasource subplan,from which to fetch the result
|
||||
SArray *pParents; // the data destination subplan, get data from current subplan
|
||||
SPhyNode *pNode; // physical plan of current subplan
|
||||
SDataSink *pDataSink; // data of the subplan flow into the datasink
|
||||
|
|
|
@ -29,7 +29,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
|
|||
|
||||
// this function may be called by user or system, or by both simultaneously.
|
||||
void taos_cleanup(void) {
|
||||
tscDebug("start to cleanup client environment");
|
||||
tscInfo("start to cleanup client environment");
|
||||
|
||||
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
|
||||
return;
|
||||
|
@ -47,6 +47,8 @@ void taos_cleanup(void) {
|
|||
|
||||
rpcCleanup();
|
||||
taosCloseLog();
|
||||
|
||||
tscInfo("all local resources released");
|
||||
}
|
||||
|
||||
TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port) {
|
||||
|
@ -140,7 +142,9 @@ TAOS_ROW taos_fetch_row(TAOS_RES *pRes) {
|
|||
|
||||
SRequestObj *pRequest = (SRequestObj *) pRes;
|
||||
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
|
||||
pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->type == TSDB_SQL_INSERT ||
|
||||
pRequest->code != TSDB_CODE_SUCCESS ||
|
||||
taos_num_fields(pRes) == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -279,7 +279,7 @@ TEST(testCase, connect_Test) {
|
|||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
|
||||
//TEST(testCase, create_table_Test) {
|
||||
// // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// // assert(pConn != NULL);
|
||||
|
@ -470,9 +470,9 @@ TEST(testCase, create_multiple_tables) {
|
|||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table t_2 using st1 tags(1) t_3 using st1 tags(2)");
|
||||
pRes = taos_query(pConn, "create table t_2 using st1 tags(1)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
|
||||
printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
|
|
@ -392,7 +392,7 @@ create_table_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP. {
|
|||
%type create_stable_args{SCreateTableSql*}
|
||||
create_stable_args(A) ::= ifnotexists(U) ids(V) cpxName(Z) LP columnlist(X) RP TAGS LP columnlist(Y) RP. {
|
||||
A = tSetCreateTableInfo(X, Y, NULL, TSQL_CREATE_STABLE);
|
||||
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_TABLE);
|
||||
setSqlInfo(pInfo, A, NULL, TSDB_SQL_CREATE_STABLE);
|
||||
|
||||
V.n += Z.n;
|
||||
setCreatedTableName(pInfo, &V, &U);
|
||||
|
|
|
@ -785,7 +785,7 @@ void destroySqlInfo(SSqlInfo *pInfo) {
|
|||
taosArrayDestroy(pInfo->funcs);
|
||||
if (pInfo->type == TSDB_SQL_SELECT) {
|
||||
destroyAllSqlNode(&pInfo->sub);
|
||||
} else if (pInfo->type == TSDB_SQL_CREATE_TABLE) {
|
||||
} else if (pInfo->type == TSDB_SQL_CREATE_STABLE) {
|
||||
pInfo->pCreateTableInfo = destroyCreateTableSql(pInfo->pCreateTableInfo);
|
||||
} else if (pInfo->type == TSDB_SQL_ALTER_TABLE) {
|
||||
taosArrayDestroyEx(pInfo->pAlterInfo->varList, freeItem);
|
||||
|
|
|
@ -761,8 +761,9 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
|||
break;
|
||||
}
|
||||
|
||||
case TSDB_SQL_CREATE_TABLE: {
|
||||
case TSDB_SQL_CREATE_STABLE: {
|
||||
SCreateTableSql* pCreateTable = pInfo->pCreateTableInfo;
|
||||
assert(pCreateTable->type != TSQL_CREATE_CTABLE);
|
||||
|
||||
if (pCreateTable->type == TSQL_CREATE_TABLE || pCreateTable->type == TSQL_CREATE_STABLE) {
|
||||
if ((code = doCheckForCreateTable(pInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -772,13 +773,6 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, c
|
|||
|
||||
pDcl->pMsg = (char*)buildCreateTableMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
pDcl->msgType = (pCreateTable->type == TSQL_CREATE_TABLE) ? TDMT_VND_CREATE_TABLE : TDMT_MND_CREATE_STB;
|
||||
} else if (pCreateTable->type == TSQL_CREATE_CTABLE) {
|
||||
if ((code = doCheckForCreateCTable(pInfo, pCtx, pMsgBuf, &pDcl->pMsg, &pDcl->msgLen)) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pDcl->msgType = TDMT_VND_CREATE_TABLE;
|
||||
} else if (pCreateTable->type == TSQL_CREATE_STREAM) {
|
||||
// if ((code = doCheckForStream(pSql, pInfo)) != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -70,6 +70,11 @@ typedef struct SQueryPlanNode {
|
|||
struct SQueryPlanNode *pParent;
|
||||
} SQueryPlanNode;
|
||||
|
||||
typedef struct SDataPayloadInfo {
|
||||
int32_t msgType;
|
||||
SArray *payload;
|
||||
} SDataPayloadInfo;
|
||||
|
||||
/**
|
||||
* Optimize the query execution plan, currently not implement yet.
|
||||
* @param pQueryNode
|
||||
|
|
|
@ -37,18 +37,28 @@ int32_t optimizeQueryPlan(struct SQueryPlanNode* pQueryNode) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t createInsertPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||
static int32_t createModificationOpPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
||||
SInsertStmtInfo* pInsert = (SInsertStmtInfo*)pNode;
|
||||
|
||||
*pQueryPlan = calloc(1, sizeof(SQueryPlanNode));
|
||||
SArray* blocks = taosArrayInit(taosArrayGetSize(pInsert->pDataBlocks), POINTER_BYTES);
|
||||
if (NULL == *pQueryPlan || NULL == blocks) {
|
||||
|
||||
SDataPayloadInfo* pPayload = calloc(1, sizeof(SDataPayloadInfo));
|
||||
if (NULL == *pQueryPlan || NULL == blocks || NULL == pPayload) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
(*pQueryPlan)->info.type = QNODE_MODIFY;
|
||||
taosArrayAddAll(blocks, pInsert->pDataBlocks);
|
||||
(*pQueryPlan)->pExtInfo = blocks;
|
||||
|
||||
if (pNode->type == TSDB_SQL_INSERT) {
|
||||
pPayload->msgType = TDMT_VND_SUBMIT;
|
||||
} else if (pNode->type == TSDB_SQL_CREATE_TABLE) {
|
||||
pPayload->msgType = TDMT_VND_CREATE_TABLE;
|
||||
}
|
||||
|
||||
pPayload->payload = blocks;
|
||||
(*pQueryPlan)->pExtInfo = pPayload;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -69,7 +79,7 @@ int32_t createQueryPlan(const SQueryNode* pNode, SQueryPlanNode** pQueryPlan) {
|
|||
|
||||
case TSDB_SQL_INSERT:
|
||||
case TSDB_SQL_CREATE_TABLE:
|
||||
return createInsertPlan(pNode, pQueryPlan);
|
||||
return createModificationOpPlan(pNode, pQueryPlan);
|
||||
|
||||
default:
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -191,13 +191,15 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
|||
subplan->level = 0;
|
||||
if (NULL != pCxt->pCurrentSubplan) {
|
||||
subplan->level = pCxt->pCurrentSubplan->level + 1;
|
||||
if (NULL == pCxt->pCurrentSubplan->pChildern) {
|
||||
pCxt->pCurrentSubplan->pChildern = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||
if (NULL == pCxt->pCurrentSubplan->pChildren) {
|
||||
pCxt->pCurrentSubplan->pChildren = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||
}
|
||||
taosArrayPush(pCxt->pCurrentSubplan->pChildern, &subplan);
|
||||
|
||||
taosArrayPush(pCxt->pCurrentSubplan->pChildren, &subplan);
|
||||
subplan->pParents = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||
taosArrayPush(subplan->pParents, &pCxt->pCurrentSubplan);
|
||||
}
|
||||
|
||||
SArray* currentLevel;
|
||||
if (subplan->level >= taosArrayGetSize(pCxt->pDag->pSubplans)) {
|
||||
currentLevel = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||
|
@ -205,6 +207,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
|||
} else {
|
||||
currentLevel = taosArrayGetP(pCxt->pDag->pSubplans, subplan->level);
|
||||
}
|
||||
|
||||
taosArrayPush(currentLevel, &subplan);
|
||||
pCxt->pCurrentSubplan = subplan;
|
||||
++(pCxt->pDag->numOfSubplans);
|
||||
|
@ -278,6 +281,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
default:
|
||||
assert(false);
|
||||
}
|
||||
|
||||
if (pPlanNode->pChildren != NULL && taosArrayGetSize(pPlanNode->pChildren) > 0) {
|
||||
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
size_t size = taosArrayGetSize(pPlanNode->pChildren);
|
||||
|
@ -287,31 +291,38 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
taosArrayPush(node->pChildren, &child);
|
||||
}
|
||||
}
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SArray* vgs = (SArray*)pPlanNode->pExtInfo;
|
||||
size_t numOfVg = taosArrayGetSize(vgs);
|
||||
for (int32_t i = 0; i < numOfVg; ++i) {
|
||||
static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SDataPayloadInfo* pPayload = (SDataPayloadInfo*) pPlanNode->pExtInfo;
|
||||
|
||||
size_t numOfVgroups = taosArrayGetSize(pPayload->payload);
|
||||
for (int32_t i = 0; i < numOfVgroups; ++i) {
|
||||
STORE_CURRENT_SUBPLAN(pCxt);
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
|
||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
||||
|
||||
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
||||
subplan->pNode = NULL;
|
||||
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
||||
subplan->type = QUERY_TYPE_MODIFY;
|
||||
subplan->pNode = NULL;
|
||||
subplan->type = QUERY_TYPE_MODIFY;
|
||||
subplan->msgType = pPayload->msgType;
|
||||
|
||||
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||
}
|
||||
}
|
||||
|
||||
static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
||||
if (QNODE_MODIFY == pRoot->info.type) {
|
||||
splitInsertSubplan(pCxt, pRoot);
|
||||
splitModificationOpSubPlan(pCxt, pRoot);
|
||||
} else {
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
|
||||
++(pCxt->nextId.templateId);
|
||||
subplan->pNode = createPhyNode(pCxt, pRoot);
|
||||
|
||||
subplan->msgType = TDMT_VND_QUERY;
|
||||
subplan->pNode = createPhyNode(pCxt, pRoot);
|
||||
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
|
||||
}
|
||||
// todo deal subquery
|
||||
|
@ -325,6 +336,7 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
|||
.pCurrentSubplan = NULL,
|
||||
.nextId = {0} // todo queryid
|
||||
};
|
||||
|
||||
*pDag = context.pDag;
|
||||
context.pDag->pSubplans = validPointer(taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES));
|
||||
createSubplanByLevel(&context, pQueryNode);
|
||||
|
|
|
@ -793,7 +793,7 @@ static cJSON* subplanToJson(const SSubplan* subplan) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// The 'type', 'level', 'execEpSet', 'pChildern' and 'pParents' fields do not need to be serialized.
|
||||
// The 'type', 'level', 'execEpSet', 'pChildren' and 'pParents' fields do not need to be serialized.
|
||||
|
||||
bool res = addObject(jSubplan, jkSubplanId, subplanIdToJson, &subplan->id);
|
||||
if (res) {
|
||||
|
@ -835,7 +835,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
|||
SDataInserter* insert = (SDataInserter*)(subplan->pDataSink);
|
||||
*len = insert->size;
|
||||
*str = insert->pData;
|
||||
insert->pData == NULL;
|
||||
insert->pData = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -844,6 +844,7 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
|||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
*str = cJSON_Print(json);
|
||||
*len = strlen(*str) + 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -31,17 +31,20 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag)
|
|||
destroyQueryPlan(logicPlan);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = optimizeQueryPlan(logicPlan);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyQueryPlan(logicPlan);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = createDag(logicPlan, NULL, pDag);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
destroyQueryPlan(logicPlan);
|
||||
qDestroyQueryDag(*pDag);
|
||||
return code;
|
||||
}
|
||||
|
||||
destroyQueryPlan(logicPlan);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
|
|||
for (int32_t m = 0; m < level->taskNum; ++m) {
|
||||
SSchTask *task = taosArrayGet(level->subTasks, m);
|
||||
SSubplan *plan = task->plan;
|
||||
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
|
||||
int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0;
|
||||
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
|
||||
|
||||
if (childNum > 0) {
|
||||
|
@ -40,7 +40,7 @@ int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) {
|
|||
}
|
||||
|
||||
for (int32_t n = 0; n < childNum; ++n) {
|
||||
SSubplan **child = taosArrayGet(plan->pChildern, n);
|
||||
SSubplan **child = taosArrayGet(plan->pChildren, n);
|
||||
SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES);
|
||||
if (NULL == childTask || NULL == *childTask) {
|
||||
qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
|
||||
|
@ -122,6 +122,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
//??
|
||||
job->attr.needFetch = true;
|
||||
|
||||
job->levelNum = levelNum;
|
||||
|
@ -455,15 +456,27 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
|
|||
|
||||
int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
||||
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE_RSP: {
|
||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
|
||||
} else {
|
||||
// job->resNumOfRows += rsp->affectedRows;
|
||||
code = schProcessOnTaskSuccess(job, task);
|
||||
if (code) {
|
||||
goto _task_error;
|
||||
}
|
||||
}
|
||||
}
|
||||
case TDMT_VND_SUBMIT_RSP: {
|
||||
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
|
||||
if (rsp->code != TSDB_CODE_SUCCESS) {
|
||||
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code));
|
||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
|
||||
} else {
|
||||
SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg;
|
||||
job->resNumOfRows += rsp->affectedRows;
|
||||
|
||||
|
||||
code = schProcessOnTaskSuccess(job, task);
|
||||
if (code) {
|
||||
goto _task_error;
|
||||
|
@ -547,22 +560,29 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
|||
|
||||
_return:
|
||||
tfree(param);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SSchCallbackParam *pParam = (SSchCallbackParam *)param;
|
||||
qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code);
|
||||
|
@ -570,6 +590,9 @@ int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
*fp = schHandleCreateTableCallback;
|
||||
break;
|
||||
case TDMT_VND_SUBMIT:
|
||||
*fp = schHandleSubmitCallback;
|
||||
break;
|
||||
|
@ -640,6 +663,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
|||
int32_t code = 0;
|
||||
|
||||
switch (msgType) {
|
||||
case TDMT_VND_CREATE_TABLE:
|
||||
case TDMT_VND_SUBMIT: {
|
||||
if (NULL == task->msg || task->msgLen <= 0) {
|
||||
qError("submit msg is NULL");
|
||||
|
@ -746,19 +770,15 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
|||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
|
||||
// int32_t msgType = (plan->type == QUERY_TYPE_MODIFY)? TDMT_VND_SUBMIT : TDMT_VND_QUERY;
|
||||
|
||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, msgType));
|
||||
|
||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
|
||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||
|
||||
task->status = JOB_TASK_STATUS_EXECUTING;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t schLaunchJob(SSchJob *job) {
|
||||
SSchLevel *level = taosArrayGet(job->levels, job->levelIdx);
|
||||
for (int32_t i = 0; i < level->taskNum; ++i) {
|
||||
|
|
|
@ -58,7 +58,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
|||
scanPlan.execEpSet.numOfEps = 1;
|
||||
scanPlan.execEpSet.port[0] = 6030;
|
||||
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
|
||||
scanPlan.pChildern = NULL;
|
||||
scanPlan.pChildren = NULL;
|
||||
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
||||
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||
|
||||
|
@ -68,14 +68,14 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
|||
mergePlan.type = QUERY_TYPE_MERGE;
|
||||
mergePlan.level = 0;
|
||||
mergePlan.execEpSet.numOfEps = 0;
|
||||
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
|
||||
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||
mergePlan.pParents = NULL;
|
||||
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||
|
||||
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
|
||||
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
|
||||
|
||||
taosArrayPush(mergePointer->pChildern, &scanPointer);
|
||||
taosArrayPush(mergePointer->pChildren, &scanPointer);
|
||||
taosArrayPush(scanPointer->pParents, &mergePointer);
|
||||
|
||||
taosArrayPush(dag->pSubplans, &merge);
|
||||
|
@ -100,7 +100,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
|||
insertPlan[0].execEpSet.numOfEps = 1;
|
||||
insertPlan[0].execEpSet.port[0] = 6030;
|
||||
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
|
||||
insertPlan[0].pChildern = NULL;
|
||||
insertPlan[0].pChildren = NULL;
|
||||
insertPlan[0].pParents = NULL;
|
||||
insertPlan[0].pNode = NULL;
|
||||
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||
|
@ -113,7 +113,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
|||
insertPlan[1].execEpSet.numOfEps = 1;
|
||||
insertPlan[1].execEpSet.port[0] = 6030;
|
||||
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
|
||||
insertPlan[1].pChildern = NULL;
|
||||
insertPlan[1].pChildren = NULL;
|
||||
insertPlan[1].pParents = NULL;
|
||||
insertPlan[1].pNode = NULL;
|
||||
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||
|
|
Loading…
Reference in New Issue