Merge pull request #15489 from taosdata/szhou/fix/udf
feat: add multi-row equal timestamp for join operator
This commit is contained in:
commit
d6193655f5
|
@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
|
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
|
||||||
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) {
|
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
|
||||||
|
int32_t rightPos) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
|
@ -129,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
|
||||||
int32_t rowIndex = -1;
|
int32_t rowIndex = -1;
|
||||||
|
|
||||||
SColumnInfoData* pSrc = NULL;
|
SColumnInfoData* pSrc = NULL;
|
||||||
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
if (pLeftBlock->info.blockId == blockId) {
|
||||||
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
|
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
|
||||||
rowIndex = leftPos;
|
rowIndex = leftPos;
|
||||||
} else {
|
} else {
|
||||||
|
@ -144,7 +145,128 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
|
||||||
colDataAppend(pDst, currRow, p, false);
|
colDataAppend(pDst, currRow, p, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
typedef struct SRowLocation {
|
||||||
|
SSDataBlock* pDataBlock;
|
||||||
|
int32_t pos;
|
||||||
|
} SRowLocation;
|
||||||
|
|
||||||
|
// pBlock[tsSlotId][startPos, endPos) == timestamp,
|
||||||
|
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
|
||||||
|
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
|
||||||
|
int32_t numRows = pBlock->info.rows;
|
||||||
|
ASSERT(startPos < numRows);
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
|
||||||
|
|
||||||
|
int32_t i = startPos;
|
||||||
|
for (; i < numRows; ++i) {
|
||||||
|
char* pNextVal = colDataGetData(pCol, i);
|
||||||
|
if (timestamp != *(int64_t*)pNextVal) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t endPos = i;
|
||||||
|
*pEndPos = endPos;
|
||||||
|
|
||||||
|
if (endPos - startPos == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSDataBlock* block = pBlock;
|
||||||
|
bool createdNewBlock = false;
|
||||||
|
if (endPos == numRows) {
|
||||||
|
block = blockDataExtractBlock(pBlock, startPos, endPos-startPos);
|
||||||
|
taosArrayPush(createdBlocks, &block);
|
||||||
|
createdNewBlock = true;
|
||||||
|
}
|
||||||
|
SRowLocation location = {0};
|
||||||
|
for (int32_t j = startPos; j < endPos; ++j) {
|
||||||
|
location.pDataBlock = block;
|
||||||
|
location.pos = ( createdNewBlock ? j - startPos : j);
|
||||||
|
taosArrayPush(rowLocations, &location);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// whichChild == 0, left child of join; whichChild ==1, right child of join
|
||||||
|
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
|
||||||
|
SSDataBlock* startDataBlock, int32_t startPos,
|
||||||
|
int64_t timestamp, SArray* rowLocations,
|
||||||
|
SArray* createdBlocks) {
|
||||||
|
ASSERT(whichChild == 0 || whichChild == 1);
|
||||||
|
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
int32_t endPos = -1;
|
||||||
|
SSDataBlock* dataBlock = startDataBlock;
|
||||||
|
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
|
||||||
|
while (endPos == dataBlock->info.rows) {
|
||||||
|
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
|
||||||
|
dataBlock = ds->fpSet.getNextFn(ds);
|
||||||
|
if (whichChild == 0) {
|
||||||
|
pJoinInfo->leftPos = 0;
|
||||||
|
pJoinInfo->pLeft = dataBlock;
|
||||||
|
} else if (whichChild == 1) {
|
||||||
|
pJoinInfo->rightPos = 0;
|
||||||
|
pJoinInfo->pRight = dataBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataBlock == NULL) {
|
||||||
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
|
endPos = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
|
||||||
|
}
|
||||||
|
if (endPos != -1) {
|
||||||
|
if (whichChild == 0) {
|
||||||
|
pJoinInfo->leftPos = endPos;
|
||||||
|
} else if (whichChild == 1) {
|
||||||
|
pJoinInfo->rightPos = endPos;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
|
||||||
|
int32_t* nRows) {
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||||
|
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||||
|
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||||
|
|
||||||
|
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
|
||||||
|
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
|
||||||
|
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
||||||
|
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
||||||
|
|
||||||
|
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
|
||||||
|
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
|
||||||
|
for (int32_t i = 0; i < leftNumJoin; ++i) {
|
||||||
|
for (int32_t j = 0; j < rightNumJoin; ++j) {
|
||||||
|
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
||||||
|
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
|
||||||
|
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
|
||||||
|
rightRow->pos);
|
||||||
|
++*nRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
||||||
|
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(rightCreatedBlocks);
|
||||||
|
taosArrayDestroy(rightRowLocations);
|
||||||
|
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
|
||||||
|
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(leftCreatedBlocks);
|
||||||
|
taosArrayDestroy(leftRowLocations);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
|
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
|
||||||
|
@ -201,12 +323,9 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (leftTs == rightTs) {
|
if (leftTs == rightTs) {
|
||||||
mergeJoinJoinLeftRight(pOperator, pRes, nrows,
|
mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
|
||||||
pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos);
|
pJoinInfo->rightPos);
|
||||||
pJoinInfo->leftPos += 1;
|
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
|
||||||
pJoinInfo->rightPos += 1;
|
|
||||||
|
|
||||||
nrows += 1;
|
|
||||||
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
|
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
|
|
|
@ -2098,9 +2098,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
|
||||||
|
|
||||||
switch (pSliceInfo->fillType) {
|
switch (pSliceInfo->fillType) {
|
||||||
case TSDB_FILL_NULL:
|
case TSDB_FILL_NULL: {
|
||||||
colDataAppendNULL(pDst, rows);
|
colDataAppendNULL(pDst, rows);
|
||||||
|
pResBlock->info.rows += 1;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_FILL_SET_VALUE: {
|
case TSDB_FILL_SET_VALUE: {
|
||||||
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
|
SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal;
|
||||||
|
@ -2118,9 +2120,11 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
|
||||||
colDataAppend(pDst, rows, (char*)&v, false);
|
colDataAppend(pDst, rows, (char*)&v, false);
|
||||||
}
|
}
|
||||||
} break;
|
pResBlock->info.rows += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_FILL_LINEAR:
|
case TSDB_FILL_LINEAR: {
|
||||||
#if 0
|
#if 0
|
||||||
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|
||||||
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
||||||
|
@ -2151,17 +2155,22 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
// TODO: pResBlock->info.rows += 1;
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
case TSDB_FILL_PREV: {
|
case TSDB_FILL_PREV: {
|
||||||
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
|
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
|
||||||
colDataAppend(pDst, rows, pkey->pData, false);
|
colDataAppend(pDst, rows, pkey->pData, false);
|
||||||
} break;
|
pResBlock->info.rows += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_FILL_NEXT: {
|
case TSDB_FILL_NEXT: {
|
||||||
char* p = colDataGetData(pSrc, rowIndex);
|
char* p = colDataGetData(pSrc, rowIndex);
|
||||||
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
|
colDataAppend(pDst, rows, p, colDataIsNull_s(pSrc, rowIndex));
|
||||||
} break;
|
pResBlock->info.rows += 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_FILL_NONE:
|
case TSDB_FILL_NONE:
|
||||||
default:
|
default:
|
||||||
|
@ -2169,7 +2178,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pResBlock->info.rows += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
|
@ -2221,6 +2229,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
||||||
SInterval* pInterval = &pSliceInfo->interval;
|
SInterval* pInterval = &pSliceInfo->interval;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
blockDataCleanup(pResBlock);
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
|
|
@ -98,6 +98,11 @@ while $i < $tbNum
|
||||||
endw
|
endw
|
||||||
|
|
||||||
print ===============multivnode projection join.sim
|
print ===============multivnode projection join.sim
|
||||||
|
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts;
|
||||||
|
print ===> rows $row
|
||||||
|
if $row != 9000 then
|
||||||
|
print expect 9000, actual: $row
|
||||||
|
endi
|
||||||
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1;
|
sql select join_mt0.ts,join_mt0.ts,join_mt0.t1 from join_mt0, join_mt1 where join_mt0.ts=join_mt1.ts and join_mt0.t1=join_mt1.t1;
|
||||||
print ===> rows $row
|
print ===> rows $row
|
||||||
if $row != 3000 then
|
if $row != 3000 then
|
||||||
|
|
|
@ -377,11 +377,11 @@ class TDTestCase:
|
||||||
tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
|
tdSql.query("select ct1.c_int from db.ct1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
|
||||||
tdSql.checkRows(self.rows)
|
tdSql.checkRows(self.rows)
|
||||||
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
|
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.ct1 as cy1 on ct1.ts=cy1.ts")
|
||||||
tdSql.checkRows(self.rows)
|
tdSql.checkRows(self.rows + int(self.rows * 0.6 //3)+ int(self.rows * 0.8 // 4))
|
||||||
tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts")
|
tdSql.query("select ct1.c_int from db.nt1 as ct1 join db1.nt1 as cy1 on ct1.ts=cy1.ts")
|
||||||
tdSql.checkRows(self.rows + 3)
|
tdSql.checkRows(self.rows + 3)
|
||||||
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts")
|
tdSql.query("select ct1.c_int from db.stb1 as ct1 join db1.stb1 as cy1 on ct1.ts=cy1.ts")
|
||||||
tdSql.checkRows(self.rows * 3 + 6)
|
tdSql.checkRows(50)
|
||||||
|
|
||||||
tdSql.query("select count(*) from db.ct1")
|
tdSql.query("select count(*) from db.ct1")
|
||||||
tdSql.checkData(0, 0, self.rows)
|
tdSql.checkData(0, 0, self.rows)
|
||||||
|
|
|
@ -45,7 +45,7 @@ class TDTestCase:
|
||||||
break
|
break
|
||||||
|
|
||||||
tdSql.execute('use db_taosx')
|
tdSql.execute('use db_taosx')
|
||||||
tdSql.query("select * from ct3")
|
tdSql.query("select * from ct3 order by c1 desc")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, 51)
|
tdSql.checkData(0, 1, 51)
|
||||||
tdSql.checkData(0, 4, 940)
|
tdSql.checkData(0, 4, 940)
|
||||||
|
@ -58,17 +58,17 @@ class TDTestCase:
|
||||||
tdSql.query("select * from ct2")
|
tdSql.query("select * from ct2")
|
||||||
tdSql.checkRows(0)
|
tdSql.checkRows(0)
|
||||||
|
|
||||||
tdSql.query("select * from ct0")
|
tdSql.query("select * from ct0 order by c1")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 3, "a")
|
tdSql.checkData(0, 3, "a")
|
||||||
tdSql.checkData(1, 4, None)
|
tdSql.checkData(1, 4, None)
|
||||||
|
|
||||||
tdSql.query("select * from n1")
|
tdSql.query("select * from n1 order by cc3 desc")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, "eeee")
|
tdSql.checkData(0, 1, "eeee")
|
||||||
tdSql.checkData(1, 2, 940)
|
tdSql.checkData(1, 2, 940)
|
||||||
|
|
||||||
tdSql.query("select * from jt")
|
tdSql.query("select * from jt order by i desc")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, 11)
|
tdSql.checkData(0, 1, 11)
|
||||||
tdSql.checkData(0, 2, None)
|
tdSql.checkData(0, 2, None)
|
||||||
|
|
Loading…
Reference in New Issue