diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c7111f987d..362f40696c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -551,7 +551,9 @@ typedef struct SDistinctOperatorInfo { } SDistinctOperatorInfo; typedef struct SSortedMergeOperatorInfo { - SSDataBlock *pDataBlock; + SOptrBasicInfo binfo; + +// SSDataBlock *pDataBlock; bool hasVarCol; SArray *orderInfo; // SArray @@ -563,6 +565,11 @@ typedef struct SSortedMergeOperatorInfo { int32_t bufPageSize; uint32_t sortBufSize; // max buffer size for in-memory sort int32_t numOfRowsInRes; + + char** prevRow; + int32_t resultRowFactor; + bool multiGroupResults; + bool hasGroupColData; } SSortedMergeOperatorInfo; typedef struct SOrderOperatorInfo { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 77a38f147e..92258f619f 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -24,8 +24,8 @@ extern "C" { #include "os.h" enum { - SORT_MULTIWAY_MERGE = 0x1, - SORT_SINGLESOURCE = 0x2, + SORT_MULTISOURCE_MERGE = 0x1, + SORT_SINGLESOURCE_SORT = 0x2, }; typedef struct SMultiMergeSource { @@ -40,10 +40,10 @@ typedef struct SExternalMemSource { int32_t pageIndex; } SExternalMemSource; -typedef struct SOperatorSource { +typedef struct SGenericSource { SMultiMergeSource src; - void* param; -} SOperatorSource; + void *param; +} SGenericSource; typedef struct SMsortComparParam { void **pSources; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c7c72e615d..5286bd7ba1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5603,7 +5603,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; taosArrayDestroy(pInfo->orderInfo); destroySortHandle(pInfo->pSortHandle); - blockDataDestroy(pInfo->pDataBlock); + blockDataDestroy(pInfo->binfo.pRes); } static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { @@ -5624,6 +5624,7 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) { return p; } +// TODO merge aggregate super table static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHandle) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); @@ -5674,21 +5675,20 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes); } - SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); + SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTIWAY_MERGE, pInfo->bufPageSize, - numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)"); tfree(p); setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SOperatorSource* ps = calloc(1, sizeof(SOperatorSource)); + SGenericSource* ps = calloc(1, sizeof(SGenericSource)); ps->param = pOperator->pDownstream[i]; - sortAddSource(pInfo->pSortHandle, ps); } @@ -5698,7 +5698,7 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->numOfRowsInRes); } static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { @@ -5729,18 +5729,22 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { tfree(pInfo); - + tfree(pOperator); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return NULL; } + int32_t numOfOutput = taosArrayGetSize(pExprInfo); + pInfo->binfo.capacity = 4096; + pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + // pInfo->resultRowFactor = // (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); pInfo->sortBufSize = 1024 * 16; // 1MB pInfo->bufPageSize = 1024; pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); - pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->numOfRowsInRes); + pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); int32_t numOfRows = 1; @@ -5749,6 +5753,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->exec = doSortedMerge; @@ -5775,13 +5780,15 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE, pInfo->bufPageSize, + pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); tfree(p); setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - sortAddSource(pInfo->pSortHandle, pOperator); + SGenericSource* ps = calloc(1, sizeof(SGenericSource)); + ps->param = pOperator; + sortAddSource(pInfo->pSortHandle, ps); // TODO set error code; int32_t code = sortOpen(pInfo->pSortHandle); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 196e22d92d..4854f0cbed 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -38,8 +38,6 @@ typedef struct SSortHandle { SArray *pOrderInfo; bool nullFirst; bool hasVarCol; - - SArray *pSources; // TODO refactor, remove it SArray *pOrderedSource; _sort_fetch_block_fn_t fetchfp; @@ -96,7 +94,6 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, pSortHandle->pageSize = pageSize; pSortHandle->numOfPages = numOfPages; pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); - pSortHandle->pSources = taosArrayInit(4, POINTER_BYTES); pSortHandle->pOrderInfo = pOrderInfo; pSortHandle->nullFirst = nullFirst; pSortHandle->cmpParam.orderInfo = pOrderInfo; @@ -113,8 +110,6 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, void destroySortHandle(SSortHandle* pSortHandle) { sortClose(pSortHandle); - - taosArrayDestroy(pSortHandle->pSources); if (pSortHandle->pMergeTree != NULL) { tMergeTreeDestroy(pSortHandle->pMergeTree); } @@ -125,11 +120,7 @@ void destroySortHandle(SSortHandle* pSortHandle) { } int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) { - if (pSortHandle->type == SORT_SINGLESOURCE) { - pSortHandle->pParam = pSource; - } else { - taosArrayPush(pSortHandle->pOrderedSource, &pSource); - } + taosArrayPush(pSortHandle->pOrderedSource, &pSource); } static SSDataBlock* createDataBlock(const SSDataBlock* pDataBlock) { @@ -220,7 +211,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int int32_t code = 0; - if (pHandle->type == SORT_SINGLESOURCE) { + if (pHandle->type == SORT_SINGLESOURCE_SORT) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SExternalMemSource* pSource = cmpParam->pSources[i]; SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -244,7 +235,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int } for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { - SOperatorSource* pSource = cmpParam->pSources[i]; + SGenericSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); } } @@ -296,7 +287,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa pSource->pageIndex = -1; pSource->src.pBlock = blockDataDestroy(pSource->src.pBlock); } else { - if (pHandle->type == SORT_SINGLESOURCE) { + if (pHandle->type == SORT_SINGLESOURCE_SORT) { SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SFilePage* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo)); @@ -307,7 +298,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa releaseBufPage(pHandle->pBuf, pPage); } else { - pSource->src.pBlock = pHandle->fetchfp(((SOperatorSource*)pSource)->param); + pSource->src.pBlock = pHandle->fetchfp(((SGenericSource*)pSource)->param); if (pSource->src.pBlock == NULL) { (*numOfCompleted) += 1; pSource->src.rowIndex = -1; @@ -530,8 +521,8 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { qDebug("%s %d round mergesort, elapsed:%"PRId64" readDisk:%.2f Kb, flushDisk:%.2f Kb", pHandle->idStr, t + 1, el, statis.loadBytes/1024.0, statis.flushBytes/1024.0); - if (pHandle->type == SORT_MULTIWAY_MERGE) { - pHandle->type = SORT_SINGLESOURCE; + if (pHandle->type == SORT_MULTISOURCE_MERGE) { + pHandle->type = SORT_SINGLESOURCE_SORT; pHandle->comparFn = msortComparFn; } } @@ -540,26 +531,15 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { return 0; } -int32_t sortOpen(SSortHandle* pHandle) { - if (pHandle->opened) { - return 0; - } - - if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return -1; - } - - pHandle->opened = true; - +static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - if (pHandle->type == SORT_SINGLESOURCE) { - if (pHandle->pParam == NULL) { - qError("%s sort source not set yet", pHandle->idStr); - return -1; - } + + if (pHandle->type == SORT_SINGLESOURCE_SORT) { + SGenericSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); + taosArrayClear(pHandle->pOrderedSource); while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(pHandle->pParam); + SSDataBlock* pBlock = pHandle->fetchfp(source->param); if (pBlock == NULL) { break; } @@ -604,12 +584,31 @@ int32_t sortOpen(SSortHandle* pHandle) { doAddToBuf(pHandle->pDataBlock, pHandle); } } - } else { - // do nothing + + tfree(source); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t sortOpen(SSortHandle* pHandle) { + if (pHandle->opened) { + return 0; + } + + if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { + return -1; + } + + pHandle->opened = true; + + int32_t code = createInitialSortedMultiSources(pHandle); + if (code != TSDB_CODE_SUCCESS) { + return code; } // do internal sort - int32_t code = doInternalMergeSort(pHandle); + code = doInternalMergeSort(pHandle); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 2651b95b0f..455e731c05 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -356,7 +356,7 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } -#endif + TEST(testCase, sorted_merge_Test) { srand(time(NULL)); @@ -424,5 +424,5 @@ TEST(testCase, sorted_merge_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } - +#endif #pragma GCC diagnostic pop diff --git a/source/libs/executor/test/executorUtilTests.cpp b/source/libs/executor/test/executorUtilTests.cpp index 5c94b89c3d..754d4b6b80 100644 --- a/source/libs/executor/test/executorUtilTests.cpp +++ b/source/libs/executor/test/executorUtilTests.cpp @@ -76,12 +76,12 @@ int32_t docomp(const void* p1, const void* p2, void* param) { int32_t pRightIdx = *(int32_t *)p2; SMsortComparParam *pParam = (SMsortComparParam *)param; - SOperatorSource** px = reinterpret_cast(pParam->pSources); + SGenericSource** px = reinterpret_cast(pParam->pSources); SArray *pInfo = pParam->orderInfo; - SOperatorSource* pLeftSource = px[pLeftIdx]; - SOperatorSource* pRightSource = px[pRightIdx]; + SGenericSource* pLeftSource = px[pLeftIdx]; + SGenericSource* pRightSource = px[pRightIdx]; // this input is exhausted, set the special value to denote this if (pLeftSource->src.rowIndex == -1) { @@ -162,7 +162,7 @@ int32_t docomp(const void* p1, const void* p2, void* param) { // SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); // taosArrayPush(orderInfo, &oi); // -// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc"); +// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, "test_abc"); // setFetchRawDataFp(phandle, getSingleColDummyBlock); // sortAddSource(phandle, &numOfRows); // @@ -182,42 +182,50 @@ int32_t docomp(const void* p1, const void* p2, void* param) { // destroySortHandle(phandle); //} // -//TEST(testCase, external_mem_sort_Test) { -// totalcount = 50; -// startVal = 100000; -// -// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); -// SOrder o = {.order = TSDB_ORDER_ASC}; -// o.col.info.colId = 1; -// o.col.info.type = TSDB_DATA_TYPE_INT; -// taosArrayPush(pOrderVal, &o); -// +TEST(testCase, external_mem_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + // int32_t numOfRows = 1000; -// SBlockOrderInfo oi = {0}; -// oi.order = TSDB_ORDER_ASC; -// oi.colIndex = 0; -// SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); -// taosArrayPush(orderInfo, &oi); -// -// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE, 1024, 5, "test_abc"); -// setFetchRawDataFp(phandle, getSingleColDummyBlock); -// sortAddSource(phandle, &numOfRows); -// -// int32_t code = sortOpen(phandle); -// int32_t row = 1; -// -// while(1) { -// STupleHandle* pTupleHandle = sortNextTuple(phandle); -// if (pTupleHandle == NULL) { -// break; -// } -// -// void* v = sortGetValue(pTupleHandle, 0); -// printf("%d: %d\n", row++, *(int32_t*) v); -// -// } -// destroySortHandle(phandle); -//} + SBlockOrderInfo oi = {0}; + oi.order = TSDB_ORDER_ASC; + oi.colIndex = 0; + SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + taosArrayPush(orderInfo, &oi); + + SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, }; + + SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc"); + setFetchRawDataFp(phandle, getSingleColDummyBlock); + + _info* pInfo = (_info*) calloc(1, sizeof(_info)); + pInfo->startVal = 100000; + pInfo->pageRows = 1000; + pInfo->count = 50; + + SGenericSource* ps = static_cast(calloc(1, sizeof(SGenericSource))); + ps->param = pInfo; + + sortAddSource(phandle, ps); + + int32_t code = sortOpen(phandle); + int32_t row = 1; + + while(1) { + STupleHandle* pTupleHandle = sortNextTuple(phandle); + if (pTupleHandle == NULL) { + break; + } + + void* v = sortGetValue(pTupleHandle, 0); + printf("%d: %d\n", row++, *(int32_t*) v); + + } + destroySortHandle(phandle); +} //TEST(testCase, ordered_merge_sort_Test) { // SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); @@ -234,7 +242,7 @@ int32_t docomp(const void* p1, const void* p2, void* param) { // taosArrayPush(orderInfo, &oi); // // SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4}; -// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTIWAY_MERGE, 1024, 5, &s, 1,"test_abc"); +// SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc"); // setFetchRawDataFp(phandle, getSingleColDummyBlock); // setComparFn(phandle, docomp); // diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index f014d8e07e..212b3f3067 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -562,7 +562,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { } void setBufPageDirty(SFilePage* pPage, bool dirty) { - int32_t offset = offsetof(SPageInfo, pData); // todo extract method + int32_t offset = offsetof(SPageInfo, pData); char* p = (char*)pPage - offset; SPageInfo* ppi = ((SPageInfo**) p)[0];