[td-2895] refactor.
This commit is contained in:
parent
92d6d0af7e
commit
b9867fab6a
|
@ -356,9 +356,11 @@ typedef struct STableScanInfo {
|
|||
|
||||
SQLFunctionCtx *pCtx; // next operator query context
|
||||
SResultRowInfo *pResultRowInfo;
|
||||
int32_t *rowCellInfoOffset;
|
||||
SExprInfo *pExpr;
|
||||
|
||||
int32_t numOfOutput;
|
||||
int32_t *rowCellInfoOffset;
|
||||
|
||||
int64_t elapsedTime;
|
||||
} STableScanInfo;
|
||||
|
||||
|
|
|
@ -1352,7 +1352,21 @@ static void min_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
|||
}
|
||||
|
||||
static void stddev_function(SQLFunctionCtx *pCtx) {
|
||||
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SStddevInfo *pStd = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
if (pCtx->currentStage == REPEAT_SCAN && pStd->stage == 0) {
|
||||
pStd->stage++;
|
||||
avg_finalizer(pCtx);
|
||||
|
||||
pResInfo->initialized = true; // set it initialized to avoid re-initialization
|
||||
|
||||
// save average value into tmpBuf, for second stage scan
|
||||
SAvgInfo *pAvg = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
pStd->avg = GET_DOUBLE_VAL(pCtx->pOutput);
|
||||
assert((isnan(pAvg->sum) && pAvg->num == 0) || (pStd->num == pAvg->num && pStd->avg == pAvg->sum));
|
||||
}
|
||||
|
||||
if (pStd->stage == 0) {
|
||||
// the first stage is to calculate average value
|
||||
|
|
|
@ -1126,6 +1126,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SQLFunctionCtx* pC
|
|||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag;
|
||||
|
||||
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
|
||||
}
|
||||
|
@ -1152,6 +1153,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx,
|
|||
for (int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
pCtx[i].size = pBlock->info.rows;
|
||||
pCtx[i].order = order;
|
||||
pCtx[i].currentStage = pOperator->pRuntimeEnv->scanFlag;
|
||||
|
||||
setBlockStatisInfo(&pCtx[i], pBlock, &pOperator->pExpr[i].base.colInfo);
|
||||
|
||||
|
@ -1181,7 +1183,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
|
|||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
for (int32_t k = 0; k < pOperator->numOfOutput; ++k) {
|
||||
setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
|
||||
// setBlockStatisInfo(&pCtx[k], pSDataBlock, &pOperator->pExpr[k].base.colInfo);
|
||||
|
||||
int32_t functionId = pCtx[k].functionId;
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||
|
@ -1925,15 +1927,13 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde
|
|||
|
||||
pCtx->hasNull = hasNullRv(pColIndex, pStatis);
|
||||
|
||||
// limit/offset query will affect this value
|
||||
pCtx->size = pSDataBlock->info.rows;
|
||||
#if 0
|
||||
// set the statistics data for primary time stamp column
|
||||
// if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
// pCtx->preAggVals.isSet = true;
|
||||
// pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
|
||||
// pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
|
||||
// }
|
||||
if (pCtx->functionId == TSDB_FUNC_SPREAD &&colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
pCtx->preAggVals.isSet = true;
|
||||
pCtx->preAggVals.statis.min = pBlockInfo->window.skey;
|
||||
pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -2222,7 +2222,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
}
|
||||
|
||||
if (!pQuery->stableQuery) { // TODO this problem should be handed at the client side
|
||||
if (!pQuery->stableQuery || isProjQuery(pQuery)) { // TODO this problem should be handed at the client side
|
||||
if (pQuery->limit.offset > 0) {
|
||||
pRuntimeEnv->proot = createOffsetOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
|
||||
}
|
||||
|
@ -2654,10 +2654,10 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i
|
|||
|
||||
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
|
||||
|
||||
static bool doDataBlockStaticFilter(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
|
||||
static bool doFilterOnBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) {
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (pDataStatis == NULL || (pQuery->numOfFilterCols == 0 && (!pQuery->topBotQuery))) {
|
||||
if (pDataStatis == NULL || pQuery->numOfFilterCols == 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -2783,7 +2783,7 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
|
|||
|
||||
qualified = false;
|
||||
for (int32_t j = 0; j < pFilterInfo[k].numOfFilters; ++j) {
|
||||
SColumnFilterElem *pFilterElem = &pFilterInfo->pFilters[j];
|
||||
SColumnFilterElem *pFilterElem = &pFilterInfo[k].pFilters[j];
|
||||
|
||||
bool isnull = isNull(pElem, pFilterInfo[k].info.type);
|
||||
if (isnull) {
|
||||
|
@ -2858,12 +2858,12 @@ void filterDataBlock_rv(SSingleColumnFilterInfo *pFilterInfo, int32_t numOfFilte
|
|||
pBlock->pBlockStatis = NULL; // clean the block statistics info
|
||||
|
||||
if (start > 0) {
|
||||
SColumnInfoData *pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
SColumnInfoData* pColumnInfoData = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
assert(pColumnInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP &&
|
||||
pColumnInfoData->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||
|
||||
pBlock->info.window.skey = *(int64_t *)pColumnInfoData->pData;
|
||||
pBlock->info.window.ekey = *(int64_t *)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
|
||||
pBlock->info.window.skey = *(int64_t*)pColumnInfoData->pData;
|
||||
pBlock->info.window.ekey = *(int64_t*)(pColumnInfoData->pData + TSDB_KEYSIZE * (start - 1));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2905,11 +2905,12 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
SSqlFuncMsg* pSqlFunc = &pQuery->pExpr1[i].base;
|
||||
int32_t numOfOutput = pTableScanInfo->numOfOutput;
|
||||
SQLFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||
|
||||
int32_t functionId = pSqlFunc->functionId;
|
||||
int32_t colId = pSqlFunc->colInfo.colId;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
|
||||
|
||||
// group by + first/last should not apply the first/last block filter
|
||||
if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) {
|
||||
|
@ -2968,7 +2969,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
|
|||
}
|
||||
|
||||
// current block has been discard due to filter applied
|
||||
if (!doDataBlockStaticFilter(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||
if (!doFilterOnBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||
pCost->discardBlocks += 1;
|
||||
qDebug("QInfo:%p data block discard, brange:%"PRId64 "-%"PRId64", rows:%d", pQInfo, pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
(*status) = BLK_DATA_DISCARD;
|
||||
|
@ -2983,6 +2984,7 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
|
|||
}
|
||||
|
||||
if (pQuery->numOfFilterCols > 0) {
|
||||
// set the initial static data value filter expression
|
||||
if (pQuery->pFilterInfo[0].pData == NULL) {
|
||||
for(int32_t i = 0; i < pQuery->numOfFilterCols; ++i) {
|
||||
for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
||||
|
@ -3547,7 +3549,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR
|
|||
assert(pCtx[i].pOutput != NULL);
|
||||
|
||||
// set the timestamp output buffer for top/bottom/diff query
|
||||
int32_t functionId = pCtx->functionId;
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
|
||||
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
|
||||
}
|
||||
|
@ -4198,7 +4200,6 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
|
|||
}
|
||||
|
||||
pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQuery, bufPage, pResult->offset, offset);
|
||||
pCtx[i].currentStage = 0;
|
||||
offset += pCtx[i].outputBytes;
|
||||
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
|
@ -5975,8 +5976,6 @@ static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
// }
|
||||
}
|
||||
|
||||
|
||||
|
||||
#if 0
|
||||
static UNUSED_FUNC void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
@ -6062,6 +6061,7 @@ static char *getArithemicInputSrc(void *param, const char *name, int32_t colId)
|
|||
static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
|
||||
SSDataBlock *pBlock = &pTableScanInfo->block;
|
||||
SQuery* pQuery = pTableScanInfo->pRuntimeEnv->pQuery;
|
||||
STableGroupInfo* pTableGroupInfo = &pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo;
|
||||
|
||||
while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) {
|
||||
pTableScanInfo->numOfBlocks += 1;
|
||||
|
@ -6069,18 +6069,14 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
|
|||
// todo check for query cancel
|
||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info);
|
||||
|
||||
if (pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables > 1 ||
|
||||
(pQuery->current == NULL && pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)) {
|
||||
STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet(
|
||||
pTableScanInfo->pRuntimeEnv->tableqinfoGroupInfo.map, &pBlock->info.tid, sizeof(pBlock->info.tid));
|
||||
if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) {
|
||||
STableQueryInfo **pTableQueryInfo = (STableQueryInfo **)taosHashGet( pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid));
|
||||
if (pTableQueryInfo == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
pQuery->current = *pTableQueryInfo;
|
||||
doTableQueryInfoTimeWindowCheck(pQuery, *pTableQueryInfo);
|
||||
} else if (pTableScanInfo->pRuntimeEnv->pQuery->current == NULL) {
|
||||
|
||||
}
|
||||
|
||||
// this function never returns error?
|
||||
|
@ -6091,7 +6087,7 @@ static SSDataBlock* doTableScanImpl(STableScanInfo *pTableScanInfo) {
|
|||
}
|
||||
|
||||
// current block is ignored according to filter result by block statistics data, continue load the next block
|
||||
if (status == BLK_DATA_DISCARD) {
|
||||
if (status == BLK_DATA_DISCARD || pBlock->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -6147,6 +6143,8 @@ static SSDataBlock* doTableScan(void* param) {
|
|||
qDebug("QInfo:%p start to reverse scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
||||
pRuntimeEnv->qinfo, cond.twindow.skey, cond.twindow.ekey);
|
||||
|
||||
pRuntimeEnv->scanFlag = REVERSE_SCAN;
|
||||
|
||||
pTableScanInfo->times = 1;
|
||||
pTableScanInfo->current = 0;
|
||||
pTableScanInfo->reverseTimes = 0;
|
||||
|
@ -6187,6 +6185,9 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
|
|||
void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) {
|
||||
assert(pTableScanInfo != NULL && pDownstream != NULL);
|
||||
|
||||
pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr
|
||||
pTableScanInfo->numOfOutput = pDownstream->numOfOutput;
|
||||
|
||||
char* name = pDownstream->name;
|
||||
if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) {
|
||||
SAggOperatorInfo* pAggInfo = pDownstream->info;
|
||||
|
@ -6248,14 +6249,14 @@ static SOperatorInfo* createBiDirectionTableScanInfo(void* pTsdbQueryHandle, SQu
|
|||
return pOptr;
|
||||
}
|
||||
|
||||
static UNUSED_FUNC int32_t getTableScanId(STableScanInfo* pTableScanInfo) {
|
||||
return pTableScanInfo->current;
|
||||
}
|
||||
|
||||
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
|
||||
return pTableScanInfo->order;
|
||||
}
|
||||
|
||||
//static int32_t getTableScanFlag(STableScanInfo* pTableScanInfo) {
|
||||
// return pTableScanInfo->
|
||||
//}
|
||||
|
||||
// this is a blocking operator
|
||||
static SSDataBlock* doAggregate(void* param) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
|
@ -6388,7 +6389,11 @@ static SSDataBlock* doArithmeticOperation(void* param) {
|
|||
}
|
||||
}
|
||||
|
||||
return pArithInfo->binfo.pRes;
|
||||
if (pArithInfo->binfo.pRes->info.rows > 0) {
|
||||
return pArithInfo->binfo.pRes;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doLimit(void* param) {
|
||||
|
|
Loading…
Reference in New Issue