[TD-225] refactor codes.
This commit is contained in:
parent
8103620575
commit
c121f5b7b9
|
@ -83,7 +83,7 @@ typedef struct SResultRec {
|
||||||
int32_t threshold; // result size threshold in rows.
|
int32_t threshold; // result size threshold in rows.
|
||||||
} SResultRec;
|
} SResultRec;
|
||||||
|
|
||||||
typedef struct SWindowResInfo {
|
typedef struct SResultRowInfo {
|
||||||
SResultRow** pResult; // result list
|
SResultRow** pResult; // result list
|
||||||
int16_t type:8; // data type for hash key
|
int16_t type:8; // data type for hash key
|
||||||
int32_t size:24; // number of result set
|
int32_t size:24; // number of result set
|
||||||
|
@ -91,7 +91,7 @@ typedef struct SWindowResInfo {
|
||||||
int32_t curIndex; // current start active index
|
int32_t curIndex; // current start active index
|
||||||
int64_t startTime; // start time of the first time window for sliding query
|
int64_t startTime; // start time of the first time window for sliding query
|
||||||
int64_t prevSKey; // previous (not completed) sliding window start key
|
int64_t prevSKey; // previous (not completed) sliding window start key
|
||||||
} SWindowResInfo;
|
} SResultRowInfo;
|
||||||
|
|
||||||
typedef struct SColumnFilterElem {
|
typedef struct SColumnFilterElem {
|
||||||
int16_t bytes; // column length
|
int16_t bytes; // column length
|
||||||
|
@ -114,7 +114,7 @@ typedef struct STableQueryInfo {
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
void* pTable; // for retrieve the page id list
|
void* pTable; // for retrieve the page id list
|
||||||
SWindowResInfo windowResInfo;
|
SResultRowInfo windowResInfo;
|
||||||
} STableQueryInfo;
|
} STableQueryInfo;
|
||||||
|
|
||||||
typedef struct SQueryCostInfo {
|
typedef struct SQueryCostInfo {
|
||||||
|
@ -178,7 +178,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
uint16_t* offset;
|
uint16_t* offset;
|
||||||
uint16_t scanFlag; // denotes reversed scan of data or not
|
uint16_t scanFlag; // denotes reversed scan of data or not
|
||||||
SFillInfo* pFillInfo;
|
SFillInfo* pFillInfo;
|
||||||
SWindowResInfo windowResInfo;
|
SResultRowInfo windowResInfo;
|
||||||
STSBuf* pTSBuf;
|
STSBuf* pTSBuf;
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
SQueryCostInfo summary;
|
SQueryCostInfo summary;
|
||||||
|
|
|
@ -30,19 +30,19 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ
|
||||||
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
|
void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type);
|
||||||
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
|
SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index);
|
||||||
|
|
||||||
int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, int32_t size, int16_t type);
|
int32_t initWindowResInfo(SResultRowInfo* pWindowResInfo, int32_t size, int16_t type);
|
||||||
|
|
||||||
void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo);
|
void cleanupTimeWindowInfo(SResultRowInfo* pWindowResInfo);
|
||||||
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo);
|
void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pWindowResInfo);
|
||||||
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
|
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num);
|
||||||
|
|
||||||
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
|
void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv);
|
||||||
int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo);
|
int32_t numOfClosedTimeWindow(SResultRowInfo* pWindowResInfo);
|
||||||
void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
|
void closeTimeWindow(SResultRowInfo* pWindowResInfo, int32_t slot);
|
||||||
void closeAllTimeWindow(SWindowResInfo* pWindowResInfo);
|
void closeAllTimeWindow(SResultRowInfo* pWindowResInfo);
|
||||||
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
|
void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order);
|
||||||
|
|
||||||
static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int32_t slot) {
|
||||||
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
|
assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size);
|
||||||
return pWindowResInfo->pResult[slot];
|
return pWindowResInfo->pResult[slot];
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ static FORCE_INLINE SResultRow *getResultRow(SWindowResInfo *pWindowResInfo, int
|
||||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||||
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
|
#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1)
|
||||||
|
|
||||||
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot);
|
bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot);
|
||||||
|
|
||||||
int32_t initResultRow(SResultRow *pResultRow);
|
int32_t initResultRow(SResultRow *pResultRow);
|
||||||
|
|
||||||
|
|
|
@ -461,7 +461,7 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData,
|
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
||||||
int32_t *p1 =
|
int32_t *p1 =
|
||||||
|
@ -518,7 +518,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the correct time window according to the handled timestamp
|
// get the correct time window according to the handled timestamp
|
||||||
static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
|
static STimeWindow getActiveTimeWindow(SResultRowInfo *pWindowResInfo, int64_t ts, SQuery *pQuery) {
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
if (pWindowResInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||||
|
@ -611,7 +611,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo,
|
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo,
|
||||||
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
|
STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) {
|
||||||
assert(win->skey <= win->ekey);
|
assert(win->skey <= win->ekey);
|
||||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||||
|
@ -644,7 +644,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
static bool getResultRowStatus(SResultRowInfo *pWindowResInfo, int32_t slot) {
|
||||||
assert(slot >= 0 && slot < pWindowResInfo->size);
|
assert(slot >= 0 && slot < pWindowResInfo->size);
|
||||||
return pWindowResInfo->pResult[slot]->closed;
|
return pWindowResInfo->pResult[slot]->closed;
|
||||||
}
|
}
|
||||||
|
@ -703,7 +703,7 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se
|
||||||
/**
|
/**
|
||||||
* NOTE: the query status only set for the first scan of master scan.
|
* NOTE: the query status only set for the first scan of master scan.
|
||||||
*/
|
*/
|
||||||
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SWindowResInfo *pWindowResInfo) {
|
static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
if (pRuntimeEnv->scanFlag != MASTER_SCAN) {
|
if (pRuntimeEnv->scanFlag != MASTER_SCAN) {
|
||||||
return pWindowResInfo->size;
|
return pWindowResInfo->size;
|
||||||
|
@ -1157,7 +1157,7 @@ static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY
|
||||||
* such as count/min/max etc.
|
* such as count/min/max etc.
|
||||||
*/
|
*/
|
||||||
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||||
SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) {
|
SResultRowInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) {
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||||
|
|
||||||
|
@ -1575,7 +1575,7 @@ static void setTimeWindowEKeyInterp(SQueryRuntimeEnv* pRuntimeEnv, SArray* pData
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo,
|
||||||
SWindowResInfo *pWindowResInfo, SArray *pDataBlock) {
|
SResultRowInfo *pWindowResInfo, SArray *pDataBlock) {
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||||
|
|
||||||
|
@ -1781,7 +1781,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
STableQueryInfo* pTableQInfo = pQuery->current;
|
STableQueryInfo* pTableQInfo = pQuery->current;
|
||||||
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
||||||
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock);
|
||||||
|
@ -2605,7 +2605,7 @@ static bool overlapWithTimeWindow(SQuery* pQuery, SDataBlockInfo* pBlockInfo) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
|
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pWindowResInfo, void* pQueryHandle, SDataBlockInfo* pBlockInfo, SDataStatis **pStatis, SArray** pDataBlock, uint32_t* status) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
*status = BLK_DATA_NO_NEEDED;
|
*status = BLK_DATA_NO_NEEDED;
|
||||||
|
@ -2828,7 +2828,7 @@ static void doSetInitialTimewindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo
|
||||||
|
|
||||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
if (QUERY_IS_INTERVAL_QUERY(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
|
getAlignQueryTimeWindow(pQuery, pBlockInfo->window.skey, pBlockInfo->window.skey, pQuery->window.ekey, &w);
|
||||||
|
@ -3192,14 +3192,14 @@ int32_t tableResultComparFn(const void *pLeft, const void *pRight, void *param)
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SWindowResInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
|
SResultRowInfo *pWindowResInfo1 = &supporter->pTableQueryInfo[left]->windowResInfo;
|
||||||
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
|
SResultRow * pWindowRes1 = getResultRow(pWindowResInfo1, leftPos);
|
||||||
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
|
tFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes1->pageId);
|
||||||
|
|
||||||
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
|
char *b1 = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes1, page1);
|
||||||
TSKEY leftTimestamp = GET_INT64_VAL(b1);
|
TSKEY leftTimestamp = GET_INT64_VAL(b1);
|
||||||
|
|
||||||
SWindowResInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
|
SResultRowInfo *pWindowResInfo2 = &supporter->pTableQueryInfo[right]->windowResInfo;
|
||||||
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
|
SResultRow * pWindowRes2 = getResultRow(pWindowResInfo2, rightPos);
|
||||||
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
|
tFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes2->pageId);
|
||||||
|
|
||||||
|
@ -3439,7 +3439,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
||||||
|
|
||||||
int32_t pos = pTree->pNode[0].index;
|
int32_t pos = pTree->pNode[0].index;
|
||||||
|
|
||||||
SWindowResInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pTableList[pos]->windowResInfo;
|
||||||
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
|
SResultRow *pWindowRes = getResultRow(pWindowResInfo, cs.position[pos]);
|
||||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
|
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId);
|
||||||
|
|
||||||
|
@ -3603,7 +3603,7 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
|
||||||
pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1;
|
pTableQueryInfo->windowResInfo.curIndex = pTableQueryInfo->windowResInfo.size - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t order) {
|
static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pWindowResInfo, int32_t order) {
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
|
@ -3635,7 +3635,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||||
int32_t order = pQuery->order.order;
|
int32_t order = pQuery->order.order;
|
||||||
|
|
||||||
// group by normal columns and interval query on normal table
|
// group by normal columns and interval query on normal table
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
|
disableFuncInReverseScanImpl(pRuntimeEnv, pWindowResInfo, order);
|
||||||
} else { // for simple result of table query,
|
} else { // for simple result of table query,
|
||||||
|
@ -3826,7 +3826,7 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
bool toContinue = false;
|
bool toContinue = false;
|
||||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
// for each group result, call the finalize function for each column
|
// for each group result, call the finalize function for each column
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
SResultRow *pResult = getResultRow(pWindowResInfo, i);
|
SResultRow *pResult = getResultRow(pWindowResInfo, i);
|
||||||
|
@ -4038,7 +4038,7 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
if (pRuntimeEnv->groupbyNormalCol || QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||||
// for each group result, call the finalize function for each column
|
// for each group result, call the finalize function for each column
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
if (pRuntimeEnv->groupbyNormalCol) {
|
if (pRuntimeEnv->groupbyNormalCol) {
|
||||||
closeAllTimeWindow(pWindowResInfo);
|
closeAllTimeWindow(pWindowResInfo);
|
||||||
}
|
}
|
||||||
|
@ -4122,7 +4122,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
|
||||||
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
|
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current;
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
// lastKey needs to be updated
|
// lastKey needs to be updated
|
||||||
pTableQueryInfo->lastKey = nextKey;
|
pTableQueryInfo->lastKey = nextKey;
|
||||||
|
@ -4290,7 +4290,7 @@ void setIntervalQueryRange(SQInfo *pQInfo, TSKEY key) {
|
||||||
* operations involve.
|
* operations involve.
|
||||||
*/
|
*/
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
SWindowResInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
||||||
|
|
||||||
TSKEY sk = MIN(win.skey, win.ekey);
|
TSKEY sk = MIN(win.skey, win.ekey);
|
||||||
TSKEY ek = MAX(win.skey, win.ekey);
|
TSKEY ek = MAX(win.skey, win.ekey);
|
||||||
|
@ -4334,7 +4334,7 @@ bool needPrimaryTimestampCol(SQuery *pQuery, SDataBlockInfo *pDataBlockInfo) {
|
||||||
return loadPrimaryTS;
|
return loadPrimaryTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_t orderType) {
|
static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_t orderType) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
@ -4411,7 +4411,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo, int32_
|
||||||
* @param pQInfo
|
* @param pQInfo
|
||||||
* @param result
|
* @param result
|
||||||
*/
|
*/
|
||||||
void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResInfo *pResultInfo) {
|
void copyFromWindowResToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||||
|
@ -4450,7 +4450,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
STableQueryInfo* pTableQueryInfo = pQuery->current;
|
STableQueryInfo* pTableQueryInfo = pQuery->current;
|
||||||
|
|
||||||
SWindowResInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
SResultRowInfo * pWindowResInfo = &pTableQueryInfo->windowResInfo;
|
||||||
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : pDataBlockInfo->rows - 1;
|
||||||
|
|
||||||
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || pRuntimeEnv->groupbyNormalCol) {
|
||||||
|
@ -4698,7 +4698,7 @@ void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) {
|
static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
assert(pQuery->limit.offset == 0);
|
assert(pQuery->limit.offset == 0);
|
||||||
STimeWindow tw = *win;
|
STimeWindow tw = *win;
|
||||||
|
@ -4764,7 +4764,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) {
|
||||||
|
|
||||||
STimeWindow w = TSWINDOW_INITIALIZER;
|
STimeWindow w = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
|
||||||
|
@ -5403,7 +5403,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
scanMultiTableDataBlocks(pQInfo);
|
scanMultiTableDataBlocks(pQInfo);
|
||||||
pQInfo->groupIndex += 1;
|
pQInfo->groupIndex += 1;
|
||||||
|
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
// no results generated for current group, continue to try the next group
|
// no results generated for current group, continue to try the next group
|
||||||
taosArrayDestroy(s);
|
taosArrayDestroy(s);
|
||||||
|
|
|
@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int16_t type) {
|
int32_t initWindowResInfo(SResultRowInfo *pWindowResInfo, int32_t size, int16_t type) {
|
||||||
pWindowResInfo->capacity = size;
|
pWindowResInfo->capacity = size;
|
||||||
|
|
||||||
pWindowResInfo->type = type;
|
pWindowResInfo->type = type;
|
||||||
|
@ -59,7 +59,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, int32_t size, int16_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
|
void cleanupTimeWindowInfo(SResultRowInfo *pWindowResInfo) {
|
||||||
if (pWindowResInfo == NULL) {
|
if (pWindowResInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -77,7 +77,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) {
|
||||||
tfree(pWindowResInfo->pResult);
|
tfree(pWindowResInfo->pResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo) {
|
void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo) {
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
|
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -101,7 +101,7 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) {
|
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -160,7 +160,7 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
|
void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
clearFirstNWindowRes(pRuntimeEnv, numOfClosed);
|
clearFirstNWindowRes(pRuntimeEnv, numOfClosed);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
|
int32_t numOfClosedTimeWindow(SResultRowInfo *pWindowResInfo) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) {
|
while (i < pWindowResInfo->size && pWindowResInfo->pResult[i]->closed) {
|
||||||
++i;
|
++i;
|
||||||
|
@ -178,7 +178,7 @@ int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
|
void closeAllTimeWindow(SResultRowInfo *pWindowResInfo) {
|
||||||
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
|
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
|
@ -195,7 +195,7 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) {
|
||||||
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
|
* the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time.
|
||||||
* NOTE: remove redundant, only when the result set order equals to traverse order
|
* NOTE: remove redundant, only when the result set order equals to traverse order
|
||||||
*/
|
*/
|
||||||
void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
|
void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order) {
|
||||||
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
|
assert(pWindowResInfo->size >= 0 && pWindowResInfo->capacity >= pWindowResInfo->size);
|
||||||
if (pWindowResInfo->size <= 1) {
|
if (pWindowResInfo->size <= 1) {
|
||||||
return;
|
return;
|
||||||
|
@ -224,11 +224,11 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isWindowResClosed(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot) {
|
||||||
return (getResultRow(pWindowResInfo, slot)->closed == true);
|
return (getResultRow(pWindowResInfo, slot)->closed == true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void closeTimeWindow(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
void closeTimeWindow(SResultRowInfo *pWindowResInfo, int32_t slot) {
|
||||||
getResultRow(pWindowResInfo, slot)->closed = true;
|
getResultRow(pWindowResInfo, slot)->closed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue