diff --git a/include/libs/function/function.h b/include/libs/function/function.h index b359fa5d6a..54762a0f17 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx { } SqlFunctionCtx; enum { - TEXPR_NODE_DUMMY = 0x0, TEXPR_BINARYEXPR_NODE= 0x1, TEXPR_UNARYEXPR_NODE = 0x2, - TEXPR_FUNCTION_NODE = 0x3, - TEXPR_COL_NODE = 0x4, - TEXPR_VALUE_NODE = 0x8, }; typedef struct tExprNode { diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index ca25b7aab1..a70cff5552 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -55,7 +55,6 @@ typedef struct SResultRow { uint32_t numOfRows; // number of rows of current time window STimeWindow win; struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo -// char *key; // start key of current result row } SResultRow; typedef struct SResultRowPosition { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e3e85bf2b8..49316ff746 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -540,6 +540,13 @@ typedef struct SFillOperatorInfo { bool multigroupResult; } SFillOperatorInfo; +typedef struct SScalarSupp { + SExprInfo* pScalarExprInfo; + int32_t numOfScalarExpr; // the number of scalar expression in group operator + SqlFunctionCtx* pScalarFuncCtx; + int32_t* rowCellInfoOffset; // offset value for each row result cell info +} SScalarSupp; + typedef struct SGroupbyOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; @@ -552,10 +559,7 @@ typedef struct SGroupbyOperatorInfo { char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; - SExprInfo* pScalarExprInfo; - int32_t numOfScalarExpr; // the number of scalar expression in group operator - SqlFunctionCtx* pScalarFuncCtx; - int32_t* rowCellInfoOffset; // offset value for each row result cell info + SScalarSupp scalarSup; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { @@ -578,6 +582,7 @@ typedef struct SPartitionOperatorInfo { int32_t* columnOffset; // start position for each column data void* pGroupIter; // group iterator int32_t pageIndex; // page index of current group + SScalarSupp scalarSupp; } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -755,6 +760,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); +SArray* extractPartitionColInfo(SNodeList* pNodeList); + void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); @@ -834,20 +841,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, - SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, - int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, - SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 6ba1cf859e..01ed30c189 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -28,17 +28,6 @@ typedef struct SCompSupporter { int32_t order; } SCompSupporter; -int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { - int32_t size = 0; - - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { -// size += pQueryAttr->pExpr1[i].base.interBytes; - } - - assert(size >= 0); - return size; -} - int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { pResultRowInfo->size = 0; pResultRowInfo->cur.pageId = -1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index de109a24ca..89692c0061 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -40,9 +40,6 @@ #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) -#define SDATA_BLOCK_INITIALIZER \ - (SDataBlockInfo) { {0}, 0 } - #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #if 0 @@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo); -static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); - static void releaseQueryBuf(size_t numOfTables); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); @@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId); } -static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey, bool ascQuery, - bool timeWindowInterpo) { - int64_t skey = TSKEY_INITIAL_VAL; -#if 0 - int32_t i = 0; - for (i = pResultRowInfo->size - 1; i >= 0; --i) { - SResultRow* pResult = pResultRowInfo->pResult[i]; - if (pResult->closed) { - break; - } - - // new closed result rows - if (timeWindowInterpo) { - if (pResult->endInterp && - ((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) { - if (i > 0) { // the first time window, the startInterp is false. - assert(pResult->startInterp); - } - - closeResultRow(pResultRowInfo, i); - } else { - skey = pResult->win.skey; - } - } else { - if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeResultRow(pResultRowInfo, i); - } else { - skey = pResult->win.skey; - } - } - } - - // all result rows are closed, set the last one to be the skey - if (skey == TSKEY_INITIAL_VAL) { - if (pResultRowInfo->size == 0) { - // assert(pResultRowInfo->current == NULL); - assert(pResultRowInfo->curPos == -1); - pResultRowInfo->curPos = -1; - } else { - pResultRowInfo->curPos = pResultRowInfo->size - 1; - } - } else { - for (i = pResultRowInfo->size - 1; i >= 0; --i) { - SResultRow* pResult = pResultRowInfo->pResult[i]; - if (pResult->closed) { - break; - } - } - - if (i == pResultRowInfo->size - 1) { - pResultRowInfo->curPos = i; - } else { - pResultRowInfo->curPos = i + 1; // current not closed result object - } - } -#endif -} -// -// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey, -// bool ascQuery, bool interp) { -// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) { -// closeAllResultRows(pResultRowInfo); -// pResultRowInfo->curPos = pResultRowInfo->size - 1; -// } else { -// int32_t step = ascQuery ? 1 : -1; -// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp); -// } -//} - // query_range_start, query_range_end, window_duration, window_start, window_end void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) { pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; @@ -810,20 +736,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc // _rowts/_c0, not tbname column if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) { // do nothing - } else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]); - pfCtx->fpSet.init(&pCtx[k], pResInfo); - - pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId); - pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset - - // set the timestamp(_rowts) output buffer - if (taosArrayGetSize(pPseudoList) > 0) { - int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); - pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; - } - - numOfRows = pfCtx->fpSet.process(pfCtx); } else { SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); taosArrayPush(pBlockList, &pSrcBlock); @@ -886,7 +798,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup); assert(pResultRow != NULL); - setResultRowKey(pResultRow, pData, type); setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); return TSDB_CODE_SUCCESS; } @@ -908,21 +819,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return false; } - // if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) { - // // return QUERY_IS_ASC_QUERY(pQueryAttr); - // } - // - // // denote the order type - // if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) { - // // return pCtx->param[0].i == pQueryAttr->order.order; - // } - - // in the reverse table scan, only the following functions need to be executed - // if (IS_REVERSE_SCAN(pRuntimeEnv) || - // (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) { - // return false; - // } - return true; } @@ -3934,27 +3830,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { } } -// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) { -// int32_t size = taosArrayGetSize(pTableListInfo->pTableList); -// if (size == 0) { -// return NULL; -// } -// -// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo)); -// if (pTableQueryInfo == NULL) { -// return NULL; -// } -// -// for (int32_t j = 0; j < size; ++j) { -// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j); -// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j]; -// pTQueryInfo->lastKey = pk->lastKey; -// } -// -// pTableQueryInfo->lastKey = 0; -// return pTableQueryInfo; -//} - SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { @@ -4526,7 +4401,6 @@ static int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableU static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* createSortInfo(SNodeList* pNodeList); -static SArray* extractPartitionColInfo(SNodeList* pNodeList); int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; @@ -4861,12 +4735,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { - SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode; - SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - - SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num); - pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo); + pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 688c576d03..23226a0134 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true); // there is an scalar expression that needs to be calculated right before apply the group aggregation. - if (pInfo->pScalarExprInfo != NULL) { - pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); + if (pInfo->scalarSup.pScalarExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, pTaskInfo->code); } @@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pGroupCols = pGroupColList; pInfo->pCondition = pCondition; - pInfo->pScalarExprInfo = pScalarExprInfo; - pInfo->numOfScalarExpr = numOfScalarExpr; - pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); + pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo; + pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr; + pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { @@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { return NULL; } - SGroupbyOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SPartitionOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { @@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } + // there is an scalar expression that needs to be calculated right before apply the group aggregation. + if (pInfo->scalarSupp.pScalarExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, pTaskInfo->code); + } + } + doHashPartition(pOperator, pBlock); } @@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pInfo->columnOffset); } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; + SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); + + pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); + + if (pPartNode->pExprs != NULL) { + pInfo->scalarSupp.numOfScalarExpr = 0; + pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr); + pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx( + pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset); + } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); @@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* uint32_t defaultPgsz = 0; uint32_t defaultBufsz = 0; - getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz); + getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf)); - pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity); - code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pInfo->binfo.pRes = pResultBlock; + pInfo->binfo.pRes = pResBlock; pOperator->numOfExprs = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d30e4ef6db..62c2240098 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1157,6 +1157,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } + doFilter(pInfo->pCondition, pInfo->pRes); +#if 0 SFilterInfo* filter = NULL; int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0); @@ -1202,6 +1204,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { px->info.rows = numOfRow; pInfo->pRes = px; +#endif return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } @@ -1380,6 +1383,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); doFilterResult(pInfo); + blockDataDestroy(p); + pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -1469,10 +1474,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock); - // blockDataDestroy(p); todo handle memory leak - pInfo->pRes->info.rows = p->info.rows; - return p->info.rows; + blockDataDestroy(p); + + return pInfo->pRes->info.rows; } int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4141c85366..4673123779 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp continue; } - // if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { - // pCtx[k].start.key = INT64_MIN; - // continue; - // } - SFunctParam* pParam = &pCtx[k].param[0]; SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId); diff --git a/source/libs/function/inc/texpr.h b/source/libs/function/inc/texpr.h deleted file mode 100644 index 68aedcb5a2..0000000000 --- a/source/libs/function/inc/texpr.h +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_COMMON_EXPR_H_ -#define _TD_COMMON_EXPR_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include "os.h" - -#include "tmsg.h" -#include "taosdef.h" -#include "tskiplist.h" -#include "function.h" - -struct tExprNode; -struct SSchema; - -#define QUERY_COND_REL_PREFIX_IN "IN|" -#define QUERY_COND_REL_PREFIX_LIKE "LIKE|" -#define QUERY_COND_REL_PREFIX_MATCH "MATCH|" -#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|" - -#define QUERY_COND_REL_PREFIX_IN_LEN 3 -#define QUERY_COND_REL_PREFIX_LIKE_LEN 5 -#define QUERY_COND_REL_PREFIX_MATCH_LEN 6 -#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7 - -typedef bool (*__result_filter_fn_t)(const void *, void *); -typedef void (*__do_filter_suppl_fn_t)(void *, void *); - -/** - * this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required - */ -typedef struct tQueryInfo { - uint8_t optr; // expression operator - SSchema sch; // schema of tags - char* q; - __compar_fn_t compare; // filter function - bool indexed; // indexed columns -} tQueryInfo; - -typedef struct SExprTraverseSupp { - __result_filter_fn_t nodeFilterFn; - __do_filter_suppl_fn_t setupInfoFn; - void *pExtInfo; -} SExprTraverseSupp; - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_COMMON_EXPR_H_*/ diff --git a/source/libs/function/src/texpr.c b/source/libs/function/src/texpr.c index f04b4f17f9..039e0a9dfc 100644 --- a/source/libs/function/src/texpr.c +++ b/source/libs/function/src/texpr.c @@ -17,15 +17,7 @@ #include "os.h" #include "texception.h" -#include "taosdef.h" #include "tmsg.h" -#include "tarray.h" -#include "tbuffer.h" -#include "tcompare.h" -#include "thash.h" -#include "texpr.h" -#include "tvariant.h" -#include "tdef.h" static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));