fix(query): enable table name query for super table.
This commit is contained in:
parent
8954035a2c
commit
e4f0a0fc67
|
@ -91,6 +91,8 @@ int32_t winDurFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
|
||||||
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t qStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
int32_t qEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
|
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -87,9 +87,7 @@ typedef struct SResultInfo { // TODO refactor
|
||||||
typedef struct STableQueryInfo {
|
typedef struct STableQueryInfo {
|
||||||
TSKEY lastKey; // last check ts, todo remove it later
|
TSKEY lastKey; // last check ts, todo remove it later
|
||||||
SResultRowPosition pos; // current active time window
|
SResultRowPosition pos; // current active time window
|
||||||
// int32_t groupIndex; // group id in table list
|
|
||||||
// SVariant tag;
|
// SVariant tag;
|
||||||
// SResultRowInfo resInfo; // result info
|
|
||||||
} STableQueryInfo;
|
} STableQueryInfo;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -365,9 +363,10 @@ typedef struct STableScanInfo {
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
SColumnInfo *pCols;
|
SColumnInfo *pCols;
|
||||||
SSDataBlock *pRes;
|
SSDataBlock *pRes;
|
||||||
int32_t totalTables;
|
SArray *pColMatchInfo;
|
||||||
int32_t curPos;
|
int32_t curPos;
|
||||||
void *pReader;
|
SReadHandle readHandle;
|
||||||
|
STableGroupInfo *pTableGroups;
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
typedef struct SStreamBlockScanInfo {
|
typedef struct SStreamBlockScanInfo {
|
||||||
|
@ -704,7 +703,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTagScanOperatorInfo(void* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
|
@ -717,7 +716,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, bool createDummyCol);
|
||||||
|
|
||||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
|
||||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
|
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
|
||||||
|
|
|
@ -4739,7 +4739,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
SQueryTableDataCond cond = {0};
|
SQueryTableDataCond cond = {0};
|
||||||
|
|
||||||
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
int32_t code = initQueryTableDataCond(&cond, pTableScanNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -4783,6 +4782,25 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||||
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
|
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*) pPhyNode;
|
||||||
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
int32_t code =
|
||||||
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t num = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
|
||||||
|
|
||||||
|
int32_t numOfOutputCols = 0;
|
||||||
|
SArray* colList =
|
||||||
|
extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
||||||
|
|
||||||
|
SOperatorInfo* pOperator = createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableGroupInfo, pTaskInfo);
|
||||||
|
return pOperator;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -5166,9 +5184,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle*
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
#if 0
|
|
||||||
return tsdbQueryTables(pHandle->reader, &cond, pTableGroupInfo, queryId, taskId);
|
|
||||||
#endif
|
|
||||||
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
|
return tsdbQueryTables(pHandle->vnode, &cond, pTableGroupInfo, queryId, taskId);
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
|
|
@ -13,15 +13,16 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "ttime.h"
|
#include <libs/function/function.h>
|
||||||
#include "filter.h"
|
#include "filter.h"
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "querynodes.h"
|
#include "querynodes.h"
|
||||||
|
#include "systable.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "systable.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
@ -1159,16 +1160,17 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
#if 0
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
int32_t maxNumOfTables = (int32_t)pResultInfo->capacity;
|
||||||
|
|
||||||
STagScanInfo *pInfo = pOperator->info;
|
STagScanInfo *pInfo = pOperator->info;
|
||||||
SSDataBlock *pRes = pInfo->pRes;
|
SSDataBlock *pRes = pInfo->pRes;
|
||||||
*newgroup = false;
|
|
||||||
|
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||||
|
@ -1237,55 +1239,54 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
//qDebug("QInfo:0x%"PRIx64" create count(tbname) query, res:%d rows:1", GET_TASKID(pRuntimeEnv), count);
|
||||||
} else { // return only the tags|table name etc.
|
} else { // return only the tags|table name etc.
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[0]; // todo use the column list instead of exprinfo
|
#endif
|
||||||
|
|
||||||
count = 0;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
||||||
int32_t i = pInfo->curPos++;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
|
||||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
SArray* pa = taosArrayGetP(pInfo->pTableGroups->pGroupList, 0);
|
||||||
|
|
||||||
|
char str[512] = {0};
|
||||||
|
int32_t count = 0;
|
||||||
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
|
while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
|
||||||
|
STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
|
||||||
|
|
||||||
char *data = NULL, *dst = NULL;
|
|
||||||
int16_t type = 0, bytes = 0;
|
|
||||||
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
||||||
// not assign value in case of user defined constant output column
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j);
|
// refactor later
|
||||||
type = pExprInfo[j].base.resSchema.type;
|
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||||
bytes = pExprInfo[j].base.resSchema.bytes;
|
metaReaderInit(&mr, pInfo->readHandle.meta, 0);
|
||||||
|
metaGetTableEntryByUid(&mr, item->uid);
|
||||||
|
|
||||||
if (pExprInfo[j].base.pColumns->info.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
STR_TO_VARSTR(str, mr.me.name);
|
||||||
data = tsdbGetTableName(item->pTable);
|
metaReaderClear(&mr);
|
||||||
} else {
|
|
||||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
colDataAppend(pDst, count, str, false);
|
||||||
doSetTagValueToResultBuf(dst, data, type, bytes);
|
|
||||||
|
// data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
|
||||||
|
// dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
|
||||||
|
// doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->curPos >= pInfo->totalTables) {
|
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
// qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
|
||||||
}
|
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
setTaskStatus(pOperator->pRuntimeEnv, TASK_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_COMPLETED);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = count;
|
pRes->info.rows = count;
|
||||||
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
|
||||||
#endif
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -1293,14 +1294,18 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
|
||||||
|
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pReader = readHandle;
|
pInfo->pTableGroups = pTableGroupInfo;
|
||||||
|
pInfo->pColMatchInfo = pColMatchInfo;
|
||||||
|
pInfo->pRes = pResBlock;
|
||||||
|
pInfo->readHandle = *pReadHandle;
|
||||||
pInfo->curPos = 0;
|
pInfo->curPos = 0;
|
||||||
pOperator->name = "TagScanOperator";
|
pOperator->name = "TagScanOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
|
||||||
|
@ -1311,6 +1316,9 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int
|
||||||
pOperator->numOfExprs = numOfOutput;
|
pOperator->numOfExprs = numOfOutput;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
|
|
@ -917,7 +917,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateTbnameColumn,
|
.translateFunc = translateTbnameColumn,
|
||||||
.getEnvFunc = NULL,
|
.getEnvFunc = NULL,
|
||||||
.initFunc = NULL,
|
.initFunc = NULL,
|
||||||
.sprocessFunc = NULL,
|
.sprocessFunc = qTbnameFunction,
|
||||||
.finalizeFunc = NULL
|
.finalizeFunc = NULL
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -1504,3 +1504,9 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
|
colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t*) colDataGetData(pInput->columnData, 4));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
|
ASSERT(inputNum == 1);
|
||||||
|
colDataAppend(pOutput->columnData, pOutput->numOfRows, colDataGetData(pInput->columnData, 0), false);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue