fix: count wrong group number issue
This commit is contained in:
parent
2a1f9a91c1
commit
a1eefd25ea
|
@ -185,6 +185,7 @@ typedef struct SMergeLogicNode {
|
|||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
} SMergeLogicNode;
|
||||
|
||||
typedef enum EWindowType {
|
||||
|
@ -444,6 +445,7 @@ typedef struct SMergePhysiNode {
|
|||
int32_t numOfChannels;
|
||||
int32_t srcGroupId;
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
} SMergePhysiNode;
|
||||
|
||||
typedef struct SWinodwPhysiNode {
|
||||
|
|
|
@ -1128,6 +1128,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_IGNORE_GROUPID_FORMAT, pMergeNode->ignoreGroupId ? "true" : "false");
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
|
||||
if (pMergeNode->groupSort) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, "_group_id asc");
|
||||
|
|
|
@ -545,6 +545,7 @@ typedef struct SMultiwayMergeOperatorInfo {
|
|||
SSDataBlock* pIntermediateBlock; // to hold the intermediate result
|
||||
int64_t startTs; // sort start time
|
||||
bool groupSort;
|
||||
bool ignoreGroupId;
|
||||
uint64_t groupId;
|
||||
STupleHandle* prefetchedTuple;
|
||||
} SMultiwayMergeOperatorInfo;
|
||||
|
@ -694,7 +695,11 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
|
|||
}
|
||||
|
||||
pDataBlock->info.rows = p->info.rows;
|
||||
pDataBlock->info.id.groupId = pInfo->groupId;
|
||||
if (pInfo->ignoreGroupId) {
|
||||
pDataBlock->info.id.groupId = 0;
|
||||
} else {
|
||||
pDataBlock->info.id.groupId = pInfo->groupId;
|
||||
}
|
||||
pDataBlock->info.dataLoad = 1;
|
||||
}
|
||||
|
||||
|
@ -785,6 +790,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pInfo->groupSort = pMergePhyNode->groupSort;
|
||||
pInfo->ignoreGroupId = pMergePhyNode->ignoreGroupId;
|
||||
pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys);
|
||||
pInfo->pInputBlock = pInputBlock;
|
||||
size_t numOfCols = taosArrayGetSize(pInfo->binfo.pRes->pDataBlock);
|
||||
|
|
|
@ -455,6 +455,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
|
|||
COPY_SCALAR_FIELD(numOfChannels);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(groupSort);
|
||||
COPY_SCALAR_FIELD(ignoreGroupId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -2027,6 +2027,7 @@ static const char* jkMergePhysiPlanTargets = "Targets";
|
|||
static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
|
||||
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
|
||||
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
|
||||
|
||||
static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;
|
||||
|
@ -2047,6 +2048,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanGroupSort, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2070,6 +2074,9 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanGroupSort, &pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkMergePhysiPlanIgnoreGroupID, &pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -2512,7 +2512,8 @@ enum {
|
|||
PHY_MERGE_CODE_TARGETS,
|
||||
PHY_MERGE_CODE_NUM_OF_CHANNELS,
|
||||
PHY_MERGE_CODE_SRC_GROUP_ID,
|
||||
PHY_MERGE_CODE_GROUP_SORT
|
||||
PHY_MERGE_CODE_GROUP_SORT,
|
||||
PHY_MERGE_CODE_IGNORE_GROUP_ID,
|
||||
};
|
||||
|
||||
static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2534,6 +2535,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_GROUP_SORT, pNode->groupSort);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2563,6 +2567,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_MERGE_CODE_GROUP_SORT:
|
||||
code = tlvDecodeBool(pTlv, &pNode->groupSort);
|
||||
break;
|
||||
case PHY_MERGE_CODE_IGNORE_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -1559,6 +1559,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
|
|||
pMerge->numOfChannels = pMergeLogicNode->numOfChannels;
|
||||
pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
|
||||
pMerge->groupSort = pMergeLogicNode->groupSort;
|
||||
pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
|
||||
|
||||
int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);
|
||||
|
||||
|
|
|
@ -538,7 +538,8 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
|
|||
switch (nodeType(pNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_PROJECT: {
|
||||
SProjectLogicNode *pLogicNode = (SProjectLogicNode*)pNode;
|
||||
if (pMerge->node.pLimit || pMerge->node.pSlimit) {
|
||||
if (pLogicNode->ignoreGroupId && (pMerge->node.pLimit || pMerge->node.pSlimit)) {
|
||||
pMerge->ignoreGroupId = true;
|
||||
pLogicNode->ignoreGroupId = false;
|
||||
}
|
||||
break;
|
||||
|
|
|
@ -36,4 +36,34 @@ if $rows != 0 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql insert into tb0 values (now, 0);
|
||||
sql insert into tb1 values (now, 1);
|
||||
sql insert into tb2 values (now, 2);
|
||||
sql insert into tb3 values (now, 3);
|
||||
sql insert into tb4 values (now, 4);
|
||||
sql insert into tb5 values (now, 5);
|
||||
sql insert into tb6 values (now, 6);
|
||||
sql insert into tb7 values (now, 7);
|
||||
|
||||
sql select * from (select 1 from $mt1 where ts is not null partition by tbname limit 1);
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select ts from $mt1 where ts is not null partition by tbname slimit 2);
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from (select ts from $mt1 where ts is not null partition by tbname limit 2);
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue