Merge pull request #13957 from taosdata/feature/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2022-06-18 16:00:32 +08:00 committed by GitHub
commit f23abec35b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 539 additions and 729 deletions

View File

@ -158,10 +158,10 @@ typedef struct tExprNode {
int32_t nodeType;
union {
struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId;
int32_t num;
struct SFunctionNode *pFunctNode;
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId;
int32_t num;
struct SFunctionNode *pFunctNode;
} _function;
struct {

View File

@ -113,7 +113,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
@ -121,6 +121,6 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
#endif // TDENGINE_QUERYUTIL_H

View File

@ -49,8 +49,6 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
//#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
enum {
@ -64,11 +62,6 @@ enum {
TASK_COMPLETED = 0x2u,
};
typedef struct SResultRowCell {
uint64_t groupId;
SResultRowPosition pos;
} SResultRowCell;
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
@ -83,7 +76,6 @@ typedef struct SResultInfo { // TODO refactor
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
// SVariant tag;
} STableQueryInfo;
typedef struct SLimit {
@ -123,41 +115,7 @@ typedef struct SOperatorCostInfo {
double totalCost;
} SOperatorCostInfo;
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct STaskAttr {
SLimit limit;
SLimit slimit;
bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query
bool timeWindowInterpo; // if the time window start/end required interpolation
bool tsCompQuery; // is tscomp query
bool diffQuery; // is diff query
bool pointInterpQuery; // point interpolation query
int32_t havingNum; // having expr number
int16_t numOfCols;
int16_t numOfTags;
STimeWindow window;
SInterval interval;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
int32_t resultRowSize;
int32_t tagLen; // tag value length of current query
SExprInfo* pExpr1;
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
void* tsdb;
// STableListInfo tableGroupInfo; // table list
int32_t vgId;
} STaskAttr;
struct SOperatorInfo;
//struct SAggSupporter;
//struct SOptrBasicInfo;
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
@ -195,31 +153,6 @@ typedef struct SExecTaskInfo {
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
typedef struct STaskRuntimeEnv {
STaskAttr* pQueryAttr;
uint32_t status; // query status
uint8_t scanFlag; // denotes reversed scan of data or not
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow;
STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
char* tagVal; // tag value of current data block
// STableGroupInfo tableqinfoGroupInfo; // this is a table list
struct SOperatorInfo* proot;
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
STableQueryInfo* current;
SResultInfo resultInfo;
struct SUdfInfo* pUdfInfo;
} STaskRuntimeEnv;
enum {
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
@ -238,14 +171,20 @@ typedef struct SOperatorFpSet {
__optr_explain_fn_t getExplainFn;
} SOperatorFpSet;
typedef struct SExprSupp {
SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
} SExprSupp;
typedef struct SOperatorInfo {
uint8_t operatorType;
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
int32_t numOfExprs; // number of columns of the current operator results
char* name; // name, used to show the query execution plan
char* name; // name, for debug purpose
void* info; // extension attribution
SExprInfo* pExpr;
SExprSupp exprSupp;
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
@ -260,6 +199,9 @@ typedef enum {
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SSourceDataInfo {
int32_t index;
SRetrieveTableRsp* pRsp;
@ -287,9 +229,6 @@ typedef struct SExchangeInfo {
uint64_t self;
} SExchangeInfo;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SColMatchInfo {
int32_t srcSlotId; // source slot id
int32_t colId;
@ -299,8 +238,8 @@ typedef struct SColMatchInfo {
} SColMatchInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
@ -320,17 +259,13 @@ typedef struct STableScanInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
int32_t numOfOutput;
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
// int32_t* rowCellInfoOffset;
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
@ -436,14 +371,12 @@ typedef struct SBlockDistInfo {
void* pHandle;
} SBlockDistInfo;
// todo remove this
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
} SOptrBasicInfo;
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
@ -500,10 +433,7 @@ typedef struct SAggOperatorInfo {
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprInfo *pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied
SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct.
int32_t *rowCellInfoOffset; // offset value for each row result cell info
SExprSupp scalarExprSup;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
@ -528,11 +458,7 @@ typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprInfo* pScalarExpr;
int32_t numOfScalarExpr;
SqlFunctionCtx* pScalarCtx;
int32_t* rowCellInfoOffset;
SExprSupp scalarSup;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
@ -544,13 +470,6 @@ typedef struct SFillOperatorInfo {
bool multigroupResult;
} SFillOperatorInfo;
typedef struct SScalarSupp {
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
} SScalarSupp;
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
@ -563,7 +482,7 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SScalarSupp scalarSup;
SExprSupp scalarSup;
} SGroupbyOperatorInfo;
typedef struct SDataGroupInfo {
@ -587,7 +506,7 @@ typedef struct SPartitionOperatorInfo {
void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group
SSDataBlock* pUpdateRes;
SScalarSupp scalarSupp;
SExprSupp scalarSup;
} SPartitionOperatorInfo;
typedef struct SWindowRowsSup {
@ -740,15 +659,19 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSup(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
@ -764,9 +687,11 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
void cleanupExecSupp(SExprSupp* pSupp);
SSDataBlock* loadNextDataBlock(void* param);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
@ -885,12 +810,13 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap,
SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);

View File

@ -42,21 +42,6 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
}
}
void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
// SResultRow *pWindowRes = pResultRowInfo->pResult[i];
// clearResultRow(pRuntimeEnv, pWindowRes);
int32_t groupIndex = 0;
int64_t uid = 0;
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, &groupIndex, sizeof(groupIndex), uid);
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
}
pResultRowInfo->size = 0;
}
void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
// do nothing
}
@ -518,14 +503,14 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
return TSDB_CODE_SUCCESS;
}
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset) {
SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) {
return NULL;
}
*rowCellInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
if (*rowCellInfoOffset == 0) {
*rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
if (*rowEntryInfoOffset == 0) {
taosMemoryFreeClear(pFuncCtx);
return NULL;
}
@ -584,8 +569,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
}
for (int32_t i = 1; i < numOfOutput; ++i) {
(*rowCellInfoOffset)[i] =
(int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
(*rowEntryInfoOffset)[i] =
(int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize);
}
setSelectValueColumnInfo(pFuncCtx, numOfOutput);

View File

@ -90,8 +90,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
static void releaseQueryBuf(size_t numOfTables);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
@ -143,35 +141,8 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
SqlFunctionCtx* pCtx, int32_t numOfExprs);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
SExecTaskInfo* pTaskInfo);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId);
SArray* getOrderCheckColumns(STaskAttr* pQuery);
typedef struct SRowCompSupporter {
STaskRuntimeEnv* pRuntimeEnv;
int16_t dataOffset;
__compar_fn_t comFunc;
} SRowCompSupporter;
static int compareRowData(const void* a, const void* b, const void* userData) {
const SResultRow* pRow1 = (const SResultRow*)a;
const SResultRow* pRow2 = (const SResultRow*)b;
SRowCompSupporter* supporter = (SRowCompSupporter*)userData;
STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv;
SFilePage* page1 = getBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId);
SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
int16_t offset = supporter->dataOffset;
return 0;
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
// setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
@ -187,6 +158,7 @@ static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
return true;
}
#if 0
static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t uid) {
bool existed = false;
@ -223,6 +195,7 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
return p1 != NULL;
}
#endif
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
SFilePage* pData = NULL;
@ -331,36 +304,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
return pResult;
}
// get the correct time window according to the handled timestamp
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo* pResultRowInfo, int64_t ts, STaskAttr* pQueryAttr) {
STimeWindow w = {0};
#if 0
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
// getInitialStartTimeWindow(pQueryAttr, ts, &w);
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
w.ekey =
taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) -
1;
} else {
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
}
} else {
w = pRow->win;
}
/*
* query border check, skey should not be bounded by the query time range, since the value skey will
* be used as the time window index value. So we only change ekey of time window accordingly.
*/
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
w.ekey = pQueryAttr->window.ekey;
}
#endif
return w;
}
// a new buffer page for each table. Needs to opt this design
static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, int32_t tid, uint32_t size) {
if (pWindowRes->pageId != -1) {
@ -408,13 +351,6 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
return 0;
}
static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, STimeWindow* win,
bool masterscan, SResultRow** pResult, int64_t groupId, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
assert(win->skey <= win->ekey);
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
}
// query_range_start, query_range_end, window_duration, window_start, window_end
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
@ -483,33 +419,15 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
}
}
static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) {
TSKEY ekey = -1;
int32_t order = TSDB_ORDER_ASC;
if (order == TSDB_ORDER_ASC) {
ekey = pWindow->ekey;
if (ekey > pQueryAttr->window.ekey) {
ekey = pQueryAttr->window.ekey;
}
} else {
ekey = pWindow->skey;
if (ekey < pQueryAttr->window.ekey) {
ekey = pQueryAttr->window.ekey;
}
}
return ekey;
}
static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order,
int32_t scanFlag, bool createDummyCol);
static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock,
int32_t order) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
setBlockStatisInfo(&pCtx[i], &pOperator->exprSupp.pExprInfo[i], pBlock);
}
}
@ -568,7 +486,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
int32_t scanFlag, bool createDummyCol) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].input.numOfRows = pBlock->info.rows;
@ -579,7 +497,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->uid = pBlock->info.uid;
pInput->colDataAggIsSet = false;
SExprInfo* pOneExpr = &pOperator->pExpr[i];
SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
SFunctParam* pFuncParam = &pOneExpr->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) {
@ -615,7 +533,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
}
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process == NULL) {
@ -896,36 +814,6 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) {
void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
// if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) {
// continue;
// }
return false;
}
int32_t order = TSDB_ORDER_ASC;
if (order != TSDB_ORDER_DESC || !TSWINDOW_IS_EQUAL(pQueryAttr->window, TSWINDOW_DESC_INITIALIZER)) {
return false;
}
if (pQueryAttr->groupbyColumn) {
return false;
}
if (pQueryAttr->interval.interval > 0) {
return false;
}
if (pQueryAttr->numOfFilterCols > 0 || pQueryAttr->havingNum > 0) {
return false;
}
return true;
}
/////////////////////////////////////////////////////////////////////////////////////////////
// todo refactor : return window
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
@ -941,7 +829,9 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t ke
}
}
#if 0
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
bool hasFirstLastFunc = false;
bool hasOtherFunc = false;
@ -951,7 +841,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]);
#if 0
if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG ||
functionId == FUNCTION_TAG_DUMMY) {
continue;
@ -962,7 +852,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
} else {
hasOtherFunc = true;
}
#endif
}
if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) {
@ -976,6 +866,8 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
return status;
}
#endif
// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) {
// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
//
@ -1087,7 +979,7 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
//
// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
#if 0
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0};
@ -1136,55 +1028,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
return false;
}
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) {
int32_t len = 0;
int32_t start = 0;
for (int32_t j = 0; j < numOfRows; ++j) {
if (p[j] == 1) {
len++;
} else {
if (len > 0) {
int32_t cstart = j - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(((char*)pColumnInfoData->pData) + start * bytes, pColumnInfoData->pData + cstart * bytes,
len * bytes);
}
start += len;
len = 0;
}
}
}
if (len > 0) {
int32_t cstart = numOfRows - len;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColumnInfoData->info.bytes;
memmove(pColumnInfoData->pData + start * bytes, pColumnInfoData->pData + cstart * bytes, len * bytes);
}
start += len;
len = 0;
}
pBlock->info.rows = start;
pBlock->pBlockAgg = NULL; // clean the block statistics info
if (start > 0) {
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0);
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData;
pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
}
}
}
#endif
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
@ -1248,17 +1092,17 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
if (pQueryAttr->pointInterpQuery) {
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset);
pTableScanInfo->rowEntryInfoOffset);
} else {
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx,
pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput,
pRuntimeEnv->current->groupIndex);
}
@ -1303,7 +1147,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
@ -1386,10 +1230,10 @@ void initResultRow(SResultRow* pResultRow) {
* offset[0] offset[1] offset[2]
*/
// TODO refactor: some function move away
void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs,
SExecTaskInfo* pTaskInfo) {
SqlFunctionCtx* pCtx = pInfo->pCtx;
int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset;
void setFunctionResultOutput(SOperatorInfo *pOperator, SOptrBasicInfo *pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
initResultRowInfo(pResultRowInfo);
@ -1400,7 +1244,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
pTaskInfo, false, pSup);
for (int32_t i = 0; i < numOfExprs; ++i) {
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowCellInfoOffset);
struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset);
cleanupResultRowEntry(pEntry);
pCtx[i].resultInfo = pEntry;
@ -1441,9 +1285,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) {
// cleanupResultRowInfo(&pTableQueryInfo->resInfo);
}
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) {
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) {
for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowCellInfoOffset);
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
@ -1536,14 +1380,12 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
}
void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId,
SExecTaskInfo* pTaskInfo) {
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result.
int64_t uid = 0;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId,
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
@ -1561,15 +1403,15 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
}
}
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
}
void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) {
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) {
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) {
return;
}
doSetTableGroupOutputBuf(pAggInfo, numOfOutput, groupId, pTaskInfo);
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
// record the current active group id
pAggInfo->groupId = groupId;
@ -1721,13 +1563,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) {
SExprInfo* pExprInfo = pOperator->pExpr;
int32_t numOfExprs = pOperator->numOfExprs;
SExprInfo* pExprInfo = pOperator->exprSupp.pExprInfo;
int32_t numOfExprs = pOperator->exprSupp.numOfExprs;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t* rowCellOffset = pbInfo->rowCellInfoOffset;
int32_t* rowCellOffset = pOperator->exprSupp.rowEntryInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pbInfo->pCtx;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
blockDataCleanup(pBlock);
if (!hasDataInGroupInfo(pGroupResInfo)) {
@ -1740,7 +1582,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
}
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo,
int32_t* rowCellInfoOffset) {
int32_t* rowEntryInfoOffset) {
// update the number of result for each, only update the number of rows for the corresponding window result.
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
// return;
@ -1755,7 +1597,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu
continue;
}
SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowCellInfoOffset);
SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset);
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes));
}
}
@ -2575,7 +2417,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->exprSupp.numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
@ -2696,7 +2538,7 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock
static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) {
SSortedMergeOperatorInfo* pInfo = pOperator->info;
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
// pCtx[i].size = 1;
}
@ -2711,8 +2553,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
doMergeResultImpl(pInfo, pCtx, numOfExpr, i);
} else {
doFinalizeResultImpl(pCtx, numOfExpr);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
@ -2759,16 +2601,16 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
break;
}
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
// updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor,
// pOperator->pRuntimeEnv, true);
doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock);
doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock);
// flush to tuple store, and after all data have been handled, return to upstream node or sink node
}
doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs);
int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows);
doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
// setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows);
// TODO check for available buffer;
@ -2909,20 +2751,20 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
goto _error;
}
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, num, &pOperator->exprSupp.rowEntryInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) {
if (pOperator->exprSupp.pCtx == NULL || pInfo->binfo.pRes == NULL) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, num, keyBufSize, pTaskInfo->id.str);
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pOperator->exprSupp.pCtx, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num, pTaskInfo);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, num);
code = initGroupCol(pExprInfo, num, pGroupInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -2941,8 +2783,8 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = num;
pOperator->pExpr = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -2997,6 +2839,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SExprSupp* pSup = &pOperator->exprSupp;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SOperatorInfo* downstream = pOperator->pDownstream[0];
@ -3017,18 +2860,18 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pAggInfo->pScalarExprInfo != NULL) {
code = projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx,
pAggInfo->numOfScalarExpr, NULL);
if (pAggInfo->scalarExprSup.pExprInfo != NULL) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
// the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, 0, pInfo->pCtx);
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, 0, pSup->pCtx);
if (code != 0) {
longjmp(pTaskInfo->env, code);
}
@ -3287,6 +3130,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
@ -3304,10 +3148,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs);
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs);
copyTsColoum(pRes, pInfo->pCtx, pOperator->exprSupp.numOfExprs);
resetResultRowEntryResult(pInfo->pCtx, pOperator->exprSupp.numOfExprs);
return pRes;
}
}
@ -3339,10 +3183,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
doFilter(pProjectInfo->pFilterNode, pBlock);
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
pProjectInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
@ -3493,7 +3337,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
}
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs);
pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs);
}
if (pOperator->pDownstream != NULL) {
@ -3505,11 +3349,11 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
pOperator->numOfDownstream = 0;
}
if (pOperator->pExpr != NULL) {
destroyExprInfo(pOperator->pExpr, pOperator->numOfExprs);
if (pOperator->exprSupp.pExprInfo != NULL) {
destroyExprInfo(pOperator->exprSupp.pExprInfo, pOperator->exprSupp.numOfExprs);
}
taosMemoryFreeClear(pOperator->pExpr);
taosMemoryFreeClear(pOperator->exprSupp.pExprInfo);
taosMemoryFreeClear(pOperator->info);
taosMemoryFreeClear(pOperator);
}
@ -3559,15 +3403,12 @@ void cleanupAggSup(SAggSupporter* pAggSup) {
destroyDiskbasedBuf(pAggSup->pResultBuf);
}
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey);
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
size_t keyBufSize, const char* pkey) {
initExprSupp(pSup, pExprInfo, numOfCols);
doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey);
for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf;
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
}
return TSDB_CODE_SUCCESS;
@ -3582,6 +3423,19 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
}
}
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
pInfo->pRes = pBlock;
initResultRowInfo(&pInfo->resultRowInfo);
}
void initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr) {
pSup->pExprInfo = pExprInfo;
pSup->numOfExprs = numOfExpr;
if (pSup->pExprInfo != NULL) {
pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset);
}
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
@ -3595,28 +3449,20 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);
int32_t code =
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->groupId = INT32_MIN;
initResultRowInfo(&pInfo->binfo.resultRowInfo);
initBasicInfo(&pInfo->binfo, pResultBlock);
initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr);
pInfo->pScalarExprInfo = pScalarExprInfo;
pInfo->numOfScalarExpr = numOfScalarExpr;
if (pInfo->pScalarExprInfo != NULL) {
pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
}
pOperator->name = "TableAggregate";
pInfo->groupId = INT32_MIN;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
@ -3655,24 +3501,20 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
assert(pInfo != NULL);
destroySqlFunctionCtx(pInfo->pCtx, numOfOutput);
taosMemoryFreeClear(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param;
doDestroyBasicInfo(pInfo, numOfOutput);
cleanupBasicInfo(pInfo);
}
void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupBasicInfo(&pInfo->binfo);
}
void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
@ -3687,22 +3529,25 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
return;
}
SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
taosArrayDestroy(pInfo->pPseudoColInfo);
}
void cleanupExecSupp(SExprSupp* pSupp) {
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
taosMemoryFree(pSupp->rowEntryInfoOffset);
}
static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput);
destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr);
taosMemoryFree(pInfo->rowCellInfoOffset);
cleanupExecSupp(&pInfo->scalarSup);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
@ -3764,17 +3609,16 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
}
initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols, pTaskInfo);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
@ -3795,6 +3639,7 @@ _error:
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SIndefOperatorInfo* pIndefInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
@ -3829,18 +3674,19 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if (pIndefInfo->pScalarExpr != NULL) {
code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx,
pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo);
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
if (pScalarSup->pExprInfo != NULL) {
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
}
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs,
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pOperator->exprSupp.numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
@ -3865,15 +3711,17 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
goto _error;
}
SExprSupp* pSup = &pOperator->exprSupp;
SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode;
int32_t numOfExpr = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr);
int32_t numOfScalarExpr = 0;
if (pPhyNode->pExprs != NULL) {
pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr);
pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset);
SExprSupp* pSup1 = &pInfo->scalarSup;
pSup1->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup1->numOfExprs);
pSup1->pCtx = createSqlFunctionCtx(pSup1->pExprInfo, pSup1->numOfExprs, &pSup1->rowEntryInfoOffset);
}
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);
@ -3889,21 +3737,22 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
}
initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->numOfScalarExpr = numOfScalarExpr;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
@ -3999,8 +3848,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = num;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = num;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;

View File

@ -28,15 +28,16 @@
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup);
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupBasicInfo(&pInfo->binfo);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
cleanupExecSupp(&pInfo->scalarSup);
}
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
@ -216,7 +217,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupbyOperatorInfo* pInfo = pOperator->info;
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);
// if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
// qError("QInfo:0x%"PRIx64" group by not supported on double/float columns, abort", GET_TASKID(pRuntimeEnv));
@ -250,16 +251,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, 0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = j - num;
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
// assign the group keys or user input constant values if required
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
num = 1;
}
@ -267,15 +268,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (num > 0) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret =
setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len,
0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
0, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = pBlock->info.rows - num;
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
}
}
@ -319,11 +320,11 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pBlock, order, scanFlag, true);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSup.pScalarExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
@ -386,9 +387,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
pInfo->pGroupCols = pGroupColList;
pInfo->pCondition = pCondition;
pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);
pInfo->scalarSup.pExprInfo = pScalarExprInfo;
pInfo->scalarSup.numOfExprs = numOfScalarExpr;
pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset);
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
if (code != TSDB_CODE_SUCCESS) {
@ -396,15 +397,16 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
}
initResultSizeInfo(pOperator, 4096);
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResultBlock);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator";
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Groupby;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -441,9 +443,9 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
// group id
size_t numOfCols = pOperator->numOfExprs;
size_t numOfCols = pOperator->exprSupp.numOfExprs;
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->pExpr[i];
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
@ -645,8 +647,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
}
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
}
@ -664,16 +666,20 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
cleanupBasicInfo(&pInfo->binfo);
taosArrayDestroy(pInfo->pGroupCols);
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
taosArrayDestroy(pInfo->pGroupColVals);
taosMemoryFree(pInfo->keyBuf);
taosHashCleanup(pInfo->pGroupSet);
taosMemoryFree(pInfo->columnOffset);
cleanupExecSupp(&pInfo->scalarSup);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
@ -691,10 +697,10 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
if (pPartNode->pExprs != NULL) {
pInfo->scalarSupp.numOfScalarExpr = 0;
pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
pInfo->scalarSup.numOfExprs = 0;
pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs);
pInfo->scalarSup.pCtx = createSqlFunctionCtx(
pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset);
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -724,8 +730,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResBlock;
pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -742,16 +748,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
return NULL;
}
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
SAggSupporter* pAggSup) {
int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
SqlFunctionCtx* pCtx = binfo->pCtx;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
SResultRow* pResultRow =
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
assert(pResultRow != NULL);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
setResultRowInitCtx(pResultRow, pCtx, numOfCols, pOperator->exprSupp.rowEntryInfoOffset);
return TSDB_CODE_SUCCESS;
}

View File

@ -48,8 +48,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
@ -132,10 +132,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
// only the timestamp match support for ordinary table
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->pExpr[i];
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;

View File

@ -261,9 +261,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols);
// currently only the tbname pseudo column
if (pTableScanInfo->numOfPseudoExpr > 0) {
addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr,
pBlock);
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock);
}
int64_t st = taosGetTimestampMs();
@ -538,8 +538,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
}
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
SExprSupp* pSup = &pInfo->pseudoSup;
pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs);
pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset);
}
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
@ -563,7 +564,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
@ -1117,7 +1118,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->exprSupp.numOfExprs = pInfo->pRes->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
@ -1512,7 +1513,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
extractDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pRsp->numOfRows, pRsp->data, pRsp->compLen,
pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
pOperator->exprSupp.numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info
doFilterResult(pInfo);
@ -1627,7 +1628,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->exprSupp.numOfExprs = pResBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
@ -1658,11 +1659,11 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
int32_t count = 0;
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
int32_t functionId = getExprFunctionId(&pOperator->exprSupp.pExprInfo[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
int32_t rsize = pExprInfo->base.resSchema.bytes;
count = 0;
@ -1725,7 +1726,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
#endif
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->pExpr[0];
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
SSDataBlock* pRes = pInfo->pRes;
int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList);
@ -1743,7 +1744,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos);
metaGetTableEntryByUid(&mr, item->uid);
for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
// refactor later
@ -1823,8 +1824,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExprs;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExprs;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
@ -1869,7 +1870,7 @@ typedef struct STableMergeScanInfo {
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset;
int32_t* rowEntryInfoOffset;
SExprInfo* pExpr;
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
@ -1878,7 +1879,7 @@ typedef struct STableMergeScanInfo {
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SqlFunctionCtx* pPseudoCtx;
// int32_t* rowCellInfoOffset;
// int32_t* rowEntryInfoOffset;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
@ -2257,7 +2258,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset);
}
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
@ -2303,7 +2304,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024);

View File

@ -39,7 +39,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
SArray* pColMatchColInfo =
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
pInfo->binfo.pRes = pResBlock;
initResultSizeInfo(pOperator, 1024);
@ -51,8 +51,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now.
@ -145,9 +145,9 @@ SSDataBlock* loadNextDataBlock(void* param) {
void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->pExpr != NULL) {
if (pOperator->exprSupp.pExprInfo != NULL) {
int32_t code =
projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL);
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}

File diff suppressed because it is too large Load Diff

View File

@ -1,35 +1,35 @@
#run tsim/user/pass_alter.sim
#run tsim/user/basic1.sim
#run tsim/user/privilege2.sim
#run tsim/user/user_len.sim
#run tsim/user/privilege1.sim
#run tsim/user/pass_len.sim
#run tsim/table/basic1.sim
#run tsim/trans/lossdata1.sim
#run tsim/trans/create_db.sim
#run tsim/stable/alter_metrics.sim
#run tsim/stable/tag_modify.sim
#run tsim/stable/alter_comment.sim
#run tsim/stable/column_drop.sim
#run tsim/stable/column_modify.sim
#run tsim/stable/tag_rename.sim
#run tsim/stable/vnode3.sim
#run tsim/stable/metrics.sim
#run tsim/stable/alter_insert2.sim
#run tsim/stable/show.sim
#run tsim/stable/alter_import.sim
#run tsim/stable/tag_add.sim
#run tsim/stable/tag_drop.sim
#run tsim/stable/column_add.sim
#run tsim/stable/alter_count.sim
#run tsim/stable/values.sim
run tsim/user/pass_alter.sim
run tsim/user/basic1.sim
run tsim/user/privilege2.sim
run tsim/user/user_len.sim
run tsim/user/privilege1.sim
run tsim/user/pass_len.sim
run tsim/table/basic1.sim
run tsim/trans/lossdata1.sim
run tsim/trans/create_db.sim
run tsim/stable/alter_metrics.sim
run tsim/stable/tag_modify.sim
run tsim/stable/alter_comment.sim
run tsim/stable/column_drop.sim
run tsim/stable/column_modify.sim
run tsim/stable/tag_rename.sim
run tsim/stable/vnode3.sim
run tsim/stable/metrics.sim
run tsim/stable/alter_insert2.sim
run tsim/stable/show.sim
run tsim/stable/alter_import.sim
run tsim/stable/tag_add.sim
run tsim/stable/tag_drop.sim
run tsim/stable/column_add.sim
run tsim/stable/alter_count.sim
run tsim/stable/values.sim
run tsim/stable/dnode3.sim
#run tsim/stable/alter_insert1.sim
#run tsim/stable/refcount.sim
#run tsim/stable/disk.sim
run tsim/stable/alter_insert1.sim
run tsim/stable/refcount.sim
run tsim/stable/disk.sim
run tsim/db/basic1.sim
run tsim/db/basic3.sim
#run tsim/db/basic7.sim
run tsim/db/basic7.sim
run tsim/db/basic6.sim
run tsim/db/create_all_options.sim
run tsim/db/basic2.sim