fix(query): fix the bug that special pseudo column tbname, along with other normal columns in select clause, cause taosd crash.
This commit is contained in:
parent
7a4df795f4
commit
a41f6a3479
|
@ -126,7 +126,7 @@ enum {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MAIN_SCAN = 0x0u,
|
MAIN_SCAN = 0x0u,
|
||||||
REVERSE_SCAN = 0x1u,
|
REVERSE_SCAN = 0x1u, // todo remove it
|
||||||
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
|
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
|
||||||
MERGE_STAGE = 0x20u,
|
MERGE_STAGE = 0x20u,
|
||||||
};
|
};
|
||||||
|
@ -222,12 +222,12 @@ enum {
|
||||||
typedef struct tExprNode {
|
typedef struct tExprNode {
|
||||||
int32_t nodeType;
|
int32_t nodeType;
|
||||||
union {
|
union {
|
||||||
struct {
|
// struct {
|
||||||
int32_t optr; // binary operator
|
// int32_t optr; // binary operator
|
||||||
void *info; // support filter operation on this expression only available for leaf node
|
// void *info; // support filter operation on this expression only available for leaf node
|
||||||
struct tExprNode *pLeft; // left child pointer
|
// struct tExprNode *pLeft; // left child pointer
|
||||||
struct tExprNode *pRight; // right child pointer
|
// struct tExprNode *pRight; // right child pointer
|
||||||
} _node;
|
// } _node;
|
||||||
|
|
||||||
SSchema *pSchema;// column node
|
SSchema *pSchema;// column node
|
||||||
struct SVariant *pVal; // value node
|
struct SVariant *pVal; // value node
|
||||||
|
@ -237,12 +237,6 @@ typedef struct tExprNode {
|
||||||
int32_t functionId;
|
int32_t functionId;
|
||||||
int32_t num;
|
int32_t num;
|
||||||
struct SFunctionNode *pFunctNode;
|
struct SFunctionNode *pFunctNode;
|
||||||
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
|
|
||||||
// calculation instead.
|
|
||||||
// E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
|
|
||||||
// The concat function, concat(col1, col2), is a binary scalar
|
|
||||||
// operator and is kept in the attribute of _node.
|
|
||||||
struct tExprNode **pChild;
|
|
||||||
} _function;
|
} _function;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
|
@ -273,6 +267,7 @@ typedef struct SAggFunctionInfo {
|
||||||
struct SScalarParam {
|
struct SScalarParam {
|
||||||
SColumnInfoData *columnData;
|
SColumnInfoData *columnData;
|
||||||
SHashObj *pHashFilter;
|
SHashObj *pHashFilter;
|
||||||
|
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -281,10 +276,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
||||||
|
|
||||||
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
|
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
|
||||||
|
|
||||||
tExprNode* exprTreeFromBinary(const void* data, size_t size);
|
|
||||||
|
|
||||||
tExprNode* exprdup(tExprNode* pTree);
|
|
||||||
|
|
||||||
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
||||||
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
||||||
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
||||||
|
|
|
@ -333,6 +333,8 @@ typedef struct SScanInfo {
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
|
SReadHandle readHandle;
|
||||||
|
|
||||||
SFileBlockLoadRecorder readRecorder;
|
SFileBlockLoadRecorder readRecorder;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
|
@ -348,6 +350,11 @@ typedef struct STableScanInfo {
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
int32_t numOfOutput;
|
int32_t numOfOutput;
|
||||||
|
|
||||||
|
SExprInfo* pPseudoExpr;
|
||||||
|
int32_t numOfPseudoExpr;
|
||||||
|
SqlFunctionCtx* pPseudoCtx;
|
||||||
|
// int32_t* rowCellInfoOffset;
|
||||||
|
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
|
@ -628,7 +635,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||||
int32_t getTableScanOrder(SOperatorInfo* pOperator);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
|
@ -644,12 +651,17 @@ SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||||
|
|
||||||
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||||
|
int32_t type);
|
||||||
|
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||||
|
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||||
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
|
|
||||||
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
||||||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||||
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
||||||
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
@ -704,7 +716,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
|
||||||
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, SArray* pPseudoList);
|
int32_t numOfOutput, SArray* pPseudoList);
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
|
||||||
|
|
||||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
|
|
|
@ -649,7 +649,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
bool createDummyCol);
|
int32_t scanFlag, bool createDummyCol);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
||||||
int32_t order) {
|
int32_t order) {
|
||||||
|
@ -660,12 +660,12 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
bool createDummyCol) {
|
bool createDummyCol) {
|
||||||
if (pBlock->pBlockAgg != NULL) {
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||||
} else {
|
} else {
|
||||||
doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol);
|
doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,14 +712,14 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
bool createDummyCol) {
|
int32_t scanFlag, bool createDummyCol) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
pCtx[i].pSrcBlock = pBlock;
|
pCtx[i].pSrcBlock = pBlock;
|
||||||
pCtx[i].currentStage = MAIN_SCAN;
|
pCtx[i].currentStage = scanFlag;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||||
pInput->uid = pBlock->info.uid;
|
pInput->uid = pBlock->info.uid;
|
||||||
|
@ -3490,7 +3490,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||||
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
||||||
// pOperator->pRuntimeEnv, true);
|
// pOperator->pRuntimeEnv, true);
|
||||||
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
||||||
|
@ -3655,17 +3655,19 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
||||||
return TSDB_ORDER_ASC;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
} else {
|
} else {
|
||||||
return getTableScanOrder(pOperator->pDownstream[0]);
|
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
return pTableScanInfo->cond.order;
|
*order = pTableScanInfo->cond.order;
|
||||||
|
*scanFlag = pTableScanInfo->scanFlag;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a blocking operator
|
// this is a blocking operator
|
||||||
|
@ -3681,6 +3683,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
@ -3693,11 +3698,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int32_t order = getTableScanOrder(pOperator);
|
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
if (pAggInfo->pScalarExprInfo != NULL) {
|
if (pAggInfo->pScalarExprInfo != NULL) {
|
||||||
int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
|
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
|
||||||
pAggInfo->numOfScalarExpr, NULL);
|
pAggInfo->numOfScalarExpr, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -3707,7 +3715,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
|
||||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||||
|
|
||||||
#if 0 // test for encode/decode result info
|
#if 0 // test for encode/decode result info
|
||||||
|
@ -3988,6 +3996,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int32_t order = 0;
|
||||||
|
int32_t scanFlag = 0;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -4019,15 +4030,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t order = getTableScanOrder(pOperator->pDownstream[0]);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo);
|
||||||
pProjectInfo->pPseudoColInfo);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
longjmp(pTaskInfo->env, code);
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = handleLimitOffset(pOperator, pBlock);
|
int32_t status = handleLimitOffset(pOperator, pBlock);
|
||||||
|
@ -4626,8 +4636,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
|
|
||||||
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
||||||
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
||||||
|
|
||||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
||||||
tListLen(pExp->pExpr->_function.functionName));
|
tListLen(pExp->pExpr->_function.functionName));
|
||||||
|
#if 1
|
||||||
|
// todo refactor: add the parameter for tbname function
|
||||||
|
if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) {
|
||||||
|
pFuncNode->pParameterList = nodesMakeList();
|
||||||
|
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||||
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (NULL == res) { // todo handle error
|
||||||
|
} else {
|
||||||
|
res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
|
nodesListAppend(pFuncNode->pParameterList, res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||||
|
|
||||||
|
@ -4688,58 +4712,29 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
|
||||||
int32_t type);
|
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
|
|
||||||
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
|
||||||
SInterval interval = {
|
|
||||||
.interval = pTableScanNode->interval,
|
|
||||||
.sliding = pTableScanNode->sliding,
|
|
||||||
.intervalUnit = pTableScanNode->intervalUnit,
|
|
||||||
.slidingUnit = pTableScanNode->slidingUnit,
|
|
||||||
.offset = pTableScanNode->offset,
|
|
||||||
};
|
|
||||||
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
|
|
||||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
|
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
|
||||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(
|
|
||||||
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock,
|
|
||||||
pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
|
@ -4929,7 +4924,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
||||||
pCond->loadExternalRows = false;
|
pCond->loadExternalRows = false;
|
||||||
|
|
||||||
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
|
|
|
@ -287,7 +287,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->pScalarExprInfo != NULL) {
|
if (pInfo->pScalarExprInfo != NULL) {
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <libs/function/function.h>
|
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -284,6 +283,27 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currently only the tbname pseudo column
|
||||||
|
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
||||||
|
int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
|
||||||
|
|
||||||
|
struct SScalarFuncExecFuncs fpSet;
|
||||||
|
fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
infoData.info.bytes = sizeof(uint64_t);
|
||||||
|
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||||
|
|
||||||
|
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
||||||
|
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
||||||
|
|
||||||
|
SScalarParam param = {.columnData = pColInfoData};
|
||||||
|
fpSet.process(&srcParam, 1, ¶m);
|
||||||
|
}
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,8 +334,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
||||||
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
|
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
|
||||||
"-%" PRId64,
|
"-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -359,10 +378,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput,
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
||||||
int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo,
|
SInterval interval = {
|
||||||
SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval,
|
.interval = pTableScanNode->interval,
|
||||||
double sampleRatio, SExecTaskInfo* pTaskInfo) {
|
.sliding = pTableScanNode->sliding,
|
||||||
|
.intervalUnit = pTableScanNode->intervalUnit,
|
||||||
|
.slidingUnit = pTableScanNode->slidingUnit,
|
||||||
|
.offset = pTableScanNode->offset,
|
||||||
|
};
|
||||||
|
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -373,24 +401,39 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->cond = *pCond;
|
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
|
|
||||||
|
|
||||||
pInfo->interval = *pInterval;
|
int32_t numOfCols = 0;
|
||||||
pInfo->sampleRatio = sampleRatio;
|
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
pInfo->dataBlockLoadFlag = dataLoadFlag;
|
|
||||||
pInfo->pResBlock = pResBlock;
|
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||||
pInfo->pFilterNode = pCondition;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
|
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||||
|
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||||
|
|
||||||
|
pInfo->readHandle = *readHandle;
|
||||||
|
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||||
|
pInfo->sampleRatio = pTableScanNode->ratio;
|
||||||
|
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||||
|
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||||
|
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||||
pInfo->dataReader = pDataReader;
|
pInfo->dataReader = pDataReader;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColMatchInfo;
|
pInfo->pColMatchInfo = pColList;
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator"; // for dubug purpose
|
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = numOfOutput;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||||
|
@ -1311,7 +1354,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
colDataAppend(pDst, count, str, false);
|
colDataAppend(pDst, count, str, false);
|
||||||
|
|
||||||
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||||
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||||
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||||
|
|
|
@ -775,7 +775,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
|
||||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
@ -910,7 +910,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1024,7 +1024,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
if (pInfo->invertible) {
|
if (pInfo->invertible) {
|
||||||
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
||||||
}
|
}
|
||||||
|
@ -1286,7 +1286,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1334,7 +1334,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ target_include_directories(
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(scalar
|
target_link_libraries(scalar
|
||||||
PRIVATE os util common nodes function qcom
|
PRIVATE os util common nodes function qcom vnode
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
|
|
|
@ -26,6 +26,7 @@ typedef struct SScalarCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SArray *pBlockList; /* element is SSDataBlock* */
|
SArray *pBlockList; /* element is SSDataBlock* */
|
||||||
SHashObj *pRes; /* element is SScalarParam */
|
SHashObj *pRes; /* element is SScalarParam */
|
||||||
|
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
||||||
} SScalarCtx;
|
} SScalarCtx;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -250,6 +250,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
*rowNum = param->numOfRows;
|
*rowNum = param->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
param->param = ctx->param;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -884,7 +885,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
|
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
|
||||||
|
|
||||||
// TODO: OPT performance
|
// TODO: OPT performance
|
||||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "tdatablock.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
#include "sclInt.h"
|
#include "sclInt.h"
|
||||||
#include "sclvector.h"
|
#include "sclvector.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef float (*_float_fn)(float);
|
typedef float (*_float_fn)(float);
|
||||||
typedef double (*_double_fn)(double);
|
typedef double (*_double_fn)(double);
|
||||||
|
@ -1512,6 +1513,21 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
|
|
||||||
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
ASSERT(inputNum == 1);
|
ASSERT(inputNum == 1);
|
||||||
colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pInput->param, 0);
|
||||||
|
|
||||||
|
uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0);
|
||||||
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
|
||||||
|
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(str, mr.me.name);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows += pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue