diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 61fe0d0b0f..458171045c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -415,7 +415,6 @@ typedef struct SHashJoinPhysiNode { EJoinType joinType; SNodeList* pOnLeft; SNodeList* pOnRight; - SNode* pOnConditions; SNode* pFilterConditions; SNodeList* pTargets; SQueryStat inputStat[2]; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 12890571f9..6737aad5e2 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -167,7 +167,11 @@ typedef struct STempTableNode { SNode* pSubquery; } STempTableNode; -typedef enum EJoinType { JOIN_TYPE_INNER = 1 } EJoinType; +typedef enum EJoinType { + JOIN_TYPE_INNER = 1, + JOIN_TYPE_LEFT, + JOIN_TYPE_RIGHT, +} EJoinType; typedef struct SJoinTableNode { STableNode table; // QUERY_NODE_JOIN_TABLE diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 94eaeeac28..136444ed39 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -19,6 +19,8 @@ extern "C" { #endif +#define HASH_JOIN_DEFAULT_PAGE_SIZE 10485760 + typedef struct SHJoinCtx { bool rowRemains; SBufRowInfo* pBuildRow; @@ -31,14 +33,17 @@ typedef struct SRowLocation { int32_t pos; } SRowLocation; -typedef struct SColBufInfo { +typedef struct SHJoinColInfo { int32_t srcSlot; int32_t dstSlot; + bool keyCol; bool vardata; int32_t* offset; int32_t bytes; char* data; -} SColBufInfo; + char* bitMap; + char* dataInBuf; +} SHJoinColInfo; typedef struct SBufPageInfo { int32_t pageSize; @@ -50,8 +55,7 @@ typedef struct SBufPageInfo { typedef struct SBufRowInfo { void* next; uint16_t pageId; - int32_t offset:31; - int32_t isNull:1; + int32_t offset; } SBufRowInfo; #pragma pack(pop) @@ -59,33 +63,37 @@ typedef struct SGroupData { SBufRowInfo* rows; } SGroupData; -typedef struct SJoinTableInfo { +typedef struct SHJoinTableInfo { SOperatorInfo* downStream; int32_t blkId; SQueryStat inputStat; int32_t keyNum; - SColBufInfo* keyCols; + SHJoinColInfo* keyCols; char* keyBuf; + char* keyData; int32_t valNum; - SColBufInfo* valCols; + SHJoinColInfo* valCols; + char* valData; + int32_t valBitMapSize; int32_t valBufSize; - bool valVarData; -} SJoinTableInfo; + SArray* valVarCols; + bool valColExist; +} SHJoinTableInfo; typedef struct SHJoinOperatorInfo { - SSDataBlock* pRes; - int32_t joinType; - SJoinTableInfo tbs[2]; - SJoinTableInfo* pBuild; - SJoinTableInfo* pProbe; - int32_t pResColNum; - int8_t* pResColMap; - SArray* pRowBufs; - SNode* pCondAfterJoin; - SSHashObj* pKeyHash; - SHJoinCtx ctx; + int32_t joinType; + SHJoinTableInfo tbs[2]; + SHJoinTableInfo* pBuild; + SHJoinTableInfo* pProbe; + SSDataBlock* pRes; + int32_t pResColNum; + int8_t* pResColMap; + SArray* pRowBufs; + SNode* pCond; + SSHashObj* pKeyHash; + SHJoinCtx ctx; } SHJoinOperatorInfo; static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index a79b25ad40..57ed964b87 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -27,27 +27,31 @@ #include "ttypes.h" #include "hashjoin.h" -int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pList, char** ppBuf) { - *colNum = LIST_LENGTH(pList); +int32_t initJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { + pTable->keyNum = LIST_LENGTH(pList); - (*ppInfo) = taosMemoryMalloc((*colNum) * sizeof(SColBufInfo)); - if (NULL == (*ppInfo)) { + pTable->keyCols = taosMemoryMalloc(pTable->keyNum * sizeof(SHJoinColInfo)); + if (NULL == pTable->keyCols) { return TSDB_CODE_OUT_OF_MEMORY; } int64_t bufSize = 0; + int32_t i = 0; SNode* pNode = NULL; FOREACH(pNode, pList) { SColumnNode* pColNode = (SColumnNode*)pNode; - (*ppInfo)->srcSlot = pColNode->slotId; - (*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - (*ppInfo)->bytes = pColNode->node.resType.bytes; + pTable->keyCols[i]->srcSlot = pColNode->slotId; + pTable->keyCols[i]->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + pTable->keyCols[i]->bytes = pColNode->node.resType.bytes; bufSize += pColNode->node.resType.bytes; + ++i; } - *ppBuf = taosMemoryMalloc(bufSize); - if (NULL == *ppBuf) { - return TSDB_CODE_OUT_OF_MEMORY; + if (pTable->keyNum > 1) { + pTable->keyBuf = taosMemoryMalloc(bufSize); + if (NULL == pTable->keyBuf) { + return TSDB_CODE_OUT_OF_MEMORY; + } } return TSDB_CODE_SUCCESS; @@ -58,46 +62,80 @@ void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) { SNode* pNode = NULL; FOREACH(pNode, pList) { - SColumnNode* pCol = (SColumnNode*)pNode; + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; if (pCol->dataBlockId == blkId) { (*colNum)++; } } } -int32_t initJoinValBufInfo(SJoinTableInfo* pTable, SNodeList* pList) { +bool valColInKeyCols(int16_t slotId, int32_t keyNum, SHJoinColInfo* pKeys, int32_t* pKeyIdx) { + for (int32_t i = 0; i < keyNum; ++i) { + if (pKeys[i].srcSlot == slotId) { + *pKeyIdx = i; + return true; + } + } + + return false; +} + +int32_t initJoinValColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { getJoinValColNum(pList, pTable->blkId, &pTable->valNum); - if (pTable->valNum <= 0) { - qError("fail to get join value column, num:%d", pTable->valNum); - return TSDB_CODE_INVALID_MSG; + if (pTable->valNum == 0) { + return TSDB_CODE_SUCCESS; } - pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SColBufInfo)); + pTable->valCols = taosMemoryMalloc(pTable->valNum * sizeof(SHJoinColInfo)); if (NULL == pTable->valCols) { return TSDB_CODE_OUT_OF_MEMORY; } + int32_t i = 0; + int32_t colNum = 0; SNode* pNode = NULL; FOREACH(pNode, pList) { - SColumnNode* pColNode = (SColumnNode*)pNode; + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr; if (pColNode->dataBlockId == pTable->blkId) { - pTable->valCols->srcSlot = pColNode->slotId; - pTable->valCols->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); - if (pTable->valCols->vardata) { - pTable->valVarData = true; + if (valColInKeyCols(pColNode->slotId, pTable->keyNum, pTable->keyCols, &pTable->valCols[i].srcSlot)) { + pTable->valCols[i].keyCol = true; + } else { + pTable->valCols[i].keyCol = false; + pTable->valCols[i].srcSlot = pColNode->slotId; + pTable->valColExist = true; + colNum++; } - pTable->valCols->bytes = pColNode->node.resType.bytes; - pTable->valBufSize += pColNode->node.resType.bytes; + pTable->valCols[i].dstSlot = pTarget->slotId; + pTable->valCols[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type); + if (pTable->valCols[i].vardata) { + if (NULL == pTable->valVarCols) { + pTable->valVarCols = taosArrayInit(pTable->valNum, sizeof(int32_t)); + if (NULL == pTable->valVarCols) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + taosArrayPush(pTable->valVarCols, &i); + } + pTable->valCols[i].bytes = pColNode->node.resType.bytes; + if (!pTable->valCols[i].keyCol && !pTable->valCols[i].vardata) { + pTable->valBufSize += pColNode->node.resType.bytes; + } + i++; } } + pTable->valBitMapSize = colNum / sizeof(int8_t) + ((colNum % sizeof(int8_t)) ? 1 : 0); + pTable->valBufSize += pTable->valBitMapSize; + return TSDB_CODE_SUCCESS; } int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SNodeList* pKeyList = NULL; - SJoinTableInfo* pTable = &pJoin->tbs[idx]; + SHJoinTableInfo* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; pTable->blkId = pDownstream[idx]->resultDataBlockId; if (0 == idx) { @@ -106,11 +144,11 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo pKeyList = pJoinNode->pOnRight; } - int32_t code = initJoinKeyBufInfo(&pTable->keyCols, &pTable->keyNum, pKeyList, &pTable->keyBuf); + int32_t code = initJoinKeyColsInfo(pTable, pKeyList); if (code) { return code; } - int32_t code = initJoinValBufInfo(&pTable->keyCols, &pTable->keyNum, pJoinNode->pTargets, &pTable->keyBuf, pTable->blkId, pTable); + int32_t code = initJoinValColsInfo(pTable, pJoinNode->pTargets); if (code) { return code; } @@ -121,20 +159,50 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo } void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { + int32_t buildIdx = 0; + int32_t probeIdx = 1; + + pInfo->joinType = pJoinNode->joinType; + + switch (pInfo->joinType) { + case JOIN_TYPE_INNER: + if (pInfo->tbs[0].inputStat.inputRowNum <= pInfo->tbs[1].inputStat.inputRowNum) { + buildIdx = 0; + probeIdx = 1; + } else { + buildIdx = 1; + probeIdx = 0; + } + break; + case JOIN_TYPE_LEFT: + buildIdx = 1; + probeIdx = 0; + break; + case JOIN_TYPE_RIGHT: + buildIdx = 0; + probeIdx = 1; + break; + default: + break; + } + + pInfo->pBuild = &pInfo->tbs[buildIdx]; + pInfo->pProbe = &pInfo->tbs[probeIdx]; +} + +void buildJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { pInfo->pResColNum = pJoinNode->pTargets->length; pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t)); if (NULL == pInfo->pResColMap) { return TSDB_CODE_OUT_OF_MEMORY; } - pInfo->pBuild = &pInfo->tbs[1]; - pInfo->pProbe = &pInfo->tbs[0]; - SNode* pNode = NULL; int32_t i = 0; FOREACH(pNode, pJoinNode->pTargets) { - SColumnNode* pColNode = (SColumnNode*)pNode; - if (pColNode->dataBlockId == pInfo->pBuild->blkId) { + STargetNode* pTarget = (STargetNode*)pNode; + SColumnNode* pCol = (SColumnNode*)pTarget->pExpr; + if (pCol->dataBlockId == pInfo->pBuild->blkId) { pInfo->pResColMap[i] = 1; } @@ -144,6 +212,7 @@ void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJ return TSDB_CODE_SUCCESS; } + FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) { SBufPageInfo page; page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE; @@ -180,19 +249,16 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n int32_t numOfCols = 0; pInfo->pRes = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); - - SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); setOperatorInfo(pOperator, "HashJoinOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]); initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]); - code = setJoinBuildAndProbeTable(pInfo, pJoinNode); + setJoinBuildAndProbeTable(pInfo, pJoinNode); + code = buildJoinResColMap(pInfo, pJoinNode); if (code) { goto _error; } @@ -210,13 +276,13 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n } if (pJoinNode->pFilterConditions != NULL && pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterJoin = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); - if (pInfo->pCondAfterJoin == NULL) { + pInfo->pCond = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCond == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterJoin); + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCond); pLogicCond->pParameterList = nodesMakeList(); if (pLogicCond->pParameterList == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -227,29 +293,25 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); pLogicCond->condType = LOGIC_COND_TYPE_AND; } else if (pJoinNode->pFilterConditions != NULL) { - pInfo->pCondAfterJoin = nodesCloneNode(pJoinNode->pFilterConditions); + pInfo->pCond = nodesCloneNode(pJoinNode->pFilterConditions); } else if (pJoinNode->node.pConditions != NULL) { - pInfo->pCondAfterJoin = nodesCloneNode(pJoinNode->node.pConditions); + pInfo->pCond = nodesCloneNode(pJoinNode->node.pConditions); } else { - pInfo->pCondAfterJoin = NULL; + pInfo->pCond = NULL; } - code = filterInitFromNode(pInfo->pCondAfterJoin, &pOperator->exprSupp.pFilterInfo, 0); + code = filterInitFromNode(pInfo->pCond, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { goto _error; } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroHashJoinOperator, optrDefaultBufFn, NULL); - code = appendDownstream(pOperator, pDownstream, numOfDownstream); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } return pOperator; _error: if (pInfo != NULL) { - destroyMergeJoinOperator(pInfo); + destroyHashJoinOperator(pInfo); } taosMemoryFree(pOperator); @@ -278,47 +340,56 @@ void destroHashJoinOperator(void* param) { taosMemoryFreeClear(param); } -FORCE_INLINE char* getColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { +FORCE_INLINE char* retrieveColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) { SBufPageInfo *pPage = taosArrayGet(pRowBufs, pRow->pageId); return pPage->data + pRow->offset; } FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) { - SJoinTableInfo* pBuild = pJoin->pBuild; - SJoinTableInfo* pProbe = pJoin->pProbe; + SHJoinTableInfo* pBuild = pJoin->pBuild; + SHJoinTableInfo* pProbe = pJoin->pProbe; int32_t buildIdx = 0; int32_t probeIdx = 0; SBufRowInfo* pRow = pStart; int32_t code = 0; + + for (int32_t r = 0; r < rowNum; ++r) { + char* pData = retrieveColDataFromRowBufs(pJoin->pRowBufs, pRow); + for (int32_t i = 0; i < pJoin->pResColNum; ++i) { + if (pJoin->pResColMap[i]) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); + if (pBuild->valCols[buildIdx].keyCol) { + + } else if (colDataIsNull_f(pData, r)) { - for (int32_t i = 0; i < pJoin->pResColNum; ++i) { - if (pJoin->pResColMap[i]) { - SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot); - for (int32_t r = 0; r < rowNum; ++r) { - code = colDataSetVal(pCol, pRes->info.rows + r, pRow->isNull ? NULL : getColDataFromRowBufs(pJoin->pRowBufs, pRow), pRow->isNull); + } else { + code = colDataSetVal(pDst, pRes->info.rows + r, , pRow->isNull); + if (code) { + return code; + } + } + buildIdx++; + } else { + SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pProbe->valCols[probeIdx].dstSlot); + + code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); if (code) { return code; } - pRow = pRow->next; + probeIdx++; } - buildIdx++; - } else { - SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot); - SColumnInfoData* pDst = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].dstSlot); - - code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx)); - if (code) { - return code; - } - probeIdx++; } + pRow = pRow->next; } + + return TSDB_CODE_SUCCESS; } -void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { +FORCE_INLINE void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinCtx* pCtx = &pJoin->ctx; SBufRowInfo* pStart = pCtx->pBuildRow; @@ -343,7 +414,7 @@ void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { void doHashJoinImpl(struct SOperatorInfo* pOperator) { SHJoinOperatorInfo* pJoin = pOperator->info; - SJoinTableInfo* pProbe = pJoin->pProbe; + SHJoinTableInfo* pProbe = pJoin->pProbe; SHJoinCtx* pCtx = &pJoin->ctx; SSDataBlock* pRes = pJoin->pRes; size_t bufLen = 0; @@ -354,8 +425,8 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { } for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) { - copyColDataToBuf(pProbe->keyNum, i, pProbe->keyCols, pProbe->keyBuf, &bufLen); - SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyBuf, bufLen); + copyKeyColsDataToBuf(pProbe, i, &bufLen); + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); if (pGroup) { pCtx->pBuildRow = pGroup->rows; appendJoinResToBlock(pOperator, pRes); @@ -366,40 +437,82 @@ void doHashJoinImpl(struct SOperatorInfo* pOperator) { } } -int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList) { - for (int32_t i = 0; i < colNum; ++i) { - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].srcSlot); - if (pColList[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { - qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].srcSlot, pCol->info.type, pColList[i].vardata); +int32_t setKeyColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot); + if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot, pCol->info.type, pTable->keyCols[i].vardata); return TSDB_CODE_INVALID_PARA; } - if (pColList[i].bytes != IS_VAR_DATA_TYPE(pCol->info.bytes)) { - qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].srcSlot, pCol->info.bytes, pColList[i].bytes); + if (pTable->keyCols[i].bytes != pCol->info.bytes) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes, pTable->keyCols[i].bytes); return TSDB_CODE_INVALID_PARA; } - pColList[i].data = pCol->pData; - if (pColList[i].vardata) { - pColList[i].offset = pCol->varmeta.offset; + pTable->keyCols[i].data = pCol->pData; + if (pTable->keyCols[i].vardata) { + pTable->keyCols[i].offset = pCol->varmeta.offset; } } return TSDB_CODE_SUCCESS; } -FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* pColList, char* pBuf, size_t *pBufLen) { - char *pData = NULL; - - size_t bufLen = 0; - for (int32_t i = 0; i < colNum; ++i) { - if (pColList[i].vardata) { - pData = pColList[i].data + pColList[i].offset[rowIdx]; - memcpy(pBuf + bufLen, pData, varDataTLen(pData)); - bufLen += varDataTLen(pData); - } else { - pData = pColList[i].data + pColList[i].bytes * rowIdx; - memcpy(pBuf + bufLen, pColList[i].data, pColList[i].bytes); - bufLen += pColList[i].bytes; +int32_t setValColsData(SSDataBlock* pBlock, SHJoinTableInfo* pTable) { + if (!pTable->valColExist) { + return TSDB_CODE_SUCCESS; + } + for (int32_t i = 0; i < pTable->valNum; ++i) { + if (pTable->valCols[i].keyCol) { + continue; } + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->valCols[i].srcSlot); + if (pTable->valCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) { + qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->valCols[i].srcSlot, pCol->info.type, pTable->valCols[i].vardata); + return TSDB_CODE_INVALID_PARA; + } + if (pTable->valCols[i].bytes != pCol->info.bytes) { + qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->valCols[i].srcSlot, pCol->info.bytes, pTable->valCols[i].bytes); + return TSDB_CODE_INVALID_PARA; + } + if (!pTable->valCols[i].vardata)) { + pTable->valCols[i].bitMap = pCol->nullbitmap; + } + pTable->valCols[i].data = pCol->pData; + if (pTable->valCols[i].vardata) { + pTable->valCols[i].offset = pCol->varmeta.offset; + } + } + + return TSDB_CODE_SUCCESS; +} + + +FORCE_INLINE void copyKeyColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx, size_t *pBufLen) { + char *pData = NULL; + size_t bufLen = 0; + + if (1 == pTable->keyNum) { + if (pTable->keyCols[0].vardata) { + pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx]; + bufLen = varDataTLen(pData); + } else { + pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx; + bufLen = pTable->keyCols[0].bytes; + } + pTable->keyData = pData; + } else { + for (int32_t i = 0; i < pTable->keyNum; ++i) { + if (pTable->keyCols[i].vardata) { + pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx]; + memcpy(pTable->keyBuf + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } else { + pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx; + memcpy(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes); + bufLen += pTable->keyCols[i].bytes; + } + } + pTable->keyData = pTable->keyBuf; } if (pBufLen) { @@ -407,7 +520,50 @@ FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* } } +FORCE_INLINE void copyValColsDataToBuf(SHJoinTableInfo* pTable, int32_t rowIdx) { + if (!pTable->valColExist) { + return; + } + + char *pData = NULL; + size_t bufLen = pTable->valBitMapSize; + for (int32_t i = 0, m = 0; i < pTable->valNum; ++i) { + if (pTable->valCols[i].keyCol) { + continue; + } + if (pTable->valCols[i].vardata) { + if (-1 == pTable->valCols[i].offset[rowIdx]) { + colDataSetNull_f(pTable->valData, m); + } else { + pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; + memcpy(pTable->valData + bufLen, pData, varDataTLen(pData)); + bufLen += varDataTLen(pData); + } + } else { + if (colDataIsNull_f(pTable->valCols[i].bitMap, rowIdx)) { + colDataSetNull_f(pTable->valData, m); + } else { + pData = pTable->valCols[i].data + pTable->valCols[i].bytes * rowIdx; + memcpy(pTable->valData + bufLen, pData, pTable->valCols[i].bytes); + bufLen += pTable->valCols[i].bytes; + } + } + m++; + } +} + + FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) { + if (0 == bufSize) { + pRow->pageId = -1; + return TSDB_CODE_SUCCESS; + } + + if (bufSize > HASH_JOIN_DEFAULT_PAGE_SIZE) { + qError("invalid join value buf size:%d", bufSize); + return TSDB_CODE_INVALID_PARA; + } + do { SBufPageInfo* page = taosArrayGetLast(pPages); if ((page->pageSize - page->offset) >= bufSize) { @@ -425,26 +581,25 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** } while (true); } -FORCE_INLINE int32_t getJoinValBufSize(SJoinTableInfo* pTable, int32_t rowIdx) { - if (!pTable->valVarData) { +FORCE_INLINE int32_t getJoinValBufSize(SHJoinTableInfo* pTable, int32_t rowIdx) { + if (NULL == pTable->valVarCols) { return pTable->valBufSize; } - int32_t bufLen = 0; - for (int32_t i = 0; i < pTable->valNum; ++i) { - if (pTable->valCols[i].vardata) { - char* pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx]; - bufLen += varDataTLen(pData); - } else { - bufLen += pTable->valCols[i].bytes; - } + int32_t* varColIdx = NULL; + int32_t bufLen = pTable->valBufSize; + int32_t varColNum = taosArrayGetSize(pTable->valVarCols); + for (int32_t i = 0; i < varColNum; ++i) { + varColIdx = taosArrayGet(pTable->valVarCols, i); + char* pData = pTable->valCols[*varColIdx].data + pTable->valCols[*varColIdx].offset[rowIdx]; + bufLen += varDataTLen(pData); } return bufLen; } -int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* pGroup, SJoinTableInfo* pTable, char** pBuf, size_t keyLen, int32_t rowIdx) { +int32_t addRowToHashImpl(SHJoinOperatorInfo* pJoin, SGroupData* pGroup, SHJoinTableInfo* pTable, size_t keyLen, int32_t rowIdx) { SGroupData group = {0}; SBufRowInfo* pRow = NULL; @@ -461,14 +616,14 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p } } - int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), pBuf, pRow); + int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), &pTable->valData, pRow); if (code) { return code; } if (NULL == pGroup) { pRow->next = NULL; - if (tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &group, sizeof(group))) { + if (tSimpleHashPut(pJoin->pKeyHash, pTable->keyData, keyLen, &group, sizeof(group))) { return TSDB_CODE_OUT_OF_MEMORY; } } else { @@ -479,36 +634,35 @@ int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* p return TSDB_CODE_SUCCESS; } -int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, size_t keyLen, int32_t rowIdx) { - SJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setColBufInfo(pBlock, pBuild->valNum, pBuild->valCols); +int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, size_t keyLen, int32_t rowIdx) { + SHJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setValColsData(pBlock, pBuild); if (code) { return code; } - char *valBuf = NULL; - SGroupData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen); - code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen, rowIdx); + SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyData, keyLen); + code = addRowToHashImpl(pJoin, pGroup, pBuild, keyLen, rowIdx); if (code) { return code; } - copyColDataToBuf(pBuild->valNum, rowIdx, pBuild->valCols, valBuf, NULL); + copyValColsDataToBuf(pBuild, rowIdx); return TSDB_CODE_SUCCESS; } int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) { - SJoinTableInfo* pBuild = pJoin->pBuild; - int32_t code = setColBufInfo(pBlock, pBuild->keyNum, pBuild->keyCols); + SHJoinTableInfo* pBuild = pJoin->pBuild; + int32_t code = setKeyColsData(pBlock, pBuild); if (code) { return code; } size_t bufLen = 0; for (int32_t i = 0; i < pBlock->info.rows; ++i) { - copyColDataToBuf(pBuild->keyNum, i, pBuild->keyCols, pBuild->keyBuf, &bufLen); - code = addRowToHash(pJoin, pBlock, pBuild->keyBuf, bufLen, i); + copyKeyColsDataToBuf(pBuild, i, &bufLen); + code = addRowToHash(pJoin, pBlock, bufLen, i); if (code) { return code; } @@ -528,7 +682,7 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { break; } - code = addBlockRowsToHash(pBlock, pJoin->pKeyHash, pJoin->pBuild); + code = addBlockRowsToHash(pBlock, pJoin); if (code) { return code; } @@ -539,17 +693,16 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) { void launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) { SHJoinOperatorInfo* pJoin = pOperator->info; - SJoinTableInfo* pProbe = pJoin->pProbe; - int32_t code = setColBufInfo(pBlock, pProbe->keyNum, pProbe->keyCols); + SHJoinTableInfo* pProbe = pJoin->pProbe; + int32_t code = setKeyColsData(pBlock, pProbe); if (code) { return code; } - code = setColBufInfo(pBlock, pProbe->valNum, pProbe->valCols); + code = setValColsData(pBlock, pProbe); if (code) { return code; } - pJoin->ctx.probeIdx = 0; pJoin->ctx.pBuildRow = NULL; pJoin->ctx.pProbeData = pBlock; @@ -562,7 +715,7 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = TSDB_CODE_SUCCESS; SSDataBlock* pRes = pJoin->pRes; - blockDataCleanup(pRes); + pRes->info.rows = 0; if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -596,6 +749,8 @@ SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { while (true) { SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); if (NULL == pBlock) { + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + pOperator->status = OP_EXEC_DONE; break; }