merge 3.0
This commit is contained in:
commit
5f0005a7b7
|
@ -155,8 +155,8 @@ typedef struct SQueryTableDataCond {
|
||||||
int32_t numOfCols;
|
int32_t numOfCols;
|
||||||
SColumnInfo* colList;
|
SColumnInfo* colList;
|
||||||
int32_t type; // data block load type:
|
int32_t type; // data block load type:
|
||||||
int32_t numOfTWindows;
|
// int32_t numOfTWindows;
|
||||||
STimeWindow* twindows;
|
STimeWindow twindows;
|
||||||
int64_t startVersion;
|
int64_t startVersion;
|
||||||
int64_t endVersion;
|
int64_t endVersion;
|
||||||
} SQueryTableDataCond;
|
} SQueryTableDataCond;
|
||||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SReadHandle {
|
||||||
void* vnode;
|
void* vnode;
|
||||||
void* mnd;
|
void* mnd;
|
||||||
SMsgCb* pMsgCb;
|
SMsgCb* pMsgCb;
|
||||||
|
int64_t version;
|
||||||
bool initMetaReader;
|
bool initMetaReader;
|
||||||
bool initTableReader;
|
bool initTableReader;
|
||||||
bool initTqReader;
|
bool initTqReader;
|
||||||
|
|
|
@ -418,7 +418,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
tsNumOfVnodeQueryThreads = tsNumOfCores / 2;
|
||||||
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 1);
|
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2);
|
||||||
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1);
|
tsNumOfVnodeFetchThreads = TRANGE(tsNumOfVnodeFetchThreads, 1, 1);
|
||||||
|
|
|
@ -130,7 +130,7 @@ bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
|
||||||
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||||
void *tsdbGetIdx(SMeta *pMeta);
|
void *tsdbGetIdx(SMeta *pMeta);
|
||||||
|
|
|
@ -89,6 +89,8 @@ typedef struct {
|
||||||
STqExecTb execTb;
|
STqExecTb execTb;
|
||||||
STqExecDb execDb;
|
STqExecDb execDb;
|
||||||
};
|
};
|
||||||
|
// TODO remove it
|
||||||
|
int64_t tsdbEndVer;
|
||||||
|
|
||||||
} STqExecHandle;
|
} STqExecHandle;
|
||||||
|
|
||||||
|
|
|
@ -483,6 +483,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
/*for (int32_t i = 0; i < 5; i++) {*/
|
/*for (int32_t i = 0; i < 5; i++) {*/
|
||||||
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
|
/*pHandle->execHandle.pExecReader[i] = tqOpenReader(pTq->pVnode);*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
int64_t ver = walGetCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
pHandle->execHandle.execCol.qmsg = req.qmsg;
|
||||||
req.qmsg = NULL;
|
req.qmsg = NULL;
|
||||||
|
@ -493,6 +494,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
.initTableReader = true,
|
.initTableReader = true,
|
||||||
.initTqReader = true,
|
.initTqReader = true,
|
||||||
|
.version = ver,
|
||||||
};
|
};
|
||||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||||
|
@ -501,6 +503,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
ASSERT(scanner);
|
ASSERT(scanner);
|
||||||
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
|
pHandle->execHandle.pExecReader[i] = qExtractReaderFromStreamScanner(scanner);
|
||||||
ASSERT(pHandle->execHandle.pExecReader[i]);
|
ASSERT(pHandle->execHandle.pExecReader[i]);
|
||||||
|
pHandle->execHandle.tsdbEndVer = ver;
|
||||||
}
|
}
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
for (int32_t i = 0; i < 5; i++) {
|
for (int32_t i = 0; i < 5; i++) {
|
||||||
|
|
|
@ -96,6 +96,12 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pRsp->blockNum == 0 && pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
tqOffsetResetToLog(pOffset, pExec->tsdbEndVer + 1);
|
||||||
|
qStreamPrepareScan(task, pOffset);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
void* meta = qStreamExtractMetaMsg(task);
|
void* meta = qStreamExtractMetaMsg(task);
|
||||||
if (meta != NULL) {
|
if (meta != NULL) {
|
||||||
// tq add meta to rsp
|
// tq add meta to rsp
|
||||||
|
@ -107,7 +113,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
|
||||||
|
|
||||||
ASSERT(pRsp->rspOffset.type != 0);
|
ASSERT(pRsp->rspOffset.type != 0);
|
||||||
|
|
||||||
if (pRsp->rspOffset.type == TMQ_OFFSET__LOG) {
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
|
ASSERT(pRsp->rspOffset.version + 1 >= pRsp->reqOffset.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -356,14 +356,14 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
initReaderStatus(&pReader->status);
|
initReaderStatus(&pReader->status);
|
||||||
|
|
||||||
pReader->pTsdb =
|
pReader->pTsdb =
|
||||||
getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
|
getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
|
||||||
pReader->suid = pCond->suid;
|
pReader->suid = pCond->suid;
|
||||||
pReader->order = pCond->order;
|
pReader->order = pCond->order;
|
||||||
pReader->capacity = 4096;
|
pReader->capacity = 4096;
|
||||||
pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
|
pReader->idStr = (idstr != NULL) ? strdup(idstr) : NULL;
|
||||||
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
||||||
pReader->type = pCond->type;
|
pReader->type = pCond->type;
|
||||||
pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows);
|
pReader->window = updateQueryTimeWindow(pVnode->pTsdb, &pCond->twindows);
|
||||||
|
|
||||||
ASSERT(pCond->numOfCols > 0);
|
ASSERT(pCond->numOfCols > 0);
|
||||||
|
|
||||||
|
@ -2954,7 +2954,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
return pReader->pResBlock->pDataBlock;
|
return pReader->pResBlock->pDataBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
|
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window)) {
|
if (isEmptyQueryTimeWindow(&pReader->window)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2964,7 +2964,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_
|
||||||
pReader->status.loadFromFile = true;
|
pReader->status.loadFromFile = true;
|
||||||
pReader->status.pTableIter = NULL;
|
pReader->status.pTableIter = NULL;
|
||||||
|
|
||||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]);
|
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
|
||||||
|
|
|
@ -279,9 +279,6 @@ typedef struct STableScanInfo {
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
int32_t scanTimes;
|
int32_t scanTimes;
|
||||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||||
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context,todo: remove this by using SExprSup
|
|
||||||
int32_t* rowEntryInfoOffset; // todo: remove this by using SExprSup
|
|
||||||
SExprInfo* pExpr;// todo: remove this by using SExprSup
|
|
||||||
|
|
||||||
SSDataBlock* pResBlock;
|
SSDataBlock* pResBlock;
|
||||||
SArray* pColMatchInfo;
|
SArray* pColMatchInfo;
|
||||||
|
@ -290,14 +287,10 @@ typedef struct STableScanInfo {
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
|
|
||||||
SSampleExecInfo sample; // sample execution info
|
SSampleExecInfo sample; // sample execution info
|
||||||
int32_t curTWinIdx;
|
|
||||||
|
|
||||||
int32_t currentGroupId;
|
int32_t currentGroupId;
|
||||||
int32_t currentTable;
|
int32_t currentTable;
|
||||||
uint64_t queryId; // todo remove it
|
|
||||||
uint64_t taskId; // todo remove it
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
struct {
|
struct {
|
||||||
|
@ -916,7 +909,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
|
||||||
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
|
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
|
||||||
|
|
||||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
||||||
int32_t precision, int32_t order);
|
int32_t order);
|
||||||
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
||||||
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
||||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
|
@ -932,7 +925,6 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
|
||||||
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
|
|
||||||
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition,
|
||||||
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset,
|
||||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include <common/ttime.h>
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
|
@ -769,12 +770,9 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
|
||||||
|
|
||||||
// pCond->twindow = pTableScanNode->scanRange;
|
// pCond->twindow = pTableScanNode->scanRange;
|
||||||
// TODO: get it from stable scan node
|
// TODO: get it from stable scan node
|
||||||
pCond->numOfTWindows = 1;
|
pCond->twindows = pTableScanNode->scanRange;
|
||||||
pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow));
|
pCond->suid = pTableScanNode->scan.suid;
|
||||||
pCond->twindows[0] = pTableScanNode->scanRange;
|
pCond->type = BLOCK_LOAD_OFFSET_ORDER;
|
||||||
pCond->suid = pTableScanNode->scan.suid;
|
|
||||||
|
|
||||||
pCond->type = BLOCK_LOAD_OFFSET_ORDER;
|
|
||||||
pCond->startVersion = -1;
|
pCond->startVersion = -1;
|
||||||
pCond->endVersion = -1;
|
pCond->endVersion = -1;
|
||||||
// pCond->type = pTableScanNode->scanFlag;
|
// pCond->type = pTableScanNode->scanFlag;
|
||||||
|
@ -826,3 +824,87 @@ int32_t convertFillType(int32_t mode) {
|
||||||
|
|
||||||
return type;
|
return type;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery) {
|
||||||
|
if (ascQuery) {
|
||||||
|
getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w);
|
||||||
|
} else {
|
||||||
|
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
||||||
|
getAlignQueryTimeWindow(pInterval, pInterval->precision, ts, w);
|
||||||
|
|
||||||
|
int64_t key = w->skey;
|
||||||
|
while (key < ts) { // moving towards end
|
||||||
|
key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||||
|
if (key >= ts) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
w->skey = key;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static STimeWindow doCalculateTimeWindow(int64_t ts, SInterval* pInterval) {
|
||||||
|
STimeWindow w = {0};
|
||||||
|
|
||||||
|
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
||||||
|
w.skey = taosTimeTruncate(ts, pInterval, pInterval->precision);
|
||||||
|
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
|
} else {
|
||||||
|
int64_t st = w.skey;
|
||||||
|
|
||||||
|
if (st > ts) {
|
||||||
|
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t et = st + pInterval->interval - 1;
|
||||||
|
if (et < ts) {
|
||||||
|
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
||||||
|
}
|
||||||
|
|
||||||
|
w.skey = st;
|
||||||
|
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return w;
|
||||||
|
}
|
||||||
|
|
||||||
|
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
||||||
|
int32_t factor = (order == TSDB_ORDER_ASC)? -1:1;
|
||||||
|
|
||||||
|
STimeWindow win = *pWindow;
|
||||||
|
STimeWindow save = win;
|
||||||
|
while(win.skey <= ts && win.ekey >= ts) {
|
||||||
|
save = win;
|
||||||
|
win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||||
|
win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
||||||
|
}
|
||||||
|
|
||||||
|
return save;
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the correct time window according to the handled timestamp
|
||||||
|
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
||||||
|
int32_t order) {
|
||||||
|
STimeWindow w = {0};
|
||||||
|
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
|
||||||
|
getInitialStartTimeWindow(pInterval, ts, &w, (order == TSDB_ORDER_ASC));
|
||||||
|
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
|
||||||
|
return w;
|
||||||
|
}
|
||||||
|
|
||||||
|
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
|
||||||
|
|
||||||
|
// in case of typical time window, we can calculate time window directly.
|
||||||
|
if (w.skey > ts || w.ekey < ts) {
|
||||||
|
w = doCalculateTimeWindow(ts, pInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInterval->interval != pInterval->sliding) {
|
||||||
|
// it is an sliding window query, in which sliding value is not equalled to
|
||||||
|
// interval value, and we need to find the first qualified time window.
|
||||||
|
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
|
||||||
|
}
|
||||||
|
|
||||||
|
return w;
|
||||||
|
}
|
|
@ -327,12 +327,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
tsdbSetTableId(pTableScanInfo->dataReader, uid);
|
||||||
int64_t oldSkey = pTableScanInfo->cond.twindows[0].skey;
|
int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
|
||||||
pTableScanInfo->cond.twindows[0].skey = ts + 1;
|
pTableScanInfo->cond.twindows.skey = ts + 1;
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
pTableScanInfo->cond.twindows[0].skey = oldSkey;
|
pTableScanInfo->cond.twindows.skey = oldSkey;
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
qDebug("tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d", uid, ts,
|
||||||
pTableScanInfo->currentTable, tableSz);
|
pTableScanInfo->currentTable, tableSz);
|
||||||
|
|
|
@ -1038,6 +1038,7 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
|
||||||
|
#if 0
|
||||||
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||||
uint32_t status = BLK_DATA_NOT_LOAD;
|
uint32_t status = BLK_DATA_NOT_LOAD;
|
||||||
|
|
||||||
|
@ -1059,6 +1060,8 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
|
||||||
}
|
}
|
||||||
|
|
||||||
return status;
|
return status;
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
|
@ -2887,12 +2890,11 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||||
ASSERT(found);
|
ASSERT(found);
|
||||||
|
|
||||||
tsdbSetTableId(pInfo->dataReader, uid);
|
tsdbSetTableId(pInfo->dataReader, uid);
|
||||||
int64_t oldSkey = pInfo->cond.twindows[0].skey;
|
int64_t oldSkey = pInfo->cond.twindows.skey;
|
||||||
pInfo->cond.twindows[0].skey = ts + 1;
|
pInfo->cond.twindows.skey = ts + 1;
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->cond.twindows[0].skey = oldSkey;
|
pInfo->cond.twindows.skey = oldSkey;
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
pInfo->curTWinIdx = 0;
|
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
|
qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts,
|
||||||
pInfo->currentTable, tableSz);
|
pInfo->currentTable, tableSz);
|
||||||
|
@ -4351,9 +4353,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
cond.colList->bytes = sizeof(TSKEY);
|
cond.colList->bytes = sizeof(TSKEY);
|
||||||
|
|
||||||
cond.numOfTWindows = 1;
|
cond.twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow));
|
|
||||||
cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
|
||||||
cond.suid = pBlockNode->suid;
|
cond.suid = pBlockNode->suid;
|
||||||
cond.type = BLOCK_LOAD_OFFSET_ORDER;
|
cond.type = BLOCK_LOAD_OFFSET_ORDER;
|
||||||
}
|
}
|
||||||
|
@ -4552,18 +4552,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) {
|
|
||||||
const SQueryTableDataCond* pCond = param;
|
|
||||||
const STimeWindow* pWin1 = p1;
|
|
||||||
const STimeWindow* pWin2 = p2;
|
|
||||||
if (pCond->order == TSDB_ORDER_ASC) {
|
|
||||||
return pWin1->skey - pWin2->skey;
|
|
||||||
} else if (pCond->order == TSDB_ORDER_DESC) {
|
|
||||||
return pWin2->skey - pWin1->skey;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* extractColumnInfo(SNodeList* pNodeList) {
|
SArray* extractColumnInfo(SNodeList* pNodeList) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn));
|
||||||
|
|
|
@ -294,13 +294,8 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
||||||
// setupQueryRangeForReverseScan(pTableScanInfo);
|
// setupQueryRangeForReverseScan(pTableScanInfo);
|
||||||
|
|
||||||
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows;
|
||||||
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i];
|
TSWAP(pTWindow->skey, pTWindow->ekey);
|
||||||
TSWAP(pTWindow->skey, pTWindow->ekey);
|
|
||||||
}
|
|
||||||
|
|
||||||
SQueryTableDataCond* pCond = &pTableScanInfo->cond;
|
|
||||||
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
|
||||||
|
@ -450,16 +445,10 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// do the ascending order traverse in the first place.
|
// do the ascending order traverse in the first place.
|
||||||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
if (p != NULL) {
|
||||||
if (p != NULL) {
|
ASSERT(p->info.uid != 0);
|
||||||
ASSERT(p->info.uid != 0);
|
return p;
|
||||||
return p;
|
|
||||||
}
|
|
||||||
pTableScanInfo->curTWinIdx += 1;
|
|
||||||
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
@ -468,40 +457,25 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
|
||||||
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
|
||||||
}
|
|
||||||
// do prepare for the next round table scan operation
|
// do prepare for the next round table scan operation
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
|
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
|
||||||
if (pTableScanInfo->scanTimes < total) {
|
if (pTableScanInfo->scanTimes < total) {
|
||||||
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
||||||
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, 0);
|
prepareForDescendingScan(pTableScanInfo, pOperator->exprSupp.pCtx, 0);
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
|
||||||
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (pTableScanInfo->scanTimes < total) {
|
while (pTableScanInfo->scanTimes < total) {
|
||||||
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
if (p != NULL) {
|
||||||
if (p != NULL) {
|
return p;
|
||||||
return p;
|
|
||||||
}
|
|
||||||
pTableScanInfo->curTWinIdx += 1;
|
|
||||||
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
@ -512,12 +486,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
qDebug("%s start to repeat descending order scan data blocks due to query func required",
|
qDebug("%s start to repeat descending order scan data blocks due to query func required",
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
|
||||||
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
|
||||||
}
|
|
||||||
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -544,9 +513,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||||
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
pInfo->curTWinIdx = 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,8 +546,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
|
||||||
// tsdbSetTableList(pInfo->dataReader, tableList);
|
// tsdbSetTableList(pInfo->dataReader, tableList);
|
||||||
|
|
||||||
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
|
tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
|
||||||
pInfo->curTWinIdx = 0;
|
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
||||||
result = doTableScanGroup(pOperator);
|
result = doTableScanGroup(pOperator);
|
||||||
|
@ -650,7 +617,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColList;
|
pInfo->pColMatchInfo = pColList;
|
||||||
pInfo->curTWinIdx = 0;
|
|
||||||
pInfo->currentGroupId = -1;
|
pInfo->currentGroupId = -1;
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||||
|
@ -880,12 +846,7 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
||||||
pTableScanInfo->cond.twindows[0] = *pWin;
|
pTableScanInfo->cond.twindows = *pWin;
|
||||||
pTableScanInfo->curTWinIdx = 0;
|
|
||||||
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
|
||||||
// if (!pTableScanInfo->dataReader) {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->currentGroupId = -1;
|
pTableScanInfo->currentGroupId = -1;
|
||||||
}
|
}
|
||||||
|
@ -943,8 +904,7 @@ static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t t
|
||||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||||
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
||||||
} else {
|
} else {
|
||||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision,
|
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, TSDB_ORDER_ASC);
|
||||||
TSDB_ORDER_ASC);
|
|
||||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||||
(*pRowIndex) +=
|
(*pRowIndex) +=
|
||||||
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||||
|
@ -1493,6 +1453,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
|
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
|
||||||
|
if (pHandle->version > 0) {
|
||||||
|
pSTInfo->cond.endVersion = pHandle->version;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||||
if (pHandle->initTableReader) {
|
if (pHandle->initTableReader) {
|
||||||
|
|
|
@ -59,136 +59,6 @@ static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOper
|
||||||
|
|
||||||
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
|
static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
|
||||||
|
|
||||||
static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w,
|
|
||||||
bool ascQuery) {
|
|
||||||
if (ascQuery) {
|
|
||||||
getAlignQueryTimeWindow(pInterval, precision, ts, w);
|
|
||||||
} else {
|
|
||||||
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
|
|
||||||
getAlignQueryTimeWindow(pInterval, precision, ts, w);
|
|
||||||
|
|
||||||
int64_t key = w->skey;
|
|
||||||
while (key < ts) { // moving towards end
|
|
||||||
key = taosTimeAdd(key, pInterval->sliding, pInterval->slidingUnit, precision);
|
|
||||||
if (key >= ts) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
w->skey = key;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order) {
|
|
||||||
int32_t factor = (order == TSDB_ORDER_ASC) ? -1 : 1;
|
|
||||||
|
|
||||||
STimeWindow win = *pWindow;
|
|
||||||
STimeWindow save = win;
|
|
||||||
while (win.skey <= ts && win.ekey >= ts) {
|
|
||||||
save = win;
|
|
||||||
win.skey = taosTimeAdd(win.skey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
|
||||||
win.ekey = taosTimeAdd(win.ekey, factor * pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
|
|
||||||
}
|
|
||||||
|
|
||||||
return save;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo do refactor
|
|
||||||
// get the correct time window according to the handled timestamp
|
|
||||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
|
||||||
int32_t precision, int32_t order) {
|
|
||||||
STimeWindow w = {0};
|
|
||||||
|
|
||||||
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
|
|
||||||
getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
||||||
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
|
|
||||||
|
|
||||||
if (pInterval->interval == pInterval->sliding) {
|
|
||||||
if (w.skey > ts || w.ekey < ts) {
|
|
||||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
|
||||||
w.skey = taosTimeTruncate(ts, pInterval, precision);
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
} else {
|
|
||||||
int64_t st = w.skey;
|
|
||||||
|
|
||||||
if (st > ts) {
|
|
||||||
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t et = st + pInterval->interval - 1;
|
|
||||||
if (et < ts) {
|
|
||||||
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
w.skey = st;
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else { // it is an sliding window query, in which sliding value is not equalled to
|
|
||||||
// interval value, and we need to find the first qualified time window for asc/desc traverse respectively.
|
|
||||||
if (order == TSDB_ORDER_ASC) {
|
|
||||||
if (w.skey <= ts && w.ekey >= ts) {
|
|
||||||
// ts is resident in current time window, but we need to find the first
|
|
||||||
// qualified time window that cover this timestamp
|
|
||||||
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
|
|
||||||
} else {
|
|
||||||
// todo refactor:
|
|
||||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
|
||||||
w.skey = taosTimeTruncate(ts, pInterval, precision);
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
} else {
|
|
||||||
int64_t st = w.skey;
|
|
||||||
|
|
||||||
if (st > ts) {
|
|
||||||
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t et = st + pInterval->interval - 1;
|
|
||||||
if (et < ts) {
|
|
||||||
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
w.skey = st;
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
|
|
||||||
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (w.skey <= ts && w.ekey >= ts) {
|
|
||||||
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
|
|
||||||
} else {
|
|
||||||
// todo refactor:
|
|
||||||
if (pInterval->intervalUnit == 'n' || pInterval->intervalUnit == 'y') {
|
|
||||||
w.skey = taosTimeTruncate(ts, pInterval, precision);
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
} else {
|
|
||||||
int64_t st = w.skey;
|
|
||||||
|
|
||||||
if (st > ts) {
|
|
||||||
st -= ((st - ts + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t et = st + pInterval->interval - 1;
|
|
||||||
if (et < ts) {
|
|
||||||
st += ((ts - et + pInterval->sliding - 1) / pInterval->sliding) * pInterval->sliding;
|
|
||||||
}
|
|
||||||
|
|
||||||
w.skey = st;
|
|
||||||
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
|
||||||
|
|
||||||
w = getFirstQualifiedTimeWindow(ts, &w, pInterval, order);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
|
static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan,
|
||||||
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
||||||
|
@ -929,8 +799,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
|
||||||
pInfo->interval.precision, pInfo->order);
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
if (!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) {
|
||||||
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
|
||||||
|
@ -1286,11 +1155,13 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pBlock);
|
doFilter(pInfo->pCondition, pBlock);
|
||||||
|
|
||||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.rows > 0) {
|
if (pBlock->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1389,8 +1260,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo;
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win =
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
|
||||||
getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, TSDB_ORDER_ASC);
|
|
||||||
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
|
||||||
if (pUpWins) {
|
if (pUpWins) {
|
||||||
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
|
||||||
|
@ -1412,7 +1282,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo;
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, TSDB_ORDER_ASC);
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, TSDB_ORDER_ASC);
|
||||||
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||||
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
|
uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId;
|
||||||
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
|
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput);
|
||||||
|
@ -1452,7 +1322,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
|
||||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||||
SResultRowInfo dumyInfo;
|
SResultRowInfo dumyInfo;
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, pInterval->precision, TSDB_ORDER_ASC);
|
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval, TSDB_ORDER_ASC);
|
||||||
SWinRes winRe = {
|
SWinRes winRe = {
|
||||||
.ts = win.skey,
|
.ts = win.skey,
|
||||||
.groupId = groupId,
|
.groupId = groupId,
|
||||||
|
@ -2550,8 +2420,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
||||||
|
|
||||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
|
||||||
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
|
STimeWindow nextWin =
|
||||||
pInfo->interval.precision, pInfo->order);
|
getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order);
|
||||||
while (1) {
|
while (1) {
|
||||||
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
||||||
if (pInfo->ignoreExpiredData && isClosed) {
|
if (pInfo->ignoreExpiredData && isClosed) {
|
||||||
|
@ -4714,7 +4584,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
STimeWindow* prevWin = &prevGrpWin->window;
|
STimeWindow* prevWin = &prevGrpWin->window;
|
||||||
if ((ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey)) {
|
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
|
||||||
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
|
finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
|
||||||
tdListPopNode(miaInfo->groupIntervals, listNode);
|
tdListPopNode(miaInfo->groupIntervals, listNode);
|
||||||
}
|
}
|
||||||
|
@ -4739,8 +4609,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
||||||
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols);
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval,
|
STimeWindow win =
|
||||||
iaInfo->interval.precision, iaInfo->order);
|
getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->order);
|
||||||
|
|
||||||
int32_t ret =
|
int32_t ret =
|
||||||
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
|
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
|
||||||
|
|
|
@ -1863,8 +1863,6 @@ static void stddevTransferInfo(SStddevRes* pInput, SStddevRes* pOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pOutput->count += pInput->count;
|
pOutput->count += pInput->count;
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
|
int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
@ -1874,14 +1872,13 @@ int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SStddevRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
for(int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, i);
|
||||||
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
|
SStddevRes* pInputInfo = (SStddevRes*)varDataVal(data);
|
||||||
|
stddevTransferInfo(pInputInfo, pInfo);
|
||||||
stddevTransferInfo(pInputInfo, pInfo);
|
}
|
||||||
|
|
||||||
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ int32_t tQWorkerInit(SQWorkerPool *pool) {
|
||||||
worker->pool = pool;
|
worker->pool = pool;
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
uInfo("worker:%s is initialized, min:%d max:%d", pool->name, pool->min, pool->max);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue