cols func

This commit is contained in:
factosea 2024-12-19 10:48:40 +08:00
parent 7964e5e6b7
commit 20f8d17c16
9 changed files with 223 additions and 41 deletions

View File

@ -282,6 +282,8 @@ typedef struct tExprNode {
struct SNode *pRootNode;
} _optrRoot;
};
int32_t bindTupleFuncIdx;
int32_t tupleFuncIdx;
} tExprNode;
struct SScalarParam {

View File

@ -16,6 +16,7 @@
#ifndef _TD_QUERY_NODES_H_
#define _TD_QUERY_NODES_H_
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -61,6 +62,8 @@ typedef struct SExprNode {
bool asParam;
bool asPosition;
int32_t projIdx;
int32_t bindTupleFuncIdx;
int32_t tupleFuncIdx;
} SExprNode;
typedef enum EColumnType {

View File

@ -18,6 +18,8 @@
#include "index.h"
#include "os.h"
#include "query.h"
#include "querynodes.h"
#include "tarray.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
@ -1990,7 +1992,8 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
pExp->pExpr->bindTupleFuncIdx = ((SExprNode*)pNode)->bindTupleFuncIdx;
pExp->pExpr->tupleFuncIdx = ((SExprNode*)pNode)->tupleFuncIdx;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
@ -2071,42 +2074,81 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo**
return code;
}
static void deleteSubsidiareCtx(void* pData) {
SSubsidiaryResInfo* pCtx = (SSubsidiaryResInfo*)pData;
if (pCtx->pCtx) {
taosMemoryFreeClear(pCtx->pCtx);
}
}
// set the output buffer for the selectivity + tag query
static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
int32_t num = 0;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SqlFunctionCtx* p = NULL;
SqlFunctionCtx** pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
if (pValCtx == NULL) {
return terrno;
SArray* pValCtxArray = NULL;
for (int32_t i = numOfOutput - 1; i > 0; --i) { // select Func is at the end of the list
int32_t funcIdx = pCtx[i].pExpr->pExpr->tupleFuncIdx;
if (funcIdx > 0) {
if (pValCtxArray == NULL) {
// the last entry in the list is the select function with the largest index
pValCtxArray = taosArrayInit_s(sizeof(SSubsidiaryResInfo*), funcIdx);
if (pValCtxArray == NULL) {
return terrno;
}
}
if (funcIdx > pValCtxArray->size) {
qError("funcIdx:%d is out of range", funcIdx);
taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SSubsidiaryResInfo* pSubsidiary = &pCtx[i].subsidiaries;
pSubsidiary->pCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
if (pSubsidiary->pCtx == NULL) {
taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
return terrno;
}
pSubsidiary->num = 0;
taosArraySet(pValCtxArray, funcIdx - 1, &pSubsidiary);
} else {
break;
}
}
SHashObj* pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
QUERY_CHECK_NULL(pSelectFuncs, code, lino, _end, terrno);
SqlFunctionCtx* p = NULL;
SqlFunctionCtx** pValCtx = NULL;
if (pValCtxArray == NULL) {
pValCtx = taosMemoryCalloc(numOfOutput, POINTER_BYTES);
if (pValCtx == NULL) {
QUERY_CHECK_CODE(terrno, lino, _end);
}
}
for (int32_t i = 0; i < numOfOutput; ++i) {
const char* pName = pCtx[i].pExpr->pExpr->_function.functionName;
if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0) ||
(strcmp(pName, "_group_const_value") == 0)) {
pValCtx[num++] = &pCtx[i];
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
void* data = taosHashGet(pSelectFuncs, pName, strlen(pName));
if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) {
p = NULL;
break;
if (pValCtxArray == NULL) {
pValCtx[num++] = &pCtx[i];
} else {
int32_t tempRes = taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num));
if (tempRes != TSDB_CODE_SUCCESS && tempRes != TSDB_CODE_DUP_KEY) {
code = tempRes;
QUERY_CHECK_CODE(code, lino, _end);
int32_t bindFuncIndex = pCtx[i].pExpr->pExpr->bindTupleFuncIdx; // start from index 1;
if (bindFuncIndex > 0) { // 0 is default index related to the select function
bindFuncIndex -= 1;
}
p = &pCtx[i];
SSubsidiaryResInfo** pSubsidiary = taosArrayGet(pValCtxArray, bindFuncIndex);
if(pSubsidiary == NULL) {
QUERY_CHECK_CODE(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR, lino, _end);
}
(*pSubsidiary)->pCtx[(*pSubsidiary)->num] = &pCtx[i];
(*pSubsidiary)->num++;
}
} else if (fmIsSelectFunc(pCtx[i].functionId)) {
if (pValCtxArray == NULL) {
p = &pCtx[i];
}
}
}
taosHashCleanup(pSelectFuncs);
if (p != NULL) {
p->subsidiaries.pCtx = pValCtx;
@ -2117,8 +2159,8 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
_end:
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroyP(pValCtxArray, deleteSubsidiareCtx);
taosMemoryFreeClear(pValCtx);
taosHashCleanup(pSelectFuncs);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;

View File

@ -443,6 +443,8 @@ int32_t createFunctionWithSrcFunc(const char* pName, const SFunctionNode* pSrcFu
return code;
}
resetOutputChangedFunc(*ppFunc, pSrcFunc);
(*ppFunc)->node.bindTupleFuncIdx = pSrcFunc->node.bindTupleFuncIdx;
(*ppFunc)->node.tupleFuncIdx = pSrcFunc->node.tupleFuncIdx;
return code;
}

View File

@ -103,6 +103,8 @@ static int32_t exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) {
COPY_CHAR_ARRAY_FIELD(aliasName);
COPY_CHAR_ARRAY_FIELD(userAlias);
COPY_SCALAR_FIELD(projIdx);
COPY_SCALAR_FIELD(bindTupleFuncIdx);
COPY_SCALAR_FIELD(tupleFuncIdx);
return TSDB_CODE_SUCCESS;
}

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "querynodes.h"
#define COMPARE_SCALAR_FIELD(fldname) \
@ -137,6 +138,11 @@ static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
COMPARE_SCALAR_FIELD(funcId);
COMPARE_STRING_FIELD(functionName);
COMPARE_NODE_LIST_FIELD(pParameterList);
if (a->funcType == FUNCTION_TYPE_SELECT_VALUE) {
if ((a->node.bindTupleFuncIdx != b->node.bindTupleFuncIdx) &&
(a->node.bindTupleFuncIdx != 0 != b->node.bindTupleFuncIdx != 0))
return false;
}
return true;
}

View File

@ -664,11 +664,18 @@ static int32_t msgToDataType(STlvDecoder* pDecoder, void* pObj) {
return code;
}
// TLV field codes used by exprNodeToMsg()/msgToExprNode() for SExprNode.
// EXPR_CODE_RES_TYPE keeps its value 1; the two tuple-function index codes
// are appended after it (2 and 3).
enum { EXPR_CODE_RES_TYPE = 1, EXPR_CODE_BIND_TUPLE_FUNC_IDX, EXPR_CODE_TUPLE_FUNC_IDX };
/**
 * Serialize the SExprNode part of a node into a TLV message.
 *
 * Encodes, in order, the result type, bindTupleFuncIdx and tupleFuncIdx.
 * Encoding stops at the first failing step.
 *
 * @param pObj      the node to encode, read as a const SExprNode*
 * @param pEncoder  TLV encoder receiving the fields
 * @return TSDB_CODE_SUCCESS if every field was encoded, otherwise the code
 *         returned by the first encoder call that failed
 */
static int32_t exprNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
  const SExprNode* pNode = (const SExprNode*)pObj;

  int32_t code = tlvEncodeObj(pEncoder, EXPR_CODE_RES_TYPE, dataTypeToMsg, &pNode->resType);
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvEncodeI32(pEncoder, EXPR_CODE_BIND_TUPLE_FUNC_IDX, pNode->bindTupleFuncIdx);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvEncodeI32(pEncoder, EXPR_CODE_TUPLE_FUNC_IDX, pNode->tupleFuncIdx);
  }
  return code;
}
static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) {
@ -681,6 +688,12 @@ static int32_t msgToExprNode(STlvDecoder* pDecoder, void* pObj) {
case EXPR_CODE_RES_TYPE:
code = tlvDecodeObjFromTlv(pTlv, msgToDataType, &pNode->resType);
break;
case EXPR_CODE_BIND_TUPLE_FUNC_IDX:
code = tlvDecodeI32(pTlv, &pNode->bindTupleFuncIdx);
break;
case EXPR_CODE_TUPLE_FUNC_IDX:
code = tlvDecodeI32(pTlv, &pNode->tupleFuncIdx);
break;
default:
break;
}

View File

@ -13,8 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "nodes.h"
#include "parInt.h"
#include "parTranslater.h"
#include <stdint.h>
#include "query.h"
#include "querynodes.h"
#include "tdatablock.h"
#include "catalog.h"
@ -3534,6 +3538,8 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode
tstrncpy(pFunc->functionName, "_select_value", TSDB_FUNC_NAME_LEN);
tstrncpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName, TSDB_COL_NAME_LEN);
tstrncpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias, TSDB_COL_NAME_LEN);
pFunc->node.bindTupleFuncIdx = ((SExprNode*)*pNode)->bindTupleFuncIdx;
pFunc->node.tupleFuncIdx = ((SExprNode*)*pNode)->tupleFuncIdx;
pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode);
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = getFuncInfo(pCxt, pFunc);
@ -3934,7 +3940,8 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) {
((QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG))) {
return rewriteExprToSelectTagFunc(pCxt->pTranslateCxt, pNode);
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
if ((isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) &&
((!nodesIsExprNode(*pNode) || ((SExprNode*)*pNode)->bindTupleFuncIdx == 0))) {
pCxt->existCol = true;
}
return DEAL_RES_CONTINUE;
@ -7362,6 +7369,19 @@ static EDealRes isMultiColsFuncNode(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
// Context passed to pushDownBindSelectFunc() through nodesRewriteExpr().
typedef struct SBindTupleFuncCxt {
int32_t bindTupleFuncIdx;  // value written into SExprNode::bindTupleFuncIdx of every visited expression node
} SBindTupleFuncCxt;
/**
 * Rewrite callback (used with nodesRewriteExpr) that copies the index held in
 * the context into bindTupleFuncIdx of every expression node it visits.
 * Nodes that are not expression nodes are left untouched.
 *
 * @param pNode     address of the node currently being visited
 * @param pContext  a SBindTupleFuncCxt* carrying the index to assign
 * @return DEAL_RES_CONTINUE in all cases
 */
static EDealRes pushDownBindSelectFunc(SNode** pNode, void* pContext) {
  const SBindTupleFuncCxt* pBindCxt = pContext;
  if (nodesIsExprNode(*pNode)) {
    ((SExprNode*)*pNode)->bindTupleFuncIdx = pBindCxt->bindTupleFuncIdx;
  }
  return DEAL_RES_CONTINUE;
}
static bool hasMultiColsFuncInList(SNodeList* nodeList) {
SHasMultiColsFuncCxt pCxt = {false};
@ -7383,13 +7403,9 @@ static int32_t hasInvalidColsFunction(STranslateContext* pCxt, SNodeList* nodeLi
FOREACH(pTmpNode, nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if (pFunc->funcType == FUNCTION_TYPE_COLS) {
// cols function at here is valid.
} else {
if (hasMultiColsFuncInList(pFunc->pParameterList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function in function %s", pFunc->functionName);
}
if (hasMultiColsFuncInList(pFunc->pParameterList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid cols function in function %s", pFunc->functionName);
}
} else {
if (hasMultiColsFunc(&pTmpNode)) {
@ -7436,43 +7452,80 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_COLS_FUNCTION,
"Invalid using alias for cols function");
}
if (isMultiColsFunc(pFunc)) {
needRewrite = true;
break;
}
needRewrite = true;
}
}
SNodeList* pNewNodeList = NULL;
SNodeList* tmpFuncNodeList = NULL;
if (needRewrite) {
code = nodesMakeList(&pNewNodeList);
if (NULL == pNewNodeList) {
return code;
}
code = nodesMakeList(&tmpFuncNodeList);
if (NULL == tmpFuncNodeList) {
return code;
}
SNode* pNewNode = NULL;
int32_t nums = 0;
int32_t selectFuncNum = 0;
FOREACH(pTmpNode, *nodeList) {
if (QUERY_NODE_FUNCTION == nodeType(pTmpNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if (isMultiColsFunc(pFunc)) {
if (strcasecmp(pFunc->functionName, "cols") == 0) {
++selectFuncNum;
SNode* pSelectFunc = nodesListGetNode(pFunc->pParameterList, 0);
if(nodeType(pSelectFunc) != QUERY_NODE_FUNCTION) {
code = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
parserError("Invalid cols function, the first parameter must be a select function");
goto _end;
}
nodesListMakeStrictAppend(&tmpFuncNodeList, pSelectFunc);
// Start from index 1: the first parameter is the select function itself, which is not part of the output.
SFunctionNode* pSelectFunc = (SFunctionNode*)nodesListGetNode(pFunc->pParameterList, 0);
for (int i = 1; i < pFunc->pParameterList->length; ++i) {
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, i);
SNode* pNewFunc = NULL;
code = createMultiResColsFunc(pFunc, pSelectFunc, (SExprNode*)pExpr, &pNewFunc);
code = nodesCloneNode(pExpr, &pNewNode);
if (nodesIsExprNode(pNewNode)) {
SBindTupleFuncCxt pCxt = {selectFuncNum};
nodesRewriteExpr(&pNewNode, pushDownBindSelectFunc, &pCxt);
} else {
code = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
parserError("Invalid cols function, the first parameter must be a select function");
goto _end;
}
if (TSDB_CODE_SUCCESS != code) goto _end;
code = nodesListMakeStrictAppend(&pNewNodeList, pNewFunc);
code = nodesListMakeStrictAppend(&pNewNodeList, pNewNode);
if (TSDB_CODE_SUCCESS != code) goto _end;
}
continue;
}
}
SNode* pNewNode = NULL;
code = nodesCloneNode(pTmpNode, &pNewNode);
if (TSDB_CODE_SUCCESS != code) goto _end;
code = nodesListMakeStrictAppend(&pNewNodeList, pNewNode);
if (TSDB_CODE_SUCCESS != code) goto _end;
}
SNode* pNode = NULL;
int32_t pNewSelectFuncIds = 0;
FOREACH(pNode, tmpFuncNodeList) {
++pNewSelectFuncIds;
code = nodesCloneNode(pNode, &pNewNode);
if (TSDB_CODE_SUCCESS != code) goto _end;
if (nodesIsExprNode(pNewNode)) {
SExprNode* pExprNode = (SExprNode*)pNewNode;
pExprNode->tupleFuncIdx = pNewSelectFuncIds;
} else {
code = TSDB_CODE_PAR_INVALID_COLS_FUNCTION;
parserError("Invalid cols function, the first parameter must be a select function");
goto _end;
}
code = nodesListMakeStrictAppend(&pNewNodeList, pNewNode);
if (TSDB_CODE_SUCCESS != code) goto _end;
}
nodesDestroyList(*nodeList);
nodesDestroyList(tmpFuncNodeList);
*nodeList = pNewNodeList;
}
return TSDB_CODE_SUCCESS;
@ -7480,6 +7533,7 @@ static int32_t rewriteColsFunction(STranslateContext* pCxt, SNodeList** nodeList
_end:
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pNewNodeList);
nodesDestroyList(tmpFuncNodeList);
}
return code;
}

View File

@ -71,10 +71,64 @@ class TDTestCase:
tdSql.query(f'select cols(last(c0), ts, c1), cols(first(c0), ts, c1), count(1) from {self.dbname}.meters')
tdSql.query(f'select cols(last(c0), ts as t1, c1 as c11), cols(first(c0), ts as c2, c1 c21), count(1) from {self.dbname}.meters')
def funcNestTest(self):
# Creates table db.d1 with two rows, then runs queries that use two cols()
# calls in one select list and checks the single result row column by column.
tdSql.execute('create database db;')
tdSql.execute('use db')
tdSql.execute(f'drop table if exists db.d1')
tdSql.execute('create table db.d1 (ts timestamp, c0 int, c1 float, c2 nchar(30), c3 bool)')
# Row 1: c0=1, c1=1.1, c2="a", c3=true; row 2: c0=2, c1=2.2, c2="bbbbbbbbb", c3=false.
tdSql.execute('insert into db.d1 values(1734574929000, 1, 1.1, "a", true)')
tdSql.execute('insert into db.d1 values(1734574930000, 2, 2.2, "bbbbbbbbb", false)')
# NOTE(review): this query asks for (ts, c2) per cols() call, but columns 1 and 3
# are checked against 2.2 / 1.1, which are the c1 values inserted above - confirm
# whether c1 or c2 is intended.
tdSql.query(f'select cols(last(c0), ts, c2), cols(first(c0), ts, c2) from db.d1')
tdSql.checkRows(1)
#tdSql.checkCols(4)
tdSql.checkData(0, 0, 1734574930000)
tdSql.checkData(0, 1, 2.2)
tdSql.checkData(0, 2, 1734574929000)
tdSql.checkData(0, 3, 1.1)
# Four output expressions per cols() call: columns 0-3 come from the last(c0)
# call, columns 4-7 from the first(c0) call (the commented-out checkCols(6)
# below does not match the eight columns checked).
tdSql.query(f'select cols(last(c0), ts, c1, c2, c3), cols(first(c0), ts, c1, c2, c3) from db.d1')
tdSql.checkRows(1)
#tdSql.checkCols(6)
tdSql.checkData(0, 0, 1734574930000)
tdSql.checkData(0, 1, 2.2)
tdSql.checkData(0, 2, 'bbbbbbbbb')
tdSql.checkData(0, 3, False)
tdSql.checkData(0, 4, 1734574929000)
tdSql.checkData(0, 5, 1.1)
tdSql.checkData(0, 6, 'a')
tdSql.checkData(0, 7, True)
# NOTE(review): the next two queries select c1 (inserted as 1.1 / 2.2) but the
# expected values are 2 and 1, which match the inserted c0 values - confirm.
tdSql.query(f'select cols(last(ts), c1), cols(first(ts), c1) from db.d1')
tdSql.checkRows(1)
#tdSql.checkCols(6)
tdSql.checkData(0, 0, 2)
tdSql.checkData(0, 1, 1)
tdSql.query(f'select cols(first(ts), c1), cols(first(ts), c1) from db.d1')
tdSql.checkRows(1)
#tdSql.checkCols(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 1)
# NOTE(review): the same query is issued twice below with different expected
# values for column 0 (1 here, 1734574929000 in the second run); both cannot hold.
tdSql.query(f'select cols(first(c0), ts, length(c2)), cols(last(c0), ts, length(c2)) from db.d1')
tdSql.checkRows(1)
#tdSql.checkCols(6)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 1)
tdSql.query(f'select cols(first(c0), ts, length(c2)), cols(last(c0), ts, length(c2)) from db.d1')
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1734574929000)
tdSql.checkData(0, 1, 1)
tdSql.checkData(0, 2, 1734574929000)
tdSql.checkData(0, 3, 1)
def parse_test(self):
tdLog.info("parse test")
#** error sql **#
tdSql.error(f'select cols(ts) from {self.dbname}.meters group by tbname')
tdSql.error(f'select cols(ts) from {self.dbname}.meters')
@ -101,8 +155,12 @@ class TDTestCase:
tdSql.error(f'select cols(last(ts)+1, ts) from {self.dbname}.meters')
tdSql.error(f'select cols(last(ts)+10, c1+10) from {self.dbname}.meters group by tbname')
tdSql.error(f'select cols(cols(last(ts), c0), c0) as cc from {self.dbname}.meters')
def run(self):
self.funcNestTest()
return
self.create_test_data()
self.parse_test()
self.one_cols_1output_test()