diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a30d5646e7..e897617cd5 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -42,6 +42,8 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) +#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) + enum { // when query starts to execute, this status will set QUERY_NOT_COMPLETED = 0x1u, @@ -281,8 +283,7 @@ enum { typedef struct SOperatorInfo { uint8_t operatorType; bool blockingOptr; // block operator or not - uint8_t completed; // denote if current operator is completed - uint32_t seed; // operator seed + uint8_t status; // denote if current operator is completed int32_t numOfOutput; // number of columns of the current operator results char *name; // name, used to show the query execution plan void *info; // extension attribution @@ -306,7 +307,6 @@ typedef struct SQInfo { SQueryRuntimeEnv runtimeEnv; SQuery query; - SHashObj* arrTableIdInfo; /* @@ -363,13 +363,14 @@ typedef struct STableScanInfo { SExprInfo *pExpr; int32_t numOfOutput; - int64_t elapsedTime; } STableScanInfo; typedef struct STagScanInfo { SColumnInfo* pCols; SSDataBlock* pRes; + int32_t totalTables; + int32_t currentIndex; } STagScanInfo; typedef struct SOptrBasicInfo { @@ -379,12 +380,17 @@ typedef struct SOptrBasicInfo { SSDataBlock *pRes; } SOptrBasicInfo; -typedef struct SOptrBasicInfo SAggOperatorInfo; -typedef struct SOptrBasicInfo SHashIntervalOperatorInfo; +typedef struct SOptrBasicInfo STableIntervalOperatorInfo; + +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + uint32_t seed; +} SAggOperatorInfo; typedef struct SArithOperatorInfo { SOptrBasicInfo binfo; int32_t bufCapacity; + uint32_t seed; } SArithOperatorInfo; typedef struct SLimitOperatorInfo { @@ -401,10 +407,10 @@ typedef struct SFillOperatorInfo { int64_t totalInputRows; } SFillOperatorInfo; -typedef struct SHashGroupbyOperatorInfo { - SOptrBasicInfo binfo; - int32_t colIndex; -} SHashGroupbyOperatorInfo; +typedef struct SGroupbyOperatorInfo { + SOptrBasicInfo binfo; + int32_t colIndex; +} SGroupbyOperatorInfo; void freeParam(SQueryParam *param); int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8bebbbd24b..4ed79baf8b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -1198,7 +1198,7 @@ static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionC } static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, - SHashIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { + STableIntervalOperatorInfo* pInfo, SSDataBlock* pSDataBlock, int32_t groupId) { SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -1250,8 +1250,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // int32_t p = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pSDataBlock->info.rows - 1; // doRowwiseTimeWindowInterpolation(pRuntimeEnv, pSDataBlock->pDataBlock, *(TSKEY *)pRuntimeEnv->prevRow[0], - // -1, tsCols[0], p, - // w.ekey, RESULT_ROW_END_INTERP); + // -1, tsCols[0], p, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP); @@ -1298,7 +1297,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } } -static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SHashGroupbyOperatorInfo *pInfo, +static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -1331,7 +1330,6 @@ static void hashGroupbyAgg(SQueryRuntimeEnv *pRuntimeEnv, SOperatorInfo* pOperat } #if 0 - /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -4214,7 +4212,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } -void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SAggOperatorInfo *pInfo, int32_t numOfOutput, +void setExecutionContext_rv(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int32_t numOfOutput, int32_t groupIndex, TSKEY nextKey) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; @@ -5460,6 +5458,7 @@ static UNUSED_FUNC bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t ind return true; } #endif + STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) { STsdbQueryCond cond = { .colList = pQuery->colList, @@ -6177,7 +6176,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqScanTableOp"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->numOfOutput = pRuntimeEnv->pQuery->numOfCols; pOperator->exec = doTableScan; @@ -6195,25 +6194,25 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf if ((strcasecmp(name, "TableAggregate") == 0) || (strcasecmp(name, "STableAggregate") == 0)) { SAggOperatorInfo* pAggInfo = pDownstream->info; - pTableScanInfo->pCtx = pAggInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pAggInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pAggInfo->rowCellInfoOffset; + pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; + pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; + pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; } else if (strcasecmp(name, "HashIntervalAgg") == 0) { - SHashIntervalOperatorInfo *pIntervalInfo = pDownstream->info; + STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; pTableScanInfo->pCtx = pIntervalInfo->pCtx; pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; } else if (strcasecmp(name, "HashGroupbyAgg") == 0) { - SHashGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; + SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; } else if (strcasecmp(name, "STableIntervalAggOp") == 0) { - SHashIntervalOperatorInfo *pInfo = pDownstream->info; + STableIntervalOperatorInfo *pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->pCtx; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; @@ -6259,11 +6258,13 @@ static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { // this is a blocking operator static SSDataBlock* doAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } SAggOperatorInfo* pAggInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQuery* pQuery = pRuntimeEnv->pQuery; @@ -6277,7 +6278,7 @@ static SSDataBlock* doAggregate(void* param) { break; } - setTagVal_rv(pOperator, pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // TODO opt perf if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { @@ -6286,36 +6287,38 @@ static SSDataBlock* doAggregate(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); - doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); + doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); } - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - finalizeQueryResult_rv(pOperator, pAggInfo->pCtx, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); - pAggInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput); + finalizeQueryResult_rv(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + pInfo->pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); - return pAggInfo->pRes; + return pInfo->pRes; } static SSDataBlock* doSTableAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } SAggOperatorInfo* pAggInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (pOperator->completed == OP_RES_TO_RETURN) { - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); + if (pOperator->status == OP_RES_TO_RETURN) { + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); - if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; } - return pAggInfo->pRes; + return pInfo->pRes; } SQuery* pQuery = pRuntimeEnv->pQuery; @@ -6329,7 +6332,7 @@ static SSDataBlock* doSTableAggregate(void* param) { break; } - setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pAggInfo->pCtx, pOperator->numOfOutput); + setTagVal_rv(pOperator, pRuntimeEnv->pQuery->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // TODO opt perf if (strncasecmp(upstream->name, "BidirectionSeqScanTableOp", strlen("BidirectionSeqScanTableOp")) == 0) { @@ -6338,25 +6341,26 @@ static SSDataBlock* doSTableAggregate(void* param) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pAggInfo->pCtx, pBlock, order); + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); TSKEY k = (pQuery->order.order == TSDB_ORDER_ASC)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; - setExecutionContext_rv(pRuntimeEnv, pAggInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); - doAggregateImpl(pOperator, pQuery->window.skey, pAggInfo->pCtx, pBlock); + setExecutionContext_rv(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); + doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); } - pOperator->completed = OP_RES_TO_RETURN; - closeAllResultRows(&pAggInfo->resultRowInfo); + pOperator->status = OP_RES_TO_RETURN; + closeAllResultRows(&pInfo->resultRowInfo); - updateWindowResNumOfRes_rv(pRuntimeEnv, pAggInfo->pCtx, pOperator->numOfOutput, &pAggInfo->resultRowInfo, pAggInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pAggInfo->resultRowInfo, 0); + updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, + pInfo->rowCellInfoOffset); + initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0); - toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pAggInfo->pRes); - if (pAggInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; } - return pAggInfo->pRes; + return pInfo->pRes; } static SSDataBlock* doArithmeticOperation(void* param) { @@ -6398,7 +6402,7 @@ static SSDataBlock* doArithmeticOperation(void* param) { static SSDataBlock* doLimit(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6407,7 +6411,7 @@ static SSDataBlock* doLimit(void* param) { SSDataBlock* pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -6417,7 +6421,7 @@ static SSDataBlock* doLimit(void* param) { pInfo->total = pInfo->limit; setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED); - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } else { pInfo->total += pBlock->info.rows; } @@ -6428,7 +6432,7 @@ static SSDataBlock* doLimit(void* param) { // TODO add log static SSDataBlock* doOffset(void* param) { SOperatorInfo *pOperator = (SOperatorInfo *)param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6438,7 +6442,7 @@ static SSDataBlock* doOffset(void* param) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -6465,18 +6469,18 @@ static SSDataBlock* doOffset(void* param) { static SSDataBlock* doHashIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; + STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (pOperator->completed == OP_RES_TO_RETURN) { + if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6503,7 +6507,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { pQuery->order.order = order; pQuery->window = win; - pOperator->completed = OP_RES_TO_RETURN; + pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset); @@ -6512,7 +6516,7 @@ static SSDataBlock* doHashIntervalAgg(void* param) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes; @@ -6520,17 +6524,17 @@ static SSDataBlock* doHashIntervalAgg(void* param) { static SSDataBlock* doSTableIntervalAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SHashIntervalOperatorInfo* pIntervalInfo = pOperator->info; + STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (pOperator->completed == OP_RES_TO_RETURN) { + if (pOperator->status == OP_RES_TO_RETURN) { copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6557,14 +6561,14 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { hashIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pIntervalInfo, pBlock, pTableQueryInfo->groupIndex); } - pOperator->completed = OP_RES_TO_RETURN; + pOperator->status = OP_RES_TO_RETURN; pQuery->order.order = order; // TODO : restore the order doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); copyResToQueryResultBuf_rv(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pIntervalInfo->pRes; @@ -6572,18 +6576,18 @@ static SSDataBlock* doSTableIntervalAgg(void* param) { static SSDataBlock* doHashGroupbyAgg(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SHashGroupbyOperatorInfo *pInfo = pOperator->info; + SGroupbyOperatorInfo *pInfo = pOperator->info; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - if (pOperator->completed == OP_RES_TO_RETURN) { + if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pInfo->binfo.pRes; @@ -6607,7 +6611,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { hashGroupbyAgg(pRuntimeEnv, pOperator, pInfo, pBlock); } - pOperator->completed = OP_RES_TO_RETURN; + pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); setQueryStatus(pRuntimeEnv->pQuery, QUERY_COMPLETED); @@ -6621,7 +6625,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; } return pInfo->binfo.pRes; @@ -6629,7 +6633,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) { static SSDataBlock* doFill(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -6645,7 +6649,7 @@ static SSDataBlock* doFill(void* param) { SSDataBlock *pBlock = pOperator->upstream->exec(pOperator->upstream); if (pBlock == NULL) { if (pInfo->totalInputRows == 0) { - pOperator->completed = OP_EXEC_DONE; + pOperator->status = OP_EXEC_DONE; return NULL; } @@ -6693,24 +6697,24 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { static SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); - int64_t seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->pRes, pInfo->rowCellInfoOffset, seed); + pInfo->seed = rand(); + setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, + pInfo->binfo.rowCellInfoOffset, pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; pOperator->blockingOptr = true; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->seed = seed; // TODO refactor: seed to move to pInfo?? pOperator->exec = doAggregate; pOperator->cleanup = destroyBasicOperatorInfo; @@ -6738,7 +6742,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { - SHashGroupbyOperatorInfo* pInfo = (SHashGroupbyOperatorInfo*) param; + SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } @@ -6750,14 +6754,14 @@ static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, 1); - pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); - initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, 1); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableAggregate"; pOperator->blockingOptr = true; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; @@ -6773,26 +6777,26 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); - int64_t seed = rand(); + pInfo->seed = rand(); pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; - SOptrBasicInfo* pBInfo = &pInfo->binfo; + SOptrBasicInfo* pBInfo = &pInfo->binfo; pBInfo->pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, seed); + setDefaultOutputBuf(pRuntimeEnv, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->pRes, pBInfo->rowCellInfoOffset, + pInfo->seed); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOp"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->seed = seed; pOperator->exec = doArithmeticOperation; pOperator->cleanup = destroyArithOperatorInfo; @@ -6808,7 +6812,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI pOperator->name = "LimitOp"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->exec = doLimit; pOperator->info = pInfo; @@ -6825,7 +6829,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pOperator->name = "OffsetOp"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->exec = doOffset; pOperator->info = pInfo; @@ -6835,7 +6839,7 @@ SOperatorInfo* createOffsetOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator } SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { - SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); + STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); @@ -6845,7 +6849,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe pOperator->name = "HashIntervalAgg"; pOperator->blockingOptr = true; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6859,7 +6863,7 @@ SOperatorInfo* createIntervalAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe } SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { - SHashIntervalOperatorInfo* pInfo = calloc(1, sizeof(SHashIntervalOperatorInfo)); + STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); @@ -6868,7 +6872,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "STableIntervalAggOp"; pOperator->blockingOptr = true; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6882,7 +6886,7 @@ SOperatorInfo* createStableIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, S } SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { - SHashGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SHashGroupbyOperatorInfo)); + SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -6892,7 +6896,7 @@ SOperatorInfo* createHashGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "HashGroupbyAgg"; pOperator->blockingOptr = true; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6914,7 +6918,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->name = "FillOp"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->upstream = upstream; pOperator->pExpr = pExpr; pOperator->numOfOutput = numOfOutput; @@ -6929,35 +6933,32 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn static SSDataBlock* doTagScan(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->completed == OP_EXEC_DONE) { + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - STagScanInfo *pTagScanInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - assert(numOfGroup == 0 || numOfGroup == 1); - if (numOfGroup == 0) { - return NULL; - } + int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; - SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); - - size_t num = taosArrayGetSize(pa); - assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables); + STagScanInfo *pInfo = pOperator->info; + SSDataBlock *pRes = pInfo->pRes; int32_t count = 0; -// int32_t functionId = pOperator->pExpr[0].base.functionId; - /*if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id + SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); + + int32_t functionId = pOperator->pExpr[0].base.functionId; + if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id + SQuery* pQuery = pRuntimeEnv->pQuery; assert(pQuery->numOfOutput == 1); SExprInfo* pExprInfo = &pOperator->pExpr[0]; int32_t rsize = pExprInfo->bytes; + count = 0; int16_t bytes = pExprInfo->bytes; - int16_t type = pExprInfo->type; + int16_t type = pExprInfo->type; for(int32_t i = 0; i < pQuery->numOfTags; ++i) { if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) { @@ -6967,11 +6968,13 @@ static SSDataBlock* doTagScan(void* param) { } } - while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) { - int32_t i = pRuntimeEnv->tableIndex++; + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); + + while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->currentIndex++; STableQueryInfo *item = taosArrayGetP(pa, i); - char *output = pQuery->sdata[0]->data + count * rsize; + char *output = pColInfo->pData + count * rsize; varDataSetLen(output, rsize - VARSTR_HEADER_SIZE); output = varDataVal(output); @@ -6989,32 +6992,30 @@ static SSDataBlock* doTagScan(void* param) { *(int32_t *)output = pQuery->vgId; output += sizeof(pQuery->vgId); + char* data = NULL; if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) { - char* data = tsdbGetTableName(item->pTable); - memcpy(output, data, varDataTLen(data)); + data = tsdbGetTableName(item->pTable); } else { - char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); - doSetTagValueToResultBuf(output, data, type, bytes); + data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes); } + doSetTagValueToResultBuf(output, data, type, bytes); count += 1; } qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pRuntimeEnv->qinfo, count); - - } else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query + } /*else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query *(int64_t*) pQuery->sdata[0]->data = num; count = 1; SET_STABLE_QUERY_OVER(pRuntimeEnv); qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pRuntimeEnv->qinfo, count); - } else*/ { // return only the tags|table name etc. - count = 0; - int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; - SExprInfo* pExprInfo = pOperator->pExpr; + }*/ else { // return only the tags|table name etc. + SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo - while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) { - int32_t i = pRuntimeEnv->tableIndex++; + count = 0; + while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->currentIndex++; STableQueryInfo* item = taosArrayGetP(pa, i); @@ -7026,7 +7027,7 @@ static SSDataBlock* doTagScan(void* param) { continue; } - SColumnInfoData* pColInfo = taosArrayGet(pTagScanInfo->pRes->pDataBlock, j); + SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, j); type = pExprInfo[j].type; bytes = pExprInfo[j].bytes; @@ -7043,21 +7044,27 @@ static SSDataBlock* doTagScan(void* param) { count += 1; } - pTagScanInfo->pRes->info.rows = count; + pRes->info.rows = count; qDebug("QInfo:%p create tag values results completed, rows:%d", pRuntimeEnv->qinfo, count); } - return (pTagScanInfo->pRes->info.rows == 0)? NULL:pTagScanInfo->pRes; + return (pRes->info.rows == 0)? NULL:pInfo->pRes; } SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + assert(numOfGroup == 0 || numOfGroup == 1); + + pInfo->totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables; + pInfo->currentIndex = 0; + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "SeqTagScanOp"; + pOperator->name = "SeqTableTagScan"; pOperator->blockingOptr = false; - pOperator->completed = OP_IN_EXECUTING; + pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->exec = doTagScan; pOperator->pExpr = pExpr; @@ -7088,57 +7095,18 @@ void tableQueryImpl(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; -#if 0 - if (hasNotReturnedResults(pRuntimeEnv, &pRuntimeEnv->groupResInfo)) { - if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) { - /* - * There are remain results that are not returned due to result interpolation - * So, we do keep in this procedure instead of launching retrieve procedure for next results. - */ -// pRuntimeEnv->resultInfo.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata); - if (pRuntimeEnv->resultInfo.rows > 0) { - limitOperator(pQuery, pQInfo); - qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pRuntimeEnv->resultInfo.rows, pRuntimeEnv->resultInfo.total); - } else { - return copyAndFillResult(pQInfo); - } - - } else { - pRuntimeEnv->resultInfo.rows = 0; - assert(pRuntimeEnv->resultRowInfo.size > 0); - copyToOutputBuf(pRuntimeEnv, &pRuntimeEnv->resultRowInfo); - doSecondaryArithmeticProcess(pQuery); - - if (pRuntimeEnv->resultInfo.rows > 0) { - limitOperator(pQuery, pQInfo); - } - - if (pRuntimeEnv->resultInfo.rows > 0) { - qDebug("QInfo:%p %" PRId64 " rows returned from group results, total:%" PRId64 "", pQInfo, pRuntimeEnv->resultInfo.rows, - pRuntimeEnv->resultInfo.total); - } else { - qDebug("QInfo:%p query over, %" PRId64 " rows are returned", pQInfo, pRuntimeEnv->resultInfo.total); - } - } - - return; - } -#endif - // number of points returned during this query int64_t st = taosGetTimestampUs(); assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); - SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0); - STableQueryInfo* item = taosArrayGetP(g, 0); - pQuery->current = item; + SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0); + pQuery->current = taosArrayGetP(g, 0); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); // record the total elapsed time pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); - assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1); } void buildTableBlockDistResult(SQInfo *pQInfo) { @@ -8478,7 +8446,6 @@ void buildTagQueryResult(SQInfo* pQInfo) { } pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot); - return; #if 0 SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0); diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 81221ed37a..266dd064ab 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -246,13 +246,12 @@ bool qTableQuery(qinfo_t qinfo) { if (isQueryKilled(pQInfo)) { qDebug("QInfo:%p query is killed", pQInfo); - } else if (pRuntimeEnv->outputBuf->info.rows == 0) { + } else if (GET_NUM_OF_RESULTS(pRuntimeEnv) == 0) { qDebug("QInfo:%p over, %" PRIzu " tables queried, %"PRId64" rows are returned", pQInfo, pRuntimeEnv->tableqinfoGroupInfo.numOfTables, pRuntimeEnv->resultInfo.total); } else { qDebug("QInfo:%p query paused, %d rows returned, numOfTotal:%" PRId64 " rows", - pQInfo, pRuntimeEnv->outputBuf->info.rows, - pRuntimeEnv->resultInfo.total + pRuntimeEnv->outputBuf->info.rows); + pQInfo, GET_NUM_OF_RESULTS(pRuntimeEnv), pRuntimeEnv->resultInfo.total + GET_NUM_OF_RESULTS(pRuntimeEnv)); } return doBuildResCheck(pQInfo); @@ -289,7 +288,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex if (pQInfo->dataReady == QUERY_RESULT_READY) { *buildRes = true; qDebug("QInfo:%p retrieve result info, rowsize:%d, rows:%d, code:%s", pQInfo, pQuery->resultRowSize, - pRuntimeEnv->outputBuf->info.rows, tstrerror(pQInfo->code)); + GET_NUM_OF_RESULTS(pRuntimeEnv), tstrerror(pQInfo->code)); } else { *buildRes = false; qDebug("QInfo:%p retrieve req set query return result after paused", pQInfo); @@ -313,7 +312,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co SQuery *pQuery = pQInfo->runtimeEnv.pQuery; SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - int64_t s = pRuntimeEnv->outputBuf->info.rows; + int64_t s = GET_NUM_OF_RESULTS(pRuntimeEnv); size_t size = getResultSize(pQInfo, &s); @@ -339,7 +338,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co } (*pRsp)->precision = htons(pQuery->precision); - if (pQInfo->runtimeEnv.outputBuf->info.rows > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { + if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { doDumpQueryResult(pQInfo, (*pRsp)->data); } else { setQueryStatus(pQuery, QUERY_OVER);