enh: add hash join

This commit is contained in:
dapan1121 2023-06-20 19:34:16 +08:00
parent 1a53a9084e
commit 7e555e437b
3 changed files with 286 additions and 100 deletions

View File

@ -143,15 +143,6 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData) {
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
dataLen = getJsonValueLen(pData);
} else {
dataLen = varDataTLen(pData);
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
pColumnInfoData->reassigned = true;
} else {
@ -244,6 +235,65 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
}
/**
 * Mark `numOfRows` consecutive rows, starting at `currentRow`, as NULL.
 *
 * Variable-length columns: every byte of the affected offset entries is filled
 * with 0xFF, so each int32_t offset reads back as -1.
 * Fixed-length columns: the matching bits of the null bitmap are set. Long runs
 * are handled in three phases (unaligned head bit by bit, whole bytes with one
 * memset, remaining tail bit by bit).
 *
 * @param pColumnInfoData column whose rows are marked NULL
 * @param currentRow      index of the first row to mark
 * @param numOfRows       number of rows to mark
 */
void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) {
  if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows);
    return;
  }

  // NOTE(review): assumes the null bitmap keeps one bit per row, 8 rows per byte
  // (implied by the BitPos/BMCharPos usage below) -- confirm against the macros.
  // The previous code used sizeof(char) (== 1) here, which made the memset write
  // one BYTE per remaining row and run far past the intended bitmap range.
  const uint32_t bitsPerByte = 8;
  uint32_t       i = 0;

  // The byte-wise fast path only pays off when at least one full byte is guaranteed.
  if (numOfRows >= bitsPerByte * 2) {
    // Head: set single bits until the row index reaches a byte boundary.
    for (; i < numOfRows && BitPos(currentRow + i); ++i) {
      colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
    }

    // Body: fill every completely covered bitmap byte at once.
    uint32_t fullBytes = (numOfRows - i) / bitsPerByte;
    memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, fullBytes);
    i += fullBytes * bitsPerByte;
  }

  // Tail (or the whole range for short runs): set the remaining bits one by one.
  for (; i < numOfRows; ++i) {
    colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i);
  }
}
/**
 * Store one value at `currentRow` and make the following `numOfRows - 1` rows
 * share it by pointing their offsets at the same payload.
 *
 * The function works on `varmeta.offset`, so it is meant for variable-length
 * columns (colDataCopyNItems only calls it for those).
 *
 * @param pColumnInfoData destination column
 * @param currentRow      first destination row
 * @param pData           value to store
 * @param numOfRows       total number of rows that must expose the value
 * @return the error code of colDataSetVal on failure, TSDB_CODE_SUCCESS otherwise
 */
int32_t colDataCopyAndReassign(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, uint32_t numOfRows) {
  int32_t code = colDataSetVal(pColumnInfoData, currentRow, pData, false);
  if (code) {
    return code;
  }

  if (numOfRows > 1) {
    int32_t*      pOffset = pColumnInfoData->varmeta.offset;
    const int32_t sharedOffset = pOffset[currentRow];

    // memset() replicates a single byte and cannot replicate an int32_t value
    // (the old call even passed a pointer as the fill value), so assign the
    // shared offset element by element.
    for (uint32_t i = 1; i < numOfRows; ++i) {
      pOffset[currentRow + i] = sharedOffset;
    }

    pColumnInfoData->reassigned = true;
  }

  return TSDB_CODE_SUCCESS;
}
/**
 * Write the same item into `numOfRows` consecutive rows beginning at `currentRow`.
 *
 * - isNull: the rows are marked NULL and the column's hasNull flag is raised.
 * - variable-length column: delegated to colDataCopyAndReassign.
 * - fixed-length column: delegated to doCopyNItems using the column's byte width.
 *
 * @return 0 for the NULL case, otherwise the result of the delegated call
 */
int32_t colDataCopyNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
                          uint32_t numOfRows, bool isNull) {
  if (isNull) {
    colDataSetNItemsNull(pColumnInfoData, currentRow, numOfRows);
    pColumnInfoData->hasNull = true;
    return 0;
  }

  if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
    const int32_t itemBytes = pColumnInfoData->info.bytes;
    return doCopyNItems(pColumnInfoData, currentRow, pData, itemBytes, numOfRows, false);
  }

  return colDataCopyAndReassign(pColumnInfoData, currentRow, pData, numOfRows);
}
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
int32_t numOfRow2) {
if (numOfRow2 <= 0) return;

View File

@ -19,18 +19,12 @@
extern "C" {
#endif
// Row-level join context, embedded in SHJoinOperatorInfo as `rowCtx`.
// NOTE(review): only rowRemains and ts are referenced by the code visible here;
// the notes on the other fields are inferred from their names -- confirm.
typedef struct SHJoinRowCtx {
// when set, the join loop reuses the saved `ts` instead of fetching the next timestamp
bool rowRemains;
// timestamp restored for both the left and the right side when rowRemains is set
int64_t ts;
// presumably locations of the pending left-side rows -- TODO confirm
SArray* leftRowLocations;
SArray* leftCreatedBlocks;
SArray* rightCreatedBlocks;
// presumably resume positions inside the left/right row location lists -- TODO confirm
int32_t leftRowIdx;
int32_t rightRowIdx;
bool rightUseBuildTable;
SArray* rightRowLocations;
} SHJoinRowCtx;
// Probe-side progress of the hash join, embedded in SHJoinOperatorInfo as `ctx`.
typedef struct SHJoinCtx {
// updated after results are appended: true while pBuildRow still points at build rows not yet emitted
bool rowRemains;
// next build-side row to emit (rows are chained through SBufRowInfo.next); NULL once the chain is exhausted
SBufRowInfo* pBuildRow;
// probe-side data block currently being joined
SSDataBlock* pProbeData;
// row index inside pProbeData where probing starts; reset to 0 when a new probe block is launched
int32_t probeIdx;
} SHJoinCtx;
typedef struct SRowLocation {
SSDataBlock* pDataBlock;
@ -38,7 +32,8 @@ typedef struct SRowLocation {
} SRowLocation;
typedef struct SColBufInfo {
int32_t slotId;
int32_t srcSlot;
int32_t dstSlot;
bool vardata;
int32_t* offset;
int32_t bytes;
@ -55,13 +50,14 @@ typedef struct SBufPageInfo {
typedef struct SBufRowInfo {
void* next;
uint16_t pageId;
int32_t offset;
int32_t offset:31;
int32_t isNull:1;
} SBufRowInfo;
#pragma pack(pop)
typedef struct SResRowData {
typedef struct SGroupData {
SBufRowInfo* rows;
} SResRowData;
} SGroupData;
typedef struct SJoinTableInfo {
SOperatorInfo* downStream;
@ -79,22 +75,19 @@ typedef struct SJoinTableInfo {
} SJoinTableInfo;
typedef struct SHJoinOperatorInfo {
SSDataBlock* pRes;
int32_t joinType;
SJoinTableInfo tbs[2];
SSDataBlock* pRes;
int32_t joinType;
SJoinTableInfo tbs[2];
SJoinTableInfo* pBuild;
SJoinTableInfo* pProbe;
int32_t pResColNum;
int8_t* pResColMap;
SArray* pRowBufs;
SNode* pCondAfterJoin;
SSHashObj* pKeyHash;
SHJoinRowCtx rowCtx;
SNode* pCondAfterJoin;
SSHashObj* pKeyHash;
SHJoinCtx ctx;
} SHJoinOperatorInfo;
static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator);
static void destroyHashJoinOperator(void* param);

View File

@ -39,7 +39,7 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi
SNode* pNode = NULL;
FOREACH(pNode, pList) {
SColumnNode* pColNode = (SColumnNode*)pNode;
(*ppInfo)->slotId = pColNode->slotId;
(*ppInfo)->srcSlot = pColNode->slotId;
(*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
(*ppInfo)->bytes = pColNode->node.resType.bytes;
bufSize += pColNode->node.resType.bytes;
@ -81,7 +81,7 @@ int32_t initJoinValBufInfo(SJoinTableInfo* pTable, SNodeList* pList) {
FOREACH(pNode, pList) {
SColumnNode* pColNode = (SColumnNode*)pNode;
if (pColNode->dataBlockId == pTable->blkId) {
pTable->valCols->slotId = pColNode->slotId;
pTable->valCols->srcSlot = pColNode->slotId;
pTable->valCols->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
if (pTable->valCols->vardata) {
pTable->valVarData = true;
@ -121,8 +121,27 @@ int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNo
}
/**
 * Choose the build and probe tables of the join and record, for every target
 * column, which of the two tables supplies it.
 *
 * tbs[1] becomes the build table and tbs[0] the probe table. pResColMap[i] is
 * set to 1 when the i-th target column belongs to the build table's data
 * block; all other entries keep the value they got from taosMemoryCalloc.
 *
 * The function reports allocation failure, so it returns int32_t: the previous
 * `void` declaration contradicted both its own `return` statements and the
 * caller that stores its result.
 *
 * @param pInfo     join operator state to fill in
 * @param pJoinNode physical plan node that provides the target column list
 * @return TSDB_CODE_OUT_OF_MEMORY if the column map cannot be allocated,
 *         TSDB_CODE_SUCCESS otherwise
 */
int32_t setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  pInfo->pResColNum = pJoinNode->pTargets->length;
  pInfo->pResColMap = taosMemoryCalloc(pJoinNode->pTargets->length, sizeof(int8_t));
  if (NULL == pInfo->pResColMap) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pInfo->pBuild = &pInfo->tbs[1];
  pInfo->pProbe = &pInfo->tbs[0];

  SNode*  pNode = NULL;
  int32_t i = 0;
  FOREACH(pNode, pJoinNode->pTargets) {
    SColumnNode* pColNode = (SColumnNode*)pNode;
    if (pColNode->dataBlockId == pInfo->pBuild->blkId) {
      pInfo->pResColMap[i] = 1;  // column is produced by the build side
    }
    i++;
  }

  return TSDB_CODE_SUCCESS;
}
FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) {
@ -173,7 +192,10 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
setJoinBuildAndProbeTable(pInfo, pJoinNode);
code = setJoinBuildAndProbeTable(pInfo, pJoinNode);
if (code) {
goto _error;
}
code = initJoinBufPages(pInfo);
if (code) {
@ -256,59 +278,103 @@ void destroHashJoinOperator(void* param) {
taosMemoryFreeClear(param);
}
static void doHashJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SHJoinOperatorInfo* pJoinInfo = pOperator->info;
// Resolve a buffered row to the address of its payload: look up the page named
// by pRow->pageId in pRowBufs and step pRow->offset bytes into that page's data.
FORCE_INLINE char* getColDataFromRowBufs(SArray* pRowBufs, SBufRowInfo* pRow) {
  SBufPageInfo* pOwner = taosArrayGet(pRowBufs, pRow->pageId);
  char*         pCell = pOwner->data + pRow->offset;
  return pCell;
}
int32_t nrows = pRes->info.rows;
bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false;
while (1) {
int64_t leftTs = 0;
int64_t rightTs = 0;
if (pJoinInfo->rowCtx.rowRemains) {
leftTs = pJoinInfo->rowCtx.ts;
rightTs = pJoinInfo->rowCtx.ts;
FORCE_INLINE int32_t copyJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, int32_t rowNum, SBufRowInfo* pStart, SSDataBlock* pRes) {
SJoinTableInfo* pBuild = pJoin->pBuild;
SJoinTableInfo* pProbe = pJoin->pProbe;
int32_t buildIdx = 0;
int32_t probeIdx = 0;
SBufRowInfo* pRow = pStart;
int32_t code = 0;
for (int32_t i = 0; i < pJoin->pResColNum; ++i) {
if (pJoin->pResColMap[i]) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, pBuild->valCols[buildIdx].dstSlot);
for (int32_t r = 0; r < rowNum; ++r) {
code = colDataSetVal(pCol, pRes->info.rows + r, pRow->isNull ? NULL : getColDataFromRowBufs(pJoin->pRowBufs, pRow), pRow->isNull);
if (code) {
return code;
}
pRow = pRow->next;
}
buildIdx++;
} else {
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) {
SColumnInfoData* pSrc = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].srcSlot);
SColumnInfoData* pDst = taosArrayGet(pJoin->ctx.pProbeData, pProbe->valCols[probeIdx].dstSlot);
code = colDataCopyNItems(pDst, pRes->info.rows, colDataGetData(pSrc, pJoin->ctx.probeIdx), rowNum, colDataIsNull_s(pSrc, pJoin->ctx.probeIdx));
if (code) {
return code;
}
probeIdx++;
}
}
return TSDB_CODE_SUCCESS;
}
/**
 * Append the pending build rows that match the current probe row to `pRes`.
 *
 * Starting at ctx.pBuildRow, the chain of build rows is walked until it ends or
 * until the result block reaches pRes->info.capacity. The selected rows are
 * copied by copyJoinResRowsToBlock; on a copy error the code is stored in the
 * task info and control is handed to T_LONG_JMP. Afterwards ctx.pBuildRow
 * points at the first row that was NOT emitted and ctx.rowRemains tells whether
 * such a row exists.
 *
 * @param pOperator hash join operator (its info is a SHJoinOperatorInfo)
 * @param pRes      result block that receives the joined rows
 */
void appendJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SHJoinCtx*          pCtx = &pJoin->ctx;
  SBufRowInfo*        pStart = pCtx->pBuildRow;
  int32_t             rowNum = 0;
  // pRes is a pointer; the former `pRes.info.*` accesses could not compile.
  int32_t             resNum = pRes->info.rows;

  while (pCtx->pBuildRow && resNum < pRes->info.capacity) {
    rowNum++;
    resNum++;
    pCtx->pBuildRow = pCtx->pBuildRow->next;
  }

  // Nothing selected (empty chain or block already full): leave the block untouched.
  if (rowNum > 0) {
    int32_t code = copyJoinResRowsToBlock(pJoin, rowNum, pStart, pRes);
    if (code) {
      pOperator->pTaskInfo->code = code;
      T_LONG_JMP(pOperator->pTaskInfo->env, code);
    }
  }

  pRes->info.rows = resNum;
  pCtx->rowRemains = pCtx->pBuildRow ? true : false;
}
void doHashJoinImpl(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SJoinTableInfo* pProbe = pJoin->pProbe;
SHJoinCtx* pCtx = &pJoin->ctx;
SSDataBlock* pRes = pJoin->pRes;
size_t bufLen = 0;
if (pJoin->ctx.pBuildRow) {
appendJoinResToBlock(pOperator, pRes);
return;
}
for (int32_t i = pCtx->probeIdx; i < pCtx->pProbeData->info.rows; ++i) {
copyColDataToBuf(pProbe->keyNum, i, pProbe->keyCols, pProbe->keyBuf, &bufLen);
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyBuf, bufLen);
if (pGroup) {
pCtx->pBuildRow = pGroup->rows;
appendJoinResToBlock(pOperator, pRes);
if (pRes->info.rows >= pRes.info.capacity) {
break;
}
}
if (leftTs == rightTs) {
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
} else if ((asc && leftTs < rightTs) || (!asc && leftTs > rightTs)) {
pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
} else if ((asc && leftTs > rightTs) || (!asc && leftTs < rightTs)) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows && pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
}
// the pDataBlock are always the same one, no need to call this again
pRes->info.rows = nrows;
pRes->info.dataLoad = 1;
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
}
int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList) {
for (int32_t i = 0; i < colNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].slotId);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pColList[i].srcSlot);
if (pColList[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].slotId, pCol->info.type, pColList[i].vardata);
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pColList[i].srcSlot, pCol->info.type, pColList[i].vardata);
return TSDB_CODE_INVALID_PARA;
}
if (pColList[i].bytes != IS_VAR_DATA_TYPE(pCol->info.bytes)) {
qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].slotId, pCol->info.bytes, pColList[i].bytes);
qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pColList[i].srcSlot, pCol->info.bytes, pColList[i].bytes);
return TSDB_CODE_INVALID_PARA;
}
pColList[i].data = pCol->pData;
@ -341,11 +407,13 @@ FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo*
}
}
FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf) {
FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf, SBufRowInfo* pRow) {
do {
SBufPageInfo* page = taosArrayGetLast(pPages);
if ((page->pageSize - page->offset) >= bufSize) {
*pBuf = page->data + page->offset;
pRow->pageId = taosArrayGetSize(pPages) - 1;
pRow->offset = page->offset;
page->offset += bufSize;
return TSDB_CODE_SUCCESS;
}
@ -357,25 +425,58 @@ FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char**
} while (true);
}
int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SResRowData* pRes, SJoinTableInfo* pTable, char** pBuf, size_t keyLen) {
SResRowData res = {0};
FORCE_INLINE int32_t getJoinValBufSize(SJoinTableInfo* pTable, int32_t rowIdx) {
if (!pTable->valVarData) {
return pTable->valBufSize;
}
if (NULL == pRes) {
res.rows = taosMemoryMalloc(sizeof(SBufRowInfo));
if (NULL == res.rows) {
int32_t bufLen = 0;
for (int32_t i = 0; i < pTable->valNum; ++i) {
if (pTable->valCols[i].vardata) {
char* pData = pTable->valCols[i].data + pTable->valCols[i].offset[rowIdx];
bufLen += varDataTLen(pData);
} else {
bufLen += pTable->valCols[i].bytes;
}
}
return bufLen;
}
int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SGroupData* pGroup, SJoinTableInfo* pTable, char** pBuf, size_t keyLen, int32_t rowIdx) {
SGroupData group = {0};
SBufRowInfo* pRow = NULL;
if (NULL == pGroup) {
group.rows = taosMemoryMalloc(sizeof(SBufRowInfo));
if (NULL == group.rows) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pRow = group.rows;
} else {
pRow = taosMemoryMalloc(sizeof(SBufRowInfo))
if (NULL == pRow) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(), pBuf);
int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(pTable, rowIdx), pBuf, pRow);
if (code) {
return code;
}
if (NULL == pRes && tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &res, sizeof(res))) {
return TSDB_CODE_OUT_OF_MEMORY;
if (NULL == pGroup) {
pRow->next = NULL;
if (tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &group, sizeof(group))) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
pRow->next = pGroup->rows;
pGroup->rows = pRow;
}
return TSDB_CODE_SUCCESS;
}
int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, size_t keyLen, int32_t rowIdx) {
@ -386,8 +487,8 @@ int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey,
}
char *valBuf = NULL;
SResRowData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen);
code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen);
SGroupData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen);
code = getJoinValBuf(pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen, rowIdx);
if (code) {
return code;
}
@ -436,39 +537,81 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
/**
 * Start joining a freshly fetched probe-side block.
 *
 * Binds the probe table's key and value column descriptors to `pBlock`, resets
 * the join context (probe index 0, no pending build row, pBlock as the probe
 * data) and runs doHashJoinImpl once.
 *
 * The function propagates the result of setColBufInfo, so it returns int32_t;
 * the previous `void` declaration contradicted its own `return code;`
 * statements. Callers that ignore the result keep compiling, but should check
 * it.
 *
 * @param pOperator hash join operator (its info is a SHJoinOperatorInfo)
 * @param pBlock    probe-side data block to join
 * @return the non-zero code reported by setColBufInfo on failure, 0 otherwise
 */
int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SHJoinOperatorInfo* pJoin = pOperator->info;
  SJoinTableInfo*     pProbe = pJoin->pProbe;

  int32_t code = setColBufInfo(pBlock, pProbe->keyNum, pProbe->keyCols);
  if (code) {
    return code;
  }

  code = setColBufInfo(pBlock, pProbe->valNum, pProbe->valCols);
  if (code) {
    return code;
  }

  // Restart probing from the first row of the new block.
  pJoin->ctx.probeIdx = 0;
  pJoin->ctx.pBuildRow = NULL;
  pJoin->ctx.pProbeData = pBlock;

  doHashJoinImpl(pOperator);

  return code;  // still 0 here: both setColBufInfo calls succeeded
}
SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoinInfo = pOperator->info;
SHJoinOperatorInfo* pJoin = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pRes = pJoinInfo->pRes;
SSDataBlock* pRes = pJoin->pRes;
blockDataCleanup(pRes);
if (NULL == pJoinInfo->pKeyHash) {
code = buildJoinKeyHash(pJoinInfo);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (NULL == pJoin->pKeyHash) {
code = buildJoinKeyHash(pJoin);
if (code) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
}
if (tSimpleHashGetSize(pJoinInfo->pKeyHash) <= 0) {
if (tSimpleHashGetSize(pJoin->pKeyHash) <= 0) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
}
if (pJoin->ctx.rowRemains) {
doHashJoinImpl(pOperator);
if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows > 0) {
return pRes;
}
}
while (true) {
int32_t numOfRowsBefore = pRes->info.rows;
doHashJoinImpl(pOperator, pRes);
int32_t numOfNewRows = pRes->info.rows - numOfRowsBefore;
if (numOfNewRows == 0) {
SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream);
if (NULL == pBlock) {
break;
}
launchBlockHashJoin(pOperator, pBlock);
if (pRes->info.rows < pOperator->resultInfo.threshold) {
continue;
}
if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
if (pRes->info.rows > 0) {
break;
}
}
return (pRes->info.rows > 0) ? pRes : NULL;
}