[td-13039] merge 3.0.
This commit is contained in:
parent
93e3a2f3f1
commit
dd66affd0a
|
@ -7514,8 +7514,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
|
||||
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
|
||||
int32_t numOfRows = 1;
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock, pTaskInfo->id.str);
|
||||
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
||||
goto _error;
|
||||
|
@ -7735,39 +7734,6 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols) {
|
||||
#if 0
|
||||
SColumnInfo* pCols = taosMemoryCalloc(numOfOutput, sizeof(SColumnInfo));
|
||||
|
||||
int32_t numOfFilter = 0;
|
||||
for(int32_t i = 0; i < numOfOutput; ++i) {
|
||||
if (pExpr[i].base.flist.numOfFilters > 0) {
|
||||
numOfFilter += 1;
|
||||
}
|
||||
|
||||
pCols[i].type = pExpr[i].base.resSchema.type;
|
||||
pCols[i].bytes = pExpr[i].base.resSchema.bytes;
|
||||
pCols[i].colId = pExpr[i].base.resSchema.colId;
|
||||
|
||||
pCols[i].flist.numOfFilters = pExpr[i].base.flist.numOfFilters;
|
||||
if (pCols[i].flist.numOfFilters != 0) {
|
||||
pCols[i].flist.filterInfo = taosMemoryCalloc(pCols[i].flist.numOfFilters, sizeof(SColumnFilterInfo));
|
||||
memcpy(pCols[i].flist.filterInfo, pExpr[i].base.flist.filterInfo, pCols[i].flist.numOfFilters * sizeof(SColumnFilterInfo));
|
||||
} else {
|
||||
// avoid runtime error
|
||||
pCols[i].flist.filterInfo = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
assert(numOfFilter > 0);
|
||||
|
||||
*numOfFilterCols = numOfFilter;
|
||||
return pCols;
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
|
||||
SLimitOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SLimitOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -7786,9 +7752,10 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit
|
|||
pOperator->getNextFn = doLimit;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
|
@ -7809,13 +7776,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->precision = TSDB_TIME_PRECISION_MILLI;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->interval = *pInterval;
|
||||
|
||||
pInfo->win.skey = INT64_MIN;
|
||||
pInfo->win.ekey = INT64_MAX;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS /* || pInfo->pTableQueryInfo == NULL*/) {
|
||||
goto _error;
|
||||
|
@ -8555,6 +8520,23 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
|
|||
return s;
|
||||
}
|
||||
|
||||
static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) {
|
||||
SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
if (pCol == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCol->slotId = slotId;
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
pCol->dataBlockId = blockId;
|
||||
|
||||
return pCol;
|
||||
}
|
||||
|
||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
|
||||
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
||||
int32_t numOfGroupKeys = 0;
|
||||
|
@ -8586,18 +8568,11 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pColNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
|
||||
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pColNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
||||
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
||||
|
@ -8608,8 +8583,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
||||
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
||||
tListLen(pExp->pExpr->_function.functionName));
|
||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName, tListLen(pExp->pExpr->_function.functionName));
|
||||
|
||||
// TODO: value parameter needs to be handled
|
||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||
|
@ -8620,21 +8594,12 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||
if (p1->type == QUERY_NODE_COLUMN) {
|
||||
SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
|
||||
SColumnNode* pcn = (SColumnNode*) p1;
|
||||
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
pExp->base.pParam[j].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
SColumn* pCol = pExp->base.pParam[j].pCol;
|
||||
|
||||
pCol->slotId = pcn->slotId;
|
||||
pCol->bytes = pcn->node.resType.bytes;
|
||||
pCol->type = pcn->node.resType.type;
|
||||
pCol->scale = pcn->node.resType.scale;
|
||||
pCol->precision = pcn->node.resType.precision;
|
||||
pCol->dataBlockId = pcn->dataBlockId;
|
||||
pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType);
|
||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||
SValueNode* pvn = (SValueNode*)p1;
|
||||
|
||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||
}
|
||||
}
|
||||
|
@ -8644,21 +8609,14 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
pExp->base.pParam[0].pCol = taosMemoryCalloc(1, sizeof(SColumn));
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
|
||||
SDataType* pType = &pNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||
pType->precision, pNode->node.aliasName);
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
|
||||
|
||||
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
||||
|
||||
SColumn* pCol = pExp->base.pParam[0].pCol;
|
||||
pCol->slotId = pTargetNode->slotId; // TODO refactor
|
||||
pCol->bytes = pType->bytes;
|
||||
pCol->type = pType->type;
|
||||
pCol->scale = pType->scale;
|
||||
pCol->precision = pType->precision;
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -8692,17 +8650,15 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
|
|||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||
|
||||
SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
tsdbReaderT pDataReader =
|
||||
doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||
SArray* pColList =
|
||||
extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
||||
pScanPhyNode->reverse, pColList, pTaskInfo);
|
||||
|
@ -8745,7 +8701,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
assert(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(((SProjectPhysiNode*)pPhyNode)->pProjections, NULL, &num);
|
||||
|
@ -8757,7 +8713,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
int32_t num = 0;
|
||||
|
||||
|
@ -8778,7 +8734,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
|
@ -8791,7 +8747,8 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
.sliding = pIntervalPhyNode->sliding,
|
||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||
.offset = pIntervalPhyNode->offset};
|
||||
.offset = pIntervalPhyNode->offset,
|
||||
.precision = TSDB_TIME_PRECISION_MILLI,};
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
|
||||
|
@ -8799,7 +8756,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
assert(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||
|
||||
|
@ -8811,7 +8768,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
assert(size == 1);
|
||||
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
|
@ -8827,7 +8784,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = taosArrayGetP(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
SOperatorInfo* op = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
return createMultiTableAggOperatorInfo(op, pPhyNode->pTargets, pTaskInfo, pTableGroupInfo);
|
||||
}
|
||||
}*/
|
||||
|
@ -8972,11 +8929,11 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
|||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSlotDescNode* pNode = (SSlotDescNode*)nodesListGetNode(pOutputNodeList->pSlots, i);
|
||||
SColMatchInfo* info = taosArrayGet(pList, pNode->slotId);
|
||||
// if (pNode->output) {
|
||||
if (pNode->output) {
|
||||
(*numOfOutputCols) += 1;
|
||||
// } else {
|
||||
// info->output = false;
|
||||
// }
|
||||
} else {
|
||||
info->output = false;
|
||||
}
|
||||
}
|
||||
|
||||
return pList;
|
||||
|
@ -9045,7 +9002,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
}
|
||||
|
||||
STableGroupInfo group = {0};
|
||||
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group);
|
||||
if (NULL == (*pTaskInfo)->pRoot) {
|
||||
code = terrno;
|
||||
goto _complete;
|
||||
|
|
|
@ -1,13 +0,0 @@
|
|||
#include "tunaryoperator.h"
|
||||
|
||||
|
||||
|
||||
|
||||
// TODO dynamic define these functions
|
||||
//_unary_scalar_fn_t getUnaryScalarOperatorFn(int32_t operator) {
|
||||
// assert(0);
|
||||
//}
|
||||
|
||||
//bool isStringOperatorFn(int32_t op) {
|
||||
// return op == FUNCTION_LENGTH;
|
||||
//}
|
Loading…
Reference in New Issue