feat: support non_sort mode

This commit is contained in:
dapan1121 2023-11-07 09:23:06 +08:00
parent 701a148ad1
commit 0ed4d1507f
15 changed files with 636 additions and 308 deletions

View File

@ -364,6 +364,11 @@ typedef struct SSortExecInfo {
int32_t readBytes; // read io bytes
} SSortExecInfo;
typedef struct SNonSortExecInfo {
} SNonSortExecInfo;
typedef struct STUidTagInfo {
char* name;
uint64_t uid;

View File

@ -40,6 +40,13 @@ typedef enum EGroupAction {
GROUP_ACTION_CLEAR
} EGroupAction;
typedef enum EMergeType {
MERGE_TYPE_SORT = 1,
MERGE_TYPE_NON_SORT,
MERGE_TYPE_COLUMNS,
MERGE_TYPE_MAX_VALUE
} EMergeType;
typedef struct SLogicNode {
ENodeType type;
bool dynamicOp;
@ -221,6 +228,8 @@ typedef struct SMergeLogicNode {
SNodeList* pInputs;
int32_t numOfChannels;
int32_t srcGroupId;
bool colsMerge;
bool needSort;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
@ -531,6 +540,7 @@ typedef struct SExchangePhysiNode {
typedef struct SMergePhysiNode {
SPhysiNode node;
EMergeType type;
SNodeList* pMergeKeys;
SNodeList* pTargets;
int32_t numOfChannels;

View File

@ -37,7 +37,7 @@ extern "C" {
#define EXPLAIN_TABLE_COUNT_SCAN_FORMAT "Table Count Row Scan on %s"
#define EXPLAIN_PROJECTION_FORMAT "Projection"
#define EXPLAIN_JOIN_FORMAT "%s"
#define EXPLAIN_AGG_FORMAT "Aggragate"
#define EXPLAIN_AGG_FORMAT "%s"
#define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function"
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
#define EXPLAIN_SORT_FORMAT "Sort"
@ -85,7 +85,8 @@ extern "C" {
#define EXPLAIN_COLUMNS_FORMAT "columns=%d"
#define EXPLAIN_PSEUDO_COLUMNS_FORMAT "pseudo_columns=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_TABLE_SCAN_FORMAT "order=[asc|%d desc|%d]"
#define EXPLAIN_SCAN_ORDER_FORMAT "order=[asc|%d desc|%d]"
#define EXPLAIN_SCAN_MODE_FORMAT "mode=%s"
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
#define EXPLAIN_WIDTH_FORMAT "width=%d"
#define EXPLAIN_INTERVAL_VALUE_FORMAT "interval=%" PRId64 "%c"
@ -105,6 +106,7 @@ extern "C" {
#define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d"
#define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d"
#define EXPLAIN_PLAN_BLOCKING "blocking=%d"
#define EXPLAIN_MERGE_MODE_FORMAT "mode=%s"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"
@ -157,6 +159,7 @@ typedef struct SExplainCtx {
#define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" : ORDER_DESC == _order ? "desc" : "unknown")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define EXPLAIN_MERGE_MODE_STRING(_mode) ((_mode) == MERGE_TYPE_SORT ? "sort" : ((_mode) == MERGE_TYPE_NON_SORT ? "merge" : "column"))
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))

View File

@ -284,10 +284,34 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
return TSDB_CODE_SUCCESS;
}
static uint8_t getIntervalPrecision(SIntervalPhysiNode *pIntNode) {
static uint8_t qExplainGetIntervalPrecision(SIntervalPhysiNode *pIntNode) {
return ((SColumnNode *)pIntNode->window.pTspk)->node.resType.precision;
}
static char* qExplainGetScanMode(STableScanPhysiNode* pScan) {
bool isGroupByTbname = false;
bool isGroupByTag = false;
bool seq = false;
bool groupOrder = false;
if (pScan->pGroupTags && LIST_LENGTH(pScan->pGroupTags) == 1) {
SNode* p = nodesListGetNode(pScan->pGroupTags, 0);
if (QUERY_NODE_FUNCTION == nodeType(p) && (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0)) {
isGroupByTbname = true;
}
}
isGroupByTag = (NULL != pScan->pGroupTags) && !isGroupByTbname;
if ((((!isGroupByTag) || isGroupByTbname) && pScan->groupSort) || (isGroupByTag && (pScan->groupSort || pScan->scan.groupOrderScan))) {
return "seq_grp_order";
}
if ((isGroupByTbname && (pScan->groupSort || pScan->scan.groupOrderScan)) || (isGroupByTag && (pScan->groupSort || pScan->scan.groupOrderScan))) {
return "grp_order";
}
return "ts_order";
}
int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) {
int32_t tlen = 0;
bool isVerboseLine = false;
@ -360,7 +384,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_TABLE_SCAN_FORMAT, pTblScanNode->scanSeq[0], pTblScanNode->scanSeq[1]);
EXPLAIN_ROW_APPEND(EXPLAIN_SCAN_ORDER_FORMAT, pTblScanNode->scanSeq[0], pTblScanNode->scanSeq[1]);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_SCAN_MODE_FORMAT, qExplainGetScanMode(pTblScanNode));
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
@ -599,7 +625,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT);
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, (pAggNode->pGroupKeys ? "Group" : "Aggragate"));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
@ -841,7 +867,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
uint8_t precision = qExplainGetIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
@ -893,7 +919,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
uint8_t precision = qExplainGetIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
@ -1119,23 +1145,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_MERGE_MODE_FORMAT, EXPLAIN_MERGE_MODE_STRING(pMergeNode->type));
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (EXPLAIN_MODE_ANALYZE == ctx->mode) {
// sort key
EXPLAIN_ROW_NEW(level + 1, "Merge Key: ");
if (pResNode->pExecInfo) {
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
SOrderByExprNode *ptn = (SOrderByExprNode *)nodesListGetNode(pMergeNode->pMergeKeys, i);
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
}
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
// sort method
EXPLAIN_ROW_NEW(level + 1, "Sort Method: ");
@ -1419,7 +1435,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND_SLIMIT(pIntNode->window.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
uint8_t precision = getIntervalPrecision(pIntNode);
uint8_t precision = qExplainGetIntervalPrecision(pIntNode);
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),

View File

@ -239,7 +239,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
numOfDownstream = 2;
} else {
pInfo->downstreamResBlkId[0] = getOperatorResultBlockId(pDownstream[0], 0);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 1);
pInfo->downstreamResBlkId[1] = getOperatorResultBlockId(pDownstream[1], 0);
}
int32_t numOfCols = 0;

View File

@ -23,52 +23,57 @@ typedef struct SSortMergeInfo {
SArray* pSortInfo;
SSortHandle* pSortHandle;
STupleHandle* prefetchedTuple;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
SSDataBlock* pInputBlock;
SColMatchInfo matchInfo;
} SSortMergeInfo;
typedef struct SNonSortMergeInfo {
int32_t lastSourceIdx;
int32_t sourceWorkIdx;
int32_t sourceNum;
int32_t* pSourceStatus;
} SNonSortMergeInfo;
typedef struct SColumnMergeInfo {
} SColumnMergeInfo;
typedef struct SColsMergeInfo {
uint64_t srcBlkIds[2];
} SColsMergeInfo;
typedef struct SMultiwayMergeOperatorInfo {
SOptrBasicInfo binfo;
EMergeType type;
union {
SSortMergeInfo sortMergeInfo;
SNonSortMergeInfo nsortMergeInfo;
SColumnMergeInfo colMergeInfo;
SColsMergeInfo colsMergeInfo;
};
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SLimitInfo limitInfo;
SColMatchInfo matchInfo;
SSDataBlock* pInputBlock;
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
int64_t startTs; // sort start time
bool groupMerge;
bool ignoreGroupId;
uint64_t groupId;
bool inputWithGroupId;
} SMultiwayMergeOperatorInfo;
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
SSDataBlock* sortMergeloadNextDataBlock(void* param) {
SOperatorInfo* pOperator = (SOperatorInfo*)param;
SSDataBlock* pBlock = pOperator->fpSet.getNextFn(pOperator);
return pBlock;
}
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
int32_t numOfBufPage = pSortMergeInfo->sortBufSize / pSortMergeInfo->bufPageSize;
pInfo->startTs = taosGetTimestampUs();
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pSortMergeInfo->pSortHandle = tsortCreateSortHandle(pSortMergeInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pSortMergeInfo->bufPageSize, numOfBufPage,
pSortMergeInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge);
tsortSetFetchRawDataFp(pSortMergeInfo->pSortHandle, sortMergeloadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pSortMergeInfo->pSortHandle, pInfo->groupMerge);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SOperatorInfo* pDownstream = pOperator->pDownstream[i];
@ -80,19 +85,280 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
ps->param = pDownstream;
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
tsortAddSource(pSortMergeInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle);
return tsortOpen(pSortMergeInfo->pSortHandle);
}
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
SSDataBlock* p, bool* newgroup) {
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
*newgroup = false;
while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->groupMerge || pInfo->inputWithGroupId) {
if (pSortMergeInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pSortMergeInfo->prefetchedTuple;
pSortMergeInfo->prefetchedTuple = NULL;
uint64_t gid = tsortGetGroupId(pTupleHandle);
if (gid != pInfo->groupId) {
*newgroup = true;
pInfo->groupId = gid;
}
}
} else {
pTupleHandle = tsortNextTuple(pHandle);
pInfo->groupId = 0;
}
if (pTupleHandle == NULL) {
break;
}
if (pInfo->groupMerge || pInfo->inputWithGroupId) {
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = tupleGroupId;
pInfo->groupId = tupleGroupId;
} else {
if (p->info.rows == 0) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = pInfo->groupId = tupleGroupId;
} else {
pSortMergeInfo->prefetchedTuple = pTupleHandle;
break;
}
}
} else {
appendOneRowToDataBlock(p, pTupleHandle);
}
if (p->info.rows >= capacity) {
break;
}
}
}
SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
SSortHandle* pHandle = pSortMergeInfo->pSortHandle;
SSDataBlock* pDataBlock = pInfo->binfo.pRes;
SArray* pColMatchInfo = pInfo->matchInfo.pList;
int32_t capacity = pOperator->resultInfo.capacity;
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
blockDataCleanup(pDataBlock);
if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle);
if (pSortMergeInfo->pIntermediateBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
blockDataEnsureCapacity(pSortMergeInfo->pIntermediateBlock, capacity);
} else {
blockDataCleanup(pSortMergeInfo->pIntermediateBlock);
}
SSDataBlock* p = pSortMergeInfo->pIntermediateBlock;
bool newgroup = false;
while (1) {
doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
if (p->info.rows == 0) {
break;
}
if (newgroup) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (p->info.rows > 0) {
break;
}
}
if (p->info.rows > 0) {
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
}
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.scanFlag = p->info.scanFlag;
if (pInfo->ignoreGroupId) {
pDataBlock->info.id.groupId = 0;
} else {
pDataBlock->info.id.groupId = pInfo->groupId;
}
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
*pSortExecInfo = tsortGetSortExecInfo(pSortMergeInfo->pSortHandle);
*pOptrExplain = pSortExecInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
}
void destroySortMergeOperatorInfo(void* param) {
SSortMergeInfo* pSortMergeInfo = param;
pSortMergeInfo->pInputBlock = blockDataDestroy(pSortMergeInfo->pInputBlock);
pSortMergeInfo->pIntermediateBlock = blockDataDestroy(pSortMergeInfo->pIntermediateBlock);
tsortDestroySortHandle(pSortMergeInfo->pSortHandle);
taosArrayDestroy(pSortMergeInfo->pSortInfo);
}
#define NON_SORT_NEXT_SRC(_info, _idx) ((++(_idx) >= (_info)->sourceNum) ? ((_info)->sourceWorkIdx) : (_idx))
int32_t openNonSortMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SNonSortMergeInfo* pNonSortMergeInfo = &pInfo->nsortMergeInfo;
pNonSortMergeInfo->sourceWorkIdx = 0;
pNonSortMergeInfo->sourceNum = pOperator->numOfDownstream;
pNonSortMergeInfo->lastSourceIdx = -1;
pNonSortMergeInfo->pSourceStatus = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pNonSortMergeInfo->pSourceStatus));
if (NULL == pNonSortMergeInfo->pSourceStatus) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pNonSortMergeInfo->pSourceStatus[i] = i;
}
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
SSDataBlock* pBlock = NULL;
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
int32_t idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
while (idx < pNonSortMerge->sourceNum) {
pBlock = getNextBlockFromDownstream(pOperator, pNonSortMerge->pSourceStatus[idx]);
if (NULL == pBlock) {
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
pNonSortMerge->sourceWorkIdx++;
idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx);
continue;
}
break;
}
return pBlock;
}
void destroyNonSortMergeOperatorInfo(void* param) {
SNonSortMergeInfo* pNonSortMerge = param;
taosMemoryFree(pNonSortMerge->pSourceStatus);
}
int32_t getNonSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
return TSDB_CODE_SUCCESS;
}
int32_t openColsMergeOperator(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doColsMerge(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pBlock = NULL;
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
for (int32_t i = 0; i < 2; ++i) {
pBlock = getNextBlockFromDownstream(pOperator, i);
if (NULL == pBlock) {
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
pNonSortMerge->sourceWorkIdx++;
idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx);
continue;
}
break;
}
return pBlock;
}
void destroyColsMergeOperatorInfo(void* param) {
}
int32_t getColsMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
return TSDB_CODE_SUCCESS;
}
SOperatorFpSet gMultiwayMergeFps[MERGE_TYPE_MAX_VALUE] = {
{0},
{._openFn = openSortMergeOperator, .getNextFn = doSortMerge, .closeFn = destroySortMergeOperatorInfo, .getExplainFn = getSortMergeExplainExecInfo},
{._openFn = openNonSortMergeOperator, .getNextFn = doNonSortMerge, .closeFn = destroyNonSortMergeOperatorInfo, .getExplainFn = getNonSortMergeExplainExecInfo},
{._openFn = openColsMergeOperator, .getNextFn = doColsMerge, .closeFn = destroyColsMergeOperatorInfo, .getExplainFn = getColsMergeExplainExecInfo},
};
int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t code = 0;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
int64_t startTs = taosGetTimestampUs();
if (NULL != gMultiwayMergeFps[pInfo->type]._openFn) {
code = (*gMultiwayMergeFps[pInfo->type]._openFn)(pOperator);
}
pOperator->cost.openCost = (taosGetTimestampUs() - startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
return code;
}
SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
@ -100,7 +366,8 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = NULL;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
@ -108,8 +375,9 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (NULL != gMultiwayMergeFps[pInfo->type].getNextFn) {
pBlock = (*gMultiwayMergeFps[pInfo->type].getNextFn)(pOperator);
}
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
@ -122,26 +390,24 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
void destroyMultiwayMergeOperatorInfo(void* param) {
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param;
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
pInfo->pIntermediateBlock = blockDataDestroy(pInfo->pIntermediateBlock);
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->matchInfo.pList);
if (NULL != gMultiwayMergeFps[pInfo->type].closeFn) {
(*gMultiwayMergeFps[pInfo->type].closeFn)(&pInfo->sortMergeInfo);
}
taosMemoryFreeClear(param);
}
int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
int32_t code = 0;
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
*pSortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
*pOptrExplain = pSortExecInfo;
if (NULL != gMultiwayMergeFps[pInfo->type].getExplainFn) {
code = (*gMultiwayMergeFps[pInfo->type].getExplainFn)(pOptr, pOptrExplain, len);
}
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
return code;
}
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams,
@ -158,34 +424,59 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
goto _error;
}
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
int32_t numOfOutputCols = 0;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
&pInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pInfo->groupMerge = pMergePhyNode->groupSort;
pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
pInfo->pInputBlock = pInputBlock;
size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;
pInfo->type = pMergePhyNode->type;
switch (pInfo->type) {
case MERGE_TYPE_SORT: {
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
initResultSizeInfo(&pOperator->resultInfo, 1024);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
int32_t rowSize = pInfo->binfo.pRes->info.rowSize;
int32_t numOfOutputCols = 0;
pSortMergeInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
pSortMergeInfo->bufPageSize = getProperSortPageSize(rowSize, numOfCols);
pSortMergeInfo->sortBufSize = pSortMergeInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pSortMergeInfo->pInputBlock = pInputBlock;
code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID,
&pSortMergeInfo->matchInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
break;
}
case MERGE_TYPE_NON_SORT: {
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
break;
}
case MERGE_TYPE_COLUMNS: {
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
initResultSizeInfo(&pOperator->resultInfo, 1);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
pColsMerge->srcBlkIds[0] = getOperatorResultBlockId(downStreams[0], 0);
pColsMerge->srcBlkIds[1] = getOperatorResultBlockId(downStreams[1], 0);
break;
}
default:
qError("Invalid merge type: %d", pInfo->type);
code = TSDB_CODE_INVALID_PARA;
goto _error;
}
setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,
destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL);

View File

@ -675,199 +675,5 @@ _error:
return NULL;
}
int32_t openSortMergeOperator(SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (OPTR_IS_OPENED(pOperator)) {
return TSDB_CODE_SUCCESS;
}
pInfo->startTs = taosGetTimestampUs();
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupMerge);
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
SOperatorInfo* pDownstream = pOperator->pDownstream[i];
if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
pDownstream->fpSet._openFn(pDownstream);
}
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
ps->param = pDownstream;
ps->onlyRef = true;
tsortAddSource(pInfo->pSortHandle, ps);
}
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
OPTR_SET_OPENED(pOperator);
return TSDB_CODE_SUCCESS;
}
static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* pHandle, int32_t capacity,
SSDataBlock* p, bool* newgroup) {
*newgroup = false;
while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->groupMerge || pInfo->inputWithGroupId) {
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
pInfo->prefetchedTuple = NULL;
uint64_t gid = tsortGetGroupId(pTupleHandle);
if (gid != pInfo->groupId) {
*newgroup = true;
pInfo->groupId = gid;
}
}
} else {
pTupleHandle = tsortNextTuple(pHandle);
pInfo->groupId = 0;
}
if (pTupleHandle == NULL) {
break;
}
if (pInfo->groupMerge || pInfo->inputWithGroupId) {
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = tupleGroupId;
pInfo->groupId = tupleGroupId;
} else {
if (p->info.rows == 0) {
appendOneRowToDataBlock(p, pTupleHandle);
p->info.id.groupId = pInfo->groupId = tupleGroupId;
} else {
pInfo->prefetchedTuple = pTupleHandle;
break;
}
}
} else {
appendOneRowToDataBlock(p, pTupleHandle);
}
if (p->info.rows >= capacity) {
break;
}
}
}
SSDataBlock* getSortMergeSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo,
SOperatorInfo* pOperator) {
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
int32_t capacity = pOperator->resultInfo.capacity;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
blockDataCleanup(pDataBlock);
if (pInfo->pIntermediateBlock == NULL) {
pInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle);
if (pInfo->pIntermediateBlock == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
blockDataEnsureCapacity(pInfo->pIntermediateBlock, capacity);
} else {
blockDataCleanup(pInfo->pIntermediateBlock);
}
SSDataBlock* p = pInfo->pIntermediateBlock;
bool newgroup = false;
while (1) {
doGetSortedBlockData(pInfo, pHandle, capacity, p, &newgroup);
if (p->info.rows == 0) {
break;
}
if (newgroup) {
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo);
if (p->info.rows > 0) {
break;
}
}
if (p->info.rows > 0) {
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i);
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId);
colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info);
}
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.scanFlag = p->info.scanFlag;
if (pInfo->ignoreGroupId) {
pDataBlock->info.id.groupId = 0;
} else {
pDataBlock->info.id.groupId = pInfo->groupId;
}
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
}
SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
qDebug("start to merge final sorted rows, %s", GET_TASKID(pTaskInfo));
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator);
if (pBlock != NULL) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
} else {
setOperatorCompleted(pOperator);
}
return pBlock;
}
int32_t getSortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) {
SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo));
SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info;
*pSortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle);
*pOptrExplain = pSortExecInfo;
*len = sizeof(SSortExecInfo);
return TSDB_CODE_SUCCESS;
}

View File

@ -488,6 +488,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
CLONE_NODE_LIST_FIELD(pInputs);
COPY_SCALAR_FIELD(numOfChannels);
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(needSort);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputWithGroupId);

View File

@ -2272,6 +2272,7 @@ static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId";
static const char* jkMergePhysiPlanType = "Type";
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
@ -2298,6 +2299,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkMergePhysiPlanType, pNode->type);
}
return code;
}
@ -2324,6 +2328,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanIgnoreGroupID, &pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkMergePhysiPlanType, (int32_t*)&pNode->type);
}
return code;
}

View File

@ -2683,6 +2683,7 @@ enum {
PHY_MERGE_CODE_GROUP_SORT,
PHY_MERGE_CODE_IGNORE_GROUP_ID,
PHY_MERGE_CODE_INPUT_WITH_GROUP_ID,
PHY_MERGE_CODE_TYPE,
};
static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2710,6 +2711,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_MERGE_CODE_TYPE, pNode->type);
}
return code;
}
@ -2745,6 +2749,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID:
code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId);
break;
case PHY_MERGE_CODE_TYPE:
code = tlvDecodeI32(pTlv, (int32_t*)&pNode->type);
break;
default:
break;
}

View File

@ -43,6 +43,7 @@ int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList);

View File

@ -1950,41 +1950,57 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
return nodesListMakeStrictAppend(&pMerge->node.pChildren, (SNode*)pExchange);
}
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SMergeLogicNode* pMergeLogicNode, SPhysiNode** pPhyNode) {
int32_t code = TSDB_CODE_SUCCESS;
SMergePhysiNode* pMerge =
(SMergePhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pMergeLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pMergeLogicNode->colsMerge) {
pMerge->type = MERGE_TYPE_COLUMNS;
} else if (pMergeLogicNode->needSort) {
pMerge->type = MERGE_TYPE_SORT;
} else {
pMerge->type = MERGE_TYPE_NON_SORT;
}
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
pMerge->groupSort = pMergeLogicNode->groupSort;
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
if (!pMergeLogicNode->colsMerge) {
code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
code = createExchangePhysiNodeByMerge(pMerge);
if (TSDB_CODE_SUCCESS != code) {
break;
if (TSDB_CODE_SUCCESS == code) {
for (int32_t i = 0; i < pMerge->numOfChannels; ++i) {
code = createExchangePhysiNodeByMerge(pMerge);
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code && NULL != pMergeLogicNode->pMergeKeys) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->pMergeKeys,
&pMerge->pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
&pMerge->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pMerge->node.pOutputDataBlockDesc->dataBlockId, -1, pMergeLogicNode->node.pTargets,
&pMerge->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pMerge->pTargets, pMerge->node.pOutputDataBlockDesc);
}
} else {
SDataBlockDescNode* pLeftDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc;
SDataBlockDescNode* pRightDesc = ((SPhysiNode*)nodesListGetNode(pChildren, 1))->pOutputDataBlockDesc;
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pMergeLogicNode->node.pTargets, &pMerge->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -2022,7 +2038,7 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC:
return createInterpFuncPhysiNode(pCxt, pChildren, (SInterpFuncLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_MERGE:
return createMergePhysiNode(pCxt, (SMergeLogicNode*)pLogicNode, pPhyNode);
return createMergePhysiNode(pCxt, pChildren, (SMergeLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_GROUP_CACHE:
return createGroupCachePhysiNode(pCxt, pChildren, (SGroupCacheLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_DYN_QUERY_CTRL:

View File

@ -248,8 +248,6 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
}
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) {
return true;
} else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild);
}
return false;
}
@ -540,11 +538,12 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
}
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort) {
SNodeList* pMergeKeys, SLogicNode* pPartChild, bool groupSort, bool needSort) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->needSort = needSort;
pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
pMerge->srcGroupId = pCxt->groupId;
pMerge->node.precision = pPartChild->precision;
@ -621,7 +620,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk,
((SWindowLogicNode*)pInfo->pSplitNode)->node.outputTsOrder, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true);
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true, true);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(pMergeKeys);
@ -712,7 +711,7 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl
((SWindowLogicNode*)pWindow)->node.inputTsOrder, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true, true);
}
if (TSDB_CODE_SUCCESS == code) {
@ -982,7 +981,7 @@ static int32_t stbSplAggNodeCreateMerge(SSplitContext* pCtx, SStableSplitInfo* p
}
}
}
code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort);
code = stbSplCreateMergeNode(pCtx, NULL, pInfo->pSplitNode, pMergeKeys, pChildAgg, groupSort, true);
if (TSDB_CODE_SUCCESS == code && sortForGroup) {
SMergeLogicNode* pMerge =
(SMergeLogicNode*)nodesListGetNode(pInfo->pSplitNode->pChildren, LIST_LENGTH(pInfo->pSplitNode->pChildren) - 1);
@ -1145,7 +1144,7 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
bool groupSort = ((SSortLogicNode*)pInfo->pSplitNode)->groupSort;
int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pPartSort, groupSort, true);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pInfo->pSplitNode);
@ -1195,7 +1194,7 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
SLogicNode* pSplitNode = NULL;
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true, pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
@ -1269,7 +1268,7 @@ static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSub
((SLimitNode*)pMergeScan->pLimit)->limit += ((SLimitNode*)pMergeScan->pLimit)->offset;
((SLimitNode*)pMergeScan->pLimit)->offset = 0;
}
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort);
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort, true);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode((SNode*)pScan);
@ -1340,12 +1339,14 @@ static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeLis
static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
bool needSort = false;
SNodeList* pMergeKeys = NULL;
if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) {
needSort = true;
code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys);
}
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true);
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true, needSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,

View File

@ -0,0 +1,161 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planInt.h"
#include "catalog.h"
#include "functionMgt.h"
#include "systable.h"
#include "tglobal.h"
typedef struct SValidatePlanContext {
SPlanContext* pPlanCxt;
int32_t errCode;
} SValidatePlanContext;
int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode);
int32_t validateMergePhysiNode(SValidatePlanContext* pCxt, SMergePhysiNode* pMerge) {
if ((NULL != pMerge->node.pLimit || NULL != pMerge->node.pSlimit) && pMerge->type == MERGE_TYPE_NON_SORT) {
planError("no limit&slimit supported for non sort merge");
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t validateSubplanNode(SValidatePlanContext* pCxt, SSubplan* pSubPlan) {
if (SUBPLAN_TYPE_MODIFY == pSubPlan->subplanType) {
return TSDB_CODE_SUCCESS;
}
return doValidatePhysiNode(pCxt, (SNode*)pSubPlan->pNode);
}
int32_t validateQueryPlanNode(SValidatePlanContext* pCxt, SQueryPlan* pPlan) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pPlan->pSubplans) {
if (QUERY_NODE_NODE_LIST != nodeType(pNode)) {
code = TSDB_CODE_PLAN_INTERNAL_ERROR;
break;
}
SNode* pSubNode = NULL;
SNodeListNode* pSubplans = (SNodeListNode*)pNode;
FOREACH(pSubNode, pSubplans->pNodeList) {
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pNode)) {
code = TSDB_CODE_PLAN_INTERNAL_ERROR;
break;
}
code = doValidatePhysiNode(pCxt, pSubNode);
if (code) {
break;
}
}
}
return code;
}
int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) {
switch (nodeType(pNode)) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG:
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
return validateMergePhysiNode(pCxt, (SMergePhysiNode*)pNode);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_FILL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
case QUERY_NODE_PHYSICAL_PLAN_DELETE:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
break;
case QUERY_NODE_PHYSICAL_SUBPLAN:
return validateSubplanNode(pCxt, (SSubplan*)pNode);
case QUERY_NODE_PHYSICAL_PLAN:
return validateQueryPlanNode(pCxt, (SQueryPlan *)pNode);
default:
break;
}
return TSDB_CODE_SUCCESS;
}
static void destoryValidatePlanContext(SValidatePlanContext* pCxt) {
}
int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan) {
SValidatePlanContext cxt = {.pPlanCxt = pCxt,
.errCode = TSDB_CODE_SUCCESS
};
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
FOREACH(pNode, pPlan->pSubplans) {
if (QUERY_NODE_NODE_LIST != nodeType(pNode)) {
code = TSDB_CODE_PLAN_INTERNAL_ERROR;
break;
}
SNode* pSubNode = NULL;
SNodeListNode* pSubplans = (SNodeListNode*)pNode;
FOREACH(pSubNode, pSubplans->pNodeList) {
code = doValidatePhysiNode(&cxt, pSubNode);
if (code) {
break;
}
}
}
destoryValidatePlanContext(&cxt);
return code;
}

View File

@ -57,6 +57,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) {
code = createPhysiPlan(pCxt, pLogicPlan, pPlan, pExecNodeList);
}
if (TSDB_CODE_SUCCESS == code) {
code = validateQueryPlan(pCxt, *pPlan);
}
if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan);
}