[td-11818] merge 3.0
This commit is contained in:
commit
205aae66af
|
@ -26,7 +26,8 @@
|
||||||
"eamodio.gitlens",
|
"eamodio.gitlens",
|
||||||
"matepek.vscode-catch2-test-adapter",
|
"matepek.vscode-catch2-test-adapter",
|
||||||
"spmeesseman.vscode-taskexplorer",
|
"spmeesseman.vscode-taskexplorer",
|
||||||
"cschlosser.doxdocgen"
|
"cschlosser.doxdocgen",
|
||||||
|
"urosvujosevic.explorer-manager"
|
||||||
],
|
],
|
||||||
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
// Use 'forwardPorts' to make a list of ports inside the container available locally.
|
||||||
// "forwardPorts": [],
|
// "forwardPorts": [],
|
||||||
|
|
|
@ -120,6 +120,25 @@ typedef struct SExchangePhyNode {
|
||||||
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
|
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
|
||||||
} SExchangePhyNode;
|
} SExchangePhyNode;
|
||||||
|
|
||||||
|
typedef enum EAggAlgo {
|
||||||
|
AGG_ALGO_PLAIN = 1, // simple agg across all input rows
|
||||||
|
AGG_ALGO_SORTED, // grouped agg, input must be sorted
|
||||||
|
AGG_ALGO_HASHED // grouped agg, use internal hashtable
|
||||||
|
} EAggAlgo;
|
||||||
|
|
||||||
|
typedef enum EAggSplit {
|
||||||
|
AGG_SPLIT_PRE = 1, // first level agg, maybe don't need calculate the final result
|
||||||
|
AGG_SPLIT_FINAL // second level agg, must calculate the final result
|
||||||
|
} EAggSplit;
|
||||||
|
|
||||||
|
typedef struct SAggPhyNode {
|
||||||
|
SPhyNode node;
|
||||||
|
EAggAlgo aggAlgo; // algorithm used by agg operator
|
||||||
|
EAggSplit aggSplit; // distributed splitting mode
|
||||||
|
SArray *pExprs; // SExprInfo list, these are expression list of group_by_clause and parameter expression of aggregate function
|
||||||
|
SArray *pGroupByList; // SColIndex list, but these must be column node
|
||||||
|
} SAggPhyNode;
|
||||||
|
|
||||||
typedef struct SSubplanId {
|
typedef struct SSubplanId {
|
||||||
uint64_t queryId;
|
uint64_t queryId;
|
||||||
uint64_t templateId;
|
uint64_t templateId;
|
||||||
|
|
|
@ -30,7 +30,7 @@ OP_ENUM_MACRO(TagScan)
|
||||||
OP_ENUM_MACRO(SystemTableScan)
|
OP_ENUM_MACRO(SystemTableScan)
|
||||||
OP_ENUM_MACRO(Aggregate)
|
OP_ENUM_MACRO(Aggregate)
|
||||||
OP_ENUM_MACRO(Project)
|
OP_ENUM_MACRO(Project)
|
||||||
OP_ENUM_MACRO(Groupby)
|
// OP_ENUM_MACRO(Groupby)
|
||||||
OP_ENUM_MACRO(Limit)
|
OP_ENUM_MACRO(Limit)
|
||||||
OP_ENUM_MACRO(SLimit)
|
OP_ENUM_MACRO(SLimit)
|
||||||
OP_ENUM_MACRO(TimeWindow)
|
OP_ENUM_MACRO(TimeWindow)
|
||||||
|
|
|
@ -36,13 +36,11 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
|
||||||
|
|
||||||
typedef struct SVnodeCfg {
|
typedef struct SVnodeCfg {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SDnode *pDnode;
|
SDnode * pDnode;
|
||||||
struct {
|
|
||||||
uint64_t wsize;
|
uint64_t wsize;
|
||||||
uint64_t ssize;
|
uint64_t ssize;
|
||||||
uint64_t lsize;
|
uint64_t lsize;
|
||||||
bool isHeapAllocator;
|
bool isHeapAllocator;
|
||||||
};
|
|
||||||
uint32_t ttl;
|
uint32_t ttl;
|
||||||
uint32_t keep;
|
uint32_t keep;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
|
@ -54,9 +52,9 @@ typedef struct SVnodeCfg {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
char *timezone;
|
char * timezone;
|
||||||
char *locale;
|
char * locale;
|
||||||
char *charset;
|
char * charset;
|
||||||
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
|
||||||
PutReqToVQueryQFp putReqToVQueryQFp;
|
PutReqToVQueryQFp putReqToVQueryQFp;
|
||||||
} SVnodeOpt;
|
} SVnodeOpt;
|
||||||
|
|
|
@ -82,12 +82,12 @@ int32_t vnodePutReqToVQueryQ(SVnode *pVnode, struct SRpcMsg *pReq);
|
||||||
// For Log
|
// For Log
|
||||||
extern int32_t vDebugFlag;
|
extern int32_t vDebugFlag;
|
||||||
|
|
||||||
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
|
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }} while(0)
|
||||||
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
|
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }} while(0)
|
||||||
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
|
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", 255, __VA_ARGS__); }} while(0)
|
||||||
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
|
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", 255, __VA_ARGS__); }} while(0)
|
||||||
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,7 +116,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
// Check if it needs to commit
|
// Check if it needs to commit
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
tsem_wait(&(pVnode->canCommit));
|
// tsem_wait(&(pVnode->canCommit));
|
||||||
if (vnodeAsyncCommit(pVnode) < 0) {
|
if (vnodeAsyncCommit(pVnode) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
|
@ -295,13 +295,31 @@ static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTabl
|
||||||
|
|
||||||
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
|
||||||
|
|
||||||
if (needMultiNodeScan(pTable)) {
|
if (needMultiNodeScan(pTable)) {
|
||||||
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
|
||||||
}
|
}
|
||||||
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
|
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SPhyNode* createSingleTableAgg(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
|
SAggPhyNode* node = (SAggPhyNode*)initPhyNode(pPlanNode, OP_Aggregate, sizeof(SAggPhyNode));
|
||||||
|
SGroupbyExpr* pGroupBy = (SGroupbyExpr*)pPlanNode->pExtInfo;
|
||||||
|
node->aggAlgo = AGG_ALGO_PLAIN;
|
||||||
|
node->aggSplit = AGG_SPLIT_FINAL;
|
||||||
|
if (NULL != pGroupBy) {
|
||||||
|
node->aggAlgo = AGG_ALGO_HASHED;
|
||||||
|
node->pGroupByList = validPointer(taosArrayDup(pGroupBy->columnInfo));
|
||||||
|
}
|
||||||
|
return (SPhyNode*)node;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SPhyNode* createAggNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
|
// if (needMultiNodeAgg(pPlanNode)) {
|
||||||
|
|
||||||
|
// }
|
||||||
|
return createSingleTableAgg(pCxt, pPlanNode);
|
||||||
|
}
|
||||||
|
|
||||||
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
SPhyNode* node = NULL;
|
SPhyNode* node = NULL;
|
||||||
switch (pPlanNode->info.type) {
|
switch (pPlanNode->info.type) {
|
||||||
|
@ -311,6 +329,10 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
|
||||||
case QNODE_TABLESCAN:
|
case QNODE_TABLESCAN:
|
||||||
node = createTableScanNode(pCxt, pPlanNode);
|
node = createTableScanNode(pCxt, pPlanNode);
|
||||||
break;
|
break;
|
||||||
|
case QNODE_AGGREGATE:
|
||||||
|
case QNODE_GROUPBY:
|
||||||
|
node = createAggNode(pCxt, pPlanNode);
|
||||||
|
break;
|
||||||
case QNODE_MODIFY:
|
case QNODE_MODIFY:
|
||||||
// Insert is not an operator in a physical plan.
|
// Insert is not an operator in a physical plan.
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -20,6 +20,19 @@
|
||||||
typedef bool (*FToJson)(const void* obj, cJSON* json);
|
typedef bool (*FToJson)(const void* obj, cJSON* json);
|
||||||
typedef bool (*FFromJson)(const cJSON* json, void* obj);
|
typedef bool (*FFromJson)(const cJSON* json, void* obj);
|
||||||
|
|
||||||
|
static char* getString(const cJSON* json, const char* name) {
|
||||||
|
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
|
||||||
|
return strdup(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void copyString(const cJSON* json, const char* name, char* dst) {
|
||||||
|
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int64_t getNumber(const cJSON* json, const char* name) {
|
||||||
|
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
|
||||||
|
}
|
||||||
|
|
||||||
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
|
static bool addObject(cJSON* json, const char* name, FToJson func, const void* obj) {
|
||||||
if (NULL == obj) {
|
if (NULL == obj) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -62,6 +75,39 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
|
||||||
return func(jObj, *obj);
|
return func(jObj, *obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* jkPnodeType = "Type";
|
||||||
|
static int32_t getPnodeTypeSize(cJSON* json) {
|
||||||
|
switch (getNumber(json, jkPnodeType)) {
|
||||||
|
case OP_TableScan:
|
||||||
|
case OP_DataBlocksOptScan:
|
||||||
|
case OP_TableSeqScan:
|
||||||
|
return sizeof(STableScanPhyNode);
|
||||||
|
case OP_TagScan:
|
||||||
|
return sizeof(STagScanPhyNode);
|
||||||
|
case OP_SystemTableScan:
|
||||||
|
return sizeof(SSystemTableScanPhyNode);
|
||||||
|
case OP_Aggregate:
|
||||||
|
return sizeof(SAggPhyNode);
|
||||||
|
case OP_Exchange:
|
||||||
|
return sizeof(SExchangePhyNode);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void** obj) {
|
||||||
|
cJSON* jObj = cJSON_GetObjectItem(json, name);
|
||||||
|
if (NULL == jObj) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
*obj = calloc(1, getPnodeTypeSize(jObj));
|
||||||
|
if (NULL == *obj) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return func(jObj, *obj);
|
||||||
|
}
|
||||||
|
|
||||||
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) {
|
||||||
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
size_t size = (NULL == array) ? 0 : taosArrayGetSize(array);
|
||||||
if (size > 0) {
|
if (size > 0) {
|
||||||
|
@ -154,26 +200,9 @@ static bool fromRawArrayWithAlloc(const cJSON* json, const char* name, FFromJson
|
||||||
return fromItem(jArray, func, *array, itemSize, *size);
|
return fromItem(jArray, func, *array, itemSize, *size);
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void** array, int32_t itemSize, int32_t* size) {
|
static bool fromRawArray(const cJSON* json, const char* name, FFromJson func, void* array, int32_t itemSize, int32_t* size) {
|
||||||
const cJSON* jArray = getArray(json, name, size);
|
const cJSON* jArray = getArray(json, name, size);
|
||||||
if (*array == NULL) {
|
return fromItem(jArray, func, array, itemSize, *size);
|
||||||
*array = calloc(*size, itemSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
return fromItem(jArray, func, *array, itemSize, *size);
|
|
||||||
}
|
|
||||||
|
|
||||||
static char* getString(const cJSON* json, const char* name) {
|
|
||||||
char* p = cJSON_GetStringValue(cJSON_GetObjectItem(json, name));
|
|
||||||
return strdup(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void copyString(const cJSON* json, const char* name, char* dst) {
|
|
||||||
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int64_t getNumber(const cJSON* json, const char* name) {
|
|
||||||
return cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkSchemaType = "Type";
|
static const char* jkSchemaType = "Type";
|
||||||
|
@ -221,7 +250,7 @@ static bool dataBlockSchemaFromJson(const cJSON* json, void* obj) {
|
||||||
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
|
schema->resultRowSize = getNumber(json, jkDataBlockSchemaResultRowSize);
|
||||||
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
|
schema->precision = getNumber(json, jkDataBlockSchemaPrecision);
|
||||||
|
|
||||||
return fromRawArray(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**) &(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
|
return fromRawArrayWithAlloc(json, jkDataBlockSchemaSlotSchema, schemaFromJson, (void**)&(schema->pSchema), sizeof(SSlotSchema), &schema->numOfCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
|
static const char* jkColumnFilterInfoLowerRelOptr = "LowerRelOptr";
|
||||||
|
@ -539,11 +568,9 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
|
||||||
if (res) {
|
if (res) {
|
||||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
|
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
|
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
|
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
|
||||||
}
|
}
|
||||||
|
@ -551,7 +578,6 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
|
||||||
if (res) {
|
if (res) {
|
||||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
|
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -565,6 +591,66 @@ static bool scanNodeFromJson(const cJSON* json, void* obj) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* jkColIndexColId = "ColId";
|
||||||
|
static const char* jkColIndexColIndex = "ColIndex";
|
||||||
|
static const char* jkColIndexFlag = "Flag";
|
||||||
|
static const char* jkColIndexName = "Name";
|
||||||
|
|
||||||
|
static bool colIndexToJson(const void* obj, cJSON* json) {
|
||||||
|
const SColIndex* col = (const SColIndex*)obj;
|
||||||
|
bool res = cJSON_AddNumberToObject(json, jkColIndexColId, col->colId);
|
||||||
|
if (res) {
|
||||||
|
res = cJSON_AddNumberToObject(json, jkColIndexColIndex, col->colIndex);
|
||||||
|
}
|
||||||
|
if (res) {
|
||||||
|
res = cJSON_AddNumberToObject(json, jkColIndexFlag, col->flag);
|
||||||
|
}
|
||||||
|
if (res) {
|
||||||
|
res = cJSON_AddStringToObject(json, jkColIndexName, col->name);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool colIndexFromJson(const cJSON* json, void* obj) {
|
||||||
|
SColIndex* col = (SColIndex*)obj;
|
||||||
|
col->colId = getNumber(json, jkColIndexColId);
|
||||||
|
col->colIndex = getNumber(json, jkColIndexColIndex);
|
||||||
|
col->flag = getNumber(json, jkColIndexFlag);
|
||||||
|
copyString(json, jkColIndexName, col->name);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char* jkAggNodeAggAlgo = "AggAlgo";
|
||||||
|
static const char* jkAggNodeAggSplit = "AggSplit";
|
||||||
|
static const char* jkAggNodeExprs = "Exprs";
|
||||||
|
static const char* jkAggNodeGroupByList = "GroupByList";
|
||||||
|
|
||||||
|
static bool aggNodeToJson(const void* obj, cJSON* json) {
|
||||||
|
const SAggPhyNode* agg = (const SAggPhyNode*)obj;
|
||||||
|
bool res = cJSON_AddNumberToObject(json, jkAggNodeAggAlgo, agg->aggAlgo);
|
||||||
|
if (res) {
|
||||||
|
res = cJSON_AddNumberToObject(json, jkAggNodeAggSplit, agg->aggSplit);
|
||||||
|
}
|
||||||
|
if (res) {
|
||||||
|
res = addArray(json, jkAggNodeExprs, exprInfoToJson, agg->pExprs);
|
||||||
|
}
|
||||||
|
if (res) {
|
||||||
|
res = addArray(json, jkAggNodeGroupByList, colIndexToJson, agg->pGroupByList);
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool aggNodeFromJson(const cJSON* json, void* obj) {
|
||||||
|
SAggPhyNode* agg = (SAggPhyNode*)obj;
|
||||||
|
agg->aggAlgo = getNumber(json, jkAggNodeAggAlgo);
|
||||||
|
agg->aggSplit = getNumber(json, jkAggNodeAggSplit);
|
||||||
|
bool res = fromArray(json, jkAggNodeExprs, exprInfoFromJson, &agg->pExprs, sizeof(SExprInfo));
|
||||||
|
if (res) {
|
||||||
|
res = fromArray(json, jkAggNodeGroupByList, colIndexFromJson, &agg->pGroupByList, sizeof(SExprInfo));
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
static const char* jkTableScanNodeFlag = "Flag";
|
static const char* jkTableScanNodeFlag = "Flag";
|
||||||
static const char* jkTableScanNodeWindow = "Window";
|
static const char* jkTableScanNodeWindow = "Window";
|
||||||
static const char* jkTableScanNodeTagsConditions = "TagsConditions";
|
static const char* jkTableScanNodeTagsConditions = "TagsConditions";
|
||||||
|
@ -673,10 +759,10 @@ static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
|
||||||
case OP_SystemTableScan:
|
case OP_SystemTableScan:
|
||||||
return scanNodeToJson(obj, json);
|
return scanNodeToJson(obj, json);
|
||||||
case OP_Aggregate:
|
case OP_Aggregate:
|
||||||
break; // todo
|
return aggNodeToJson(obj, json);
|
||||||
case OP_Project:
|
case OP_Project:
|
||||||
return true;
|
return true;
|
||||||
case OP_Groupby:
|
// case OP_Groupby:
|
||||||
case OP_Limit:
|
case OP_Limit:
|
||||||
case OP_SLimit:
|
case OP_SLimit:
|
||||||
case OP_TimeWindow:
|
case OP_TimeWindow:
|
||||||
|
@ -714,7 +800,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
break; // todo
|
break; // todo
|
||||||
case OP_Project:
|
case OP_Project:
|
||||||
return true;
|
return true;
|
||||||
case OP_Groupby:
|
// case OP_Groupby:
|
||||||
case OP_Limit:
|
case OP_Limit:
|
||||||
case OP_SLimit:
|
case OP_SLimit:
|
||||||
case OP_TimeWindow:
|
case OP_TimeWindow:
|
||||||
|
@ -741,12 +827,15 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
static const char* jkPnodeName = "Name";
|
static const char* jkPnodeName = "Name";
|
||||||
static const char* jkPnodeTargets = "Targets";
|
static const char* jkPnodeTargets = "Targets";
|
||||||
static const char* jkPnodeConditions = "Conditions";
|
static const char* jkPnodeConditions = "Conditions";
|
||||||
static const char* jkPnodeSchema = "InputSchema";
|
static const char* jkPnodeSchema = "TargetSchema";
|
||||||
static const char* jkPnodeChildren = "Children";
|
static const char* jkPnodeChildren = "Children";
|
||||||
// The 'pParent' field do not need to be serialized.
|
// The 'pParent' field do not need to be serialized.
|
||||||
static bool phyNodeToJson(const void* obj, cJSON* jNode) {
|
static bool phyNodeToJson(const void* obj, cJSON* jNode) {
|
||||||
const SPhyNode* phyNode = (const SPhyNode*)obj;
|
const SPhyNode* phyNode = (const SPhyNode*)obj;
|
||||||
bool res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
|
bool res = cJSON_AddNumberToObject(jNode, jkPnodeType, phyNode->info.type);
|
||||||
|
if (res) {
|
||||||
|
res = cJSON_AddStringToObject(jNode, jkPnodeName, phyNode->info.name);
|
||||||
|
}
|
||||||
if (res) {
|
if (res) {
|
||||||
res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets);
|
res = addArray(jNode, jkPnodeTargets, exprInfoToJson, phyNode->pTargets);
|
||||||
}
|
}
|
||||||
|
@ -768,8 +857,8 @@ static bool phyNodeToJson(const void* obj, cJSON* jNode) {
|
||||||
static bool phyNodeFromJson(const cJSON* json, void* obj) {
|
static bool phyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
SPhyNode* node = (SPhyNode*) obj;
|
SPhyNode* node = (SPhyNode*) obj;
|
||||||
|
|
||||||
node->info.name = getString(json, jkPnodeName);
|
node->info.type = getNumber(json, jkPnodeType);
|
||||||
node->info.type = opNameToOpType(node->info.name);
|
node->info.name = opTypeToOpName(node->info.type);
|
||||||
|
|
||||||
bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo));
|
bool res = fromArray(json, jkPnodeTargets, exprInfoFromJson, &node->pTargets, sizeof(SExprInfo));
|
||||||
if (res) {
|
if (res) {
|
||||||
|
@ -914,11 +1003,13 @@ static SSubplan* subplanFromJson(const cJSON* json) {
|
||||||
if (NULL == subplan) {
|
if (NULL == subplan) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
|
bool res = fromObject(json, jkSubplanId, subplanIdFromJson, &subplan->id, true);
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
size_t size = MAX(sizeof(SPhyNode), sizeof(STableScanPhyNode));
|
res = fromPnode(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode);
|
||||||
res = fromObjectWithAlloc(json, jkSubplanNode, phyNodeFromJson, (void**)&subplan->pNode, size, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res) {
|
if (res) {
|
||||||
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
|
res = fromObjectWithAlloc(json, jkSubplanDataSink, dataSinkFromJson, (void**)&subplan->pDataSink, sizeof(SDataSink), false);
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,6 +30,21 @@ void* myCalloc(size_t nmemb, size_t size) {
|
||||||
|
|
||||||
class PhyPlanTest : public Test {
|
class PhyPlanTest : public Test {
|
||||||
protected:
|
protected:
|
||||||
|
void pushAgg(int32_t aggOp) {
|
||||||
|
unique_ptr<SQueryPlanNode> agg((SQueryPlanNode*)myCalloc(1, sizeof(SQueryPlanNode)));
|
||||||
|
agg->info.type = aggOp;
|
||||||
|
agg->pExpr = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||||
|
unique_ptr<SExprInfo> expr((SExprInfo*)myCalloc(1, sizeof(SExprInfo)));
|
||||||
|
expr->base.resSchema.type = TSDB_DATA_TYPE_INT;
|
||||||
|
expr->base.resSchema.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
|
||||||
|
expr->pExpr = (tExprNode*)myCalloc(1, sizeof(tExprNode));
|
||||||
|
expr->pExpr->nodeType = TEXPR_FUNCTION_NODE;
|
||||||
|
strcpy(expr->pExpr->_function.functionName, "Count");
|
||||||
|
SExprInfo* item = expr.release();
|
||||||
|
taosArrayPush(agg->pExpr, &item);
|
||||||
|
pushNode(agg.release());
|
||||||
|
}
|
||||||
|
|
||||||
void pushScan(const string& db, const string& table, int32_t scanOp) {
|
void pushScan(const string& db, const string& table, int32_t scanOp) {
|
||||||
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
|
shared_ptr<MockTableMeta> meta = mockCatalogService->getTableMeta(db, table);
|
||||||
EXPECT_TRUE(meta);
|
EXPECT_TRUE(meta);
|
||||||
|
@ -95,10 +110,11 @@ protected:
|
||||||
private:
|
private:
|
||||||
void pushNode(SQueryPlanNode* node) {
|
void pushNode(SQueryPlanNode* node) {
|
||||||
if (logicPlan_) {
|
if (logicPlan_) {
|
||||||
// todo
|
node->pChildren = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||||
} else {
|
SQueryPlanNode* child = logicPlan_.release();
|
||||||
logicPlan_.reset(node);
|
taosArrayPush(node->pChildren, &child);
|
||||||
}
|
}
|
||||||
|
logicPlan_.reset(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
void copySchemaMeta(STableMeta** dst, const STableMeta* src) {
|
void copySchemaMeta(STableMeta** dst, const STableMeta* src) {
|
||||||
|
@ -174,6 +190,16 @@ TEST_F(PhyPlanTest, superTableScanTest) {
|
||||||
// todo check
|
// todo check
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// select count(*) from table
|
||||||
|
TEST_F(PhyPlanTest, simpleAggTest) {
|
||||||
|
pushScan("test", "t1", QNODE_TABLESCAN);
|
||||||
|
pushAgg(QNODE_AGGREGATE);
|
||||||
|
ASSERT_EQ(run(), TSDB_CODE_SUCCESS);
|
||||||
|
explain();
|
||||||
|
SQueryDag* dag = result();
|
||||||
|
// todo check
|
||||||
|
}
|
||||||
|
|
||||||
// insert into t values(...)
|
// insert into t values(...)
|
||||||
TEST_F(PhyPlanTest, insertTest) {
|
TEST_F(PhyPlanTest, insertTest) {
|
||||||
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
|
ASSERT_EQ(run("test", "insert into t1 values (now, 1, \"beijing\")"), TSDB_CODE_SUCCESS);
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
aux_source_directory(src TDB_SRC)
|
|
||||||
|
set(TDB_SUBDIRS "btree" "db" "hash" "mpool" "dmgr")
|
||||||
|
foreach(TDB_SUBDIR ${TDB_SUBDIRS})
|
||||||
|
aux_source_directory("src/${TDB_SUBDIR}" TDB_SRC)
|
||||||
|
endforeach()
|
||||||
|
|
||||||
add_library(tdb STATIC ${TDB_SRC})
|
add_library(tdb STATIC ${TDB_SRC})
|
||||||
# target_include_directories(
|
|
||||||
# tkv
|
|
||||||
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/tkv"
|
|
||||||
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
|
||||||
# )
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
tdb
|
tdb
|
||||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||||
|
@ -17,5 +18,5 @@ target_link_libraries(
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
# add_subdirectory(test)
|
add_subdirectory(test)
|
||||||
endif(${BUILD_TEST})
|
endif(${BUILD_TEST})
|
||||||
|
|
|
@ -22,10 +22,14 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define TDB_EXTERN
|
||||||
|
#define TDB_PUBLIC
|
||||||
|
#define TDB_STATIC static
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TDB_BTREE = 0,
|
TDB_BTREE_T = 0,
|
||||||
TDB_HASH,
|
TDB_HASH_T,
|
||||||
TDB_HEAP,
|
TDB_HEAP_T,
|
||||||
} tdb_db_t;
|
} tdb_db_t;
|
||||||
|
|
||||||
// Forward declaration
|
// Forward declaration
|
||||||
|
@ -39,9 +43,9 @@ typedef struct {
|
||||||
} TDB_KEY, TDB_VALUE;
|
} TDB_KEY, TDB_VALUE;
|
||||||
|
|
||||||
// TDB Operations
|
// TDB Operations
|
||||||
int tdbCreateDB(TDB** dbpp);
|
TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type);
|
||||||
int tdbOpenDB(TDB* dbp, tdb_db_t type, uint32_t flags);
|
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags);
|
||||||
int tdbCloseDB(TDB* dbp, uint32_t flags);
|
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "tdbDB.h"
|
||||||
|
#include "tdb.h"
|
||||||
|
|
||||||
|
TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type) {
|
||||||
|
TDB* dbp;
|
||||||
|
int ret;
|
||||||
|
|
||||||
|
dbp = calloc(1, sizeof(*dbp));
|
||||||
|
if (dbp == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dbp->pageSize = TDB_DEFAULT_PGSIZE;
|
||||||
|
dbp->type = type;
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case TDB_BTREE_T:
|
||||||
|
// ret = tdbInitBtreeDB(dbp);
|
||||||
|
// if (ret < 0) goto _err;
|
||||||
|
break;
|
||||||
|
case TDB_HASH_T:
|
||||||
|
// ret = tdbInitHashDB(dbp);
|
||||||
|
// if (ret < 0) goto _err;
|
||||||
|
break;
|
||||||
|
case TDB_HEAP_T:
|
||||||
|
// ret = tdbInitHeapDB(dbp);
|
||||||
|
// if (ret < 0) goto _err;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
*dbpp = dbp;
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
if (dbp) {
|
||||||
|
free(dbp);
|
||||||
|
}
|
||||||
|
*dbpp = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) {
|
||||||
|
// TODO
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -16,7 +16,7 @@
|
||||||
#ifndef _TD_TDB_BTREE_H_
|
#ifndef _TD_TDB_BTREE_H_
|
||||||
#define _TD_TDB_BTREE_H_
|
#define _TD_TDB_BTREE_H_
|
||||||
|
|
||||||
#include "tkvDef.h"
|
#include "tdbDef.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -26,6 +26,8 @@ typedef struct {
|
||||||
pgid_t root; // root page number
|
pgid_t root; // root page number
|
||||||
} TDB_BTREE;
|
} TDB_BTREE;
|
||||||
|
|
||||||
|
TDB_PUBLIC int tdbInitBtreeDB(TDB *dbp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -16,20 +16,22 @@
|
||||||
#ifndef _TD_TDB_DB_H_
|
#ifndef _TD_TDB_DB_H_
|
||||||
#define _TD_TDB_DB_H_
|
#define _TD_TDB_DB_H_
|
||||||
|
|
||||||
|
#include "tdb.h"
|
||||||
#include "tdbBtree.h"
|
#include "tdbBtree.h"
|
||||||
#include "tdbHash.h"
|
#include "tdbHash.h"
|
||||||
|
#include "tdbHeap.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
struct TDB {
|
struct TDB {
|
||||||
pgsize_t pageSize;
|
pgsize_t pageSize;
|
||||||
tdb_db_t type;
|
tdb_db_t type;
|
||||||
union {
|
union {
|
||||||
TDB_BTREE btree;
|
TDB_BTREE *btree;
|
||||||
TDB_HASH hash;
|
TDB_HASH * hash;
|
||||||
|
TDB_HEAP * heap;
|
||||||
} dbam; // Different access methods
|
} dbam; // Different access methods
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -24,16 +24,17 @@ extern "C" {
|
||||||
|
|
||||||
// pgid_t
|
// pgid_t
|
||||||
typedef int32_t pgid_t;
|
typedef int32_t pgid_t;
|
||||||
#define TKV_IVLD_PGID ((pgid_t)-1)
|
#define TDB_IVLD_PGID ((pgid_t)-1)
|
||||||
|
|
||||||
// framd_id_t
|
// framd_id_t
|
||||||
typedef int32_t frame_id_t;
|
typedef int32_t frame_id_t;
|
||||||
|
|
||||||
// pgsize_t
|
// pgsize_t
|
||||||
typedef int32_t pgsize_t;
|
typedef int32_t pgsize_t;
|
||||||
#define TKV_MIN_PGSIZE 512
|
#define TDB_MIN_PGSIZE 512
|
||||||
#define TKV_MAX_PGSIZE 16384
|
#define TDB_MAX_PGSIZE 16384
|
||||||
#define TKV_IS_PGSIZE_VLD(s) (((s) >= TKV_MIN_PGSIZE) && (TKV_MAX_PGSIZE <= TKV_MAX_PGSIZE))
|
#define TDB_DEFAULT_PGSIZE 4096
|
||||||
|
#define TDB_IS_PGSIZE_VLD(s) (((s) >= TKV_MIN_PGSIZE) && (TKV_MAX_PGSIZE <= TKV_MAX_PGSIZE))
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_TKV_HAHS_H_
|
#ifndef _TD_TDB_HASH_H_
|
||||||
#define _TD_TKV_HAHS_H_
|
#define _TD_TDB_HASH_H_
|
||||||
|
|
||||||
#include "tdbDef.h"
|
#include "tdbDef.h"
|
||||||
|
|
||||||
|
@ -26,8 +26,10 @@ typedef struct {
|
||||||
// TODO
|
// TODO
|
||||||
} TDB_HASH;
|
} TDB_HASH;
|
||||||
|
|
||||||
|
TDB_PUBLIC int tdbInitHashDB(TDB *dbp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_TKV_HAHS_H_*/
|
#endif /*_TD_TDB_HASH_H_*/
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TDB_HEAP_H_
|
||||||
|
#define _TD_TDB_HEAP_H_
|
||||||
|
|
||||||
|
#include "tdbDef.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
// TODO
|
||||||
|
} TDB_HEAP;
|
||||||
|
|
||||||
|
TDB_PUBLIC int tdbInitHeapDB(TDB *dbp);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_TDB_HEAP_H_*/
|
|
@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) {
|
||||||
int ret;
|
int ret;
|
||||||
TDB *dbp;
|
TDB *dbp;
|
||||||
|
|
||||||
tdbCreateDB(&dbp);
|
tdbCreateDB(&dbp, TDB_BTREE_T);
|
||||||
|
|
||||||
tdbOpenDB(dbp, TDB_BTREE, 0);
|
tdbOpenDB(dbp, 0);
|
||||||
|
|
||||||
tdbCloseDB(dbp, 0);
|
tdbCloseDB(dbp, 0);
|
||||||
}
|
}
|
Loading…
Reference in New Issue