[td-714]
This commit is contained in:
parent
8c9c1b3cc3
commit
552ef8eeca
|
@ -16,7 +16,6 @@
|
||||||
#include "qfill.h"
|
#include "qfill.h"
|
||||||
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashfunc.h"
|
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
#include "qUtil.h"
|
#include "qUtil.h"
|
||||||
#include "qast.h"
|
#include "qast.h"
|
||||||
|
@ -24,7 +23,6 @@
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "queryLog.h"
|
#include "queryLog.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tdataformat.h"
|
|
||||||
#include "tlosertree.h"
|
#include "tlosertree.h"
|
||||||
#include "tscUtil.h" // todo move the function to common module
|
#include "tscUtil.h" // todo move the function to common module
|
||||||
#include "tscompression.h"
|
#include "tscompression.h"
|
||||||
|
@ -90,6 +88,9 @@ typedef struct {
|
||||||
} SQueryStatusInfo;
|
} SQueryStatusInfo;
|
||||||
|
|
||||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||||
|
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
||||||
|
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
||||||
|
|
||||||
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||||
|
|
||||||
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
||||||
|
@ -1707,6 +1708,20 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
|
||||||
|
|
||||||
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
|
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
|
||||||
|
|
||||||
|
// todo refactor, add iterator
|
||||||
|
static void doExchangeTimeWindow(SQInfo* pQInfo) {
|
||||||
|
size_t t = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
for(int32_t i = 0; i < t; ++i) {
|
||||||
|
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
|
size_t len = taosArrayGetSize(p1);
|
||||||
|
for(int32_t j = 0; j < len; ++j) {
|
||||||
|
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
|
||||||
|
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
|
static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
|
||||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
|
@ -1749,6 +1764,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
|
||||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||||
|
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
|
doExchangeTimeWindow(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->order.order = TSDB_ORDER_ASC;
|
pQuery->order.order = TSDB_ORDER_ASC;
|
||||||
|
@ -1758,18 +1774,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
|
||||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||||
|
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
|
doExchangeTimeWindow(pQInfo);
|
||||||
// todo refactor, add iterator
|
|
||||||
size_t t = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
|
||||||
for(int32_t i = 0; i < t; ++i) {
|
|
||||||
SArray* p1 = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
|
||||||
|
|
||||||
size_t len = taosArrayGetSize(p1);
|
|
||||||
for(int32_t j = 0; j < len; ++j) {
|
|
||||||
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
|
|
||||||
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->order.order = TSDB_ORDER_DESC;
|
pQuery->order.order = TSDB_ORDER_DESC;
|
||||||
|
@ -2502,10 +2507,10 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
while (pQInfo->groupIndex < numOfGroups) {
|
while (pQInfo->groupIndex < numOfGroups) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex);
|
SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
|
||||||
ret = mergeIntoGroupResultImpl(pQInfo, group);
|
ret = mergeIntoGroupResultImpl(pQInfo, group);
|
||||||
if (ret < 0) { // not enough disk space to save the data into disk
|
if (ret < 0) { // not enough disk space to save the data into disk
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2538,7 +2543,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if all results has been sent to client
|
// check if all results has been sent to client
|
||||||
int32_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
|
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
|
||||||
pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed
|
pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed
|
||||||
return;
|
return;
|
||||||
|
@ -2872,10 +2877,10 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t t = taosArrayGetSize(group);
|
size_t t = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < t; ++j) {
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
|
@ -3362,7 +3367,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
|
||||||
pCtx->resultInfo = &pResult->resultInfo[i];
|
pCtx->resultInfo = &pResult->resultInfo[i];
|
||||||
if (pCtx->resultInfo->complete) {
|
if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3492,7 +3497,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
|
||||||
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
||||||
} else {
|
} else {
|
||||||
totalSubset = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalSubset;
|
return totalSubset;
|
||||||
|
@ -3633,25 +3638,20 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
|
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
|
||||||
|
|
||||||
// todo refactor
|
if (pQuery->limit.limit > 0 && pQuery->rec.total >= pQuery->limit.limit) {
|
||||||
if (pQuery->fillType == TSDB_FILL_NONE || (pQuery->fillType != TSDB_FILL_NONE && isPointInterpoQuery(pQuery))) {
|
|
||||||
assert(pFillInfo == NULL);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) {
|
if (pQuery->fillType != TSDB_FILL_NONE) {
|
||||||
return false;
|
// There are results not returned to client yet, so filling operation applied to the remain result is required
|
||||||
}
|
// in the first place.
|
||||||
|
|
||||||
// There are results not returned to client, fill operation applied to the remain result set in the
|
|
||||||
// first place is required.
|
|
||||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||||
if (remain > 0) {
|
if (remain > 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* While the code reaches here, there are no results returned to client now.
|
* While the code reaches here, there are no results remains now.
|
||||||
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block
|
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block
|
||||||
* is retrieved from TSDB.
|
* is retrieved from TSDB.
|
||||||
*
|
*
|
||||||
|
@ -3664,6 +3664,15 @@ bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
return numOfTotal > 0;
|
return numOfTotal > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// there are results waiting for returned to client.
|
||||||
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
|
||||||
|
(isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) &&
|
||||||
|
(pRuntimeEnv->windowResInfo.size > 0)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3703,7 +3712,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) {
|
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfFilled) {
|
||||||
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
||||||
|
@ -4008,7 +4017,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
||||||
&& (!isFixedOutputQuery(pQuery))
|
&& (!isFixedOutputQuery(pQuery))
|
||||||
) {
|
) {
|
||||||
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
|
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
|
||||||
cond.twindow = pCheckInfo->win;
|
cond.twindow = pCheckInfo->win;
|
||||||
}
|
}
|
||||||
|
@ -4052,7 +4061,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
||||||
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
changeExecuteScanOrder(pQuery, false);
|
changeExecuteScanOrder(pQInfo, false);
|
||||||
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
|
@ -4155,9 +4164,9 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
STableQueryInfo *pTableQueryInfo = NULL;
|
STableQueryInfo *pTableQueryInfo = NULL;
|
||||||
|
|
||||||
// todo opt performance using hash table
|
// todo opt performance using hash table
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(group);
|
size_t num = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -4210,7 +4219,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray *group = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
|
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
|
||||||
|
|
||||||
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
|
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
|
||||||
|
@ -4274,7 +4283,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
|
||||||
size_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
|
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
|
@ -4324,7 +4333,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
taosArrayDestroy(s);
|
taosArrayDestroy(s);
|
||||||
|
|
||||||
// here we simply set the first table as current table
|
// here we simply set the first table as current table
|
||||||
pQuery->current = (STableQueryInfo*) taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
pQuery->current = (STableQueryInfo*) GET_TABLEGROUP(pQInfo, 0);
|
||||||
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
||||||
|
@ -4437,7 +4446,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
|
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
|
||||||
|
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray *group = GET_TABLEGROUP(pQInfo, 0);
|
||||||
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
|
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
|
||||||
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
||||||
|
|
||||||
|
@ -4588,9 +4597,9 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
if (isIntervalQuery(pQuery)) {
|
if (isIntervalQuery(pQuery)) {
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(group);
|
size_t num = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -4807,7 +4816,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
pQuery->current = pTableInfo;
|
pQuery->current = pTableInfo;
|
||||||
|
|
||||||
int32_t numOfInterpo = 0;
|
int32_t numOfFilled = 0;
|
||||||
TSKEY newStartKey = TSKEY_INITIAL_VAL;
|
TSKEY newStartKey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
// skip blocks without load the actual data block from file if no filter condition present
|
// skip blocks without load the actual data block from file if no filter condition present
|
||||||
|
@ -4835,9 +4844,9 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
} else {
|
} else {
|
||||||
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
|
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
|
||||||
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
||||||
numOfInterpo = 0;
|
numOfFilled = 0;
|
||||||
|
|
||||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
|
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
|
||||||
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
limitResults(pRuntimeEnv);
|
limitResults(pRuntimeEnv);
|
||||||
break;
|
break;
|
||||||
|
@ -4856,7 +4865,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQInfo->pointsInterpo += numOfInterpo;
|
pQInfo->pointsInterpo += numOfFilled;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tableQueryImpl(SQInfo *pQInfo) {
|
static void tableQueryImpl(SQInfo *pQInfo) {
|
||||||
|
@ -4864,53 +4873,48 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
if (queryHasRemainResults(pRuntimeEnv)) {
|
if (queryHasRemainResults(pRuntimeEnv)) {
|
||||||
|
|
||||||
|
if (pQuery->fillType != TSDB_FILL_NONE) {
|
||||||
/*
|
/*
|
||||||
* There are remain results that are not returned due to result interpolation
|
* There are remain results that are not returned due to result interpolation
|
||||||
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
|
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
|
||||||
*/
|
*/
|
||||||
int32_t numOfInterpo = 0;
|
int32_t numOfFilled = 0;
|
||||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
|
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
|
||||||
|
|
||||||
if (pQuery->rec.rows > 0) {
|
if (pQuery->rec.rows > 0) {
|
||||||
limitResults(pRuntimeEnv);
|
limitResults(pRuntimeEnv);
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
qTrace("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||||
return;
|
} else {
|
||||||
}
|
|
||||||
|
|
||||||
// here we have scan all qualified data in both data file and cache
|
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
|
||||||
// continue to get push data from the group result
|
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
|
|
||||||
((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) {
|
|
||||||
// todo limit the output for interval query?
|
|
||||||
pQuery->rec.rows = 0;
|
pQuery->rec.rows = 0;
|
||||||
pQInfo->groupIndex = 0; // always start from 0
|
pQInfo->groupIndex = 0; // always start from 0
|
||||||
|
|
||||||
if (pRuntimeEnv->windowResInfo.size > 0) {
|
if (pRuntimeEnv->windowResInfo.size > 0) {
|
||||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||||
pQuery->rec.rows += pQuery->rec.rows;
|
|
||||||
|
|
||||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||||
|
|
||||||
if (pQuery->rec.rows > 0) {
|
if (pQuery->rec.rows > 0) {
|
||||||
qTrace("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
qTrace("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||||
return;
|
|
||||||
}
|
// there are not data remains
|
||||||
}
|
if (pRuntimeEnv->windowResInfo.size <= 0) {
|
||||||
|
qTrace("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// number of points returned during this query
|
// number of points returned during this query
|
||||||
pQuery->rec.rows = 0;
|
pQuery->rec.rows = 0;
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
||||||
SArray* g = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* g = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* item = taosArrayGetP(g, 0);
|
STableQueryInfo* item = taosArrayGetP(g, 0);
|
||||||
|
|
||||||
// group by normal column, sliding window query, interval query are handled by interval query processor
|
// group by normal column, sliding window query, interval query are handled by interval query processor
|
||||||
|
@ -5797,9 +5801,9 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor, extract method to destroytableDataInfo
|
// todo refactor, extract method to destroytableDataInfo
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *p = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *p = GET_TABLEGROUP(pQInfo, i);;
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(p);
|
size_t num = taosArrayGetSize(p);
|
||||||
for(int32_t j = 0; j < num; ++j) {
|
for(int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -6188,14 +6192,14 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||||
|
|
||||||
if (numOfGroup == 0) {
|
if (numOfGroup == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pa);
|
size_t num = taosArrayGetSize(pa);
|
||||||
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
|
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
|
||||||
|
|
Loading…
Reference in New Issue