feat: support hash join

This commit is contained in:
dapan1121 2023-12-13 18:54:16 +08:00
parent 13ab89bd1b
commit b2fb30f6c0
4 changed files with 484 additions and 311 deletions

View File

@ -207,6 +207,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize); int32_t pageSize);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
@ -231,6 +232,7 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
void blockDataReset(SSDataBlock* pDataBlock);
void blockDataEmpty(SSDataBlock* pDataBlock); void blockDataEmpty(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize);

View File

@ -649,6 +649,24 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
/*
 * Append `numOfRows` rows of pSrc, starting at row `srcIdx`, to the tail of pDest.
 *
 * Columns are paired by position: column i of pSrc is copied into column i of pDest.
 * NOTE(review): assumes pSrc has at least as many columns as pDest and that the
 * paired columns have compatible types -- confirm against callers.
 *
 * Returns TSDB_CODE_FAILED, leaving pDest untouched, when pDest does not have
 * capacity for `numOfRows` more rows; TSDB_CODE_SUCCESS otherwise.
 */
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
  if (pDest->info.rows + numOfRows > pDest->info.capacity) {
    return TSDB_CODE_FAILED;
  }

  size_t numOfCols = taosArrayGetSize(pDest->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
    SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);

    // Rows land right after the rows pDest already holds.
    colDataAssignNRows(pCol2, pDest->info.rows, pCol1, srcIdx, numOfRows);
  }

  // Only `numOfRows` rows were copied, so advance by that amount. Using
  // pSrc->info.rows here would over-count whenever a sub-range of pSrc is merged.
  pDest->info.rows += numOfRows;

  return TSDB_CODE_SUCCESS;
}
size_t blockDataGetSize(const SSDataBlock* pBlock) { size_t blockDataGetSize(const SSDataBlock* pBlock) {
size_t total = 0; size_t total = 0;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -751,6 +769,8 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);
colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount);
/*
for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = false; bool isNull = false;
if (pBlock->pBlockAgg == NULL) { if (pBlock->pBlockAgg == NULL) {
@ -766,6 +786,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
colDataSetVal(pDstCol, j - startIndex, p, false); colDataSetVal(pDstCol, j - startIndex, p, false);
} }
} }
*/
} }
pDst->info.rows = rowCount; pDst->info.rows = rowCount;
@ -1282,6 +1303,31 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
pInfo->window.skey = 0; pInfo->window.skey = 0;
} }
/*
 * Logically empty a data block so it can be refilled.
 *
 * For every column the null/reassigned flags are cleared and, for variable-length
 * column types, the recorded var-data length is set to 0. The block-level row count,
 * dataLoad flag, time window and id (uid / groupId) are then zeroed.
 * The column data buffers themselves are not touched by this function.
 *
 * A block whose capacity is 0 is left completely unchanged.
 */
void blockDataReset(SSDataBlock* pDataBlock) {
  if (0 == pDataBlock->info.capacity) {
    return;
  }

  const size_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
  for (int32_t colIdx = 0; colIdx < colNum; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, colIdx);

    if (IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->varmeta.length = 0;
    }
    pCol->reassigned = false;
    pCol->hasNull = false;
  }

  SDataBlockInfo* pBlkInfo = &pDataBlock->info;

  pBlkInfo->id.groupId = 0;
  pBlkInfo->id.uid = 0;

  pBlkInfo->window.skey = 0;
  pBlkInfo->window.ekey = 0;

  pBlkInfo->dataLoad = 0;
  pBlkInfo->rows = 0;
}
/* /*
* NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
* the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to

View File

@ -87,8 +87,7 @@ typedef struct SMJoinTableInfo {
SSDataBlock* blk; SSDataBlock* blk;
int32_t blkRowIdx; int32_t blkRowIdx;
int64_t grpRowsNum; int64_t grpTotalRows;
int64_t grpRemainRows;
int32_t grpIdx; int32_t grpIdx;
SArray* eqGrps; SArray* eqGrps;
SArray* createdBlks; SArray* createdBlks;
@ -111,19 +110,18 @@ typedef struct SMJoinGrpRows {
} SMJoinGrpRows; } SMJoinGrpRows;
typedef struct SMJoinMergeCtx { typedef struct SMJoinMergeCtx {
bool hashCan; bool hashCan;
bool keepOrder; bool keepOrder;
bool grpRemains; bool grpRemains;
bool midRemains; bool midRemains;
bool eqCart; bool lastEqGrp;
bool noColCond; int32_t blkThreshold;
int32_t blksCapacity; SSDataBlock* midBlk;
SSDataBlock* midBlk; SSDataBlock* finBlk;
SSDataBlock* finBlk; int64_t lastEqTs;
SSDataBlock* resBlk; SMJoinGrpRows probeNEqGrp;
int64_t lastEqTs; bool hashJoin;
SMJoinGrpRows probeNEqGrp; SMJoinOperatorInfo* pJoin;
bool hashJoin;
} SMJoinMergeCtx; } SMJoinMergeCtx;
typedef struct SMJoinWinCtx { typedef struct SMJoinWinCtx {
@ -161,17 +159,17 @@ typedef struct SMJoinOperatorInfo {
SOperatorInfo* pOperator; SOperatorInfo* pOperator;
int32_t joinType; int32_t joinType;
int32_t subType; int32_t subType;
int32_t inputTsOrder; int32_t inputTsOrder;
int32_t errCode;
SMJoinTableInfo tbs[2]; SMJoinTableInfo tbs[2];
SMJoinTableInfo* build; SMJoinTableInfo* build;
SMJoinTableInfo* probe; SMJoinTableInfo* probe;
SSDataBlock* pResBlk;
int32_t pResColNum; int32_t pResColNum;
int8_t* pResColMap; int8_t* pResColMap;
SFilterInfo* pFPreFilter; SFilterInfo* pFPreFilter;
SFilterInfo* pPreFilter; SFilterInfo* pPreFilter;
SFilterInfo* pFinFilter; SFilterInfo* pFinFilter;
SMJoinFuncs* joinFps; // SMJoinFuncs* joinFps;
SMJoinCtx ctx; SMJoinCtx ctx;
SMJoinExecInfo execInfo; SMJoinExecInfo execInfo;
} SMJoinOperatorInfo; } SMJoinOperatorInfo;
@ -180,7 +178,7 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone)) #define MJOIN_DS_NEED_INIT(_pOp, _tbctx) (MJOIN_DS_REQ_INIT(_pOp) && (!(_tbctx)->dsInitDone))
#define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned)) #define MJOIN_TB_LOW_BLK(_tbctx) ((_tbctx)->blkNum <= 0 || ((_tbctx)->blkNum == 1 && (_tbctx)->pHeadBlk->cloned))
#define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpRowsNum * (_bld)->grpRowsNum > MJOIN_HJOIN_CART_THRESHOLD) #define REACH_HJOIN_THRESHOLD(_prb, _bld) ((_prb)->grpTotalRows * (_bld)->grpTotalRows >= MJOIN_HJOIN_CART_THRESHOLD)
#define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair)) #define SET_SAME_TS_GRP_HJOIN(_pair, _octx) ((_pair)->hashJoin = (_octx)->hashCan && REACH_HJOIN_THRESHOLD(_pair))
@ -190,15 +188,42 @@ typedef struct SMJoinOperatorInfo {
#define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1) #define GRP_REMAIN_ROWS(_grp) ((_grp)->endIdx - (_grp)->readIdx + 1)
#define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx) #define GRP_DONE(_grp) ((_grp)->readIdx > (_grp)->endIdx)
// True when the table's row cursor (blkRowIdx) has reached or passed the number of rows in its current block (blk).
#define MJOIN_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)
#define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity)
#define SET_TABLE_CUR_TS(_col, _ts, _tb) \ #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \
do { \ do { \
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
} while (0) } while (0)
// Load into (_ts) the int64_t stored at row (_tb)->blkRowIdx of column (_col).
// The column is NOT looked up here: the caller must pass an already-resolved (_col).
#define MJOIN_GET_TB_CUR_TS(_col, _ts, _tb) \
do { \
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
} while (0)
// Evaluate (c) exactly once; if the result is not TSDB_CODE_SUCCESS, store it in
// terrno and return it from the enclosing function. Falls through on success.
#define MJ_ERR_RET(c) \
do { \
int32_t _code = (c); \
if (_code != TSDB_CODE_SUCCESS) { \
terrno = _code; \
return _code; \
} \
} while (0)
// Evaluate (c) and store the result in `code`; if it is not TSDB_CODE_SUCCESS, also
// store it in terrno and jump to the `_return` label.
// Requires the enclosing function to declare a variable named `code` and a label
// named `_return`.
#define MJ_ERR_JRET(c) \
do { \
code = (c); \
if (code != TSDB_CODE_SUCCESS) { \
terrno = code; \
goto _return; \
} \
} while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

File diff suppressed because it is too large Load Diff