enh: support hash join

This commit is contained in:
dapan1121 2024-04-12 15:44:23 +08:00
parent 86db8efcda
commit 319ab5f2c2
18 changed files with 1068 additions and 168 deletions

View File

@ -151,9 +151,10 @@ typedef struct SJoinLogicNode {
bool hashJoinHint;
// FOR HASH JOIN
STimeWindow timeRange; //table onCond filter
SNode* pLeftOnCond; //table onCond filter
SNode* pRightOnCond; //table onCond filter
int32_t timeRangeTarget; //table onCond filter
STimeWindow timeRange; //table onCond filter
SNode* pLeftOnCond; //table onCond filter
SNode* pRightOnCond; //table onCond filter
} SJoinLogicNode;
typedef struct SAggLogicNode {
@ -527,10 +528,15 @@ typedef struct SHashJoinPhysiNode {
SNode* pJLimit;
SNodeList* pOnLeft;
SNodeList* pOnRight;
STimeWindow timeRange; //table onCond filter
SNode* pLeftOnCond; //table onCond filter
SNode* pRightOnCond; //table onCond filter
SNode* pFullOnCond; //preFilter
SNode* leftPrimExpr;
SNode* rightPrimExpr;
int32_t leftPrimSlotId;
int32_t rightPrimSlotId;
int32_t timeRangeTarget; //table onCond filter
STimeWindow timeRange; //table onCond filter
SNode* pLeftOnCond; //table onCond filter
SNode* pRightOnCond; //table onCond filter
SNode* pFullOnCond; //preFilter
SNodeList* pTargets;
SQueryStat inputStat[2];

View File

@ -31,6 +31,15 @@ enum {
FLT_OPTION_NEED_UNIQE = 4,
};
typedef enum EConditionType {
COND_TYPE_PRIMARY_KEY = 1,
COND_TYPE_TAG_INDEX,
COND_TYPE_TAG,
COND_TYPE_NORMAL
} EConditionType;
#define FILTER_RESULT_ALL_QUALIFIED 0x1
#define FILTER_RESULT_NONE_QUALIFIED 0x2
#define FILTER_RESULT_PARTIAL_QUALIFIED 0x3
@ -54,6 +63,8 @@ extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg,
/* condition split interface */
int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond,
SNode **pOtherCond);
bool filterIsMultiTableColsCond(SNode *pCond);
EConditionType filterClassifyCondition(SNode *pNode);
#ifdef __cplusplus
}

View File

@ -78,6 +78,7 @@ extern "C" {
#define EXPLAIN_JOIN_EQ_RIGHT_FORMAT "Right Equal Cond: "
#define EXPLAIN_COUNT_NUM_FORMAT "Window Count=%" PRId64
#define EXPLAIN_COUNT_SLIDING_FORMAT "Window Sliding=%" PRId64
#define EXPLAIN_TABLE_TIMERANGE_FORMAT "%s Table Time Range: [%" PRId64 ", %" PRId64 "]"
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"

View File

@ -61,6 +61,15 @@ char* qExplainGetAsofOpStr(int32_t opType) {
}
}
char* qExplainGetTimerangeTargetStr(int32_t target) {
static char* targetName[] = {"", "Left", "Right", "Left/Right"};
if (target <= 0 || target > 3) {
return "Unknown";
}
return targetName[target];
}
void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) {
@ -1700,19 +1709,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pJoinNode->node.pConditions || pJoinNode->pFullOnCond) {
if (pJoinNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
if (pJoinNode->node.pConditions) {
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
if (pJoinNode->pFullOnCond) {
if (pJoinNode->node.pConditions) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
@ -1740,8 +1741,21 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
nodesNodeToSQL(pJoinNode->pTagEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
conditionsGot = true;
}
if (pJoinNode->pFullOnCond) {
if (conditionsGot) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFullOnCond, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pJoinNode->timeRangeTarget) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TABLE_TIMERANGE_FORMAT, qExplainGetTimerangeTargetStr(pJoinNode->timeRangeTarget), pJoinNode->timeRange.skey, pJoinNode->timeRange.ekey);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}

View File

@ -25,6 +25,9 @@ extern "C" {
#define HJOIN_ROW_BITMAP_SIZE (2 * 1048576)
#define HJOIN_BLK_THRESHOLD_RATIO 0.9
typedef int32_t (*hJoinImplFp)(SOperatorInfo*);
#pragma pack(push, 1)
typedef struct SBufRowInfo {
void* next;
@ -33,12 +36,24 @@ typedef struct SBufRowInfo {
} SBufRowInfo;
#pragma pack(pop)
typedef enum EHJoinPhase {
E_JOIN_PHASE_PRE = 1,
E_JOIN_PHASE_CUR,
E_JOIN_PHASE_POST
} EHJoinPhase;
typedef struct SHJoinCtx {
bool rowRemains;
bool midRemains;
int64_t limit;
SBufRowInfo* pBuildRow;
SSDataBlock* pProbeData;
int32_t probeIdx;
EHJoinPhase probePhase;
int32_t probePreIdx;
int32_t probeStartIdx;
int32_t probeEndIdx;
int32_t probePostIdx;
bool readMatch;
} SHJoinCtx;
typedef struct SHJoinColInfo {
@ -64,11 +79,31 @@ typedef struct SGroupData {
SBufRowInfo* rows;
} SGroupData;
typedef struct SHJoinTableInfo {
typedef struct SHJoinColMap {
int32_t srcSlot;
int32_t dstSlot;
bool vardata;
int32_t bytes;
} SHJoinColMap;
// for now timetruncate only
typedef struct SHJoinPrimExprCtx {
int64_t truncateUnit;
int64_t timezoneUnit;
int32_t targetSlotId;
} SHJoinPrimExprCtx;
typedef struct SHJoinTableCtx {
int32_t downStreamIdx;
SOperatorInfo* downStream;
int32_t blkId;
SQueryStat inputStat;
bool hasTimeRange;
SHJoinColMap* primCol;
SNode* primExpr;
SHJoinPrimExprCtx primCtx;
int32_t keyNum;
SHJoinColInfo* keyCols;
@ -82,7 +117,7 @@ typedef struct SHJoinTableInfo {
int32_t valBufSize;
SArray* valVarCols;
bool valColExist;
} SHJoinTableInfo;
} SHJoinTableCtx;
typedef struct SHJoinExecInfo {
int64_t buildBlkNum;
@ -95,14 +130,16 @@ typedef struct SHJoinExecInfo {
typedef struct SHJoinOperatorInfo {
int32_t joinType;
SHJoinTableInfo tbs[2];
SHJoinTableInfo* pBuild;
SHJoinTableInfo* pProbe;
EJoinType joinType;
EJoinSubType subType;
SHJoinTableCtx tbs[2];
SHJoinTableCtx* pBuild;
SHJoinTableCtx* pProbe;
SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter;
SSDataBlock* finBlk;
SSDataBlock* midBlk;
STimeWindow tblTimeRange;
int32_t pResColNum;
int8_t* pResColMap;
SArray* pRowBufs;
@ -111,6 +148,7 @@ typedef struct SHJoinOperatorInfo {
SHJoinCtx ctx;
SHJoinExecInfo execInfo;
int32_t blkThreshold;
hJoinImplFp joinFp;
} SHJoinOperatorInfo;
@ -132,6 +170,16 @@ typedef struct SHJoinOperatorInfo {
} \
} while (0)
int32_t hInnerJoinDo(struct SOperatorInfo* pOperator);
int32_t hLeftJoinDo(struct SOperatorInfo* pOperator);
void hJoinSetDone(struct SOperatorInfo* pOperator);
void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched);
bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin);
int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx);
bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows);
int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows);
#ifdef __cplusplus
}

View File

@ -30,6 +30,7 @@ extern "C" {
#define MJOIN_BLK_SIZE_LIMIT 10485760
#define MJOIN_ROW_BITMAP_SIZE (2 * 1048576)
#endif
#define MJOIN_SEMI_ANTI_BLK_ROWS_NUM 64
#define MJOIN_BLK_THRESHOLD_RATIO 0.9
struct SMJoinOperatorInfo;

View File

@ -29,3 +29,321 @@
int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinTableCtx* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx;
SSDataBlock* pRes = pJoin->finBlk;
size_t bufLen = 0;
int32_t code = 0;
bool allFetched = false;
if (pJoin->ctx.pBuildRow) {
hJoinAppendResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeStartIdx;
}
return code;
} else {
++pCtx->probeStartIdx;
}
}
for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
continue;
}
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
*/
if (pGroup) {
pCtx->pBuildRow = pGroup->rows;
hJoinAppendResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeStartIdx;
}
return code;
}
}
}
pCtx->rowRemains = false;
return code;
}
int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
bool allFetched = false;
SHJoinCtx* pCtx = &pJoin->ctx;
while (!allFetched) {
hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
if (pJoin->midBlk->info.rows > 0) {
doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL);
if (pJoin->midBlk->info.rows > 0) {
pCtx->readMatch = true;
HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
if (pCtx->midRemains) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
}
}
if (allFetched && !pCtx->readMatch) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
}
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
}
++pCtx->probeStartIdx;
*loopCont = true;
return TSDB_CODE_SUCCESS;
}
int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
SHJoinTableCtx* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx;
size_t bufLen = 0;
bool allFetched = false;
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
return TSDB_CODE_SUCCESS;
}
for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
continue;
}
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
*/
if (NULL == pGroup) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
++pCtx->probeStartIdx;
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
continue;
}
pCtx->readMatch = false;
pCtx->pBuildRow = pGroup->rows;
allFetched = false;
while (!allFetched) {
hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);
if (pJoin->midBlk->info.rows > 0) {
doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL);
if (pJoin->midBlk->info.rows > 0) {
pCtx->readMatch = true;
HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
if (pCtx->midRemains) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
}
}
if (allFetched && !pCtx->readMatch) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
}
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
}
}
pCtx->probePhase = E_JOIN_PHASE_POST;
*loopCont = true;
return TSDB_CODE_SUCCESS;
}
int32_t hLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
bool allFetched = false;
SHJoinCtx* pCtx = &pJoin->ctx;
hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched);
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
} else {
++pCtx->probeStartIdx;
}
*loopCont = true;
return TSDB_CODE_SUCCESS;
}
int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
SHJoinTableCtx* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx;
size_t bufLen = 0;
bool allFetched = false;
for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
continue;
}
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
*/
if (NULL == pGroup) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
++pCtx->probeStartIdx;
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
continue;
}
pCtx->pBuildRow = pGroup->rows;
hJoinAppendResToBlock(pOperator, pJoin->finBlk, &allFetched);
if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
if (allFetched) {
++pCtx->probeStartIdx;
}
*loopCont = false;
return TSDB_CODE_SUCCESS;
}
}
pCtx->probePhase = E_JOIN_PHASE_POST;
*loopCont = true;
return TSDB_CODE_SUCCESS;
}
int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinCtx* pCtx = &pJoin->ctx;
while (pCtx->rowRemains) {
switch (pCtx->probePhase) {
case E_JOIN_PHASE_PRE: {
int32_t rows = pCtx->probeStartIdx - pCtx->probePreIdx;
int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
if (rows <= rowsLeft) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rows));
pCtx->probePhase = E_JOIN_PHASE_CUR;
} else {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rowsLeft));
pJoin->ctx.probePreIdx += rowsLeft;
return TSDB_CODE_SUCCESS;
}
break;
}
case E_JOIN_PHASE_CUR: {
bool loopCont = false;
if (NULL == pJoin->ctx.pBuildRow) {
HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqProbeRows(pOperator, pJoin, &loopCont) : hLeftJoinHandleProbeRows(pOperator, pJoin, &loopCont));
} else {
HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqRowRemains(pOperator, pJoin, &loopCont) : hLeftJoinHandleRowRemains(pOperator, pJoin, &loopCont));
}
if (!loopCont) {
return TSDB_CODE_SUCCESS;
}
break;
}
case E_JOIN_PHASE_POST: {
if (pCtx->probeEndIdx < (pCtx->pProbeData->info.rows - 1) && pCtx->probePostIdx <= (pCtx->pProbeData->info.rows - 1)) {
int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
int32_t rows = pCtx->pProbeData->info.rows - pCtx->probePostIdx;
if (rows <= rowsLeft) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rows));
pCtx->rowRemains = false;
} else {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rowsLeft));
pCtx->probePostIdx += rowsLeft;
return TSDB_CODE_SUCCESS;
}
} else {
pJoin->ctx.rowRemains = false;
}
break;
}
default:
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}

View File

@ -26,6 +26,106 @@
#include "tmsg.h"
#include "ttypes.h"
#include "hashjoin.h"
#include "functionMgt.h"
bool hJoinBlkReachThreshold(SHJoinOperatorInfo* pInfo, int64_t blkRows) {
if (INT64_MAX == pInfo->ctx.limit || pInfo->pFinFilter != NULL) {
return blkRows >= pInfo->blkThreshold;
}
return (pInfo->execInfo.resRows + blkRows) >= pInfo->ctx.limit;
}
int32_t hJoinHandleMidRemains(SHJoinOperatorInfo* pJoin, SHJoinCtx* pCtx) {
ASSERT(0 < pJoin->midBlk->info.rows);
TSWAP(pJoin->midBlk, pJoin->finBlk);
pCtx->midRemains = false;
return TSDB_CODE_SUCCESS;
}
int32_t hJoinCopyMergeMidBlk(SHJoinCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
SSDataBlock* pLess = *ppMid;
SSDataBlock* pMore = *ppFin;
/*
if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
pLess = (*ppMid);
pMore = (*ppFin);
} else {
pLess = (*ppFin);
pMore = (*ppMid);
}
*/
int32_t totalRows = pMore->info.rows + pLess->info.rows;
if (totalRows <= pMore->info.capacity) {
HJ_ERR_RET(blockDataMerge(pMore, pLess));
blockDataCleanup(pLess);
pCtx->midRemains = false;
} else {
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
HJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
blockDataShrinkNRows(pLess, copyRows);
pCtx->midRemains = true;
}
/*
if (pMore != (*ppFin)) {
TSWAP(*ppMid, *ppFin);
}
*/
return TSDB_CODE_SUCCESS;
}
int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) {
switch (pJoin->joinType) {
case JOIN_TYPE_INNER:
pJoin->joinFp = hInnerJoinDo;
break;
case JOIN_TYPE_LEFT:
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
case JOIN_STYPE_OUTER:
pJoin->joinFp = hLeftJoinDo;
break;
default:
break;
}
break;
}
default:
break;
}
return TSDB_CODE_SUCCESS;
}
int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t startIdx, int32_t endIdx) {
if (NULL == pTable->primExpr) {
return TSDB_CODE_SUCCESS;
}
SHJoinPrimExprCtx* pCtx = &pTable->primCtx;
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (0 != pCtx->timezoneUnit) {
for (int32_t i = startIdx; i <= endIdx; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit;
}
} else {
for (int32_t i = startIdx; i <= endIdx; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
}
}
return TSDB_CODE_SUCCESS;
}
static int64_t hJoinGetSingleKeyRowsNum(SBufRowInfo* pRow) {
@ -52,7 +152,7 @@ static int64_t hJoinGetRowsNumOfKeyHash(SSHashObj* pHash) {
return rowsNum;
}
static int32_t hJoinInitKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
static int32_t hJoinInitKeyColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
pTable->keyNum = LIST_LENGTH(pList);
pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo));
@ -106,7 +206,7 @@ static bool hJoinIsValColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo
return false;
}
static int32_t hJoinInitValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
static int32_t hJoinInitValColsInfo(SHJoinTableCtx* pTable, SNodeList* pList) {
hJoinGetValColsNum(pList, pTable->blkId, &pTable->valNum);
if (pTable->valNum == 0) {
return TSDB_CODE_SUCCESS;
@ -157,17 +257,71 @@ static int32_t hJoinInitValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinInitPrimKeyInfo(SHJoinTableCtx* pTable, int32_t slotId) {
pTable->primCol = taosMemoryMalloc(sizeof(SHJoinColMap));
if (NULL == pTable->primCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTable->primCol->srcSlot = slotId;
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinInitPrimExprCtx(SNode* pNode, SHJoinPrimExprCtx* pCtx, SHJoinTableCtx* pTable) {
if (NULL == pNode) {
pCtx->targetSlotId = pTable->primCol->srcSlot;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_TARGET != nodeType(pNode)) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
STargetNode* pTarget = (STargetNode*)pNode;
if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL;
SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
pCtx->truncateUnit = pUnit->typeData;
if ((NULL == pCurrTz || 1 == pCurrTz->typeData) && pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
}
pCtx->targetSlotId = pTarget->slotId;
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
SNodeList* pKeyList = NULL;
SHJoinTableInfo* pTable = &pJoin->tbs[idx];
SHJoinTableCtx* pTable = &pJoin->tbs[idx];
pTable->downStream = pDownstream[idx];
pTable->blkId = pDownstream[idx]->resultDataBlockId;
if (0 == idx) {
pKeyList = pJoinNode->pOnLeft;
pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x1;
} else {
pKeyList = pJoinNode->pOnRight;
pTable->hasTimeRange = pJoinNode->timeRangeTarget & 0x2;
}
HJ_ERR_RET(hJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
int32_t code = hJoinInitKeyColsInfo(pTable, pKeyList);
if (code) {
@ -180,6 +334,8 @@ static int32_t hJoinInitTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode*
memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
HJ_ERR_RET(hJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
return TSDB_CODE_SUCCESS;
}
@ -188,9 +344,11 @@ static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi
int32_t probeIdx = 1;
pInfo->joinType = pJoinNode->joinType;
pInfo->subType = pJoinNode->subType;
switch (pInfo->joinType) {
case JOIN_TYPE_INNER:
case JOIN_TYPE_FULL:
if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) {
buildIdx = 0;
probeIdx = 1;
@ -216,6 +374,14 @@ static void hJoinSetBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi
pInfo->pBuild->downStreamIdx = buildIdx;
pInfo->pProbe->downStreamIdx = probeIdx;
if (0 == buildIdx) {
pInfo->pBuild->primExpr = pJoinNode->leftPrimExpr;
pInfo->pProbe->primExpr = pJoinNode->rightPrimExpr;
} else {
pInfo->pBuild->primExpr = pJoinNode->rightPrimExpr;
pInfo->pProbe->primExpr = pJoinNode->leftPrimExpr;
}
}
static int32_t hJoinBuildResColsMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
@ -263,11 +429,12 @@ static int32_t hJoinInitBufPages(SHJoinOperatorInfo* pInfo) {
return hJoinAddPageToBufs(pInfo->pRowBufs);
}
static void hJoinFreeTableInfo(SHJoinTableInfo* pTable) {
static void hJoinFreeTableInfo(SHJoinTableCtx* pTable) {
taosMemoryFreeClear(pTable->keyCols);
taosMemoryFreeClear(pTable->keyBuf);
taosMemoryFreeClear(pTable->valCols);
taosArrayDestroy(pTable->valVarCols);
taosMemoryFree(pTable->primCol);
}
static void hJoinFreeBufPage(void* param) {
@ -305,9 +472,9 @@ static FORCE_INLINE char* hJoinRetrieveColDataFromRowBufs(SArray* pRowBufs, SBuf
return pPage->data + pRow->offset;
}
static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
SHJoinTableInfo* pBuild = pJoin->pBuild;
SHJoinTableInfo* pProbe = pJoin->pProbe;
static int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
SHJoinTableCtx* pBuild = pJoin->pBuild;
SHJoinTableCtx* pProbe = pJoin->pProbe;
int32_t buildIdx = 0, buildValIdx = 0;
int32_t probeIdx = 0;
SBufRowInfo* pRow = pStart;
@ -347,7 +514,7 @@ static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, i
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx));
code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeStartIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeStartIdx));
if (code) {
return code;
}
@ -360,8 +527,37 @@ static FORCE_INLINE int32_t hJoinCopyResRowsToBlock(SHJoinOperatorInfo* pJoin, i
return TSDB_CODE_SUCCESS;
}
int32_t hJoinCopyNMatchRowsToBlock(SHJoinOperatorInfo* pJoin, SSDataBlock* pRes, int32_t startIdx, int32_t rows) {
SHJoinTableCtx* pBuild = pJoin->pBuild;
SHJoinTableCtx* pProbe = pJoin->pProbe;
int32_t buildIdx = 0;
int32_t probeIdx = 0;
int32_t code = 0;
static FORCE_INLINE void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) {
for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
if (pJoin->pResColMap[i]) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot);
colDataSetNItemsNull(pDst, pRes->info.rows, rows);
buildIdx++;
} else {
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot);
colDataAssignNRows(pDst, pRes->info.rows, pSrc, startIdx, rows);
probeIdx++;
}
}
pRes->info.rows += rows;
return TSDB_CODE_SUCCESS;
}
void hJoinAppendResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinCtx* pCtx = &pJoin->ctx;
SBufRowInfo* pStart = pCtx->pBuildRow;
@ -387,7 +583,7 @@ static FORCE_INLINE void hJoinAppendResToBlock(struct SOperatorInfo* pOperator,
}
static FORCE_INLINE bool hJoinCopyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) {
bool hJoinCopyKeyColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen) {
char *pData = NULL;
size_t bufLen = 0;
@ -428,61 +624,7 @@ static FORCE_INLINE bool hJoinCopyKeyColsDataToBuf(SHJoinTableInfo* pTable, int3
return false;
}
static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinTableInfo* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx;
SSDataBlock* pRes = pJoin->finBlk;
size_t bufLen = 0;
bool allFetched = false;
if (pJoin->ctx.pBuildRow) {
hJoinAppendResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeIdx;
}
return;
} else {
++pCtx->probeIdx;
}
}
for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) {
if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen)) {
continue;
}
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
*/
if (pGroup) {
pCtx->pBuildRow = pGroup->rows;
hJoinAppendResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeIdx;
}
return;
}
} else {
qTrace("no key matched");
}
}
pCtx->rowRemains = false;
}
static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
@ -503,7 +645,7 @@ static int32_t hJoinSetKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable)
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) {
static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableCtx* pTable) {
if (!pTable->valColExist) {
return TSDB_CODE_SUCCESS;
}
@ -534,7 +676,7 @@ static int32_t hJoinSetValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable)
static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) {
static FORCE_INLINE void hJoinCopyValColsDataToBuf(SHJoinTableCtx* pTable, int32_t rowIdx) {
if (!pTable->valColExist) {
return;
}
@ -596,7 +738,7 @@ static FORCE_INLINE int32_t hJoinGetValBufFromPages(SArray* pPages, int32_t bufS
} while (true);
}
static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) {
static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableCtx* pTable, int32_t rowIdx) {
if (NULL == pTable->valVarCols) {
return pTable->valBufSize;
}
@ -614,7 +756,7 @@ static FORCE_INLINE int32_t hJoinGetValBufSize(SHJoinTableInfo* pTable, int32_t
}
static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) {
static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableCtx* pTable, size_t keyLen, int32_t rowIdx) {
SGroupData group = {0};
SBufRowInfo* pRow = NULL;
@ -652,7 +794,7 @@ static int32_t hJoinAddRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGro
}
static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) {
SHJoinTableInfo* pBuild = pJoin->pBuild;
SHJoinTableCtx* pBuild = pJoin->pBuild;
int32_t code = hJoinSetValColsData(pBlock, pBuild);
if (code) {
return code;
@ -669,15 +811,64 @@ static int32_t hJoinAddRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock,
return TSDB_CODE_SUCCESS;
}
static bool hJoinFilterTimeRange(SSDataBlock* pBlock, STimeWindow* pRange, int32_t primSlot, int32_t* startIdx, int32_t* endIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, primSlot);
if (NULL == pCol) {
qError("hash join can't get prim col, slot:%d, slotNum:%d", primSlot, (int32_t)taosArrayGetSize(pBlock->pDataBlock));
return false;
}
TSKEY skey = *(TSKEY*)colDataGetData(pCol, 0);
TSKEY ekey = *(TSKEY*)colDataGetData(pCol, (pBlock->info.rows - 1));
if (ekey < pRange->skey || skey > pRange->ekey) {
return false;
}
if (skey >= pRange->skey && ekey <= pRange->ekey) {
*startIdx = 0;
*endIdx = pBlock->info.rows - 1;
return true;
}
if (skey < pRange->skey && ekey > pRange->ekey) {
TSKEY *pStart = (TSKEY*)taosbsearch(&pRange->skey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE);
TSKEY *pEnd = (TSKEY*)taosbsearch(&pRange->ekey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE);
*startIdx = ((uint64_t)pStart - (uint64_t)pCol->pData) / sizeof(int64_t);
*endIdx = ((uint64_t)pEnd - (uint64_t)pCol->pData) / sizeof(int64_t);
return true;
}
if (skey >= pRange->skey) {
TSKEY *pEnd = (TSKEY*)taosbsearch(&pRange->ekey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_LE);
*startIdx = 0;
*endIdx = ((uint64_t)pEnd - (uint64_t)pCol->pData) / sizeof(int64_t);
return true;
}
TSKEY *pStart = (TSKEY*)taosbsearch(&pRange->skey, pCol->pData, pBlock->info.rows, sizeof(TSKEY), compareInt64Val, TD_GE);
*startIdx = ((uint64_t)pStart - (uint64_t)pCol->pData) / sizeof(int64_t);
*endIdx = pBlock->info.rows - 1;
return true;
}
static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
SHJoinTableInfo* pBuild = pJoin->pBuild;
SHJoinTableCtx* pBuild = pJoin->pBuild;
int32_t startIdx = 0, endIdx = pBlock->info.rows - 1;
if (pBuild->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pBuild->primCol->srcSlot, &startIdx, &endIdx)) {
return TSDB_CODE_SUCCESS;
}
HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pBuild, startIdx, endIdx));
int32_t code = hJoinSetKeyColsData(pBlock, pBuild);
if (code) {
return code;
}
size_t bufLen = 0;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
for (int32_t i = startIdx; i <= endIdx; ++i) {
if (hJoinCopyKeyColsDataToBuf(pBuild, i, &bufLen)) {
continue;
}
@ -690,7 +881,7 @@ static int32_t hJoinAddBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo*
return code;
}
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator) {
static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator, bool* queryDone) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SSDataBlock* pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
@ -710,12 +901,36 @@ static int32_t hJoinBuildHash(struct SOperatorInfo* pOperator) {
}
}
if (IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
hJoinSetDone(pOperator);
*queryDone = true;
}
qTrace("build table rows:%" PRId64, hJoinGetRowsNumOfKeyHash(pJoin->pKeyHash));
return TSDB_CODE_SUCCESS;
}
static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinTableInfo* pProbe = pJoin->pProbe;
SHJoinTableCtx* pProbe = pJoin->pProbe;
int32_t startIdx = 0, endIdx = pBlock->info.rows - 1;
if (pProbe->hasTimeRange && !hJoinFilterTimeRange(pBlock, &pJoin->tblTimeRange, pProbe->primCol->srcSlot, &startIdx, &endIdx)) {
if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) {
pJoin->ctx.probeEndIdx = -1;
pJoin->ctx.probePostIdx = 0;
pJoin->ctx.pProbeData = pBlock;
pJoin->ctx.rowRemains = true;
pJoin->ctx.probePhase = E_JOIN_PHASE_POST;
HJ_ERR_RET((*pJoin->joinFp)(pOperator));
}
return TSDB_CODE_SUCCESS;
}
HJ_ERR_RET(hJoinLaunchPrimExpr(pBlock, pProbe, startIdx, endIdx));
int32_t code = hJoinSetKeyColsData(pBlock, pProbe);
if (code) {
return code;
@ -725,17 +940,26 @@ static int32_t hJoinPrepareStart(struct SOperatorInfo* pOperator, SSDataBlock* p
return code;
}
pJoin->ctx.probeIdx = 0;
pJoin->ctx.probeStartIdx = startIdx;
pJoin->ctx.probeEndIdx = endIdx;
pJoin->ctx.pBuildRow = NULL;
pJoin->ctx.pProbeData = pBlock;
pJoin->ctx.rowRemains = true;
pJoin->ctx.probePreIdx = 0;
pJoin->ctx.probePostIdx = endIdx + 1;
doHashJoinImpl(pOperator);
if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && startIdx > 0) {
pJoin->ctx.probePhase = E_JOIN_PHASE_PRE;
} else {
pJoin->ctx.probePhase = E_JOIN_PHASE_CUR;
}
HJ_ERR_RET((*pJoin->joinFp)(pOperator));
return TSDB_CODE_SUCCESS;
}
static void hJoinSetDone(struct SOperatorInfo* pOperator) {
void hJoinSetDone(struct SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
SHJoinOperatorInfo* pInfo = pOperator->info;
@ -749,7 +973,6 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pRes = pJoin->finBlk;
pRes->info.rows = 0;
int64_t st = 0;
if (pOperator->cost.openCost == 0) {
@ -757,30 +980,35 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
}
if (pOperator->status == OP_EXEC_DONE) {
pRes->info.rows = 0;
goto _return;
}
if (!pJoin->keyHashBuilt) {
pJoin->keyHashBuilt = true;
code = hJoinBuildHash(pOperator);
bool queryDone = false;
code = hJoinBuildHash(pOperator, &queryDone);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
hJoinSetDone(pOperator);
if (queryDone) {
goto _return;
}
//qTrace("build table rows:%" PRId64, getRowsNumOfKeyHash(pJoin->pKeyHash));
}
blockDataCleanup(pRes);
if (pJoin->ctx.rowRemains) {
doHashJoinImpl(pOperator);
code = (*pJoin->joinFp)(pOperator);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows >= pRes->info.capacity && pJoin->pFinFilter != NULL) {
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
doFilter(pRes, pJoin->pFinFilter, NULL);
}
if (pRes->info.rows > 0) {
@ -804,13 +1032,14 @@ static SSDataBlock* hJoinMainProcess(struct SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (pRes->info.rows < pJoin->blkThreshold) {
if (!hJoinBlkReachThreshold(pJoin, pRes->info.rows)) {
continue;
}
if (pJoin->pFinFilter != NULL) {
if (pRes->info.rows > 0 && pJoin->pFinFilter != NULL) {
doFilter(pRes, pJoin->pFinFilter, NULL);
}
if (pRes->info.rows > 0) {
break;
}
@ -912,42 +1141,39 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
int32_t code = TSDB_CODE_SUCCESS;
if (pOperator == NULL || pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
goto _return;
}
pInfo->tblTimeRange.skey = pJoinNode->timeRange.skey;
pInfo->tblTimeRange.ekey = pJoinNode->timeRange.ekey;
pInfo->ctx.limit = pJoinNode->node.pLimit ? ((SLimitNode*)pJoinNode->node.pLimit)->limit : INT64_MAX;
hJoinInitResBlocks(pInfo, pJoinNode);
setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
hJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
hJoinSetBuildAndProbeTable(pInfo, pJoinNode);
code = hJoinBuildResColsMap(pInfo, pJoinNode);
if (code) {
goto _error;
}
HJ_ERR_JRET(hJoinBuildResColsMap(pInfo, pJoinNode));
code = hJoinInitBufPages(pInfo);
if (code) {
goto _error;
}
HJ_ERR_JRET(hJoinInitBufPages(pInfo));
size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (pInfo->pKeyHash == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
goto _return;
}
HJ_ERR_JRET(hJoinHandleConds(pInfo, pJoinNode));
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
HJ_ERR_JRET(hJoinInitResBlocks(pInfo, pJoinNode));
HJ_ERR_JRET(hJoinSetImplFp(pInfo));
HJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hJoinMainProcess, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
@ -955,7 +1181,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
return pOperator;
_error:
_return:
if (pInfo != NULL) {
destroyHashJoinOperator(pInfo);
}

View File

@ -31,7 +31,11 @@ static uint32_t mJoinGetFinBlkCapacity(SMJoinOperatorInfo* pJoin, SSortMergeJoin
uint32_t maxRows = TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
if (INT64_MAX != pJoin->ctx.mergeCtx.limit && NULL == pJoin->pFinFilter) {
uint32_t limitMaxRows = pJoin->ctx.mergeCtx.limit / MJOIN_BLK_THRESHOLD_RATIO + 1;
return (maxRows > limitMaxRows) ? limitMaxRows : maxRows;
maxRows = TMIN(maxRows, limitMaxRows);
}
if (JOIN_STYPE_SEMI == pJoinNode->subType || JOIN_STYPE_ANTI == pJoinNode->subType) {
maxRows = TMIN(MJOIN_SEMI_ANTI_BLK_ROWS_NUM, maxRows);
}
return maxRows;
@ -257,7 +261,7 @@ static int32_t mOuterJoinMergeSeqCart(SMJoinMergeCtx* pCtx) {
int32_t startGrpIdx = 0;
int32_t startRowIdx = -1;
blockDataCleanup(pCtx->midBlk);
//blockDataCleanup(pCtx->midBlk);
do {
for (; !GRP_DONE(probeGrp) && !BLK_IS_FULL(pCtx->finBlk);
@ -350,7 +354,7 @@ static int32_t mOuterJoinHashGrpCartFilter(SMJoinMergeCtx* pCtx, bool* contLoop)
SMJoinGrpRows* probeGrp = taosArrayGet(probe->eqGrps, probe->grpIdx);
int32_t startRowIdx = 0;
blockDataCleanup(pCtx->midBlk);
//blockDataCleanup(pCtx->midBlk);
do {
startRowIdx = build->grpRowIdx;

View File

@ -496,6 +496,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(hashJoinHint);
CLONE_NODE_FIELD(pLeftOnCond);
CLONE_NODE_FIELD(pRightOnCond);
COPY_SCALAR_FIELD(timeRangeTarget);
COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow));
return TSDB_CODE_SUCCESS;
}

View File

@ -2189,6 +2189,7 @@ static const char* jkJoinPhysiPlanLeftOnCond = "LeftOnCond";
static const char* jkJoinPhysiPlanRightOnCond = "RightOnCond";
static const char* jkJoinPhysiPlanTimeRangeSKey = "TimeRangeSKey";
static const char* jkJoinPhysiPlanTimeRangeEKey = "TimeRangeEKey";
static const char* jkJoinPhysiPlanTimeRangeTarget = "TimeRangeTarget";
static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
@ -2333,12 +2334,27 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanSubType, pNode->subType);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanOnLeftCols, pNode->pOnLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkJoinPhysiPlanOnRightCols, pNode->pOnRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanLeftPrimExpr, nodeToJson, pNode->leftPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanRightPrimExpr, nodeToJson, pNode->rightPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanOnConditions, nodeToJson, pNode->pFullOnCond);
}
@ -2363,6 +2379,9 @@ static int32_t physiHashJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkJoinPhysiPlanRightOnCond, nodeToJson, pNode->pRightOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeTarget, pNode->timeRangeTarget);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanTimeRangeSKey, pNode->timeRange.skey);
}
@ -2381,12 +2400,27 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanSubType, pNode->subType, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanOnLeftCols, &pNode->pOnLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkJoinPhysiPlanOnRightCols, &pNode->pOnRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanLeftPrimExpr, &pNode->leftPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightPrimExpr, &pNode->rightPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightPrimSlotId, pNode->rightPrimSlotId, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pFullOnCond);
}
@ -2411,6 +2445,9 @@ static int32_t jsonToPhysiHashJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightOnCond, &pNode->pRightOnCond);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanTimeRangeTarget, pNode->timeRangeTarget, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkJoinPhysiPlanTimeRangeSKey, &pNode->timeRange.skey);
}

View File

@ -2641,8 +2641,14 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) {
enum {
PHY_HASH_JOIN_CODE_BASE_NODE = 1,
PHY_HASH_JOIN_CODE_JOIN_TYPE,
PHY_HASH_JOIN_CODE_JOIN_STYPE,
PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN,
PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN,
PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR,
PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR,
PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID,
PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID,
PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET,
PHY_HASH_JOIN_CODE_ON_CONDITIONS,
PHY_HASH_JOIN_CODE_TARGETS,
PHY_HASH_JOIN_CODE_INPUT_ROW_NUM0,
@ -2663,12 +2669,30 @@ static int32_t physiHashJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_HASH_JOIN_CODE_JOIN_TYPE, pNode->joinType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeEnum(pEncoder, PHY_HASH_JOIN_CODE_JOIN_STYPE, pNode->subType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN, nodeListToMsg, pNode->pOnLeft);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN, nodeListToMsg, pNode->pOnRight);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR, nodeToMsg, pNode->leftPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR, nodeToMsg, pNode->rightPrimExpr);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID, pNode->leftPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID, pNode->rightPrimSlotId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET, pNode->timeRangeTarget);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_HASH_JOIN_CODE_ON_CONDITIONS, nodeToMsg, pNode->pFullOnCond);
}
@ -2717,12 +2741,30 @@ static int32_t msgToPhysiHashJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_HASH_JOIN_CODE_JOIN_TYPE:
code = tlvDecodeEnum(pTlv, &pNode->joinType, sizeof(pNode->joinType));
break;
case PHY_HASH_JOIN_CODE_JOIN_STYPE:
code = tlvDecodeEnum(pTlv, &pNode->subType, sizeof(pNode->subType));
break;
case PHY_HASH_JOIN_CODE_ON_LEFT_COLUMN:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnLeft);
break;
case PHY_HASH_JOIN_CODE_ON_RIGHT_COLUMN:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pOnRight);
break;
case PHY_HASH_JOIN_CODE_LEFT_PRIM_EXPR:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->leftPrimExpr);
break;
case PHY_HASH_JOIN_CODE_RIGHT_PRIM_EXPR:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->rightPrimExpr);
break;
case PHY_HASH_JOIN_CODE_LEFT_PRIM_SLOTID:
code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId);
break;
case PHY_HASH_JOIN_CODE_RIGHT_PRIM_SLOTID:
code = tlvDecodeI32(pTlv, &pNode->rightPrimSlotId);
break;
case PHY_HASH_JOIN_CODE_TIME_RANGE_TARGET:
code = tlvDecodeI32(pTlv, &pNode->timeRangeTarget);
break;
case PHY_HASH_JOIN_CODE_ON_CONDITIONS:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pFullOnCond);
break;

View File

@ -1459,6 +1459,8 @@ void nodesDestroyNode(SNode* pNode) {
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pOnLeft);
nodesDestroyList(pPhyNode->pOnRight);
nodesDestroyNode(pPhyNode->leftPrimExpr);
nodesDestroyNode(pPhyNode->rightPrimExpr);
nodesDestroyNode(pPhyNode->pFullOnCond);
nodesDestroyList(pPhyNode->pTargets);

View File

@ -54,18 +54,18 @@ static void setColumnInfo(SFunctionNode* pFunc, SColumnNode* pCol, bool isPartit
}
break;
case FUNCTION_TYPE_WSTART:
if (!isPartitionBy) {
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
}
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colType = COLUMN_TYPE_WINDOW_START;
pCol->isPrimTs = true;
if (!isPartitionBy) {
pCol->isPrimTs = true;
}
break;
case FUNCTION_TYPE_WEND:
if (!isPartitionBy) {
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
}
pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pCol->colType = COLUMN_TYPE_WINDOW_END;
pCol->isPrimTs = true;
if (!isPartitionBy) {
pCol->isPrimTs = true;
}
break;
case FUNCTION_TYPE_WDURATION:
pCol->colType = COLUMN_TYPE_WINDOW_DURATION;

View File

@ -2160,6 +2160,11 @@ int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool groupSort, S
}
return nodesListMakeAppend(pSequencingNodes, (SNode*)pNode);
}
case QUERY_NODE_LOGIC_PLAN_SORT: {
*keepSort = true;
NODES_CLEAR_LIST(*pSequencingNodes);
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
return sortPriKeyOptHandleJoinSort(pNode, groupSort, pSort, pNotOptimize, pSequencingNodes, keepSort);
}
@ -2423,7 +2428,7 @@ static bool joinCondMayBeOptimized(SLogicNode* pNode) {
SScanLogicNode* pLScan = (SScanLogicNode*)pLeft;
SScanLogicNode* pRScan = (SScanLogicNode*)pRight;
if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pLScan->scanRange)) {
if (!IS_TSWINDOW_SPECIFIED(pLScan->scanRange) && !IS_TSWINDOW_SPECIFIED(pRScan->scanRange)) {
return false;
}
@ -4744,7 +4749,7 @@ static bool hashJoinOptShouldBeOptimized(SLogicNode* pNode) {
goto _return;
}
if ((JOIN_STYPE_NONE != pJoin->subType && JOIN_STYPE_OUTER != pJoin->subType) || NULL != pJoin->pTagOnCond || NULL != pJoin->pColOnCond || pNode->pChildren->length != 2 ) {
if ((JOIN_STYPE_NONE != pJoin->subType && JOIN_STYPE_OUTER != pJoin->subType) || JOIN_TYPE_FULL == pJoin->joinType || pNode->pChildren->length != 2 ) {
goto _return;
}
@ -4766,8 +4771,9 @@ static int32_t hashJoinOptSplitPrimFromLogicCond(SNode **pCondition, SNode **pPr
SNodeList *pPrimaryKeyConds = NULL;
SNode *pCond = NULL;
WHERE_EACH(pCond, pLogicCond->pParameterList) {
if (isCondColumnsFromMultiTable(pCond) || COND_TYPE_PRIMARY_KEY != classifyCondition(pCond)) {
if (filterIsMultiTableColsCond(pCond) || COND_TYPE_PRIMARY_KEY != filterClassifyCondition(pCond)) {
WHERE_NEXT;
continue;
}
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
@ -4809,11 +4815,11 @@ int32_t hashJoinOptSplitPrimCond(SNode **pCondition, SNode **pPrimaryKeyCond) {
}
bool needOutput = false;
if (isCondColumnsFromMultiTable(*pCondition)) {
if (filterIsMultiTableColsCond(*pCondition)) {
return TSDB_CODE_SUCCESS;
}
EConditionType type = classifyCondition(*pCondition);
EConditionType type = filterClassifyCondition(*pCondition);
if (COND_TYPE_PRIMARY_KEY == type) {
*pPrimaryKeyCond = *pCondition;
*pCondition = NULL;
@ -4830,6 +4836,7 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode,
pJoin->joinAlgo = JOIN_ALGO_HASH;
if (NULL != pJoin->pColOnCond) {
#if 0
EJoinType t = pJoin->joinType;
EJoinSubType s = pJoin->subType;
@ -4887,8 +4894,20 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode,
pJoin->timeRange.skey = TMAX(ltimeRange.skey, rtimeRange.skey);
pJoin->timeRange.ekey = TMIN(ltimeRange.ekey, rtimeRange.ekey);
}
#else
SNode* pPrimaryKeyCond = NULL;
hashJoinOptSplitPrimCond(&pJoin->pColOnCond, &pPrimaryKeyCond);
if (NULL != pPrimaryKeyCond) {
bool isStrict = false;
code = getTimeRangeFromNode(&pPrimaryKeyCond, &pJoin->timeRange, &isStrict);
nodesDestroyNode(pPrimaryKeyCond);
}
#endif
} else {
pJoin->timeRange = TSWINDOW_INITIALIZER;
}
#if 0
if (NULL != pJoin->pTagOnCond && !TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) {
EJoinType t = pJoin->joinType;
EJoinSubType s = pJoin->subType;
@ -4917,14 +4936,16 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode,
return code;
}
}
#endif
if (!TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_DESC_INITIALIZER)) {
FOREACH(pNode, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
SNode* pChild = NULL;
FOREACH(pChild, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
continue;
}
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
SScanLogicNode* pScan = (SScanLogicNode*)pChild;
if (TSWINDOW_IS_EQUAL(pScan->scanRange, TSWINDOW_INITIALIZER)) {
continue;
} else if (pJoin->timeRange.ekey < pScan->scanRange.skey || pJoin->timeRange.skey > pScan->scanRange.ekey) {
@ -4936,7 +4957,59 @@ static int32_t hashJoinOptRewriteJoin(SOptimizeContext* pCxt, SLogicNode* pNode,
}
}
}
pJoin->timeRangeTarget = 0;
if (!TSWINDOW_IS_EQUAL(pJoin->timeRange, TSWINDOW_INITIALIZER)) {
SNode* pChild = NULL;
int32_t timeRangeTarget = 1;
FOREACH(pChild, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
timeRangeTarget++;
continue;
}
SScanLogicNode* pScan = (SScanLogicNode*)pChild;
if (TSWINDOW_IS_EQUAL(pScan->scanRange, pJoin->timeRange)) {
timeRangeTarget++;
continue;
}
bool replaced = false;
switch (pJoin->joinType) {
case JOIN_TYPE_INNER:
pScan->scanRange.skey = pJoin->timeRange.skey;
pScan->scanRange.ekey = pJoin->timeRange.ekey;
replaced = true;
break;
case JOIN_TYPE_LEFT:
if (2 == timeRangeTarget) {
pScan->scanRange.skey = pJoin->timeRange.skey;
pScan->scanRange.ekey = pJoin->timeRange.ekey;
replaced = true;
}
break;
case JOIN_TYPE_RIGHT:
if (1 == timeRangeTarget) {
pScan->scanRange.skey = pJoin->timeRange.skey;
pScan->scanRange.ekey = pJoin->timeRange.ekey;
replaced = true;
}
break;
default:
break;
}
if (replaced) {
timeRangeTarget++;
continue;
}
pJoin->timeRangeTarget += timeRangeTarget;
timeRangeTarget++;
}
}
pCxt->optimized = true;
OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_STB_JOIN);

View File

@ -792,7 +792,7 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI
return TSDB_CODE_SUCCESS;
}
static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) {
static int32_t setMergeJoinPrimColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) {
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) {
@ -922,7 +922,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
code = setMergeJoinPrimColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
@ -937,7 +937,13 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
&pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
code = setMergeJoinPrimColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
}
nodesDestroyNode(pPrimKeyCond);
}
@ -1127,6 +1133,107 @@ static int32_t sortHashJoinTargets(int16_t lBlkId, int16_t rBlkId, SHashJoinPhys
return TSDB_CODE_SUCCESS;
}
static int32_t setHashJoinPrimColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, SHashJoinPhysiNode* pJoin) {
if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
SOperatorNode* pOp = (SOperatorNode*)pEqCond;
if (pOp->opType != OP_TYPE_EQUAL) {
planError("invalid primary cond opType, opType:%d", pOp->opType);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
switch (nodeType(pOp->pLeft)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
case QUERY_NODE_FUNCTION: {
SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pParam)) {
planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)pParam;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc);
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
default:
planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
switch (nodeType(pOp->pRight)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)pOp->pRight;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
case QUERY_NODE_FUNCTION: {
SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pParam)) {
planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)pParam;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->leftPrimExpr = nodesCloneNode((SNode*)pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode((SNode*)pFunc);
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
default:
planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
} else {
planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode,
SPhysiNode** pPhyNode) {
SHashJoinPhysiNode* pJoin =
@ -1144,10 +1251,23 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset);
pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit);
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
pJoin->timeRangeTarget = pJoinLogicNode->timeRangeTarget;
pJoin->timeRange.skey = pJoinLogicNode->timeRange.skey;
pJoin->timeRange.ekey = pJoinLogicNode->timeRange.ekey;
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond);
if (NULL != pJoinLogicNode->pPrimKeyEqCond) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setHashJoinPrimColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
}

View File

@ -15,6 +15,8 @@
#include "functionMgt.h"
#include "planInt.h"
#include "scalar.h"
#include "filter.h"
static char* getUsageErrFormat(int32_t errCode) {
switch (errCode) {

View File

@ -4802,14 +4802,8 @@ static EDealRes classifyConditionImpl(SNode *pNode, void *pContext) {
return DEAL_RES_CONTINUE;
}
typedef enum EConditionType {
COND_TYPE_PRIMARY_KEY = 1,
COND_TYPE_TAG_INDEX,
COND_TYPE_TAG,
COND_TYPE_NORMAL
} EConditionType;
static EConditionType classifyCondition(SNode *pNode) {
EConditionType filterClassifyCondition(SNode *pNode) {
SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false};
nodesWalkExpr(pNode, classifyConditionImpl, &cxt);
return cxt.hasOtherCol ? COND_TYPE_NORMAL
@ -4819,7 +4813,7 @@ static EConditionType classifyCondition(SNode *pNode) {
: (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG)));
}
static bool isCondColumnsFromMultiTable(SNode *pCond) {
bool filterIsMultiTableColsCond(SNode *pCond) {
SNodeList *pCondCols = nodesMakeList();
int32_t code = nodesCollectColumnsFromNode(pCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
if (code == TSDB_CODE_SUCCESS) {
@ -4851,12 +4845,12 @@ static int32_t partitionLogicCond(SNode **pCondition, SNode **pPrimaryKeyCond, S
SNodeList *pOtherConds = NULL;
SNode *pCond = NULL;
FOREACH(pCond, pLogicCond->pParameterList) {
if (isCondColumnsFromMultiTable(pCond)) {
if (filterIsMultiTableColsCond(pCond)) {
if (NULL != pOtherCond) {
code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond));
}
} else {
switch (classifyCondition(pCond)) {
switch (filterClassifyCondition(pCond)) {
case COND_TYPE_PRIMARY_KEY:
if (NULL != pPrimaryKeyCond) {
code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond));
@ -4942,13 +4936,13 @@ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode *
}
bool needOutput = false;
if (isCondColumnsFromMultiTable(*pCondition)) {
if (filterIsMultiTableColsCond(*pCondition)) {
if (NULL != pOtherCond) {
*pOtherCond = *pCondition;
needOutput = true;
}
} else {
switch (classifyCondition(*pCondition)) {
switch (filterClassifyCondition(*pCondition)) {
case COND_TYPE_PRIMARY_KEY:
if (NULL != pPrimaryKeyCond) {
*pPrimaryKeyCond = *pCondition;