From 7e555e437bd3d6c0e2edaa0d9d5972d563a4bfd6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 20 Jun 2023 19:34:16 +0800 Subject: [PATCH] enh: add hash join --- source/common/src/tdatablock.c | 68 ++++- source/libs/executor/inc/hashjoin.h | 49 ++-- source/libs/executor/src/hashjoinoperator.c | 269 +++++++++++++++----- 3 files changed, 286 insertions(+), 100 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 033fbb0ef1..b0085605b7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -143,15 +143,6 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData) { int32_t type = pColumnInfoData->info.type; if (IS_VAR_DATA_TYPE(type)) { - int32_t dataLen = 0; - if (type == TSDB_DATA_TYPE_JSON) { - dataLen = getJsonValueLen(pData); - } else { - dataLen = varDataTLen(pData); - } - - SVarColAttr* pAttr = &pColumnInfoData->varmeta; - pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx]; pColumnInfoData->reassigned = true; } else { @@ -244,6 +235,65 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue); } +void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) { + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows); + } else { + if (numOfRows < sizeof(char) * 2) { + for (int32_t i = 0; i < numOfRows; ++i) { + colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i); + } + } else { + int32_t i = 0; + for (; i < numOfRows; ++i) { + if (BitPos(currentRow + i)) { + colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i); + } else { + break; + } + } + + memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, (numOfRows - i) / sizeof(char)); + i += (numOfRows - i) / sizeof(char) * sizeof(char); + + for (; i < numOfRows; ++i) { + colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i); + } + } + } +} + +int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) { + int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false); + if (code) { + return code; + } + + if (numOfRows > 1) { + int32_t* pOffset = pColumnInfoData->varmeta.offset; + memset(&pOffset[currentRow + 1], &pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1)); + pColumnInfoData->reassigned = true; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, + uint32_t numOfRows, bool isNull) { + int32_t len = pColumnInfoData->info.bytes; + if (isNull) { + colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows); + pColumnInfoData->hasNull = true; + return 0; + } + + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows); + } else { + return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, false); + } +} + static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) { if (numOfRow2 <= 0) return; diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 316264d153..94eaeeac28 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -19,18 +19,12 @@ extern "C" { #endif -typedef struct SHJoinRowCtx { - bool rowRemains; - int64_t ts; - SArray* leftRowLocations; - SArray* leftCreatedBlocks; - SArray* rightCreatedBlocks; - int32_t leftRowIdx; - int32_t rightRowIdx; - - bool rightUseBuildTable; - SArray* rightRowLocations; -} SHJoinRowCtx; +typedef struct SHJoinCtx { + bool rowRemains; + SBufRowInfo* pBuildRow; + SSDataBlock* pProbeData; + int32_t probeIdx; +} SHJoinCtx; typedef struct SRowLocation { SSDataBlock* pDataBlock; @@ -38,7 +32,8 @@ typedef struct SRowLocation { } SRowLocation; typedef struct SColBufInfo { - int32_t slotId; + int32_t srcSlot; + int32_t dstSlot; bool vardata; int32_t* offset; int32_t bytes; @@ -55,13 +50,14 @@ typedef struct SBufPageInfo { typedef struct SBufRowInfo { void* next; uint16_t pageId; - int32_t offset; + int32_t offset:31; + int32_t isNull:1; } SBufRowInfo; #pragma pack(pop) -typedef struct SResRowData { +typedef struct SGroupData { SBufRowInfo* rows; -} SResRowData; +} SGroupData; typedef struct SJoinTableInfo { SOperatorInfo* downStream; @@ -79,22 +75,19 @@ typedef struct SJoinTableInfo { } SJoinTableInfo; typedef struct SHJoinOperatorInfo { - SSDataBlock* pRes; - int32_t joinType; - - SJoinTableInfo tbs[2]; - + SSDataBlock* pRes; + int32_t joinType; + SJoinTableInfo tbs[2]; SJoinTableInfo* pBuild; SJoinTableInfo* pProbe; + int32_t pResColNum; + int8_t* pResColMap; SArray* pRowBufs; - - SNode* pCondAfterJoin; - - SSHashObj* pKeyHash; - - - SHJoinRowCtx rowCtx; + SNode* pCondAfterJoin; + SSHashObj* pKeyHash; + SHJoinCtx ctx; } SHJoinOperatorInfo; + static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator); static void destroyHashJoinOperator(void* param); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index eb7e064afe..a79b25ad40 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -39,7 +39,7 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi SNode* pNode = NULL; FOREACH(pNode, pList) { SColumnNode* pColNode = (SColumnNode*)pNode; - (*ppInfo)->slotId = pColNode->slotId; + (*ppInfo)->srcSlot = pColNode->slotId; (*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); (*ppInfo)->bytes = pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes; @@ -81,7 +81,7 @@ int32_t initJoinValBufInfo(SJoinTableInfo* pTable, SNodeList* pList) { FOREACH(pNode, pList) { SColumnNode* pColNode = (SColumnNode*)pNode; if (pColNode->dataBlockId == pTable->blkId) { - pTable->valCols->slotId = pColNode->slotId; + pTable->valCols->srcSlot = pColNode->slotId; pTable->valCols->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); if (pTable->valCols->vardata) { pTable->valVarData = true; @@ -121,8 +121,27 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo } void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { + pInfo->pResColNum = pJoinNode->pTargets->length; + pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); + if (NULL == pInfo->pResColMap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pInfo->pBuild = &pInfo->tbs[1]; pInfo->pProbe = &pInfo->tbs[0]; + + SNode* pNode = NULL; + int32_t i = 0; + FOREACH(pNode, pJoinNode->pTargets) { + SColumnNode* pColNode = (SColumnNode*)pNode; + if (pColNode->dataBlockId == pInfo->pBuild->blkId) { + pInfo->pResColMap[i] = 1; + } + + i++; + } + + return TSDB_CODE_SUCCESS; } FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { @@ -173,7 +192,10 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - setJoinBuildAndProbeTable(pInfo, pJoinNode); + code = setJoinBuildAndProbeTable(pInfo, pJoinNode); + if (code) { + goto _error; + } code = initJoinBufPages(pInfo); if (code) { @@ -256,59 +278,103 @@ void destroHashJoinOperator(void* param) { taosMemoryFreeClear(param); } -static void doHashJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { - SHJoinOperatorInfo* pJoinInfo = pOperator->info; +FORCE_INLINE char* getColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { + SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); + return pPage->data + pRow->offset; +} - int32_t nrows = pRes->info.rows; - - bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; - - while (1) { - int64_t leftTs = 0; - int64_t rightTs = 0; - if (pJoinInfo->rowCtx.rowRemains) { - leftTs = pJoinInfo->rowCtx.ts; - rightTs = pJoinInfo->rowCtx.ts; +FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { + SJoinTableInfo* pBuild = pJoin->pBuild; + SJoinTableInfo* pProbe = pJoin->pProbe; + int32_t buildIdx = 0; + int32_t probeIdx = 0; + SBufRowInfo* pRow = pStart; + int32_t code = 0; + + for (int32_t i = 0; i < pJoin->pResColNum; ++i) { + if (pJoin->pResColMap[i]) { + SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); + for (int32_t r = 0; r < rowNum; ++r) { + code = colDataSetVal(pCol, pRes->info.rows + r, pRow->isNull ? NULL : getColDataFromRowBufs(pJoin->pRowBufs, pRow), pRow->isNull); + if (code) { + return code; + } + pRow = pRow->next; + } + buildIdx++; } else { - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); - if (!hasNextTs) { + SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); + SColumnInfoData* pDst = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].dstSlot); + + code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); + if (code) { + return code; + } + probeIdx++; + } + } + + return TSDB_CODE_SUCCESS; +} + + +void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinCtx* pCtx = &pJoin->ctx; + SBufRowInfo* pStart = pCtx->pBuildRow; + int32_t rowNum = 0; + int32_t resNum = pRes.info.rows; + + while (pCtx->pBuildRow && resNum < pRes.info.capacity) { + rowNum++; + resNum++; + pCtx->pBuildRow = pCtx->pBuildRow->next; + } + + int32_t code = copyJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); + if (code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + pRes->info.rows = resNum; + pCtx->rowRemains = pCtx->pBuildRow ? true : false; +} + +void doHashJoinImpl(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SJoinTableInfo* pProbe = pJoin->pProbe; + SHJoinCtx* pCtx = &pJoin->ctx; + SSDataBlock* pRes = pJoin->pRes; + size_t bufLen = 0; + + if (pJoin->ctx.pBuildRow) { + appendJoinResToBlock(pOperator, pRes); + return; + } + + for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) { + copyColDataToBuf(pProbe->keyNum, i, pProbe->keyCols, pProbe->keyBuf, &bufLen); + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyBuf, bufLen); + if (pGroup) { + pCtx->pBuildRow = pGroup->rows; + appendJoinResToBlock(pOperator, pRes); + if (pRes->info.rows >= pRes.info.capacity) { break; } } - - if (leftTs == rightTs) { - mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows); - } else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) { - pJoinInfo->leftPos += 1; - - if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - } else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) { - pJoinInfo->rightPos += 1; - if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - } - - // the pDataBlock are always the same one, no need to call this again - pRes->info.rows = nrows; - pRes->info.dataLoad = 1; - if (pRes->info.rows >= pOperator->resultInfo.threshold) { - break; - } } } int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList) { for (int32_t i = 0; i < colNum; ++i) { - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].slotId); + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].srcSlot); if (pColList[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { - qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].slotId, pCol->info.type, pColList[i].vardata); + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].srcSlot, pCol->info.type, pColList[i].vardata); return TSDB_CODE_INVALID_PARA; } if (pColList[i].bytes != IS_VAR_DATA_TYPE(pCol->info.bytes)) { - qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].slotId, pCol->info.bytes, pColList[i].bytes); + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].srcSlot, pCol->info.bytes, pColList[i].bytes); return TSDB_CODE_INVALID_PARA; } pColList[i].data = pCol->pData; @@ -341,11 +407,13 @@ FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* } } -FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf) { +FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { do { SBufPageInfo* page = taosArrayGetLast(pPages); if ((page->pageSize - page->offset) >= bufSize) { *pBuf = page->data + page->offset; + pRow->pageId = taosArrayGetSize(pPages) - 1; + pRow->offset = page->offset; page->offset += bufSize; return TSDB_CODE_SUCCESS; } @@ -357,25 +425,58 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** } while (true); } -int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SResRowData* pRes, SJoinTableInfo* pTable, char** pBuf, size_t keyLen) { - SResRowData res = {0}; +FORCE_INLINE int32_t getJoinValBufSize(SJoinTableInfo* pTable, int32_t rowIdx) { + if (!pTable->valVarData) { + return pTable->valBufSize; + } - if (NULL == pRes) { - res.rows = taosMemoryMalloc(sizeof(SBufRowInfo)); - if (NULL == res.rows) { + int32_t bufLen = 0; + for (int32_t i = 0; i < pTable->valNum; ++i) { + if (pTable->valCols[i].vardata) { + char* pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; + bufLen += varDataTLen(pData); + } else { + bufLen += pTable->valCols[i].bytes; + } + } + + return bufLen; +} + + +int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* pGroup, SJoinTableInfo* pTable, char** pBuf, size_t keyLen, int32_t rowIdx) { + SGroupData group = {0}; + SBufRowInfo* pRow = NULL; + + if (NULL == pGroup) { + group.rows = taosMemoryMalloc(sizeof(SBufRowInfo)); + if (NULL == group.rows) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pRow = group.rows; + } else { + pRow = taosMemoryMalloc(sizeof(SBufRowInfo)) + if (NULL == pRow) { return TSDB_CODE_OUT_OF_MEMORY; } } - int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(), pBuf); + int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), pBuf, pRow); if (code) { return code; } - if (NULL == pRes && tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &res, sizeof(res))) { - return TSDB_CODE_OUT_OF_MEMORY; + if (NULL == pGroup) { + pRow->next = NULL; + if (tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &group, sizeof(group))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + pRow->next = pGroup->rows; + pGroup->rows = pRow; } + return TSDB_CODE_SUCCESS; } int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, size_t keyLen, int32_t rowIdx) { @@ -386,8 +487,8 @@ int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, } char *valBuf = NULL; - SResRowData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen); - code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen); + SGroupData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen); + code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen, rowIdx); if (code) { return code; } @@ -436,39 +537,81 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } +void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SJoinTableInfo* pProbe = pJoin->pProbe; + int32_t code = setColBufInfo(pBlock, pProbe->keyNum, pProbe->keyCols); + if (code) { + return code; + } + code = setColBufInfo(pBlock, pProbe->valNum, pProbe->valCols); + if (code) { + return code; + } + + + pJoin->ctx.probeIdx = 0; + pJoin->ctx.pBuildRow = NULL; + pJoin->ctx.pProbeData = pBlock; + + doHashJoinImpl(pOperator); +} + SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoinInfo = pOperator->info; + SHJoinOperatorInfo* pJoin = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pRes = pJoinInfo->pRes; + SSDataBlock* pRes = pJoin->pRes; blockDataCleanup(pRes); - if (NULL == pJoinInfo->pKeyHash) { - code = buildJoinKeyHash(pJoinInfo); + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (NULL == pJoin->pKeyHash) { + code = buildJoinKeyHash(pJoin); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } - if (tSimpleHashGetSize(pJoinInfo->pKeyHash) <= 0) { + if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + pOperator->status = OP_EXEC_DONE; return NULL; } } + if (pJoin->ctx.rowRemains) { + doHashJoinImpl(pOperator); + + if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows > 0) { + return pRes; + } + } + while (true) { - int32_t numOfRowsBefore = pRes->info.rows; - doHashJoinImpl(pOperator, pRes); - int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore; - if (numOfNewRows == 0) { + SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); + if (NULL == pBlock) { break; } + + launchBlockHashJoin(pOperator, pBlock); + + if (pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + if (pOperator->exprSupp.pFilterInfo != NULL) { doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); } - if (pRes->info.rows >= pOperator->resultInfo.threshold) { + if (pRes->info.rows > 0) { break; } } + return (pRes->info.rows > 0) ? pRes : NULL; }