Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
41c08c54ff
|
@ -1041,6 +1041,7 @@ typedef struct SResFetchMsg {
|
||||||
} SResFetchMsg;
|
} SResFetchMsg;
|
||||||
|
|
||||||
typedef struct SSchTasksStatusMsg {
|
typedef struct SSchTasksStatusMsg {
|
||||||
|
SMsgHead header;
|
||||||
uint64_t sId;
|
uint64_t sId;
|
||||||
} SSchTasksStatusMsg;
|
} SSchTasksStatusMsg;
|
||||||
|
|
||||||
|
@ -1056,6 +1057,7 @@ typedef struct SSchedulerStatusRsp {
|
||||||
} SSchedulerStatusRsp;
|
} SSchedulerStatusRsp;
|
||||||
|
|
||||||
typedef struct STaskCancelMsg {
|
typedef struct STaskCancelMsg {
|
||||||
|
SMsgHead header;
|
||||||
uint64_t sId;
|
uint64_t sId;
|
||||||
uint64_t queryId;
|
uint64_t queryId;
|
||||||
uint64_t taskId;
|
uint64_t taskId;
|
||||||
|
@ -1066,6 +1068,7 @@ typedef struct STaskCancelRsp {
|
||||||
} STaskCancelRsp;
|
} STaskCancelRsp;
|
||||||
|
|
||||||
typedef struct STaskDropMsg {
|
typedef struct STaskDropMsg {
|
||||||
|
SMsgHead header;
|
||||||
uint64_t sId;
|
uint64_t sId;
|
||||||
uint64_t queryId;
|
uint64_t queryId;
|
||||||
uint64_t taskId;
|
uint64_t taskId;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "query.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
|
||||||
|
@ -123,7 +124,7 @@ typedef struct SSubplan {
|
||||||
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
|
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
|
||||||
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
|
int32_t msgType; // message type for subplan, used to denote the send message type to vnode.
|
||||||
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
|
int32_t level; // the execution level of current subplan, starting from 0 in a top-down manner.
|
||||||
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node
|
SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
|
||||||
SArray *pChildren; // the datasource subplan,from which to fetch the result
|
SArray *pChildren; // the datasource subplan,from which to fetch the result
|
||||||
SArray *pParents; // the data destination subplan, get data from current subplan
|
SArray *pParents; // the data destination subplan, get data from current subplan
|
||||||
SPhyNode *pNode; // physical plan of current subplan
|
SPhyNode *pNode; // physical plan of current subplan
|
||||||
|
@ -147,7 +148,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
|
||||||
// @subplan subplan to be schedule
|
// @subplan subplan to be schedule
|
||||||
// @templateId templateId of a group of datasource subplans of this @subplan
|
// @templateId templateId of a group of datasource subplans of this @subplan
|
||||||
// @ep one execution location of this group of datasource subplans
|
// @ep one execution location of this group of datasource subplans
|
||||||
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
|
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
|
||||||
|
|
||||||
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
|
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
|
||||||
|
|
||||||
|
|
|
@ -111,6 +111,13 @@ typedef struct SMsgSendInfo {
|
||||||
SDataBuf msgInfo;
|
SDataBuf msgInfo;
|
||||||
} SMsgSendInfo;
|
} SMsgSendInfo;
|
||||||
|
|
||||||
|
typedef struct SQueryNodeAddr{
|
||||||
|
int32_t nodeId; //vgId or qnodeId
|
||||||
|
int8_t inUse;
|
||||||
|
int8_t numOfEps;
|
||||||
|
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
|
||||||
|
} SQueryNodeAddr;
|
||||||
|
|
||||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||||
|
|
||||||
int32_t initTaskQueue();
|
int32_t initTaskQueue();
|
||||||
|
|
|
@ -50,13 +50,6 @@ typedef struct SQueryProfileSummary {
|
||||||
uint64_t resultSize; // generated result size in Kb.
|
uint64_t resultSize; // generated result size in Kb.
|
||||||
} SQueryProfileSummary;
|
} SQueryProfileSummary;
|
||||||
|
|
||||||
typedef struct SQueryNodeAddr{
|
|
||||||
int32_t nodeId; //vgId or qnodeId
|
|
||||||
int8_t inUse;
|
|
||||||
int8_t numOfEps;
|
|
||||||
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
|
|
||||||
} SQueryNodeAddr;
|
|
||||||
|
|
||||||
typedef struct SQueryResult {
|
typedef struct SQueryResult {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
uint64_t numOfRows;
|
uint64_t numOfRows;
|
||||||
|
|
|
@ -577,11 +577,16 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
STbCfg tbCfg;
|
STbCfg tbCfg;
|
||||||
void * pBuf;
|
void * pBuf;
|
||||||
|
|
||||||
if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
|
for (;;) {
|
||||||
pBuf = value.data;
|
if (pTbCur->pCur->get(pTbCur->pCur, &key, &value, DB_NEXT) == 0) {
|
||||||
metaDecodeTbInfo(pBuf, &tbCfg);
|
pBuf = value.data;
|
||||||
return tbCfg.name;
|
metaDecodeTbInfo(pBuf, &tbCfg);
|
||||||
} else {
|
if (tbCfg.type == META_SUPER_TABLE) {
|
||||||
return NULL;
|
continue;
|
||||||
|
}
|
||||||
|
return tbCfg.name;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
|
||||||
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
|
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
|
||||||
|
|
||||||
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
|
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
|
||||||
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
|
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
|
||||||
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
|
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
|
||||||
int32_t stringToSubplan(const char* str, SSubplan** subplan);
|
int32_t stringToSubplan(const char* str, SSubplan** subplan);
|
||||||
|
|
||||||
|
|
|
@ -214,22 +214,22 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
|
||||||
return subplan;
|
return subplan;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) {
|
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
|
||||||
epSet->inUse = 0; // todo
|
execNode->nodeId = vg->vgId;
|
||||||
epSet->numOfEps = vg->numOfEps;
|
execNode->inUse = 0; // todo
|
||||||
|
execNode->numOfEps = vg->numOfEps;
|
||||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||||
epSet->port[i] = vg->epAddr[i].port;
|
execNode->epAddr[i] = vg->epAddr[i];
|
||||||
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) {
|
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
|
||||||
epSet->inUse = 0; // todo
|
execNode->nodeId = vg->vgId;
|
||||||
epSet->numOfEps = vg->numOfEps;
|
execNode->inUse = 0; // todo
|
||||||
|
execNode->numOfEps = vg->numOfEps;
|
||||||
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
for (int8_t i = 0; i < vg->numOfEps; ++i) {
|
||||||
epSet->port[i] = vg->epAddr[i].port;
|
execNode->epAddr[i] = vg->epAddr[i];
|
||||||
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
|
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -239,7 +239,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
||||||
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
|
||||||
STORE_CURRENT_SUBPLAN(pCxt);
|
STORE_CURRENT_SUBPLAN(pCxt);
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
|
||||||
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet);
|
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
||||||
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
||||||
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
|
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
|
||||||
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
||||||
|
@ -304,7 +304,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
|
||||||
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
|
||||||
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
||||||
|
|
||||||
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet);
|
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
|
||||||
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
||||||
subplan->pNode = NULL;
|
subplan->pNode = NULL;
|
||||||
subplan->type = QUERY_TYPE_MODIFY;
|
subplan->type = QUERY_TYPE_MODIFY;
|
||||||
|
@ -351,6 +351,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
|
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
|
||||||
//todo
|
//todo
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
|
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
|
||||||
return setSubplanExecutionNode(subplan, templateId, ep);
|
return setSubplanExecutionNode(subplan, templateId, ep);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -60,17 +60,19 @@ typedef struct SSchLevel {
|
||||||
|
|
||||||
|
|
||||||
typedef struct SSchTask {
|
typedef struct SSchTask {
|
||||||
uint64_t taskId; // task id
|
uint64_t taskId; // task id
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
SSubplan *plan; // subplan
|
SSubplan *plan; // subplan
|
||||||
char *msg; // operator tree
|
char *msg; // operator tree
|
||||||
int32_t msgLen; // msg length
|
int32_t msgLen; // msg length
|
||||||
int8_t status; // task status
|
int8_t status; // task status
|
||||||
SEpAddr execAddr; // task actual executed node address
|
SQueryNodeAddr execAddr; // task actual executed node address
|
||||||
SQueryProfileSummary summary; // task execution summary
|
int8_t condidateIdx; // current try condidation index
|
||||||
int32_t childReady; // child task ready number
|
SArray *condidateAddrs; // condidate node addresses, element is SQueryNodeAddr
|
||||||
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
|
SQueryProfileSummary summary; // task execution summary
|
||||||
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
|
int32_t childReady; // child task ready number
|
||||||
|
SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask*
|
||||||
|
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
|
||||||
} SSchTask;
|
} SSchTask;
|
||||||
|
|
||||||
typedef struct SSchJobAttr {
|
typedef struct SSchJobAttr {
|
||||||
|
|
|
@ -216,28 +216,49 @@ _return:
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) {
|
int32_t schSetTaskCondidateAddrs(SSchJob *job, SSchTask *task) {
|
||||||
if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
|
if (task->condidateAddrs) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t nodeNum = taosArrayGetSize(job->nodeList);
|
task->condidateIdx = 0;
|
||||||
|
task->condidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||||
for (int32_t i = 0; i < nodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
|
if (NULL == task->condidateAddrs) {
|
||||||
SEpAddr *addr = taosArrayGet(job->nodeList, i);
|
qError("taosArrayInit failed");
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn));
|
|
||||||
epSet->port[epSet->numOfEps] = addr->port;
|
|
||||||
|
|
||||||
++epSet->numOfEps;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
|
if (task->plan->execNode.numOfEps > 0) {
|
||||||
|
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
|
||||||
|
qError("taosArrayPush failed");
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t addNum = 0;
|
||||||
|
int32_t nodeNum = taosArrayGetSize(job->nodeList);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||||
|
SQueryNodeAddr *naddr = taosArrayGet(job->nodeList, i);
|
||||||
|
|
||||||
|
if (NULL == taosArrayPush(task->condidateAddrs, &task->plan->execNode)) {
|
||||||
|
qError("taosArrayPush failed");
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
++addNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||||
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
||||||
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
|
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
|
||||||
|
|
||||||
++epSet->numOfEps;
|
++epSet->numOfEps;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -384,8 +405,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
|
strncpy(job->resEp.fqdn, task->execAddr.epAddr[task->execAddr.inUse].fqdn, sizeof(job->resEp.fqdn));
|
||||||
job->resEp.port = task->execAddr.port;
|
job->resEp.port = task->execAddr.epAddr[task->execAddr.inUse].port;
|
||||||
}
|
}
|
||||||
|
|
||||||
job->fetchTask = task;
|
job->fetchTask = task;
|
||||||
|
@ -395,12 +416,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
|
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
|
||||||
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
|
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
|
||||||
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
|
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
|
||||||
|
|
||||||
++job->dataSrcEps.numOfEps;
|
++job->dataSrcEps.numOfEps;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
for (int32_t i = 0; i < parentNum; ++i) {
|
for (int32_t i = 0; i < parentNum; ++i) {
|
||||||
SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
|
SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i);
|
||||||
|
@ -656,6 +679,16 @@ _return:
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) {
|
||||||
|
epSet->inUse = addr->inUse;
|
||||||
|
epSet->numOfEps = addr->numOfEps;
|
||||||
|
|
||||||
|
for (int8_t i = 0; i < epSet->numOfEps; ++i) {
|
||||||
|
strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn));
|
||||||
|
epSet->port[i] = addr->epAddr[i].port;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
uint32_t msgSize = 0;
|
uint32_t msgSize = 0;
|
||||||
|
@ -689,6 +722,7 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
|
|
||||||
SSubQueryMsg *pMsg = msg;
|
SSubQueryMsg *pMsg = msg;
|
||||||
|
|
||||||
|
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = htobe64(schMgmt.sId);
|
||||||
pMsg->queryId = htobe64(job->queryId);
|
pMsg->queryId = htobe64(job->queryId);
|
||||||
pMsg->taskId = htobe64(task->taskId);
|
pMsg->taskId = htobe64(task->taskId);
|
||||||
|
@ -705,6 +739,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SResReadyMsg *pMsg = msg;
|
SResReadyMsg *pMsg = msg;
|
||||||
|
|
||||||
|
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = htobe64(schMgmt.sId);
|
||||||
pMsg->queryId = htobe64(job->queryId);
|
pMsg->queryId = htobe64(job->queryId);
|
||||||
pMsg->taskId = htobe64(task->taskId);
|
pMsg->taskId = htobe64(task->taskId);
|
||||||
|
@ -722,6 +758,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SResFetchMsg *pMsg = msg;
|
SResFetchMsg *pMsg = msg;
|
||||||
|
|
||||||
|
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = htobe64(schMgmt.sId);
|
||||||
pMsg->queryId = htobe64(job->queryId);
|
pMsg->queryId = htobe64(job->queryId);
|
||||||
pMsg->taskId = htobe64(task->taskId);
|
pMsg->taskId = htobe64(task->taskId);
|
||||||
|
@ -736,6 +774,8 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskDropMsg *pMsg = msg;
|
STaskDropMsg *pMsg = msg;
|
||||||
|
|
||||||
|
pMsg->header.vgId = htonl(task->plan->execNode.nodeId);
|
||||||
pMsg->sId = htobe64(schMgmt.sId);
|
pMsg->sId = htobe64(schMgmt.sId);
|
||||||
pMsg->queryId = htobe64(job->queryId);
|
pMsg->queryId = htobe64(job->queryId);
|
||||||
pMsg->taskId = htobe64(task->taskId);
|
pMsg->taskId = htobe64(task->taskId);
|
||||||
|
@ -747,7 +787,12 @@ int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &task->plan->execEpSet, job->queryId, task->taskId, msgType, msg, msgSize));
|
SEpSet epSet;
|
||||||
|
SQueryNodeAddr *addr = taosArrayGet(task->condidateAddrs, task->condidateIdx);
|
||||||
|
|
||||||
|
schConvertAddrToEpSet(addr, &epSet);
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schAsyncSendMsg(job->transport, &epSet, job->queryId, task->taskId, msgType, msg, msgSize));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -761,12 +806,10 @@ _return:
|
||||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
||||||
SSubplan *plan = task->plan;
|
SSubplan *plan = task->plan;
|
||||||
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
|
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
|
||||||
if (plan->execEpSet.numOfEps <= 0) {
|
SCH_ERR_RET(schSetTaskCondidateAddrs(job, task));
|
||||||
SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (plan->execEpSet.numOfEps <= 0) {
|
if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) {
|
||||||
SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
|
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -54,11 +54,13 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
||||||
scanPlan.id.templateId = 0x0000000000000002;
|
scanPlan.id.templateId = 0x0000000000000002;
|
||||||
scanPlan.id.subplanId = 0x0000000000000003;
|
scanPlan.id.subplanId = 0x0000000000000003;
|
||||||
scanPlan.type = QUERY_TYPE_SCAN;
|
scanPlan.type = QUERY_TYPE_SCAN;
|
||||||
scanPlan.level = 1;
|
scanPlan.execNode.numOfEps = 1;
|
||||||
scanPlan.execEpSet.numOfEps = 1;
|
scanPlan.execNode.nodeId = 1;
|
||||||
scanPlan.execEpSet.port[0] = 6030;
|
scanPlan.execNode.inUse = 0;
|
||||||
strcpy(scanPlan.execEpSet.fqdn[0], "ep0");
|
scanPlan.execNode.epAddr[0].port = 6030;
|
||||||
|
strcpy(scanPlan.execNode.epAddr[0].fqdn, "ep0");
|
||||||
scanPlan.pChildren = NULL;
|
scanPlan.pChildren = NULL;
|
||||||
|
scanPlan.level = 1;
|
||||||
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
||||||
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||||
|
|
||||||
|
@ -67,7 +69,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
|
||||||
mergePlan.id.subplanId = 0x5555555555;
|
mergePlan.id.subplanId = 0x5555555555;
|
||||||
mergePlan.type = QUERY_TYPE_MERGE;
|
mergePlan.type = QUERY_TYPE_MERGE;
|
||||||
mergePlan.level = 0;
|
mergePlan.level = 0;
|
||||||
mergePlan.execEpSet.numOfEps = 0;
|
mergePlan.execNode.numOfEps = 0;
|
||||||
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
|
mergePlan.pChildren = taosArrayInit(1, POINTER_BYTES);
|
||||||
mergePlan.pParents = NULL;
|
mergePlan.pParents = NULL;
|
||||||
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
|
||||||
|
@ -97,9 +99,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[0].id.subplanId = 0x0000000000000004;
|
insertPlan[0].id.subplanId = 0x0000000000000004;
|
||||||
insertPlan[0].type = QUERY_TYPE_MODIFY;
|
insertPlan[0].type = QUERY_TYPE_MODIFY;
|
||||||
insertPlan[0].level = 0;
|
insertPlan[0].level = 0;
|
||||||
insertPlan[0].execEpSet.numOfEps = 1;
|
insertPlan[0].execNode.numOfEps = 1;
|
||||||
insertPlan[0].execEpSet.port[0] = 6030;
|
insertPlan[0].execNode.nodeId = 1;
|
||||||
strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0");
|
insertPlan[0].execNode.inUse = 0;
|
||||||
|
insertPlan[0].execNode.epAddr[0].port = 6030;
|
||||||
|
strcpy(insertPlan[0].execNode.epAddr[0].fqdn, "ep0");
|
||||||
insertPlan[0].pChildren = NULL;
|
insertPlan[0].pChildren = NULL;
|
||||||
insertPlan[0].pParents = NULL;
|
insertPlan[0].pParents = NULL;
|
||||||
insertPlan[0].pNode = NULL;
|
insertPlan[0].pNode = NULL;
|
||||||
|
@ -110,9 +114,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[1].id.subplanId = 0x0000000000000005;
|
insertPlan[1].id.subplanId = 0x0000000000000005;
|
||||||
insertPlan[1].type = QUERY_TYPE_MODIFY;
|
insertPlan[1].type = QUERY_TYPE_MODIFY;
|
||||||
insertPlan[1].level = 0;
|
insertPlan[1].level = 0;
|
||||||
insertPlan[1].execEpSet.numOfEps = 1;
|
insertPlan[1].execNode.numOfEps = 1;
|
||||||
insertPlan[1].execEpSet.port[0] = 6030;
|
insertPlan[1].execNode.nodeId = 1;
|
||||||
strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1");
|
insertPlan[1].execNode.inUse = 1;
|
||||||
|
insertPlan[1].execNode.epAddr[0].port = 6030;
|
||||||
|
strcpy(insertPlan[1].execNode.epAddr[0].fqdn, "ep1");
|
||||||
insertPlan[1].pChildren = NULL;
|
insertPlan[1].pChildren = NULL;
|
||||||
insertPlan[1].pParents = NULL;
|
insertPlan[1].pParents = NULL;
|
||||||
insertPlan[1].pNode = NULL;
|
insertPlan[1].pNode = NULL;
|
||||||
|
@ -132,7 +138,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
|
int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -88,13 +88,9 @@ void tEndEncode(SCoder* pCoder) {
|
||||||
pCoder->size = pNode->size;
|
pCoder->size = pNode->size;
|
||||||
pCoder->pos = pNode->pos;
|
pCoder->pos = pNode->pos;
|
||||||
|
|
||||||
if (TD_RT_ENDIAN() == pCoder->endian) {
|
tEncodeI32(pCoder, len);
|
||||||
tPut(int32_t, pCoder->data + pCoder->pos, len);
|
|
||||||
} else {
|
|
||||||
tRPut32(pCoder->data + pCoder->pos, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
TD_CODER_MOVE_POS(pCoder, len + sizeof(int32_t));
|
TD_CODER_MOVE_POS(pCoder, len);
|
||||||
|
|
||||||
free(pNode);
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
@ -127,15 +123,14 @@ void tEndDecode(SCoder* pCoder) {
|
||||||
struct SCoderNode* pNode;
|
struct SCoderNode* pNode;
|
||||||
|
|
||||||
ASSERT(pCoder->type == TD_DECODER);
|
ASSERT(pCoder->type == TD_DECODER);
|
||||||
ASSERT(tDecodeIsEnd(pCoder));
|
|
||||||
|
|
||||||
pNode = TD_SLIST_HEAD(&(pCoder->stack));
|
pNode = TD_SLIST_HEAD(&(pCoder->stack));
|
||||||
ASSERT(pNode);
|
ASSERT(pNode);
|
||||||
TD_SLIST_POP(&(pCoder->stack));
|
TD_SLIST_POP(&(pCoder->stack));
|
||||||
|
|
||||||
pCoder->data = pNode->data;
|
pCoder->data = pNode->data;
|
||||||
|
pCoder->pos = pCoder->size + pNode->pos;
|
||||||
pCoder->size = pNode->size;
|
pCoder->size = pNode->size;
|
||||||
pCoder->pos = pCoder->pos + pNode->pos;
|
|
||||||
|
|
||||||
free(pNode);
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -360,8 +360,8 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
||||||
SStructA_v2 sa2 = {.A_a = 10, .A_b = 65478, .A_c = "Hello", .A_d = 67, .A_e = 13};
|
SStructA_v2 sa2 = {.A_a = 10, .A_b = 65478, .A_c = "Hello", .A_d = 67, .A_e = 13};
|
||||||
SFinalReq_v1 req1 = {.pA = &sa1, .v_a = 15, .v_b = 35};
|
SFinalReq_v1 req1 = {.pA = &sa1, .v_a = 15, .v_b = 35};
|
||||||
SFinalReq_v2 req2 = {.pA = &sa2, .v_a = 15, .v_b = 32, .v_c = 37};
|
SFinalReq_v2 req2 = {.pA = &sa2, .v_a = 15, .v_b = 32, .v_c = 37};
|
||||||
SFinalReq_v1 dreq1;
|
SFinalReq_v1 dreq11, dreq21;
|
||||||
SFinalReq_v2 dreq21, dreq22;
|
SFinalReq_v2 dreq12, dreq22;
|
||||||
|
|
||||||
// Get size
|
// Get size
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, nullptr, 0, TD_ENCODER);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, nullptr, 0, TD_ENCODER);
|
||||||
|
@ -386,27 +386,30 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
||||||
tCoderClear(&encoder);
|
tCoderClear(&encoder);
|
||||||
|
|
||||||
// Decode
|
// Decode
|
||||||
|
// buf1 -> req1
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
|
||||||
GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq1), 0);
|
GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq11), 0);
|
||||||
GTEST_ASSERT_EQ(dreq1.pA->A_a, req1.pA->A_a);
|
GTEST_ASSERT_EQ(dreq11.pA->A_a, req1.pA->A_a);
|
||||||
GTEST_ASSERT_EQ(dreq1.pA->A_b, req1.pA->A_b);
|
GTEST_ASSERT_EQ(dreq11.pA->A_b, req1.pA->A_b);
|
||||||
GTEST_ASSERT_EQ(strcmp(dreq1.pA->A_c, req1.pA->A_c), 0);
|
GTEST_ASSERT_EQ(strcmp(dreq11.pA->A_c, req1.pA->A_c), 0);
|
||||||
GTEST_ASSERT_EQ(dreq1.v_a, req1.v_a);
|
GTEST_ASSERT_EQ(dreq11.v_a, req1.v_a);
|
||||||
GTEST_ASSERT_EQ(dreq1.v_b, req1.v_b);
|
GTEST_ASSERT_EQ(dreq11.v_b, req1.v_b);
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
|
|
||||||
|
// buf1 -> req2 (backward compatibility)
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf1, buf1size, TD_DECODER);
|
||||||
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq21), 0);
|
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq12), 0);
|
||||||
GTEST_ASSERT_EQ(dreq21.pA->A_a, req1.pA->A_a);
|
GTEST_ASSERT_EQ(dreq12.pA->A_a, req1.pA->A_a);
|
||||||
GTEST_ASSERT_EQ(dreq21.pA->A_b, req1.pA->A_b);
|
GTEST_ASSERT_EQ(dreq12.pA->A_b, req1.pA->A_b);
|
||||||
GTEST_ASSERT_EQ(strcmp(dreq21.pA->A_c, req1.pA->A_c), 0);
|
GTEST_ASSERT_EQ(strcmp(dreq12.pA->A_c, req1.pA->A_c), 0);
|
||||||
GTEST_ASSERT_EQ(dreq21.pA->A_d, 0);
|
GTEST_ASSERT_EQ(dreq12.pA->A_d, 0);
|
||||||
GTEST_ASSERT_EQ(dreq21.pA->A_e, 0);
|
GTEST_ASSERT_EQ(dreq12.pA->A_e, 0);
|
||||||
GTEST_ASSERT_EQ(dreq21.v_a, req1.v_a);
|
GTEST_ASSERT_EQ(dreq12.v_a, req1.v_a);
|
||||||
GTEST_ASSERT_EQ(dreq21.v_b, req1.v_b);
|
GTEST_ASSERT_EQ(dreq12.v_b, req1.v_b);
|
||||||
GTEST_ASSERT_EQ(dreq21.v_c, 0);
|
GTEST_ASSERT_EQ(dreq12.v_c, 0);
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
|
|
||||||
|
// buf2 -> req2
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER);
|
||||||
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq22), 0);
|
GTEST_ASSERT_EQ(tSFinalReq_v2_decode(&decoder, &dreq22), 0);
|
||||||
GTEST_ASSERT_EQ(dreq22.pA->A_a, req2.pA->A_a);
|
GTEST_ASSERT_EQ(dreq22.pA->A_a, req2.pA->A_a);
|
||||||
|
@ -418,4 +421,13 @@ TEST(td_encode_test, compound_struct_encode_test) {
|
||||||
GTEST_ASSERT_EQ(dreq22.v_b, req2.v_b);
|
GTEST_ASSERT_EQ(dreq22.v_b, req2.v_b);
|
||||||
GTEST_ASSERT_EQ(dreq22.v_c, req2.v_c);
|
GTEST_ASSERT_EQ(dreq22.v_c, req2.v_c);
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
|
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf2, buf2size, TD_DECODER);
|
||||||
|
GTEST_ASSERT_EQ(tSFinalReq_v1_decode(&decoder, &dreq21), 0);
|
||||||
|
GTEST_ASSERT_EQ(dreq21.pA->A_a, req2.pA->A_a);
|
||||||
|
GTEST_ASSERT_EQ(dreq21.pA->A_b, req2.pA->A_b);
|
||||||
|
GTEST_ASSERT_EQ(strcmp(dreq21.pA->A_c, req2.pA->A_c), 0);
|
||||||
|
GTEST_ASSERT_EQ(dreq21.v_a, req2.v_a);
|
||||||
|
GTEST_ASSERT_EQ(dreq21.v_b, req2.v_b);
|
||||||
|
tCoderClear(&decoder);
|
||||||
}
|
}
|
Loading…
Reference in New Issue