|
|
|
@ -75,24 +75,16 @@ int32_t dsinkNameToDsinkType(const char* name) {
|
|
|
|
|
return DSINK_Unknown;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* initDataSink(int32_t type, int32_t size) {
|
|
|
|
|
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
|
|
|
|
|
sink->info.type = type;
|
|
|
|
|
sink->info.name = dsinkTypeToDsinkName(type);
|
|
|
|
|
return sink;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|
|
|
|
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
|
|
|
|
|
return (SDataSink*)dispatcher;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
|
|
|
|
|
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter));
|
|
|
|
|
inserter->numOfTables = pBlocks->numOfTables;
|
|
|
|
|
inserter->size = pBlocks->size;
|
|
|
|
|
SWAP(inserter->pData, pBlocks->pData, char*);
|
|
|
|
|
return (SDataSink*)inserter;
|
|
|
|
|
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
|
|
|
|
|
dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols);
|
|
|
|
|
if (NULL == dst->pSchema) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols);
|
|
|
|
|
dst->numOfCols = src->numOfCols;
|
|
|
|
|
dst->resultRowSize = src->resultRowSize;
|
|
|
|
|
dst->precision = src->precision;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
|
|
|
|
@ -102,6 +94,10 @@ static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataB
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols);
|
|
|
|
|
dataBlockSchema->resultRowSize = 0;
|
|
|
|
|
for (int32_t i = 0; i < dataBlockSchema->numOfCols; ++i) {
|
|
|
|
|
dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -120,13 +116,37 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
|
|
|
|
|
return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
|
|
|
|
|
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
|
|
|
|
|
sink->info.type = type;
|
|
|
|
|
sink->info.name = dsinkTypeToDsinkName(type);
|
|
|
|
|
if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) {
|
|
|
|
|
tfree(sink);
|
|
|
|
|
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
return sink;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
|
|
|
|
|
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
|
|
|
|
|
inserter->numOfTables = pBlocks->numOfTables;
|
|
|
|
|
inserter->size = pBlocks->size;
|
|
|
|
|
SWAP(inserter->pData, pBlocks->pData, char*);
|
|
|
|
|
return (SDataSink*)inserter;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
|
|
|
|
|
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
|
|
|
|
|
return (SDataSink*)dispatcher;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
|
|
|
|
|
SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
|
|
|
|
|
node->info.type = type;
|
|
|
|
|
node->info.name = opTypeToOpName(type);
|
|
|
|
|
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
|
|
|
|
|
free(node);
|
|
|
|
|
return NULL;
|
|
|
|
|
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
return node;
|
|
|
|
|
}
|
|
|
|
@ -240,7 +260,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
|
|
|
|
subplan->msgType = TDMT_VND_QUERY;
|
|
|
|
|
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
|
|
|
|
|
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
|
|
|
|
|
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
|
|
|
|
|
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
|
|
|
|
|
RECOVERY_CURRENT_SUBPLAN(pCxt);
|
|
|
|
|
}
|
|
|
|
|
return pCxt->nextId.templateId++;
|
|
|
|
@ -249,6 +269,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
|
|
|
|
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
|
|
|
|
|
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
|
|
|
|
|
node->srcTemplateId = srcTemplateId;
|
|
|
|
|
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr)));
|
|
|
|
|
return (SPhyNode*)node;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -314,7 +335,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
|
|
|
|
|
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
|
|
|
|
|
|
|
|
|
|
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
|
|
|
|
|
subplan->pDataSink = createDataInserter(pCxt, blocks);
|
|
|
|
|
subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
|
|
|
|
|
subplan->pNode = NULL;
|
|
|
|
|
subplan->type = QUERY_TYPE_MODIFY;
|
|
|
|
|
subplan->msgType = pPayload->msgType;
|
|
|
|
@ -333,7 +354,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
|
|
|
|
|
|
|
|
|
|
subplan->msgType = TDMT_VND_QUERY;
|
|
|
|
|
subplan->pNode = createPhyNode(pCxt, pRoot);
|
|
|
|
|
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
|
|
|
|
|
subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
|
|
|
|
|
}
|
|
|
|
|
// todo deal subquery
|
|
|
|
|
}
|
|
|
|
@ -360,7 +381,24 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
|
|
|
|
|
//todo
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) {
|
|
|
|
|
if (NULL == pNode) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
if (OP_Exchange == pNode->info.type) {
|
|
|
|
|
SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
|
|
|
|
|
if (templateId == pExchange->srcTemplateId) {
|
|
|
|
|
taosArrayPush(pExchange->pSrcEndPoints, pEp);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (pNode->pChildren != NULL) {
|
|
|
|
|
size_t size = taosArrayGetSize(pNode->pChildren);
|
|
|
|
|
for(int32_t i = 0; i < size; ++i) {
|
|
|
|
|
setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) {
|
|
|
|
|
setExchangSourceNode(templateId, pEp, subplan->pNode);
|
|
|
|
|
}
|
|
|
|
|