enh: add join ut
This commit is contained in:
parent
8d9c093b99
commit
8e73bedbc7
|
@ -19,9 +19,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define MJOIN_DEFAULT_BLK_ROWS_NUM 4096
|
||||
#define MJOIN_HJOIN_CART_THRESHOLD 16
|
||||
#define MJOIN_BLK_SIZE_LIMIT 10485760
|
||||
#define MJOIN_DEFAULT_BLK_ROWS_NUM 2 //4096
|
||||
#define MJOIN_HJOIN_CART_THRESHOLD 1024 //16
|
||||
#define MJOIN_BLK_SIZE_LIMIT 0 //10485760
|
||||
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)
|
||||
|
||||
struct SMJoinOperatorInfo;
|
||||
|
@ -203,7 +203,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
SFilterInfo* pFPreFilter;
|
||||
SFilterInfo* pPreFilter;
|
||||
SFilterInfo* pFinFilter;
|
||||
// SMJoinFuncs* joinFps;
|
||||
joinImplFp joinFp;
|
||||
SMJoinCtx ctx;
|
||||
SMJoinExecInfo execInfo;
|
||||
} SMJoinOperatorInfo;
|
||||
|
@ -223,6 +223,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
#define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx)
|
||||
|
||||
#define MJOIN_PROBE_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)
|
||||
#define FJOIN_PROBE_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows))
|
||||
#define MJOIN_BUILD_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows))
|
||||
|
||||
#define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity)
|
||||
|
@ -237,7 +238,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \
|
||||
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
|
||||
} else { \
|
||||
(_ts) = INT64_MIN; \
|
||||
(_ts) = INT64_MAX; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
@ -267,7 +268,9 @@ typedef struct SMJoinOperatorInfo {
|
|||
|
||||
|
||||
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
|
||||
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
|
||||
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
|
||||
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);
|
||||
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
|
||||
void mJoinSetDone(SOperatorInfo* pOperator);
|
||||
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
|
||||
|
@ -286,7 +289,8 @@ int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBl
|
|||
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx);
|
||||
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx);
|
||||
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t *rowBitmapOffset);
|
||||
int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs);
|
||||
int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
|
||||
int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -481,7 +481,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
|
|||
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
|
||||
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
|
||||
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
|
||||
MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs));
|
||||
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
|
||||
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||
return pCtx->finBlk;
|
||||
}
|
||||
|
@ -742,14 +742,9 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
|
||||
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
|
||||
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
|
||||
bool buildGot = false;
|
||||
|
||||
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
|
||||
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
|
||||
}
|
||||
bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
|
||||
|
||||
if (!probeGot && !buildGot) {
|
||||
mJoinSetDone(pOperator);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -764,7 +759,7 @@ static int32_t mFullJoinMergeCart(SMJoinMergeCtx* pCtx) {
|
|||
return (NULL == pCtx->pJoin->pFPreFilter) ? mOuterJoinMergeFullCart(pCtx) : mOuterJoinMergeSeqCart(pCtx);
|
||||
}
|
||||
|
||||
const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 4, 32, 0, 4, 1, 2};
|
||||
const uint8_t lowest_bit_bitmap[] = {32, 7, 6, 32, 5, 3, 32, 0, 4, 1, 2};
|
||||
|
||||
static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, int32_t idx) {
|
||||
SMJoinGrpRows grp = {0};
|
||||
|
@ -775,13 +770,14 @@ static FORCE_INLINE int32_t mFullJoinOutputHashRow(SMJoinMergeCtx* pCtx, SMJoinH
|
|||
return mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, &grp, false);
|
||||
}
|
||||
|
||||
static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) {
|
||||
static int32_t mFullJoinOutputHashGrpRows(SMJoinMergeCtx* pCtx, SMJoinHashGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
|
||||
int32_t rowNum = taosArrayGetSize(pGrpRows->pRows);
|
||||
for (; pNMatch->rowIdx < rowNum && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) {
|
||||
MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, pNMatch->rowIdx));
|
||||
}
|
||||
|
||||
if (pNMatch->rowIdx >= rowNum) {
|
||||
*grpDone = true;
|
||||
pNMatch->rowIdx = 0;
|
||||
}
|
||||
|
||||
|
@ -808,8 +804,14 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
|
||||
pGrpRows->allRowsNMatch = true;
|
||||
|
||||
MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch));
|
||||
bool grpDone = false;
|
||||
MJ_ERR_RET(mFullJoinOutputHashGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));
|
||||
if (BLK_IS_FULL(pCtx->finBlk)) {
|
||||
if (grpDone) {
|
||||
pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
|
||||
pNMatch->bitIdx = 0;
|
||||
}
|
||||
|
||||
pCtx->nmatchRemains = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -819,7 +821,8 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t bitBytes = BitmapLen(taosArrayGetSize(pGrpRows->pRows));
|
||||
int32_t grpRowNum = taosArrayGetSize(pGrpRows->pRows);
|
||||
int32_t bitBytes = BitmapLen(grpRowNum);
|
||||
for (; pNMatch->bitIdx < bitBytes; ++pNMatch->bitIdx) {
|
||||
if (0 == build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx]) {
|
||||
continue;
|
||||
|
@ -829,6 +832,11 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
|
||||
while (*v && !BLK_IS_FULL(pCtx->finBlk)) {
|
||||
uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11];
|
||||
if (baseIdx + n >= grpRowNum) {
|
||||
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
|
||||
continue;
|
||||
}
|
||||
|
||||
MJ_ERR_RET(mFullJoinOutputHashRow(pCtx, pGrpRows, baseIdx + n));
|
||||
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
|
||||
if (++pGrpRows->rowMatchNum == taosArrayGetSize(pGrpRows->pRows)) {
|
||||
|
@ -839,6 +847,11 @@ static int32_t mFullJoinHandleHashGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (BLK_IS_FULL(pCtx->finBlk)) {
|
||||
if (pNMatch->bitIdx == bitBytes) {
|
||||
pNMatch->pGrp = tSimpleHashIterate(build->pGrpHash, pNMatch->pGrp, &pNMatch->iter);
|
||||
pNMatch->bitIdx = 0;
|
||||
}
|
||||
|
||||
pCtx->nmatchRemains = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -863,12 +876,13 @@ static FORCE_INLINE int32_t mFullJoinOutputMergeRow(SMJoinMergeCtx* pCtx, SMJoin
|
|||
}
|
||||
|
||||
|
||||
static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch) {
|
||||
for (; pNMatch->rowIdx < pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) {
|
||||
static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows* pGrpRows, SMJoinNMatchCtx* pNMatch, bool* grpDone) {
|
||||
for (; pNMatch->rowIdx <= pGrpRows->endIdx && !BLK_IS_FULL(pCtx->finBlk); ++pNMatch->rowIdx) {
|
||||
MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, pNMatch->rowIdx));
|
||||
}
|
||||
|
||||
if (pNMatch->rowIdx >= pGrpRows->endIdx) {
|
||||
if (pNMatch->rowIdx > pGrpRows->endIdx) {
|
||||
*grpDone = true;
|
||||
pNMatch->rowIdx = 0;
|
||||
}
|
||||
|
||||
|
@ -879,11 +893,13 @@ static int32_t mFullJoinOutputMergeGrpRows(SMJoinMergeCtx* pCtx, SMJoinGrpRows*
|
|||
static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
|
||||
SMJoinTableCtx* build = pCtx->pJoin->build;
|
||||
SMJoinNMatchCtx* pNMatch = &build->nMatchCtx;
|
||||
|
||||
bool grpDone = false;
|
||||
int32_t baseIdx = 0;
|
||||
int32_t rowNum = 0;
|
||||
int32_t grpNum = taosArrayGetSize(build->eqGrps);
|
||||
for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx) {
|
||||
for (; pNMatch->grpIdx < grpNum; ++pNMatch->grpIdx, pNMatch->bitIdx = 0) {
|
||||
grpDone = false;
|
||||
|
||||
SMJoinGrpRows* pGrpRows = taosArrayGet(build->eqGrps, pNMatch->grpIdx);
|
||||
if (pGrpRows->allRowsMatch) {
|
||||
continue;
|
||||
|
@ -892,9 +908,14 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
if (pGrpRows->rowMatchNum <= 0 || pGrpRows->allRowsNMatch) {
|
||||
pGrpRows->allRowsNMatch = true;
|
||||
|
||||
MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch));
|
||||
MJ_ERR_RET(mFullJoinOutputMergeGrpRows(pCtx, pGrpRows, pNMatch, &grpDone));
|
||||
|
||||
if (BLK_IS_FULL(pCtx->finBlk)) {
|
||||
if (grpDone) {
|
||||
++pNMatch->grpIdx;
|
||||
pNMatch->bitIdx = 0;
|
||||
}
|
||||
|
||||
pCtx->nmatchRemains = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -913,6 +934,12 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
char *v = &build->pRowBitmap[pGrpRows->rowBitmapOffset + pNMatch->bitIdx];
|
||||
while (*v && !BLK_IS_FULL(pCtx->finBlk)) {
|
||||
uint8_t n = lowest_bit_bitmap[((*v & (*v - 1)) ^ *v) % 11];
|
||||
if (baseIdx + n > pGrpRows->endIdx) {
|
||||
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(baseIdx + n <= pGrpRows->endIdx);
|
||||
MJ_ERR_RET(mFullJoinOutputMergeRow(pCtx, pGrpRows, baseIdx + n));
|
||||
|
||||
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, pGrpRows->rowBitmapOffset + pNMatch->bitIdx, n);
|
||||
|
@ -925,6 +952,11 @@ static int32_t mFullJoinHandleMergeGrpRemains(SMJoinMergeCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (BLK_IS_FULL(pCtx->finBlk)) {
|
||||
if (pNMatch->bitIdx == bitBytes) {
|
||||
++pNMatch->grpIdx;
|
||||
pNMatch->bitIdx = 0;
|
||||
}
|
||||
|
||||
pCtx->nmatchRemains = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -976,6 +1008,14 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
|
||||
do {
|
||||
if (!mFullJoinRetrieve(pOperator, pJoin, pCtx)) {
|
||||
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
|
||||
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
|
||||
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||
return pCtx->finBlk;
|
||||
}
|
||||
}
|
||||
|
||||
mJoinSetDone(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -988,7 +1028,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
return pCtx->finBlk;
|
||||
}
|
||||
|
||||
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
|
||||
if (FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
|
||||
continue;
|
||||
} else {
|
||||
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
|
||||
|
@ -1000,7 +1040,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
|
||||
while (!FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
|
||||
if (probeTs == buildTs) {
|
||||
pCtx->lastEqTs = probeTs;
|
||||
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
|
||||
|
@ -1022,9 +1062,9 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
|
||||
MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pProbeCol, true, &probeTs, &buildTs));
|
||||
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
|
||||
} else {
|
||||
MJ_ERR_JRET(mJoinProcessNonEqualGrp(pCtx, pBuildCol, false, &probeTs, &buildTs));
|
||||
MJ_ERR_JRET(mJoinProcessOverGrp(pCtx, pJoin->build, pBuildCol, &probeTs, &buildTs));
|
||||
}
|
||||
|
||||
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||
|
@ -1032,7 +1072,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pJoin->build->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
|
||||
if (pJoin->build->dsFetchDone && !FJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
|
||||
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
|
||||
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
|
||||
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
|
||||
|
@ -1046,7 +1086,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pJoin->probe->dsFetchDone && !MJOIN_PROBE_TB_ROWS_DONE(pJoin->build)) {
|
||||
if (pJoin->probe->dsFetchDone && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
|
||||
pCtx->buildNEqGrp.blk = pJoin->build->blk;
|
||||
pCtx->buildNEqGrp.beginIdx = pJoin->build->blkRowIdx;
|
||||
pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
|
||||
|
|
|
@ -53,7 +53,7 @@ int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo
|
|||
} else {
|
||||
bool* pRes = (bool*)p->pData;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) {
|
||||
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + i)) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -113,8 +113,8 @@ int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SM
|
|||
continue;
|
||||
}
|
||||
|
||||
for (int32_t m = startRowIdx; m < buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
|
||||
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *pRes) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) {
|
||||
for (int32_t m = startRowIdx; m <= buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
|
||||
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + rowNum)) || MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -367,46 +367,52 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB
|
|||
|
||||
pCtx->hashJoin = false;
|
||||
|
||||
if (pJoin->build->rowBitmapSize > 0) {
|
||||
if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
|
||||
mJoinAllocGrpRowBitmap(pJoin->build);
|
||||
}
|
||||
|
||||
return (*pCtx->mergeCartFp)(pCtx);
|
||||
}
|
||||
|
||||
int32_t mJoinProcessNonEqualGrp(SMJoinMergeCtx* pCtx, SColumnInfoData* pCol, bool probeGrp, int64_t* probeTs, int64_t* buildTs) {
|
||||
SMJoinGrpRows* pGrp;
|
||||
SMJoinTableCtx* pTb;
|
||||
int64_t* pTs;
|
||||
|
||||
if (probeGrp) {
|
||||
pGrp = &pCtx->probeNEqGrp;
|
||||
pTb = pCtx->pJoin->probe;
|
||||
pTs = probeTs;
|
||||
} else {
|
||||
pGrp = &pCtx->buildNEqGrp;
|
||||
pTb = pCtx->pJoin->build;
|
||||
pTs = buildTs;
|
||||
}
|
||||
|
||||
pGrp->blk = pTb->blk;
|
||||
pGrp->beginIdx = pTb->blkRowIdx;
|
||||
pGrp->readIdx = pGrp->beginIdx;
|
||||
pGrp->endIdx = pGrp->beginIdx;
|
||||
int32_t mJoinProcessUnreachGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
|
||||
pCtx->probeNEqGrp.blk = pTb->blk;
|
||||
pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
|
||||
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
|
||||
pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
|
||||
|
||||
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
|
||||
MJOIN_GET_TB_CUR_TS(pCol, *pTs, pTb);
|
||||
MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
|
||||
if (PROBE_TS_UNREACH(pCtx->ascTs, *probeTs, *buildTs)) {
|
||||
pGrp->endIdx = pTb->blkRowIdx;
|
||||
pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return mJoinNonEqCart(pCtx, pGrp, probeGrp);
|
||||
return mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true);
|
||||
}
|
||||
|
||||
int32_t mJoinProcessOverGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
|
||||
pCtx->buildNEqGrp.blk = pTb->blk;
|
||||
pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
|
||||
pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
|
||||
pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx;
|
||||
|
||||
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
|
||||
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
|
||||
if (PROBE_TS_OVER(pCtx->ascTs, *probeTs, *buildTs)) {
|
||||
pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
return mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false);
|
||||
}
|
||||
|
||||
|
||||
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
|
||||
SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
|
||||
if (p) {
|
||||
|
@ -548,13 +554,8 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
|
|||
switch (pInfo->joinType) {
|
||||
case JOIN_TYPE_INNER:
|
||||
case JOIN_TYPE_FULL:
|
||||
if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) {
|
||||
buildIdx = 0;
|
||||
probeIdx = 1;
|
||||
} else {
|
||||
buildIdx = 1;
|
||||
probeIdx = 0;
|
||||
}
|
||||
break;
|
||||
case JOIN_TYPE_LEFT:
|
||||
buildIdx = 1;
|
||||
|
@ -957,8 +958,7 @@ SSDataBlock* mJoinMainProcess(struct SOperatorInfo* pOperator) {
|
|||
|
||||
SSDataBlock* pBlock = NULL;
|
||||
while (true) {
|
||||
//pBlock = (*pJoin->joinFps)(pOperator);
|
||||
pBlock = mLeftJoinDo(pOperator);
|
||||
pBlock = (*pJoin->joinFp)(pOperator);
|
||||
if (NULL == pBlock) {
|
||||
if (pJoin->errCode) {
|
||||
ASSERT(0);
|
||||
|
@ -1062,6 +1062,24 @@ int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
|
||||
switch (pJoin->joinType) {
|
||||
case JOIN_TYPE_INNER:
|
||||
pJoin->joinFp = mInnerJoinDo;
|
||||
break;
|
||||
case JOIN_TYPE_LEFT:
|
||||
case JOIN_TYPE_RIGHT:
|
||||
pJoin->joinFp = mLeftJoinDo;
|
||||
break;
|
||||
case JOIN_TYPE_FULL:
|
||||
pJoin->joinFp = mFullJoinDo;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
||||
|
@ -1087,6 +1105,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
|||
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
|
||||
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
|
||||
|
||||
MJ_ERR_JRET(mJoinSetImplFp(pInfo));
|
||||
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
|
|
|
@ -66,7 +66,7 @@ enum {
|
|||
};
|
||||
|
||||
#define COL_DISPLAY_WIDTH 18
|
||||
#define JT_MAX_LOOP 100000
|
||||
#define JT_MAX_LOOP 3000
|
||||
|
||||
#define LEFT_BLK_ID 0
|
||||
#define RIGHT_BLK_ID 1
|
||||
|
@ -156,6 +156,9 @@ typedef struct {
|
|||
SOperatorInfo* pJoinOp;
|
||||
|
||||
int32_t loopIdx;
|
||||
|
||||
int32_t rightFinMatchNum;
|
||||
bool* rightFinMatch;
|
||||
} SJoinTestCtx;
|
||||
|
||||
typedef struct {
|
||||
|
@ -174,7 +177,7 @@ typedef struct {
|
|||
|
||||
|
||||
SJoinTestCtx jtCtx = {0};
|
||||
SJoinTestCtrl jtCtrl = {0, 0, 0, 0};
|
||||
SJoinTestCtrl jtCtrl = {1, 1, 1, 0};
|
||||
SJoinTestStat jtStat = {0};
|
||||
SJoinTestResInfo jtRes = {0};
|
||||
|
||||
|
@ -1144,6 +1147,136 @@ void leftJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
|||
|
||||
}
|
||||
|
||||
|
||||
void fullJoinAppendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
bool leftMatch = false, rightMatch = false, lfilterOut = false, rfilterOut = false;
|
||||
void* lValue = NULL, *rValue = NULL, *filterValue = NULL;
|
||||
int64_t lBig = 0, rBig = 0, fbig = 0;
|
||||
int64_t rightTbOffset = jtCtx.blkRowSize * leftGrpRows;
|
||||
|
||||
memset(jtCtx.rightFinMatch, 0, rightGrpRows * sizeof(bool));
|
||||
|
||||
for (int32_t l = 0; l < leftGrpRows; ++l) {
|
||||
char* lrow = jtCtx.colRowDataBuf + jtCtx.blkRowSize * l;
|
||||
|
||||
lfilterOut = false;
|
||||
leftMatch = false;
|
||||
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
lValue = lrow + jtCtx.colRowOffset[c];
|
||||
switch (jtInputColType[c]) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
fbig = TIMESTAMP_FILTER_VALUE;
|
||||
lBig = *(int64_t*)lValue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
fbig = INT_FILTER_VALUE;
|
||||
lBig = *(int32_t*)lValue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
fbig = BIGINT_FILTER_VALUE;
|
||||
lBig = *(int64_t*)lValue;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (jtCtx.leftFilterNum && jtCtx.leftFilterColList[c] && ((*(bool*)(lrow + c)) || lBig <= fbig)) {
|
||||
lfilterOut = true;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t r = 0; r < rightGrpRows; ++r) {
|
||||
char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r;
|
||||
rightMatch = true;
|
||||
rfilterOut = false;
|
||||
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
lValue = lrow + jtCtx.colRowOffset[c];
|
||||
|
||||
if (!*(bool*)(rrow + c)) {
|
||||
rValue = rrow + jtCtx.colRowOffset[c];
|
||||
}
|
||||
|
||||
switch (jtInputColType[c]) {
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
fbig = TIMESTAMP_FILTER_VALUE;
|
||||
lBig = *(int64_t*)lValue;
|
||||
rBig = *(int64_t*)rValue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
fbig = INT_FILTER_VALUE;
|
||||
lBig = *(int32_t*)lValue;
|
||||
rBig = *(int32_t*)rValue;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
fbig = BIGINT_FILTER_VALUE;
|
||||
lBig = *(int64_t*)lValue;
|
||||
rBig = *(int64_t*)rValue;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (jtCtx.colEqNum && jtCtx.colEqList[c] && ((*(bool*)(lrow + c)) || (*(bool*)(rrow + c)) || lBig != rBig)) {
|
||||
rightMatch = false;
|
||||
}
|
||||
|
||||
if (jtCtx.colOnNum && jtCtx.colOnList[c] && ((*(bool*)(lrow + c)) || (*(bool*)(rrow + c)) || lBig <= rBig)) {
|
||||
rightMatch = false;
|
||||
}
|
||||
|
||||
if (jtCtx.rightFilterNum && jtCtx.rightFilterColList[c] && ((*(bool*)(rrow + c)) || rBig <= fbig)) {
|
||||
rfilterOut = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (rightMatch) {
|
||||
jtCtx.rightFinMatch[r] = true;
|
||||
}
|
||||
|
||||
if (rfilterOut) {
|
||||
if (!rightMatch) {
|
||||
jtCtx.rightFinMatch[r] = true;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!lfilterOut && rightMatch) {
|
||||
putMatchRowToRes(lrow, rrow);
|
||||
leftMatch= true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!lfilterOut && !leftMatch && 0 == jtCtx.rightFilterNum) {
|
||||
putNMatchRowToRes(lrow, 0, MAX_SLOT_NUM);
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == jtCtx.leftFilterNum) {
|
||||
for (int32_t r = 0; r < rightGrpRows; ++r) {
|
||||
if (!jtCtx.rightFinMatch[r]) {
|
||||
char* rrow = jtCtx.colRowDataBuf + rightTbOffset + jtCtx.blkRowSize * r;
|
||||
putNMatchRowToRes(rrow, MAX_SLOT_NUM, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void appendEqGrpRes(int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
switch (jtCtx.joinType) {
|
||||
case JOIN_TYPE_LEFT:
|
||||
leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
break;
|
||||
case JOIN_TYPE_FULL:
|
||||
fullJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t leftGrpRows, int32_t rightGrpRows) {
|
||||
if (leftGrpRows <= 0 && rightGrpRows <= 0) {
|
||||
return;
|
||||
|
@ -1170,7 +1303,7 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left
|
|||
|
||||
makeAppendBlkData(ppLeft, ppRight, leftGrpRows, rightGrpRows);
|
||||
|
||||
leftJoinAppendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
appendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1240,6 +1373,12 @@ void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rig
|
|||
jtCtx.rightMaxGrpRows = rightMaxGrpRows;
|
||||
jtCtx.blkRows = blkRows;
|
||||
|
||||
int32_t maxGrpRows = TMAX(leftMaxGrpRows, rightMaxGrpRows);
|
||||
if (maxGrpRows > jtCtx.rightFinMatchNum) {
|
||||
jtCtx.rightFinMatchNum = maxGrpRows;
|
||||
jtCtx.rightFinMatch = (bool*)taosMemoryRealloc(jtCtx.rightFinMatch, maxGrpRows * sizeof(bool));
|
||||
}
|
||||
|
||||
createBothBlkRowsData();
|
||||
}
|
||||
|
||||
|
@ -1638,7 +1777,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
|
|||
bool contLoop = true;
|
||||
|
||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc);
|
||||
createDummyBlkList(200, 200, 200, 200, 10);
|
||||
createDummyBlkList(10, 10, 10, 10, 2);
|
||||
|
||||
while (contLoop) {
|
||||
rerunBlockedHere();
|
||||
|
@ -1663,8 +1802,14 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
|
|||
handleTestDone();
|
||||
}
|
||||
|
||||
void handleCaseEnd() {
|
||||
taosMemoryFree(jtCtx.rightFinMatch);
|
||||
jtCtx.rightFinMatchNum = 0;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
#if 0
|
||||
#if 1
|
||||
TEST(leftOuterJoin, noCondTest) {
|
||||
SJoinTestParam param;
|
||||
|
@ -1764,6 +1909,108 @@ TEST(leftOuterJoin, fullCondTest) {
|
|||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
TEST(fullOuterJoin, noCondTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "fullOuterJoin:noCondTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_FULL;
|
||||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
TEST(fullOuterJoin, eqCondTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "fullOuterJoin:eqCondTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_FULL;
|
||||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
handleCaseEnd();
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
TEST(fullOuterJoin, onCondTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "fullOuterJoin:onCondTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_FULL;
|
||||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
TEST(fullOuterJoin, fullCondTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "fullOuterJoin:fullCondTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_FULL;
|
||||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
|
@ -1255,9 +1255,14 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
case QUERY_NODE_LOGIC_PLAN_JOIN: {
|
||||
SJoinLogicNode* pLogicNode = (SJoinLogicNode*)pNode;
|
||||
destroyLogicNode((SLogicNode*)pLogicNode);
|
||||
nodesDestroyNode(pLogicNode->pWindowOffset);
|
||||
nodesDestroyNode(pLogicNode->pJLimit);
|
||||
nodesDestroyNode(pLogicNode->pPrimKeyEqCond);
|
||||
nodesDestroyNode(pLogicNode->pFullOnCond);
|
||||
nodesDestroyNode(pLogicNode->pColEqCond);
|
||||
nodesDestroyNode(pLogicNode->pColOnCond);
|
||||
nodesDestroyNode(pLogicNode->pTagEqCond);
|
||||
nodesDestroyNode(pLogicNode->pTagOnCond);
|
||||
nodesDestroyNode(pLogicNode->pFullOnCond);
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG: {
|
||||
|
|
|
@ -1123,7 +1123,6 @@ static SNode* biMakeTbnameProjectAstNode(char* funcName, char* tableAlias) {
|
|||
n->literal = tstrdup(tableAlias);
|
||||
n->node.resType.type = TSDB_DATA_TYPE_BINARY;
|
||||
n->node.resType.bytes = strlen(n->literal);
|
||||
n->isDuration = false;
|
||||
n->translate = false;
|
||||
valNode = n;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
sql connect
|
||||
sql use test0;
|
||||
|
||||
sql select a.col1, b.col1 from sta a inner join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' order by a.col1, b.col1;
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data40 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data41 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from sta a join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from sta a join sta b on a.ts = b.ts;
|
||||
if $rows != 12 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from tba1 a join tba2 b on a.ts = b.ts order by a.col1, b.col1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from tba2 a join tba1 b on a.ts = b.ts order by a.col1, b.col1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
|
@ -5,19 +5,19 @@ sql connect
|
|||
sql drop database if exists test0
|
||||
sql create database test0 vgroups 3;
|
||||
sql use test0;
|
||||
create stable sta (ts timestamp, col1 int) tags(t1 int);
|
||||
create table tba1 using sta tags(1);
|
||||
create table tba2 using sta tags(2);
|
||||
sql create stable sta (ts timestamp, col1 int) tags(t1 int);
|
||||
sql create table tba1 using sta tags(1);
|
||||
sql create table tba2 using sta tags(2);
|
||||
|
||||
insert into tba1 values ('2023-11-17 16:29:00', 1);
|
||||
insert into tba1 values ('2023-11-17 16:29:02', 3);
|
||||
insert into tba1 values ('2023-11-17 16:29:03', 4);
|
||||
insert into tba1 values ('2023-11-17 16:29:04', 5);
|
||||
sql insert into tba1 values ('2023-11-17 16:29:00', 1);
|
||||
sql insert into tba1 values ('2023-11-17 16:29:02', 3);
|
||||
sql insert into tba1 values ('2023-11-17 16:29:03', 4);
|
||||
sql insert into tba1 values ('2023-11-17 16:29:04', 5);
|
||||
|
||||
insert into tba2 values ('2023-11-17 16:29:00', 2);
|
||||
insert into tba2 values ('2023-11-17 16:29:01', 3);
|
||||
insert into tba2 values ('2023-11-17 16:29:03', 5);
|
||||
insert into tba2 values ('2023-11-17 16:29:05', 7);
|
||||
sql insert into tba2 values ('2023-11-17 16:29:00', 2);
|
||||
sql insert into tba2 values ('2023-11-17 16:29:01', 3);
|
||||
sql insert into tba2 values ('2023-11-17 16:29:03', 5);
|
||||
sql insert into tba2 values ('2023-11-17 16:29:05', 7);
|
||||
|
||||
sql drop database if exists testa
|
||||
sql create database testa vgroups 3;
|
||||
|
@ -57,13 +57,17 @@ sql insert into ctb22 using st2 tags(2) values('2023-10-16 09:10:12', 110222, 11
|
|||
sql insert into ctb23 using st2 tags(3) values('2023-10-16 09:10:13', 110223, 1102230);
|
||||
sql insert into ctb24 using st2 tags(4) values('2023-10-16 09:10:14', 110224, 1102240);
|
||||
|
||||
run tsim/join/inner_join.sim
|
||||
run tsim/join/left_join.sim
|
||||
run tsim/join/right_join.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
|
||||
run tsim/join/inner_join.sim
|
||||
run tsim/join/left_join.sim
|
||||
run tsim/join/right_join.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
sql connect
|
||||
sql use test0;
|
||||
|
||||
sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by b.col1, a.col1;
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data40 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data41 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data50 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data51 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data60 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data61 != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data70 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data71 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data80 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data81 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data90 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data91 != 7 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from sta a right join sta b on a.ts = b.ts;
|
||||
if $rows != 12 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from tba1 a right join tba2 b on a.ts = b.ts order by a.col1, b.col1;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.col1, b.col1 from tba2 a right join tba1 b on a.ts = b.ts order by a.col1, b.col1;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != 5 then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
Loading…
Reference in New Issue