refactor: do some internal refactor.
This commit is contained in:
parent
2cf8e30241
commit
f2a27f5842
|
@ -158,10 +158,10 @@ typedef struct tExprNode {
|
||||||
int32_t nodeType;
|
int32_t nodeType;
|
||||||
union {
|
union {
|
||||||
struct {// function node
|
struct {// function node
|
||||||
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
|
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
|
||||||
int32_t functionId;
|
int32_t functionId;
|
||||||
int32_t num;
|
int32_t num;
|
||||||
struct SFunctionNode *pFunctNode;
|
struct SFunctionNode *pFunctNode;
|
||||||
} _function;
|
} _function;
|
||||||
|
|
||||||
struct {
|
struct {
|
||||||
|
|
|
@ -49,8 +49,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
|
||||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
|
||||||
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
|
||||||
|
|
||||||
//#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
|
||||||
|
|
||||||
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -64,11 +62,6 @@ enum {
|
||||||
TASK_COMPLETED = 0x2u,
|
TASK_COMPLETED = 0x2u,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SResultRowCell {
|
|
||||||
uint64_t groupId;
|
|
||||||
SResultRowPosition pos;
|
|
||||||
} SResultRowCell;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If the number of generated results is greater than this value,
|
* If the number of generated results is greater than this value,
|
||||||
* query query will be halt and return results to client immediate.
|
* query query will be halt and return results to client immediate.
|
||||||
|
@ -122,35 +115,6 @@ typedef struct SOperatorCostInfo {
|
||||||
double totalCost;
|
double totalCost;
|
||||||
} SOperatorCostInfo;
|
} SOperatorCostInfo;
|
||||||
|
|
||||||
// The basic query information extracted from the SQueryInfo tree to support the
|
|
||||||
// execution of query in a data node.
|
|
||||||
typedef struct STaskAttr {
|
|
||||||
SLimit limit;
|
|
||||||
SLimit slimit;
|
|
||||||
bool stableQuery; // super table query or not
|
|
||||||
bool tsCompQuery; // is tscomp query
|
|
||||||
bool diffQuery; // is diff query
|
|
||||||
bool pointInterpQuery; // point interpolation query
|
|
||||||
int32_t havingNum; // having expr number
|
|
||||||
int16_t numOfCols;
|
|
||||||
int16_t numOfTags;
|
|
||||||
STimeWindow window;
|
|
||||||
SInterval interval;
|
|
||||||
int16_t precision;
|
|
||||||
int16_t numOfOutput;
|
|
||||||
int16_t fillType;
|
|
||||||
int32_t resultRowSize;
|
|
||||||
int32_t tagLen; // tag value length of current query
|
|
||||||
|
|
||||||
SExprInfo* pExpr1;
|
|
||||||
SColumnInfo* tagColList;
|
|
||||||
int32_t numOfFilterCols;
|
|
||||||
int64_t* fillVal;
|
|
||||||
void* tsdb;
|
|
||||||
// STableListInfo tableGroupInfo; // table list
|
|
||||||
int32_t vgId;
|
|
||||||
} STaskAttr;
|
|
||||||
|
|
||||||
struct SOperatorInfo;
|
struct SOperatorInfo;
|
||||||
|
|
||||||
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
|
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
|
||||||
|
@ -189,21 +153,6 @@ typedef struct SExecTaskInfo {
|
||||||
struct SOperatorInfo* pRoot;
|
struct SOperatorInfo* pRoot;
|
||||||
} SExecTaskInfo;
|
} SExecTaskInfo;
|
||||||
|
|
||||||
typedef struct STaskRuntimeEnv {
|
|
||||||
STaskAttr* pQueryAttr;
|
|
||||||
uint32_t status; // query status
|
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
|
||||||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
|
||||||
char* keyBuf; // window key buffer
|
|
||||||
STSCursor cur;
|
|
||||||
char* tagVal; // tag value of current data block
|
|
||||||
struct SOperatorInfo* proot;
|
|
||||||
int64_t currentOffset; // dynamic offset value
|
|
||||||
STableQueryInfo* current;
|
|
||||||
SResultInfo resultInfo;
|
|
||||||
} STaskRuntimeEnv;
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
OP_NOT_OPENED = 0x0,
|
OP_NOT_OPENED = 0x0,
|
||||||
OP_OPENED = 0x1,
|
OP_OPENED = 0x1,
|
||||||
|
@ -222,14 +171,20 @@ typedef struct SOperatorFpSet {
|
||||||
__optr_explain_fn_t getExplainFn;
|
__optr_explain_fn_t getExplainFn;
|
||||||
} SOperatorFpSet;
|
} SOperatorFpSet;
|
||||||
|
|
||||||
|
typedef struct SExprSupp {
|
||||||
|
SExprInfo* pExprInfo;
|
||||||
|
int32_t numOfExprs; // the number of scalar expression in group operator
|
||||||
|
SqlFunctionCtx* pCtx;
|
||||||
|
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
||||||
|
} SExprSupp;
|
||||||
|
|
||||||
typedef struct SOperatorInfo {
|
typedef struct SOperatorInfo {
|
||||||
uint8_t operatorType;
|
uint8_t operatorType;
|
||||||
bool blocking; // block operator or not
|
bool blocking; // block operator or not
|
||||||
uint8_t status; // denote if current operator is completed
|
uint8_t status; // denote if current operator is completed
|
||||||
int32_t numOfExprs; // number of columns of the current operator results
|
|
||||||
char* name; // name, for debug purpose
|
char* name; // name, for debug purpose
|
||||||
void* info; // extension attribution
|
void* info; // extension attribution
|
||||||
SExprInfo* pExpr;
|
SExprSupp exprSupp;
|
||||||
SExecTaskInfo* pTaskInfo;
|
SExecTaskInfo* pTaskInfo;
|
||||||
SOperatorCostInfo cost;
|
SOperatorCostInfo cost;
|
||||||
SResultInfo resultInfo;
|
SResultInfo resultInfo;
|
||||||
|
@ -244,6 +199,9 @@ typedef enum {
|
||||||
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
||||||
} EX_SOURCE_STATUS;
|
} EX_SOURCE_STATUS;
|
||||||
|
|
||||||
|
#define COL_MATCH_FROM_COL_ID 0x1
|
||||||
|
#define COL_MATCH_FROM_SLOT_ID 0x2
|
||||||
|
|
||||||
typedef struct SSourceDataInfo {
|
typedef struct SSourceDataInfo {
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SRetrieveTableRsp* pRsp;
|
SRetrieveTableRsp* pRsp;
|
||||||
|
@ -271,9 +229,6 @@ typedef struct SExchangeInfo {
|
||||||
uint64_t self;
|
uint64_t self;
|
||||||
} SExchangeInfo;
|
} SExchangeInfo;
|
||||||
|
|
||||||
#define COL_MATCH_FROM_COL_ID 0x1
|
|
||||||
#define COL_MATCH_FROM_SLOT_ID 0x2
|
|
||||||
|
|
||||||
typedef struct SColMatchInfo {
|
typedef struct SColMatchInfo {
|
||||||
int32_t srcSlotId; // source slot id
|
int32_t srcSlotId; // source slot id
|
||||||
int32_t colId;
|
int32_t colId;
|
||||||
|
@ -283,8 +238,8 @@ typedef struct SColMatchInfo {
|
||||||
} SColMatchInfo;
|
} SColMatchInfo;
|
||||||
|
|
||||||
typedef struct SScanInfo {
|
typedef struct SScanInfo {
|
||||||
int32_t numOfAsc;
|
int32_t numOfAsc;
|
||||||
int32_t numOfDesc;
|
int32_t numOfDesc;
|
||||||
} SScanInfo;
|
} SScanInfo;
|
||||||
|
|
||||||
typedef struct SSampleExecInfo {
|
typedef struct SSampleExecInfo {
|
||||||
|
@ -292,13 +247,6 @@ typedef struct SSampleExecInfo {
|
||||||
uint32_t seed; // random seed value
|
uint32_t seed; // random seed value
|
||||||
} SSampleExecInfo;
|
} SSampleExecInfo;
|
||||||
|
|
||||||
typedef struct SExecSupp {
|
|
||||||
SExprInfo* pExprInfo;
|
|
||||||
int32_t numOfExprs; // the number of scalar expression in group operator
|
|
||||||
SqlFunctionCtx* pCtx;
|
|
||||||
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
|
||||||
} SExecSupp;
|
|
||||||
|
|
||||||
typedef struct STableScanInfo {
|
typedef struct STableScanInfo {
|
||||||
void* dataReader;
|
void* dataReader;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
|
@ -317,7 +265,7 @@ typedef struct STableScanInfo {
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
int32_t numOfOutput;
|
int32_t numOfOutput;
|
||||||
|
|
||||||
SExecSupp pseudoSup;
|
SExprSupp pseudoSup;
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
|
@ -426,12 +374,9 @@ typedef struct SBlockDistInfo {
|
||||||
// todo remove this
|
// todo remove this
|
||||||
typedef struct SOptrBasicInfo {
|
typedef struct SOptrBasicInfo {
|
||||||
SResultRowInfo resultRowInfo;
|
SResultRowInfo resultRowInfo;
|
||||||
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
|
||||||
SqlFunctionCtx* pCtx;
|
|
||||||
SSDataBlock* pRes;
|
SSDataBlock* pRes;
|
||||||
} SOptrBasicInfo;
|
} SOptrBasicInfo;
|
||||||
|
|
||||||
// TODO move the resultrowsiz together with SOptrBasicInfo:rowEntryInfoOffset
|
|
||||||
typedef struct SAggSupporter {
|
typedef struct SAggSupporter {
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
|
@ -516,7 +461,7 @@ typedef struct SIndefOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
SAggSupporter aggSup;
|
SAggSupporter aggSup;
|
||||||
SArray* pPseudoColInfo;
|
SArray* pPseudoColInfo;
|
||||||
SExecSupp scalarSup;
|
SExprSupp scalarSup;
|
||||||
} SIndefOperatorInfo;
|
} SIndefOperatorInfo;
|
||||||
|
|
||||||
typedef struct SFillOperatorInfo {
|
typedef struct SFillOperatorInfo {
|
||||||
|
@ -540,7 +485,7 @@ typedef struct SGroupbyOperatorInfo {
|
||||||
char* keyBuf; // group by keys for hash
|
char* keyBuf; // group by keys for hash
|
||||||
int32_t groupKeyLen; // total group by column width
|
int32_t groupKeyLen; // total group by column width
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SExecSupp scalarSup;
|
SExprSupp scalarSup;
|
||||||
} SGroupbyOperatorInfo;
|
} SGroupbyOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
@ -564,7 +509,7 @@ typedef struct SPartitionOperatorInfo {
|
||||||
void* pGroupIter; // group iterator
|
void* pGroupIter; // group iterator
|
||||||
int32_t pageIndex; // page index of current group
|
int32_t pageIndex; // page index of current group
|
||||||
SSDataBlock* pUpdateRes;
|
SSDataBlock* pUpdateRes;
|
||||||
SExecSupp scalarSup;
|
SExprSupp scalarSup;
|
||||||
} SPartitionOperatorInfo;
|
} SPartitionOperatorInfo;
|
||||||
|
|
||||||
typedef struct SWindowRowsSup {
|
typedef struct SWindowRowsSup {
|
||||||
|
@ -717,7 +662,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
void operatorDummyCloseFn(void* param, int32_t numOfCols);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
|
||||||
|
@ -741,7 +686,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
||||||
|
|
||||||
void cleanupExecSupp(SExecSupp* pSupp);
|
void cleanupExecSupp(SExprSupp* pSupp);
|
||||||
|
|
||||||
SSDataBlock* loadNextDataBlock(void* param);
|
SSDataBlock* loadNextDataBlock(void* param);
|
||||||
|
|
||||||
|
|
|
@ -143,9 +143,8 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
|
||||||
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId);
|
||||||
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
|
|
||||||
SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
// setup the output buffer for each operator
|
// setup the output buffer for each operator
|
||||||
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||||
|
@ -161,6 +160,7 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
|
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||||
bool existed = false;
|
bool existed = false;
|
||||||
|
@ -197,6 +197,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
|
||||||
|
|
||||||
return p1 != NULL;
|
return p1 != NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
|
||||||
SFilePage* pData = NULL;
|
SFilePage* pData = NULL;
|
||||||
|
@ -305,36 +306,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
return pResult;
|
return pResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the correct time window according to the handled timestamp
|
|
||||||
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t ts, STaskAttr* pQueryAttr) {
|
|
||||||
STimeWindow w = {0};
|
|
||||||
#if 0
|
|
||||||
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
|
|
||||||
// getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
|
||||||
|
|
||||||
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
|
||||||
w.ekey =
|
|
||||||
taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) -
|
|
||||||
1;
|
|
||||||
} else {
|
|
||||||
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
w = pRow->win;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* query border check, skey should not be bounded by the query time range, since the value skey will
|
|
||||||
* be used as the time window index value. So we only change ekey of time window accordingly.
|
|
||||||
*/
|
|
||||||
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
|
||||||
w.ekey = pQueryAttr->window.ekey;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
||||||
// a new buffer page for each table. Needs to opt this design
|
// a new buffer page for each table. Needs to opt this design
|
||||||
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
|
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
|
||||||
if (pWindowRes->pageId != -1) {
|
if (pWindowRes->pageId != -1) {
|
||||||
|
@ -382,13 +353,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, STimeWindow* win,
|
|
||||||
bool masterscan, SResultRow** pResult, int64_t groupId, SqlFunctionCtx* pCtx,
|
|
||||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
|
|
||||||
assert(win->skey <= win->ekey);
|
|
||||||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// query_range_start, query_range_end, window_duration, window_start, window_end
|
// query_range_start, query_range_end, window_duration, window_start, window_end
|
||||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
||||||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
@ -457,33 +421,15 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) {
|
|
||||||
TSKEY ekey = -1;
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
|
||||||
ekey = pWindow->ekey;
|
|
||||||
if (ekey > pQueryAttr->window.ekey) {
|
|
||||||
ekey = pQueryAttr->window.ekey;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ekey = pWindow->skey;
|
|
||||||
if (ekey < pQueryAttr->window.ekey) {
|
|
||||||
ekey = pQueryAttr->window.ekey;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ekey;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
|
||||||
int32_t scanFlag, bool createDummyCol);
|
int32_t scanFlag, bool createDummyCol);
|
||||||
|
|
||||||
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
|
||||||
int32_t order) {
|
int32_t order) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||||
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
|
setBlockStatisInfo(&pCtx[i], &pOperator->exprSupp.pExprInfo[i], pBlock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -542,7 +488,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
int32_t scanFlag, bool createDummyCol) {
|
int32_t scanFlag, bool createDummyCol) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
pCtx[i].order = order;
|
pCtx[i].order = order;
|
||||||
pCtx[i].input.numOfRows = pBlock->info.rows;
|
pCtx[i].input.numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
|
@ -553,7 +499,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
pInput->uid = pBlock->info.uid;
|
pInput->uid = pBlock->info.uid;
|
||||||
pInput->colDataAggIsSet = false;
|
pInput->colDataAggIsSet = false;
|
||||||
|
|
||||||
SExprInfo* pOneExpr = &pOperator->pExpr[i];
|
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
|
||||||
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
|
||||||
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
|
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
|
||||||
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
|
||||||
|
@ -589,7 +535,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
|
||||||
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
|
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
|
||||||
if (functionNeedToExecute(&pCtx[k])) {
|
if (functionNeedToExecute(&pCtx[k])) {
|
||||||
// todo add a dummy funtion to avoid process check
|
// todo add a dummy funtion to avoid process check
|
||||||
if (pCtx[k].fpSet.process == NULL) {
|
if (pCtx[k].fpSet.process == NULL) {
|
||||||
|
@ -870,32 +816,6 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
|
||||||
|
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
|
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
|
||||||
|
|
||||||
static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
|
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
|
||||||
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
|
|
||||||
// if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
|
||||||
if (order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pQueryAttr->interval.interval > 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pQueryAttr->numOfFilterCols > 0 || pQueryAttr->havingNum > 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// todo refactor : return window
|
// todo refactor : return window
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
|
||||||
|
@ -911,7 +831,9 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t ke
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
|
|
||||||
bool hasFirstLastFunc = false;
|
bool hasFirstLastFunc = false;
|
||||||
bool hasOtherFunc = false;
|
bool hasOtherFunc = false;
|
||||||
|
|
||||||
|
@ -921,7 +843,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
|
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
|
||||||
#if 0
|
|
||||||
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
|
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
|
||||||
functionId == FUNCTION_TAG_DUMMY) {
|
functionId == FUNCTION_TAG_DUMMY) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -932,7 +854,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
} else {
|
} else {
|
||||||
hasOtherFunc = true;
|
hasOtherFunc = true;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
|
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
|
||||||
|
@ -946,6 +868,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
|
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
|
||||||
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||||
//
|
//
|
||||||
|
@ -1057,7 +981,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
//
|
//
|
||||||
// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
|
// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
|
||||||
// }
|
// }
|
||||||
|
#if 0
|
||||||
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
|
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
|
@ -1106,55 +1030,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t start = 0;
|
|
||||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
|
||||||
if (p[j] == 1) {
|
|
||||||
len++;
|
|
||||||
} else {
|
|
||||||
if (len > 0) {
|
|
||||||
int32_t cstart = j - len;
|
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
|
|
||||||
int16_t bytes = pColumnInfoData->info.bytes;
|
|
||||||
memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes,
|
|
||||||
len * bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
start += len;
|
|
||||||
len = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (len > 0) {
|
|
||||||
int32_t cstart = numOfRows - len;
|
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
|
|
||||||
int16_t bytes = pColumnInfoData->info.bytes;
|
|
||||||
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
start += len;
|
|
||||||
len = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlock->info.rows = start;
|
|
||||||
pBlock->pBlockAgg = NULL; // clean the block statistics info
|
|
||||||
|
|
||||||
if (start > 0) {
|
|
||||||
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
|
||||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
|
|
||||||
pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
|
||||||
pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData;
|
|
||||||
pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
||||||
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||||
|
@ -1356,10 +1232,10 @@ void initResultRow(SResultRow* pResultRow) {
|
||||||
* offset[0] offset[1] offset[2]
|
* offset[0] offset[1] offset[2]
|
||||||
*/
|
*/
|
||||||
// TODO refactor: some function move away
|
// TODO refactor: some function move away
|
||||||
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs,
|
void setFunctionResultOutput(SOperatorInfo *pOperator, SOptrBasicInfo *pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SqlFunctionCtx* pCtx = pInfo->pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
int32_t* rowEntryInfoOffset = pInfo->rowEntryInfoOffset;
|
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||||
|
|
||||||
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
||||||
initResultRowInfo(pResultRowInfo);
|
initResultRowInfo(pResultRowInfo);
|
||||||
|
@ -1506,14 +1382,12 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
|
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId) {
|
||||||
SExecTaskInfo* pTaskInfo) {
|
|
||||||
// for simple group by query without interval, all the tables belong to one group result.
|
// for simple group by query without interval, all the tables belong to one group result.
|
||||||
int64_t uid = 0;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
|
||||||
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
int32_t* rowEntryInfoOffset = pAggInfo->binfo.rowEntryInfoOffset;
|
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||||
|
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
|
||||||
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
||||||
|
@ -1534,12 +1408,12 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) {
|
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
|
||||||
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
doSetTableGroupOutputBuf(pAggInfo, numOfOutput, groupId, pTaskInfo);
|
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
|
||||||
|
|
||||||
// record the current active group id
|
// record the current active group id
|
||||||
pAggInfo->groupId = groupId;
|
pAggInfo->groupId = groupId;
|
||||||
|
@ -1691,13 +1565,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf) {
|
SDiskbasedBuf* pBuf) {
|
||||||
SExprInfo* pExprInfo = pOperator->pExpr;
|
SExprInfo* pExprInfo = pOperator->exprSupp.pExprInfo;
|
||||||
int32_t numOfExprs = pOperator->numOfExprs;
|
int32_t numOfExprs = pOperator->exprSupp.numOfExprs;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
int32_t* rowCellOffset = pbInfo->rowEntryInfoOffset;
|
int32_t* rowCellOffset = pOperator->exprSupp.rowEntryInfoOffset;
|
||||||
SSDataBlock* pBlock = pbInfo->pRes;
|
SSDataBlock* pBlock = pbInfo->pRes;
|
||||||
SqlFunctionCtx* pCtx = pbInfo->pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
if (!hasDataInGroupInfo(pGroupResInfo)) {
|
if (!hasDataInGroupInfo(pGroupResInfo)) {
|
||||||
|
@ -2545,7 +2419,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
|
pOperator->exprSupp.numOfExprs = pInfo->pResult->info.numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||||
|
@ -2666,7 +2540,7 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock
|
||||||
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
|
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
|
||||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
// pCtx[i].size = 1;
|
// pCtx[i].size = 1;
|
||||||
}
|
}
|
||||||
|
@ -2681,8 +2555,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
|
||||||
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
|
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
|
||||||
} else {
|
} else {
|
||||||
doFinalizeResultImpl(pCtx, numOfExpr);
|
doFinalizeResultImpl(pCtx, numOfExpr);
|
||||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
|
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
|
||||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
|
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
|
||||||
|
|
||||||
// TODO check for available buffer;
|
// TODO check for available buffer;
|
||||||
|
|
||||||
|
@ -2729,16 +2603,16 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
|
||||||
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
|
||||||
// pOperator->pRuntimeEnv, true);
|
// pOperator->pRuntimeEnv, true);
|
||||||
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
|
doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
|
||||||
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
|
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
|
||||||
}
|
}
|
||||||
|
|
||||||
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs);
|
doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
|
||||||
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
|
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
|
||||||
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
|
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
|
||||||
|
|
||||||
// TODO check for available buffer;
|
// TODO check for available buffer;
|
||||||
|
|
||||||
|
@ -2879,20 +2753,20 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowEntryInfoOffset);
|
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, keyBufSize, pTaskInfo->id.str);
|
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
|
||||||
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
|
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -2911,8 +2785,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = num;
|
pOperator->exprSupp.numOfExprs = num;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -2967,6 +2841,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SAggOperatorInfo* pAggInfo = pOperator->info;
|
SAggOperatorInfo* pAggInfo = pOperator->info;
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
@ -2996,9 +2871,9 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
|
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
|
||||||
code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
code = doAggregateImpl(pOperator, 0, pSup->pCtx);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -3257,6 +3132,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
|
||||||
|
@ -3274,10 +3150,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||||
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs);
|
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
|
||||||
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
|
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
|
||||||
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
|
copyTsColoum(pRes, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
|
||||||
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs);
|
resetResultRowEntryResult(pInfo->pCtx, pOperator->exprSupp.numOfExprs);
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3309,10 +3185,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
doFilter(pProjectInfo->pFilterNode, pBlock);
|
doFilter(pProjectInfo->pFilterNode, pBlock);
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
|
||||||
pProjectInfo->pPseudoColInfo);
|
pProjectInfo->pPseudoColInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
|
@ -3463,7 +3339,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->fpSet.closeFn != NULL) {
|
if (pOperator->fpSet.closeFn != NULL) {
|
||||||
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs);
|
pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->pDownstream != NULL) {
|
if (pOperator->pDownstream != NULL) {
|
||||||
|
@ -3475,11 +3351,11 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
pOperator->numOfDownstream = 0;
|
pOperator->numOfDownstream = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->pExpr != NULL) {
|
if (pOperator->exprSupp.pExprInfo != NULL) {
|
||||||
destroyExprInfo(pOperator->pExpr, pOperator->numOfExprs);
|
destroyExprInfo(pOperator->exprSupp.pExprInfo, pOperator->exprSupp.numOfExprs);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pOperator->pExpr);
|
taosMemoryFreeClear(pOperator->exprSupp.pExprInfo);
|
||||||
taosMemoryFreeClear(pOperator->info);
|
taosMemoryFreeClear(pOperator->info);
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -3529,15 +3405,15 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
|
||||||
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowEntryInfoOffset);
|
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pSup->rowEntryInfoOffset);
|
||||||
pBasicInfo->pRes = pResultBlock;
|
pBasicInfo->pRes = pResultBlock;
|
||||||
|
|
||||||
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
|
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf;
|
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3566,7 +3442,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, numOfRows);
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
int32_t code =
|
int32_t code =
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -3585,8 +3461,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||||
|
@ -3627,10 +3503,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
|
|
||||||
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
|
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
|
||||||
assert(pInfo != NULL);
|
assert(pInfo != NULL);
|
||||||
|
|
||||||
destroySqlFunctionCtx(pInfo->pCtx, numOfOutput);
|
|
||||||
taosMemoryFreeClear(pInfo->rowEntryInfoOffset);
|
|
||||||
|
|
||||||
cleanupResultRowInfo(&pInfo->resultRowInfo);
|
cleanupResultRowInfo(&pInfo->resultRowInfo);
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
}
|
}
|
||||||
|
@ -3662,7 +3534,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosArrayDestroy(pInfo->pPseudoColInfo);
|
taosArrayDestroy(pInfo->pPseudoColInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupExecSupp(SExecSupp* pSupp) {
|
void cleanupExecSupp(SExprSupp* pSupp) {
|
||||||
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
|
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
|
||||||
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
|
||||||
|
|
||||||
|
@ -3737,17 +3609,17 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
}
|
}
|
||||||
initResultSizeInfo(pOperator, numOfRows);
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
|
||||||
|
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
||||||
pOperator->name = "ProjectOperator";
|
pOperator->name = "ProjectOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||||
|
@ -3768,6 +3640,7 @@ _error:
|
||||||
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
SIndefOperatorInfo* pIndefInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
blockDataCleanup(pRes);
|
blockDataCleanup(pRes);
|
||||||
|
@ -3802,7 +3675,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated before apply the group aggregation.
|
||||||
SExecSupp* pScalarSup = &pIndefInfo->scalarSup;
|
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
|
||||||
if (pScalarSup->pExprInfo != NULL) {
|
if (pScalarSup->pExprInfo != NULL) {
|
||||||
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
|
||||||
pIndefInfo->pPseudoColInfo);
|
pIndefInfo->pPseudoColInfo);
|
||||||
|
@ -3811,10 +3684,10 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||||
|
|
||||||
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
|
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pOperator->exprSupp.numOfExprs,
|
||||||
pIndefInfo->pPseudoColInfo);
|
pIndefInfo->pPseudoColInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
|
@ -3839,6 +3712,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
|
||||||
|
|
||||||
int32_t numOfExpr = 0;
|
int32_t numOfExpr = 0;
|
||||||
|
@ -3846,7 +3721,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
|
|
||||||
int32_t numOfScalarExpr = 0;
|
int32_t numOfScalarExpr = 0;
|
||||||
if (pPhyNode->pExprs != NULL) {
|
if (pPhyNode->pExprs != NULL) {
|
||||||
SExecSupp* pSup = &pInfo->scalarSup;
|
SExprSupp* pSup = &pInfo->scalarSup;
|
||||||
pSup->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup->numOfExprs);
|
pSup->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup->numOfExprs);
|
||||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, numOfScalarExpr, &pSup->rowEntryInfoOffset);
|
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, numOfScalarExpr, &pSup->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
@ -3864,19 +3739,19 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
}
|
}
|
||||||
initResultSizeInfo(pOperator, numOfRows);
|
initResultSizeInfo(pOperator, numOfRows);
|
||||||
|
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo);
|
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||||
|
|
||||||
pOperator->name = "IndefinitOperator";
|
pOperator->name = "IndefinitOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfExpr;
|
pOperator->exprSupp.numOfExprs = numOfExpr;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||||
|
@ -3973,8 +3848,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = num;
|
pOperator->exprSupp.numOfExprs = num;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
|
|
@ -28,8 +28,8 @@
|
||||||
|
|
||||||
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
||||||
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
|
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
|
||||||
static int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
|
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
||||||
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
|
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
|
||||||
|
|
||||||
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
|
||||||
|
@ -217,7 +217,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
|
||||||
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
|
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
|
||||||
|
@ -251,16 +251,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||||
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, 0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = j - num;
|
int32_t rowIndex = j - num;
|
||||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
|
||||||
|
|
||||||
// assign the group keys or user input constant values if required
|
// assign the group keys or user input constant values if required
|
||||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
|
||||||
num = 1;
|
num = 1;
|
||||||
}
|
}
|
||||||
|
@ -268,15 +268,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
if (num > 0) {
|
if (num > 0) {
|
||||||
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
|
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
|
||||||
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
|
0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rowIndex = pBlock->info.rows - num;
|
int32_t rowIndex = pBlock->info.rows - num;
|
||||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
|
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
|
||||||
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
|
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,7 +320,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
|
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pBlock, order, scanFlag, true);
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->scalarSup.pExprInfo != NULL) {
|
if (pInfo->scalarSup.pExprInfo != NULL) {
|
||||||
|
@ -397,15 +397,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
|
||||||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
pOperator->name = "GroupbyAggOperator";
|
pOperator->name = "GroupbyAggOperator";
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
// pOperator->operatorType = OP_Groupby;
|
// pOperator->operatorType = OP_Groupby;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -442,9 +442,9 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
// group id
|
// group id
|
||||||
|
|
||||||
size_t numOfCols = pOperator->numOfExprs;
|
size_t numOfCols = pOperator->exprSupp.numOfExprs;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SExprInfo* pExpr = &pOperator->pExpr[i];
|
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
|
||||||
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
@ -729,8 +729,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -747,16 +747,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
|
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
|
||||||
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
|
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
|
||||||
SAggSupporter* pAggSup) {
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
|
||||||
SqlFunctionCtx* pCtx = binfo->pCtx;
|
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow =
|
||||||
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
|
@ -48,8 +48,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
@ -132,10 +132,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
||||||
// only the timestamp match support for ordinary table
|
// only the timestamp match support for ordinary table
|
||||||
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
||||||
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[i];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
|
||||||
|
|
||||||
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
|
||||||
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
|
||||||
|
|
|
@ -262,7 +262,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
SExecSupp* pSup = &pTableScanInfo->pseudoSup;
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
|
addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,7 +538,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
SExecSupp* pSup = &pInfo->pseudoSup;
|
SExprSupp* pSup = &pInfo->pseudoSup;
|
||||||
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
|
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
|
||||||
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
|
||||||
}
|
}
|
||||||
|
@ -564,7 +564,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||||
|
@ -1118,7 +1118,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
|
pOperator->exprSupp.numOfExprs = pInfo->pRes->info.numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
|
@ -1513,7 +1513,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
|
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
|
||||||
pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
|
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
|
||||||
|
|
||||||
// todo log the filter info
|
// todo log the filter info
|
||||||
doFilterResult(pInfo);
|
doFilterResult(pInfo);
|
||||||
|
@ -1628,7 +1628,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = pResBlock->info.numOfCols;
|
pOperator->exprSupp.numOfExprs = pResBlock->info.numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
|
@ -1659,11 +1659,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
int32_t count = 0;
|
int32_t count = 0;
|
||||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||||
|
|
||||||
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
|
int32_t functionId = getExprFunctionId(&pOperator->exprSupp.pExprInfo[0]);
|
||||||
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
|
||||||
assert(pQueryAttr->numOfOutput == 1);
|
assert(pQueryAttr->numOfOutput == 1);
|
||||||
|
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
int32_t rsize = pExprInfo->base.resSchema.bytes;
|
||||||
|
|
||||||
count = 0;
|
count = 0;
|
||||||
|
@ -1726,7 +1726,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
STagScanInfo* pInfo = pOperator->info;
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
SExprInfo* pExprInfo = &pOperator->pExpr[0];
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
SSDataBlock* pRes = pInfo->pRes;
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
|
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
|
||||||
|
@ -1744,7 +1744,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
|
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
|
||||||
metaGetTableEntryByUid(&mr, item->uid);
|
metaGetTableEntryByUid(&mr, item->uid);
|
||||||
|
|
||||||
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
|
|
||||||
// refactor later
|
// refactor later
|
||||||
|
@ -1824,8 +1824,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfExprs;
|
pOperator->exprSupp.numOfExprs = numOfExprs;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 4096);
|
initResultSizeInfo(pOperator, 4096);
|
||||||
|
@ -2304,7 +2304,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
SArray* pColMatchColInfo =
|
SArray* pColMatchColInfo =
|
||||||
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowEntryInfoOffset);
|
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
@ -51,8 +51,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
// lazy evaluation for the following parameter since the input datablock is not known till now.
|
||||||
|
@ -145,9 +145,9 @@ SSDataBlock* loadNextDataBlock(void* param) {
|
||||||
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
|
||||||
SOperatorInfo* pOperator = param;
|
SOperatorInfo* pOperator = param;
|
||||||
SSortOperatorInfo* pSort = pOperator->info;
|
SSortOperatorInfo* pSort = pOperator->info;
|
||||||
if (pOperator->pExpr != NULL) {
|
if (pOperator->exprSupp.pExprInfo != NULL) {
|
||||||
int32_t code =
|
int32_t code =
|
||||||
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
|
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pOperator->pTaskInfo->env, code);
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,35 +1,35 @@
|
||||||
#run tsim/user/pass_alter.sim
|
run tsim/user/pass_alter.sim
|
||||||
#run tsim/user/basic1.sim
|
run tsim/user/basic1.sim
|
||||||
#run tsim/user/privilege2.sim
|
run tsim/user/privilege2.sim
|
||||||
#run tsim/user/user_len.sim
|
run tsim/user/user_len.sim
|
||||||
#run tsim/user/privilege1.sim
|
run tsim/user/privilege1.sim
|
||||||
#run tsim/user/pass_len.sim
|
run tsim/user/pass_len.sim
|
||||||
#run tsim/table/basic1.sim
|
run tsim/table/basic1.sim
|
||||||
#run tsim/trans/lossdata1.sim
|
run tsim/trans/lossdata1.sim
|
||||||
#run tsim/trans/create_db.sim
|
run tsim/trans/create_db.sim
|
||||||
#run tsim/stable/alter_metrics.sim
|
run tsim/stable/alter_metrics.sim
|
||||||
#run tsim/stable/tag_modify.sim
|
run tsim/stable/tag_modify.sim
|
||||||
#run tsim/stable/alter_comment.sim
|
run tsim/stable/alter_comment.sim
|
||||||
#run tsim/stable/column_drop.sim
|
run tsim/stable/column_drop.sim
|
||||||
#run tsim/stable/column_modify.sim
|
run tsim/stable/column_modify.sim
|
||||||
#run tsim/stable/tag_rename.sim
|
run tsim/stable/tag_rename.sim
|
||||||
#run tsim/stable/vnode3.sim
|
run tsim/stable/vnode3.sim
|
||||||
#run tsim/stable/metrics.sim
|
run tsim/stable/metrics.sim
|
||||||
#run tsim/stable/alter_insert2.sim
|
run tsim/stable/alter_insert2.sim
|
||||||
#run tsim/stable/show.sim
|
run tsim/stable/show.sim
|
||||||
#run tsim/stable/alter_import.sim
|
run tsim/stable/alter_import.sim
|
||||||
#run tsim/stable/tag_add.sim
|
run tsim/stable/tag_add.sim
|
||||||
#run tsim/stable/tag_drop.sim
|
run tsim/stable/tag_drop.sim
|
||||||
#run tsim/stable/column_add.sim
|
run tsim/stable/column_add.sim
|
||||||
#run tsim/stable/alter_count.sim
|
run tsim/stable/alter_count.sim
|
||||||
#run tsim/stable/values.sim
|
run tsim/stable/values.sim
|
||||||
run tsim/stable/dnode3.sim
|
run tsim/stable/dnode3.sim
|
||||||
#run tsim/stable/alter_insert1.sim
|
run tsim/stable/alter_insert1.sim
|
||||||
#run tsim/stable/refcount.sim
|
run tsim/stable/refcount.sim
|
||||||
#run tsim/stable/disk.sim
|
run tsim/stable/disk.sim
|
||||||
run tsim/db/basic1.sim
|
run tsim/db/basic1.sim
|
||||||
run tsim/db/basic3.sim
|
run tsim/db/basic3.sim
|
||||||
#run tsim/db/basic7.sim
|
run tsim/db/basic7.sim
|
||||||
run tsim/db/basic6.sim
|
run tsim/db/basic6.sim
|
||||||
run tsim/db/create_all_options.sim
|
run tsim/db/create_all_options.sim
|
||||||
run tsim/db/basic2.sim
|
run tsim/db/basic2.sim
|
||||||
|
|
Loading…
Reference in New Issue