From e1d5971e39128d9f4824166158e04b99d19001c1 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Wed, 27 Jul 2022 16:11:32 +0800 Subject: [PATCH 1/9] feat: add multi-rows merge join --- source/libs/executor/src/joinoperator.c | 33 +++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7b3c590f07..7f7bd4a3ef 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -146,6 +146,39 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* } } +typedef struct SRowLocation { + SSDataBlock* pDataBlock; + int32_t pos; +} SRowLocation; + +static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, + SArray* pPosArray) { + int32_t numRows = pBlock->info.rows; + ASSERT(startPos < numRows); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t i = startPos; + char* pVal = colDataGetData(pCol, i); + for (i = startPos + 1; i < numRows; ++i) { + char* pNextVal = colDataGetData(pCol, i); + if (*(int64_t*)pVal != *(int64_t*)pNextVal) { + break; + } + } + int32_t endPos = i; + + SSDataBlock* block = pBlock; + if (endPos - startPos > 1) { + block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + } + SRowLocation location = {0}; + for (int32_t j = startPos; j < endPos; ++j) { + location.pDataBlock = block; + location.pos = j; + taosArrayPush(pPosArray, &location); + } + return 0; +} static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info; From 6cb92ef6eeef03e50811afbc3f5c17da651f0379 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Wed, 27 Jul 2022 17:09:24 +0800 Subject: [PATCH 2/9] fix: support multi-rows with same ts for join operator --- source/libs/executor/src/joinoperator.c | 30 ++++++++++++++----------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7f7bd4a3ef..ec96a21f47 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { } static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, + int32_t rightPos) { SJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -144,24 +145,22 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* colDataAppend(pDst, currRow, p, false); } } - } typedef struct SRowLocation { - SSDataBlock* pDataBlock; - int32_t pos; + SSDataBlock* pDataBlock; + int32_t pos; } SRowLocation; -static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, - SArray* pPosArray) { +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, int64_t timestamp, + SArray* pPosArray) { int32_t numRows = pBlock->info.rows; ASSERT(startPos < numRows); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); int32_t i = startPos; - char* pVal = colDataGetData(pCol, i); - for (i = startPos + 1; i < numRows; ++i) { + for (; i < numRows; ++i) { char* pNextVal = colDataGetData(pCol, i); - if (*(int64_t*)pVal != *(int64_t*)pNextVal) { + if (timestamp != *(int64_t*)pNextVal) { break; } } @@ -171,7 +170,7 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot if (endPos - startPos > 1) { block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); } - SRowLocation location = {0}; + SRowLocation location = {0}; for (int32_t j = startPos; j < endPos; ++j) { location.pDataBlock = block; location.pos = j; @@ -180,6 +179,11 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot return 0; } +static int32_t mergeJoinGetRowsEqualTimeStamp(SJoinOperatorInfo* pJoinInfo, SArray* pPosArray) { + + return 0; +} + static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -228,14 +232,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) while (1) { int64_t leftTs = 0; int64_t rightTs = 0; - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); if (!hasNextTs) { break; } if (leftTs == rightTs) { - mergeJoinJoinLeftRight(pOperator, pRes, nrows, - pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); + mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, + pJoinInfo->rightPos); pJoinInfo->leftPos += 1; pJoinInfo->rightPos += 1; From c061cd2fe28911133c3e31b090c0f735fe098aff Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 27 Jul 2022 21:42:35 +0800 Subject: [PATCH 3/9] fix: multi row same ts join --- source/libs/executor/src/joinoperator.c | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index ec96a21f47..db39f2e6fa 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -151,11 +151,11 @@ typedef struct SRowLocation { int32_t pos; } SRowLocation; -static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, int64_t timestamp, - SArray* pPosArray) { +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, + int32_t* pEndPos, SArray* pRowLocations, SArray* createdBlocks) { int32_t numRows = pBlock->info.rows; ASSERT(startPos < numRows); - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); int32_t i = startPos; for (; i < numRows; ++i) { @@ -165,16 +165,18 @@ static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t slotId, } } int32_t endPos = i; + *pEndPos = endPos; SSDataBlock* block = pBlock; - if (endPos - startPos > 1) { + if (endPos == numRows) { block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + taosArrayPush(createdBlocks, &block); } SRowLocation location = {0}; for (int32_t j = startPos; j < endPos; ++j) { location.pDataBlock = block; location.pos = j; - taosArrayPush(pPosArray, &location); + taosArrayPush(pRowLocations, &location); } return 0; } From c962e9b8fd08e09a8ce54ca8a9d1cabe1021d12b Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 09:58:16 +0800 Subject: [PATCH 4/9] feat: add support for join operator when multiple rows with same ts --- source/libs/executor/src/joinoperator.c | 100 +++++++++++++++++++++--- 1 file changed, 90 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index db39f2e6fa..8902804fab 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -130,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { + if (pLeftBlock->info.blockId == blockId) { pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId); rowIndex = leftPos; } else { @@ -151,8 +151,9 @@ typedef struct SRowLocation { int32_t pos; } SRowLocation; +// pBlock[tsSlotId][startPos, endPos) == timestamp, static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp, - int32_t* pEndPos, SArray* pRowLocations, SArray* createdBlocks) { + int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) { int32_t numRows = pBlock->info.rows; ASSERT(startPos < numRows); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId); @@ -167,25 +168,107 @@ static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotI int32_t endPos = i; *pEndPos = endPos; + if (endPos - startPos == 0) { + return 0; + } + SSDataBlock* block = pBlock; + bool createdNewBlock = false; if (endPos == numRows) { - block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); + block = blockDataExtractBlock(pBlock, startPos, endPos-startPos); taosArrayPush(createdBlocks, &block); + createdNewBlock = true; } SRowLocation location = {0}; for (int32_t j = startPos; j < endPos; ++j) { location.pDataBlock = block; - location.pos = j; - taosArrayPush(pRowLocations, &location); + location.pos = ( createdNewBlock ? j - startPos : j); + taosArrayPush(rowLocations, &location); } return 0; } -static int32_t mergeJoinGetRowsEqualTimeStamp(SJoinOperatorInfo* pJoinInfo, SArray* pPosArray) { +// whichChild == 0, left child of join; whichChild ==1, right child of join +static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId, + SSDataBlock* startDataBlock, int32_t startPos, + int64_t timestamp, SArray* rowLocations, + SArray* createdBlocks) { + ASSERT(whichChild == 0 || whichChild == 1); + SJoinOperatorInfo* pJoinInfo = pOperator->info; + int32_t endPos = -1; + SSDataBlock* dataBlock = startDataBlock; + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); + while (endPos == dataBlock->info.rows) { + SOperatorInfo* ds = pOperator->pDownstream[whichChild]; + dataBlock = ds->fpSet.getNextFn(ds); + if (whichChild == 0) { + pJoinInfo->leftPos = 0; + pJoinInfo->pLeft = dataBlock; + } else if (whichChild == 1) { + pJoinInfo->rightPos = 0; + pJoinInfo->pRight = dataBlock; + } + + if (dataBlock == NULL) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + endPos = -1; + break; + } + + mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks); + } + if (endPos != -1) { + if (whichChild == 0) { + pJoinInfo->leftPos = endPos; + } else if (whichChild == 1) { + pJoinInfo->rightPos = endPos; + } + } return 0; } +static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes, + int32_t* nRows) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation)); + SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES); + + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft, + pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks); + mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, + pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); + + size_t leftNumJoin = taosArrayGetSize(leftRowLocations); + size_t rightNumJoin = taosArrayGetSize(rightRowLocations); + for (int32_t i = 0; i < leftNumJoin; ++i) { + for (int32_t j = 0; j < rightNumJoin; ++j) { + SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); + SRowLocation* rightRow = taosArrayGet(rightRowLocations, j); + mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock, + rightRow->pos); + ++*nRows; + } + } + + for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(rightCreatedBlocks); + taosArrayDestroy(rightRowLocations); + for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) { + SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i); + blockDataDestroy(pBlock); + } + taosArrayDestroy(leftCreatedBlocks); + taosArrayDestroy(leftRowLocations); + return TSDB_CODE_SUCCESS; +} + static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -242,10 +325,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) if (leftTs == rightTs) { mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); - pJoinInfo->leftPos += 1; - pJoinInfo->rightPos += 1; - - nrows += 1; + mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); } else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) { pJoinInfo->leftPos += 1; From dc3576f587f62fd994153128a154b23d3535ad20 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 10:00:16 +0800 Subject: [PATCH 5/9] fix: add test case --- tests/script/tsim/parser/join_multivnode.sim | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/script/tsim/parser/join_multivnode.sim b/tests/script/tsim/parser/join_multivnode.sim index c33fa85fa2..f1204326d3 100644 --- a/tests/script/tsim/parser/join_multivnode.sim +++ b/tests/script/tsim/parser/join_multivnode.sim @@ -98,6 +98,11 @@ while $i < $tbNum endw print ===============multivnode projection join.sim +sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts; +print ===> rows $row +if $row != 9000 then + print expect 9000, actual: $row +endi sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1; print ===> rows $row if $row != 3000 then From 9cbd8c7bea9c9af61c586d7bdecc363b419b8541 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 12:02:36 +0800 Subject: [PATCH 6/9] fix: fix bugs related to join and nested query --- source/libs/executor/src/timewindowoperator.c | 24 +++++++++++++------ tests/system-test/2-query/join.py | 2 +- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a82b194a9..ed1580ed91 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: + case TSDB_FILL_NULL: { colDataAppendNULL(pDst, rows); + pResBlock->info.rows += 1; break; + } case TSDB_FILL_SET_VALUE: { SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; @@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); colDataAppend(pDst, rows, (char*)&v, false); } - } break; + pResBlock->info.rows += 1; + break; + } - case TSDB_FILL_LINEAR: + case TSDB_FILL_LINEAR: { #if 0 if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { @@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } #endif + // TODO: pResBlock->info.rows += 1; break; - + } case TSDB_FILL_PREV: { SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); colDataAppend(pDst, rows, pkey->pData, false); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NEXT: { char* p = colDataGetData(pSrc, rowIndex); colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex)); - } break; + pResBlock->info.rows += 1; + break; + } case TSDB_FILL_NONE: default: @@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp } } - pResBlock->info.rows += 1; } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { SInterval* pInterval = &pSliceInfo->interval; SOperatorInfo* downstream = pOperator->pDownstream[0]; + blockDataCleanup(pResBlock); + int32_t numOfRows = 0; while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 2348873a34..1e2a029282 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -377,7 +377,7 @@ class TDTestCase: tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows) + tdSql.checkRows(self.rows + int(self.rows * 0.6 //3)+ int(self.rows * 0.8 // 4)) tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows + 3) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts") From 17a135ced51de8a7ee845cc97c04e7db843b0417 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 13:39:49 +0800 Subject: [PATCH 7/9] fix: fix join test case to hanle repeated ts --- tests/system-test/2-query/join.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 1e2a029282..9d30e1946a 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -381,7 +381,7 @@ class TDTestCase: tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts") tdSql.checkRows(self.rows + 3) tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts") - tdSql.checkRows(self.rows * 3 + 6) + tdSql.checkRows(50) tdSql.query("select count(*) from db.ct1") tdSql.checkData(0, 0, self.rows) From 448e726a38149a6f578dc8b65c390da8947d4036 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 14:48:05 +0800 Subject: [PATCH 8/9] fix: fix tmq_taosx.py to pass CI --- tests/system-test/7-tmq/tmq_taosx.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index a136d0a1a2..8975e3f649 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -68,7 +68,7 @@ class TDTestCase: tdSql.checkData(0, 1, "eeee") tdSql.checkData(1, 2, 940) - tdSql.query("select * from jt") + tdSql.query("select * from jt order by i desc") tdSql.checkRows(2) tdSql.checkData(0, 1, 11) tdSql.checkData(0, 2, None) From c7f2a122c2cd03afa5969cd860379c91f1471ab6 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 28 Jul 2022 14:56:15 +0800 Subject: [PATCH 9/9] fix: modify tmq_taos.py to be indepent of the order of rows --- tests/system-test/7-tmq/tmq_taosx.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 8975e3f649..00b0aed5ee 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -43,9 +43,9 @@ class TDTestCase: tdLog.exit("compare error: %s != %s"%src, dst) else: break - + tdSql.execute('use db_taosx') - tdSql.query("select * from ct3") + tdSql.query("select * from ct3 order by c1 desc") tdSql.checkRows(2) tdSql.checkData(0, 1, 51) tdSql.checkData(0, 4, 940) @@ -58,12 +58,12 @@ class TDTestCase: tdSql.query("select * from ct2") tdSql.checkRows(0) - tdSql.query("select * from ct0") + tdSql.query("select * from ct0 order by c1") tdSql.checkRows(2) tdSql.checkData(0, 3, "a") tdSql.checkData(1, 4, None) - tdSql.query("select * from n1") + tdSql.query("select * from n1 order by cc3 desc") tdSql.checkRows(2) tdSql.checkData(0, 1, "eeee") tdSql.checkData(1, 2, 940)