This commit is contained in:
factosea 2024-12-26 11:13:51 +08:00
parent c371fb200d
commit 983431bab6
7 changed files with 82 additions and 222 deletions

View File

@ -149,7 +149,6 @@ int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t colsScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
#ifdef __cplusplus
}

View File

@ -251,11 +251,6 @@ int32_t blockDBUsageSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo
int32_t blockDBUsageFunction(SqlFunctionCtx* pCtx);
int32_t blockDBUsageFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getColsFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t colsFunction(SqlFunctionCtx* pCtx);
int32_t colsPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t colsCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
#ifdef __cplusplus
}
#endif

View File

@ -5653,43 +5653,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.outputParaInfo = {.validDataType = FUNC_PARAM_SUPPORT_VARCHAR_TYPE}},
.translateFunc = translateOutVarchar,
},
{
.name = "cols",
.type = FUNCTION_TYPE_COLS,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_SELECT_COLS_FUNC,
.translateFunc = translateCols,
.dynDataRequiredFunc = NULL,
.getEnvFunc = getColsFuncEnv,
.initFunc = functionSetup,
.processFunc = colsFunction,
.sprocessFunc = colsScalarFunction,
.pPartialFunc = "_cols_partial",
.pMergeFunc = "_cols_merge",
.finalizeFunc = NULL
},
{
.name = "_cols_partial",
.type = FUNCTION_TYPE_COLS_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_SELECT_COLS_FUNC,
.translateFunc = translateCols,
.dynDataRequiredFunc = NULL,
.getEnvFunc = getColsFuncEnv,
.initFunc = functionSetup,
.processFunc = colsFunction,
.finalizeFunc = colsPartialFinalize,
.combineFunc = colsCombine,
},
{
.name = "_cols_merge",
.type = FUNCTION_TYPE_COLS_MERGE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_SELECT_COLS_FUNC,
.translateFunc = translateOutFirstIn,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = colsPartialFinalize,
.combineFunc = colsCombine,
},
};
// clang-format on

View File

@ -7194,77 +7194,3 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
bool getColsFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes;
return true;
}
int32_t colsFunction(SqlFunctionCtx* pCtx) {
int32_t code = TSDB_CODE_SUCCESS;
SFunctionNode* pSelectFunc = ((SFunctionNode*)(pCtx[0].pExpr->pExpr->_function.pFunctNode->pParameterList->pHead->pNode));
SFuncExecFuncs selectExecFuncs;
code = fmGetFuncExecFuncs(pCtx->functionId, &selectExecFuncs);
if(TSDB_CODE_SUCCESS != code) {
return code;
}
if(selectExecFuncs.process == NULL) {
return TSDB_CODE_SUCCESS;
}
return selectExecFuncs.process(pCtx);
}
int32_t colsPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes, pRes->pkBytes);
// todo check for failure
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
if (NULL == res) {
return terrno;
}
(void)memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
if (NULL == pCol) {
taosMemoryFree(res);
return TSDB_CODE_OUT_OF_RANGE;
}
if (pEntryInfo->numOfRes == 0) {
colDataSetNULL(pCol, pBlock->info.rows);
code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, pBlock->info.rows);
} else {
code = colDataSetVal(pCol, pBlock->info.rows, res, false);
if (TSDB_CODE_SUCCESS != code) {
taosMemoryFree(res);
return code;
}
code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
}
taosMemoryFree(res);
return code;
}
int32_t colsCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SFirstLastRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t bytes = pDBuf->bytes;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SFirstLastRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
pDBuf->hasResult = firstLastTransferInfoImpl(pSBuf, pDBuf, false);
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
pDResInfo->isNullRes &= pSResInfo->isNullRes;
return TSDB_CODE_SUCCESS;
}

View File

@ -19,6 +19,7 @@
#include <stdint.h>
#include "query.h"
#include "querynodes.h"
#include "taoserror.h"
#include "tdatablock.h"
#include "catalog.h"
@ -5195,54 +5196,6 @@ static int32_t createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr, SNo
return code;
}
static int32_t createMultiResColsFunc(SFunctionNode* pSrcFunc, SFunctionNode* pSelectFunc, SExprNode* pExpr, SNode** ppNodeOut) {
SFunctionNode* pFunc = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_FUNCTION, (SNode**)&pFunc);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = nodesMakeList(&pFunc->pParameterList);
SNode* pClonedFuncNode = NULL;
if (TSDB_CODE_SUCCESS != (code = nodesCloneNode((SNode*)pSelectFunc, &pClonedFuncNode)) ||
TSDB_CODE_SUCCESS != (code = nodesListStrictAppend(pFunc->pParameterList, pClonedFuncNode))) {
nodesDestroyNode((SNode*)pFunc);
return code;
}
SNode* pClonedExprNode = NULL;
if (TSDB_CODE_SUCCESS != (code = nodesCloneNode((SNode*)pExpr, &pClonedExprNode)) ||
TSDB_CODE_SUCCESS != (code = nodesListStrictAppend(pFunc->pParameterList, pClonedExprNode))) {
nodesDestroyNode((SNode*)pFunc);
return code;
}
pFunc->node.resType = pExpr->resType;
pFunc->funcId = pSrcFunc->funcId;
pFunc->funcType = pSrcFunc->funcType;
strcpy(pFunc->functionName, pSrcFunc->functionName);
char buf[TSDB_FUNC_NAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN + TSDB_NAME_DELIMITER_LEN + 3] = {0};
int32_t len = 0;
if(pExpr->asAlias) {
strncpy(pFunc->node.aliasName, pExpr->aliasName, TSDB_COL_NAME_LEN - 1);
strncpy(pFunc->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN - 1);
pFunc->node.asAlias = true;
} else if (QUERY_NODE_COLUMN == nodeType(pExpr)) {
SColumnNode* pCol = (SColumnNode*)pExpr;
len = tsnprintf(buf, sizeof(buf) - 1, "%s.%s", pCol->tableAlias, pCol->colName);
(void)taosHashBinary(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = tsnprintf(buf, sizeof(buf) - 1, "%s", pCol->colName);
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
} else {
len = tsnprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->aliasName);
(void)taosHashBinary(buf, len);
strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1);
len = tsnprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->userAlias);
strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1);
}
*ppNodeOut = (SNode*)pFunc;
return code;
}
static int32_t createTableAllCols(STranslateContext* pCxt, SColumnNode* pCol, bool igTags, SNodeList** pOutput) {
STableNode* pTable = NULL;
int32_t code = findTable(pCxt, pCol->tableAlias, &pTable);
@ -7345,10 +7298,17 @@ static int32_t translateSelectWithoutFrom(STranslateContext* pCxt, SSelectStmt*
pCxt->dual = true;
return translateExprList(pCxt, pSelect->pProjectionList);
}
typedef struct SCheckColsFuncCxt {
bool hasColsFunc;
bool hasMultiColsFunc;
} SCheckColsFuncCxt;
typedef struct SHasMultiColsFuncCxt {
bool hasMultiColsFunc;
} SHasMultiColsFuncCxt;
static bool isColsFuncByName(SFunctionNode* pFunc) {
if (strcasecmp(pFunc->functionName, "cols") != 0) {
return false;
}
return true;
}
static bool isMultiColsFunc(SFunctionNode* pFunc) {
if (strcasecmp(pFunc->functionName, "cols") != 0) {
@ -7358,12 +7318,15 @@ static bool isMultiColsFunc(SFunctionNode* pFunc) {
}
static EDealRes isMultiColsFuncNode(SNode** pNode, void* pContext) {
SHasMultiColsFuncCxt* pCxt = pContext;
SCheckColsFuncCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (isMultiColsFunc(pFunc)) {
pCxt->hasMultiColsFunc = true;
return DEAL_RES_END;
if (isColsFuncByName(pFunc)) {
pCxt->hasColsFunc = true;
if (pFunc->pParameterList->length > 2) {
pCxt->hasMultiColsFunc = true;
return DEAL_RES_END;
}
}
}
return DEAL_RES_CONTINUE;
@ -7384,45 +7347,17 @@ static EDealRes pushDownBindSelectFunc(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool hasMultiColsFuncInList(SNodeList* nodeList) {
SHasMultiColsFuncCxt pCxt = {false};
nodesRewriteExprs(nodeList, isMultiColsFuncNode, &pCxt);
return pCxt.hasMultiColsFunc;
static void checkColsFuncInList(SNodeList* nodeList, SCheckColsFuncCxt* pCheckouColsFuncCxt) {
nodesRewriteExprs(nodeList, isMultiColsFuncNode, pCheckouColsFuncCxt);
return;
}
static bool hasMultiColsFunc(SNode** pNode) {
SHasMultiColsFuncCxt pCxt = {false};
nodesRewriteExpr(pNode, isMultiColsFuncNode, &pCxt);
return pCxt.hasMultiColsFunc;
}
static int32_t hasInvalidColsFunction(STranslateContext* pCxt, SNodeList* nodeList) {
SNode* pTmpNode = NULL;
FOREACH(pTmpNode, nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if (hasMultiColsFuncInList(pFunc->pParameterList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function in function %s", pFunc->functionName);
}
} else {
if (hasMultiColsFunc(&pTmpNode)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function, can't be used at here");
}
}
}
return TSDB_CODE_SUCCESS;
static void checkColsFunc(SNode** pNode, SCheckColsFuncCxt* pCheckouColsFuncCxt) {
nodesRewriteExpr(pNode, isMultiColsFuncNode, pCheckouColsFuncCxt);
return;
}
static bool invalidColsAlias(SFunctionNode* pFunc) {
if (strcasecmp(pFunc->functionName, "cols") != 0) {
return false;
}
if (pFunc->node.asAlias) {
if (pFunc->pParameterList->length > 2) {
return true;
@ -7436,10 +7371,40 @@ static bool invalidColsAlias(SFunctionNode* pFunc) {
}
}
}
return false;
}
static int32_t hasInvalidColsFunction(STranslateContext* pCxt, SNodeList* nodeList,
SCheckColsFuncCxt* pCheckouColsFuncCxt) {
SNode* pTmpNode = NULL;
FOREACH(pTmpNode, nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if (isColsFuncByName(pFunc)) {
pCheckouColsFuncCxt->hasColsFunc = true;
if (invalidColsAlias(pFunc)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid using alias for cols function");
}
}
checkColsFuncInList(pFunc->pParameterList, pCheckouColsFuncCxt);
if (pCheckouColsFuncCxt->hasMultiColsFunc) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function in function %s", pFunc->functionName);
}
} else {
checkColsFunc(&pTmpNode, pCheckouColsFuncCxt);
if (pCheckouColsFuncCxt->hasMultiColsFunc) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function, can't be used at here");
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t getSelectFuncIndex(SNodeList* FuncNodeList, SNode* pSelectFunc) {
SNode* pNode = NULL;
int32_t selectFuncIndex = 0;
@ -7453,21 +7418,14 @@ static int32_t getSelectFuncIndex(SNodeList* FuncNodeList, SNode* pSelectFunc) {
}
static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList) {
int32_t code = hasInvalidColsFunction(pCxt, *nodeList);
SCheckColsFuncCxt pCheckouColsFuncCxt = {false, false};
int32_t code = hasInvalidColsFunction(pCxt, *nodeList, &pCheckouColsFuncCxt);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
bool needRewrite = false;
SNode* pTmpNode = NULL;
FOREACH(pTmpNode, *nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if(invalidColsAlias(pFunc)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid using alias for cols function");
}
needRewrite = true;
}
bool needRewrite = false;
if (pCheckouColsFuncCxt.hasColsFunc) {
needRewrite = true;
}
SNodeList* pNewNodeList = NULL;
@ -7484,6 +7442,7 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
SNode* pNewNode = NULL;
int32_t nums = 0;
int32_t selectFuncCount = 0;
SNode* pTmpNode = NULL;
FOREACH(pTmpNode, *nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;

View File

@ -4589,6 +4589,3 @@ int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
return selectScalarFunction(pInput, inputNum, pOutput);
}
int32_t colsScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return selectScalarFunction(pInput, inputNum, pOutput);
}

View File

@ -71,6 +71,25 @@ class TDTestCase:
tdSql.query(f'select cols(last(c0), ts, c1), cols(first(c0), ts, c1), count(1) from {self.dbname}.meters')
tdSql.query(f'select cols(last(c0), ts as t1, c1 as c11), cols(first(c0), ts as c2, c1 c21), count(1) from {self.dbname}.meters')
def funcSupperTableTest(self):
tdSql.execute('create database if not exists db;')
tdSql.execute('use db')
tdSql.execute(f'drop table if exists db.st')
tdSql.execute('create table db.st (ts timestamp, c0 int, c1 float, c2 nchar(30), c3 bool) tags (t1 nchar(30))')
tdSql.execute('create table db.st_1 using db.st tags("st1")')
tdSql.execute('create table db.st_2 using db.st tags("st1")')
tdSql.execute('insert into db.st_1 values(1734574929000, 1, 1, "a1", true)')
tdSql.execute('insert into db.st_1 values(1734574929001, 2, 2, "bbbbbbbbb1", false)')
tdSql.execute('insert into db.st_1 values(1734574929002, 3, 3, "a2", true)')
tdSql.execute('insert into db.st_1 values(1734574929003, 4, 4, "bbbbbbbbb2", false)')
tdSql.query(f'select cols(last(c0), ts, c1, c2, c3), cols(first(c0), ts, c1, c2, c3) from db.st')
tdSql.checkRows(1)
tdSql.checkData(0, 1, 'bbbbbbbbb')
#tdSql.execute(f'drop table if exists db.st')
def funcNestTest(self):
tdSql.execute('create database db;')
@ -196,10 +215,12 @@ class TDTestCase:
tdSql.error(f'select cols(last(ts)+10, c1+10) from {self.dbname}.meters group by tbname')
tdSql.error(f'select cols(cols(last(ts), c0), c0) as cc from {self.dbname}.meters')
tdSql.error(f'select cols(last(ts), cols(last(ts), c0), c0) as cc from {self.dbname}.meters')
def run(self):
self.funcNestTest()
self.funcSupperTableTest()
return
self.create_test_data()
self.parse_test()