commit
7b5db1a317
|
@ -808,18 +808,19 @@ void adjustLoserTreeFromNewData(SLocalReducer *pLocalReducer, SLocalDataSource *
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo,
|
void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo *pQueryInfo, SFillInfo *pFillInfo) {
|
||||||
SFillInfo *pFillInfo) {
|
|
||||||
// discard following dataset in the same group and reset the interpolation information
|
// discard following dataset in the same group and reset the interpolation information
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
|
||||||
|
|
||||||
int16_t prec = tinfo.precision;
|
if (pFillInfo != NULL) {
|
||||||
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
int64_t stime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.skey : pQueryInfo->window.ekey;
|
||||||
int64_t revisedSTime =
|
int64_t revisedSTime =
|
||||||
taosGetIntervalStartTimestamp(stime, pQueryInfo->intervalTime, pQueryInfo->slidingTimeUnit, prec);
|
taosGetIntervalStartTimestamp(stime, pQueryInfo->slidingTime, pQueryInfo->slidingTimeUnit, tinfo.precision);
|
||||||
taosResetFillInfo(pFillInfo, revisedSTime);
|
|
||||||
|
taosResetFillInfo(pFillInfo, revisedSTime);
|
||||||
|
}
|
||||||
|
|
||||||
pLocalReducer->discard = true;
|
pLocalReducer->discard = true;
|
||||||
pLocalReducer->discardData->num = 0;
|
pLocalReducer->discardData->num = 0;
|
||||||
|
@ -915,13 +916,12 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
|
||||||
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
|
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
|
||||||
/* impose the limitation of output rows on the final result */
|
/* impose the limitation of output rows on the final result */
|
||||||
int32_t prevSize = pFinalDataPage->num;
|
int32_t prevSize = pFinalDataPage->num;
|
||||||
int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
|
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
|
||||||
|
assert(overflow < pRes->numOfRows);
|
||||||
assert(overFlow < pRes->numOfRows);
|
|
||||||
|
|
||||||
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
|
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
|
||||||
pRes->numOfRows -= overFlow;
|
pRes->numOfRows -= overflow;
|
||||||
pFinalDataPage->num -= overFlow;
|
pFinalDataPage->num -= overflow;
|
||||||
|
|
||||||
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
|
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
|
||||||
|
|
||||||
|
@ -988,13 +988,13 @@ static void doInterpolateResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, boo
|
||||||
|
|
||||||
if (pRes->numOfRows > 0) {
|
if (pRes->numOfRows > 0) {
|
||||||
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
|
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
|
||||||
int32_t overFlow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
|
int32_t overflow = pRes->numOfClauseTotal - pQueryInfo->limit.limit;
|
||||||
pRes->numOfRows -= overFlow;
|
pRes->numOfRows -= overflow;
|
||||||
|
|
||||||
assert(pRes->numOfRows >= 0);
|
assert(pRes->numOfRows >= 0);
|
||||||
|
|
||||||
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
|
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
|
||||||
pFinalDataPage->num -= overFlow;
|
pFinalDataPage->num -= overflow;
|
||||||
|
|
||||||
/* set remain data to be discarded, and reset the interpolation information */
|
/* set remain data to be discarded, and reset the interpolation information */
|
||||||
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
|
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
|
||||||
|
|
|
@ -3030,11 +3030,13 @@ static void updateTableQueryInfoForReverseScan(SQuery *pQuery, STableQueryInfo *
|
||||||
|
|
||||||
// order has change already!
|
// order has change already!
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
|
||||||
assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
|
// TODO validate the assertion
|
||||||
} else {
|
// if (!QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
// assert(pTableQueryInfo->win.ekey >= pTableQueryInfo->lastKey + step);
|
||||||
}
|
// } else {
|
||||||
|
// assert(pTableQueryInfo->win.ekey <= pTableQueryInfo->lastKey + step);
|
||||||
|
// }
|
||||||
|
|
||||||
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
pTableQueryInfo->win.ekey = pTableQueryInfo->lastKey + step;
|
||||||
|
|
||||||
|
@ -3113,7 +3115,7 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||||
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
|
void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
SWITCH_ORDER(pRuntimeEnv->pCtx[i] .order);
|
SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4384,10 +4386,11 @@ static int64_t queryOnDataBlocks(SQInfo *pQInfo) {
|
||||||
|
|
||||||
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
||||||
|
|
||||||
TSKEY nextKey = blockInfo.window.skey;
|
|
||||||
if (!isIntervalQuery(pQuery)) {
|
if (!isIntervalQuery(pQuery)) {
|
||||||
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, nextKey);
|
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
|
||||||
|
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIdx, blockInfo.window.ekey + step);
|
||||||
} else { // interval query
|
} else { // interval query
|
||||||
|
TSKEY nextKey = blockInfo.window.skey;
|
||||||
setIntervalQueryRange(pQInfo, nextKey);
|
setIntervalQueryRange(pQInfo, nextKey);
|
||||||
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
|
int32_t ret = setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
|
||||||
|
|
||||||
|
|
|
@ -545,9 +545,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
/*bool hasData = */ initTableMemIterator(pQueryHandle, pCheckInfo);
|
||||||
|
|
||||||
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
|
TSKEY k1 = TSKEY_INITIAL_VAL, k2 = TSKEY_INITIAL_VAL;
|
||||||
if (pCheckInfo->iter != NULL) {
|
if (pCheckInfo->iter != NULL && tSkipListIterGet(pCheckInfo->iter) != NULL) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
|
||||||
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
k1 = dataRowKey(row);
|
k1 = dataRowKey(row);
|
||||||
|
|
||||||
if (k1 == binfo.window.skey) {
|
if (k1 == binfo.window.skey) {
|
||||||
|
@ -561,9 +562,10 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCheckInfo->iiter != NULL) {
|
if (pCheckInfo->iiter != NULL && tSkipListIterGet(pCheckInfo->iiter) != NULL) {
|
||||||
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
|
||||||
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
k2 = dataRowKey(row);
|
k2 = dataRowKey(row);
|
||||||
|
|
||||||
if (k2 == binfo.window.skey) {
|
if (k2 == binfo.window.skey) {
|
||||||
|
@ -583,6 +585,12 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
|
mergeDataInDataBlock(pQueryHandle, pCheckInfo, pBlock, sa);
|
||||||
} else {
|
} else {
|
||||||
pQueryHandle->realNumOfRows = binfo.rows;
|
pQueryHandle->realNumOfRows = binfo.rows;
|
||||||
|
|
||||||
|
cur->rows = binfo.rows;
|
||||||
|
cur->win = binfo.window;
|
||||||
|
cur->mixBlock = false;
|
||||||
|
cur->blockCompleted = true;
|
||||||
|
cur->lastKey = binfo.window.ekey + (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { //desc order
|
} else { //desc order
|
||||||
|
@ -914,7 +922,10 @@ static void mergeDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo
|
||||||
|
|
||||||
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||||
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
|
int32_t end = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, key, order);
|
||||||
|
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
|
||||||
|
tSkipListIterNext(pCheckInfo->iter);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t start = -1;
|
int32_t start = -1;
|
||||||
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
int32_t remain = end - pos + 1;
|
int32_t remain = end - pos + 1;
|
||||||
|
|
Loading…
Reference in New Issue