From 1a53a9084e81997269b0c6c3dac111eb9455b1da Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 19 Jun 2023 19:40:15 +0800 Subject: [PATCH] feat: add buffer page --- source/libs/executor/inc/hashjoin.h | 52 +++--- source/libs/executor/src/hashjoinoperator.c | 196 +++++++++++++++++--- 2 files changed, 206 insertions(+), 42 deletions(-) diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index b1c1052c88..316264d153 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -45,13 +45,37 @@ typedef struct SColBufInfo { char* data; } SColBufInfo; +typedef struct SBufPageInfo { + int32_t pageSize; + int32_t offset; + char* data; +} SBufPageInfo; + +#pragma pack(push, 1) +typedef struct SBufRowInfo { + void* next; + uint16_t pageId; + int32_t offset; +} SBufRowInfo; +#pragma pack(pop) + +typedef struct SResRowData { + SBufRowInfo* rows; +} SResRowData; + typedef struct SJoinTableInfo { - int32_t keyNum; - SColBufInfo* keyCols; - char* keyBuf; - int32_t valNum; - SColBufInfo* valCols; - char* valBuf; + SOperatorInfo* downStream; + int32_t blkId; + SQueryStat inputStat; + + int32_t keyNum; + SColBufInfo* keyCols; + char* keyBuf; + + int32_t valNum; + SColBufInfo* valCols; + int32_t valBufSize; + bool valVarData; } SJoinTableInfo; typedef struct SHJoinOperatorInfo { @@ -62,26 +86,12 @@ typedef struct SHJoinOperatorInfo { SJoinTableInfo* pBuild; SJoinTableInfo* pProbe; - - int32_t pLeftKeyNum; - SColBufInfo* pLeftKeyInfo; - char* pLeftKeyBuf; - int32_t pLeftValNum; - SColBufInfo* pLeftValInfo; - char* pLeftValBuf; - - int32_t pRightKeyNum; - SColBufInfo* pRightKeyInfo; - char* pRightKeyBuf; - int32_t pRightValNum; - SColBufInfo* pRightValInfo; - char* pRightValBuf; + SArray* pRowBufs; SNode* pCondAfterJoin; SSHashObj* pKeyHash; - SQueryStat inputStat[2]; SHJoinRowCtx rowCtx; } SHJoinOperatorInfo; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 6886de3bd5..eb7e064afe 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -36,14 +36,14 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi } int64_t bufSize = 0; - for (int32_t i = 0; i < *colNum; ++i) { - SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pList, i); - + SNode* pNode = NULL; + FOREACH(pNode, pList) { + SColumnNode* pColNode = (SColumnNode*)pNode; (*ppInfo)->slotId = pColNode->slotId; (*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); (*ppInfo)->bytes = pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes; - } + } *ppBuf = taosMemoryMalloc(bufSize); if (NULL == *ppBuf) { @@ -53,15 +53,101 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi return TSDB_CODE_SUCCESS; } -int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SNodeList* pKeyList, int32_t idx) { +void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { + *colNum = 0; + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (pCol->dataBlockId == blkId) { + (*colNum)++; + } + } +} + +int32_t initJoinValBufInfo(SJoinTableInfo* pTable, SNodeList* pList) { + getJoinValColNum(pList, pTable->blkId, &pTable->valNum); + if (pTable->valNum <= 0) { + qError("fail to get join value column, num:%d", pTable->valNum); + return TSDB_CODE_INVALID_MSG; + } + + pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SColBufInfo)); + if (NULL == pTable->valCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNode* pNode = NULL; + FOREACH(pNode, pList) { + SColumnNode* pColNode = (SColumnNode*)pNode; + if (pColNode->dataBlockId == pTable->blkId) { + pTable->valCols->slotId = pColNode->slotId; + pTable->valCols->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + if (pTable->valCols->vardata) { + pTable->valVarData = true; + } + pTable->valCols->bytes = pColNode->node.resType.bytes; + pTable->valBufSize += pColNode->node.resType.bytes; + } + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { + SNodeList* pKeyList = NULL; SJoinTableInfo* pTable = &pJoin->tbs[idx]; + pTable->downStream = pDownstream[idx]; + pTable->blkId = pDownstream[idx]->resultDataBlockId; + if (0 == idx) { + pKeyList = pJoinNode->pOnLeft; + } else { + pKeyList = pJoinNode->pOnRight; + } int32_t code = initJoinKeyBufInfo(&pTable->keyCols, &pTable->keyNum, pKeyList, &pTable->keyBuf); if (code) { return code; } + int32_t code = initJoinValBufInfo(&pTable->keyCols, &pTable->keyNum, pJoinNode->pTargets, &pTable->keyBuf, pTable->blkId, pTable); + if (code) { + return code; + } + + memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); + + return TSDB_CODE_SUCCESS; } +void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { + pInfo->pBuild = &pInfo->tbs[1]; + pInfo->pProbe = &pInfo->tbs[0]; +} + +FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { + SBufPageInfo page; + page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; + page.offset = 0; + page.data = taosMemoryMalloc(page.pageSize); + if (NULL == page.data) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pRowBufs, &page); + return TSDB_CODE_SUCCESS; +} + +int32_t initJoinBufPages(SHJoinOperatorInfo* pInfo) { + pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); + if (NULL == pInfo->pRowBufs) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + return addPageToJoinBuf(pInfo->pRowBufs); +} + + SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo)); @@ -84,17 +170,17 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; - initJoinTableInfo(pJoinNode, pJoinNode->pOnLeft, 0); - initJoinTableInfo(pJoinNode, pJoinNode->pOnRight, 1); + initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); + initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - code = initJoinColBufInfo(&pInfo->pRightKeyInfo, &pInfo->pRightKeyNum, pJoinNode->pOnRight, &pInfo->pRightKeyBuf); + setJoinBuildAndProbeTable(pInfo, pJoinNode); + + code = initJoinBufPages(pInfo); if (code) { goto _error; } - memcpy(pInfo->inputStat, pJoinNode->inputStat, sizeof(pJoinNode->inputStat)); - - size_t hashCap = pInfo->inputStat[1].inputRowNum > 0 ? (pInfo->inputStat[1].inputRowNum * 1.5) : 1024; + size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024; pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); if (pInfo->pKeyHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -234,32 +320,100 @@ int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList return TSDB_CODE_SUCCESS; } -FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* pColList, char* pBuf, size_t *bufLen) { +FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* pColList, char* pBuf, size_t *pBufLen) { char *pData = NULL; - *bufLen = 0; + size_t bufLen = 0; for (int32_t i = 0; i < colNum; ++i) { if (pColList[i].vardata) { pData = pColList[i].data + pColList[i].offset[rowIdx]; - memcpy(pBuf + *bufLen, pData, varDataTLen(pData)); - *bufLen += varDataTLen(pData); + memcpy(pBuf + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } else { + pData = pColList[i].data + pColList[i].bytes * rowIdx; + memcpy(pBuf + bufLen, pColList[i].data, pColList[i].bytes); + bufLen += pColList[i].bytes; } } + + if (pBufLen) { + *pBufLen = bufLen; + } } +FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf) { + do { + SBufPageInfo* page = taosArrayGetLast(pPages); + if ((page->pageSize - page->offset) >= bufSize) { + *pBuf = page->data + page->offset; + page->offset += bufSize; + return TSDB_CODE_SUCCESS; + } -int32_t addBlockRowsKeyToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { - int32_t code = setColBufInfo(pBlock, pJoin->pRightKeyNum, pJoin->pRightKeyInfo); + int32_t code = addPageToJoinBuf(pPages); + if (code) { + return code; + } + } while (true); +} + +int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SResRowData* pRes, SJoinTableInfo* pTable, char** pBuf, size_t keyLen) { + SResRowData res = {0}; + + if (NULL == pRes) { + res.rows = taosMemoryMalloc(sizeof(SBufRowInfo)); + if (NULL == res.rows) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(), pBuf); + if (code) { + return code; + } + + if (NULL == pRes && tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &res, sizeof(res))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + +} + +int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, size_t keyLen, int32_t rowIdx) { + SJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setColBufInfo(pBlock, pBuild->valNum, pBuild->valCols); + if (code) { + return code; + } + + char *valBuf = NULL; + SResRowData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen); + code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen); + if (code) { + return code; + } + + copyColDataToBuf(pBuild->valNum, rowIdx, pBuild->valCols, valBuf, NULL); + + return TSDB_CODE_SUCCESS; +} + +int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { + SJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setColBufInfo(pBlock, pBuild->keyNum, pBuild->keyCols); if (code) { return code; } size_t bufLen = 0; for (int32_t i = 0; i < pBlock->info.rows; ++i) { - copyColDataToBuf(pJoin->pRightKeyNum, i, pJoin->pRightKeyInfo, pJoin->pRightKeyBuf, &bufLen); + copyColDataToBuf(pBuild->keyNum, i, pBuild->keyCols, pBuild->keyBuf, &bufLen); + code = addRowToHash(pJoin, pBlock, pBuild->keyBuf, bufLen, i); + if (code) { + return code; + } } - tSimpleHashPut(pJoin->pKeyHash, pJoin->pRightKeyBuf, bufLen, NULL, 0); + return code; } int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { @@ -268,12 +422,12 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; while (true) { - pBlock = pOperator->pDownstream[1]->fpSet.getNextFn(pOperator->pDownstream[1]); + pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream); if (NULL == pBlock) { break; } - code = addBlockRowsKeyToHash(pBlock, pJoin); + code = addBlockRowsToHash(pBlock, pJoin->pKeyHash, pJoin->pBuild); if (code) { return code; }