diff --git a/2.0/src/query/inc/qExecutor.h b/2.0/src/query/inc/qExecutor.h index 0fdc441289..9c738dad98 100644 --- a/2.0/src/query/inc/qExecutor.h +++ b/2.0/src/query/inc/qExecutor.h @@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); diff --git a/2.0/src/query/src/qExecutor.c b/2.0/src/query/src/qExecutor.c index 20ea91eceb..a8e0ee4968 100644 --- a/2.0/src/query/src/qExecutor.c +++ b/2.0/src/query/src/qExecutor.c @@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr break; } case OP_TableSeqScan: { - pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); + pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv); break; } case OP_DataBlocksOptScan: { - pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); + pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0); break; } case OP_TableScan: { @@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* return pOperator; } -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pTsdbReadHandle = pTsdbQueryHandle; @@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } } -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f46e00cd25..6b1aae6c0c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -274,9 +274,9 @@ typedef struct STaskRuntimeEnv { SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - SResultRowPool* - pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + // The window result objects pool, all the resultRow Objects are allocated and managed by this object. char** prevRow; + SResultRowPool* pool; SArray* prevResult; // intermediate result, SArray STSBuf* pTsBuf; // timestamp filter list @@ -440,13 +440,13 @@ typedef struct SOptrBasicInfo { SqlFunctionCtx* pCtx; SSDataBlock* pRes; uint32_t resRowSize; + int32_t capacity; } SOptrBasicInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; - uint32_t seed; SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not @@ -455,6 +455,8 @@ typedef struct SAggOperatorInfo { SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. STableQueryInfo *current; uint32_t groupId; + SGroupResInfo *pGroupResInfo; + STableQueryInfo *pTableQueryInfo; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { @@ -591,9 +593,9 @@ typedef struct SOrderOperatorInfo { } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, @@ -672,8 +674,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); -STableQueryInfo* createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, - void* buf); +STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cba912bc60..54917eb14c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -215,7 +215,7 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } -static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); +static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); @@ -228,8 +228,7 @@ static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo); - SArray* getOrderCheckColumns(STaskAttr* pQuery); - +SArray* getOrderCheckColumns(STaskAttr* pQuery); typedef struct SRowCompSupporter { STaskRuntimeEnv *pRuntimeEnv; @@ -2007,7 +2006,7 @@ static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprI return pFuncCtx; } -static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset) { +static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowCellInfoOffset, uint32_t* pRowSize) { size_t numOfOutput = taosArrayGetSize(pExprInfo); SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)calloc(numOfOutput, sizeof(SqlFunctionCtx)); @@ -2105,6 +2104,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SArray* pExprInfo, int32_t** rowC for(int32_t i = 1; i < numOfOutput; ++i) { SExprInfo* pExpr = taosArrayGetP(pExprInfo, i - 1); (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr->base.interBytes); + *pRowSize += pExpr->base.resSchema.bytes; } setCtxTagColumnInfo(pFuncCtx, numOfOutput); @@ -3250,8 +3250,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SqlFunctionCtx* pCt } } -void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) { - SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; +void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) { pBlock->info.rows = 0; int32_t code = TSDB_CODE_SUCCESS; @@ -3259,12 +3258,12 @@ void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlo // all results in current group have been returned to client, try next group if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { assert(pGroupResInfo->index == 0); - if ((code = mergeIntoGroupResult(&pRuntimeEnv->groupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) { +// if ((code = mergeIntoGroupResult(&pGroupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) { return; - } +// } } - doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, TSDB_ORDER_ASC, pBlock); +// doCopyToSDataBlock(pResBuf, pGroupResInfo, TSDB_ORDER_ASC, pBlock, ); // current data are all dumped to result buffer, clear it if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { @@ -3275,9 +3274,9 @@ void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlo } // enough results in data buffer, return - if (pBlock->info.rows >= threshold) { - break; - } +// if (pBlock->info.rows >= threshold) { +// break; +// } } } @@ -3381,7 +3380,7 @@ void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, in initCtxOutputBuffer(pCtx, pDataBlock->info.numOfCols); } -void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t stage, SExecTaskInfo* pTaskInfo) { +void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int32_t stage, SExecTaskInfo* pTaskInfo) { SOptrBasicInfo *pInfo = &pAggInfo->binfo; SqlFunctionCtx* pCtx = pInfo->pCtx; @@ -3390,8 +3389,10 @@ void setDefaultOutputBuf_rv(SAggOperatorInfo* pAggInfo, int64_t uid, int32_t sta SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; int64_t tid = 0; + int64_t groupId = 0; + pAggInfo->keyBuf = realloc(pAggInfo->keyBuf, sizeof(tid) + sizeof(int64_t) + POINTER_BYTES); - SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid, pTaskInfo, false, pAggInfo); + SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pAggInfo); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3597,24 +3598,19 @@ static bool hasMainOutput(STaskAttr *pQueryAttr) { return false; } -STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) { +STableQueryInfo *createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) { STableQueryInfo *pTableQueryInfo = buf; - -// pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; -// pTableQueryInfo->pTable = pTable; -// pTableQueryInfo->cur.vgroupIndex = -1; - // set more initial size of interval/groupby query - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) || groupbyColumn) { +// if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) { int32_t initialSize = 128; int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT); if (code != TSDB_CODE_SUCCESS) { return NULL; } - } else { // in other aggregate query, do not initialize the windowResInfo - } +// } else { // in other aggregate query, do not initialize the windowResInfo +// } return pTableQueryInfo; } @@ -3962,9 +3958,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { * @param result */ -static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) { - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - +static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3990,13 +3984,13 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p } int32_t numOfRowsToCopy = pRow->numOfRows; - if (numOfResult + numOfRowsToCopy >= pRuntimeEnv->resultInfo.capacity) { + if (numOfResult + numOfRowsToCopy >= rowCapacity) { break; } pGroupResInfo->index += 1; - SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); + SFilePage *page = getBufPage(pBuf, pRow->pageId); int32_t offset = 0; for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { @@ -4004,14 +3998,14 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p int32_t bytes = pColInfoData->info.bytes; char *out = pColInfoData->pData + numOfResult * bytes; - char *in = getPosInResultPage(pQueryAttr, page, pRow->offset, offset); + char *in = getPosInResultPage_rv(page, pRow->offset, offset); memcpy(out, in, bytes * numOfRowsToCopy); offset += bytes; } numOfResult += numOfRowsToCopy; - if (numOfResult == pRuntimeEnv->resultInfo.capacity) { // output buffer is full + if (numOfResult == rowCapacity) { // output buffer is full break; } } @@ -4029,9 +4023,8 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime return; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; - doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock); + doCopyToSDataBlock(NULL, pGroupResInfo, orderType, pBlock, 0); // refactor : extract method SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); @@ -5359,7 +5352,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { return pResBlock; } -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -5391,7 +5384,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, return pOperator; } -SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pTsdbReadHandle = pTsdbReadHandle; @@ -6557,7 +6550,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { if (pOperator->status == OP_RES_TO_RETURN) { int64_t st = taosGetTimestampUs(); - copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); +// copyToSDataBlock(NULL, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -6597,7 +6590,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { doCloseAllTimeWindow(pRuntimeEnv); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); +// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -6615,7 +6608,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { - copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); +// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -6653,7 +6646,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); int64_t st = taosGetTimestampUs(); - copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); +// copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -7058,24 +7051,31 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) { pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO: pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); + // TODO: number of tables + pInfo->pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); + + STimeWindow win = {0, INT64_MAX}; + createTableQueryInfo(pInfo->pTableQueryInfo, false, win); + + return TSDB_CODE_SUCCESS; } static SExprInfo* exprArrayDup(SArray* pExprInfo) { - size_t numOfOutput = taosArrayGetSize(pExprInfo); - SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); - for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - assignExprInfo(&p[i], pExpr); - } + size_t numOfOutput = taosArrayGetSize(pExprInfo); + SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); + for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + assignExprInfo(&p[i], pExpr); + } + + return p; } SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { @@ -7085,9 +7085,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); initAggInfo(pInfo, pExprInfo, numOfRows); - - pInfo->seed = rand(); - setDefaultOutputBuf_rv(pInfo, pInfo->seed, MAIN_SCAN, pTaskInfo); + setDefaultOutputBuf_rv(pInfo, MAIN_SCAN, pTaskInfo); size_t numOfOutput = taosArrayGetSize(pExprInfo); SExprInfo* p = exprArrayDup(pExprInfo); @@ -8055,7 +8053,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask tsdbReaderT pDataReader = doCreateDataReader((STableScanPhyNode*) pPhyNode, pHandle, (uint64_t) queryId, taskId); - return createDataBlocksOptScanInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); + return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (pPhyNode->info.type == OP_Exchange) { SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode; return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);