diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index 9007fe0b49..461547ad79 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -128,6 +128,8 @@ int32_t getPageId(const SPageInfo* pPgInfo); */ int32_t getBufPageSize(const SDiskbasedBuf* pBuf); +int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf); + /** * * @param pBuf diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c0f0cf4dfe..f465f09ee1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -25,7 +25,7 @@ #include "tglobal.h" #include "tmsgtype.h" #include "tnote.h" -#include "tpagedfile.h" +#include "tpagedbuf.h" #include "tref.h" struct tmq_list_t { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 02bacb15ca..c845d3d5ab 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -100,9 +100,9 @@ typedef struct STableQueryInfo { TSKEY lastKey; int32_t groupIndex; // group id in table list SVariant tag; - STimeWindow win; // todo remove it later - STSCursor cur; - void* pTable; // for retrieve the page id list +// STimeWindow win; // todo remove it later +// STSCursor cur; +// void* pTable; // for retrieve the page id list SResultRowInfo resInfo; } STableQueryInfo; @@ -454,8 +454,7 @@ typedef struct SAggOperatorInfo { SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - SResultRowPool* - pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. STableQueryInfo* current; } SAggOperatorInfo; @@ -586,7 +585,8 @@ typedef struct SOrderOperatorInfo { SDiskbasedBuf *pSortInternalBuf; SMultiwayMergeTreeInfo *pMergeTree; SArray *pSources; // SArray - int32_t capacity; + int32_t bufPageSize; + int32_t numOfRowsInRes; SMsortComparParam cmpParam; } SOrderOperatorInfo; @@ -596,6 +596,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); @@ -609,8 +610,7 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput); + SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ffc39f24d2..a8a37b89d1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2780,7 +2780,7 @@ void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo int8_t *p = calloc(numOfRows, sizeof(int8_t)); bool all = true; - +#if 0 if (pRuntimeEnv->pTsBuf != NULL) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); @@ -2808,6 +2808,7 @@ void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo } else { all = doFilterDataBlock(pFilterInfo, numOfFilterCols, numOfRows, p); } +#endif if (!all) { doCompactSDataBlock(pBlock, numOfRows, p); @@ -2846,7 +2847,7 @@ void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, } // save the cursor status - pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); +// pRuntimeEnv->current->cur = tsBufGetCursor(pRuntimeEnv->pTsBuf); } else { // all = filterExecute(pRuntimeEnv->pQueryAttr->pFilters, numOfRows, &p, pBlock->pBlockAgg, pRuntimeEnv->pQueryAttr->numOfCols); } @@ -3286,11 +3287,11 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) return; } - TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY); - pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; +// TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY); +// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; - SWITCH_ORDER(pTableQueryInfo->cur.order); - pTableQueryInfo->cur.vgroupIndex = -1; +// SWITCH_ORDER(pTableQueryInfo->cur.order); +// pTableQueryInfo->cur.vgroupIndex = -1; // set the index to be the end slot of result rows array SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo; @@ -3600,11 +3601,11 @@ static bool hasMainOutput(STaskAttr *pQueryAttr) { STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) { STableQueryInfo *pTableQueryInfo = buf; - pTableQueryInfo->win = win; +// pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; - pTableQueryInfo->pTable = pTable; - pTableQueryInfo->cur.vgroupIndex = -1; +// pTableQueryInfo->pTable = pTable; +// pTableQueryInfo->cur.vgroupIndex = -1; // set more initial size of interval/groupby query if (QUERY_IS_INTERVAL_QUERY(pQueryAttr) || groupbyColumn) { @@ -3622,12 +3623,9 @@ STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) { STableQueryInfo* pTableQueryInfo = calloc(1, sizeof(STableQueryInfo)); - pTableQueryInfo->win = win; +// pTableQueryInfo->win = win; pTableQueryInfo->lastKey = win.skey; - pTableQueryInfo->pTable = NULL; - pTableQueryInfo->cur.vgroupIndex = -1; - // set more initial size of interval/groupby query int32_t initialSize = 16; int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize, TSDB_DATA_TYPE_INT); @@ -3773,7 +3771,7 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; assert(pRuntimeEnv->pTsBuf != NULL); - +#if 0 // both the master and supplement scan needs to set the correct ts comp start position if (pTableQueryInfo->cur.vgroupIndex == -1) { taosVariantAssign(&pTableQueryInfo->tag, pTag); @@ -3807,7 +3805,7 @@ int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, S //qDebug("QInfo:0x%"PRIx64" find tag:%"PRId64" start pos in ts_comp, blockIndex:%d, tsIndex:%d", GET_TASKID(pRuntimeEnv), pTag->i, pTableQueryInfo->cur.blockIndex, pTableQueryInfo->cur.tsIndex); } } - +#endif return 0; } @@ -3897,7 +3895,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { return; } - pTableQueryInfo->win.skey = key; +// pTableQueryInfo->win.skey = key; STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey}; /** @@ -3920,7 +3918,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { // pResultRowInfo->prevSKey = w.skey; // } - pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; +// pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; } /** @@ -4645,7 +4643,8 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr } static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { - if (order == TSDB_ORDER_ASC) { +#if 0 + if (order == TSDB_ORDER_ASC) { assert( (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) && @@ -4656,6 +4655,8 @@ static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQuer (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) && (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey)); } +#endif + } //STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) { @@ -5800,6 +5801,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa pInfo->numOfCompleted += 1; pSource->rowIndex = -1; pSource->pageIndex = -1; + pSource->pBlock = blockDataDestroy(pSource->pBlock); } else { SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -5872,8 +5874,16 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, int32_t numOfCols) { pInfo->sourceId += 1; pInfo->cmpParam.numOfSources += 1; + if (pInfo->cmpParam.numOfSources > getNumOfInMemBufPages(pInfo->pSortInternalBuf)) { + // TODO sort memory not enough, return with error code. + } + ASSERT(pInfo->cmpParam.numOfSources == taosArrayGetSize(pInfo->pSources)); - return blockDataEnsureCapacity(pSource->pBlock, pInfo->capacity); + + int32_t rowSize = blockDataGetRowSize(pSource->pBlock); + int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize; + + return blockDataEnsureCapacity(pSource->pBlock, numOfRows); } void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { @@ -5952,7 +5962,7 @@ static SSDataBlock* getSortedBlockData(SExecTaskInfo* pTaskInfo, SOrderOperatorI longjmp(pTaskInfo->env, code); } - if (pInfo->pDataBlock->info.rows >= pInfo->capacity) { + if (pInfo->pDataBlock->info.rows >= pInfo->numOfRowsInRes) { return pInfo->pDataBlock; } } @@ -6014,7 +6024,9 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { addToDiskbasedBuf(pInfo, pTaskInfo->env); } - blockDataEnsureCapacity(pInfo->pDataBlock, pInfo->capacity); + int32_t rowSize = blockDataGetRowSize(pInfo->pDataBlock); + int32_t numOfRows = getBufPageSize(pInfo->pSortInternalBuf)/rowSize; + blockDataEnsureCapacity(pInfo->pDataBlock, numOfRows); int32_t code = sortComparInit(&pInfo->cmpParam, pInfo); if (code != TSDB_CODE_SUCCESS) { @@ -6064,8 +6076,9 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB - pInfo->capacity = 64*1024; - pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); + pInfo->bufPageSize = 64 * 1024; + pInfo->numOfRowsInRes = 4096; + pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); pInfo->pSources = taosArrayInit(4, POINTER_BYTES); pInfo->cmpParam.orderInfo = createBlockOrder(pExprInfo, pOrderVal); @@ -6077,7 +6090,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI } } - int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->capacity, pInfo->capacity*1000, 1, "/tmp/"); + int32_t code = createDiskbasedBuffer(&pInfo->pSortInternalBuf, pInfo->bufPageSize, pInfo->bufPageSize*1000, 1, "/tmp/"); if (pInfo->pSources == NULL || code != 0 || pInfo->cmpParam.orderInfo == NULL || pInfo->pDataBlock == NULL) { tfree(pOperator); destroyOrderOperatorInfo(pInfo, taosArrayGetSize(pExprInfo)); @@ -6142,7 +6155,7 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { return (pInfo->pRes->info.rows != 0)? pInfo->pRes:NULL; } -static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { +static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -6177,7 +6190,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { break; } - setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // if (downstream->operatorType == OP_DataBlocksOptScan) { // STableScanInfo* pScanInfo = downstream->info; @@ -6237,7 +6250,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // todo dynamic set tags if (pTableQueryInfo != NULL) { - setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); } // the pDataBlock are always the same one, no need to call this again @@ -6287,7 +6300,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { // todo dynamic set tags if (pTableQueryInfo != NULL) { - setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); } // the pDataBlock are always the same one, no need to call this again @@ -6421,7 +6434,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { break; } - setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); @@ -6481,7 +6494,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { break; } - setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); @@ -6547,7 +6560,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); @@ -6602,7 +6615,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; - setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order); setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey); @@ -6850,7 +6863,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pRuntimeEnv->pQueryAttr->order.order); - setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); +// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); if (pInfo->colIndex == -1) { pInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); } @@ -7018,31 +7031,41 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { - SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - - int32_t numOfRows = 1;//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - - size_t numOfOutput = taosArrayGetSize(pExprInfo); +static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t numOfRows) { pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); pInfo->pResultRowHashTable = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pInfo->pResultRowListSet = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO: - pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); + pInfo->keyBuf = malloc(1024 + sizeof(int64_t) + POINTER_BYTES); // TODO: + pInfo->pool = initResultRowPool(getResultRowSize(pExprInfo)); pInfo->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); +} + +static SExprInfo* exprArrayDup(SArray* pExprInfo) { + size_t numOfOutput = taosArrayGetSize(pExprInfo); + SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); + for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + assignExprInfo(&p[i], pExpr); + } +} + +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { + SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); + + int32_t numOfRows = 1; + //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + + initAggInfo(pInfo, pExprInfo, numOfRows); pInfo->seed = rand(); setDefaultOutputBuf_rv(pInfo, pInfo->seed, MAIN_SCAN, pTaskInfo); + size_t numOfOutput = taosArrayGetSize(pExprInfo); - SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); - for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - assignExprInfo(&p[i], pExpr); - } + SExprInfo* p = exprArrayDup(pExprInfo); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -7052,7 +7075,6 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pE pOperator->info = pInfo; pOperator->pExpr = p; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = NULL; pOperator->pTaskInfo = pTaskInfo; pOperator->exec = doAggregate; @@ -7136,26 +7158,26 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); - - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) tableGroup); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); + size_t tableGroup = 1; // GET_NUM_OF_TABLEGROUP(pRuntimeEnv); + int32_t numOfRows = 1; + size_t numOfOutput = taosArrayGetSize(pExprInfo); + initAggInfo(pInfo, pExprInfo, numOfRows); + SExprInfo* p = exprArrayDup(pExprInfo); + // initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup, TSDB_DATA_TYPE_INT); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "MultiTableAggregate"; -// pOperator->operatorType = OP_MultiTableAggregate; + pOperator->operatorType = OP_MultiTableAggregate; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->pExpr = pExpr; + pOperator->pExpr = p; pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doSTableAggregate; + pOperator->exec = doMultiTableAggregate; pOperator->cleanupFn = destroyAggOperatorInfo; appendDownstream(pOperator, downstream); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 8381a7c585..c0e22ad555 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -33,193 +33,311 @@ #include "stub.h" #include "executor.h" -/** -{ - "Id": { - "QueryId": 1.3108161807422521e+19, - "TemplateId": 0, - "SubplanId": 0 - }, - "Node": { - "Name": "TableScan", - "Targets": [{ - "Base": { - "Schema": { - "Type": 9, - "ColId": 5000, - "Bytes": 8 - }, - "Columns": [{ - "TableId": 1, - "Flag": 0, - "Info": { - "ColId": 1, - "Type": 9, - "Bytes": 8 - } - }], - "InterBytes": 0 - }, - "Expr": { - "Type": 4, - "Column": { - "Type": 9, - "ColId": 1, - "Bytes": 8 - } - } - }, { - "Base": { - "Schema": { - "Type": 4, - "ColId": 5001, - "Bytes": 4 - }, - "Columns": [{ - "TableId": 1, - "Flag": 0, - "Info": { - "ColId": 2, - "Type": 4, - "Bytes": 4 - } - }], - "InterBytes": 0 - }, - "Expr": { - "Type": 4, - "Column": { - "Type": 4, - "ColId": 2, - "Bytes": 4 - } - } - }], - "InputSchema": [{ - "Type": 9, - "ColId": 5000, - "Bytes": 8 - }, { - "Type": 4, - "ColId": 5001, - "Bytes": 4 - }], - "TableScan": { - "TableId": 1, - "TableType": 2, - "Flag": 0, - "Window": { - "StartKey": -9.2233720368547758e+18, - "EndKey": 9.2233720368547758e+18 - } - } - }, - "DataSink": { - "Name": "Dispatch", - "Dispatch": { - } - } -} - */ +namespace { +typedef struct SDummyInputInfo { + int32_t max; + int32_t current; + int32_t startVal; + SSDataBlock* pBlock; +} SDummyInputInfo; + +SSDataBlock* getDummyBlock(void* param, bool* newgroup) { + SOperatorInfo* pOperator = static_cast(param); + SDummyInputInfo* pInfo = static_cast(pOperator->info); + if (pInfo->current >= pInfo->max) { + return NULL; + } + + int32_t numOfRows = 1000; + + if (pInfo->pBlock == NULL) { + pInfo->pBlock = static_cast(calloc(1, sizeof(SSDataBlock))); + + pInfo->pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfo.pData = static_cast(calloc(numOfRows, sizeof(int32_t))); + colInfo.nullbitmap = static_cast(calloc(1, (numOfRows + 7) / 8)); + + taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo); + +// SColumnInfoData colInfo1 = {0}; +// colInfo1.info.type = TSDB_DATA_TYPE_BINARY; +// colInfo1.info.bytes = 40; +// colInfo1.info.colId = 2; +// +// colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t); +// colInfo1.varmeta.length = 0; +// colInfo1.varmeta.offset = static_cast(calloc(1, numOfRows * sizeof(int32_t))); +// +// taosArrayPush(pInfo->pBlock->pDataBlock, &colInfo1); + } else { + blockDataClearup(pInfo->pBlock, true); + } + + SSDataBlock* pBlock = pInfo->pBlock; + + char buf[128] = {0}; + char b1[128] = {0}; + for(int32_t i = 0; i < numOfRows; ++i) { + SColumnInfoData* pColInfo = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 0)); + + int32_t v = rand();//(++pInfo->startVal); + colDataAppend(pColInfo, i, reinterpret_cast(&v), false); + +// sprintf(buf, "this is %d row", i); +// STR_TO_VARSTR(b1, buf); +// +// SColumnInfoData* pColInfo2 = static_cast(TARRAY_GET_ELEM(pBlock->pDataBlock, 1)); +// colDataAppend(pColInfo2, i, b1, false); + } + + pBlock->info.rows = numOfRows; + pBlock->info.numOfCols = 1; + + pInfo->current += 1; + return pBlock; +} + +SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { + SOperatorInfo* pOperator = static_cast(calloc(1, sizeof(SOperatorInfo))); + pOperator->name = "dummyInputOpertor4Test"; + pOperator->exec = getDummyBlock; + + SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); + pInfo->max = numOfBlocks; + pInfo->startVal = 5000000; + + pOperator->info = pInfo; + return pOperator; +} +} int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } TEST(testCase, build_executor_tree_Test) { - - const char* msg = "{\n" - "\t\"Id\":\t{\n" - "\t\t\"QueryId\":\t1.3108161807422521e+19,\n" - "\t\t\"TemplateId\":\t0,\n" - "\t\t\"SubplanId\":\t0\n" - "\t},\n" - "\t\"Node\":\t{\n" - "\t\t\"Name\":\t\"TableScan\",\n" - "\t\t\"Targets\":\t[{\n" - "\t\t\t\t\"Base\":\t{\n" - "\t\t\t\t\t\"Schema\":\t{\n" - "\t\t\t\t\t\t\"Type\":\t9,\n" - "\t\t\t\t\t\t\"ColId\":\t5000,\n" - "\t\t\t\t\t\t\"Bytes\":\t8\n" - "\t\t\t\t\t},\n" - "\t\t\t\t\t\"Columns\":\t[{\n" - "\t\t\t\t\t\t\t\"TableId\":\t1,\n" - "\t\t\t\t\t\t\t\"Flag\":\t0,\n" - "\t\t\t\t\t\t\t\"Info\":\t{\n" - "\t\t\t\t\t\t\t\t\"ColId\":\t1,\n" - "\t\t\t\t\t\t\t\t\"Type\":\t9,\n" - "\t\t\t\t\t\t\t\t\"Bytes\":\t8\n" - "\t\t\t\t\t\t\t}\n" - "\t\t\t\t\t\t}],\n" - "\t\t\t\t\t\"InterBytes\":\t0\n" - "\t\t\t\t},\n" - "\t\t\t\t\"Expr\":\t{\n" - "\t\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\t\"Column\":\t{\n" - "\t\t\t\t\t\t\"Type\":\t9,\n" - "\t\t\t\t\t\t\"ColId\":\t1,\n" - "\t\t\t\t\t\t\"Bytes\":\t8\n" - "\t\t\t\t\t}\n" - "\t\t\t\t}\n" - "\t\t\t}, {\n" - "\t\t\t\t\"Base\":\t{\n" - "\t\t\t\t\t\"Schema\":\t{\n" - "\t\t\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\t\t\"ColId\":\t5001,\n" - "\t\t\t\t\t\t\"Bytes\":\t4\n" - "\t\t\t\t\t},\n" - "\t\t\t\t\t\"Columns\":\t[{\n" - "\t\t\t\t\t\t\t\"TableId\":\t1,\n" - "\t\t\t\t\t\t\t\"Flag\":\t0,\n" - "\t\t\t\t\t\t\t\"Info\":\t{\n" - "\t\t\t\t\t\t\t\t\"ColId\":\t2,\n" - "\t\t\t\t\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\t\t\t\t\"Bytes\":\t4\n" - "\t\t\t\t\t\t\t}\n" - "\t\t\t\t\t\t}],\n" - "\t\t\t\t\t\"InterBytes\":\t0\n" - "\t\t\t\t},\n" - "\t\t\t\t\"Expr\":\t{\n" - "\t\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\t\"Column\":\t{\n" - "\t\t\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\t\t\"ColId\":\t2,\n" - "\t\t\t\t\t\t\"Bytes\":\t4\n" - "\t\t\t\t\t}\n" - "\t\t\t\t}\n" - "\t\t\t}],\n" - "\t\t\"InputSchema\":\t[{\n" - "\t\t\t\t\"Type\":\t9,\n" - "\t\t\t\t\"ColId\":\t5000,\n" - "\t\t\t\t\"Bytes\":\t8\n" - "\t\t\t}, {\n" - "\t\t\t\t\"Type\":\t4,\n" - "\t\t\t\t\"ColId\":\t5001,\n" - "\t\t\t\t\"Bytes\":\t4\n" - "\t\t\t}],\n" - "\t\t\"TableScan\":\t{\n" - "\t\t\t\"TableId\":\t1,\n" - "\t\t\t\"TableType\":\t2,\n" - "\t\t\t\"Flag\":\t0,\n" - "\t\t\t\"Window\":\t{\n" - "\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n" - "\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n" - "\t\t\t}\n" - "\t\t}\n" - "\t},\n" - "\t\"DataSink\":\t{\n" - "\t\t\"Name\":\t\"Dispatch\",\n" - "\t\t\"Dispatch\":\t{\n" - "\t\t}\n" - "\t}\n" - "}"; + "\t\"Id\":\t{\n" + "\t\t\"QueryId\":\t1.3108161807422521e+19,\n" + "\t\t\"TemplateId\":\t0,\n" + "\t\t\"SubplanId\":\t0\n" + "\t},\n" + "\t\"Node\":\t{\n" + "\t\t\"Name\":\t\"TableScan\",\n" + "\t\t\"Targets\":\t[{\n" + "\t\t\t\t\"Base\":\t{\n" + "\t\t\t\t\t\"Schema\":\t{\n" + "\t\t\t\t\t\t\"Type\":\t9,\n" + "\t\t\t\t\t\t\"ColId\":\t5000,\n" + "\t\t\t\t\t\t\"Bytes\":\t8\n" + "\t\t\t\t\t},\n" + "\t\t\t\t\t\"Columns\":\t[{\n" + "\t\t\t\t\t\t\t\"TableId\":\t1,\n" + "\t\t\t\t\t\t\t\"Flag\":\t0,\n" + "\t\t\t\t\t\t\t\"Info\":\t{\n" + "\t\t\t\t\t\t\t\t\"ColId\":\t1,\n" + "\t\t\t\t\t\t\t\t\"Type\":\t9,\n" + "\t\t\t\t\t\t\t\t\"Bytes\":\t8\n" + "\t\t\t\t\t\t\t}\n" + "\t\t\t\t\t\t}],\n" + "\t\t\t\t\t\"InterBytes\":\t0\n" + "\t\t\t\t},\n" + "\t\t\t\t\"Expr\":\t{\n" + "\t\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\t\"Column\":\t{\n" + "\t\t\t\t\t\t\"Type\":\t9,\n" + "\t\t\t\t\t\t\"ColId\":\t1,\n" + "\t\t\t\t\t\t\"Bytes\":\t8\n" + "\t\t\t\t\t}\n" + "\t\t\t\t}\n" + "\t\t\t}, {\n" + "\t\t\t\t\"Base\":\t{\n" + "\t\t\t\t\t\"Schema\":\t{\n" + "\t\t\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\t\t\"ColId\":\t5001,\n" + "\t\t\t\t\t\t\"Bytes\":\t4\n" + "\t\t\t\t\t},\n" + "\t\t\t\t\t\"Columns\":\t[{\n" + "\t\t\t\t\t\t\t\"TableId\":\t1,\n" + "\t\t\t\t\t\t\t\"Flag\":\t0,\n" + "\t\t\t\t\t\t\t\"Info\":\t{\n" + "\t\t\t\t\t\t\t\t\"ColId\":\t2,\n" + "\t\t\t\t\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\t\t\t\t\"Bytes\":\t4\n" + "\t\t\t\t\t\t\t}\n" + "\t\t\t\t\t\t}],\n" + "\t\t\t\t\t\"InterBytes\":\t0\n" + "\t\t\t\t},\n" + "\t\t\t\t\"Expr\":\t{\n" + "\t\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\t\"Column\":\t{\n" + "\t\t\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\t\t\"ColId\":\t2,\n" + "\t\t\t\t\t\t\"Bytes\":\t4\n" + "\t\t\t\t\t}\n" + "\t\t\t\t}\n" + "\t\t\t}],\n" + "\t\t\"InputSchema\":\t[{\n" + "\t\t\t\t\"Type\":\t9,\n" + "\t\t\t\t\"ColId\":\t5000,\n" + "\t\t\t\t\"Bytes\":\t8\n" + "\t\t\t}, {\n" + "\t\t\t\t\"Type\":\t4,\n" + "\t\t\t\t\"ColId\":\t5001,\n" + "\t\t\t\t\"Bytes\":\t4\n" + "\t\t\t}],\n" + "\t\t\"TableScan\":\t{\n" + "\t\t\t\"TableId\":\t1,\n" + "\t\t\t\"TableType\":\t2,\n" + "\t\t\t\"Flag\":\t0,\n" + "\t\t\t\"Window\":\t{\n" + "\t\t\t\t\"StartKey\":\t-9.2233720368547758e+18,\n" + "\t\t\t\t\"EndKey\":\t9.2233720368547758e+18\n" + "\t\t\t}\n" + "\t\t}\n" + "\t},\n" + "\t\"DataSink\":\t{\n" + "\t\t\"Name\":\t\"Dispatch\",\n" + "\t\t\"Dispatch\":\t{\n" + "\t\t}\n" + "\t}\n" + "}"; SExecTaskInfo* pTaskInfo = nullptr; DataSinkHandle sinkHandle = nullptr; - int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); + SReadHandle handle = {.reader = reinterpret_cast(0x1), .meta = reinterpret_cast(0x1)}; + +// int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); } +//TEST(testCase, inMem_sort_Test) { +// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); +// SOrder o = {.order = TSDB_ORDER_ASC}; +// o.col.info.colId = 1; +// o.col.info.type = TSDB_DATA_TYPE_INT; +// taosArrayPush(pOrderVal, &o); +// +// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); +// SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); +// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); +// taosArrayPush(pExprInfo, &exp); +// +// SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); +// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); +// taosArrayPush(pExprInfo, &exp1); +// +// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal); +// +// bool newgroup = false; +// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); +// +// SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); +// SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); +// for(int32_t i = 0; i < pRes->info.rows; ++i) { +// char* p = colDataGet(pCol2, i); +// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); +// } +//} + +typedef struct su { + int32_t v; + char *c; +} su; + +int32_t cmp(const void* p1, const void* p2) { + su* v1 = (su*) p1; + su* v2 = (su*) p2; + + int32_t x1 = *(int32_t*) v1->c; + int32_t x2 = *(int32_t*) v2->c; + if (x1 == x2) { + return 0; + } else { + return x1 < x2? -1:1; + } +} + +TEST(testCase, external_sort_Test) { +#if 0 + su* v = static_cast(calloc(1000000, sizeof(su))); + for(int32_t i = 0; i < 1000000; ++i) { + v[i].v = rand(); + v[i].c = static_cast(malloc(4)); + *(int32_t*) v[i].c = i; + } + + qsort(v, 1000000, sizeof(su), cmp); +// for(int32_t i = 0; i < 1000; ++i) { +// printf("%d ", v[i]); +// } +// printf("\n"); + return; +#endif + + srand(time(NULL)); + + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {0}; + o.order = TSDB_ORDER_ASC; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); + SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); + exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); + taosArrayPush(pExprInfo, &exp); + + SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); + exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); +// taosArrayPush(pExprInfo, &exp1); + + SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal); + + bool newgroup = false; + SSDataBlock* pRes = NULL; + + int32_t total = 1; + + int64_t s1 = taosGetTimestampUs(); + int32_t t = 1; + + while(1) { + int64_t s = taosGetTimestampUs(); + pRes = pOperator->exec(pOperator, &newgroup); + + int64_t e = taosGetTimestampUs(); + if (t++ == 1) { + printf("---------------elapsed:%ld\n", e - s); + } + + if (pRes == NULL) { + break; + } + +// SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); +// SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); +// for (int32_t i = 0; i < pRes->info.rows; ++i) { +// char* p = colDataGet(pCol2, i); +// printf("%d: %d, %s\n", total++, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); +// } + } + + printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); + + int64_t s2 = taosGetTimestampUs(); + printf("total:%ld\n", s2 - s1); + + pOperator->cleanupFn(pOperator->info, 2); + tfree(exp); + tfree(exp1); + taosArrayDestroy(pExprInfo); + taosArrayDestroy(pOrderVal); +} #pragma GCC diagnostic pop diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 0ae1371be2..52a9fdd522 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -557,6 +557,10 @@ int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; } +int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf) { + return pBuf->inMemPages; +} + bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { return pBuf->fileSize == 0; }