enh: support group join

This commit is contained in:
dapan1121 2024-02-27 18:18:25 +08:00
parent 97aca25633
commit 8a2b4e2d86
9 changed files with 946 additions and 100 deletions

View File

@ -140,6 +140,7 @@ typedef struct SJoinLogicNode {
SNode* pFullOnCond; // except prim eq cond
SNodeList* pLeftEqNodes;
SNodeList* pRightEqNodes;
bool allEqTags;
bool isSingleTableJoin;
bool hasSubQuery;
bool isLowLevelJoin;

View File

@ -33,14 +33,12 @@ extern "C" {
struct SMJoinOperatorInfo;
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef int32_t (*joinCartFp)(void*);
typedef enum EJoinTableType {
E_JOIN_TB_BUILD = 1,
E_JOIN_TB_PROBE
} EJoinTableType;
#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE")
#define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER)
@ -171,6 +169,7 @@ typedef struct SMJoinGrpRows {
bool lastEqGrp; \
bool lastProbeGrp; \
bool seqWinGrp; \
bool groupJoin; \
int32_t blkThreshold; \
int64_t jLimit
@ -178,6 +177,8 @@ typedef struct SMJoinCommonCtx {
MJOIN_COMMON_CTX;
} SMJoinCommonCtx;
typedef int32_t (*joinCartFp)(void*);
typedef struct SMJoinMergeCtx {
// KEEP IT FIRST
struct SMJoinOperatorInfo* pJoin;
@ -187,12 +188,12 @@ typedef struct SMJoinMergeCtx {
bool lastEqGrp;
bool lastProbeGrp;
bool seqWinGrp;
bool groupJoin;
int32_t blkThreshold;
int64_t jLimit;
// KEEP IT FIRST
bool hashCan;
bool keepOrder;
bool midRemains;
bool nmatchRemains;
SSDataBlock* midBlk;
@ -230,6 +231,7 @@ typedef struct SMJoinWindowCtx {
bool lastEqGrp;
bool lastProbeGrp;
bool seqWinGrp;
bool groupJoin;
int32_t blkThreshold;
int64_t jLimit;
// KEEP IT FIRST
@ -240,14 +242,11 @@ typedef struct SMJoinWindowCtx {
bool lowerRowsAcq;
bool eqRowsAcq;
bool greaterRowsAcq;
bool groupJoin;
int64_t seqGrpId;
int64_t winBeginTs;
int64_t winEndTs;
bool eqPostDone;
int64_t lastTs;
bool rowRemains;
SMJoinGrpRows probeGrp;
SMJoinGrpRows buildGrp;
SMJoinWinCache cache;
@ -279,26 +278,28 @@ typedef struct SMJoinExecInfo {
int64_t expectRows;
} SMJoinExecInfo;
typedef struct SMJoinRetrieveCtx {
bool grpRetrieve;
uint64_t lastGid[2];
SSDataBlock* remainBlk[2];
} SMJoinRetrieveCtx;
typedef SSDataBlock* (*joinImplFp)(SOperatorInfo*);
typedef SSDataBlock* (*joinRetrieveFp)(struct SMJoinOperatorInfo*, SMJoinTableCtx*);
typedef void (*joinResetFp)(struct SMJoinOperatorInfo*);
typedef struct SMJoinOperatorInfo {
SOperatorInfo* pOperator;
int32_t joinType;
int32_t subType;
int32_t inputTsOrder;
int32_t errCode;
int32_t errCode;
int64_t outGrpId;
SMJoinTableCtx tbs[2];
SMJoinTableCtx* build;
SMJoinTableCtx* probe;
SMJoinRetrieveCtx retrieveCtx;
SFilterInfo* pFPreFilter;
SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter;
joinImplFp joinFp;
joinRetrieveFp retrieveFp;
joinResetFp grpResetFp;
SMJoinCtx ctx;
SMJoinExecInfo execInfo;
} SMJoinOperatorInfo;
@ -420,11 +421,15 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator);
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx);
void mJoinResetTableCtx(SMJoinTableCtx* pCtx);
void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin);
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
void mJoinSetDone(SOperatorInfo* pOperator);
bool mJoinIsDone(SOperatorInfo* pOperator);
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart);
int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp);
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp);
int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable);

View File

@ -35,12 +35,12 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
int32_t buildGrpNum = taosArrayGetSize(cache->grps);
int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit);
pCtx->finBlk->info.id.groupId = (pCtx->seqWinGrp || pCtx->groupJoin) ? pCtx->seqGrpId : 0;
pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->pJoin->outGrpId : 0;
if (buildGrpNum <= 0 || buildTotalRows <= 0) {
MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp));
if (pCtx->seqWinGrp) {
pCtx->seqGrpId++;
pCtx->pJoin->outGrpId++;
}
return TSDB_CODE_SUCCESS;
}
@ -90,7 +90,7 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
cache->grpIdx = 0;
++probeGrp->readIdx;
if (pCtx->seqWinGrp) {
pCtx->seqGrpId++;
pCtx->pJoin->outGrpId++;
break;
}
}
@ -462,17 +462,20 @@ static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) {
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
mJoinSetDone(pOperator);
if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
mJoinSetDone(pOperator);
}
return false;
}
@ -531,7 +534,11 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mLeftJoinRetrieve(pOperator, pJoin)) {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
break;
}
@ -578,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
@ -603,6 +610,20 @@ _return:
return pCtx->finBlk;
}
void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin) {
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = false;
pCtx->hashCan = false;
pCtx->midRemains = false;
pCtx->lastEqTs = INT64_MIN;
mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build);
}
static int32_t mInnerJoinMergeCart(SMJoinMergeCtx* pCtx) {
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinTableCtx* probe = pCtx->pJoin->probe;
@ -716,11 +737,11 @@ static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
@ -827,8 +848,8 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
}
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
if (!probeGot && !buildGot) {
return false;
@ -1722,7 +1743,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
}
do {
if (!mLeftJoinRetrieve(pOperator, pJoin)) {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
@ -1868,14 +1889,13 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
eqRowsNum += grp.endIdx - grp.beginIdx + 1;
if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) {
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable);
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blkRowIdx = 0;
pCtx->buildGrp.blk = pTable->blk;
if (NULL == pTable->blk) {
pTable->dsFetchDone = true;
break;
}
} else {
@ -2051,16 +2071,19 @@ int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) {
}
static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
mJoinSetDone(pOperator);
if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
mJoinSetDone(pOperator);
}
return false;
}
@ -2100,6 +2123,10 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
do {
if (!mAsofLowerRetrieve(pOperator, pJoin, pCtx)) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
break;
}
@ -2149,7 +2176,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
@ -2176,6 +2203,10 @@ _return:
}
int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) {
if (taosArrayGetSize(pCtx->cache.grps) <= 0) {
return TSDB_CODE_SUCCESS;
}
SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) {
MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx));
@ -2197,40 +2228,38 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
ASSERT(grpNum >= 1 && grpNum <= 2);
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
if (pGrp->blk != pCache->outBlk) {
int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0;
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx));
if (1 == grpNum) {
pGrp->blk = pCache->outBlk;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
//pGrp->endIdx = pGrp->blk->info.rows - 1;
} else {
taosArrayPop(pCache->grps);
pGrp = taosArrayGet(pCache->grps, 0);
ASSERT(pGrp->blk == pCache->outBlk);
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
if (grpNum >= 1) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
if (pGrp->blk != pCache->outBlk) {
int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0;
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx));
if (1 == grpNum) {
pGrp->blk = pCache->outBlk;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
//pGrp->endIdx = pGrp->blk->info.rows - 1;
} else {
taosArrayPop(pCache->grps);
pGrp = taosArrayGet(pCache->grps, 0);
ASSERT(pGrp->blk == pCache->outBlk);
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
}
//ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
}
//ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
} else {
ASSERT(grpNum == 1);
}
ASSERT(taosArrayGetSize(pCache->grps) == 1);
ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
ASSERT(taosArrayGetSize(pCache->grps) == 1);
ASSERT(pGrp->blk->info.rows - pGrp->beginIdx == pCtx->cache.rowNum);
}
do {
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx);
build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, build);
qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
build->blkRowIdx = 0;
if (NULL == build->blk) {
build->dsFetchDone = true;
break;
}
@ -2337,13 +2366,12 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
return TSDB_CODE_SUCCESS;
}
pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx);
pTable->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pTable);
qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blkRowIdx = 0;
if (NULL == pTable->blk) {
pTable->dsFetchDone = true;
return TSDB_CODE_SUCCESS;
}
@ -2420,7 +2448,7 @@ int32_t mAsofGreaterProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo*
}
static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
@ -2428,11 +2456,14 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
pJoin->build->newBlk = false;
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
ASSERT(taosArrayGetSize(pCtx->cache.grps) <= 1);
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
mJoinSetDone(pOperator);
if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
mJoinSetDone(pOperator);
}
return false;
}
@ -2486,6 +2517,10 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
do {
if (!mAsofGreaterRetrieve(pOperator, pJoin, pCtx)) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
break;
}
@ -2529,7 +2564,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
}
}
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
@ -2554,6 +2589,34 @@ _return:
return pCtx->finBlk;
}
void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) {
SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx;
SMJoinWinCache* pCache = &pWin->cache;
pWin->lastEqGrp = false;
pWin->lastProbeGrp = false;
pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN;
pCache->outRowIdx = 0;
pCache->rowNum = 0;
pCache->grpIdx = 0;
if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue);
}
taosArrayClear(pCache->grps);
if (pCache->outBlk) {
blockDataCleanup(pCache->outBlk);
}
mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build);
}
static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
@ -2579,7 +2642,7 @@ static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
}
static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool probeGot = mJoinRetrieveBlk(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
@ -2588,11 +2651,14 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
mWinJoinCloneCacheBlk(pCtx);
}
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
buildGot = mJoinRetrieveBlk(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
if (!probeGot) {
mJoinSetDone(pOperator);
if (!pCtx->groupJoin || NULL == pJoin->probe->remainInBlk) {
mJoinSetDone(pOperator);
}
return false;
}
@ -2766,13 +2832,12 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
}
do {
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build);
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
build->blkRowIdx = 0;
if (NULL == build->blk) {
build->dsFetchDone = true;
break;
}
@ -2844,13 +2909,12 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
do {
MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
build->blk = (*pCtx->pJoin->retrieveFp)(pCtx->pJoin, pCtx->pJoin->build);
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
build->blkRowIdx = 0;
if (NULL == build->blk) {
build->dsFetchDone = true;
break;
}
@ -2892,6 +2956,10 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
do {
if (!mWinJoinRetrieve(pOperator, pJoin, pCtx)) {
if (pCtx->groupJoin && pCtx->finBlk->info.rows <= 0 && !mJoinIsDone(pOperator)) {
continue;
}
break;
}
@ -2927,6 +2995,33 @@ _return:
return pCtx->finBlk;
}
void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) {
SMJoinWindowCtx* pWin = &pJoin->ctx.windowCtx;
SMJoinWinCache* pCache = &pWin->cache;
pWin->lastEqGrp = false;
pWin->lastProbeGrp = false;
pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN;
pCache->outRowIdx = 0;
pCache->rowNum = 0;
pCache->grpIdx = 0;
if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue);
}
taosArrayClear(pCache->grps);
if (pCache->outBlk) {
blockDataCleanup(pCache->outBlk);
}
mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build);
}
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
pCache->colNum = pJoin->build->finNum;
@ -2955,9 +3050,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
pCtx->pJoin = pJoin;
pCtx->lastTs = INT64_MIN;
pCtx->seqWinGrp = pJoinNode->seqWinGroup;
pCtx->groupJoin = pJoinNode->grpJoin;
if (pCtx->seqWinGrp) {
pCtx->seqGrpId = 1;
pJoin->outGrpId = 1;
}
switch (pJoinNode->subType) {
@ -2973,6 +3067,7 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
} else if (pCtx->greaterRowsAcq) {
pJoin->joinFp = mAsofGreaterJoinDo;
}
pJoin->grpResetFp = mAsofJoinGroupReset;
break;
case JOIN_STYPE_WIN: {
SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset;
@ -3023,6 +3118,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pJoin->subType = JOIN_STYPE_OUTER;
pJoin->build->eqRowLimit = pCtx->jLimit;
pJoin->grpResetFp = mLeftJoinGroupReset;
} else {
pCtx->jLimit = -1;
}

View File

@ -591,7 +591,7 @@ int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastB
mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true);
if (!lastBuildGrp) {
mJoinRetrieveEqGrpRows(pJoin->pOperator, pJoin->build, timestamp);
mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp);
} else {
pJoin->build->grpIdx = 0;
}
@ -846,8 +846,92 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
pInfo->probe->type = E_JOIN_TB_PROBE;
}
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
int32_t dsIdx = pTable->downStreamIdx;
if (E_JOIN_TB_PROBE == pTable->type) {
if (pTable->remainInBlk) {
SSDataBlock* pTmp = pTable->remainInBlk;
pTable->remainInBlk = NULL;
(*pJoin->grpResetFp)(pJoin);
pTable->lastInGid = pTmp->info.id.groupId;
return pTmp;
}
if (pTable->dsFetchDone) {
return NULL;
}
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
if (NULL == pTmp) {
pTable->dsFetchDone = true;
return NULL;
}
if (0 == pTable->lastInGid) {
pTable->lastInGid = pTmp->info.id.groupId;
return pTmp;
}
if (pTable->lastInGid == pTmp->info.id.groupId) {
return pTmp;
}
pTable->remainInBlk = pTmp;
return NULL;
}
SMJoinTableCtx* pProbe = pJoin->probe;
ASSERT(pProbe->lastInGid);
while (true) {
if (pTable->remainInBlk) {
if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) {
SSDataBlock* pTmp = pTable->remainInBlk;
pTable->remainInBlk = NULL;
pTable->lastInGid = pTmp->info.id.groupId;
return pTmp;
}
if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) {
return NULL;
}
pTable->remainInBlk = NULL;
}
if (pTable->dsFetchDone) {
return NULL;
}
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
if (NULL == pTmp) {
pTable->dsFetchDone = true;
return NULL;
}
pTable->remainInBlk = pTmp;
}
return NULL;
}
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
if (pTable->dsFetchDone) {
return NULL;
}
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
if (NULL == pTmp) {
pTable->dsFetchDone = true;
}
return pTmp;
}
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
pJoin->retrieveCtx.grpRetrieve = pJoinNode->grpJoin;
pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;
pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl;
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|| (JOIN_STYPE_WIN == pJoin->subType)) {
@ -865,6 +949,10 @@ static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
return mJoinDestroyMergeCtx(pJoin);
}
bool mJoinIsDone(SOperatorInfo* pOperator) {
return (OP_EXEC_DONE == pOperator->status);
}
void mJoinSetDone(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
if (pOperator->pDownstreamGetParams) {
@ -875,21 +963,15 @@ void mJoinSetDone(SOperatorInfo* pOperator) {
}
}
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
if (pTb->dsFetchDone) {
return (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) ? false : true;
}
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) {
(*ppBlk) = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTb->downStreamIdx);
(*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb);
pTb->dsInitDone = true;
qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
*pIdx = 0;
if (NULL == (*ppBlk)) {
pTb->dsFetchDone = true;
} else {
if (NULL != (*ppBlk)) {
pTb->newBlk = true;
}
@ -1046,19 +1128,18 @@ _return:
}
int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, int64_t timestamp) {
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
bool wholeBlk = false;
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true);
while (wholeBlk && !pTable->dsFetchDone) {
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
pTable->blkRowIdx = 0;
if (NULL == pTable->blk) {
pTable->dsFetchDone = true;
break;
}
@ -1238,13 +1319,23 @@ int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable
return TSDB_CODE_SUCCESS;
}
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) {
pCtx->blk = NULL;
pCtx->blkRowIdx = 0;
pCtx->newBlk = false;
mJoinDestroyCreatedBlks(pCtx->createdBlks);
tSimpleHashClear(pCtx->pGrpHash);
}
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
pCtx->dsInitDone = false;
pCtx->dsFetchDone = false;
pCtx->lastInGid = 0;
pCtx->remainInBlk = NULL;
mJoinDestroyCreatedBlks(pCtx->createdBlks);
tSimpleHashClear(pCtx->pGrpHash);
mJoinResetGroupTableCtx(pCtx);
}
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
@ -1413,6 +1504,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
break;
case JOIN_STYPE_WIN:
pJoin->joinFp = mWinJoinDo;
pJoin->grpResetFp = mWinJoinGroupReset;
break;
default:
break;

View File

@ -485,6 +485,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD(pFullOnCond);
CLONE_NODE_LIST_FIELD(pLeftEqNodes);
CLONE_NODE_LIST_FIELD(pRightEqNodes);
COPY_SCALAR_FIELD(allEqTags);
COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(hasSubQuery);
COPY_SCALAR_FIELD(seqWinGroup);

View File

@ -852,8 +852,7 @@ static int32_t pdcJoinSplitPrimEqCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ
pPrimKeyEqCond = pJoin->pFullOnCond;
pJoinOnCond = NULL;
} else {
planError("unexcepted conds in fullOnCond, type:%s", nodesNodeName(nodeType(pJoin->pFullOnCond)));
code = TSDB_CODE_PLAN_INTERNAL_ERROR;
return TSDB_CODE_SUCCESS;
}
if (TSDB_CODE_SUCCESS == code) {
@ -924,11 +923,13 @@ static int32_t pdcJoinPartLogicEqualOnCond(SJoinLogicNode* pJoin) {
code = nodesListMakeAppend(&pTagEqOnConds, nodesCloneNode(pCond));
} else {
code = nodesListMakeAppend(&pColEqOnConds, nodesCloneNode(pCond));
pJoin->allEqTags = false;
}
} else if (allTags) {
code = nodesListMakeAppend(&pTagOnConds, nodesCloneNode(pCond));
} else {
code = nodesListMakeAppend(&pColOnConds, nodesCloneNode(pCond));
pJoin->allEqTags = false;
}
if (code) {
@ -978,6 +979,9 @@ static int32_t pdcJoinPartEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ
pJoin->pTagEqCond = NULL;
return TSDB_CODE_SUCCESS;
}
pJoin->allEqTags = true;
if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pFullOnCond) &&
LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)(pJoin->pFullOnCond))->condType) {
return pdcJoinPartLogicEqualOnCond(pJoin);
@ -989,11 +993,13 @@ static int32_t pdcJoinPartEqualOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJ
pJoin->pTagEqCond = nodesCloneNode(pJoin->pFullOnCond);
} else {
pJoin->pColEqCond = nodesCloneNode(pJoin->pFullOnCond);
pJoin->allEqTags = false;
}
} else if (allTags) {
pJoin->pTagOnCond = nodesCloneNode(pJoin->pFullOnCond);
} else {
pJoin->pColOnCond = nodesCloneNode(pJoin->pFullOnCond);
pJoin->allEqTags = false;
}
return TSDB_CODE_SUCCESS;
@ -1274,7 +1280,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) {
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond && !IS_WINDOW_JOIN(pJoin->subType)) {
code = pdcJoinSplitPrimEqCond(pCxt, pJoin);
}
@ -4540,10 +4546,37 @@ static int32_t grpJoinOptInsertPartitionNode(SLogicNode* pJoin) {
return code;
}
static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pJoin, SLogicSubplan* pLogicSubplan) {
int32_t code = grpJoinOptInsertPartitionNode(pJoin);
static int32_t grpJoinOptPartByTags(SLogicNode* pNode) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
SNode* pNew = NULL;
bool leftChild = true;
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
FOREACH(pChild, pNode->pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SScanLogicNode* pScan = (SScanLogicNode*)pChild;
if (leftChild) {
nodesListMakeStrictAppendList(&pScan->pGroupTags, nodesCloneList(pJoin->pLeftEqNodes));
leftChild = false;
} else {
nodesListMakeStrictAppendList(&pScan->pGroupTags, nodesCloneList(pJoin->pRightEqNodes));
}
pScan->groupSort = true;
pScan->groupOrderScan = true;
}
return code;
}
static int32_t grpJoinOptRewriteGroupJoin(SOptimizeContext* pCxt, SLogicNode* pNode, SLogicSubplan* pLogicSubplan) {
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
int32_t code = (pJoin->allEqTags && !pJoin->hasSubQuery) ? grpJoinOptPartByTags(pNode) : grpJoinOptInsertPartitionNode(pNode);
if (TSDB_CODE_SUCCESS == code) {
((SJoinLogicNode*)pJoin)->grpJoin = true;
pJoin->grpJoin = true;
pCxt->optimized = true;
}
return code;

View File

@ -1405,7 +1405,7 @@ static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubp
//if (pJoin->node.dynamicOp) {
// code = TSDB_CODE_SUCCESS;
//} else {
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, false);
code = stbSplSplitMergeScanNode(pCxt, pSubplan, (SScanLogicNode*)pChild, pJoin->grpJoin ? true : false);
//}
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);

View File

@ -477,8 +477,437 @@ if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1=b.col1 jlimit 2 order by a.ts
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != NULL then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts > b.ts and a.col1=b.col1 jlimit 2 order by a.ts
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != NULL then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != NULL then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != NULL then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != NULL then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != NULL then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts >= b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts;
if $rows != 10 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data80 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data81 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data90 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data91 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts < b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != NULL then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != NULL then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != NULL then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != NULL then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != NULL then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts
if $rows != 10 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data80 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data81 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data90 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data91 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts = b.ts and a.col1=b.col1 jlimit 2 order by a.ts, b.ts
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 jlimit 2 order by a.t1, a.ts, b.ts;
if $rows != 14 then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 order by a.t1, a.ts, b.ts;
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' order by a.t1, a.ts, b.ts;
if $rows != 6 then
return -1
endi
if $data00 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000';
if $rows != 1 then
return -1
endi
if $data00 != 6 then
return -1
endi
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts;
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1;
sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 > b.col1 jlimit 2 order by a.ts;
sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 = 1 jlimit 2 order by a.ts;
sql_error select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 having(a.ts>0) order by a.t1, a.ts, b.ts;
sql_error select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' slimit 1;

View File

@ -384,6 +384,194 @@ if $data21 != 4 then
return -1
endi
sql select a.ts, b.ts from sta a left window join sta b on a.col1=b.col1 window_offset(-1s, 1s) order by a.col1, a.ts;
if $rows != 12 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:01.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 window_offset(-1s, 1s) order by a.t1, a.ts, b.ts;
if $rows != 14 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data80 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data81 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data90 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data91 != @23-11-17 16:29:01.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 and a.col1=b.col1 window_offset(-1s, 1s) order by a.t1, a.ts, b.ts;
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from sta a left window join sta b on a.t1=b.t1 and a.col1=b.col1 window_offset(-2s, -1s) order by a.t1, a.ts, b.ts;
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != NULL then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != NULL then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != NULL then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != NULL then
return -1
endi
if $data40 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data41 != NULL then
return -1
endi
if $data50 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data51 != NULL then
return -1
endi
if $data60 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data61 != NULL then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data71 != NULL then
return -1
endi
sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0);
@ -392,3 +580,4 @@ sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s,
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.ts > "2023-11-17 16:29:00.000");
sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*);
sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1;
sql_error select a.ts, b.ts from sta a left window join sta b on a.t1=1 window_offset(-1s, 1s) order by a.t1, a.ts;