diff --git a/include/libs/planner/plannerOp.h b/include/libs/planner/plannerOp.h index 9030ffc946..840064025e 100644 --- a/include/libs/planner/plannerOp.h +++ b/include/libs/planner/plannerOp.h @@ -47,5 +47,6 @@ OP_ENUM_MACRO(AllTimeWindow) OP_ENUM_MACRO(AllMultiTableTimeInterval) OP_ENUM_MACRO(Order) OP_ENUM_MACRO(Exchange) +OP_ENUM_MACRO(SortMerge) //OP_ENUM_MACRO(TableScan) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 232b54554f..4265864f09 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -549,27 +549,19 @@ typedef struct SDistinctOperatorInfo { SArray* pDistinctDataInfo; } SDistinctOperatorInfo; -struct SGlobalMerger; - -typedef struct SMultiwayMergeInfo { - struct SGlobalMerger* pMerge; - SOptrBasicInfo binfo; - int32_t bufCapacity; - int64_t seed; - char** prevRow; - SArray* orderColumnList; - int32_t resultRowFactor; - - bool hasGroupColData; - char** currentGroupColData; - SArray* groupColumnList; - bool hasDataBlockForNewGroup; - SSDataBlock* pExistBlock; - - SArray* udfInfo; - bool hasPrev; - bool multiGroupResults; -} SMultiwayMergeInfo; +typedef struct SSortMergeOperatorInfo { + SOptrBasicInfo binfo; + SArray *orderInfo; // SArray + SArray* groupColumnList; + bool hasDataBlockForNewGroup; + char** currentGroupColData; + SArray* udfInfo; + int32_t numOfSources; +// char** prevRow; +// int32_t resultRowFactor; +// bool multiGroupResults; +// bool hasGroupColData; +} SSortMergeOperatorInfo; typedef struct SMsortComparParam { struct SExternalMemSource **pSources; @@ -592,6 +584,7 @@ typedef struct SOrderOperatorInfo { SMsortComparParam cmpParam; + // TODO extact struct int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t totalSize; // total load bytes from remote @@ -642,8 +635,7 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal); -SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, - SOrder* pOrderVal); +SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo); // SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); // SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a4197b4414..593edf257a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5600,12 +5600,9 @@ SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { } static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { - SMultiwayMergeInfo *pInfo = (SMultiwayMergeInfo*) param; + SSortMergeOperatorInfo *pInfo = (SSortMergeOperatorInfo*) param; destroyBasicOperatorInfo(&pInfo->binfo, numOfOutput); - - taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->groupColumnList); - tfree(pInfo->prevRow); tfree(pInfo->currentGroupColData); } @@ -5616,118 +5613,82 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevRow); } -SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { - SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - - pInfo->resultRowFactor = - (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); - - pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx - - pInfo->multiGroupResults = groupResultMixedUp; - pInfo->pMerge = param; - pInfo->bufCapacity = 4096; - pInfo->udfInfo = pUdfInfo; - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); - - // TODO refactor - int32_t len = 0; - for(int32_t i = 0; i < numOfOutput; ++i) { -// len += pExpr[i].base.; +static SExprInfo* exprArrayDup(SArray* pExprInfo) { + size_t numOfOutput = taosArrayGetSize(pExprInfo); + SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); + for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + assignExprInfo(&p[i], pExpr); } - int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; - pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + return p; +} + +static SSDataBlock* doSortMerge(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortMergeOperatorInfo* pInfo = pOperator->info; + + for(int32_t i = 0; i < pInfo->numOfSources; ++i) { + SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup); + + } + + return NULL; +} + +SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, void* param, SArray* pUdfInfo, SExecTaskInfo* pTaskInfo) { + SSortMergeOperatorInfo* pInfo = calloc(1, sizeof(SSortMergeOperatorInfo)); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + +// pInfo->resultRowFactor = +// (int32_t)(getRowNumForMultioutput(pRuntimeEnv->pQueryAttr, pRuntimeEnv->pQueryAttr->topBotQuery, false)); + + int32_t numOfOutput = taosArrayGetSize(pExprInfo); + pInfo->binfo.capacity = 4096; + pInfo->udfInfo = pUdfInfo; + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); + pInfo->binfo.pCtx = createSqlFunctionCtx_rv( pExprInfo, &pInfo->binfo.rowCellInfoOffset, &pInfo->binfo.resRowSize); + + int32_t numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0; +// pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); int32_t offset = POINTER_BYTES * numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; - - SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); - offset += pExpr[index->colIndex].base.resSchema.bytes; - } - - numOfCols = (pInfo->groupColumnList != NULL)? (int32_t)taosArrayGetSize(pInfo->groupColumnList):0; - pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); - offset = POINTER_BYTES * numOfCols; - - for(int32_t i = 0; i < numOfCols; ++i) { - pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset; - - SColIndex* index = taosArrayGet(pInfo->groupColumnList, i); - offset += pExpr[index->colIndex].base.resSchema.bytes; - } +// for(int32_t i = 0; i < numOfCols; ++i) { +// pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset; +// +// SColIndex* index = taosArrayGet(pInfo->groupColumnList, i); +// offset += pExpr[index->colIndex].base.resSchema.bytes; +// } initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); - pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); + int32_t numOfRows = 1; +// setDefaultOutputBuf_rv(pExprInfo, numOfRows); - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "GlobalAggregate"; -// pOperator->operatorType = OP_GlobalAggregate; + pOperator->name = "SortMerge"; + pOperator->operatorType = OP_SortMerge; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->pExpr = pExpr; + pOperator->pExpr = exprArrayDup(pExprInfo); pOperator->numOfOutput = numOfOutput; - pOperator->pRuntimeEnv = pRuntimeEnv; -// pOperator->exec = doGlobalAggregate; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->exec = doSortMerge; pOperator->cleanupFn = destroyGlobalAggOperatorInfo; appendDownstream(pOperator, downstream); return pOperator; } -SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, - int32_t numOfRows, void *merger) { - SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - - pInfo->pMerge = merger; - pInfo->bufCapacity = numOfRows; - pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - - { // todo extract method to create prev compare buffer - int32_t len = 0; - for(int32_t i = 0; i < numOfOutput; ++i) { -// len += pExpr[i].base.colBytes; - } - - int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; - pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - - int32_t offset = POINTER_BYTES * numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; - - SColIndex* index = taosArrayGet(pInfo->orderColumnList, i); -// offset += pExpr[index->colIndex].base.colBytes; - } - } - - SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "MultiwaySortOperator"; -// pOperator->operatorType = OP_MultiwayMergeSort; - pOperator->blockingOptr = false; - pOperator->status = OP_IN_EXECUTING; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = numOfOutput; - pOperator->pExpr = pExpr; -// pOperator->exec = doMultiwayMergeSort; - pOperator->cleanupFn = destroyGlobalAggOperatorInfo; - return pOperator; -} - typedef struct SExternalMemSource { SArray* pageIdList; int32_t pageIndex; - int32_t sourceId; int32_t rowIndex; SSDataBlock *pBlock; } SExternalMemSource; @@ -5879,8 +5840,6 @@ static int32_t doAddNewSource(SOrderOperatorInfo* pInfo, SArray* pAllSources, in } pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId); - pSource->sourceId = pInfo->sourceId; - pSource->pBlock = calloc(1, sizeof(SSDataBlock)); pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); pSource->pBlock->info.numOfCols = numOfCols; @@ -7197,17 +7156,6 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SArray* pExprInfo, int32_t n return TSDB_CODE_SUCCESS; } -static SExprInfo* exprArrayDup(SArray* pExprInfo) { - size_t numOfOutput = taosArrayGetSize(pExprInfo); - SExprInfo* p = calloc(numOfOutput, sizeof(SExprInfo)); - for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { - SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); - assignExprInfo(&p[i], pExpr); - } - - return p; -} - SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); @@ -8198,12 +8146,12 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask // Transfer the Array of STableKeyInfo into uid list. size_t numOfTables = taosArrayGetSize(pa); + idList = taosArrayInit(numOfTables, sizeof(uint64_t)); + for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pkeyInfo = taosArrayGet(pa, i); taosArrayPush(idList, &pkeyInfo->uid); } - - idList = taosArrayInit(numOfTables, sizeof(uint64_t)); } else { idList = taosArrayInit(4, sizeof(uint64_t)); }