[td-225] refactor some codes.

This commit is contained in:
hjxilinx 2020-05-11 00:25:57 +08:00
parent 586b5db8c1
commit 49b331d879
5 changed files with 129 additions and 198 deletions

View File

@ -1691,10 +1691,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
}
static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) {
if (pCtx->order == TSDB_ORDER_ASC) {
return;
}
assert(pCtx->order != TSDB_ORDER_ASC);
void *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return;
@ -2912,7 +2909,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) {
}
static void date_col_output_function(SQLFunctionCtx *pCtx) {
if (pCtx->scanFlag == SUPPLEMENTARY_SCAN) {
if (pCtx->scanFlag == REVERSE_SCAN) {
return;
}

View File

@ -110,6 +110,14 @@ typedef struct STableQueryInfo { // todo merge with the STableQueryInfo struct
SWindowResInfo windowResInfo;
} STableQueryInfo;
typedef struct SQueryCostSummary {
} SQueryCostSummary;
typedef struct SGroupItem {
STableId id;
STableQueryInfo* info;
} SGroupItem;
typedef struct SQuery {
int16_t numOfCols;
int16_t numOfTags;
@ -131,17 +139,15 @@ typedef struct SQuery {
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* defaultVal;
TSKEY lastKey;
// TSKEY lastKey;
uint32_t status; // query status
SResultRec rec;
int32_t pos;
SData** sdata;
STableQueryInfo* current;
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
typedef struct SQueryCostSummary {
} SQueryCostSummary;
typedef struct SQueryRuntimeEnv {
SResultInfo* resultInfo; // todo refactor to merge with SWindowResInfo
SQuery* pQuery;

View File

@ -104,7 +104,7 @@ extern "C" {
enum {
MASTER_SCAN = 0x0u,
SUPPLEMENTARY_SCAN = 0x1u,
REVERSE_SCAN = 0x1u,
REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan
FIRST_STAGE_MERGE = 0x10u,
SECONDARY_STAGE_MERGE = 0x20u,

View File

@ -40,9 +40,9 @@
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_QINFO_ADDR(x) ((void *)((char *)(x)-offsetof(SQInfo, runtimeEnv)))
@ -96,11 +96,6 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
typedef struct SGroupItem {
STableId id;
STableQueryInfo* info;
} SGroupItem;
static void setQueryStatus(SQuery *pQuery, int8_t status);
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
@ -121,7 +116,7 @@ static bool hasMainOutput(SQuery *pQuery);
static void createTableQueryInfo(SQInfo *pQInfo);
static void buildTagQueryResult(SQInfo *pQInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTaleId, STableQueryInfo *pTableQueryInfo);
static int32_t setAdditionalInfo(SQInfo *pQInfo, STableId *pTableId, STableQueryInfo *pTableQueryInfo);
static int32_t flushFromResultBuf(SQInfo *pQInfo);
bool getNeighborPoints(SQInfo *pQInfo, void *pMeterObj, SPointInterpoSupporter *pPointInterpSupporter) {
@ -621,8 +616,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
}
// query completed
if ((lastKey >= pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->window.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
if ((lastKey >= pQuery->current->win.ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(lastKey <= pQuery->current->win.ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeAllTimeWindow(pWindowResInfo);
pWindowResInfo->curIndex = pWindowResInfo->size - 1;
@ -662,22 +657,22 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
}
qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, n);
qTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pRuntimeEnv), pWindowResInfo->size, n);
}
assert(pWindowResInfo->prevSKey != 0);
}
static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn,
bool updateLastKey) {
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
assert(startPos >= 0 && startPos < pDataBlockInfo->rows);
int32_t num = -1;
int32_t order = pQuery->order.order;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
STableQueryInfo* item = pQuery->current;
if (QUERY_IS_ASC_QUERY(pQuery)) {
if (ekey < pDataBlockInfo->window.ekey) {
num = getForwardStepsInBlock(pDataBlockInfo->rows, searchFn, ekey, startPos, order, pPrimaryColumn);
@ -685,13 +680,13 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
assert(ekey < pPrimaryColumn[startPos]);
} else {
if (updateLastKey) {
pQuery->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
item->lastKey = pPrimaryColumn[startPos + (num - 1)] + step;
}
}
} else {
num = pDataBlockInfo->rows - startPos;
if (updateLastKey) {
pQuery->lastKey = pDataBlockInfo->window.ekey + step;
item->lastKey = pDataBlockInfo->window.ekey + step;
}
}
} else { // desc
@ -701,13 +696,13 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
assert(ekey > pPrimaryColumn[startPos]);
} else {
if (updateLastKey) {
pQuery->lastKey = pPrimaryColumn[startPos - (num - 1)] + step;
item->lastKey = pPrimaryColumn[startPos - (num - 1)] + step;
}
}
} else {
num = startPos + 1;
if (updateLastKey) {
pQuery->lastKey = pDataBlockInfo->window.skey + step;
item->lastKey = pDataBlockInfo->window.skey + step;
}
}
}
@ -906,8 +901,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
SQuery * pQuery = pRuntimeEnv->pQuery;
SColumnInfoData *pColInfo = NULL;
TSKEY * primaryKeyCol = NULL;
TSKEY *primaryKeyCol = NULL;
if (pDataBlock != NULL) {
pColInfo = taosArrayGet(pDataBlock, 0);
primaryKeyCol = (TSKEY *)(pColInfo->pData);
@ -1097,15 +1092,20 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SQuery* pQuery = pRuntimeEnv->pQuery;
if (pResInfo->complete || functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
return false;
}
if (functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST) {
return !QUERY_IS_ASC_QUERY(pQuery);
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) {
return QUERY_IS_ASC_QUERY(pQuery);
}
// in the supplementary scan, only the following functions need to be executed
if (IS_REVERSE_SCAN(pRuntimeEnv) &&
!(functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST ||
functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
if (IS_REVERSE_SCAN(pRuntimeEnv)) {// && (functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TS)) {
return false;
}
@ -1117,9 +1117,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* item = pQuery->current;
TSKEY *primaryKeyCol = (TSKEY*) ((SColumnInfoData *)taosArrayGet(pDataBlock, 0))->pData;
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
bool groupbyStateValue = isGroupbyNormalCol(pQuery->pGroupbyExpr);
SArithmeticSupport *sasArray = calloc((size_t)pQuery->numOfOutput, sizeof(SArithmeticSupport));
int16_t type = 0;
@ -1253,7 +1254,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
}
}
pQuery->lastKey = primaryKeyCol[offset] + step;
item->lastKey = primaryKeyCol[offset] + step;
// todo refactor: extract method
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
@ -1270,7 +1271,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo,
SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo* pTableQInfo = pQuery->current;
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
@ -1279,7 +1282,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
}
TSKEY lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.ekey : pDataBlockInfo->window.skey;
pQuery->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
pTableQInfo->lastKey = lastKey + GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
@ -1745,73 +1748,6 @@ static UNUSED_FUNC bool doSetDataInfo(SQInfo *pQInfo, SPointInterpoSupporter *pP
}
}
// TODO refactor code, the best way to implement the last_row is utilizing the iterator
bool normalizeUnBoundLastRowQuery(SQInfo *pQInfo, SPointInterpoSupporter *pPointInterpSupporter) {
#if 0
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SMeterObj *pMeterObj = pRuntimeEnv->pTabObj;
assert(!QUERY_IS_ASC_QUERY(pQuery) && notHasQueryTimeRange(pQuery));
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm];
TSKEY lastKey = -1;
pQuery->fileId = -1;
vnodeFreeFieldsEx(pRuntimeEnv);
// keep in-memory cache status in local variables in case that it may be changed by write operation
getBasicCacheInfoSnapshot(pQuery, pMeterObj->pCache, pMeterObj->vnode);
SCacheInfo *pCacheInfo = (SCacheInfo *)pMeterObj->pCache;
if (pCacheInfo != NULL && pCacheInfo->cacheBlocks != NULL && pQuery->numOfBlocks > 0) {
pQuery->fileId = -1;
TSKEY key = pMeterObj->lastKey;
pQuery->window.skey = key;
pQuery->window.ekey = key;
pQuery->lastKey = pQuery->window.skey;
/*
* cache block may have been flushed to disk, and no data in cache anymore.
* So, copy cache block to local buffer is required.
*/
lastKey = getQueryStartPositionInCache(pRuntimeEnv, &pQuery->slot, &pQuery->pos, false);
if (lastKey < 0) { // data has been flushed to disk, try again search in file
lastKey = getQueryPositionForCacheInvalid(pRuntimeEnv, searchFn);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
return false;
}
}
} else { // no data in cache, try file
TSKEY key = pMeterObj->lastKeyOnFile;
pQuery->window.skey = key;
pQuery->window.ekey = key;
pQuery->lastKey = pQuery->window.skey;
bool ret = getQualifiedDataBlock(pMeterObj, pRuntimeEnv, QUERY_RANGE_LESS_EQUAL, searchFn);
if (!ret) { // no data in file, return false;
return false;
}
lastKey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
}
assert(lastKey <= pQuery->window.skey);
pQuery->window.skey = lastKey;
pQuery->window.ekey = lastKey;
pQuery->lastKey = pQuery->window.skey;
return getNeighborPoints(pQInfo, pMeterObj, pPointInterpSupporter);
#endif
return true;
}
static void setScanLimitationByResultBuffer(SQuery *pQuery) {
if (isTopBottomQuery(pQuery)) {
pQuery->checkBuffer = 0;
@ -2446,9 +2382,11 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
GET_QINFO_ADDR(pRuntimeEnv), pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, pTableQueryInfo->lastKey,
pQuery->order.order);
TsdbQueryHandleT pQueryHandle = IS_MASTER_SCAN(pRuntimeEnv)? pRuntimeEnv->pQueryHandle : pRuntimeEnv->pSecQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
@ -2529,7 +2467,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
int32_t step = QUERY_IS_ASC_QUERY(pQuery) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP;
closeAllTimeWindow(&pRuntimeEnv->windowResInfo);
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step);
removeRedundantWindow(&pRuntimeEnv->windowResInfo, pTableQueryInfo->lastKey - step, step);
pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window
} else {
assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL));
@ -2539,8 +2477,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
return 0;
}
static void updatelastkey(SQuery *pQuery, STableQueryInfo *pTableQInfo) { pTableQInfo->lastKey = pQuery->lastKey; }
/*
* set tag value in SQLFunctionCtx
* e.g.,tag information into input buffer
@ -3338,13 +3274,14 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SQueryStatusInfo info = {
.status = pQuery->status,
.windowIndex = pRuntimeEnv->windowResInfo.curIndex,
.lastKey = pQuery->lastKey,
.lastKey = pTableQueryInfo->lastKey,
.w = pQuery->window,
.curWindow = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey},
.curWindow = {.skey = pTableQueryInfo->lastKey, .ekey = pQuery->window.ekey},
};
return info;
@ -3388,6 +3325,7 @@ static void setEnvBeforeReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusI
static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatusInfo *pStatus) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
SWITCH_ORDER(pQuery->order.order);
switchCtxOrder(pRuntimeEnv);
@ -3401,17 +3339,19 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
// update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query
// during reverse scan
pQuery->lastKey = pStatus->lastKey;
pTableQueryInfo->lastKey = pStatus->lastKey;
pQuery->status = pStatus->status;
pQuery->window = pStatus->w;
pTableQueryInfo->win = pStatus->w;
}
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = pQuery->current;
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
// store the start query position
SQInfo * pQInfo = (SQInfo *)GET_QINFO_ADDR(pRuntimeEnv);
SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv);
SET_MASTER_SCAN_FLAG(pRuntimeEnv);
@ -3422,7 +3362,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (pRuntimeEnv->scanFlag == MASTER_SCAN) {
qstatus.status = pQuery->status;
qstatus.curWindow.ekey = pQuery->lastKey - step;
qstatus.curWindow.ekey = pTableQueryInfo->lastKey - step;
}
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
@ -3457,7 +3397,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
if (pRuntimeEnv->stableQuery || !needReverseScan(pQuery)) {
if (!needReverseScan(pQuery)) {
return;
}
@ -3542,12 +3482,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->window = pTableQueryInfo->win;
pQuery->lastKey = pTableQueryInfo->lastKey;
assert(((pQuery->lastKey >= pQuery->window.skey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pQuery->lastKey <= pQuery->window.skey) && !QUERY_IS_ASC_QUERY(pQuery)));
pQuery->current = pTableQueryInfo;
assert(((pTableQueryInfo->lastKey >= pTableQueryInfo->win.skey) && QUERY_IS_ASC_QUERY(pQuery)) ||
((pTableQueryInfo->lastKey <= pTableQueryInfo->win.skey) && !QUERY_IS_ASC_QUERY(pQuery)));
}
/**
@ -3555,8 +3493,10 @@ void restoreIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *p
* @param pRuntimeEnv
* @param pDataBlockInfo
*/
void setExecutionContext(SQInfo *pQInfo, STableQueryInfo *pTableQueryInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) {
void setExecutionContext(SQInfo *pQInfo, STableId* pTableId, int32_t groupIdx, TSKEY nextKey) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
int32_t GROUPRESULTID = 1;
@ -3640,12 +3580,12 @@ int32_t setAdditionalInfo(SQInfo *pQInfo, STableId* pTableId, STableQueryInfo *p
* merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there
* is a previous result generated or not.
*/
void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSKEY key) {
void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
STableQueryInfo *pTableQueryInfo = pQuery->current;
if (pTableQueryInfo->queryRangeSet) {
pQuery->lastKey = key;
pTableQueryInfo->lastKey = key;
} else {
pQuery->window.skey = key;
@ -3682,8 +3622,6 @@ void setIntervalQueryRange(STableQueryInfo *pTableQueryInfo, SQInfo *pQInfo, TSK
pTableQueryInfo->queryRangeSet = 1;
pTableQueryInfo->lastKey = pQuery->window.skey;
pTableQueryInfo->win.skey = pQuery->window.skey;
pQuery->lastKey = pQuery->window.skey;
}
}
@ -3703,7 +3641,9 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
* 2. if there are top/bottom, first_dst/last_dst functions, we need to load timestamp column in any cases;
*/
STimeWindow *w = &pDataBlockInfo->window;
bool loadPrimaryTS = (pQuery->lastKey >= w->skey && pQuery->lastKey <= w->ekey) ||
STableQueryInfo* pTableQueryInfo = pQuery->current;
bool loadPrimaryTS = (pTableQueryInfo->lastKey >= w->skey && pTableQueryInfo->lastKey <= w->ekey) ||
(pQuery->window.ekey >= w->skey && pQuery->window.ekey <= w->ekey) || requireTimestamp(pQuery);
return loadPrimaryTS;
@ -3840,7 +3780,6 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo
}
updateWindowResNumOfRes(pRuntimeEnv, pTableQueryInfo);
updatelastkey(pQuery, pTableQueryInfo);
}
bool vnodeHasRemainResults(void *handle) {
@ -4034,10 +3973,12 @@ void vnodePrintQueryStatistics(SQInfo *pQInfo) {
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
SQuery *pQuery = pRuntimeEnv->pQuery;
STableQueryInfo* pTableQueryInfo = pQuery->current;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->limit.offset == pBlockInfo->rows) { // current block will ignore completed
pQuery->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step;
pTableQueryInfo->lastKey = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->window.ekey + step : pBlockInfo->window.skey + step;
pQuery->limit.offset = 0;
return;
}
@ -4057,7 +3998,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
TSKEY *keys = (TSKEY *)pColInfoData->pData;
// update the offset value
pQuery->lastKey = keys[pQuery->pos];
pTableQueryInfo->lastKey = keys[pQuery->pos];
pQuery->limit.offset = 0;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock);
@ -4076,6 +4017,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
pQuery->pos = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
STableQueryInfo* pTableQueryInfo = pQuery->current;
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
while (tsdbNextDataBlock(pQueryHandle)) {
@ -4087,8 +4029,8 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
if (pQuery->limit.offset > blockInfo.rows) {
pQuery->limit.offset -= blockInfo.rows;
pQuery->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
pQuery->lastKey += step;
pTableQueryInfo->lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.window.ekey : blockInfo.window.skey;
pTableQueryInfo->lastKey += step;
qTrace("QInfo:%p skip rows:%d, offset:%" PRId64 "", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.rows,
pQuery->limit.offset);
@ -4117,6 +4059,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
TSKEY skey1, ekey1;
STimeWindow w = {0};
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
STableQueryInfo *pTableQueryInfo = pQuery->current;
while (tsdbNextDataBlock(pRuntimeEnv->pQueryHandle)) {
SDataBlockInfo blockInfo = tsdbRetrieveDataBlockInfo(pRuntimeEnv->pQueryHandle);
@ -4162,7 +4105,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
// set the abort info
pQuery->pos = startPos;
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pWindowResInfo->prevSKey = tw.skey;
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock);
@ -4190,7 +4133,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) {
// set the abort info
pQuery->pos = startPos;
pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos];
pWindowResInfo->prevSKey = tw.skey;
win = tw;
} else {
@ -4211,12 +4154,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
setScanLimitationByResultBuffer(pQuery);
changeExecuteScanOrder(pQuery, false);
// dataInCache requires lastKey value
pQuery->lastKey = pQuery->window.skey;
STsdbQueryCond cond = {
.twindow = pQuery->window,
.order = pQuery->order.order,
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
};
@ -4229,6 +4169,9 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
} else if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableIdGroupInfo);
}
// create the table query support structures
createTableQueryInfo(pQInfo);
}
pQInfo->tsdb = tsdb;
@ -4322,7 +4265,6 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
// }
// the pQuery->window.skey is changed during normalizedFirstQueryRange, so set the newest lastkey value
pQuery->lastKey = pQuery->window.skey;
return TSDB_CODE_SUCCESS;
}
@ -4352,7 +4294,6 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
}
}
static int32_t xx = 0;
static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
@ -4392,31 +4333,18 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
}
}
if (pTableQueryInfo->id.tid == 5) {
int32_t k = 1;
printf("%d\n", k);
}
if (!QUERY_IS_ASC_QUERY(pQuery)) {
printf("-------------%d\n", xx++);
}
if (pTableQueryInfo->id.tid == 5 && !QUERY_IS_ASC_QUERY(pQuery)) {
int32_t k = 1;
printf("%d\n", k);
}
assert(pTableQueryInfo != NULL);
restoreIntervalQueryRange(pRuntimeEnv, pTableQueryInfo);
SDataStatis *pStatis = NULL;
SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
TSKEY nextKey = blockInfo.window.skey;
if (!isIntervalQuery(pQuery)) {
setExecutionContext(pQInfo, pTableQueryInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
} else { // interval query
setIntervalQueryRange(pTableQueryInfo, pQInfo, nextKey);
setIntervalQueryRange(pQInfo, nextKey);
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
if (ret != TSDB_CODE_SUCCESS) {
@ -4587,8 +4515,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
}
} else {
createTableQueryInfo(pQInfo);
/*
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
@ -4621,8 +4547,8 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex);
pQuery->current = item->info;
STableQueryInfo *pInfo = item->info;
if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) {
pQInfo->tableIndex++;
continue;
@ -4662,8 +4588,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
* to ensure that, we can reset the query range once query on a meter is completed.
*/
pQInfo->tableIndex++;
pInfo->lastKey = pQuery->lastKey;
// if the buffer is full or group by each table, we need to jump out of the loop
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL) /*||
isGroupbyEachTable(pQuery->pGroupbyExpr, pSupporter->pSidSet)*/) {
@ -4671,7 +4595,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
}
} else { // forward query range
pQuery->window.skey = pQuery->lastKey;
pQuery->window.skey = pQuery->current->lastKey;
// all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter
if (pQuery->rec.rows == 0) {
@ -4856,12 +4780,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", order:%d, forward scan start", pQInfo,
pQuery->window.skey, pQuery->window.ekey, pQuery->order.order);
// create the query support structures
createTableQueryInfo(pQInfo);
// do check all qualified data blocks
int64_t el = queryOnDataBlocks(pQInfo);
qTrace("QInfo:%p forward scan completed, elapsed time: %lldms, reversed scan start", pQInfo, el);
qTrace("QInfo:%p master scan completed, elapsed time: %lldms, reverse scan start", pQInfo, el);
// query error occurred or query is killed, abort current execution
if (pQInfo->code != TSDB_CODE_SUCCESS || isQueryKilled(pQInfo)) {
@ -4912,10 +4833,12 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
* select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a];
* select count(*) from table_name group by status_column;
*/
static void tableFixedOutputProcess(SQInfo *pQInfo) {
static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo; // set current query table info
scanAllDataBlocks(pRuntimeEnv);
finalizeQueryResult(pRuntimeEnv);
@ -4935,10 +4858,12 @@ static void tableFixedOutputProcess(SQInfo *pQInfo) {
limitResults(pQInfo);
}
static void tableMultiOutputProcess(SQInfo *pQInfo) {
static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery * pQuery = pRuntimeEnv->pQuery;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
// for ts_comp query, re-initialized is not allowed
if (!isTSCompQuery(pQuery)) {
resetCtxOutputBuf(pRuntimeEnv);
@ -4973,15 +4898,15 @@ static void tableMultiOutputProcess(SQInfo *pQInfo) {
}
qTrace("QInfo:%p vid:%d sid:%d id:%s, skip current result, offset:%" PRId64 ", next qrange:%" PRId64 "-%" PRId64,
pQInfo, pQuery->limit.offset, pQuery->lastKey);
pQInfo, pQuery->limit.offset, pQuery->current->lastKey);
resetCtxOutputBuf(pRuntimeEnv);
}
limitResults(pQInfo);
if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo, pQuery->lastKey,
pQuery->window.ekey);
qTrace("QInfo:%p query paused due to output limitation, next qrange:%" PRId64 "-%" PRId64, pQInfo,
pQuery->current->lastKey, pQuery->window.ekey);
}
if (!isTSCompQuery(pQuery)) {
@ -5021,11 +4946,12 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
}
// handle time interval query on table
static void tableIntervalProcess(SQInfo *pQInfo) {
static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
SQuery * pQuery = pRuntimeEnv->pQuery;
int32_t numOfInterpo = 0;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->current = pTableInfo;
// skip blocks without load the actual data block from file if no filter condition present
skipTimeInterval(pRuntimeEnv);
@ -5134,15 +5060,19 @@ static void tableQueryImpl(SQInfo *pQInfo) {
// number of points returned during this query
pQuery->rec.rows = 0;
int64_t st = taosGetTimestampUs();
assert(pQInfo->groupInfo.numOfTables == 1);
SArray* g = taosArrayGetP(pQInfo->groupInfo.pGroupList, 0);
SGroupItem* item = taosArrayGet(g, 0);
// group by normal column, sliding window query, interval query are handled by interval query processor
if (isIntervalQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation)
tableIntervalProcess(pQInfo);
tableIntervalProcess(pQInfo, item->info);
} else if (isFixedOutputQuery(pQuery)) {
tableFixedOutputProcess(pQInfo);
tableFixedOutputProcess(pQInfo, item->info);
} else { // diff/add/multiply/subtract/division
assert(pQuery->checkBuffer == 1);
tableMultiOutputProcess(pQInfo);
tableMultiOutputProcess(pQInfo, item->info);
}
// record the total elapsed time
@ -5836,7 +5766,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
pQuery->pos = -1;
pQuery->window = pQueryMsg->window;
pQuery->lastKey = pQuery->window.skey;
if (sem_init(&pQInfo->dataReady, 0, 0) != 0) {
qError("QInfo:%p init dataReady sem failed, reason:%s", pQInfo, strerror(errno));
@ -5913,9 +5842,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
if ((code = doInitQInfo(pQInfo, pTSBuf, tsdb, vgId, isSTable)) != TSDB_CODE_SUCCESS) {
goto _error;
}
// qTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
// pQInfo->refCount);
return code;
_error:
@ -6088,34 +6015,35 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
if (pQueryMsg->numOfTables <= 0) {
qError("Invalid number of tables to query, numOfTables:%d", pQueryMsg->numOfTables);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
goto _over;
}
if (pTableIdList == NULL || taosArrayGetSize(pTableIdList) == 0) {
qError("qmsg:%p, SQueryTableMsg wrong format", pQueryMsg);
code = TSDB_CODE_INVALID_QUERY_MSG;
goto _query_over;
goto _over;
}
SExprInfo *pExprs = NULL;
if ((code = createSqlFunctionExprFromMsg(pQueryMsg, &pExprs, pExprMsg, pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
goto _over;
}
SSqlGroupbyExpr *pGroupbyExpr = createGroupbyExprFromMsg(pQueryMsg, pGroupColIndex, &code);
if ((pGroupbyExpr == NULL && pQueryMsg->numOfGroupCols != 0) || code != TSDB_CODE_SUCCESS) {
goto _query_over;
goto _over;
}
bool isSTableQuery = false;
STableGroupInfo groupInfo = {0};
//todo multitable_query??
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_TABLE_QUERY)) {
isSTableQuery = TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
STableId *id = taosArrayGet(pTableIdList, 0);
if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) {
goto _query_over;
goto _over;
}
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_STABLE_QUERY)) {
isSTableQuery = true;
@ -6132,7 +6060,7 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
numOfGroupByCols);
if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query
code = TSDB_CODE_SUCCESS;
goto _query_over;
goto _over;
}
} else {
assert(0);
@ -6141,11 +6069,12 @@ int32_t qCreateQueryInfo(void *tsdb, int32_t vgId, SQueryTableMsg *pQueryMsg, qi
(*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo, pTagColumnInfo);
if ((*pQInfo) == NULL) {
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _over;
}
code = initQInfo(pQueryMsg, tsdb, vgId, *pQInfo, isSTableQuery);
_query_over:
_over:
tfree(tagCond);
tfree(tbnameCond);
taosArrayDestroy(pTableIdList);

View File

@ -85,7 +85,6 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
varDataSetLen(pTable->name, len);
memcpy(pTable->name->data, ptr, len);
printf("%s\n", pTable->name->data);
ptr = (char *)ptr + len;
T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid);