feat: tag index filter plan
This commit is contained in:
parent
07d0a3a52a
commit
8da52cbb04
|
@ -146,7 +146,8 @@ bool fmIsBuiltinFunc(const char* pFunc);
|
||||||
|
|
||||||
bool fmIsAggFunc(int32_t funcId);
|
bool fmIsAggFunc(int32_t funcId);
|
||||||
bool fmIsScalarFunc(int32_t funcId);
|
bool fmIsScalarFunc(int32_t funcId);
|
||||||
bool fmIsNonstandardSQLFunc(int32_t funcId);
|
bool fmIsVectorFunc(int32_t funcId);
|
||||||
|
bool fmIsIndefiniteRowsFunc(int32_t funcId);
|
||||||
bool fmIsStringFunc(int32_t funcId);
|
bool fmIsStringFunc(int32_t funcId);
|
||||||
bool fmIsDatetimeFunc(int32_t funcId);
|
bool fmIsDatetimeFunc(int32_t funcId);
|
||||||
bool fmIsSelectFunc(int32_t funcId);
|
bool fmIsSelectFunc(int32_t funcId);
|
||||||
|
|
|
@ -54,6 +54,7 @@ typedef struct SScanLogicNode {
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
int8_t intervalUnit;
|
int8_t intervalUnit;
|
||||||
int8_t slidingUnit;
|
int8_t slidingUnit;
|
||||||
|
SNode* pTagCond;
|
||||||
} SScanLogicNode;
|
} SScanLogicNode;
|
||||||
|
|
||||||
typedef struct SJoinLogicNode {
|
typedef struct SJoinLogicNode {
|
||||||
|
@ -343,6 +344,7 @@ typedef struct SSubplan {
|
||||||
SNodeList* pParents; // the data destination subplan, get data from current subplan
|
SNodeList* pParents; // the data destination subplan, get data from current subplan
|
||||||
SPhysiNode* pNode; // physical plan of current subplan
|
SPhysiNode* pNode; // physical plan of current subplan
|
||||||
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
|
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
|
||||||
|
SNode* pTagCond;
|
||||||
} SSubplan;
|
} SSubplan;
|
||||||
|
|
||||||
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
|
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;
|
||||||
|
|
|
@ -236,7 +236,7 @@ typedef struct SSelectStmt {
|
||||||
bool isTimeOrderQuery;
|
bool isTimeOrderQuery;
|
||||||
bool hasAggFuncs;
|
bool hasAggFuncs;
|
||||||
bool hasRepeatScanFuncs;
|
bool hasRepeatScanFuncs;
|
||||||
bool hasNonstdSQLFunc;
|
bool hasIndefiniteRowsFunc;
|
||||||
} SSelectStmt;
|
} SSelectStmt;
|
||||||
|
|
||||||
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
|
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
|
||||||
|
|
|
@ -155,8 +155,9 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
|
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||||
|
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo,
|
||||||
const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset,
|
||||||
|
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||||
|
@ -583,8 +584,9 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
||||||
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
|
||||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||||
|
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||||
// keep it temporarily
|
// keep it temporarily
|
||||||
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
||||||
|
@ -666,8 +668,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
bool createDummyCol) {
|
int32_t scanFlag, bool createDummyCol) {
|
||||||
if (pBlock->pBlockAgg != NULL) {
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||||
} else {
|
} else {
|
||||||
|
@ -853,7 +855,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
// _rowts/_c0, not tbname column
|
// _rowts/_c0, not tbname column
|
||||||
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
|
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
||||||
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
pfCtx->fpSet.init(&pCtx[k], pResInfo);
|
||||||
|
|
||||||
|
@ -1073,7 +1075,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
if (strcmp(pCtx[i].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
|
||||||
pValCtx[num++] = &pCtx[i];
|
pValCtx[num++] = &pCtx[i];
|
||||||
} else if (fmIsAggFunc(pCtx[i].functionId)) {
|
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
|
||||||
p = &pCtx[i];
|
p = &pCtx[i];
|
||||||
}
|
}
|
||||||
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
|
||||||
|
@ -1124,7 +1126,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
||||||
SFuncExecEnv env = {0};
|
SFuncExecEnv env = {0};
|
||||||
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
|
||||||
|
|
||||||
if (fmIsAggFunc(pCtx->functionId) || fmIsNonstandardSQLFunc(pCtx->functionId)) {
|
if (fmIsAggFunc(pCtx->functionId) || fmIsIndefiniteRowsFunc(pCtx->functionId)) {
|
||||||
bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
|
bool isUdaf = fmIsUserDefinedFunc(pCtx->functionId);
|
||||||
if (!isUdaf) {
|
if (!isUdaf) {
|
||||||
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
|
||||||
|
@ -2006,8 +2008,9 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
|
||||||
const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx,
|
||||||
|
int32_t numOfExprs) {
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t start = pGroupResInfo->index;
|
int32_t start = pGroupResInfo->index;
|
||||||
|
|
||||||
|
@ -2071,12 +2074,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
|
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
pBlock->info.groupId);
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) {
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf) {
|
||||||
SExprInfo* pExprInfo = pOperator->pExpr;
|
SExprInfo* pExprInfo = pOperator->pExpr;
|
||||||
int32_t numOfExprs = pOperator->numOfExprs;
|
int32_t numOfExprs = pOperator->numOfExprs;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -3493,7 +3498,8 @@ _error:
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
|
||||||
// todo add more information about exchange operation
|
// todo add more information about exchange operation
|
||||||
int32_t type = pOperator->operatorType;
|
int32_t type = pOperator->operatorType;
|
||||||
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN ||
|
||||||
|
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
*order = TSDB_ORDER_ASC;
|
*order = TSDB_ORDER_ASC;
|
||||||
*scanFlag = MAIN_SCAN;
|
*scanFlag = MAIN_SCAN;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3860,7 +3866,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo);
|
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
||||||
|
pProjectInfo->pPseudoColInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -4601,8 +4608,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
} else {
|
} else {
|
||||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
queryId, taskId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
|
@ -4618,10 +4624,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
SArray* pCols =
|
||||||
|
extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols, tableIdList, pTaskInfo,
|
SOperatorInfo* pOperator =
|
||||||
pScanPhyNode->node.pConditions, pOperatorDumy);
|
createStreamScanOperatorInfo(pHandle->reader, pDataReader, pHandle, pScanPhyNode->uid, pResBlock, pCols,
|
||||||
|
tableIdList, pTaskInfo, pScanPhyNode->node.pConditions, pOperatorDumy);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
|
@ -4633,7 +4641,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
SArray* colList =
|
||||||
|
extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||||
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
|
@ -4655,8 +4664,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList =
|
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo,
|
||||||
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
|
COL_MATCH_FROM_COL_ID);
|
||||||
|
|
||||||
SOperatorInfo* pOperator =
|
SOperatorInfo* pOperator =
|
||||||
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
||||||
|
@ -4738,7 +4747,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColList = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
SArray* pColList =
|
||||||
|
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
|
@ -5239,15 +5249,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize, const char* pKey,
|
||||||
const char* pKey, const char* pDir) {
|
const char* pDir) {
|
||||||
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||||
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
||||||
int32_t pageSize = rowSize * 32;
|
int32_t pageSize = rowSize * 32;
|
||||||
int32_t bufSize = pageSize * 4096;
|
int32_t bufSize = pageSize * 4096;
|
||||||
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);;
|
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);
|
||||||
|
;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,7 @@ extern "C" {
|
||||||
|
|
||||||
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
|
||||||
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
|
||||||
#define FUNC_MGT_NONSTANDARD_SQL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
#define FUNC_MGT_INDEFINITE_ROWS_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
|
||||||
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
|
||||||
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
|
||||||
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
|
||||||
|
|
|
@ -14,8 +14,8 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "builtins.h"
|
#include "builtins.h"
|
||||||
#include "querynodes.h"
|
|
||||||
#include "builtinsimpl.h"
|
#include "builtinsimpl.h"
|
||||||
|
#include "querynodes.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -500,8 +500,7 @@ static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
|
||||||
|
|
||||||
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
|
||||||
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE can only be columns");
|
||||||
"The parameters of UNIQUE can only be columns");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFunc->node.resType = ((SExprNode*)pPara)->resType;
|
pFunc->node.resType = ((SExprNode*)pPara)->resType;
|
||||||
|
@ -823,7 +822,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "top",
|
.name = "top",
|
||||||
.type = FUNCTION_TYPE_TOP,
|
.type = FUNCTION_TYPE_TOP,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||||
.translateFunc = translateTop,
|
.translateFunc = translateTop,
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -833,7 +832,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "bottom",
|
.name = "bottom",
|
||||||
.type = FUNCTION_TYPE_BOTTOM,
|
.type = FUNCTION_TYPE_BOTTOM,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC,
|
.classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||||
.translateFunc = translateBottom,
|
.translateFunc = translateBottom,
|
||||||
.getEnvFunc = getTopBotFuncEnv,
|
.getEnvFunc = getTopBotFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -915,7 +914,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "diff",
|
.name = "diff",
|
||||||
.type = FUNCTION_TYPE_DIFF,
|
.type = FUNCTION_TYPE_DIFF,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateDiff,
|
.translateFunc = translateDiff,
|
||||||
.getEnvFunc = getDiffFuncEnv,
|
.getEnvFunc = getDiffFuncEnv,
|
||||||
.initFunc = diffFunctionSetup,
|
.initFunc = diffFunctionSetup,
|
||||||
|
@ -925,7 +924,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "state_count",
|
.name = "state_count",
|
||||||
.type = FUNCTION_TYPE_STATE_COUNT,
|
.type = FUNCTION_TYPE_STATE_COUNT,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC,
|
||||||
.translateFunc = translateStateCount,
|
.translateFunc = translateStateCount,
|
||||||
.getEnvFunc = getStateFuncEnv,
|
.getEnvFunc = getStateFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -935,7 +934,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "state_duration",
|
.name = "state_duration",
|
||||||
.type = FUNCTION_TYPE_STATE_DURATION,
|
.type = FUNCTION_TYPE_STATE_DURATION,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateStateDuration,
|
.translateFunc = translateStateDuration,
|
||||||
.getEnvFunc = getStateFuncEnv,
|
.getEnvFunc = getStateFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -945,7 +944,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "csum",
|
.name = "csum",
|
||||||
.type = FUNCTION_TYPE_CSUM,
|
.type = FUNCTION_TYPE_CSUM,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateCsum,
|
.translateFunc = translateCsum,
|
||||||
.getEnvFunc = getCsumFuncEnv,
|
.getEnvFunc = getCsumFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
@ -955,7 +954,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "mavg",
|
.name = "mavg",
|
||||||
.type = FUNCTION_TYPE_MAVG,
|
.type = FUNCTION_TYPE_MAVG,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateMavg,
|
.translateFunc = translateMavg,
|
||||||
.getEnvFunc = getMavgFuncEnv,
|
.getEnvFunc = getMavgFuncEnv,
|
||||||
.initFunc = mavgFunctionSetup,
|
.initFunc = mavgFunctionSetup,
|
||||||
|
@ -965,7 +964,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "sample",
|
.name = "sample",
|
||||||
.type = FUNCTION_TYPE_SAMPLE,
|
.type = FUNCTION_TYPE_SAMPLE,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateSample,
|
.translateFunc = translateSample,
|
||||||
.getEnvFunc = getSampleFuncEnv,
|
.getEnvFunc = getSampleFuncEnv,
|
||||||
.initFunc = sampleFunctionSetup,
|
.initFunc = sampleFunctionSetup,
|
||||||
|
@ -975,7 +974,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "tail",
|
.name = "tail",
|
||||||
.type = FUNCTION_TYPE_TAIL,
|
.type = FUNCTION_TYPE_TAIL,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateTail,
|
.translateFunc = translateTail,
|
||||||
.getEnvFunc = getTailFuncEnv,
|
.getEnvFunc = getTailFuncEnv,
|
||||||
.initFunc = tailFunctionSetup,
|
.initFunc = tailFunctionSetup,
|
||||||
|
@ -985,7 +984,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "unique",
|
.name = "unique",
|
||||||
.type = FUNCTION_TYPE_UNIQUE,
|
.type = FUNCTION_TYPE_UNIQUE,
|
||||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||||
.translateFunc = translateUnique,
|
.translateFunc = translateUnique,
|
||||||
.getEnvFunc = getUniqueFuncEnv,
|
.getEnvFunc = getUniqueFuncEnv,
|
||||||
.initFunc = uniqueFunctionSetup,
|
.initFunc = uniqueFunctionSetup,
|
||||||
|
|
|
@ -149,6 +149,8 @@ bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MG
|
||||||
|
|
||||||
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
|
bool fmIsScalarFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SCALAR_FUNC); }
|
||||||
|
|
||||||
|
bool fmIsVectorFunc(int32_t funcId) { return !fmIsScalarFunc(funcId); }
|
||||||
|
|
||||||
bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); }
|
bool fmIsSelectFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_SELECT_FUNC); }
|
||||||
|
|
||||||
bool fmIsTimelineFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC); }
|
bool fmIsTimelineFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_TIMELINE_FUNC); }
|
||||||
|
@ -161,7 +163,7 @@ bool fmIsWindowPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(
|
||||||
|
|
||||||
bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); }
|
bool fmIsWindowClauseFunc(int32_t funcId) { return fmIsAggFunc(funcId) || fmIsWindowPseudoColumnFunc(funcId); }
|
||||||
|
|
||||||
bool fmIsNonstandardSQLFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_NONSTANDARD_SQL_FUNC); }
|
bool fmIsIndefiniteRowsFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_INDEFINITE_ROWS_FUNC); }
|
||||||
|
|
||||||
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
bool fmIsSpecialDataRequiredFunc(int32_t funcId) {
|
||||||
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
return isSpecificClassifyFunc(funcId, FUNC_MGT_SPECIAL_DATA_REQUIRED);
|
||||||
|
|
|
@ -490,6 +490,7 @@ static const char* jkScanLogicPlanScanCols = "ScanCols";
|
||||||
static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
||||||
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
|
static const char* jkScanLogicPlanTableMetaSize = "TableMetaSize";
|
||||||
static const char* jkScanLogicPlanTableMeta = "TableMeta";
|
static const char* jkScanLogicPlanTableMeta = "TableMeta";
|
||||||
|
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||||
|
|
||||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||||
|
@ -507,6 +508,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
|
code = tjsonAddObject(pJson, jkScanLogicPlanTableMeta, tableMetaToJson, pNode->pMeta);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddObject(pJson, jkScanLogicPlanTagCond, nodeToJson, pNode->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -528,6 +532,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
|
code = tjsonMakeObject(pJson, jkScanLogicPlanTableMeta, jsonToTableMeta, (void**)&pNode->pMeta, objSize);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeObject(pJson, jkScanLogicPlanTagCond, &pNode->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1775,6 +1782,7 @@ static const char* jkSubplanDbFName = "DbFName";
|
||||||
static const char* jkSubplanNodeAddr = "NodeAddr";
|
static const char* jkSubplanNodeAddr = "NodeAddr";
|
||||||
static const char* jkSubplanRootNode = "RootNode";
|
static const char* jkSubplanRootNode = "RootNode";
|
||||||
static const char* jkSubplanDataSink = "DataSink";
|
static const char* jkSubplanDataSink = "DataSink";
|
||||||
|
static const char* jkSubplanTagCond = "TagCond";
|
||||||
|
|
||||||
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSubplan* pNode = (const SSubplan*)pObj;
|
const SSubplan* pNode = (const SSubplan*)pObj;
|
||||||
|
@ -1801,6 +1809,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkSubplanDataSink, nodeToJson, pNode->pDataSink);
|
code = tjsonAddObject(pJson, jkSubplanDataSink, nodeToJson, pNode->pDataSink);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddObject(pJson, jkSubplanTagCond, nodeToJson, pNode->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1831,6 +1842,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkSubplanDataSink, (SNode**)&pNode->pDataSink);
|
code = jsonToNodeObject(pJson, jkSubplanDataSink, (SNode**)&pNode->pDataSink);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeObject(pJson, jkSubplanTagCond, (SNode**)&pNode->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -342,26 +342,20 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
|
||||||
CHECK_OUT_OF_MEM(cond);
|
CHECK_OUT_OF_MEM(cond);
|
||||||
cond->condType = type;
|
cond->condType = type;
|
||||||
cond->pParameterList = nodesMakeList();
|
cond->pParameterList = nodesMakeList();
|
||||||
if ((QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type != ((SLogicConditionNode*)pParam1)->condType) ||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1) && type == ((SLogicConditionNode*)pParam1)->condType) {
|
||||||
(QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type != ((SLogicConditionNode*)pParam2)->condType)) {
|
|
||||||
nodesListAppend(cond->pParameterList, pParam1);
|
|
||||||
nodesListAppend(cond->pParameterList, pParam2);
|
|
||||||
} else {
|
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam1)) {
|
|
||||||
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList);
|
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam1)->pParameterList);
|
||||||
((SLogicConditionNode*)pParam1)->pParameterList = NULL;
|
((SLogicConditionNode*)pParam1)->pParameterList = NULL;
|
||||||
nodesDestroyNode(pParam1);
|
nodesDestroyNode(pParam1);
|
||||||
} else {
|
} else {
|
||||||
nodesListAppend(cond->pParameterList, pParam1);
|
nodesListAppend(cond->pParameterList, pParam1);
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2)) {
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pParam2) && type == ((SLogicConditionNode*)pParam2)->condType) {
|
||||||
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList);
|
nodesListAppendList(cond->pParameterList, ((SLogicConditionNode*)pParam2)->pParameterList);
|
||||||
((SLogicConditionNode*)pParam2)->pParameterList = NULL;
|
((SLogicConditionNode*)pParam2)->pParameterList = NULL;
|
||||||
nodesDestroyNode(pParam2);
|
nodesDestroyNode(pParam2);
|
||||||
} else {
|
} else {
|
||||||
nodesListAppend(cond->pParameterList, pParam2);
|
nodesListAppend(cond->pParameterList, pParam2);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return (SNode*)cond;
|
return (SNode*)cond;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -292,8 +292,8 @@ static bool isScanPseudoColumnFunc(const SNode* pNode) {
|
||||||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
|
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isNonstandardSQLFunc(const SNode* pNode) {
|
static bool isIndefiniteRowsFunc(const SNode* pNode) {
|
||||||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsNonstandardSQLFunc(((SFunctionNode*)pNode)->funcId));
|
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isDistinctOrderBy(STranslateContext* pCxt) {
|
static bool isDistinctOrderBy(STranslateContext* pCxt) {
|
||||||
|
@ -806,7 +806,7 @@ static EDealRes haveAggOrNonstdFunction(SNode* pNode, void* pContext) {
|
||||||
if (isAggFunc(pNode)) {
|
if (isAggFunc(pNode)) {
|
||||||
*((bool*)pContext) = true;
|
*((bool*)pContext) = true;
|
||||||
return DEAL_RES_END;
|
return DEAL_RES_END;
|
||||||
} else if (isNonstandardSQLFunc(pNode)) {
|
} else if (isIndefiniteRowsFunc(pNode)) {
|
||||||
*((bool*)pContext) = true;
|
*((bool*)pContext) = true;
|
||||||
return DEAL_RES_END;
|
return DEAL_RES_END;
|
||||||
}
|
}
|
||||||
|
@ -851,6 +851,15 @@ static bool hasInvalidFuncNesting(SNodeList* pParameterList) {
|
||||||
return hasInvalidFunc;
|
return hasInvalidFunc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t getFuncInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||||
|
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
||||||
|
.pRpc = pCxt->pParseCxt->pTransporter,
|
||||||
|
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
|
||||||
|
.pErrBuf = pCxt->msgBuf.buf,
|
||||||
|
.errBufLen = pCxt->msgBuf.len};
|
||||||
|
return fmGetFuncInfo(¶m, pFunc);
|
||||||
|
}
|
||||||
|
|
||||||
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc) {
|
||||||
SNode* pParam = NULL;
|
SNode* pParam = NULL;
|
||||||
FOREACH(pParam, pFunc->pParameterList) {
|
FOREACH(pParam, pFunc->pParameterList) {
|
||||||
|
@ -859,12 +868,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SFmGetFuncInfoParam param = {.pCtg = pCxt->pParseCxt->pCatalog,
|
pCxt->errCode = getFuncInfo(pCxt, pFunc);
|
||||||
.pRpc = pCxt->pParseCxt->pTransporter,
|
|
||||||
.pMgmtEps = &pCxt->pParseCxt->mgmtEpSet,
|
|
||||||
.pErrBuf = pCxt->msgBuf.buf,
|
|
||||||
.errBufLen = pCxt->msgBuf.len};
|
|
||||||
pCxt->errCode = fmGetFuncInfo(¶m, pFunc);
|
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsAggFunc(pFunc->funcId)) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsAggFunc(pFunc->funcId)) {
|
||||||
if (beforeHaving(pCxt->currClause)) {
|
if (beforeHaving(pCxt->currClause)) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION);
|
||||||
|
@ -872,7 +876,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
|
||||||
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
|
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
|
||||||
}
|
}
|
||||||
if (pCxt->pCurrStmt->hasNonstdSQLFunc) {
|
if (pCxt->pCurrStmt->hasIndefiniteRowsFunc) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -899,14 +903,15 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsNonstandardSQLFunc(pFunc->funcId)) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode && fmIsIndefiniteRowsFunc(pFunc->funcId)) {
|
||||||
if (SQL_CLAUSE_SELECT != pCxt->currClause || pCxt->pCurrStmt->hasNonstdSQLFunc || pCxt->pCurrStmt->hasAggFuncs) {
|
if (SQL_CLAUSE_SELECT != pCxt->currClause || pCxt->pCurrStmt->hasIndefiniteRowsFunc ||
|
||||||
|
pCxt->pCurrStmt->hasAggFuncs) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_NOT_ALLOWED_FUNC);
|
||||||
}
|
}
|
||||||
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
|
if (hasInvalidFuncNesting(pFunc->pParameterList)) {
|
||||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
|
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_AGG_FUNC_NESTING);
|
||||||
}
|
}
|
||||||
pCxt->pCurrStmt->hasNonstdSQLFunc = true;
|
pCxt->pCurrStmt->hasIndefiniteRowsFunc = true;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -990,7 +995,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
|
||||||
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
|
strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName);
|
||||||
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
|
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||||
translateFunction(pCxt, pFunc);
|
pCxt->errCode == getFuncInfo(pCxt, pFunc);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
|
||||||
*pNode = (SNode*)pFunc;
|
*pNode = (SNode*)pFunc;
|
||||||
|
@ -1060,7 +1065,7 @@ static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SNodeList* pList
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) {
|
static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) {
|
||||||
if (isAggFunc(*pNode)) {
|
if (isAggFunc(*pNode) || isIndefiniteRowsFunc(*pNode)) {
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||||
|
@ -1097,7 +1102,7 @@ static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) {
|
||||||
pCxt->existAggFunc = true;
|
pCxt->existAggFunc = true;
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
if (isNonstandardSQLFunc(pNode)) {
|
if (isIndefiniteRowsFunc(pNode)) {
|
||||||
pCxt->existNonstdFunc = true;
|
pCxt->existNonstdFunc = true;
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
|
@ -1939,7 +1944,7 @@ static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType d
|
||||||
nodesDestroyNode(pFunc);
|
nodesDestroyNode(pFunc);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) {
|
if (TSDB_CODE_SUCCESS != getFuncInfo(pCxt, pFunc)) {
|
||||||
nodesClearList(pFunc->pParameterList);
|
nodesClearList(pFunc->pParameterList);
|
||||||
pFunc->pParameterList = NULL;
|
pFunc->pParameterList = NULL;
|
||||||
nodesDestroyNode(pFunc);
|
nodesDestroyNode(pFunc);
|
||||||
|
@ -4082,10 +4087,11 @@ static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SS
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
|
static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* pFunc, SValueNode** pVal) {
|
||||||
if (DEAL_RES_ERROR == translateFunction(pCxt, pFunc)) {
|
int32_t code = getFuncInfo(pCxt, pFunc);
|
||||||
return pCxt->errCode;
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
|
||||||
}
|
}
|
||||||
return scalarCalculateConstants((SNode*)pFunc, (SNode**)pVal);
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t colDataBytesToValueDataBytes(uint8_t type, int32_t bytes) {
|
static int32_t colDataBytesToValueDataBytes(uint8_t type, int32_t bytes) {
|
||||||
|
|
|
@ -418,7 +418,7 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
|
||||||
if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) {
|
if (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && NULL == pSelect->pGroupByList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -442,8 +442,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && pSelect->hasAggFuncs) {
|
if (TSDB_CODE_SUCCESS == code && (pSelect->hasAggFuncs || pSelect->hasIndefiniteRowsFunc)) {
|
||||||
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsAggFunc, &pAgg->pAggFuncs);
|
code = nodesCollectFuncs(pSelect, SQL_CLAUSE_GROUP_BY, fmIsVectorFunc, &pAgg->pAggFuncs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// rewrite the expression in subsequent clauses
|
// rewrite the expression in subsequent clauses
|
||||||
|
|
|
@ -313,22 +313,53 @@ static EDealRes cpdIsPrimaryKeyCondImpl(SNode* pNode, void* pContext) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool cpdIsPrimaryKeyCond(SNode* pNode) {
|
static bool cpdIsPrimaryKeyCond(SNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
bool isPrimaryKeyCond = false;
|
bool isPrimaryKeyCond = false;
|
||||||
nodesWalkExpr(pNode, cpdIsPrimaryKeyCondImpl, &isPrimaryKeyCond);
|
nodesWalkExpr(pNode, cpdIsPrimaryKeyCondImpl, &isPrimaryKeyCond);
|
||||||
return isPrimaryKeyCond;
|
return isPrimaryKeyCond;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) {
|
static EDealRes cpdIsTagCondImpl(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
*((bool*)pContext) = ((COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType) ? true : false);
|
||||||
|
return *((bool*)pContext) ? DEAL_RES_CONTINUE : DEAL_RES_END;
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool cpdIsTagCond(SNode* pNode) {
|
||||||
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pNode)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
bool isTagCond = false;
|
||||||
|
nodesWalkExpr(pNode, cpdIsTagCondImpl, &isTagCond);
|
||||||
|
return isTagCond;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pTagCond,
|
||||||
|
SNode** pOtherCond) {
|
||||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pScan->node.pConditions;
|
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)pScan->node.pConditions;
|
||||||
|
|
||||||
|
if (LOGIC_COND_TYPE_AND != pLogicCond->condType) {
|
||||||
|
*pPrimaryKeyCond = NULL;
|
||||||
|
*pOtherCond = pScan->node.pConditions;
|
||||||
|
pScan->node.pConditions = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SNodeList* pPrimaryKeyConds = NULL;
|
SNodeList* pPrimaryKeyConds = NULL;
|
||||||
|
SNodeList* pTagConds = NULL;
|
||||||
SNodeList* pOtherConds = NULL;
|
SNodeList* pOtherConds = NULL;
|
||||||
SNode* pCond = NULL;
|
SNode* pCond = NULL;
|
||||||
FOREACH(pCond, pLogicCond->pParameterList) {
|
FOREACH(pCond, pLogicCond->pParameterList) {
|
||||||
if (cpdIsPrimaryKeyCond(pCond)) {
|
if (cpdIsPrimaryKeyCond(pCond)) {
|
||||||
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
|
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
|
||||||
|
} else if (cpdIsTagCond(pScan->node.pConditions)) {
|
||||||
|
code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond));
|
||||||
} else {
|
} else {
|
||||||
code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond));
|
code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond));
|
||||||
}
|
}
|
||||||
|
@ -338,37 +369,46 @@ static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimary
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* pTempPrimaryKeyCond = NULL;
|
SNode* pTempPrimaryKeyCond = NULL;
|
||||||
|
SNode* pTempTagCond = NULL;
|
||||||
SNode* pTempOtherCond = NULL;
|
SNode* pTempOtherCond = NULL;
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = cpdMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds);
|
code = cpdMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = cpdMergeConds(&pTempTagCond, &pTagConds);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = cpdMergeConds(&pTempOtherCond, &pOtherConds);
|
code = cpdMergeConds(&pTempOtherCond, &pOtherConds);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pPrimaryKeyCond = pTempPrimaryKeyCond;
|
*pPrimaryKeyCond = pTempPrimaryKeyCond;
|
||||||
|
*pTagCond = pTempTagCond;
|
||||||
*pOtherCond = pTempOtherCond;
|
*pOtherCond = pTempOtherCond;
|
||||||
nodesDestroyNode(pScan->node.pConditions);
|
nodesDestroyNode(pScan->node.pConditions);
|
||||||
pScan->node.pConditions = NULL;
|
pScan->node.pConditions = NULL;
|
||||||
} else {
|
} else {
|
||||||
nodesDestroyList(pPrimaryKeyConds);
|
nodesDestroyList(pPrimaryKeyConds);
|
||||||
|
nodesDestroyList(pTagConds);
|
||||||
nodesDestroyList(pOtherConds);
|
nodesDestroyList(pOtherConds);
|
||||||
nodesDestroyNode(pTempPrimaryKeyCond);
|
nodesDestroyNode(pTempPrimaryKeyCond);
|
||||||
|
nodesDestroyNode(pTempTagCond);
|
||||||
nodesDestroyNode(pTempOtherCond);
|
nodesDestroyNode(pTempOtherCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cpdPartitionScanCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pOtherCond) {
|
static int32_t cpdPartitionScanCond(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, SNode** pTagCond,
|
||||||
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pScan->node.pConditions) &&
|
SNode** pOtherCond) {
|
||||||
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)pScan->node.pConditions)->condType) {
|
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pScan->node.pConditions)) {
|
||||||
return cpdPartitionScanLogicCond(pScan, pPrimaryKeyCond, pOtherCond);
|
return cpdPartitionScanLogicCond(pScan, pPrimaryKeyCond, pTagCond, pOtherCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cpdIsPrimaryKeyCond(pScan->node.pConditions)) {
|
if (cpdIsPrimaryKeyCond(pScan->node.pConditions)) {
|
||||||
*pPrimaryKeyCond = pScan->node.pConditions;
|
*pPrimaryKeyCond = pScan->node.pConditions;
|
||||||
|
} else if (cpdIsTagCond(pScan->node.pConditions)) {
|
||||||
|
*pTagCond = pScan->node.pConditions;
|
||||||
} else {
|
} else {
|
||||||
*pOtherCond = pScan->node.pConditions;
|
*pOtherCond = pScan->node.pConditions;
|
||||||
}
|
}
|
||||||
|
@ -391,6 +431,36 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
|
||||||
|
|
||||||
|
static SIdxFltStatus idxGetFltStatus(SNode* pFilterNode) { return SFLT_ACCURATE_INDEX; }
|
||||||
|
|
||||||
|
static int32_t cpdApplyTagIndex(SScanLogicNode* pScan, SNode** pTagCond, SNode** pOtherCond) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SIdxFltStatus idxStatus = idxGetFltStatus(*pTagCond);
|
||||||
|
switch (idxStatus) {
|
||||||
|
case SFLT_NOT_INDEX:
|
||||||
|
code = cpdCondAppend(pOtherCond, pTagCond);
|
||||||
|
break;
|
||||||
|
case SFLT_COARSE_INDEX:
|
||||||
|
pScan->pTagCond = nodesCloneNode(*pTagCond);
|
||||||
|
if (NULL == pScan->pTagCond) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
code = cpdCondAppend(pOtherCond, pTagCond);
|
||||||
|
break;
|
||||||
|
case SFLT_ACCURATE_INDEX:
|
||||||
|
pScan->pTagCond = *pTagCond;
|
||||||
|
*pTagCond = NULL;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
code = TSDB_CODE_FAILED;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
|
static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) {
|
||||||
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) ||
|
if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD) ||
|
||||||
TSDB_SYSTEM_TABLE == pScan->pMeta->tableType) {
|
TSDB_SYSTEM_TABLE == pScan->pMeta->tableType) {
|
||||||
|
@ -398,11 +468,15 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* pPrimaryKeyCond = NULL;
|
SNode* pPrimaryKeyCond = NULL;
|
||||||
|
SNode* pTagCond = NULL;
|
||||||
SNode* pOtherCond = NULL;
|
SNode* pOtherCond = NULL;
|
||||||
int32_t code = cpdPartitionScanCond(pScan, &pPrimaryKeyCond, &pOtherCond);
|
int32_t code = cpdPartitionScanCond(pScan, &pPrimaryKeyCond, &pTagCond, &pOtherCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pPrimaryKeyCond) {
|
||||||
code = cpdCalcTimeRange(pScan, &pPrimaryKeyCond, &pOtherCond);
|
code = cpdCalcTimeRange(pScan, &pPrimaryKeyCond, &pOtherCond);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pTagCond) {
|
||||||
|
code = cpdApplyTagIndex(pScan, &pTagCond, &pOtherCond);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pScan->node.pConditions = pOtherCond;
|
pScan->node.pConditions = pOtherCond;
|
||||||
}
|
}
|
||||||
|
@ -618,30 +692,6 @@ static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
|
|
||||||
// if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
|
|
||||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
|
||||||
// }
|
|
||||||
// return TSDB_CODE_SUCCESS;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
|
|
||||||
// if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
|
|
||||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
|
||||||
// }
|
|
||||||
// bool hasPrimaryKeyEqualCond = false;
|
|
||||||
// SNode* pCond = NULL;
|
|
||||||
// FOREACH(pCond, pOnCond->pParameterList) {
|
|
||||||
// if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
|
|
||||||
// hasPrimaryKeyEqualCond = true;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// if (!hasPrimaryKeyEqualCond) {
|
|
||||||
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
|
||||||
// }
|
|
||||||
// return TSDB_CODE_SUCCESS;
|
|
||||||
// }
|
|
||||||
|
|
||||||
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
if (NULL == pJoin->pOnConditions) {
|
if (NULL == pJoin->pOnConditions) {
|
||||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||||
|
@ -650,11 +700,6 @@ static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin)
|
||||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
// if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
|
|
||||||
// return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
|
|
||||||
// } else {
|
|
||||||
// return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
|
|
|
@ -411,7 +411,7 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
|
||||||
return sortScanCols(pScanPhysiNode->pScanCols);
|
return sortScanCols(pScanPhysiNode->pScanCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode,
|
static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||||
SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
|
SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) {
|
||||||
int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
|
int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -438,6 +438,12 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
|
||||||
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
pScanPhysiNode->uid = pScanLogicNode->pMeta->uid;
|
||||||
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType;
|
||||||
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName));
|
||||||
|
if (NULL != pScanLogicNode->pTagCond) {
|
||||||
|
pSubplan->pTagCond = nodesCloneNode(pScanLogicNode->pTagCond);
|
||||||
|
if (NULL == pSubplan->pTagCond) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -463,7 +469,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
|
||||||
}
|
}
|
||||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||||
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
|
||||||
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTagScan, pPhyNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||||
|
@ -498,7 +504,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
|
||||||
pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
|
pTableScan->intervalUnit = pScanLogicNode->intervalUnit;
|
||||||
pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
|
pTableScan->slidingUnit = pScanLogicNode->slidingUnit;
|
||||||
|
|
||||||
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
|
static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
|
||||||
|
@ -522,7 +528,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
||||||
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||||
|
|
||||||
return createScanPhysiNodeFinalize(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
static int32_t createStreamScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||||
|
|
|
@ -18,6 +18,13 @@
|
||||||
#include "planInt.h"
|
#include "planInt.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
|
|
||||||
|
static void dumpQueryPlan(SQueryPlan* pPlan) {
|
||||||
|
char* pStr = NULL;
|
||||||
|
nodesNodeToString(pPlan, false, &pStr, NULL);
|
||||||
|
planDebugL("Query Plan: %s", pStr);
|
||||||
|
taosMemoryFree(pStr);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||||
SLogicNode* pLogicNode = NULL;
|
SLogicNode* pLogicNode = NULL;
|
||||||
SLogicSubplan* pLogicSubplan = NULL;
|
SLogicSubplan* pLogicSubplan = NULL;
|
||||||
|
@ -36,6 +43,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
|
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
dumpQueryPlan(*pPlan);
|
||||||
|
}
|
||||||
|
|
||||||
nodesDestroyNode(pLogicNode);
|
nodesDestroyNode(pLogicNode);
|
||||||
nodesDestroyNode(pLogicSubplan);
|
nodesDestroyNode(pLogicSubplan);
|
||||||
|
|
|
@ -32,6 +32,12 @@ TEST_F(PlanOptimizeTest, optimizeScanData) {
|
||||||
run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1");
|
run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanOptimizeTest, ConditionPushDown) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
run("SELECT ts, c1 FROM st1 WHERE tag1 > 4");
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
|
|
|
@ -232,45 +232,45 @@ class PlannerTestBaseImpl {
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
|
||||||
if (res_.prepareAst_.empty()) {
|
if (res_.prepareAst_.empty()) {
|
||||||
cout << "syntax tree : " << endl;
|
cout << "+++++++++++++++++++++syntax tree : " << endl;
|
||||||
cout << res_.ast_ << endl;
|
cout << res_.ast_ << endl;
|
||||||
} else {
|
} else {
|
||||||
cout << "prepare syntax tree : " << endl;
|
cout << "+++++++++++++++++++++prepare syntax tree : " << endl;
|
||||||
cout << res_.prepareAst_ << endl;
|
cout << res_.prepareAst_ << endl;
|
||||||
cout << "bound syntax tree : " << endl;
|
cout << "+++++++++++++++++++++bound syntax tree : " << endl;
|
||||||
cout << res_.boundAst_ << endl;
|
cout << res_.boundAst_ << endl;
|
||||||
cout << "syntax tree : " << endl;
|
cout << "+++++++++++++++++++++syntax tree : " << endl;
|
||||||
cout << res_.ast_ << endl;
|
cout << res_.ast_ << endl;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
|
||||||
cout << "raw logic plan : " << endl;
|
cout << "+++++++++++++++++++++raw logic plan : " << endl;
|
||||||
cout << res_.rawLogicPlan_ << endl;
|
cout << res_.rawLogicPlan_ << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_OPTIMIZED == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_OPTIMIZED == module) {
|
||||||
cout << "optimized logic plan : " << endl;
|
cout << "+++++++++++++++++++++optimized logic plan : " << endl;
|
||||||
cout << res_.optimizedLogicPlan_ << endl;
|
cout << res_.optimizedLogicPlan_ << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SPLIT == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SPLIT == module) {
|
||||||
cout << "split logic plan : " << endl;
|
cout << "+++++++++++++++++++++split logic plan : " << endl;
|
||||||
cout << res_.splitLogicPlan_ << endl;
|
cout << res_.splitLogicPlan_ << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SCALED == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SCALED == module) {
|
||||||
cout << "scaled logic plan : " << endl;
|
cout << "+++++++++++++++++++++scaled logic plan : " << endl;
|
||||||
cout << res_.scaledLogicPlan_ << endl;
|
cout << res_.scaledLogicPlan_ << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PHYSICAL == module) {
|
||||||
cout << "physical plan : " << endl;
|
cout << "+++++++++++++++++++++physical plan : " << endl;
|
||||||
cout << res_.physiPlan_ << endl;
|
cout << res_.physiPlan_ << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) {
|
if (DUMP_MODULE_ALL == module || DUMP_MODULE_SUBPLAN == module) {
|
||||||
cout << "physical subplan : " << endl;
|
cout << "+++++++++++++++++++++physical subplan : " << endl;
|
||||||
for (const auto& subplan : res_.physiSubplans_) {
|
for (const auto& subplan : res_.physiSubplans_) {
|
||||||
cout << subplan << endl;
|
cout << subplan << endl;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue