Merge pull request #11510 from taosdata/feature/tag_filter_oper

enh: add tag filter operator
This commit is contained in:
Yihao Deng 2022-04-16 09:32:08 +08:00 committed by GitHub
commit a264bfc6d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 479 additions and 228 deletions

View File

@ -20,9 +20,9 @@ extern "C" {
#endif
#include "os.h"
#include "tsort.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
@ -35,8 +35,8 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "tlockfree.h"
#include "tpagedbuf.h"
#include "tmsg.h"
#include "tpagedbuf.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
@ -186,8 +186,10 @@ struct SOperatorInfo;
struct SAggSupporter;
struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length);
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char** result, int32_t* length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
@ -217,6 +219,7 @@ typedef struct SExecTaskInfo {
} SExecTaskInfo;
typedef struct STaskRuntimeEnv {
jmp_buf env;
STaskAttr* pQueryAttr;
uint32_t status; // query status
@ -342,6 +345,7 @@ typedef struct STableScanInfo {
int32_t numOfOutput;
int64_t elapsedTime;
int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
} STableScanInfo;
@ -542,6 +546,7 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
SArray* pSortInfo;
int32_t numOfSources;
@ -574,6 +579,10 @@ typedef struct SSortOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
SOptrBasicInfo binfo;
} STagFilterOperatorInfo;
typedef struct SJoinOperatorInfo {
SSDataBlock *pRes;
int32_t joinType;
@ -597,19 +606,24 @@ void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows,
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs,
uint64_t* total, SArray* pColList);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
@ -620,23 +634,34 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
@ -649,7 +674,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
#endif
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList);
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
@ -673,11 +699,15 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
#ifdef __cplusplus
}

View File

@ -0,0 +1,23 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "tglobal.h"
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode);
// construct tag filter operator later
int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl);

View File

@ -0,0 +1,198 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexoperator.h"
#include "executorimpl.h"
#include "nodes.h"
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SScalarParam */
} SIFCtx;
typedef struct SIFParam {
SArray * result;
SHashObj *pFilter;
} SIFParam;
// construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
}
static void sifFreeParam(SIFParam *param) {
if (param == NULL) return;
taosArrayDestroy(param->result);
}
int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) {
int32_t code = 0;
return code;
}
static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *output) {
qError("index-filter not support buildin function");
return TSDB_CODE_SUCCESS;
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIFParam *params = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) { return TSDB_CODE_SUCCESS; }
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
// impl later
SFunctionNode *node = (SFunctionNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output);
if (ctx->code != TSDB_CODE_SUCCESS) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
static EDealRes sifWalkOper(SNode *pNode, void *context) {
SOperatorNode *node = (SOperatorNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
EDealRes sifCalcWalker(SNode *node, void *context) {
if (QUERY_NODE_VALUE == nodeType(node) || QUERY_NODE_NODE_LIST == nodeType(node) ||
QUERY_NODE_COLUMN == nodeType(node)) {
return DEAL_RES_CONTINUE;
}
SIFCtx *ctx = (SIFCtx *)context;
if (QUERY_NODE_FUNCTION == nodeType(node)) {
return sifWalkFunction(node, ctx);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(node)) {
return sifWalkLogic(node, ctx);
}
if (QUERY_NODE_OPERATOR == nodeType(node)) {
return sifWalkOper(node, ctx);
}
qError("invalid node type for index filter calculating, type:%d", nodeType(node));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
void sifFreeRes(SHashObj *res) {
void *pIter = taosHashIterate(res, NULL);
while (pIter) {
SIFParam *p = pIter;
if (p) {
sifFreeParam(p);
}
pIter = taosHashIterate(res, pIter);
}
taosHashCleanup(res);
}
static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
if (pNode == NULL || pDst == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SIFCtx ctx = {.code = 0};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
qError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
if (ctx.code != TSDB_CODE_SUCCESS) {
return ctx.code;
}
if (pDst) {
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) {
qError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode));
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayAddAll(pDst->result, res->result);
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
}
return TSDB_CODE_SUCCESS;
}
int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS;
}
SFilterInfo *filter = NULL;
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode *)pFilterNode, &filter, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SIFParam param = {0};
code = sifCalculate((SNode *)pFilterNode, &param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayAddAll(result, param.result);
sifFreeParam(&param);
return code;
}
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
}
// impl later
return SFLT_ACCURATE_INDEX;
}