[TD-225]refactor.
This commit is contained in:
parent
03f361ec96
commit
e534490a0b
|
@ -152,7 +152,7 @@ typedef struct SQuery {
|
|||
int16_t precision;
|
||||
int16_t numOfOutput;
|
||||
int16_t fillType;
|
||||
int16_t checkBuffer; // check if the buffer is full during scan each block
|
||||
int16_t checkResultBuf; // check if the buffer is full during scan each block
|
||||
SLimitVal limit;
|
||||
int32_t rowSize;
|
||||
SSqlGroupbyExpr* pGroupbyExpr;
|
||||
|
|
|
@ -1708,7 +1708,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl
|
|||
numOfRes = (int32_t) getNumOfResult(pRuntimeEnv);
|
||||
|
||||
// update the number of output result
|
||||
if (numOfRes > 0 && pQuery->checkBuffer == 1) {
|
||||
if (numOfRes > 0 && pQuery->checkResultBuf == 1) {
|
||||
assert(numOfRes >= pQuery->rec.rows);
|
||||
pQuery->rec.rows = numOfRes;
|
||||
|
||||
|
@ -2222,9 +2222,9 @@ void getAlignQueryTimeWindow(SQuery *pQuery, int64_t key, int64_t keyFirst, int6
|
|||
|
||||
static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
||||
if (isTopBottomQuery(pQuery)) {
|
||||
pQuery->checkBuffer = 0;
|
||||
pQuery->checkResultBuf = 0;
|
||||
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||
pQuery->checkBuffer = 0;
|
||||
pQuery->checkResultBuf = 0;
|
||||
} else {
|
||||
bool hasMultioutput = false;
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
|
@ -2239,7 +2239,7 @@ static void setScanLimitationByResultBuffer(SQuery *pQuery) {
|
|||
}
|
||||
}
|
||||
|
||||
pQuery->checkBuffer = hasMultioutput ? 1 : 0;
|
||||
pQuery->checkResultBuf = hasMultioutput ? 1 : 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -4761,20 +4761,21 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO refactor: setAdditionalInfo
|
||||
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
|
||||
} else { // interval query
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
TSKEY nextKey = pBlockInfo->window.skey;
|
||||
setIntervalQueryRange(pQInfo, nextKey);
|
||||
|
||||
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTsBuf != NULL) {
|
||||
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
||||
}
|
||||
} else { // non-interval query
|
||||
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5630,8 +5631,6 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
return;
|
||||
}
|
||||
|
||||
pQuery->current = pTableInfo; // set current query table info
|
||||
|
||||
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
|
||||
finalizeQueryResult(pRuntimeEnv);
|
||||
|
||||
|
@ -5650,10 +5649,8 @@ static void tableAggregationProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
pQuery->current = pTableInfo;
|
||||
|
||||
// for ts_comp query, re-initialized is not allowed
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
if (!isTSCompQuery(pQuery)) {
|
||||
resetDefaultResInfoOutputBuf(pRuntimeEnv);
|
||||
}
|
||||
|
@ -5705,9 +5702,7 @@ static void tableProjectionProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
|||
// handle time interval query on table
|
||||
static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &(pQInfo->runtimeEnv);
|
||||
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
pQuery->current = pTableInfo;
|
||||
|
||||
TSKEY newStartKey = QUERY_IS_ASC_QUERY(pQuery)? INT64_MIN:INT64_MAX;
|
||||
|
||||
|
@ -5777,7 +5772,6 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
}
|
||||
|
||||
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||
return;
|
||||
} else {
|
||||
pQuery->rec.rows = 0;
|
||||
assert(pRuntimeEnv->windowResInfo.size > 0);
|
||||
|
@ -5795,10 +5789,10 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
if (pQuery->rec.rows <= 0 || pRuntimeEnv->windowResInfo.size <= pQInfo->groupIndex) {
|
||||
qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pQuery->rec.total);
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// number of points returned during this query
|
||||
pQuery->rec.rows = 0;
|
||||
|
@ -5806,7 +5800,9 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
|
||||
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
||||
SArray* g = GET_TABLEGROUP(pQInfo, 0);
|
||||
|
||||
STableQueryInfo* item = taosArrayGetP(g, 0);
|
||||
pQuery->current = item;
|
||||
|
||||
// group by normal column, sliding window query, interval query are handled by interval query processor
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyColumn) { // interval (down sampling operation)
|
||||
|
@ -5814,7 +5810,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
|||
} else if (isFixedOutputQuery(pRuntimeEnv)) {
|
||||
tableAggregationProcess(pQInfo, item);
|
||||
} else { // diff/add/multiply/subtract/division
|
||||
assert(pQuery->checkBuffer == 1);
|
||||
assert(pQuery->checkResultBuf == 1);
|
||||
tableProjectionProcess(pQInfo, item);
|
||||
}
|
||||
|
||||
|
@ -5834,7 +5830,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
|||
(isFixedOutputQuery(pRuntimeEnv) && (!isPointInterpoQuery(pQuery)) && (!pRuntimeEnv->groupbyColumn))) {
|
||||
multiTableQueryProcess(pQInfo);
|
||||
} else {
|
||||
assert((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
|
||||
assert((pQuery->checkResultBuf == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) ||
|
||||
pRuntimeEnv->groupbyColumn);
|
||||
|
||||
sequentialTableProcess(pQInfo);
|
||||
|
|
Loading…
Reference in New Issue