From 24a5a3f41bacef2181e0fd84d32dbfce185002d2 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sun, 25 Jun 2023 17:31:04 +0800 Subject: [PATCH] fix: compile issues --- include/common/tdatablock.h | 2 + source/common/src/tdatablock.c | 2 +- source/libs/executor/inc/hashjoin.h | 19 +- source/libs/executor/inc/operator.h | 4 +- source/libs/executor/src/hashjoinoperator.c | 1005 ++++++++++--------- 5 files changed, 541 insertions(+), 491 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6cb7d88523..a4b39645a2 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -180,6 +180,8 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull); int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData); int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue); +int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, + uint32_t numOfRows, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, int32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b0085605b7..03edb1be73 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -271,7 +271,7 @@ int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t curren if (numOfRows > 1) { int32_t* pOffset = pColumnInfoData->varmeta.offset; - memset(&pOffset[currentRow + 1], &pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1)); + memset(&pOffset[currentRow + 1], pOffset[currentRow], sizeof(pOffset[0]) * (numOfRows - 1)); pColumnInfoData->reassigned = true; } diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 136444ed39..a4a180d542 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -21,6 +21,14 @@ extern "C" { #define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 +#pragma pack(push, 1) +typedef struct SBufRowInfo { + void* next; + uint16_t pageId; + int32_t offset; +} SBufRowInfo; +#pragma pack(pop) + typedef struct SHJoinCtx { bool rowRemains; SBufRowInfo* pBuildRow; @@ -42,7 +50,6 @@ typedef struct SHJoinColInfo { int32_t bytes; char* data; char* bitMap; - char* dataInBuf; } SHJoinColInfo; typedef struct SBufPageInfo { @@ -51,13 +58,6 @@ typedef struct SBufPageInfo { char* data; } SBufPageInfo; -#pragma pack(push, 1) -typedef struct SBufRowInfo { - void* next; - uint16_t pageId; - int32_t offset; -} SBufRowInfo; -#pragma pack(pop) typedef struct SGroupData { SBufRowInfo* rows; @@ -96,9 +96,6 @@ typedef struct SHJoinOperatorInfo { SHJoinCtx ctx; } SHJoinOperatorInfo; -static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator); -static void destroyHashJoinOperator(void* param); - #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 1d2685b8c6..c18edd870f 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -126,6 +126,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); @@ -163,4 +165,4 @@ int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SA } #endif -#endif // TDENGINE_OPERATOR_H \ No newline at end of file +#endif // TDENGINE_OPERATOR_H diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 4e1b2d9e5e..fca4a86f8d 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -27,7 +27,7 @@ #include "ttypes.h" #include "hashjoin.h" -int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { +static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); @@ -40,9 +40,9 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { SNode* pNode = NULL; FOREACH(pNode, pList) { SColumnNode* pColNode = (SColumnNode*)pNode; - pTable->keyCols[i]->srcSlot = pColNode->slotId; - pTable->keyCols[i]->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - pTable->keyCols[i]->bytes = pColNode->node.resType.bytes; + pTable->keyCols[i].srcSlot = pColNode->slotId; + pTable->keyCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + pTable->keyCols[i].bytes = pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes; ++i; } @@ -57,7 +57,7 @@ int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } -void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { +static void getHJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { *colNum = 0; SNode* pNode = NULL; @@ -70,7 +70,7 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { } } -bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { +static bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { for (int32_t i = 0; i < keyNum; ++i) { if (pKeys[i].srcSlot == slotId) { *pKeyIdx = i; @@ -81,8 +81,8 @@ bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32 return false; } -int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { - getJoinValColNum(pList, pTable->blkId, &pTable->valNum); +static int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { + getHJoinValColNum(pList, pTable->blkId, &pTable->valNum); if (pTable->valNum == 0) { return TSDB_CODE_SUCCESS; } @@ -133,7 +133,7 @@ int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { } -int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { +static int32_t initHJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SNodeList* pKeyList = NULL; SHJoinTableInfo* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; @@ -144,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo pKeyList = pJoinNode->pOnRight; } - int32_t code = initJoinKeyColsInfo(pTable, pKeyList); + int32_t code = initHJoinKeyColsInfo(pTable, pKeyList); if (code) { return code; } - int32_t code = initJoinValColsInfo(pTable, pJoinNode->pTargets); + code = initJoinValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } @@ -158,7 +158,7 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo return TSDB_CODE_SUCCESS; } -void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { int32_t buildIdx = 0; int32_t probeIdx = 1; @@ -190,7 +190,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ pInfo->pProbe = &pInfo->tbs[probeIdx]; } -void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { +static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { pInfo->pResColNum = pJoinNode->pTargets->length; pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); if (NULL == pInfo->pResColMap) { @@ -213,7 +213,7 @@ void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode } -FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { +static FORCE_INLINE int32_t addPageToHJoinBuf(SArray* pRowBufs) { SBufPageInfo page; page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; page.offset = 0; @@ -226,15 +226,520 @@ FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { return TSDB_CODE_SUCCESS; } -int32_t initJoinBufPages(SHJoinOperatorInfo* pInfo) { +static int32_t initHJoinBufPages(SHJoinOperatorInfo* pInfo) { pInfo->pRowBufs = taosArrayInit(32, sizeof(SBufPageInfo)); if (NULL == pInfo->pRowBufs) { return TSDB_CODE_OUT_OF_MEMORY; } - return addPageToJoinBuf(pInfo->pRowBufs); + return addPageToHJoinBuf(pInfo->pRowBufs); } +static void freeHJoinTableInfo(SHJoinTableInfo* pTable) { + taosMemoryFreeClear(pTable->keyCols); + taosMemoryFreeClear(pTable->keyBuf); + taosMemoryFreeClear(pTable->valCols); + taosArrayDestroy(pTable->valVarCols); +} + +static void freeHJoinBufPage(void* param) { + SBufPageInfo* pInfo = (SBufPageInfo*)param; + taosMemoryFree(pInfo->data); +} + +static void destroyHJoinKeyHash(SSHashObj** ppHash) { + if (NULL == ppHash || NULL == (*ppHash)) { + return; + } + + void* pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(*ppHash, pIte, &iter)) != NULL) { + SGroupData* pGroup = pIte; + SBufRowInfo* pRow = pGroup->rows; + SBufRowInfo* pNext = NULL; + while (pRow) { + pNext = pRow->next; + taosMemoryFree(pRow); + pRow = pNext; + } + } + + tSimpleHashCleanup(*ppHash); + *ppHash = NULL; +} + +static void destroyHashJoinOperator(void* param) { + SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; + + destroyHJoinKeyHash(&pJoinOperator->pKeyHash); + + freeHJoinTableInfo(&pJoinOperator->tbs[0]); + freeHJoinTableInfo(&pJoinOperator->tbs[1]); + pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); + taosMemoryFreeClear(pJoinOperator->pResColMap); + taosArrayDestroyEx(pJoinOperator->pRowBufs, freeHJoinBufPage); + nodesDestroyNode(pJoinOperator->pCond); + + taosMemoryFreeClear(param); +} + +static FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { + SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); + return pPage->data + pRow->offset; +} + +static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { + SHJoinTableInfo* pBuild = pJoin->pBuild; + SHJoinTableInfo* pProbe = pJoin->pProbe; + int32_t buildIdx = 0, buildValIdx = 0; + int32_t probeIdx = 0; + SBufRowInfo* pRow = pStart; + int32_t code = 0; + + for (int32_t r = 0; r < rowNum; ++r) { + char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); + char* pValData = pData + pBuild->valBitMapSize; + char* pKeyData = pProbe->keyData; + buildIdx = buildValIdx = probeIdx = 0; + for (int32_t i = 0; i < pJoin->pResColNum; ++i) { + if (pJoin->pResColMap[i]) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); + if (pBuild->valCols[buildIdx].keyCol) { + code = colDataSetVal(pDst, pRes->info.rows + r, pKeyData, false); + if (code) { + return code; + } + pKeyData += pBuild->valCols[buildIdx].vardata ? varDataTLen(pKeyData) : pBuild->valCols[buildIdx].bytes; + } else { + if (colDataIsNull_f(pData, buildValIdx)) { + code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true); + if (code) { + return code; + } + } else { + code = colDataSetVal(pDst, pRes->info.rows + r, pValData, false); + if (code) { + return code; + } + pValData += pBuild->valCols[buildIdx].vardata ? varDataTLen(pValData) : pBuild->valCols[buildIdx].bytes; + } + buildValIdx++; + } + buildIdx++; + } else if (0 == i) { + SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData->pDataBlock, pProbe->valCols[probeIdx].srcSlot); + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); + + code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); + if (code) { + return code; + } + probeIdx++; + } + } + pRow = pRow->next; + } + + return TSDB_CODE_SUCCESS; +} + + +static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinCtx* pCtx = &pJoin->ctx; + SBufRowInfo* pStart = pCtx->pBuildRow; + int32_t rowNum = 0; + int32_t resNum = pRes->info.rows; + + while (pCtx->pBuildRow && resNum < pRes->info.capacity) { + rowNum++; + resNum++; + pCtx->pBuildRow = pCtx->pBuildRow->next; + } + + int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); + if (code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + pRes->info.rows = resNum; + pCtx->rowRemains = pCtx->pBuildRow ? true : false; +} + + +static FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { + char *pData = NULL; + size_t bufLen = 0; + + if (1 == pTable->keyNum) { + if (pTable->keyCols[0].vardata) { + pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; + bufLen = varDataTLen(pData); + } else { + pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; + bufLen = pTable->keyCols[0].bytes; + } + pTable->keyData = pData; + } else { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + if (pTable->keyCols[i].vardata) { + pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; + memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } else { + pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; + memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + bufLen += pTable->keyCols[i].bytes; + } + } + pTable->keyData = pTable->keyBuf; + } + + if (pBufLen) { + *pBufLen = bufLen; + } +} + + +static void doHashJoinImpl(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinTableInfo* pProbe = pJoin->pProbe; + SHJoinCtx* pCtx = &pJoin->ctx; + SSDataBlock* pRes = pJoin->pRes; + size_t bufLen = 0; + + if (pJoin->ctx.pBuildRow) { + appendHJoinResToBlock(pOperator, pRes); + return; + } + + for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) { + copyKeyColsDataToBuf(pProbe, i, &bufLen); + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); + if (pGroup) { + pCtx->pBuildRow = pGroup->rows; + appendHJoinResToBlock(pOperator, pRes); + if (pRes->info.rows >= pRes->info.capacity) { + break; + } + } + } +} + +static int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); + if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); + return TSDB_CODE_INVALID_PARA; + } + if (pTable->keyCols[i].bytes != pCol->info.bytes) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes); + return TSDB_CODE_INVALID_PARA; + } + pTable->keyCols[i].data = pCol->pData; + if (pTable->keyCols[i].vardata) { + pTable->keyCols[i].offset = pCol->varmeta.offset; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { + if (!pTable->valColExist) { + return TSDB_CODE_SUCCESS; + } + for (int32_t i = 0; i < pTable->valNum; ++i) { + if (pTable->valCols[i].keyCol) { + continue; + } + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->valCols[i].srcSlot); + if (pTable->valCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata); + return TSDB_CODE_INVALID_PARA; + } + if (pTable->valCols[i].bytes != pCol->info.bytes) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes); + return TSDB_CODE_INVALID_PARA; + } + if (!pTable->valCols[i].vardata) { + pTable->valCols[i].bitMap = pCol->nullbitmap; + } + pTable->valCols[i].data = pCol->pData; + if (pTable->valCols[i].vardata) { + pTable->valCols[i].offset = pCol->varmeta.offset; + } + } + + return TSDB_CODE_SUCCESS; +} + + + +static FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { + if (!pTable->valColExist) { + return; + } + + char *pData = NULL; + size_t bufLen = pTable->valBitMapSize; + for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) { + if (pTable->valCols[i].keyCol) { + continue; + } + if (pTable->valCols[i].vardata) { + if (-1 == pTable->valCols[i].offset[rowIdx]) { + colDataSetNull_f(pTable->valData, m); + } else { + pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; + memcpy(pTable->valData + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } + } else { + if (colDataIsNull_f(pTable->valCols[i].bitMap, rowIdx)) { + colDataSetNull_f(pTable->valData, m); + } else { + pData = pTable->valCols[i].data + pTable->valCols[i].bytes * rowIdx; + memcpy(pTable->valData + bufLen, pData, pTable->valCols[i].bytes); + bufLen += pTable->valCols[i].bytes; + } + } + m++; + } +} + + +static FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { + if (0 == bufSize) { + pRow->pageId = -1; + return TSDB_CODE_SUCCESS; + } + + if (bufSize > HASH_JOIN_DEFAULT_PAGE_SIZE) { + qError("invalid join value buf size:%d", bufSize); + return TSDB_CODE_INVALID_PARA; + } + + do { + SBufPageInfo* page = taosArrayGetLast(pPages); + if ((page->pageSize - page->offset) >= bufSize) { + *pBuf = page->data + page->offset; + pRow->pageId = taosArrayGetSize(pPages) - 1; + pRow->offset = page->offset; + page->offset += bufSize; + return TSDB_CODE_SUCCESS; + } + + int32_t code = addPageToHJoinBuf(pPages); + if (code) { + return code; + } + } while (true); +} + +static FORCE_INLINE int32_t getHJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { + if (NULL == pTable->valVarCols) { + return pTable->valBufSize; + } + + int32_t* varColIdx = NULL; + int32_t bufLen = pTable->valBufSize; + int32_t varColNum = taosArrayGetSize(pTable->valVarCols); + for (int32_t i = 0; i < varColNum; ++i) { + varColIdx = taosArrayGet(pTable->valVarCols, i); + char* pData = pTable->valCols[*varColIdx].data + pTable->valCols[*varColIdx].offset[rowIdx]; + bufLen += varDataTLen(pData); + } + + return bufLen; +} + + +static int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { + SGroupData group = {0}; + SBufRowInfo* pRow = NULL; + + if (NULL == pGroup) { + group.rows = taosMemoryMalloc(sizeof(SBufRowInfo)); + if (NULL == group.rows) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pRow = group.rows; + } else { + pRow = taosMemoryMalloc(sizeof(SBufRowInfo)); + if (NULL == pRow) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + int32_t code = getHJoinValBufSize(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); + if (code) { + return code; + } + + if (NULL == pGroup) { + pRow->next = NULL; + if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + pRow->next = pGroup->rows; + pGroup->rows = pRow; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { + SHJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setValColsData(pBlock, pBuild); + if (code) { + return code; + } + + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyData, keyLen); + code = addRowToHashImpl(pJoin, pGroup, pBuild, keyLen, rowIdx); + if (code) { + return code; + } + + copyValColsDataToBuf(pBuild, rowIdx); + + return TSDB_CODE_SUCCESS; +} + +static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { + SHJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setKeyColsData(pBlock, pBuild); + if (code) { + return code; + } + + size_t bufLen = 0; + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + copyKeyColsDataToBuf(pBuild, i, &bufLen); + code = addRowToHash(pJoin, pBlock, bufLen, i); + if (code) { + return code; + } + } + + return code; +} + +static int32_t buildHJoinKeyHash(SHJoinOperatorInfo* pJoin) { + SSDataBlock* pBlock = NULL; + int32_t code = TSDB_CODE_SUCCESS; + + while (true) { + pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream); + if (NULL == pBlock) { + break; + } + + code = addBlockRowsToHash(pBlock, pJoin); + if (code) { + return code; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SHJoinTableInfo* pProbe = pJoin->pProbe; + int32_t code = setKeyColsData(pBlock, pProbe); + if (code) { + return code; + } + code = setValColsData(pBlock, pProbe); + if (code) { + return code; + } + + pJoin->ctx.probeIdx = 0; + pJoin->ctx.pBuildRow = NULL; + pJoin->ctx.pProbeData = pBlock; + + doHashJoinImpl(pOperator); + + return TSDB_CODE_SUCCESS; +} + +static void setHJoinDone(struct SOperatorInfo* pOperator) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + pOperator->status = OP_EXEC_DONE; + + SHJoinOperatorInfo* pInfo = pOperator->info; + destroyHJoinKeyHash(&pInfo->pKeyHash); +} + +static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pRes = pJoin->pRes; + pRes->info.rows = 0; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (NULL == pJoin->pKeyHash) { + code = buildHJoinKeyHash(pJoin); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + + if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { + setHJoinDone(pOperator); + return NULL; + } + } + + if (pJoin->ctx.rowRemains) { + doHashJoinImpl(pOperator); + + if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows > 0) { + return pRes; + } + } + + while (true) { + SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); + if (NULL == pBlock) { + setHJoinDone(pOperator); + break; + } + + code = launchBlockHashJoin(pOperator, pBlock); + if (code) { + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } + + if (pRes->info.rows < pOperator->resultInfo.threshold) { + continue; + } + + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + } + if (pRes->info.rows > 0) { + break; + } + } + + return (pRes->info.rows > 0) ? pRes : NULL; +} SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -254,16 +759,16 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); - initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); + initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); + initHJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - setJoinBuildAndProbeTable(pInfo, pJoinNode); - code = buildJoinResColMap(pInfo, pJoinNode); + setHJoinBuildAndProbeTable(pInfo, pJoinNode); + code = buildHJoinResColMap(pInfo, pJoinNode); if (code) { goto _error; } - code = initJoinBufPages(pInfo); + code = initHJoinBufPages(pInfo); if (code) { goto _error; } @@ -305,7 +810,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroHashJoinOperator, optrDefaultBufFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL); return pOperator; @@ -319,460 +824,4 @@ _error: return NULL; } -void destroHashJoinOperator(void* param) { - SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; - if (pJoinOperator->pColEqualOnConditions != NULL) { - mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); - taosMemoryFreeClear(pJoinOperator->rightEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->rightEqOnCondCols); - taosMemoryFreeClear(pJoinOperator->leftEqOnCondKeyBuf); - taosArrayDestroy(pJoinOperator->leftEqOnCondCols); - } - nodesDestroyNode(pJoinOperator->pCondAfterMerge); - - taosArrayDestroy(pJoinOperator->rowCtx.leftCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.rightCreatedBlocks); - taosArrayDestroy(pJoinOperator->rowCtx.leftRowLocations); - taosArrayDestroy(pJoinOperator->rowCtx.rightRowLocations); - - pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); -} - -FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { - SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); - return pPage->data + pRow->offset; -} - -FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { - SHJoinTableInfo* pBuild = pJoin->pBuild; - SHJoinTableInfo* pProbe = pJoin->pProbe; - int32_t buildIdx = 0, buildValIdx = 0; - int32_t probeIdx = 0; - SBufRowInfo* pRow = pStart; - int32_t code = 0; - - for (int32_t r = 0; r < rowNum; ++r) { - char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); - char* pValData = pData + pBuild->valBitMapSize; - for (int32_t i = 0; i < pJoin->pResColNum; ++i) { - if (pJoin->pResColMap[i]) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); - if (pBuild->valCols[buildIdx].keyCol) { - - } else { - if (colDataIsNull_f(pData, buildValIdx)) { - code = colDataSetVal(pDst, pRes->info.rows + r, NULL, true); - if (code) { - return code; - } - } else { - code = colDataSetVal(pDst, pRes->info.rows + r, pValData, false); - if (code) { - return code; - } - pValData += pBuild->valCols[buildIdx].vardata ? varDataTLen(pValData) : pBuild->valCols[buildIdx].bytes; - } - buildValIdx++; - } - buildIdx++; - } else { - SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); - - code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); - if (code) { - return code; - } - probeIdx++; - } - } - pRow = pRow->next; - } - - return TSDB_CODE_SUCCESS; -} - - -FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SHJoinCtx* pCtx = &pJoin->ctx; - SBufRowInfo* pStart = pCtx->pBuildRow; - int32_t rowNum = 0; - int32_t resNum = pRes.info.rows; - - while (pCtx->pBuildRow && resNum < pRes.info.capacity) { - rowNum++; - resNum++; - pCtx->pBuildRow = pCtx->pBuildRow->next; - } - - int32_t code = copyJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); - if (code) { - pOperator->pTaskInfo->code = code; - T_LONG_JMP(pOperator->pTaskInfo->env, code); - } - - pRes->info.rows = resNum; - pCtx->rowRemains = pCtx->pBuildRow ? true : false; -} - -void doHashJoinImpl(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SHJoinTableInfo* pProbe = pJoin->pProbe; - SHJoinCtx* pCtx = &pJoin->ctx; - SSDataBlock* pRes = pJoin->pRes; - size_t bufLen = 0; - - if (pJoin->ctx.pBuildRow) { - appendJoinResToBlock(pOperator, pRes); - return; - } - - for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) { - copyKeyColsDataToBuf(pProbe, i, &bufLen); - SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); - if (pGroup) { - pCtx->pBuildRow = pGroup->rows; - appendJoinResToBlock(pOperator, pRes); - if (pRes->info.rows >= pRes.info.capacity) { - break; - } - } - } -} - -int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { - for (int32_t i = 0; i < pTable->keyNum; ++i) { - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); - if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { - qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); - return TSDB_CODE_INVALID_PARA; - } - if (pTable->keyCols[i].bytes != pCol->info.bytes) { - qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes); - return TSDB_CODE_INVALID_PARA; - } - pTable->keyCols[i].data = pCol->pData; - if (pTable->keyCols[i].vardata) { - pTable->keyCols[i].offset = pCol->varmeta.offset; - } - } - - return TSDB_CODE_SUCCESS; -} - -int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { - if (!pTable->valColExist) { - return TSDB_CODE_SUCCESS; - } - for (int32_t i = 0; i < pTable->valNum; ++i) { - if (pTable->valCols[i].keyCol) { - continue; - } - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->valCols[i].srcSlot); - if (pTable->valCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { - qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata); - return TSDB_CODE_INVALID_PARA; - } - if (pTable->valCols[i].bytes != pCol->info.bytes) { - qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes); - return TSDB_CODE_INVALID_PARA; - } - if (!pTable->valCols[i].vardata)) { - pTable->valCols[i].bitMap = pCol->nullbitmap; - } - pTable->valCols[i].data = pCol->pData; - if (pTable->valCols[i].vardata) { - pTable->valCols[i].offset = pCol->varmeta.offset; - } - } - - return TSDB_CODE_SUCCESS; -} - - -FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { - char *pData = NULL; - size_t bufLen = 0; - - if (1 == pTable->keyNum) { - if (pTable->keyCols[0].vardata) { - pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; - bufLen = varDataTLen(pData); - } else { - pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; - bufLen = pTable->keyCols[0].bytes; - } - pTable->keyData = pData; - } else { - for (int32_t i = 0; i < pTable->keyNum; ++i) { - if (pTable->keyCols[i].vardata) { - pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; - memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); - bufLen += varDataTLen(pData); - } else { - pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; - memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); - bufLen += pTable->keyCols[i].bytes; - } - } - pTable->keyData = pTable->keyBuf; - } - - if (pBufLen) { - *pBufLen = bufLen; - } -} - -FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { - if (!pTable->valColExist) { - return; - } - - char *pData = NULL; - size_t bufLen = pTable->valBitMapSize; - for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) { - if (pTable->valCols[i].keyCol) { - continue; - } - if (pTable->valCols[i].vardata) { - if (-1 == pTable->valCols[i].offset[rowIdx]) { - colDataSetNull_f(pTable->valData, m); - } else { - pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; - memcpy(pTable->valData + bufLen, pData, varDataTLen(pData)); - bufLen += varDataTLen(pData); - } - } else { - if (colDataIsNull_f(pTable->valCols[i].bitMap, rowIdx)) { - colDataSetNull_f(pTable->valData, m); - } else { - pData = pTable->valCols[i].data + pTable->valCols[i].bytes * rowIdx; - memcpy(pTable->valData + bufLen, pData, pTable->valCols[i].bytes); - bufLen += pTable->valCols[i].bytes; - } - } - m++; - } -} - - -FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { - if (0 == bufSize) { - pRow->pageId = -1; - return TSDB_CODE_SUCCESS; - } - - if (bufSize > HASH_JOIN_DEFAULT_PAGE_SIZE) { - qError("invalid join value buf size:%d", bufSize); - return TSDB_CODE_INVALID_PARA; - } - - do { - SBufPageInfo* page = taosArrayGetLast(pPages); - if ((page->pageSize - page->offset) >= bufSize) { - *pBuf = page->data + page->offset; - pRow->pageId = taosArrayGetSize(pPages) - 1; - pRow->offset = page->offset; - page->offset += bufSize; - return TSDB_CODE_SUCCESS; - } - - int32_t code = addPageToJoinBuf(pPages); - if (code) { - return code; - } - } while (true); -} - -FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { - if (NULL == pTable->valVarCols) { - return pTable->valBufSize; - } - - int32_t* varColIdx = NULL; - int32_t bufLen = pTable->valBufSize; - int32_t varColNum = taosArrayGetSize(pTable->valVarCols); - for (int32_t i = 0; i < varColNum; ++i) { - varColIdx = taosArrayGet(pTable->valVarCols, i); - char* pData = pTable->valCols[*varColIdx].data + pTable->valCols[*varColIdx].offset[rowIdx]; - bufLen += varDataTLen(pData); - } - - return bufLen; -} - - -int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { - SGroupData group = {0}; - SBufRowInfo* pRow = NULL; - - if (NULL == pGroup) { - group.rows = taosMemoryMalloc(sizeof(SBufRowInfo)); - if (NULL == group.rows) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pRow = group.rows; - } else { - pRow = taosMemoryMalloc(sizeof(SBufRowInfo)) - if (NULL == pRow) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); - if (code) { - return code; - } - - if (NULL == pGroup) { - pRow->next = NULL; - if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } else { - pRow->next = pGroup->rows; - pGroup->rows = pRow; - } - - return TSDB_CODE_SUCCESS; -} - -int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { - SHJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setValColsData(pBlock, pBuild); - if (code) { - return code; - } - - SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyData, keyLen); - code = addRowToHashImpl(pJoin, pGroup, pBuild, keyLen, rowIdx); - if (code) { - return code; - } - - copyValColsDataToBuf(pBuild, rowIdx); - - return TSDB_CODE_SUCCESS; -} - -int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { - SHJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setKeyColsData(pBlock, pBuild); - if (code) { - return code; - } - - size_t bufLen = 0; - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - copyKeyColsDataToBuf(pBuild, i, &bufLen); - code = addRowToHash(pJoin, pBlock, bufLen, i); - if (code) { - return code; - } - } - - return code; -} - -int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SSDataBlock* pBlock = NULL; - int32_t code = TSDB_CODE_SUCCESS; - - while (true) { - pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream); - if (NULL == pBlock) { - break; - } - - code = addBlockRowsToHash(pBlock, pJoin); - if (code) { - return code; - } - } - - return TSDB_CODE_SUCCESS; -} - -void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SHJoinTableInfo* pProbe = pJoin->pProbe; - int32_t code = setKeyColsData(pBlock, pProbe); - if (code) { - return code; - } - code = setValColsData(pBlock, pProbe); - if (code) { - return code; - } - - pJoin->ctx.probeIdx = 0; - pJoin->ctx.pBuildRow = NULL; - pJoin->ctx.pProbeData = pBlock; - - doHashJoinImpl(pOperator); -} - -SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { - SHJoinOperatorInfo* pJoin = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t code = TSDB_CODE_SUCCESS; - SSDataBlock* pRes = pJoin->pRes; - pRes->info.rows = 0; - - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - if (NULL == pJoin->pKeyHash) { - code = buildJoinKeyHash(pJoin); - if (code) { - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); - } - - if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } - } - - if (pJoin->ctx.rowRemains) { - doHashJoinImpl(pOperator); - - if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - } - if (pRes->info.rows > 0) { - return pRes; - } - } - - while (true) { - SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); - if (NULL == pBlock) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - pOperator->status = OP_EXEC_DONE; - break; - } - - launchBlockHashJoin(pOperator, pBlock); - - if (pRes->info.rows < pOperator->resultInfo.threshold) { - continue; - } - - if (pOperator->exprSupp.pFilterInfo != NULL) { - doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); - } - if (pRes->info.rows > 0) { - break; - } - } - - return (pRes->info.rows > 0) ? pRes : NULL; -}