diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h
index 0c7db45c4b..555274599a 100644
--- a/include/libs/scalar/scalar.h
+++ b/include/libs/scalar/scalar.h
@@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
+int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index bc6139c304..eefa38d802 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -87,9 +87,7 @@ typedef struct SResultInfo { // TODO refactor
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
-// int32_t groupIndex; // group id in table list
// SVariant tag;
-// SResultRowInfo resInfo; // result info
} STableQueryInfo;
typedef enum {
@@ -363,11 +361,12 @@ typedef struct STableScanInfo {
} STableScanInfo;
typedef struct STagScanInfo {
- SColumnInfo *pCols;
- SSDataBlock *pRes;
- int32_t totalTables;
- int32_t curPos;
- void *pReader;
+ SColumnInfo *pCols;
+ SSDataBlock *pRes;
+ SArray *pColMatchInfo;
+ int32_t curPos;
+ SReadHandle readHandle;
+ STableGroupInfo *pTableGroups;
} STagScanInfo;
typedef struct SStreamBlockScanInfo {
@@ -704,7 +703,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
-SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
+SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
@@ -717,7 +716,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
-void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index a237eb0e7d..9aa251e1b6 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -4739,7 +4739,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
SQueryTableDataCond cond = {0};
-
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
@@ -4783,6 +4782,25 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
+ } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
+ STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
+ SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
+
+ int32_t code =
+ doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
+ if (code != TSDB_CODE_SUCCESS) {
+ return NULL;
+ }
+
+ int32_t num = 0;
+ SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
+
+ int32_t numOfOutputCols = 0;
+ SArray* colList =
+ extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
+
+ SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
+ return pOperator;
} else {
ASSERT(0);
}
@@ -5088,7 +5106,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SColMatchInfo c = {0};
c.output = true;
- c.colId = pColNode->colId;
+ c.colId = pColNode->colId;
c.targetSlotId = pNode->slotId;
taosArrayPush(pList, &c);
}
@@ -5166,9 +5184,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
-#if 0
- return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
-#endif
+
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
_error:
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 8c9fdfe4e6..b728daa3bb 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -13,15 +13,16 @@
* along with this program. If not, see .
*/
-#include "ttime.h"
+#include
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
+#include "systable.h"
#include "tglobal.h"
#include "tname.h"
-#include "systable.h"
+#include "ttime.h"
#include "tdatablock.h"
#include "tmsg.h"
@@ -1159,16 +1160,17 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
-#if 0
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
+ SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+
+#if 0
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
STagScanInfo *pInfo = pOperator->info;
SSDataBlock *pRes = pInfo->pRes;
- *newgroup = false;
int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
@@ -1237,55 +1239,54 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
} else { // return only the tags|table name etc.
- SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
+#endif
- count = 0;
- while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
- int32_t i = pInfo->curPos++;
+ STagScanInfo* pInfo = pOperator->info;
+ SExprInfo* pExprInfo = &pOperator->pExpr[0];
+ SSDataBlock* pRes = pInfo->pRes;
- STableQueryInfo* item = taosArrayGetP(pa, i);
+ SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
- char *data = NULL, *dst = NULL;
- int16_t type = 0, bytes = 0;
- for(int32_t j = 0; j < pOperator->numOfExprs; ++j) {
- // not assign value in case of user defined constant output column
- if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
- continue;
- }
+ char str[512] = {0};
+ int32_t count = 0;
+ SMetaReader mr = {0};
- SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
- type = pExprInfo[j].base.resSchema.type;
- bytes = pExprInfo[j].base.resSchema.bytes;
+ while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
+ STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
- if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
- data = tsdbGetTableName(item->pTable);
- } else {
- data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
- }
+ for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
+ SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
- dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
- doSetTagValueToResultBuf(dst, data, type, bytes);
+ // refactor later
+ if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
+ metaReaderInit(&mr, pInfo->readHandle.meta, 0);
+ metaGetTableEntryByUid(&mr, item->uid);
+
+ STR_TO_VARSTR(str, mr.me.name);
+ metaReaderClear(&mr);
+
+ colDataAppend(pDst, count, str, false);
+
+ // data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
+ // dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
+ // doSetTagValueToResultBuf(dst, data, type, bytes);
}
count += 1;
}
- if (pInfo->curPos >= pInfo->totalTables) {
+ if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
pOperator->status = OP_EXEC_DONE;
}
-
- //qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
}
+ // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
if (pOperator->status == OP_EXEC_DONE) {
- setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
+ setTaskStatus(pTaskInfo, TASK_COMPLETED);
}
pRes->info.rows = count;
- return (pRes->info.rows == 0)? NULL:pInfo->pRes;
-
-#endif
- return TSDB_CODE_SUCCESS;
+ return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
@@ -1293,14 +1294,18 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
-SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
+SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
+ SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
- pInfo->pReader = readHandle;
+ pInfo->pTableGroups = pTableGroupInfo;
+ pInfo->pColMatchInfo = pColMatchInfo;
+ pInfo->pRes = pResBlock;
+ pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
@@ -1308,9 +1313,12 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
- pOperator->numOfExprs = numOfOutput;
+ pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
+ initResultSizeInfo(pOperator, 4096);
+ blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
+
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 38922833f9..eac11558cb 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -917,7 +917,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateTbnameColumn,
.getEnvFunc = NULL,
.initFunc = NULL,
- .sprocessFunc = NULL,
+ .sprocessFunc = qTbnameFunction,
.finalizeFunc = NULL
},
{
diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c
index a507b41342..4df6148a6e 100644
--- a/source/libs/scalar/src/sclfunc.c
+++ b/source/libs/scalar/src/sclfunc.c
@@ -1504,3 +1504,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
return TSDB_CODE_SUCCESS;
}
+
+int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
+ ASSERT(inputNum == 1);
+ colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
+ return TSDB_CODE_SUCCESS;
+}