feat: add sort group operator
This commit is contained in:
parent
22f0ae45ac
commit
805ce63fdd
|
@ -232,6 +232,260 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//=====================================================================================
|
||||
// Group Sort Operator
|
||||
typedef enum EChildOperatorStatus {CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED} EChildOperatorStatus;
|
||||
|
||||
typedef struct SGroupSortOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SArray* pSortInfo;
|
||||
SArray* pColMatchInfo; // for index map from table scan output
|
||||
int32_t bufPageSize;
|
||||
|
||||
int64_t startTs; // sort start time
|
||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||
SSDataBlock* prefetchedDatablock;
|
||||
bool hasGroupId;
|
||||
uint64_t currGroupId;
|
||||
SSortHandle* pCurrSortHandle;
|
||||
|
||||
EChildOperatorStatus childOpStatus;
|
||||
} SGroupSortOperatorInfo;
|
||||
|
||||
|
||||
SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||
SGroupSortOperatorInfo* pInfo) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(p, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
appendOneRowToDataBlock(p, pTupleHandle);
|
||||
if (p->info.rows >= capacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (p->info.rows > 0) {
|
||||
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||
colDataAssign(pDst, pSrc, p->info.rows);
|
||||
}
|
||||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.capacity = p->info.rows;
|
||||
}
|
||||
|
||||
blockDataDestroy(p);
|
||||
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||
}
|
||||
|
||||
|
||||
typedef struct SGroupSortSourceParam {
|
||||
SOperatorInfo* childOpInfo;
|
||||
SGroupSortOperatorInfo* grpSortOpInfo;
|
||||
} SGroupSortSourceParam;
|
||||
|
||||
|
||||
SSDataBlock* fetchNextGroupSortDataBlock(void* param) {
|
||||
SGroupSortSourceParam* source = param;
|
||||
SGroupSortOperatorInfo* grpSortOpInfo = source->grpSortOpInfo;
|
||||
if (grpSortOpInfo->prefetchedDatablock) {
|
||||
SSDataBlock* pBlock = grpSortOpInfo->prefetchedDatablock;
|
||||
grpSortOpInfo->prefetchedDatablock = NULL;
|
||||
return pBlock;
|
||||
} else {
|
||||
SOperatorInfo* childOp = source->childOpInfo;
|
||||
SSDataBlock* block = childOp->fpSet.getNextFn(childOp);
|
||||
if (block != NULL) {
|
||||
if (block->info.groupId == grpSortOpInfo->currGroupId) {
|
||||
grpSortOpInfo->childOpStatus = CHILD_OP_SAME_GROUP;
|
||||
return block;
|
||||
} else {
|
||||
grpSortOpInfo->childOpStatus = CHILD_OP_NEW_GROUP;
|
||||
grpSortOpInfo->prefetchedDatablock = block;
|
||||
return NULL;
|
||||
}
|
||||
} else {
|
||||
grpSortOpInfo->childOpStatus = CHILD_OP_FINISHED;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t beginSortGroup(SOperatorInfo* pOperator) {
|
||||
SGroupSortOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
|
||||
// pInfo->binfo.pRes is not equalled to the input datablock.
|
||||
pInfo->pCurrSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1,
|
||||
NULL, pTaskInfo->id.str);
|
||||
|
||||
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
|
||||
|
||||
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
|
||||
param->childOpInfo = pOperator->pDownstream[0];
|
||||
param->grpSortOpInfo = pInfo;
|
||||
ps->param = param;
|
||||
tsortAddSource(pInfo->pCurrSortHandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(pInfo->pCurrSortHandle);
|
||||
taosMemoryFreeClear(ps);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t finishSortGroup(SOperatorInfo* pOperator) {
|
||||
SGroupSortOperatorInfo* pInfo = pOperator->info;
|
||||
if (pInfo->pCurrSortHandle != NULL) {
|
||||
tsortDestroySortHandle(pInfo->pCurrSortHandle);
|
||||
}
|
||||
pInfo->pCurrSortHandle = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
}
|
||||
|
||||
SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SGroupSortOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
int32_t code = pOperator->fpSet._openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (!pInfo->hasGroupId) {
|
||||
pInfo->hasGroupId = true;
|
||||
|
||||
pInfo->prefetchedDatablock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]);
|
||||
pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId;
|
||||
pInfo->childOpStatus = CHILD_OP_NEW_GROUP;
|
||||
beginSortGroup(pOperator);
|
||||
}
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (pInfo->childOpStatus != CHILD_OP_FINISHED) {
|
||||
pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
|
||||
pInfo->pColMatchInfo, pInfo);
|
||||
|
||||
if (pBlock != NULL) {
|
||||
pBlock->info.groupId = pInfo->currGroupId;
|
||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||
}
|
||||
if (pInfo->childOpStatus == CHILD_OP_NEW_GROUP) {
|
||||
finishSortGroup(pOperator);
|
||||
pInfo->currGroupId = pInfo->prefetchedDatablock->info.groupId;
|
||||
beginSortGroup(pOperator);
|
||||
}
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
if (pInfo->childOpStatus == CHILD_OP_FINISHED) {
|
||||
finishSortGroup(pOperator);
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
|
||||
//TODO: accumulate all sort handles;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
//TODO:
|
||||
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo) {
|
||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL/* || rowSize > 100 * 1024 * 1024*/) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SDataBlockDescNode* pDescNode = pSortPhyNode->node.pOutputDataBlockDesc;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||
SExprInfo* pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols);
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
SArray* pColMatchColInfo =
|
||||
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||
|
||||
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
|
||||
initResultSizeInfo(pOperator, 1024);
|
||||
|
||||
pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys);;
|
||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||
pOperator->name = "GroupSortOperator";
|
||||
//TODO
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
|
||||
getGroupSortExplainExecInfo);
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pInfo);
|
||||
taosMemoryFree(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param;
|
||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||
|
||||
taosArrayDestroy(pInfo->pSortInfo);
|
||||
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||
}
|
||||
|
||||
|
||||
//TODO: sort group
|
||||
//TODO: msortCompare compare group id in multiway merge sort.
|
||||
//TODO: table merge scan, group first, then for each group, multiple readers
|
||||
|
||||
//=====================================================================================
|
||||
// Multiway Sort Merge operator
|
||||
typedef struct SMultiwaySortMergeOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
|
||||
|
|
Loading…
Reference in New Issue