commit
6303f9e664
|
@ -88,20 +88,26 @@ static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order) {
|
||||
static int32_t overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockInfo, int32_t order, bool* overlap) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STimeWindow w = {0};
|
||||
|
||||
// 0 by default, which means it is not a interval operator of the upstream operator.
|
||||
if (pInterval->interval == 0) {
|
||||
return false;
|
||||
*overlap = false;
|
||||
return code;
|
||||
}
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.skey);
|
||||
ASSERT(w.ekey >= pBlockInfo->window.skey);
|
||||
if(w.ekey < pBlockInfo->window.skey) {
|
||||
qError("w.ekey:%" PRId64 " < pBlockInfo->window.skey:%" PRId64, w.ekey, pBlockInfo->window.skey);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (w.ekey < pBlockInfo->window.ekey) {
|
||||
return true;
|
||||
*overlap = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -110,17 +116,25 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
|||
break;
|
||||
}
|
||||
|
||||
ASSERT(w.ekey > pBlockInfo->window.ekey);
|
||||
if(w.ekey <= pBlockInfo->window.ekey) {
|
||||
qError("w.ekey:%" PRId64 " <= pBlockInfo->window.ekey:%" PRId64, w.ekey, pBlockInfo->window.ekey);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
if (TMAX(w.skey, pBlockInfo->window.skey) <= pBlockInfo->window.ekey) {
|
||||
return true;
|
||||
*overlap = true;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
w = getAlignQueryTimeWindow(pInterval, pBlockInfo->window.ekey);
|
||||
ASSERT(w.skey <= pBlockInfo->window.ekey);
|
||||
if(w.skey > pBlockInfo->window.ekey) {
|
||||
qError("w.skey:%" PRId64 " > pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.ekey);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (w.skey > pBlockInfo->window.skey) {
|
||||
return true;
|
||||
*overlap = true;
|
||||
return code;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
|
@ -129,14 +143,19 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
|
|||
break;
|
||||
}
|
||||
|
||||
ASSERT(w.skey < pBlockInfo->window.skey);
|
||||
if(w.skey >= pBlockInfo->window.skey){
|
||||
qError("w.skey:%" PRId64 " >= pBlockInfo->window.skey:%" PRId64, w.skey, pBlockInfo->window.skey);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
if (pBlockInfo->window.skey <= TMIN(w.ekey, pBlockInfo->window.ekey)) {
|
||||
return true;
|
||||
*overlap = true;
|
||||
return code;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
*overlap = false;
|
||||
return code;
|
||||
}
|
||||
|
||||
// this function is for table scanner to extract temporary results of upstream aggregate results.
|
||||
|
@ -319,9 +338,18 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
|
||||
bool loadSMA = false;
|
||||
*status = pTableScanInfo->dataBlockLoadFlag;
|
||||
if (pOperator->exprSupp.pFilterInfo != NULL ||
|
||||
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
||||
if (pOperator->exprSupp.pFilterInfo != NULL) {
|
||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
} else {
|
||||
bool overlap = false;
|
||||
int ret =
|
||||
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order, &overlap);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
if (overlap) {
|
||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
}
|
||||
|
||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||
|
@ -358,7 +386,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
||||
if(*status != FUNC_DATA_REQUIRED_DATA_LOAD) {
|
||||
qError("[loadDataBlock] invalid status:%d", *status);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
// try to filter data block according to sma info
|
||||
if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
|
||||
|
@ -413,7 +444,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
|||
return code;
|
||||
}
|
||||
|
||||
ASSERT(p == pBlock);
|
||||
if(p != pBlock) {
|
||||
qError("[loadDataBlock] p != pBlock");
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
|
||||
// restore the previous value
|
||||
|
|
Loading…
Reference in New Issue