[td-13039] refactor.

This commit is contained in:
Haojun Liao 2022-04-02 15:08:48 +08:00
parent 9a5123c64f
commit cf25aca809
7 changed files with 171 additions and 538 deletions

View File

@ -193,20 +193,19 @@ typedef struct SColumn {
uint8_t scale;
} SColumn;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct SOrder {
uint32_t order;
SColumn col;
} SOrder;
typedef struct SGroupbyExpr {
SArray* columnInfo; // SArray<SColIndex>, group by columns information
bool groupbyTag; // group by tag or column
} SGroupbyExpr;
typedef struct STableBlockDistInfo {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray *dataBlockInfos;
} STableBlockDistInfo;
enum {
FUNC_PARAM_TYPE_VALUE = 0x1,
@ -241,15 +240,6 @@ typedef struct SExprInfo {
struct tExprNode* pExpr;
} SExprInfo;
typedef struct SStateWindow {
SColumn col;
} SStateWindow;
typedef struct SSessionWindow {
int64_t gap; // gap between two session window(in microseconds)
SColumn col;
} SSessionWindow;
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1

View File

@ -52,7 +52,12 @@ typedef struct SFuncExecFuncs {
FExecFinalize finalize;
} SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_TYPE_SCALAR 1
#define FUNCTION_TYPE_AGG 2
@ -101,10 +106,6 @@ typedef struct SFuncExecFuncs {
#define FUNCTION_DERIVATIVE 32
#define FUNCTION_BLKINFO 33
#define FUNCTION_HISTOGRAM 34
#define FUNCTION_HLL 35
#define FUNCTION_MODE 36
#define FUNCTION_SAMPLE 37
#define FUNCTION_COV 38

View File

@ -171,6 +171,8 @@ tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT* pTsdbReadHandle);
/**

View File

@ -14,7 +14,7 @@
*/
#include "tsdbDef.h"
#include <tdatablock.h>
#include "tdatablock.h"
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
@ -31,6 +31,7 @@
#include "tlosertree.h"
#include "tsdbDef.h"
#include "tmsg.h"
#include "tsdbCommit.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -209,34 +210,34 @@ static SArray* getDefaultLoadColumns(STsdbReadHandle* pTsdbReadHandle, bool load
return pLocalIdList;
}
//int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
// STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
//
// int64_t rows = 0;
// STsdbMemTable* pMemTable = pTsdbReadHandle->pMemTable;
// if (pMemTable == NULL) { return rows; }
//
//// STableData* pMem = NULL;
//// STableData* pIMem = NULL;
//
//// SMemTable* pMemT = pMemRef->snapshot.mem;
//// SMemTable* pIMemT = pMemRef->snapshot.imem;
//
// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// for (int32_t i = 0; i < size; ++i) {
// STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
//
//// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
//// pMem = pMemT->tData[pCheckInfo->tableId];
//// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
//// }
//// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
//// pIMem = pIMemT->tData[pCheckInfo->tableId];
//// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
//// }
// }
// return rows;
//}
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) pHandle;
int64_t rows = 0;
STsdbMemTable* pMemTable = NULL;//pTsdbReadHandle->pMemTable;
if (pMemTable == NULL) { return rows; }
// STableData* pMem = NULL;
// STableData* pIMem = NULL;
// SMemTable* pMemT = pMemRef->snapshot.mem;
// SMemTable* pIMemT = pMemRef->snapshot.imem;
size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, i);
// if (pMemT && pCheckInfo->tableId < pMemT->maxTables) {
// pMem = pMemT->tData[pCheckInfo->tableId];
// rows += (pMem && pMem->uid == pCheckInfo->tableId) ? pMem->numOfRows : 0;
// }
// if (pIMemT && pCheckInfo->tableId < pIMemT->maxTables) {
// pIMem = pIMemT->tData[pCheckInfo->tableId];
// rows += (pIMem && pIMem->uid == pCheckInfo->tableId) ? pIMem->numOfRows : 0;
// }
}
return rows;
}
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo* pGroupList) {
size_t numOfGroup = taosArrayGetSize(pGroupList->pGroupList);
@ -2261,8 +2262,8 @@ static void moveToNextDataBlockInCurrentFile(STsdbReadHandle* pTsdbReadHandle) {
cur->mixBlock = false;
cur->blockCompleted = false;
}
#if 0
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTableBlockInfo) {
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* pTableBlockInfo) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*) queryHandle;
pTableBlockInfo->totalSize = 0;
@ -2284,7 +2285,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0;
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
int defaultRows = 4096;//TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
@ -2345,16 +2346,15 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDist* pTa
if (numOfRows > pTableBlockInfo->maxRows) pTableBlockInfo->maxRows = numOfRows;
if (numOfRows < pTableBlockInfo->minRows) pTableBlockInfo->minRows = numOfRows;
if (numOfRows < defaultRows) pTableBlockInfo->numOfSmallBlocks+=1;
int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
blockInfo->numBlocksOfStep++;
// int32_t stepIndex = (numOfRows-1)/TSDB_BLOCK_DIST_STEP_ROWS;
// SFileBlockInfo *blockInfo = (SFileBlockInfo*)taosArrayGet(pTableBlockInfo->dataBlockInfos, stepIndex);
// blockInfo->numBlocksOfStep++;
}
}
}
return code;
}
#endif
static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exists) {
STsdbFS* pFileHandle = REPO_FS(pTsdbReadHandle->pTsdb);

View File

@ -128,6 +128,11 @@ typedef struct {
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct STaskCostInfo {
int64_t created;
int64_t start;
@ -163,6 +168,11 @@ typedef struct SOperatorCostInfo {
uint64_t execCost;
} SOperatorCostInfo;
typedef struct SOrder {
uint32_t order;
SColumn col;
} SOrder;
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef struct STaskAttr {
@ -196,7 +206,6 @@ typedef struct STaskAttr {
STimeWindow window;
SInterval interval;
SSessionWindow sw;
int16_t precision;
int16_t numOfOutput;
int16_t fillType;
@ -206,13 +215,8 @@ typedef struct STaskAttr {
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
int32_t maxTableColumnWidth;
int32_t tagLen; // tag value length of current query
SGroupbyExpr* pGroupbyExpr;
SExprInfo* pExpr1;
SExprInfo* pExpr2;
int32_t numOfExpr2;
SExprInfo* pExpr3;
int32_t numOfExpr3;
SColumnInfo* tableCols;
SColumnInfo* tagColList;
@ -220,8 +224,6 @@ typedef struct STaskAttr {
int64_t* fillVal;
SSingleColumnFilterInfo* pFilterInfo;
// SFilterInfo *pFilters;
void* tsdb;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
int32_t vgId;
@ -384,7 +386,7 @@ typedef struct SExchangeInfo {
} SExchangeInfo;
typedef struct STableScanInfo {
void* pTsdbReadHandle;
void* dataReader;
int32_t numOfBlocks; // extract basic running information.
int32_t numOfSkipped;
int32_t numOfBlockStatis;
@ -644,12 +646,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr,
int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream,

View File

@ -249,7 +249,6 @@ static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType,
SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
static int32_t getGroupbyColumnIndex(SGroupbyExpr* pGroupbyExpr, SSDataBlock* pDataBlock);
static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type,
int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
SAggSupporter* pAggSup);
@ -287,44 +286,6 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
static void sortGroupResByOrderList(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv,
SSDataBlock* pDataBlock) {
SArray* columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
size_t size = taosArrayGetSize(columnOrderList);
taosArrayDestroy(columnOrderList);
if (size <= 0) {
return;
}
int32_t orderId = pRuntimeEnv->pQueryAttr->order.col.colId;
if (orderId <= 0) {
return;
}
bool found = false;
int16_t dataOffset = 0;
for (int32_t j = 0; j < pDataBlock->info.numOfCols; ++j) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pDataBlock->pDataBlock, j);
if (orderId == j) {
found = true;
break;
}
dataOffset += pColInfoData->info.bytes;
}
if (found == false) {
return;
}
int16_t type = pRuntimeEnv->pQueryAttr->pExpr1[orderId].base.resSchema.type;
SRowCompSupporter support = {.pRuntimeEnv = pRuntimeEnv, .dataOffset = dataOffset, .comFunc = getComparFunc(type, 0)};
taosArraySortPWithExt(pGroupResInfo->pRows, compareRowData, &support);
}
// setup the output buffer for each operator
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
@ -375,17 +336,6 @@ static bool isSelectivityWithTagsQuery(SqlFunctionCtx* pCtx, int32_t numOfOutput
// return (numOfSelectivity > 0 && hasTags);
}
static bool isProjQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functId != FUNCTION_PRJ && functId != FUNCTION_TAGPRJ) {
return false;
}
}
return true;
}
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) ||
pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
@ -1411,15 +1361,10 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
}
static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t pos,
int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols,
STimeWindow* win) {
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) {
bool ascQuery = true;
TSKEY curTs = tsCols[pos];
TSKEY lastTs = *(TSKEY*)pRuntimeEnv->prevRow[0];
TSKEY lastTs = 0;//*(TSKEY*)pRuntimeEnv->prevRow[0];
// lastTs == INT64_MIN and pos == 0 means this is the first time window, interpolation is not needed.
// start exactly from this point, no need to do interpolation
@ -1434,27 +1379,24 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF
return true;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery)) ? lastTs : tsCols[pos - step];
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key,
RESULT_ROW_START_INTERP);
doTimeWindowInterpolation(pOperatorInfo, pOperatorInfo->info, pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP);
return true;
}
static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFunctionCtx* pCtx, int32_t endRowIndex,
SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey,
STimeWindow* win) {
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
int32_t order = TSDB_ORDER_ASC;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
TSKEY actualEndKey = tsCols[endRowIndex];
TSKEY key = QUERY_IS_ASC_QUERY(pQueryAttr) ? win->ekey : win->skey;
TSKEY key = order ? win->ekey : win->skey;
// not ended in current data block, do not invoke interpolation
if ((key > blockEkey && QUERY_IS_ASC_QUERY(pQueryAttr)) || (key < blockEkey && !QUERY_IS_ASC_QUERY(pQueryAttr))) {
if ((key > blockEkey /*&& QUERY_IS_ASC_QUERY(pQueryAttr)*/) || (key < blockEkey /*&& !QUERY_IS_ASC_QUERY(pQueryAttr)*/)) {
setNotInterpoWindowKey(pCtx, numOfOutput, RESULT_ROW_END_INTERP);
return false;
}
@ -1465,7 +1407,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
return true;
}
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
int32_t nextRowIndex = endRowIndex + step;
assert(nextRowIndex >= 0);
@ -1667,10 +1609,9 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput;
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = true;
TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) {
@ -1683,7 +1624,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
STimeWindow win = {0};//getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MAIN_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL;
@ -1699,7 +1640,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win);
TSKEY ekey = 0;//reviseWindowEkey(pQueryAttr, &win);
// forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey,
// binarySearchForKey, true);
@ -1713,31 +1654,31 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey,
// prevEndPos);
if (startPos < 0) {
if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
int32_t code =
setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
startPos = pSDataBlock->info.rows - 1;
// if ((ascQuery && win.skey <= pQueryAttr->window.ekey) || ((!ascQuery) && win.ekey >= pQueryAttr->window.ekey)) {
// int32_t code =
// setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult,
// tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset);
// if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
// }
//
// startPos = pSDataBlock->info.rows - 1;
// window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos,
// forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, ascQuery ? &win : &preWin, startPos,
// forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
// }
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
}
if (pQueryAttr->timeWindowInterpo) {
int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
// if (pQueryAttr->timeWindowInterpo) {
// int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
// saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
}
// }
// updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
}
@ -2023,28 +1964,6 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo* binfo, int32_t numOfCo
return TSDB_CODE_SUCCESS;
}
static int32_t getGroupbyColumnIndex(SGroupbyExpr* pGroupbyExpr, SSDataBlock* pDataBlock) {
size_t num = taosArrayGetSize(pGroupbyExpr->columnInfo);
for (int32_t k = 0; k < num; ++k) {
SColIndex* pColIndex = taosArrayGet(pGroupbyExpr->columnInfo, k);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
continue;
}
int32_t colId = pColIndex->colId;
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
if (pColInfo->info.colId == colId) {
return i;
}
}
}
assert(0);
return -1;
}
static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
@ -2264,64 +2183,6 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return NULL;
}
static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv* pRuntimeEnv, int32_t numOfTables, SArray* pOperator,
void* merger) {
// qDebug("QInfo:0x%"PRIx64" setup runtime env", GET_TASKID(pRuntimeEnv));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable =
taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pRuntimeEnv->pResultRowListSet =
taosHashInit(numOfTables * 10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pRuntimeEnv->keyBuf = taosMemoryMalloc(pQueryAttr->maxTableColumnWidth + sizeof(int64_t) + POINTER_BYTES);
// pRuntimeEnv->pool = initResultRowPool(getResultRowSize(pRuntimeEnv));
pRuntimeEnv->pResultRowArrayList = taosArrayInit(numOfTables, sizeof(SResultRowCell));
pRuntimeEnv->prevRow = taosMemoryMalloc(POINTER_BYTES * pQueryAttr->numOfCols + pQueryAttr->srcRowSize);
pRuntimeEnv->tagVal = taosMemoryMalloc(pQueryAttr->tagLen);
// NOTE: pTableCheckInfo need to update the query time range and the lastKey info
pRuntimeEnv->pTableRetrieveTsMap =
taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// pRuntimeEnv->scalarSup = createScalarFuncSupport(pQueryAttr->numOfOutput);
if (pRuntimeEnv->scalarSup == NULL || pRuntimeEnv->pResultRowHashTable == NULL || pRuntimeEnv->keyBuf == NULL ||
pRuntimeEnv->prevRow == NULL || pRuntimeEnv->tagVal == NULL) {
goto _clean;
}
if (pQueryAttr->numOfCols) {
char* start = POINTER_BYTES * pQueryAttr->numOfCols + (char*)pRuntimeEnv->prevRow;
pRuntimeEnv->prevRow[0] = start;
for (int32_t i = 1; i < pQueryAttr->numOfCols; ++i) {
pRuntimeEnv->prevRow[i] = pRuntimeEnv->prevRow[i - 1] + pQueryAttr->tableCols[i - 1].bytes;
}
if (pQueryAttr->tableCols[0].type == TSDB_DATA_TYPE_TIMESTAMP) {
*(int64_t*)pRuntimeEnv->prevRow[0] = INT64_MIN;
}
}
// qDebug("QInfo:0x%"PRIx64" init runtime environment completed", GET_TASKID(pRuntimeEnv));
// group by normal column, sliding window query, interval query are handled by interval query processor
// interval (down sampling operation)
return TSDB_CODE_SUCCESS;
_clean:
// destroyScalarFuncSupport(pRuntimeEnv->scalarSup, pRuntimeEnv->pQueryAttr->numOfOutput);
taosMemoryFreeClear(pRuntimeEnv->pResultRowHashTable);
taosMemoryFreeClear(pRuntimeEnv->keyBuf);
taosMemoryFreeClear(pRuntimeEnv->prevRow);
taosMemoryFreeClear(pRuntimeEnv->tagVal);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
@ -2400,17 +2261,6 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_Q
// return false;
//}
static bool isFirstLastRowQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionID = getExprFunctionId(&pQueryAttr->pExpr1[i]);
if (functionID == FUNCTION_LAST_ROW) {
return true;
}
}
return false;
}
static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
@ -2493,18 +2343,6 @@ static bool onlyOneQueryType(STaskAttr* pQueryAttr, int32_t functId, int32_t fun
return true;
}
static bool onlyFirstQuery(STaskAttr* pQueryAttr) {
return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST);
}
static bool onlyLastQuery(STaskAttr* pQueryAttr) {
return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST);
}
static bool notContainSessionOrStateWindow(STaskAttr* pQueryAttr) {
return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow);
}
static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
bool hasFirstLastFunc = false;
bool hasOtherFunc = false;
@ -3006,7 +2844,7 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo,
*status = BLK_DATA_ALL_NEEDED;
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL);
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
if (pCols == NULL) {
return terrno;
}
@ -3899,76 +3737,6 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S
return 0;
}
// TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function.
void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExprInfo) {
#if 0
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo1 = &(pExprInfo[i]);
if (pExprInfo1->base.functionId != FUNCTION_STDDEV_DST) {
continue;
}
SExprBasicInfo* pExpr = &pExprInfo1->base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (pQueryAttr->tagLen == 0 || memcmp(p->tags, pRuntimeEnv->tagVal, pQueryAttr->tagLen) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->info.colId == pExpr->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
#endif
}
void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput,
SExprInfo* pExpr, char* val, int16_t bytes) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
#if 0
int32_t numOfExprs = pQueryAttr->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprBasicInfo* pExpr1 = &pExpr[i].base;
if (pExpr1->functionId != FUNCTION_STDDEV_DST) {
continue;
}
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->info.colId == pExpr1->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
#endif
}
/*
* There are two cases to handle:
*
@ -4679,13 +4447,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
*newgroup = false;
while (tsdbNextDataBlock(pTableScanInfo->pTsdbReadHandle)) {
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
pTableScanInfo->numOfBlocks += 1;
tsdbRetrieveDataBlockInfo(pTableScanInfo->pTsdbReadHandle, &pBlock->info);
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
// todo opt
// if (pTableGroupInfo->numOfTables > 1 || (pRuntimeEnv->current == NULL && pTableGroupInfo->numOfTables == 1)) {
@ -4722,7 +4490,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->pTsdbReadHandle == NULL) {
if (pTableScanInfo->dataReader == NULL) {
return NULL;
}
@ -4750,11 +4518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN;
// if (pTaskInfo->pTsBuf) {
// bool ret = tsBufNextPos(pRuntimeEnv->pTsBuf);
// assert(ret);
// }
//
if (pResultRowInfo->size > 0) {
pResultRowInfo->curPos = 0;
}
@ -4790,44 +4553,43 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
STableScanInfo* pTableScanInfo = pOperator->info;
*newgroup = false;
#if 0
STableBlockDist tableBlockDist = {0};
tableBlockDist.numOfTables = (int32_t)pOperator->pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
STableBlockDistInfo tableBlockDist = {0};
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
int32_t numRowSteps = TSDB_DEFAULT_MAX_ROW_FBLOCK / TSDB_BLOCK_DIST_STEP_ROWS;
if (TSDB_DEFAULT_MAX_ROW_FBLOCK % TSDB_BLOCK_DIST_STEP_ROWS != 0) {
++numRowSteps;
}
tableBlockDist.dataBlockInfos = taosArrayInit(numRowSteps, sizeof(SFileBlockInfo));
taosArraySetSize(tableBlockDist.dataBlockInfos, numRowSteps);
tableBlockDist.maxRows = INT_MIN;
tableBlockDist.minRows = INT_MAX;
tsdbGetFileBlocksDistInfo(pTableScanInfo->pTsdbReadHandle, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->pTsdbReadHandle);
tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &tableBlockDist);
tableBlockDist.numOfRowsInMemTable = (int32_t) tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader);
SSDataBlock* pBlock = &pTableScanInfo->block;
pBlock->info.rows = 1;
pBlock->info.numOfCols = 1;
SBufferWriter bw = tbufInitWriter(NULL, false);
blockDistInfoToBinary(&tableBlockDist, &bw);
// SBufferWriter bw = tbufInitWriter(NULL, false);
// blockDistInfoToBinary(&tableBlockDist, &bw);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
int32_t len = (int32_t) tbufTell(&bw);
pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
*(int32_t*) pColInfo->pData = len;
memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
tbufCloseWriter(&bw);
// int32_t len = (int32_t) tbufTell(&bw);
// pColInfo->pData = taosMemoryMalloc(len + sizeof(int32_t));
// *(int32_t*) pColInfo->pData = len;
// memcpy(pColInfo->pData + sizeof(int32_t), tbufGetData(&bw, false), len);
//
// tbufCloseWriter(&bw);
SArray* g = GET_TABLEGROUP(pOperator->pRuntimeEnv, 0);
pOperator->pRuntimeEnv->current = taosArrayGetP(g, 0);
pOperator->status = OP_EXEC_DONE;
return pBlock;
#endif
}
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
@ -5472,7 +5234,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
}
pInfo->pFilterNode = pCondition;
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->dataReader = pTsdbReadHandle;
pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime;
pInfo->order = order;
@ -5494,7 +5256,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->dataReader = pTsdbReadHandle;
pInfo->times = 1;
pInfo->reverseTimes = 0;
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
@ -5515,32 +5277,42 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim
return pOperator;
}
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) {
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pInfo->pTsdbReadHandle = pTsdbReadHandle;
pInfo->dataReader = dataReader;
pInfo->block.pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
SColumnInfoData infoData = {{0}};
infoData.info.type = TSDB_DATA_TYPE_BINARY;
SColumnInfoData infoData = {0};
infoData.info.type = TSDB_DATA_TYPE_BINARY;
infoData.info.bytes = 1024;
infoData.info.colId = 0;
taosArrayPush(pInfo->block.pDataBlock, &infoData);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
pOperator->name = "TableBlockInfoScanOperator";
pOperator->name = "DataBlockInfoScanOperator";
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
// pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols;
pOperator->getNextFn = doBlockInfoScan;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doBlockInfoScan;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
return NULL;
}
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -5579,16 +5351,17 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
pInfo->readerHandle = streamReadHandle;
pInfo->pRes = pResBlock;
pOperator->name = "StreamBlockScanOperator";
pOperator->name = "StreamBlockScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfOutput = pResBlock->info.numOfCols;
pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doStreamBlockScan;
pOperator->closeFn = operatorDummyCloseFn;
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
}
@ -5922,80 +5695,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
return pOperator;
}
SArray* getOrderCheckColumns(STaskAttr* pQuery) {
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL) ? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
} else {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
}
if (pQuery->interval.interval > 0) {
if (pOrderColumns == NULL) {
pOrderColumns = taosArrayInit(1, sizeof(SColIndex));
}
SColIndex colIndex = {.colIndex = 0, .colId = 0, .flag = TSDB_COL_NORMAL};
taosArrayPush(pOrderColumns, &colIndex);
}
{
numOfCols = (int32_t)taosArrayGetSize(pOrderColumns);
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex* index = taosArrayGet(pOrderColumns, i);
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base;
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]);
if (index->colId == pExpr->pParam[0].pCol->colId &&
(functionId == FUNCTION_PRJ || functionId == FUNCTION_TAG || functionId == FUNCTION_TS)) {
index->colIndex = j;
index->colId = pExpr->resSchema.colId;
}
}
}
}
return pOrderColumns;
}
SArray* getResultGroupCheckColumns(STaskAttr* pQuery) {
int32_t numOfCols = (pQuery->pGroupbyExpr == NULL) ? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo);
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo);
} else {
pOrderColumns = taosArrayInit(4, sizeof(SColIndex));
}
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex* index = taosArrayGet(pOrderColumns, i);
bool found = false;
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
SExprBasicInfo* pExpr = &pQuery->pExpr1[j].base;
int32_t functionId = getExprFunctionId(&pQuery->pExpr1[j]);
// FUNCTION_TAG_DUMMY function needs to be ignored
// if (index->colId == pExpr->pColumns->info.colId &&
// ((TSDB_COL_IS_TAG(pExpr->pColumns->flag) && functionId == FUNCTION_TAG) ||
// (TSDB_COL_IS_NORMAL_COL(pExpr->pColumns->flag) && functionId == FUNCTION_PRJ))) {
// index->colIndex = j;
// index->colId = pExpr->resSchema.colId;
// found = true;
// break;
// }
}
assert(found && index->colIndex >= 0 && index->colIndex < pQuery->numOfOutput);
}
return pOrderColumns;
}
static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, const char* pKey);
static void cleanupAggSup(SAggSupporter* pAggSup);
@ -7044,9 +6743,6 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
return pIntervalInfo->binfo.pRes;
}
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t order = pQueryAttr->order.order;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@ -7062,14 +6758,14 @@ static SSDataBlock* doAllSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgr
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
// setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order);
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
}
pOperator->status = OP_RES_TO_RETURN;
pQueryAttr->order.order = order; // TODO : restore the order
// pQueryAttr->order.order = order; // TODO : restore the order
doCloseAllTimeWindow(pRuntimeEnv);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
@ -8153,7 +7849,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]);
if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
assert(pQueryAttr->numOfOutput == 1);
SExprInfo* pExprInfo = &pOperator->pExpr[0];
@ -8936,8 +8631,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList;
}
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
uint64_t queryId, uint64_t taskId) {
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId) {
int32_t code = 0;
if (tableType == TSDB_SUPER_TABLE) {
code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
@ -9041,43 +8735,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol
return TSDB_CODE_SUCCESS;
}
static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) {
assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL);
for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) {
SExprBasicInfo* pSqlExprMsg = &pQueryAttr->pExpr1[k].base;
// if (pSqlExprMsg->functionId == FUNCTION_ARITHM) {
// continue;
// }
// todo opt performance
SColIndex* pColIndex = NULL; /*&pSqlExprMsg->colInfo;*/
if (TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
int32_t f = 0;
for (f = 0; f < pQueryAttr->numOfCols; ++f) {
if (pColIndex->colId == pQueryAttr->tableCols[f].colId) {
pColIndex->colIndex = f;
break;
}
}
assert(f < pQueryAttr->numOfCols);
} else if (pColIndex->colId <= TSDB_UD_COLUMN_INDEX) {
// do nothing for user-defined constant value result columns
} else {
int32_t f = 0;
for (f = 0; f < pQueryAttr->numOfTags; ++f) {
if (pColIndex->colId == pQueryAttr->tagColList[f].colId) {
pColIndex->colIndex = f;
break;
}
}
assert(f < pQueryAttr->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
}
}
}
void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
@ -9087,16 +8744,16 @@ void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const float THRESHOLD_RATIO = 0.85f;
if (isProjQuery(pQueryAttr)) {
int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize;
if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
}
pResultInfo->capacity = numOfRes;
} else { // in case of non-prj query, a smaller output buffer will be used.
pResultInfo->capacity = DEFAULT_MIN_ROWS;
}
// if (isProjQuery(pQueryAttr)) {
// int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize;
// if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
// numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
// }
//
// pResultInfo->capacity = numOfRes;
// } else { // in case of non-prj query, a smaller output buffer will be used.
// pResultInfo->capacity = DEFAULT_MIN_ROWS;
// }
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->totalRows = 0;

View File

@ -176,26 +176,6 @@ typedef struct SResPair {
double avg;
} SResPair;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
typedef struct STableBlockDist {
uint16_t rowSize;
uint16_t numOfFiles;
uint32_t numOfTables;
uint64_t totalSize;
uint64_t totalRows;
int32_t maxRows;
int32_t minRows;
int32_t firstSeekTimeUs;
uint32_t numOfRowsInMemTable;
uint32_t numOfSmallBlocks;
SArray *dataBlockInfos;
} STableBlockDist;
typedef struct SFileBlockInfo {
int32_t numBlocksOfStep;
} SFileBlockInfo;
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) {
pCell->initialized = false;
}
@ -3984,7 +3964,7 @@ static void irate_function(SqlFunctionCtx *pCtx) {
}
}
static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDist* pDist) {
static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDistInfo* pDist) {
SBufferReader br = tbufInitReader(data, len, false);
pDist->numOfTables = tbufReadUint32(&br);
@ -4024,7 +4004,7 @@ static void blockDistInfoFromBinary(const char* data, int32_t len, STableBlockDi
static void blockInfo_func(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist);
@ -4036,8 +4016,8 @@ static void blockInfo_func(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDist* pSrc) {
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlockDistInfo* pSrc) {
STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
assert(pDist != NULL && pSrc != NULL);
pDist->numOfTables += pSrc->numOfTables;
@ -4071,7 +4051,7 @@ static void mergeTableBlockDist(SResultRowEntryInfo* pResInfo, const STableBlock
}
void block_func_merge(SqlFunctionCtx* pCtx) {
STableBlockDist info = {0};
STableBlockDistInfo info = {0};
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary(((char*)pCtx->pInput) + sizeof(int32_t), len, &info);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
@ -4082,7 +4062,7 @@ void block_func_merge(SqlFunctionCtx* pCtx) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
void getPercentiles(STableBlockDistInfo *pTableBlockDist, int64_t totalBlocks, int32_t numOfPercents,
double* percents, int32_t* percentiles) {
if (totalBlocks == 0) {
for (int32_t i = 0; i < numOfPercents; ++i) {
@ -4117,7 +4097,7 @@ void getPercentiles(STableBlockDist *pTableBlockDist, int64_t totalBlocks, int32
}
}
void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
void generateBlockDistResult(STableBlockDistInfo *pTableBlockDist, char* result) {
if (pTableBlockDist == NULL) {
return;
}
@ -4178,7 +4158,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
void blockinfo_func_finalizer(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
STableBlockDistInfo* pDist = (STableBlockDistInfo*) GET_ROWCELL_INTERBUF(pResInfo);
pDist->rowSize = (uint16_t)pCtx->param[0].i;
generateBlockDistResult(pDist, pCtx->pOutput);