Merge pull request #9796 from taosdata/feature/3.0_wxy
TD-12678 single table aggregation physical plan code
This commit is contained in:
commit
59780727df
|
@ -119,6 +119,25 @@ typedef struct SExchangePhyNode {
|
|||
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhyNode;
|
||||
|
||||
typedef enum EAggAlgo {
|
||||
AGG_ALGO_PLAIN = 1, // simple agg across all input rows
|
||||
AGG_ALGO_SORTED, // grouped agg, input must be sorted
|
||||
AGG_ALGO_HASHED // grouped agg, use internal hashtable
|
||||
} EAggAlgo;
|
||||
|
||||
typedef enum EAggSplit {
|
||||
AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result
|
||||
AGG_SPLIT_FINAL // second level agg, must calculate the final result
|
||||
} EAggSplit;
|
||||
|
||||
typedef struct SAggPhyNode {
|
||||
SPhyNode node;
|
||||
EAggAlgo aggAlgo; // algorithm used by agg operator
|
||||
EAggSplit aggSplit; // distributed splitting mode
|
||||
SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function
|
||||
SArray *pGroupByList; // SColIndex list, but these must be column node
|
||||
} SAggPhyNode;
|
||||
|
||||
typedef struct SSubplanId {
|
||||
uint64_t queryId;
|
||||
uint64_t templateId;
|
||||
|
|
|
@ -30,7 +30,7 @@ OP_ENUM_MACRO(TagScan)
|
|||
OP_ENUM_MACRO(SystemTableScan)
|
||||
OP_ENUM_MACRO(Aggregate)
|
||||
OP_ENUM_MACRO(Project)
|
||||
OP_ENUM_MACRO(Groupby)
|
||||
// OP_ENUM_MACRO(Groupby)
|
||||
OP_ENUM_MACRO(Limit)
|
||||
OP_ENUM_MACRO(SLimit)
|
||||
OP_ENUM_MACRO(TimeWindow)
|
||||
|
|
|
@ -189,7 +189,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
|
|||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
|
||||
static bool isSystemTable(SQueryTableInfo* pTable) {
|
||||
// todo
|
||||
return false;
|
||||
|
@ -295,13 +294,31 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl
|
|||
|
||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
||||
|
||||
if (needMultiNodeScan(pTable)) {
|
||||
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
||||
}
|
||||
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
|
||||
}
|
||||
|
||||
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
|
||||
SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
|
||||
node->aggAlgo = AGG_ALGO_PLAIN;
|
||||
node->aggSplit = AGG_SPLIT_FINAL;
|
||||
if (NULL != pGroupBy) {
|
||||
node->aggAlgo = AGG_ALGO_HASHED;
|
||||
node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
|
||||
}
|
||||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
// if (needMultiNodeAgg(pPlanNode)) {
|
||||
|
||||
// }
|
||||
return createSingleTableAgg(pCxt, pPlanNode);
|
||||
}
|
||||
|
||||
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||
SPhyNode* node = NULL;
|
||||
switch (pPlanNode->info.type) {
|
||||
|
@ -311,6 +328,10 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
|||
case QNODE_TABLESCAN:
|
||||
node = createTableScanNode(pCxt, pPlanNode);
|
||||
break;
|
||||
case QNODE_AGGREGATE:
|
||||
case QNODE_GROUPBY:
|
||||
node = createAggNode(pCxt, pPlanNode);
|
||||
break;
|
||||
case QNODE_MODIFY:
|
||||
// Insert is not an operator in a physical plan.
|
||||
break;
|
||||
|
|
|
@ -20,6 +20,19 @@
|
|||
typedef bool (*FToJson)(const void* obj, cJSON* json);
|
||||
typedef bool (*FFromJson)(const cJSON* json, void* obj);
|
||||
|
||||
static char* getString(const cJSON* json, const char* name) {
|
||||
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
|
||||
return strdup(p);
|
||||
}
|
||||
|
||||
static void copyString(const cJSON* json, const char* name, char* dst) {
|
||||
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
|
||||
}
|
||||
|
||||
static int64_t getNumber(const cJSON* json, const char* name) {
|
||||
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
|
||||
}
|
||||
|
||||
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
|
||||
if (NULL == obj) {
|
||||
return true;
|
||||
|
@ -62,6 +75,39 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
|
|||
return func(jObj, *obj);
|
||||
}
|
||||
|
||||
static const char* jkPnodeType = "Type";
|
||||
static int32_t getPnodeTypeSize(cJSON* json) {
|
||||
switch (getNumber(json, jkPnodeType)) {
|
||||
case OP_TableScan:
|
||||
case OP_DataBlocksOptScan:
|
||||
case OP_TableSeqScan:
|
||||
return sizeof(STableScanPhyNode);
|
||||
case OP_TagScan:
|
||||
return sizeof(STagScanPhyNode);
|
||||
case OP_SystemTableScan:
|
||||
return sizeof(SSystemTableScanPhyNode);
|
||||
case OP_Aggregate:
|
||||
return sizeof(SAggPhyNode);
|
||||
case OP_Exchange:
|
||||
return sizeof(SExchangePhyNode);
|
||||
default:
|
||||
break;
|
||||
};
|
||||
return -1;
|
||||
}
|
||||
|
||||
static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void** obj) {
|
||||
cJSON* jObj = cJSON_GetObjectItem(json, name);
|
||||
if (NULL == jObj) {
|
||||
return true;
|
||||
}
|
||||
*obj = calloc(1, getPnodeTypeSize(jObj));
|
||||
if (NULL == *obj) {
|
||||
return false;
|
||||
}
|
||||
return func(jObj, *obj);
|
||||
}
|
||||
|
||||
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
||||
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
||||
if (size > 0) {
|
||||
|
@ -154,26 +200,9 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson
|
|||
return fromItem(jArray, func, *array, itemSize, *size);
|
||||
}
|
||||
|
||||
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) {
|
||||
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) {
|
||||
const cJSON* jArray = getArray(json, name, size);
|
||||
if (*array == NULL) {
|
||||
*array = calloc(*size, itemSize);
|
||||
}
|
||||
|
||||
return fromItem(jArray, func, *array, itemSize, *size);
|
||||
}
|
||||
|
||||
static char* getString(const cJSON* json, const char* name) {
|
||||
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
|
||||
return strdup(p);
|
||||
}
|
||||
|
||||
static void copyString(const cJSON* json, const char* name, char* dst) {
|
||||
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
|
||||
}
|
||||
|
||||
static int64_t getNumber(const cJSON* json, const char* name) {
|
||||
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
|
||||
return fromItem(jArray, func, array, itemSize, *size);
|
||||
}
|
||||
|
||||
static const char* jkSchemaType = "Type";
|
||||
|
@ -221,7 +250,7 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
|
|||
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
|
||||
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
|
||||
|
||||
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
|
||||
return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
|
||||
}
|
||||
|
||||
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
|
||||
|
@ -534,19 +563,15 @@ static const char* jkScanNodeTableCount = "Count";
|
|||
static bool scanNodeToJson(const void* obj, cJSON* json) {
|
||||
const SScanPhyNode* scan = (const SScanPhyNode*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, scan->uid);
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, scan->tableType);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, scan->order);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, scan->count);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -559,6 +584,66 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static const char* jkColIndexColId = "ColId";
|
||||
static const char* jkColIndexColIndex = "ColIndex";
|
||||
static const char* jkColIndexFlag = "Flag";
|
||||
static const char* jkColIndexName = "Name";
|
||||
|
||||
static bool colIndexToJson(const void* obj, cJSON* json) {
|
||||
const SColIndex* col = (const SColIndex*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId);
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex);
|
||||
}
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkColIndexFlag, col->flag);
|
||||
}
|
||||
if (res) {
|
||||
res = cJSON_AddStringToObject(json, jkColIndexName, col->name);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool colIndexFromJson(const cJSON* json, void* obj) {
|
||||
SColIndex* col = (SColIndex*)obj;
|
||||
col->colId = getNumber(json, jkColIndexColId);
|
||||
col->colIndex = getNumber(json, jkColIndexColIndex);
|
||||
col->flag = getNumber(json, jkColIndexFlag);
|
||||
copyString(json, jkColIndexName, col->name);
|
||||
return true;
|
||||
}
|
||||
|
||||
static const char* jkAggNodeAggAlgo = "AggAlgo";
|
||||
static const char* jkAggNodeAggSplit = "AggSplit";
|
||||
static const char* jkAggNodeExprs = "Exprs";
|
||||
static const char* jkAggNodeGroupByList = "GroupByList";
|
||||
|
||||
static bool aggNodeToJson(const void* obj, cJSON* json) {
|
||||
const SAggPhyNode* agg = (const SAggPhyNode*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo);
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit);
|
||||
}
|
||||
if (res) {
|
||||
res = addArray(json, jkAggNodeExprs, exprInfoToJson, agg->pExprs);
|
||||
}
|
||||
if (res) {
|
||||
res = addArray(json, jkAggNodeGroupByList, colIndexToJson, agg->pGroupByList);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool aggNodeFromJson(const cJSON* json, void* obj) {
|
||||
SAggPhyNode* agg = (SAggPhyNode*)obj;
|
||||
agg->aggAlgo = getNumber(json, jkAggNodeAggAlgo);
|
||||
agg->aggSplit = getNumber(json, jkAggNodeAggSplit);
|
||||
bool res = fromArray(json, jkAggNodeExprs, exprInfoFromJson, &agg->pExprs, sizeof(SExprInfo));
|
||||
if (res) {
|
||||
res = fromArray(json, jkAggNodeGroupByList, colIndexFromJson, &agg->pGroupByList, sizeof(SExprInfo));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static const char* jkTableScanNodeFlag = "Flag";
|
||||
static const char* jkTableScanNodeWindow = "Window";
|
||||
static const char* jkTableScanNodeTagsConditions = "TagsConditions";
|
||||
|
@ -667,10 +752,10 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
|
|||
case OP_SystemTableScan:
|
||||
return scanNodeToJson(obj, json);
|
||||
case OP_Aggregate:
|
||||
break; // todo
|
||||
return aggNodeToJson(obj, json);
|
||||
case OP_Project:
|
||||
return true;
|
||||
case OP_Groupby:
|
||||
// case OP_Groupby:
|
||||
case OP_Limit:
|
||||
case OP_SLimit:
|
||||
case OP_TimeWindow:
|
||||
|
@ -708,7 +793,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
|||
break; // todo
|
||||
case OP_Project:
|
||||
return true;
|
||||
case OP_Groupby:
|
||||
// case OP_Groupby:
|
||||
case OP_Limit:
|
||||
case OP_SLimit:
|
||||
case OP_TimeWindow:
|
||||
|
@ -735,12 +820,15 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
|||
static const char* jkPnodeName = "Name";
|
||||
static const char* jkPnodeTargets = "Targets";
|
||||
static const char* jkPnodeConditions = "Conditions";
|
||||
static const char* jkPnodeSchema = "InputSchema";
|
||||
static const char* jkPnodeSchema = "TargetSchema";
|
||||
static const char* jkPnodeChildren = "Children";
|
||||
// The 'pParent' field do not need to be serialized.
|
||||
static bool phyNodeToJson(const void* obj, cJSON* jNode) {
|
||||
const SPhyNode* phyNode = (const SPhyNode*)obj;
|
||||
bool res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
|
||||
bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type);
|
||||
if (res) {
|
||||
res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
|
||||
}
|
||||
if (res) {
|
||||
res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets);
|
||||
}
|
||||
|
@ -762,8 +850,8 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
|
|||
static bool phyNodeFromJson(const cJSON* json, void* obj) {
|
||||
SPhyNode* node = (SPhyNode*) obj;
|
||||
|
||||
node->info.name = getString(json, jkPnodeName);
|
||||
node->info.type = opNameToOpType(node->info.name);
|
||||
node->info.type = getNumber(json, jkPnodeType);
|
||||
node->info.name = opTypeToOpName(node->info.type);
|
||||
|
||||
bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo));
|
||||
if (res) {
|
||||
|
@ -910,8 +998,7 @@ static SSubplan* subplanFromJson(const cJSON* json) {
|
|||
}
|
||||
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
|
||||
if (res) {
|
||||
size_t size = MAX(sizeof(SPhyNode), sizeof(SScanPhyNode));
|
||||
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false);
|
||||
res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode);
|
||||
}
|
||||
if (res) {
|
||||
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
|
||||
|
|
|
@ -30,6 +30,21 @@ void* myCalloc(size_t nmemb, size_t size) {
|
|||
|
||||
class PhyPlanTest : public Test {
|
||||
protected:
|
||||
void pushAgg(int32_t aggOp) {
|
||||
unique_ptr<SQueryPlanNode> agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
|
||||
agg->info.type = aggOp;
|
||||
agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
unique_ptr<SExprInfo> expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo)));
|
||||
expr->base.resSchema.type = TSDB_DATA_TYPE_INT;
|
||||
expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
|
||||
expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode));
|
||||
expr->pExpr->nodeType = TEXPR_FUNCTION_NODE;
|
||||
strcpy(expr->pExpr->_function.functionName, "Count");
|
||||
SExprInfo* item = expr.release();
|
||||
taosArrayPush(agg->pExpr, &item);
|
||||
pushNode(agg.release());
|
||||
}
|
||||
|
||||
void pushScan(const string& db, const string& table, int32_t scanOp) {
|
||||
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
|
||||
EXPECT_TRUE(meta);
|
||||
|
@ -95,10 +110,11 @@ protected:
|
|||
private:
|
||||
void pushNode(SQueryPlanNode* node) {
|
||||
if (logicPlan_) {
|
||||
// todo
|
||||
} else {
|
||||
logicPlan_.reset(node);
|
||||
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
SQueryPlanNode* child = logicPlan_.release();
|
||||
taosArrayPush(node->pChildren, &child);
|
||||
}
|
||||
logicPlan_.reset(node);
|
||||
}
|
||||
|
||||
void copySchemaMeta(STableMeta** dst, const STableMeta* src) {
|
||||
|
@ -174,6 +190,16 @@ TEST_F(PhyPlanTest, superTableScanTest) {
|
|||
// todo check
|
||||
}
|
||||
|
||||
// select count(*) from table
|
||||
TEST_F(PhyPlanTest, simpleAggTest) {
|
||||
pushScan("test", "t1", QNODE_TABLESCAN);
|
||||
pushAgg(QNODE_AGGREGATE);
|
||||
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||
explain();
|
||||
SQueryDag* dag = result();
|
||||
// todo check
|
||||
}
|
||||
|
||||
// insert into t values(...)
|
||||
TEST_F(PhyPlanTest, insertTest) {
|
||||
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
|
||||
|
|
Loading…
Reference in New Issue