Merge remote-tracking branch 'origin/feature/3.0_wxy' into feature/qnode

This commit is contained in:
dapan1121 2021-12-31 17:32:17 +08:00
commit 9462b1531a
4 changed files with 21 additions and 20 deletions

View File

@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "query.h"
#include "tmsg.h" #include "tmsg.h"
#include "tarray.h" #include "tarray.h"
@ -122,7 +123,7 @@ typedef struct SSubplan {
SSubplanId id; // unique id of the subplan SSubplanId id; // unique id of the subplan
int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY int32_t type; // QUERY_TYPE_MERGE|QUERY_TYPE_PARTIAL|QUERY_TYPE_SCAN|QUERY_TYPE_MODIFY
int32_t level; // the execution level of current subplan, starting from 0. int32_t level; // the execution level of current subplan, starting from 0.
SEpSet execEpSet; // for the scan/modify subplan, the optional execution node SQueryNodeAddr execNode; // for the scan/modify subplan, the optional execution node
SArray *pChildern; // the datasource subplan,from which to fetch the result SArray *pChildern; // the datasource subplan,from which to fetch the result
SArray *pParents; // the data destination subplan, get data from current subplan SArray *pParents; // the data destination subplan, get data from current subplan
SPhyNode *pNode; // physical plan of current subplan SPhyNode *pNode; // physical plan of current subplan

View File

@ -111,6 +111,13 @@ typedef struct SMsgSendInfo {
SDataBuf msgInfo; SDataBuf msgInfo;
} SMsgSendInfo; } SMsgSendInfo;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue(); int32_t initTaskQueue();

View File

@ -50,13 +50,6 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb. uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary; } SQueryProfileSummary;
typedef struct SQueryNodeAddr{
int32_t nodeId; //vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddrMsg epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
typedef struct SQueryResult { typedef struct SQueryResult {
int32_t code; int32_t code;
uint64_t numOfRows; uint64_t numOfRows;

View File

@ -211,22 +211,22 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {
return subplan; return subplan;
} }
static void vgroupInfoToEpSet(const SVgroupInfo* vg, SEpSet* epSet) { static void vgroupInfoToEpSet(const SVgroupInfo* vg, SQueryNodeAddr* execNode) {
epSet->inUse = 0; // todo execNode->nodeId = vg->vgId;
epSet->numOfEps = vg->numOfEps; execNode->inUse = 0; // todo
execNode->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) { for (int8_t i = 0; i < vg->numOfEps; ++i) {
epSet->port[i] = vg->epAddr[i].port; execNode->epAddr[i] = vg->epAddr[i];
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
} }
return; return;
} }
static void vgroupMsgToEpSet(const SVgroupMsg* vg, SEpSet* epSet) { static void vgroupMsgToEpSet(const SVgroupMsg* vg, SQueryNodeAddr* execNode) {
epSet->inUse = 0; // todo execNode->nodeId = vg->vgId;
epSet->numOfEps = vg->numOfEps; execNode->inUse = 0; // todo
execNode->numOfEps = vg->numOfEps;
for (int8_t i = 0; i < vg->numOfEps; ++i) { for (int8_t i = 0; i < vg->numOfEps; ++i) {
epSet->port[i] = vg->epAddr[i].port; execNode->epAddr[i] = vg->epAddr[i];
strcpy(epSet->fqdn[i], vg->epAddr[i].fqdn);
} }
return; return;
} }
@ -236,7 +236,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) { for (int32_t i = 0; i < pTable->pMeta->vgroupList->numOfVgroups; ++i) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execEpSet); vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable); subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode); subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
RECOVERY_CURRENT_SUBPLAN(pCxt); RECOVERY_CURRENT_SUBPLAN(pCxt);
@ -297,7 +297,7 @@ static void splitInsertSubplan(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
STORE_CURRENT_SUBPLAN(pCxt); STORE_CURRENT_SUBPLAN(pCxt);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY);
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(vgs, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execEpSet); vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pNode = NULL; subplan->pNode = NULL;
subplan->pDataSink = createDataInserter(pCxt, blocks); subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->type = QUERY_TYPE_MODIFY; subplan->type = QUERY_TYPE_MODIFY;