diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index 88ac1532c2..7d977b0d23 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -126,7 +126,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
- REVERSE_SCAN = 0x1u,
+ REVERSE_SCAN = 0x1u, // todo remove it
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
MERGE_STAGE = 0x20u,
};
@@ -222,12 +222,12 @@ enum {
typedef struct tExprNode {
int32_t nodeType;
union {
- struct {
- int32_t optr; // binary operator
- void *info; // support filter operation on this expression only available for leaf node
- struct tExprNode *pLeft; // left child pointer
- struct tExprNode *pRight; // right child pointer
- } _node;
+// struct {
+// int32_t optr; // binary operator
+// void *info; // support filter operation on this expression only available for leaf node
+// struct tExprNode *pLeft; // left child pointer
+// struct tExprNode *pRight; // right child pointer
+// } _node;
SSchema *pSchema;// column node
struct SVariant *pVal; // value node
@@ -237,12 +237,6 @@ typedef struct tExprNode {
int32_t functionId;
int32_t num;
struct SFunctionNode *pFunctNode;
- // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
- // calculation instead.
- // E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
- // The concat function, concat(col1, col2), is a binary scalar
- // operator and is kept in the attribute of _node.
- struct tExprNode **pChild;
} _function;
struct {
@@ -271,9 +265,10 @@ typedef struct SAggFunctionInfo {
} SAggFunctionInfo;
struct SScalarParam {
- SColumnInfoData *columnData;
- SHashObj *pHashFilter;
- int32_t numOfRows;
+ SColumnInfoData *columnData;
+ SHashObj *pHashFilter;
+ void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
+ int32_t numOfRows;
};
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
@@ -281,10 +276,6 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
-tExprNode* exprTreeFromBinary(const void* data, size_t size);
-
-tExprNode* exprdup(tExprNode* pTree);
-
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 4881f23134..ba18d30a52 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -333,6 +333,8 @@ typedef struct SScanInfo {
typedef struct STableScanInfo {
void* dataReader;
+ SReadHandle readHandle;
+
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
int64_t elapsedTime;
@@ -348,6 +350,11 @@ typedef struct STableScanInfo {
SArray* pColMatchInfo;
int32_t numOfOutput;
+ SExprInfo* pPseudoExpr;
+ int32_t numOfPseudoExpr;
+ SqlFunctionCtx* pPseudoCtx;
+// int32_t* rowCellInfoOffset;
+
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
@@ -628,7 +635,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
-int32_t getTableScanOrder(SOperatorInfo* pOperator);
+int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
@@ -644,12 +651,17 @@ SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
+SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
+ int32_t type);
+SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
+SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
+int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
+
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
-SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput, int32_t dataLoadFlag, const uint8_t* scanInfo,
- SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
@@ -704,7 +716,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
-void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
+void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index a5bc1fdf58..7e9ca1c52a 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -654,7 +654,7 @@ static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* p
}
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
- bool createDummyCol);
+ int32_t scanFlag, bool createDummyCol);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
int32_t order) {
@@ -665,12 +665,12 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
}
}
-void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
+void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol) {
if (pBlock->pBlockAgg != NULL) {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
} else {
- doSetInputDataBlock(pOperator, pCtx, pBlock, order, createDummyCol);
+ doSetInputDataBlock(pOperator, pCtx, pBlock, order, scanFlag, createDummyCol);
}
}
@@ -717,14 +717,14 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc
}
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
- bool createDummyCol) {
+ int32_t scanFlag, bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
- pCtx[i].currentStage = MAIN_SCAN;
+ pCtx[i].currentStage = scanFlag;
SInputColumnInfoData* pInput = &pCtx[i].input;
pInput->uid = pBlock->info.uid;
@@ -740,7 +740,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->numOfRows = pBlock->info.rows;
pInput->startRowIndex = 0;
- // the last parameter is the timestamp column
+ // NOTE: the last parameter is the primary timestamp column
if (fmIsTimelineFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) {
pInput->pPTS = pInput->pData[j];
}
@@ -884,7 +884,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
ASSERT(!fmIsAggFunc(pfCtx->functionId));
- if (fmIsPseudoColumnFunc(pfCtx->functionId)) {
+ // _rowts/_c0, not tbname column
+ if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
// do nothing
} else if (fmIsNonstandardSQLFunc(pfCtx->functionId)) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
@@ -3506,7 +3507,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
break;
}
- setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true);
+ setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
@@ -3671,17 +3672,24 @@ _error:
return NULL;
}
-int32_t getTableScanOrder(SOperatorInfo* pOperator) {
- if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
+int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag) {
+ // todo add more information about exchange operation
+ if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
+ *order = TSDB_ORDER_ASC;
+ *scanFlag = MAIN_SCAN;
+ return TSDB_CODE_SUCCESS;
+ } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
+ STableScanInfo* pTableScanInfo = pOperator->info;
+ *order = pTableScanInfo->cond.order;
+ *scanFlag = pTableScanInfo->scanFlag;
+ return TSDB_CODE_SUCCESS;
+ } else {
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
- return TSDB_ORDER_ASC;
+ return TSDB_CODE_INVALID_PARA;
} else {
- return getTableScanOrder(pOperator->pDownstream[0]);
+ return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
}
}
-
- STableScanInfo* pTableScanInfo = pOperator->info;
- return pTableScanInfo->cond.order;
}
// this is a blocking operator
@@ -3697,6 +3705,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
+ int32_t order = TSDB_ORDER_ASC;
+ int32_t scanFlag = MAIN_SCAN;
+
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
@@ -3709,11 +3720,14 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// }
- int32_t order = getTableScanOrder(pOperator);
+ int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
+ if (code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, code);
+ }
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
- int32_t code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
+ code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
pAggInfo->numOfScalarExpr, NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
@@ -3723,7 +3737,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
- setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
doAggregateImpl(pOperator, 0, pInfo->pCtx);
#if 0 // test for encode/decode result info
@@ -4004,6 +4018,9 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
#endif
+ int32_t order = 0;
+ int32_t scanFlag = 0;
+
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@@ -4035,15 +4052,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// }
// the pDataBlock are always the same one, no need to call this again
- int32_t order = getTableScanOrder(pOperator->pDownstream[0]);
+ int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
- setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false);
+ setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
- pTaskInfo->code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
- pProjectInfo->pPseudoColInfo);
- if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
- longjmp(pTaskInfo->env, pTaskInfo->code);
+ code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo);
+ if (code != TSDB_CODE_SUCCESS) {
+ longjmp(pTaskInfo->env, code);
}
int32_t status = handleLimitOffset(pOperator, pBlock);
@@ -4642,8 +4658,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pExp->pExpr->_function.functionId = pFuncNode->funcId;
pExp->pExpr->_function.pFunctNode = pFuncNode;
+
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
tListLen(pExp->pExpr->_function.functionName));
+#if 1
+ // todo refactor: add the parameter for tbname function
+ if (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0) {
+ pFuncNode->pParameterList = nodesMakeList();
+ ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
+ SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
+ if (NULL == res) { // todo handle error
+ } else {
+ res->node.resType = (SDataType) {.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
+ nodesListAppend(pFuncNode->pParameterList, res);
+ }
+ }
+#endif
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
@@ -4704,58 +4734,29 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t
uint64_t queryId, uint64_t taskId);
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
static SArray* extractColumnInfo(SNodeList* pNodeList);
-static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
- int32_t type);
static SArray* createSortInfo(SNodeList* pNodeList);
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
-static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
-static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
- SInterval interval = {
- .interval = pTableScanNode->interval,
- .sliding = pTableScanNode->sliding,
- .intervalUnit = pTableScanNode->intervalUnit,
- .slidingUnit = pTableScanNode->slidingUnit,
- .offset = pTableScanNode->offset,
- };
-
- return interval;
-}
-
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
int32_t type = nodeType(pPhyNode);
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
- SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
- int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL && terrno != 0) {
return NULL;
}
- SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
+ SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);
- SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
- SSDataBlock* pResBlock = createResDataBlock(pDescNode);
-
- SQueryTableDataCond cond = {0};
- int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
- if (code != TSDB_CODE_SUCCESS) {
- return NULL;
- }
-
- SInterval interval = extractIntervalInfo(pTableScanNode);
- SOperatorInfo* pOperator = createTableScanOperatorInfo(
- pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, pResBlock,
- pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
STableScanInfo* pScanInfo = pOperator->info;
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
+
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
@@ -4945,7 +4946,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr;
}
-static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
+int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->loadExternalRows = false;
pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 5d22c13ec6..ac6f0cf881 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -287,7 +287,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
- setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->pScalarExprInfo != NULL) {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index eaacb561d5..b3d915ab93 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
@@ -284,6 +283,27 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
+ // currently only the tbname pseudo column
+ if (pTableScanInfo->numOfPseudoExpr > 0) {
+ int32_t dstSlotId = pTableScanInfo->pPseudoExpr->base.resSchema.slotId;
+ SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId);
+ colInfoDataEnsureCapacity(pColInfoData, 0, pBlock->info.rows);
+
+ struct SScalarFuncExecFuncs fpSet;
+ fmGetScalarFuncExecFuncs(pTableScanInfo->pPseudoExpr->pExpr->_function.functionId, &fpSet);
+
+ SColumnInfoData infoData = {0};
+ infoData.info.type = TSDB_DATA_TYPE_BIGINT;
+ infoData.info.bytes = sizeof(uint64_t);
+ colInfoDataEnsureCapacity(&infoData, 0, 1);
+
+ colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
+ SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
+
+ SScalarParam param = {.columnData = pColInfoData};
+ fpSet.process(&srcParam, 1, ¶m);
+ }
+
return pBlock;
}
@@ -314,8 +334,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STimeWindow* pWin = &pTableScanInfo->cond.twindow;
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64
- "-%" PRId64,
- GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
+ "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
@@ -359,10 +378,19 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
-SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCond* pCond, int32_t numOfOutput,
- int32_t dataLoadFlag, const uint8_t* scanInfo, SArray* pColMatchInfo,
- SSDataBlock* pResBlock, SNode* pCondition, SInterval* pInterval,
- double sampleRatio, SExecTaskInfo* pTaskInfo) {
+SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
+ SInterval interval = {
+ .interval = pTableScanNode->interval,
+ .sliding = pTableScanNode->sliding,
+ .intervalUnit = pTableScanNode->intervalUnit,
+ .slidingUnit = pTableScanNode->slidingUnit,
+ .offset = pTableScanNode->offset,
+ };
+
+ return interval;
+}
+
+SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@@ -373,25 +401,40 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
return NULL;
}
- pInfo->cond = *pCond;
- pInfo->scanInfo = (SScanInfo){.numOfAsc = scanInfo[0], .numOfDesc = scanInfo[1]};
+ SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
- pInfo->interval = *pInterval;
- pInfo->sampleRatio = sampleRatio;
- pInfo->dataBlockLoadFlag = dataLoadFlag;
- pInfo->pResBlock = pResBlock;
- pInfo->pFilterNode = pCondition;
- pInfo->dataReader = pDataReader;
- pInfo->scanFlag = MAIN_SCAN;
- pInfo->pColMatchInfo = pColMatchInfo;
+ int32_t numOfCols = 0;
+ SArray* pColList = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
- pOperator->name = "TableScanOperator"; // for dubug purpose
+ int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode);
+ if (code != TSDB_CODE_SUCCESS) {
+ return NULL;
+ }
+
+ if (pTableScanNode->scan.pScanPseudoCols != NULL) {
+ pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
+ pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
+ }
+
+ pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
+
+ pInfo->readHandle = *readHandle;
+ pInfo->interval = extractIntervalInfo(pTableScanNode);
+ pInfo->sampleRatio = pTableScanNode->ratio;
+ pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
+ pInfo->pResBlock = createResDataBlock(pDescNode);
+ pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
+ pInfo->dataReader = pDataReader;
+ pInfo->scanFlag = MAIN_SCAN;
+ pInfo->pColMatchInfo = pColList;
+
+ pOperator->name = "TableScanOperator"; // for debug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
- pOperator->blocking = false;
- pOperator->status = OP_NOT_OPENED;
- pOperator->info = pInfo;
- pOperator->numOfExprs = numOfOutput;
- pOperator->pTaskInfo = pTaskInfo;
+ pOperator->blocking = false;
+ pOperator->status = OP_NOT_OPENED;
+ pOperator->info = pInfo;
+ pOperator->numOfExprs = numOfCols;
+ pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL);
@@ -1311,7 +1354,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
metaReaderClear(&mr);
colDataAppend(pDst, count, str, false);
-
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
// doSetTagValueToResultBuf(dst, data, type, bytes);
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 0f3b1bda20..b194c5e535 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -775,7 +775,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
- setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
@@ -910,7 +910,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
break;
}
- setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
doStateWindowAggImpl(pOperator, pInfo, pBlock);
}
@@ -1024,7 +1024,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
- setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
}
@@ -1286,7 +1286,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
- setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
}
@@ -1334,7 +1334,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs);
// the pDataBlock are always the same one, no need to call this again
- setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true);
+ setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
// hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0);
}
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 704f16f8b4..64bee0c096 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -1653,7 +1653,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
pResInfo->complete = true;
return 0;
} else {
- pInfo->pMemBucket = tMemBucketCreate(pCtx->inputBytes, pCtx->inputType, pInfo->minval, pInfo->maxval);
+ pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval);
}
}
@@ -1704,30 +1704,28 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
pInfo->numOfElems += 1;
}
}
+ } else {
+ // the second stage, calculate the true percentile value
+ int32_t start = pInput->startRowIndex;
+ for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
+ if (colDataIsNull_f(pCol->nullbitmap, i)) {
+ continue;
+ }
- return 0;
- }
-
- // the second stage, calculate the true percentile value
- int32_t start = pInput->startRowIndex;
- for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
- if (colDataIsNull_f(pCol->nullbitmap, i)) {
- continue;
+ char* data = colDataGetData(pCol, i);
+ notNullElems += 1;
+ tMemBucketPut(pInfo->pMemBucket, data, 1);
}
- char* data = colDataGetData(pCol, i);
-
- notNullElems += 1;
- tMemBucketPut(pInfo->pMemBucket, data, 1);
+ SET_VAL(pResInfo, notNullElems, 1);
}
- SET_VAL(pResInfo, notNullElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SVariant* pVal = &pCtx->param[1].param;
- double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
+ double v = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c
index 61ff6bb825..83e41b199e 100644
--- a/source/libs/function/src/texpr.c
+++ b/source/libs/function/src/texpr.c
@@ -64,21 +64,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
int32_t type = (*pExpr)->nodeType;
- if (type == TEXPR_BINARYEXPR_NODE) {
- doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
- doExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
-
- if (fp != NULL) {
- fp((*pExpr)->_node.info);
- }
- } else if (type == TEXPR_UNARYEXPR_NODE) {
- doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
- if (fp != NULL) {
- fp((*pExpr)->_node.info);
- }
-
- assert((*pExpr)->_node.pRight == NULL);
- } else if (type == TEXPR_VALUE_NODE) {
+ if (type == TEXPR_VALUE_NODE) {
taosVariantDestroy((*pExpr)->pVal);
taosMemoryFree((*pExpr)->pVal);
} else if (type == TEXPR_COL_NODE) {
@@ -90,9 +76,7 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
}
bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param) {
- tExprNode *pLeft = pExpr->_node.pLeft;
- tExprNode *pRight = pExpr->_node.pRight;
-
+#if 0
//non-leaf nodes, recursively traverse the expression tree in the post-root order
if (pLeft->nodeType == TEXPR_BINARYEXPR_NODE && pRight->nodeType == TEXPR_BINARYEXPR_NODE) {
if (pExpr->_node.optr == LOGIC_COND_TYPE_OR) { // or
@@ -114,6 +98,9 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
// handle the leaf node
param->setupInfoFn(pExpr, param->pExtInfo);
return param->nodeFilterFn(pItem, pExpr->_node.info);
+#endif
+
+ return 0;
}
// TODO: these three functions should be made global
@@ -141,59 +128,6 @@ static UNUSED_FUNC char* exception_strdup(const char* str) {
return p;
}
-static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
- int32_t anchor = CLEANUP_GET_ANCHOR();
- if (CLEANUP_EXCEED_LIMIT()) {
- THROW(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT);
- return NULL;
- }
-
- tExprNode* pExpr = exception_calloc(1, sizeof(tExprNode));
- CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, pExpr, NULL);
- pExpr->nodeType = tbufReadUint8(br);
-
- if (pExpr->nodeType == TEXPR_VALUE_NODE) {
- SVariant* pVal = exception_calloc(1, sizeof(SVariant));
- pExpr->pVal = pVal;
-
- pVal->nType = tbufReadUint32(br);
- if (pVal->nType == TSDB_DATA_TYPE_BINARY) {
- tbufReadToBuffer(br, &pVal->nLen, sizeof(pVal->nLen));
- pVal->pz = taosMemoryCalloc(1, pVal->nLen + 1);
- tbufReadToBuffer(br, pVal->pz, pVal->nLen);
- } else {
- pVal->i = tbufReadInt64(br);
- }
-
- } else if (pExpr->nodeType == TEXPR_COL_NODE) {
- SSchema* pSchema = exception_calloc(1, sizeof(SSchema));
- pExpr->pSchema = pSchema;
-
- pSchema->colId = tbufReadInt16(br);
- pSchema->bytes = tbufReadInt16(br);
- pSchema->type = tbufReadUint8(br);
- tbufReadToString(br, pSchema->name, TSDB_COL_NAME_LEN);
-
- } else if (pExpr->nodeType == TEXPR_BINARYEXPR_NODE) {
- pExpr->_node.optr = tbufReadUint8(br);
- pExpr->_node.pLeft = exprTreeFromBinaryImpl(br);
- pExpr->_node.pRight = exprTreeFromBinaryImpl(br);
- assert(pExpr->_node.pLeft != NULL && pExpr->_node.pRight != NULL);
- }
-
- CLEANUP_EXECUTE_TO(anchor, false);
- return pExpr;
-}
-
-tExprNode* exprTreeFromBinary(const void* data, size_t size) {
- if (size == 0) {
- return NULL;
- }
-
- SBufferReader br = tbufInitReader(data, size, false);
- return exprTreeFromBinaryImpl(&br);
-}
-
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
SBufferReader br = tbufInitReader(buf, len, false);
uint32_t type = tbufReadUint32(&br);
@@ -405,38 +339,3 @@ err_ret:
taosHashCleanup(pObj);
taosMemoryFreeClear(tmp);
}
-
-tExprNode* exprdup(tExprNode* pNode) {
- if (pNode == NULL) {
- return NULL;
- }
-
- tExprNode* pCloned = taosMemoryCalloc(1, sizeof(tExprNode));
- if (pNode->nodeType == TEXPR_BINARYEXPR_NODE) {
- tExprNode* pLeft = exprdup(pNode->_node.pLeft);
- tExprNode* pRight = exprdup(pNode->_node.pRight);
-
- pCloned->_node.pLeft = pLeft;
- pCloned->_node.pRight = pRight;
- pCloned->_node.optr = pNode->_node.optr;
- } else if (pNode->nodeType == TEXPR_VALUE_NODE) {
- pCloned->pVal = taosMemoryCalloc(1, sizeof(SVariant));
- taosVariantAssign(pCloned->pVal, pNode->pVal);
- } else if (pNode->nodeType == TEXPR_COL_NODE) {
- pCloned->pSchema = taosMemoryCalloc(1, sizeof(SSchema));
- *pCloned->pSchema = *pNode->pSchema;
- } else if (pNode->nodeType == TEXPR_FUNCTION_NODE) {
- strcpy(pCloned->_function.functionName, pNode->_function.functionName);
-
- int32_t num = pNode->_function.num;
- pCloned->_function.num = num;
- pCloned->_function.pChild = taosMemoryCalloc(num, POINTER_BYTES);
- for(int32_t i = 0; i < num; ++i) {
- pCloned->_function.pChild[i] = exprdup(pNode->_function.pChild[i]);
- }
- }
-
- pCloned->nodeType = pNode->nodeType;
- return pCloned;
-}
-
diff --git a/source/libs/scalar/CMakeLists.txt b/source/libs/scalar/CMakeLists.txt
index 02d530533c..87f4bb9c64 100644
--- a/source/libs/scalar/CMakeLists.txt
+++ b/source/libs/scalar/CMakeLists.txt
@@ -8,7 +8,7 @@ target_include_directories(
)
target_link_libraries(scalar
- PRIVATE os util common nodes function qcom
+ PRIVATE os util common nodes function qcom vnode
)
if(${BUILD_TEST})
diff --git a/source/libs/scalar/inc/sclInt.h b/source/libs/scalar/inc/sclInt.h
index 1e7d1a4cbf..9257d2c0d4 100644
--- a/source/libs/scalar/inc/sclInt.h
+++ b/source/libs/scalar/inc/sclInt.h
@@ -26,6 +26,7 @@ typedef struct SScalarCtx {
int32_t code;
SArray *pBlockList; /* element is SSDataBlock* */
SHashObj *pRes; /* element is SScalarParam */
+ void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
} SScalarCtx;
diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c
index d7cac69d26..9843f0ac91 100644
--- a/source/libs/scalar/src/scalar.c
+++ b/source/libs/scalar/src/scalar.c
@@ -265,6 +265,7 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
*rowNum = param->numOfRows;
}
+ param->param = ctx->param;
return TSDB_CODE_SUCCESS;
}
@@ -909,7 +910,7 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
}
int32_t code = 0;
- SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList};
+ SScalarCtx ctx = {.code = 0, .pBlockList = pBlockList, .param = pDst->param};
// TODO: OPT performance
ctx.pRes = taosHashInit(SCL_DEFAULT_OP_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c
index 0161323e37..d4a88622e2 100644
--- a/source/libs/scalar/src/sclfunc.c
+++ b/source/libs/scalar/src/sclfunc.c
@@ -1,10 +1,11 @@
#include "function.h"
#include "scalar.h"
-#include "tdatablock.h"
-#include "ttime.h"
#include "sclInt.h"
#include "sclvector.h"
+#include "tdatablock.h"
#include "tjson.h"
+#include "ttime.h"
+#include "vnode.h"
typedef float (*_float_fn)(float);
typedef double (*_double_fn)(double);
@@ -1512,6 +1513,21 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
ASSERT(inputNum == 1);
- colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
+
+ SMetaReader mr = {0};
+ metaReaderInit(&mr, pInput->param, 0);
+
+ uint64_t uid = *(uint64_t *)colDataGetData(pInput->columnData, 0);
+ metaGetTableEntryByUid(&mr, uid);
+
+ char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
+ STR_TO_VARSTR(str, mr.me.name);
+ metaReaderClear(&mr);
+
+ for(int32_t i = 0; i < pInput->numOfRows; ++i) {
+ colDataAppend(pOutput->columnData, pOutput->numOfRows + i, str, false);
+ }
+
+ pOutput->numOfRows += pInput->numOfRows;
return TSDB_CODE_SUCCESS;
}