feat: order by distributed split
This commit is contained in:
parent
a92fb4b788
commit
ce3aacf4d7
|
@ -2522,8 +2522,8 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList) {
|
SArray* pColList) {
|
||||||
if (pColList == NULL) { // data from other sources
|
if (pColList == NULL) { // data from other sources
|
||||||
blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
|
blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
|
||||||
} else { // extract data according to pColList
|
} else { // extract data according to pColList
|
||||||
|
@ -2677,7 +2677,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
|
||||||
|
|
||||||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||||
code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFreeClear(pDataInfo->pRsp);
|
taosMemoryFreeClear(pDataInfo->pRsp);
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -2790,7 +2790,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
|
||||||
int32_t code =
|
int32_t code =
|
||||||
setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
|
||||||
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
|
||||||
|
|
||||||
if (pRsp->completed == 1) {
|
if (pRsp->completed == 1) {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||||
|
@ -2856,7 +2856,7 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
return seqLoadRemoteData(pOperator);
|
return seqLoadRemoteData(pOperator);
|
||||||
} else {
|
} else {
|
||||||
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
|
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
|
||||||
// return concurrentlyLoadRemoteData(pOperator);
|
// return concurrentlyLoadRemoteData(pOperator);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2911,18 +2911,18 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->seqLoadData = false;
|
pInfo->seqLoadData = false;
|
||||||
pInfo->pTransporter = pTransporter;
|
pInfo->pTransporter = pTransporter;
|
||||||
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
|
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
|
||||||
tsem_init(&pInfo->ready, 0, 0);
|
tsem_init(&pInfo->ready, 0, 0);
|
||||||
|
|
||||||
pOperator->name = "ExchangeOperator";
|
pOperator->name = "ExchangeOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
|
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||||
destroyExchangeOperatorInfo, NULL, NULL, NULL);
|
destroyExchangeOperatorInfo, NULL, NULL, NULL);
|
||||||
|
@ -4389,7 +4389,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
STimeWindowAggSupp twSup = {
|
STimeWindowAggSupp twSup = {
|
||||||
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
.waterMark = pTableScanNode->watermark, .calTrigger = pTableScanNode->triggerType, .maxTs = INT64_MIN};
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
|
@ -4517,17 +4517,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == type) {
|
||||||
SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
|
SMergePhysiNode* pMergePhyNode = (SMergePhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
SArray* sortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColList = NULL;
|
SArray* pColList =
|
||||||
//extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
|
|
||||||
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
|
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) {
|
||||||
|
|
|
@ -1191,6 +1191,9 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
||||||
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
|
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
|
||||||
&pMerge->pTargets);
|
&pMerge->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pPhyNode = (SPhysiNode*)pMerge;
|
*pPhyNode = (SPhysiNode*)pMerge;
|
||||||
|
|
Loading…
Reference in New Issue