Merge pull request #11263 from taosdata/feature/3.0_wxy
normal table rollup translate, and condition rewrite
This commit is contained in:
commit
e5b18c6e0e
|
@ -94,6 +94,7 @@ typedef struct SColumnDefNode {
|
|||
char colName[TSDB_COL_NAME_LEN];
|
||||
SDataType dataType;
|
||||
char comments[TSDB_STB_COMMENT_LEN];
|
||||
bool sma;
|
||||
} SColumnDefNode;
|
||||
|
||||
typedef struct SCreateTableStmt {
|
||||
|
|
|
@ -1050,6 +1050,7 @@ SNode* createColumnDefNode(SAstCreateContext* pCxt, const SToken* pColName, SDat
|
|||
if (NULL != pComment) {
|
||||
trimString(pComment->z, pComment->n, pCol->comments, sizeof(pCol->comments));
|
||||
}
|
||||
pCol->sma = true;
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
|
|
|
@ -90,14 +90,91 @@ static EDealRes calcConst(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool isCondition(const SNode* pNode) {
|
||||
if (QUERY_NODE_OPERATOR == nodeType(pNode)) {
|
||||
return nodesIsComparisonOp((const SOperatorNode*)pNode);
|
||||
}
|
||||
return (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode));
|
||||
}
|
||||
|
||||
static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) {
|
||||
SOperatorNode* pOp = nodesMakeNode(QUERY_NODE_OPERATOR);
|
||||
if (NULL == pOp) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pOp->opType = OP_TYPE_IS_TRUE;
|
||||
pOp->pLeft = pSrc;
|
||||
*pIsTrue = (SNode*)pOp;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EDealRes doRewriteCondition(SNode** pNode, void* pContext) {
|
||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pNode)) {
|
||||
SNode* pParam = NULL;
|
||||
FOREACH(pParam, ((SLogicConditionNode*)*pNode)->pParameterList) {
|
||||
if (!isCondition(pParam)) {
|
||||
SNode* pIsTrue = NULL;
|
||||
if (TSDB_CODE_SUCCESS != rewriteIsTrue(pParam, &pIsTrue)) {
|
||||
((SCalcConstContext*)pContext)->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return DEAL_RES_ERROR;
|
||||
}
|
||||
REPLACE_NODE(pIsTrue);
|
||||
}
|
||||
}
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static int32_t rewriteCondition(SCalcConstContext* pCxt, SNode** pNode) {
|
||||
if (!isCondition(*pNode)) {
|
||||
return rewriteIsTrue(*pNode, pNode);
|
||||
}
|
||||
nodesRewriteExprPostOrder(pNode, doRewriteCondition, pCxt);
|
||||
return pCxt->code;
|
||||
}
|
||||
|
||||
static int32_t rewriteConditionForFromTable(SCalcConstContext* pCxt, SNode* pTable) {
|
||||
if (QUERY_NODE_JOIN_TABLE == nodeType(pTable)) {
|
||||
SJoinTableNode* pJoin = (SJoinTableNode*)pTable;
|
||||
pCxt->code = rewriteConditionForFromTable(pCxt, pJoin->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||
pCxt->code = rewriteConditionForFromTable(pCxt, pJoin->pRight);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||
pCxt->code = rewriteCondition(pCxt, &pJoin->pOnCond);
|
||||
}
|
||||
}
|
||||
return pCxt->code;
|
||||
}
|
||||
|
||||
static int32_t calcConstFromTable(SCalcConstContext* pCxt, SSelectStmt* pSelect) {
|
||||
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||
pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable);
|
||||
}
|
||||
return pCxt->code;
|
||||
}
|
||||
|
||||
static int32_t calcConstCondition(SCalcConstContext* pCxt, SNode** pCond) {
|
||||
if (NULL == *pCond) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
nodesRewriteExprPostOrder(pCond, calcConst, pCxt);
|
||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||
pCxt->code = rewriteCondition(pCxt, pCond);
|
||||
}
|
||||
return pCxt->code;
|
||||
}
|
||||
|
||||
static int32_t calcConstSelect(SSelectStmt* pSelect) {
|
||||
SCalcConstContext cxt = { .code = TSDB_CODE_SUCCESS };
|
||||
nodesRewriteExprsPostOrder(pSelect->pProjectionList, calcConst, &cxt);
|
||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, &cxt);
|
||||
cxt.code = calcConstFromTable(&cxt, pSelect);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||
nodesRewriteExprPostOrder(&pSelect->pWhere, calcConst, &cxt);
|
||||
cxt.code = calcConstCondition(&cxt, &pSelect->pWhere);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||
nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt);
|
||||
|
@ -109,7 +186,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) {
|
|||
nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||
nodesRewriteExprPostOrder(&pSelect->pHaving, calcConst, &cxt);
|
||||
cxt.code = calcConstCondition(&cxt, &pSelect->pHaving);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||
nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt);
|
||||
|
|
|
@ -1119,7 +1119,7 @@ static int32_t columnNodeToField(SNodeList* pList, SArray** pArray) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static const SColumnDefNode* findColDef(const SNodeList* pCols, const SColumnNode* pCol) {
|
||||
static SColumnDefNode* findColDef(SNodeList* pCols, const SColumnNode* pCol) {
|
||||
SNode* pColDef = NULL;
|
||||
FOREACH(pColDef, pCols) {
|
||||
if (0 == strcmp(pCol->colName, ((SColumnDefNode*)pColDef)->colName)) {
|
||||
|
@ -1129,16 +1129,20 @@ static const SColumnDefNode* findColDef(const SNodeList* pCols, const SColumnNod
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static int32_t checkCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
if (NULL != pStmt->pOptions->pSma) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pStmt->pCols) {
|
||||
((SColumnDefNode*)pNode)->sma = false;
|
||||
}
|
||||
FOREACH(pNode, pStmt->pOptions->pSma) {
|
||||
SColumnNode* pSmaCol = (SColumnNode*)pNode;
|
||||
const SColumnDefNode* pColDef = findColDef(pStmt->pCols, pSmaCol);
|
||||
SColumnDefNode* pColDef = findColDef(pStmt->pCols, pSmaCol);
|
||||
if (NULL == pColDef) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLUMN, pSmaCol->colName);
|
||||
}
|
||||
pSmaCol->node.resType = pColDef->dataType;
|
||||
pColDef->sma = true;
|
||||
}
|
||||
}
|
||||
if (NULL != pStmt->pOptions->pFuncs) {
|
||||
|
@ -1158,7 +1162,7 @@ static int32_t getAggregationMethod(SNodeList* pFuncs) {
|
|||
}
|
||||
|
||||
static int32_t translateCreateSuperTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
|
||||
int32_t code = checkCreateSuperTable(pCxt, pStmt);
|
||||
int32_t code = checkCreateTable(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
@ -1489,7 +1493,7 @@ static int32_t nodeTypeToShowType(ENodeType nt) {
|
|||
case QUERY_NODE_SHOW_CONNECTIONS_STMT:
|
||||
return TSDB_MGMT_TABLE_CONNS;
|
||||
case QUERY_NODE_SHOW_LICENCE_STMT:
|
||||
return 0; // todo
|
||||
return TSDB_MGMT_TABLE_GRANTS;
|
||||
case QUERY_NODE_SHOW_QUERIES_STMT:
|
||||
return TSDB_MGMT_TABLE_QUERIES;
|
||||
case QUERY_NODE_SHOW_SCORES_STMT:
|
||||
|
@ -2161,11 +2165,11 @@ typedef struct SVgroupTablesBatch {
|
|||
char dbName[TSDB_DB_NAME_LEN];
|
||||
} SVgroupTablesBatch;
|
||||
|
||||
static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
|
||||
static void toSchemaEx(const SColumnDefNode* pCol, col_id_t colId, SSchemaEx* pSchema) {
|
||||
pSchema->colId = colId;
|
||||
pSchema->type = pCol->dataType.type;
|
||||
pSchema->bytes = calcTypeBytes(pCol->dataType);
|
||||
pSchema->sma = TSDB_BSMA_TYPE_LATEST; // TODO: use default value currently, and use the real value later.
|
||||
pSchema->sma = pCol->sma ? TSDB_BSMA_TYPE_LATEST : TSDB_BSMA_TYPE_NONE;
|
||||
strcpy(pSchema->name, pCol->colName);
|
||||
}
|
||||
|
||||
|
@ -2175,33 +2179,60 @@ static void destroyCreateTbReq(SVCreateTbReq* pReq) {
|
|||
taosMemoryFreeClear(pReq->ntbCfg.pSchema);
|
||||
}
|
||||
|
||||
static int32_t buildNormalTableBatchReq(int32_t acctId, const char* pDbName, const char* pTableName,
|
||||
const SNodeList* pColumns, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
|
||||
static int32_t buildSmaParam(STableOptions* pOptions, SVCreateTbReq* pReq) {
|
||||
if (0 == LIST_LENGTH(pOptions->pFuncs)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pReq->ntbCfg.pRSmaParam = taosMemoryCalloc(1, sizeof(SRSmaParam));
|
||||
if (NULL == pReq->ntbCfg.pRSmaParam) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->ntbCfg.pRSmaParam->delay = pOptions->delay;
|
||||
pReq->ntbCfg.pRSmaParam->xFilesFactor = pOptions->filesFactor;
|
||||
pReq->ntbCfg.pRSmaParam->nFuncIds = LIST_LENGTH(pOptions->pFuncs);
|
||||
pReq->ntbCfg.pRSmaParam->pFuncIds = taosMemoryCalloc(pReq->ntbCfg.pRSmaParam->nFuncIds, sizeof(func_id_t));
|
||||
if (NULL == pReq->ntbCfg.pRSmaParam->pFuncIds) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
int32_t index = 0;
|
||||
SNode* pFunc = NULL;
|
||||
FOREACH(pFunc, pOptions->pFuncs) {
|
||||
pReq->ntbCfg.pRSmaParam->pFuncIds[index++] = ((SFunctionNode*)pFunc)->funcId;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo, SVgroupTablesBatch* pBatch) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
SName name = { .type = TSDB_DB_NAME_T, .acctId = acctId };
|
||||
strcpy(name.dbname, pDbName);
|
||||
strcpy(name.dbname, pStmt->dbName);
|
||||
tNameGetFullDbName(&name, dbFName);
|
||||
|
||||
SVCreateTbReq req = {0};
|
||||
req.type = TD_NORMAL_TABLE;
|
||||
req.dbFName = strdup(dbFName);
|
||||
req.name = strdup(pTableName);
|
||||
req.ntbCfg.nCols = LIST_LENGTH(pColumns);
|
||||
req.name = strdup(pStmt->tableName);
|
||||
req.ntbCfg.nCols = LIST_LENGTH(pStmt->pCols);
|
||||
req.ntbCfg.pSchema = taosMemoryCalloc(req.ntbCfg.nCols, sizeof(SSchemaEx));
|
||||
if (NULL == req.name || NULL == req.ntbCfg.pSchema) {
|
||||
destroyCreateTbReq(&req);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
SNode* pCol;
|
||||
int32_t index = 0;
|
||||
FOREACH(pCol, pColumns) {
|
||||
toSchema((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
|
||||
col_id_t index = 0;
|
||||
FOREACH(pCol, pStmt->pCols) {
|
||||
toSchemaEx((SColumnDefNode*)pCol, index + 1, req.ntbCfg.pSchema + index);
|
||||
++index;
|
||||
}
|
||||
// TODO: use the real sma for normal table.
|
||||
if (TSDB_CODE_SUCCESS != buildSmaParam(pStmt->pOptions, &req)) {
|
||||
destroyCreateTbReq(&req);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pBatch->info = *pVgroupInfo;
|
||||
strcpy(pBatch->dbName, pDbName);
|
||||
strcpy(pBatch->dbName, pStmt->dbName);
|
||||
pBatch->req.pArray = taosArrayInit(1, sizeof(struct SVCreateTbReq));
|
||||
if (NULL == pBatch->req.pArray) {
|
||||
destroyCreateTbReq(&req);
|
||||
|
@ -2282,7 +2313,7 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
|
|||
}
|
||||
|
||||
SVgroupTablesBatch tbatch = {0};
|
||||
int32_t code = buildNormalTableBatchReq(acctId, pStmt->dbName, pStmt->tableName, pStmt->pCols, pInfo, &tbatch);
|
||||
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = serializeVgroupTablesBatch(&tbatch, *pBufArray);
|
||||
}
|
||||
|
@ -2297,8 +2328,11 @@ static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt*
|
|||
static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
|
||||
SCreateTableStmt* pStmt = (SCreateTableStmt*)pQuery->pRoot;
|
||||
|
||||
int32_t code = checkCreateTable(pCxt, pStmt);
|
||||
SVgroupInfo info = {0};
|
||||
int32_t code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getTableHashVgroup(pCxt, pStmt->dbName, pStmt->tableName, &info);
|
||||
}
|
||||
SArray* pBufArray = NULL;
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
|
||||
|
|
|
@ -44,7 +44,7 @@ protected:
|
|||
query_ = nullptr;
|
||||
bool res = runImpl(parseCode, translateCode);
|
||||
qDestroyQuery(query_);
|
||||
if (!res) {
|
||||
if (1/*!res*/) {
|
||||
dump();
|
||||
}
|
||||
return res;
|
||||
|
@ -69,6 +69,12 @@ private:
|
|||
return (terrno == translateCode);
|
||||
}
|
||||
translatedAstStr_ = toString(query_->pRoot);
|
||||
code = calculateConstant(&cxt_, query_);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
calcConstErrStr_ = string("code:") + tstrerror(code) + string(", msg:") + errMagBuf_;
|
||||
return false;
|
||||
}
|
||||
calcConstAstStr_ = toString(query_->pRoot);
|
||||
return (TSDB_CODE_SUCCESS == translateCode);
|
||||
}
|
||||
|
||||
|
@ -88,6 +94,13 @@ private:
|
|||
cout << "translate output: " << endl;
|
||||
cout << translatedAstStr_ << endl;
|
||||
}
|
||||
if (!calcConstErrStr_.empty()) {
|
||||
cout << "calculateConstant error: " << calcConstErrStr_ << endl;
|
||||
}
|
||||
if (!calcConstAstStr_.empty()) {
|
||||
cout << "calculateConstant output: " << endl;
|
||||
cout << calcConstAstStr_ << endl;
|
||||
}
|
||||
}
|
||||
|
||||
string toString(const SNode* pRoot, bool format = false) {
|
||||
|
@ -112,6 +125,8 @@ private:
|
|||
parsedAstStr_.clear();
|
||||
translateErrStr_.clear();
|
||||
translatedAstStr_.clear();
|
||||
calcConstErrStr_.clear();
|
||||
calcConstAstStr_.clear();
|
||||
}
|
||||
|
||||
string acctId_;
|
||||
|
@ -124,6 +139,8 @@ private:
|
|||
string parsedAstStr_;
|
||||
string translateErrStr_;
|
||||
string translatedAstStr_;
|
||||
string calcConstErrStr_;
|
||||
string calcConstAstStr_;
|
||||
};
|
||||
|
||||
TEST_F(ParserTest, createAccount) {
|
||||
|
@ -191,6 +208,9 @@ TEST_F(ParserTest, selectConstant) {
|
|||
|
||||
bind("SELECT 1234567890123456789012345678901234567890, 20.1234567890123456789012345678901234567890, 'abc', \"wxy\", TIMESTAMP '2022-02-09 17:30:20', true, false, 15s FROM t1");
|
||||
ASSERT_TRUE(run());
|
||||
|
||||
bind("SELECT 123 + 45 FROM t1 where 2 - 1");
|
||||
ASSERT_TRUE(run());
|
||||
}
|
||||
|
||||
TEST_F(ParserTest, selectExpression) {
|
||||
|
|
Loading…
Reference in New Issue