diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 583496a4c6..9572d9d784 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -462,7 +462,8 @@ typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; - char offsetUnit; + char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + int8_t precision; int64_t interval; int64_t sliding; int64_t offset; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c3f5b37a09..819c0a74f7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -76,11 +76,12 @@ typedef struct SResultRowCell { * If the number of generated results is greater than this value, * query query will be halt and return results to client immediate. */ -typedef struct SRspResultInfo { - int64_t total; // total generated result size in rows - int32_t capacity; // capacity of current result output buffer - int32_t threshold; // result size threshold in rows. -} SRspResultInfo; +typedef struct SResultInfo { // TODO refactor + int64_t totalRows; // total generated result size in rows + int64_t totalBytes; // total results in bytes. + int32_t capacity; // capacity of current result output buffer + int32_t threshold; // result size threshold in rows. +} SResultInfo; typedef struct SColumnFilterElem { int16_t bytes; // column length @@ -160,8 +161,8 @@ typedef struct STaskCostInfo { typedef struct SOperatorCostInfo { uint64_t openCost; uint64_t execCost; - uint64_t totalRows; - uint64_t totalBytes; +// uint64_t totalRows; +// uint64_t totalBytes; } SOperatorCostInfo; typedef struct { @@ -301,7 +302,7 @@ typedef struct STaskRuntimeEnv { int64_t currentOffset; // dynamic offset value STableQueryInfo* current; - SRspResultInfo resultInfo; + SResultInfo resultInfo; SHashObj* pTableRetrieveTsMap; struct SUdfInfo* pUdfInfo; } STaskRuntimeEnv; @@ -324,7 +325,7 @@ typedef struct SOperatorInfo { STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; - + SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator __optr_fn_t getNextFn; @@ -539,6 +540,8 @@ typedef struct SFillOperatorInfo { void** p; SSDataBlock* existNewGroupBlock; bool multigroupResult; + SInterval intervalInfo; + int32_t capacity; } SFillOperatorInfo; typedef struct SGroupKeys { @@ -649,21 +652,19 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput, bool multigroupResult); - SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 787a8979fc..1b555e396b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -248,7 +248,7 @@ static int32_t setGroupResultOutputBuf_rv(SOptrBasicInfo *binfo, int32_t numOfCo static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); -static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); +static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); @@ -7113,22 +7113,23 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRunti static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { SFillOperatorInfo *pInfo = pOperator->info; - pInfo->pRes->info.rows = 0; + SResultInfo* pResultInfo = &pOperator->resultInfo; + blockDataCleanup(pInfo->pRes); if (pOperator->status == OP_EXEC_DONE) { return NULL; } - STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { - return pInfo->pRes; - } +// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); +// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { +// return pInfo->pRes; +// } + SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while(1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = pDownstream->getNextFn(pDownstream, newgroup); + publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { assert(pBlock != NULL); @@ -7140,7 +7141,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { // Fill the previous group data block, before handle the data block of new group. // Close the fill operation for previous group data block - taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); +// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { if (pBlock == NULL) { if (pInfo->totalInputRows == 0) { @@ -7148,7 +7149,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { return NULL; } - taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); +// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { pInfo->totalInputRows += pBlock->info.rows; taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); @@ -7156,25 +7157,25 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) { } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p); // current group has no more result to return if (pInfo->pRes->info.rows > 0) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) { + if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) { return pInfo->pRes; } - doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { +// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) { return pInfo->pRes; } } else if (pInfo->existNewGroupBlock) { // try next group assert(pBlock != NULL); - doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); +// doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup); - if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + if (pInfo->pRes->info.rows > pResultInfo->threshold) { return pInfo->pRes; } } else { @@ -7537,8 +7538,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { - ASSERT(numOfDownstream == 1); +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -7622,7 +7622,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7647,7 +7647,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pInfo->colIndex = -1; pInfo->reptScan = false; pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7712,7 +7712,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7736,7 +7736,7 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -7827,52 +7827,58 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, bool multigroupResult, SExecTaskInfo* pTaskInfo) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); - pInfo->multigroupResult = multigroupResult; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pInfo->pRes = pResBlock; + pInfo->multigroupResult = multigroupResult; + pInfo->intervalInfo = *pInterval; + + SResultInfo* pResultInfo = &pOperator->resultInfo; { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal); +// struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pQueryAttr->fillVal); STimeWindow w = TSWINDOW_INITIALIZER; - TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); - TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// getAlignQueryTimeWindow(pQueryAttr, pQueryAttr->window.skey, sk, ek, &w); + TSKEY sk = TMIN(pTaskInfo->window.skey, pTaskInfo->window.ekey); + TSKEY ek = TMAX(pTaskInfo->window.skey, pTaskInfo->window.ekey); + getAlignQueryTimeWindow(pInterval, pInterval->precision, pTaskInfo->window.skey, sk, ek, &w); - pInfo->pFillInfo = - taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, - pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, - (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + int32_t order = TSDB_ORDER_ASC; - pInfo->p = calloc(numOfOutput, POINTER_BYTES); +// pInfo->pFillInfo = +// taosCreateFillInfo(order, w.skey, 0, (int32_t)pResultInfo->capacity, numOfCols, +// pInterval->sliding, pInterval->slidingUnit, +// (int8_t)pInterval->precision, pQueryAttr->fillType, pColInfo, pTaskInfo->id.str); + + pInfo->p = calloc(numOfCols, POINTER_BYTES); } - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "FillOperator"; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Fill; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doFill; - pOperator->closeFn = destroySFillOperatorInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = doFill; + pOperator->pTaskInfo = pTaskInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + pOperator->closeFn = destroySFillOperatorInfo; + + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); // pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); // pInfo->slimit = pQueryAttr->slimit; // pInfo->limit = pQueryAttr->limit; -// pInfo->capacity = pRuntimeEnv->resultInfo.capacity; +// pInfo->capacity = pResultInfo->capacity; // pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); // pInfo->currentOffset = pQueryAttr->limit.offset; // pInfo->currentGroupOffset = pQueryAttr->slimit.offset; @@ -7895,9 +7901,8 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI offset += pExpr[index->colIndex].base.resSchema.bytes; } - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SLimitOperator"; // pOperator->operatorType = OP_SLimit; @@ -7920,7 +7925,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) { } STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; + int32_t maxNumOfTables = (int32_t)pResultInfo->capacity; STagScanInfo *pInfo = pOperator->info; SSDataBlock *pRes = pInfo->pRes; @@ -8046,7 +8051,7 @@ static SSDataBlock* doTagScan(SOperatorInfo *pOperator, bool* newgroup) { SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); assert(numOfGroup == 0 || numOfGroup == 1); @@ -8390,10 +8395,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); - - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, - pScanPhyNode->reverse, pTaskInfo); + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); @@ -8847,7 +8849,7 @@ static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) { } } -void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { +void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) { const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); // the minimum number of rows for projection query @@ -8868,7 +8870,7 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { } pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO); - pResultInfo->total = 0; + pResultInfo->totalRows = 0; } //TODO refactor