feat: group sort read child input during tsortOpen

This commit is contained in:
shenglian zhou 2022-06-22 09:42:04 +08:00
parent 805ce63fdd
commit faebe29ab7
1 changed files with 104 additions and 99 deletions

View File

@ -22,10 +22,11 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error; goto _error;
} }
@ -44,16 +45,17 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
pInfo->pColMatchInfo = pColMatchColInfo; ;
pOperator->name = "SortOperator"; pInfo->pColMatchInfo = pColMatchColInfo;
pOperator->name = "SortOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
// lazy evaluation for the following parameter since the input datablock is not known till now. // lazy evaluation for the following parameter since the input datablock is not known till now.
// pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2;
@ -146,8 +148,8 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
SOperatorInfo* pOperator = param; SOperatorInfo* pOperator = param;
SSortOperatorInfo* pSort = pOperator->info; SSortOperatorInfo* pSort = pOperator->info;
if (pOperator->exprSupp.pExprInfo != NULL) { if (pOperator->exprSupp.pExprInfo != NULL) {
int32_t code = int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code); longjmp(pOperator->pTaskInfo->env, code);
} }
@ -165,8 +167,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
pInfo->startTs = taosGetTimestampUs(); pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock. // pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
@ -234,28 +235,27 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
//===================================================================================== //=====================================================================================
// Group Sort Operator // Group Sort Operator
typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus; typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
typedef struct SGroupSortOperatorInfo { typedef struct SGroupSortOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort SArray* pSortInfo;
SArray* pSortInfo; SArray* pColMatchInfo;
SArray* pColMatchInfo; // for index map from table scan output
int32_t bufPageSize;
int64_t startTs; // sort start time int64_t startTs;
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed;
SSDataBlock* prefetchedDatablock; bool hasGroupId;
bool hasGroupId; uint64_t currGroupId;
uint64_t currGroupId;
SSortHandle* pCurrSortHandle;
SSDataBlock* prefetchedSortInput;
SSortHandle* pCurrSortHandle;
EChildOperatorStatus childOpStatus; EChildOperatorStatus childOpStatus;
SSortExecInfo sortExecInfo;
} SGroupSortOperatorInfo; } SGroupSortOperatorInfo;
SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SArray* pColMatchInfo, SGroupSortOperatorInfo* pInfo) {
SGroupSortOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle); SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
@ -296,30 +296,28 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
} }
typedef struct SGroupSortSourceParam { typedef struct SGroupSortSourceParam {
SOperatorInfo* childOpInfo; SOperatorInfo* childOpInfo;
SGroupSortOperatorInfo* grpSortOpInfo; SGroupSortOperatorInfo* grpSortOpInfo;
} SGroupSortSourceParam; } SGroupSortSourceParam;
SSDataBlock* fetchNextGroupSortDataBlock(void* param) { SSDataBlock* fetchNextGroupSortDataBlock(void* param) {
SGroupSortSourceParam* source = param; SGroupSortSourceParam* source = param;
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo; SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
if (grpSortOpInfo->prefetchedDatablock) { if (grpSortOpInfo->prefetchedSortInput) {
SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock; SSDataBlock* block = grpSortOpInfo->prefetchedSortInput;
grpSortOpInfo->prefetchedDatablock = NULL; grpSortOpInfo->prefetchedSortInput = NULL;
return pBlock; return block;
} else { } else {
SOperatorInfo* childOp = source->childOpInfo; SOperatorInfo* childOp = source->childOpInfo;
SSDataBlock* block = childOp->fpSet.getNextFn(childOp); SSDataBlock* block = childOp->fpSet.getNextFn(childOp);
if (block != NULL) { if (block != NULL) {
if (block->info.groupId == grpSortOpInfo->currGroupId) { if (block->info.groupId == grpSortOpInfo->currGroupId) {
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP; grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
return block; return block;
} else { } else {
grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP; grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
grpSortOpInfo->prefetchedDatablock = block; grpSortOpInfo->prefetchedSortInput = block;
return NULL; return NULL;
} }
} else { } else {
@ -329,19 +327,17 @@ SSDataBlock* fetchNextGroupSortDataBlock(void* param) {
} }
} }
int32_t beginSortGroup(SOperatorInfo* pOperator) { int32_t beginSortGroup(SOperatorInfo* pOperator) {
SGroupSortOperatorInfo* pInfo = pOperator->info; SGroupSortOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// pInfo->binfo.pRes is not equalled to the input datablock. // pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, pInfo->pCurrSortHandle =
NULL, pTaskInfo->id.str); tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
param->childOpInfo = pOperator->pDownstream[0]; param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo; param->grpSortOpInfo = pInfo;
@ -360,12 +356,18 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
int32_t finishSortGroup(SOperatorInfo* pOperator) { int32_t finishSortGroup(SOperatorInfo* pOperator) {
SGroupSortOperatorInfo* pInfo = pOperator->info; SGroupSortOperatorInfo* pInfo = pOperator->info;
SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pCurrSortHandle);
pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod;
pInfo->sortExecInfo.sortBuffer = sortExecInfo.sortBuffer;
pInfo->sortExecInfo.loops += sortExecInfo.loops;
pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes;
pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes;
if (pInfo->pCurrSortHandle != NULL) { if (pInfo->pCurrSortHandle != NULL) {
tsortDestroySortHandle(pInfo->pCurrSortHandle); tsortDestroySortHandle(pInfo->pCurrSortHandle);
} }
pInfo->pCurrSortHandle = NULL; pInfo->pCurrSortHandle = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
@ -373,7 +375,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SGroupSortOperatorInfo* pInfo = pOperator->info; SGroupSortOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator); int32_t code = pOperator->fpSet._openFn(pOperator);
@ -384,46 +386,50 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
if (!pInfo->hasGroupId) { if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true; pInfo->hasGroupId = true;
pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId; pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
pInfo->childOpStatus = CHILD_OP_NEW_GROUP; pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
beginSortGroup(pOperator); beginSortGroup(pOperator);
} }
SSDataBlock* pBlock = NULL;
while (pInfo->childOpStatus != CHILD_OP_FINISHED) {
pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
SSDataBlock* pBlock = NULL;
while (pInfo->pCurrSortHandle != NULL) {
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP);
pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->pColMatchInfo, pInfo);
if (pBlock != NULL) { if (pBlock != NULL) {
pBlock->info.groupId = pInfo->currGroupId; pBlock->info.groupId = pInfo->currGroupId;
pOperator->resultInfo.totalRows += pBlock->info.rows; pOperator->resultInfo.totalRows += pBlock->info.rows;
return pBlock;
} else {
if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
finishSortGroup(pOperator);
pInfo->currGroupId = pInfo->prefetchedSortInput->info.groupId;
beginSortGroup(pOperator);
} else if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
finishSortGroup(pOperator);
doSetOperatorCompleted(pOperator);
return NULL;
}
} }
if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
finishSortGroup(pOperator);
pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId;
beginSortGroup(pOperator);
}
return pBlock;
} }
return NULL;
if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
finishSortGroup(pOperator);
doSetOperatorCompleted(pOperator);
}
return pBlock;
} }
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
//TODO: accumulate all sort handles; SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)pOptr->info;
*pOptrExplain = &pInfo->sortExecInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//TODO: // TODO:
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode,
SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) { if (pInfo == NULL || pOperator == NULL /* || rowSize > 100 * 1024 * 1024*/) {
goto _error; goto _error;
} }
@ -442,20 +448,21 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysi
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);; pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);
pInfo->pColMatchInfo = pColMatchColInfo; ;
pOperator->name = "GroupSortOperator"; pInfo->pColMatchInfo = pColMatchColInfo;
//TODO pOperator->name = "GroupSortOperator";
// TODO
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL,
getGroupSortExplainExecInfo); NULL, getGroupSortExplainExecInfo);
int32_t code = appendDownstream(pOperator, &downstream, 1); int32_t code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -479,10 +486,9 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pColMatchInfo); taosArrayDestroy(pInfo->pColMatchInfo);
} }
// TODO: sort group
//TODO: sort group // TODO: msortCompare compare group id in multiway merge sort.
//TODO: msortCompare compare group id in multiway merge sort. // TODO: table merge scan, group first, then for each group, multiple readers
//TODO: table merge scan, group first, then for each group, multiple readers
//===================================================================================== //=====================================================================================
// Multiway Sort Merge operator // Multiway Sort Merge operator
@ -513,8 +519,8 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str); pInfo->pInputBlock, pTaskInfo->id.str);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
@ -540,7 +546,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
SArray* pColMatchInfo, SOperatorInfo* pOperator) { SArray* pColMatchInfo, SOperatorInfo* pOperator) {
SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info; SMultiwaySortMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock); blockDataCleanup(pDataBlock);
@ -641,24 +647,23 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
goto _error; goto _error;
} }
initResultSizeInfo(pOperator, 1024); initResultSizeInfo(pOperator, 1024);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = pSortInfo; pInfo->pSortInfo = pSortInfo;
pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pColMatchInfo = pColMatchColInfo;
pInfo->pInputBlock = pInputBlock; pInfo->pInputBlock = pInputBlock;
pOperator->name = "MultiwaySortMerge"; pOperator->name = "MultiwaySortMerge";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
// one additional is reserved for merged result. // one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,