use non sort merge
This commit is contained in:
parent
45300c1eae
commit
14c6dbd2af
|
@ -269,6 +269,7 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
|
||||||
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
SMultiwayMergeOperatorInfo* pInfo = pOperator->info;
|
||||||
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
|
qDebug("start to merge no sorted rows, %s", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
@ -278,13 +279,19 @@ SSDataBlock* doNonSortMerge(SOperatorInfo* pOperator) {
|
||||||
if (NULL == pBlock) {
|
if (NULL == pBlock) {
|
||||||
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
|
TSWAP(pNonSortMerge->pSourceStatus[pNonSortMerge->sourceWorkIdx], pNonSortMerge->pSourceStatus[idx]);
|
||||||
pNonSortMerge->sourceWorkIdx++;
|
pNonSortMerge->sourceWorkIdx++;
|
||||||
idx = NON_SORT_NEXT_SRC(pNonSortMerge, idx);
|
idx = NON_SORT_NEXT_SRC(pNonSortMerge, pNonSortMerge->lastSourceIdx);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock;
|
if (!pBlock) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
copyDataBlock(pRes, pBlock);
|
||||||
|
|
||||||
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyNonSortMergeOperatorInfo(void* param) {
|
void destroyNonSortMergeOperatorInfo(void* param) {
|
||||||
|
@ -491,6 +498,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
}
|
}
|
||||||
case MERGE_TYPE_NON_SORT: {
|
case MERGE_TYPE_NON_SORT: {
|
||||||
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
|
||||||
|
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MERGE_TYPE_COLUMNS: {
|
case MERGE_TYPE_COLUMNS: {
|
||||||
|
|
|
@ -28,6 +28,7 @@ typedef struct SProjectOperatorInfo {
|
||||||
bool mergeDataBlocks;
|
bool mergeDataBlocks;
|
||||||
SSDataBlock* pFinalRes;
|
SSDataBlock* pFinalRes;
|
||||||
bool inputIgnoreGroup;
|
bool inputIgnoreGroup;
|
||||||
|
bool outputIgnoreGroup;
|
||||||
} SProjectOperatorInfo;
|
} SProjectOperatorInfo;
|
||||||
|
|
||||||
typedef struct SIndefOperatorInfo {
|
typedef struct SIndefOperatorInfo {
|
||||||
|
@ -111,6 +112,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder;
|
||||||
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
|
pInfo->inputIgnoreGroup = pProjPhyNode->inputIgnoreGroup;
|
||||||
|
pInfo->outputIgnoreGroup = pProjPhyNode->ignoreGroupId;
|
||||||
|
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
|
||||||
pInfo->mergeDataBlocks = false;
|
pInfo->mergeDataBlocks = false;
|
||||||
|
@ -276,6 +278,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pProjectInfo->outputIgnoreGroup) {
|
||||||
|
pRes->info.id.groupId = 0;
|
||||||
|
}
|
||||||
|
|
||||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,6 +391,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pProjectInfo->outputIgnoreGroup) {
|
||||||
|
p->info.id.groupId = 0;
|
||||||
|
}
|
||||||
|
|
||||||
return (p->info.rows > 0) ? p : NULL;
|
return (p->info.rows > 0) ? p : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1320,7 +1320,11 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit
|
||||||
SLogicNode* pSplitNode = NULL;
|
SLogicNode* pSplitNode = NULL;
|
||||||
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
|
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true, true);
|
bool needSort = true;
|
||||||
|
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pSplitNode)) {
|
||||||
|
needSort = !((SProjectLogicNode*)pSplitNode)->ignoreGroupId;
|
||||||
|
}
|
||||||
|
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, needSort, needSort);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||||
|
|
|
@ -226,4 +226,135 @@ print =============== clear
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
print ================= step12
|
||||||
|
|
||||||
|
sql create database test2 vgroups 4;
|
||||||
|
sql use test2;
|
||||||
|
sql create stable stb (ts timestamp, c1 int) tags (t1 int);
|
||||||
|
sql create table t1 using stb tags (1);
|
||||||
|
sql create table t2 using stb tags (2);
|
||||||
|
sql create table t3 using stb tags (3);
|
||||||
|
sql create table t4 using stb tags (4);
|
||||||
|
sql create table t5 using stb tags (4);
|
||||||
|
sql create table t6 using stb tags (4);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:29:07.051", 11);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:29:07.051", 21);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:29:07.051", 31);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:29:07.051", 41);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:29:07.051", 51);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:29:07.051", 61);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:30:07.051", 12);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:30:07.051", 22);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:30:07.051", 32);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:30:07.051", 42);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:30:07.051", 52);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:30:07.051", 62);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:31:07.051", 13);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:31:07.051", 23);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:31:07.051", 33);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:31:07.051", 43);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:31:07.051", 53);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:31:07.051", 63);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:32:07.051", 14);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:32:07.051", 24);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:32:07.051", 34);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:32:07.051", 44);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:32:07.051", 54);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:32:07.051", 64);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:33:07.051", 15);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:33:07.051", 25);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:33:07.051", 35);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:33:07.051", 45);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:33:07.051", 55);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:33:07.051", 65);
|
||||||
|
sql insert into t1 values ("2024-03-01 14:34:07.051", 16);
|
||||||
|
sql insert into t2 values ("2024-03-01 14:34:07.051", 26);
|
||||||
|
sql insert into t3 values ("2024-03-01 14:34:07.051", 36);
|
||||||
|
sql insert into t4 values ("2024-03-01 14:34:07.051", 46);
|
||||||
|
sql insert into t5 values ("2024-03-01 14:34:07.051", 56);
|
||||||
|
sql insert into t6 values ("2024-03-01 14:34:07.051", 66);
|
||||||
|
|
||||||
|
sleep 300
|
||||||
|
|
||||||
|
sql select _wstart, count(*) from (select * from stb partition by tbname) interval(2s);
|
||||||
|
|
||||||
|
print $data00,$data01
|
||||||
|
print $data10,$data11
|
||||||
|
print $data20,$data21
|
||||||
|
print $data30,$data31
|
||||||
|
print $data40,$data41
|
||||||
|
print $data50,$data51
|
||||||
|
print $data60,$data61
|
||||||
|
print $data70,$data71
|
||||||
|
|
||||||
|
if $rows != 6 then
|
||||||
|
print $rows
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 6 then
|
||||||
|
print $data01
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 6 then
|
||||||
|
print $data11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 6 then
|
||||||
|
print $data21
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 6 then
|
||||||
|
print $data31
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data41 != 6 then
|
||||||
|
print $data41
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data51 != 6 then
|
||||||
|
print $data51
|
||||||
|
endi
|
||||||
|
|
||||||
|
|
||||||
|
sql select _wstart, count(*) from (select * from stb partition by tbname slimit 2) interval(2s);
|
||||||
|
|
||||||
|
print $data00,$data01
|
||||||
|
print $data10,$data11
|
||||||
|
print $data20,$data21
|
||||||
|
print $data30,$data31
|
||||||
|
print $data40,$data41
|
||||||
|
print $data50,$data51
|
||||||
|
print $data60,$data61
|
||||||
|
print $data70,$data71
|
||||||
|
|
||||||
|
if $rows != 6 then
|
||||||
|
print $rows then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data01 != 2 then
|
||||||
|
print $data01
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data11 != 2 then
|
||||||
|
print $data11
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data21 != 2 then
|
||||||
|
print $data21
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data31 != 2 then
|
||||||
|
print $data31
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data41 != 2 then
|
||||||
|
print $data41
|
||||||
|
endi
|
||||||
|
|
||||||
|
if $data51 != 2 then
|
||||||
|
print $data51
|
||||||
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue