Merge pull request #12391 from taosdata/feature/3.0_liaohj
fix(query): fix special pseudo column caused taosd crash.
This commit is contained in:
commit
3678aa9094
|
@ -126,7 +126,7 @@ enum {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MAIN_SCAN = 0x0u,
|
MAIN_SCAN = 0x0u,
|
||||||
REVERSE_SCAN = 0x1u,
|
REVERSE_SCAN = 0x1u, // todo remove it
|
||||||
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
|
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
|
||||||
MERGE_STAGE = 0x20u,
|
MERGE_STAGE = 0x20u,
|
||||||
};
|
};
|
||||||
|
@ -222,12 +222,12 @@ enum {
|
||||||
typedef struct tExprNode {
|
typedef struct tExprNode {
|
||||||
int32_t nodeType;
|
int32_t nodeType;
|
||||||
union {
|
union {
|
||||||
struct {
|
// struct {
|
||||||
int32_t optr; // binary operator
|
// int32_t optr; // binary operator
|
||||||
void *info; // support filter operation on this expression only available for leaf node
|
// void *info; // support filter operation on this expression only available for leaf node
|
||||||
struct tExprNode *pLeft; // left child pointer
|
// struct tExprNode *pLeft; // left child pointer
|
||||||
struct tExprNode *pRight; // right child pointer
|
// struct tExprNode *pRight; // right child pointer
|
||||||
} _node;
|
// } _node;
|
||||||
|
|
||||||
SSchema *pSchema;// column node
|
SSchema *pSchema;// column node
|
||||||
struct SVariant *pVal; // value node
|
struct SVariant *pVal; // value node
|
||||||
|
@ -237,12 +237,6 @@ typedef struct tExprNode {
|
||||||
int32_t functionId;
|
int32_t functionId;
|
||||||
int32_t num;
|
int32_t num;
|
||||||
struct SFunctionNode *pFunctNode;
|
struct SFunctionNode *pFunctNode;
|
||||||
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
|
|
||||||
// calculation instead.
|
|
||||||
// E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
|
|
||||||
// The concat function, concat(col1, col2), is a binary scalar
|
|
||||||
// operator and is kept in the attribute of _node.
|
|
||||||
struct tExprNode **pChild;
|
|
||||||
} _function;
|
} _function;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
|
@ -271,9 +265,10 @@ typedef struct SAggFunctionInfo {
|
||||||
} SAggFunctionInfo;
|
} SAggFunctionInfo;
|
||||||
|
|
||||||
struct SScalarParam {
|
struct SScalarParam {
|
||||||
SColumnInfoData *columnData;
|
SColumnInfoData *columnData;
|
||||||
SHashObj *pHashFilter;
|
SHashObj *pHashFilter;
|
||||||
int32_t numOfRows;
|
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||||
|
int32_t numOfRows;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
|
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
|
||||||
|
@ -281,10 +276,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
|
||||||
|
|
||||||
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
|
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
|
||||||
|
|
||||||
tExprNode* exprTreeFromBinary(const void* data, size_t size);
|
|
||||||
|
|
||||||
tExprNode* exprdup(tExprNode* pTree);
|
|
||||||
|
|
||||||
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
|
||||||
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
|
||||||
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
|
||||||
|
|
|
@ -333,6 +333,8 @@ typedef struct SScanInfo {
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
|
SReadHandle readHandle;
|
||||||
|
|
||||||
SFileBlockLoadRecorder readRecorder;
|
SFileBlockLoadRecorder readRecorder;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
|
@ -348,6 +350,11 @@ typedef struct STableScanInfo {
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
int32_t numOfOutput;
|
int32_t numOfOutput;
|
||||||
|
|
||||||
|
SExprInfo* pPseudoExpr;
|
||||||
|
int32_t numOfPseudoExpr;
|
||||||
|
SqlFunctionCtx* pPseudoCtx;
|
||||||
|
// int32_t* rowCellInfoOffset;
|
||||||
|
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
|
@ -628,7 +635,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||||
int32_t getTableScanOrder(SOperatorInfo* pOperator);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
|
@ -644,12 +651,17 @@ SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
|
||||||
|
|
||||||
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||||
|
int32_t type);
|
||||||
|
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||||
|
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||||
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
||||||
|
|
||||||
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
||||||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||||
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
|
||||||
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
@ -704,7 +716,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
|
||||||
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, SArray* pPseudoList);
|
int32_t numOfOutput, SArray* pPseudoList);
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
|
||||||
|
|
||||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
|
|
|
@ -654,7 +654,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* p
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
bool createDummyCol);
|
int32_t scanFlag, bool createDummyCol);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
||||||
int32_t order) {
|
int32_t order) {
|
||||||
|
@ -665,12 +665,12 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
||||||
bool createDummyCol) {
|
bool createDummyCol) {
|
||||||
if (pBlock->pBlockAgg != NULL) {
|
if (pBlock->pBlockAgg != NULL) {
|
||||||
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
|
||||||
} else {
|
} else {
|
||||||
doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol);
|
doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -717,14 +717,14 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
bool createDummyCol) {
|
int32_t scanFlag, bool createDummyCol) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].size = pBlock->info.rows;
|
pCtx[i].size = pBlock->info.rows;
|
||||||
pCtx[i].pSrcBlock = pBlock;
|
pCtx[i].pSrcBlock = pBlock;
|
||||||
pCtx[i].currentStage = MAIN_SCAN;
|
pCtx[i].currentStage = scanFlag;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx[i].input;
|
SInputColumnInfoData* pInput = &pCtx[i].input;
|
||||||
pInput->uid = pBlock->info.uid;
|
pInput->uid = pBlock->info.uid;
|
||||||
|
@ -740,7 +740,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
pInput->numOfRows = pBlock->info.rows;
|
pInput->numOfRows = pBlock->info.rows;
|
||||||
pInput->startRowIndex = 0;
|
pInput->startRowIndex = 0;
|
||||||
|
|
||||||
// the last parameter is the timestamp column
|
// NOTE: the last parameter is the primary timestamp column
|
||||||
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
|
||||||
pInput->pPTS = pInput->pData[j];
|
pInput->pPTS = pInput->pData[j];
|
||||||
}
|
}
|
||||||
|
@ -884,7 +884,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||||
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
ASSERT(!fmIsAggFunc(pfCtx->functionId));
|
||||||
|
|
||||||
if (fmIsPseudoColumnFunc(pfCtx->functionId)) {
|
// _rowts/_c0, not tbname column
|
||||||
|
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
|
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
|
||||||
|
@ -3506,7 +3507,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||||
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
||||||
// pOperator->pRuntimeEnv, true);
|
// pOperator->pRuntimeEnv, true);
|
||||||
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
||||||
|
@ -3671,17 +3672,24 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getTableScanOrder(SOperatorInfo* pOperator) {
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
// todo add more information about exchange operation
|
||||||
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
|
||||||
|
*order = TSDB_ORDER_ASC;
|
||||||
|
*scanFlag = MAIN_SCAN;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
*order = pTableScanInfo->cond.order;
|
||||||
|
*scanFlag = pTableScanInfo->scanFlag;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
|
||||||
return TSDB_ORDER_ASC;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
} else {
|
} else {
|
||||||
return getTableScanOrder(pOperator->pDownstream[0]);
|
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
|
||||||
return pTableScanInfo->cond.order;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a blocking operator
|
// this is a blocking operator
|
||||||
|
@ -3697,6 +3705,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
|
int32_t scanFlag = MAIN_SCAN;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
@ -3709,11 +3720,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int32_t order = getTableScanOrder(pOperator);
|
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
if (pAggInfo->pScalarExprInfo != NULL) {
|
if (pAggInfo->pScalarExprInfo != NULL) {
|
||||||
int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
|
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
|
||||||
pAggInfo->numOfScalarExpr, NULL);
|
pAggInfo->numOfScalarExpr, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
@ -3723,7 +3737,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
|
||||||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||||
|
|
||||||
#if 0 // test for encode/decode result info
|
#if 0 // test for encode/decode result info
|
||||||
|
@ -4004,6 +4018,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
int32_t order = 0;
|
||||||
|
int32_t scanFlag = 0;
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -4035,15 +4052,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t order = getTableScanOrder(pOperator->pDownstream[0]);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo);
|
||||||
pProjectInfo->pPseudoColInfo);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
longjmp(pTaskInfo->env, code);
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = handleLimitOffset(pOperator, pBlock);
|
int32_t status = handleLimitOffset(pOperator, pBlock);
|
||||||
|
@ -4642,8 +4658,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
|
|
||||||
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
||||||
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
||||||
|
|
||||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
||||||
tListLen(pExp->pExpr->_function.functionName));
|
tListLen(pExp->pExpr->_function.functionName));
|
||||||
|
#if 1
|
||||||
|
// todo refactor: add the parameter for tbname function
|
||||||
|
if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) {
|
||||||
|
pFuncNode->pParameterList = nodesMakeList();
|
||||||
|
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||||
|
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (NULL == res) { // todo handle error
|
||||||
|
} else {
|
||||||
|
res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
|
nodesListAppend(pFuncNode->pParameterList, res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||||
|
|
||||||
|
@ -4704,58 +4734,29 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
|
||||||
int32_t type);
|
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
|
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
|
|
||||||
static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
|
||||||
SInterval interval = {
|
|
||||||
.interval = pTableScanNode->interval,
|
|
||||||
.sliding = pTableScanNode->sliding,
|
|
||||||
.intervalUnit = pTableScanNode->intervalUnit,
|
|
||||||
.slidingUnit = pTableScanNode->slidingUnit,
|
|
||||||
.offset = pTableScanNode->offset,
|
|
||||||
};
|
|
||||||
|
|
||||||
return interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
|
|
||||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
|
||||||
|
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
|
||||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(
|
|
||||||
pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock,
|
|
||||||
pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
|
@ -4945,7 +4946,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
||||||
pCond->loadExternalRows = false;
|
pCond->loadExternalRows = false;
|
||||||
|
|
||||||
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
|
|
|
@ -287,7 +287,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->pScalarExprInfo != NULL) {
|
if (pInfo->pScalarExprInfo != NULL) {
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <libs/function/function.h>
|
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
@ -284,6 +283,27 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// currently only the tbname pseudo column
|
||||||
|
if (pTableScanInfo->numOfPseudoExpr > 0) {
|
||||||
|
int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
|
||||||
|
|
||||||
|
struct SScalarFuncExecFuncs fpSet;
|
||||||
|
fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
infoData.info.bytes = sizeof(uint64_t);
|
||||||
|
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||||
|
|
||||||
|
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
||||||
|
SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
||||||
|
|
||||||
|
SScalarParam param = {.columnData = pColInfoData};
|
||||||
|
fpSet.process(&srcParam, 1, ¶m);
|
||||||
|
}
|
||||||
|
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -314,8 +334,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
|
||||||
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
|
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
|
||||||
"-%" PRId64,
|
"-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
|
||||||
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
|
@ -359,10 +378,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput,
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
|
||||||
int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo,
|
SInterval interval = {
|
||||||
SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval,
|
.interval = pTableScanNode->interval,
|
||||||
double sampleRatio, SExecTaskInfo* pTaskInfo) {
|
.sliding = pTableScanNode->sliding,
|
||||||
|
.intervalUnit = pTableScanNode->intervalUnit,
|
||||||
|
.slidingUnit = pTableScanNode->slidingUnit,
|
||||||
|
.offset = pTableScanNode->offset,
|
||||||
|
};
|
||||||
|
|
||||||
|
return interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||||
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -373,25 +401,40 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->cond = *pCond;
|
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
||||||
pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
|
|
||||||
|
|
||||||
pInfo->interval = *pInterval;
|
int32_t numOfCols = 0;
|
||||||
pInfo->sampleRatio = sampleRatio;
|
SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
pInfo->dataBlockLoadFlag = dataLoadFlag;
|
|
||||||
pInfo->pResBlock = pResBlock;
|
|
||||||
pInfo->pFilterNode = pCondition;
|
|
||||||
pInfo->dataReader = pDataReader;
|
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
|
||||||
pInfo->pColMatchInfo = pColMatchInfo;
|
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator"; // for dubug purpose
|
int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
|
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
|
||||||
|
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
|
||||||
|
|
||||||
|
pInfo->readHandle = *readHandle;
|
||||||
|
pInfo->interval = extractIntervalInfo(pTableScanNode);
|
||||||
|
pInfo->sampleRatio = pTableScanNode->ratio;
|
||||||
|
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
|
||||||
|
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||||
|
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||||
|
pInfo->dataReader = pDataReader;
|
||||||
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
|
pInfo->pColMatchInfo = pColList;
|
||||||
|
|
||||||
|
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = numOfOutput;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
@ -1311,7 +1354,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
colDataAppend(pDst, count, str, false);
|
colDataAppend(pDst, count, str, false);
|
||||||
|
|
||||||
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||||
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||||
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||||
|
|
|
@ -775,7 +775,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
|
||||||
|
|
||||||
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
|
||||||
|
@ -910,7 +910,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1024,7 +1024,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
if (pInfo->invertible) {
|
if (pInfo->invertible) {
|
||||||
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
||||||
}
|
}
|
||||||
|
@ -1286,7 +1286,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1334,7 +1334,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
|
setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1653,7 +1653,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
pResInfo->complete = true;
|
pResInfo->complete = true;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
|
pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1704,30 +1704,28 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
pInfo->numOfElems += 1;
|
pInfo->numOfElems += 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// the second stage, calculate the true percentile value
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
char* data = colDataGetData(pCol, i);
|
||||||
}
|
notNullElems += 1;
|
||||||
|
tMemBucketPut(pInfo->pMemBucket, data, 1);
|
||||||
// the second stage, calculate the true percentile value
|
|
||||||
int32_t start = pInput->startRowIndex;
|
|
||||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
|
||||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pCol, i);
|
SET_VAL(pResInfo, notNullElems, 1);
|
||||||
|
|
||||||
notNullElems += 1;
|
|
||||||
tMemBucketPut(pInfo->pMemBucket, data, 1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_VAL(pResInfo, notNullElems, 1);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
|
double v = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
|
@ -64,21 +64,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t type = (*pExpr)->nodeType;
|
int32_t type = (*pExpr)->nodeType;
|
||||||
if (type == TEXPR_BINARYEXPR_NODE) {
|
if (type == TEXPR_VALUE_NODE) {
|
||||||
doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
|
|
||||||
doExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
|
|
||||||
|
|
||||||
if (fp != NULL) {
|
|
||||||
fp((*pExpr)->_node.info);
|
|
||||||
}
|
|
||||||
} else if (type == TEXPR_UNARYEXPR_NODE) {
|
|
||||||
doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
|
|
||||||
if (fp != NULL) {
|
|
||||||
fp((*pExpr)->_node.info);
|
|
||||||
}
|
|
||||||
|
|
||||||
assert((*pExpr)->_node.pRight == NULL);
|
|
||||||
} else if (type == TEXPR_VALUE_NODE) {
|
|
||||||
taosVariantDestroy((*pExpr)->pVal);
|
taosVariantDestroy((*pExpr)->pVal);
|
||||||
taosMemoryFree((*pExpr)->pVal);
|
taosMemoryFree((*pExpr)->pVal);
|
||||||
} else if (type == TEXPR_COL_NODE) {
|
} else if (type == TEXPR_COL_NODE) {
|
||||||
|
@ -90,9 +76,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
|
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
|
||||||
tExprNode *pLeft = pExpr->_node.pLeft;
|
#if 0
|
||||||
tExprNode *pRight = pExpr->_node.pRight;
|
|
||||||
|
|
||||||
//non-leaf nodes, recursively traverse the expression tree in the post-root order
|
//non-leaf nodes, recursively traverse the expression tree in the post-root order
|
||||||
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) {
|
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) {
|
||||||
if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or
|
if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or
|
||||||
|
@ -114,6 +98,9 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
|
||||||
// handle the leaf node
|
// handle the leaf node
|
||||||
param->setupInfoFn(pExpr, param->pExtInfo);
|
param->setupInfoFn(pExpr, param->pExtInfo);
|
||||||
return param->nodeFilterFn(pItem, pExpr->_node.info);
|
return param->nodeFilterFn(pItem, pExpr->_node.info);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: these three functions should be made global
|
// TODO: these three functions should be made global
|
||||||
|
@ -141,59 +128,6 @@ static UNUSED_FUNC char* exception_strdup(const char* str) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
|
|
||||||
int32_t anchor = CLEANUP_GET_ANCHOR();
|
|
||||||
if (CLEANUP_EXCEED_LIMIT()) {
|
|
||||||
THROW(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tExprNode* pExpr = exception_calloc(1, sizeof(tExprNode));
|
|
||||||
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, pExpr, NULL);
|
|
||||||
pExpr->nodeType = tbufReadUint8(br);
|
|
||||||
|
|
||||||
if (pExpr->nodeType == TEXPR_VALUE_NODE) {
|
|
||||||
SVariant* pVal = exception_calloc(1, sizeof(SVariant));
|
|
||||||
pExpr->pVal = pVal;
|
|
||||||
|
|
||||||
pVal->nType = tbufReadUint32(br);
|
|
||||||
if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen));
|
|
||||||
pVal->pz = taosMemoryCalloc(1, pVal->nLen + 1);
|
|
||||||
tbufReadToBuffer(br, pVal->pz, pVal->nLen);
|
|
||||||
} else {
|
|
||||||
pVal->i = tbufReadInt64(br);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else if (pExpr->nodeType == TEXPR_COL_NODE) {
|
|
||||||
SSchema* pSchema = exception_calloc(1, sizeof(SSchema));
|
|
||||||
pExpr->pSchema = pSchema;
|
|
||||||
|
|
||||||
pSchema->colId = tbufReadInt16(br);
|
|
||||||
pSchema->bytes = tbufReadInt16(br);
|
|
||||||
pSchema->type = tbufReadUint8(br);
|
|
||||||
tbufReadToString(br, pSchema->name, TSDB_COL_NAME_LEN);
|
|
||||||
|
|
||||||
} else if (pExpr->nodeType == TEXPR_BINARYEXPR_NODE) {
|
|
||||||
pExpr->_node.optr = tbufReadUint8(br);
|
|
||||||
pExpr->_node.pLeft = exprTreeFromBinaryImpl(br);
|
|
||||||
pExpr->_node.pRight = exprTreeFromBinaryImpl(br);
|
|
||||||
assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
CLEANUP_EXECUTE_TO(anchor, false);
|
|
||||||
return pExpr;
|
|
||||||
}
|
|
||||||
|
|
||||||
tExprNode* exprTreeFromBinary(const void* data, size_t size) {
|
|
||||||
if (size == 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SBufferReader br = tbufInitReader(data, size, false);
|
|
||||||
return exprTreeFromBinaryImpl(&br);
|
|
||||||
}
|
|
||||||
|
|
||||||
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
|
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
|
||||||
SBufferReader br = tbufInitReader(buf, len, false);
|
SBufferReader br = tbufInitReader(buf, len, false);
|
||||||
uint32_t type = tbufReadUint32(&br);
|
uint32_t type = tbufReadUint32(&br);
|
||||||
|
@ -405,38 +339,3 @@ err_ret:
|
||||||
taosHashCleanup(pObj);
|
taosHashCleanup(pObj);
|
||||||
taosMemoryFreeClear(tmp);
|
taosMemoryFreeClear(tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
tExprNode* exprdup(tExprNode* pNode) {
|
|
||||||
if (pNode == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tExprNode* pCloned = taosMemoryCalloc(1, sizeof(tExprNode));
|
|
||||||
if (pNode->nodeType == TEXPR_BINARYEXPR_NODE) {
|
|
||||||
tExprNode* pLeft = exprdup(pNode->_node.pLeft);
|
|
||||||
tExprNode* pRight = exprdup(pNode->_node.pRight);
|
|
||||||
|
|
||||||
pCloned->_node.pLeft = pLeft;
|
|
||||||
pCloned->_node.pRight = pRight;
|
|
||||||
pCloned->_node.optr = pNode->_node.optr;
|
|
||||||
} else if (pNode->nodeType == TEXPR_VALUE_NODE) {
|
|
||||||
pCloned->pVal = taosMemoryCalloc(1, sizeof(SVariant));
|
|
||||||
taosVariantAssign(pCloned->pVal, pNode->pVal);
|
|
||||||
} else if (pNode->nodeType == TEXPR_COL_NODE) {
|
|
||||||
pCloned->pSchema = taosMemoryCalloc(1, sizeof(SSchema));
|
|
||||||
*pCloned->pSchema = *pNode->pSchema;
|
|
||||||
} else if (pNode->nodeType == TEXPR_FUNCTION_NODE) {
|
|
||||||
strcpy(pCloned->_function.functionName, pNode->_function.functionName);
|
|
||||||
|
|
||||||
int32_t num = pNode->_function.num;
|
|
||||||
pCloned->_function.num = num;
|
|
||||||
pCloned->_function.pChild = taosMemoryCalloc(num, POINTER_BYTES);
|
|
||||||
for(int32_t i = 0; i < num; ++i) {
|
|
||||||
pCloned->_function.pChild[i] = exprdup(pNode->_function.pChild[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pCloned->nodeType = pNode->nodeType;
|
|
||||||
return pCloned;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ target_include_directories(
|
||||||
)
|
)
|
||||||
|
|
||||||
target_link_libraries(scalar
|
target_link_libraries(scalar
|
||||||
PRIVATE os util common nodes function qcom
|
PRIVATE os util common nodes function qcom vnode
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
|
|
|
@ -26,6 +26,7 @@ typedef struct SScalarCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SArray *pBlockList; /* element is SSDataBlock* */
|
SArray *pBlockList; /* element is SSDataBlock* */
|
||||||
SHashObj *pRes; /* element is SScalarParam */
|
SHashObj *pRes; /* element is SScalarParam */
|
||||||
|
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
||||||
} SScalarCtx;
|
} SScalarCtx;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -265,6 +265,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
*rowNum = param->numOfRows;
|
*rowNum = param->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
param->param = ctx->param;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -909,7 +910,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
|
SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
|
||||||
|
|
||||||
// TODO: OPT performance
|
// TODO: OPT performance
|
||||||
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
|
@ -1,10 +1,11 @@
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "scalar.h"
|
#include "scalar.h"
|
||||||
#include "tdatablock.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
#include "sclInt.h"
|
#include "sclInt.h"
|
||||||
#include "sclvector.h"
|
#include "sclvector.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
#include "tjson.h"
|
#include "tjson.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef float (*_float_fn)(float);
|
typedef float (*_float_fn)(float);
|
||||||
typedef double (*_double_fn)(double);
|
typedef double (*_double_fn)(double);
|
||||||
|
@ -1512,6 +1513,21 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
|
|
||||||
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
ASSERT(inputNum == 1);
|
ASSERT(inputNum == 1);
|
||||||
colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
|
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
metaReaderInit(&mr, pInput->param, 0);
|
||||||
|
|
||||||
|
uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0);
|
||||||
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
|
||||||
|
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
STR_TO_VARSTR(str, mr.me.name);
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||||
|
colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->numOfRows += pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue