fix: build hash table is created during creating join operator
This commit is contained in:
parent
ee16b46112
commit
14d4a81e75
|
@ -35,7 +35,7 @@ typedef struct SJoinRowCtx {
|
||||||
int32_t leftRowIdx;
|
int32_t leftRowIdx;
|
||||||
int32_t rightRowIdx;
|
int32_t rightRowIdx;
|
||||||
|
|
||||||
SSHashObj* buildTableTSRange;
|
bool rightUseBuildTable;
|
||||||
SArray* rightRowLocations;
|
SArray* rightRowLocations;
|
||||||
} SJoinRowCtx;
|
} SJoinRowCtx;
|
||||||
|
|
||||||
|
@ -62,6 +62,7 @@ typedef struct SJoinOperatorInfo {
|
||||||
char* rightTagKeyBuf;
|
char* rightTagKeyBuf;
|
||||||
int32_t rightTagKeyLen;
|
int32_t rightTagKeyLen;
|
||||||
|
|
||||||
|
SSHashObj* rightBuildTable;
|
||||||
SJoinRowCtx rowCtx;
|
SJoinRowCtx rowCtx;
|
||||||
} SJoinOperatorInfo;
|
} SJoinOperatorInfo;
|
||||||
|
|
||||||
|
@ -265,6 +266,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
|
extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols);
|
||||||
initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
|
initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols);
|
||||||
initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
|
initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols);
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->rightBuildTable = tSimpleHashInit(256, hashFn);
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL);
|
||||||
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
|
@ -292,9 +295,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) {
|
||||||
pColumn->scale = pColumnNode->node.resType.scale;
|
pColumn->scale = pColumnNode->node.resType.scale;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
|
||||||
|
void* p = NULL;
|
||||||
|
int32_t iter = 0;
|
||||||
|
|
||||||
|
while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) {
|
||||||
|
SArray* rows = (*(SArray**)p);
|
||||||
|
taosArrayDestroy(rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
tSimpleHashCleanup(pBuildTable);
|
||||||
|
}
|
||||||
|
|
||||||
void destroyMergeJoinOperator(void* param) {
|
void destroyMergeJoinOperator(void* param) {
|
||||||
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
|
SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param;
|
||||||
if (pJoinOperator->pTagEqualConditions != NULL) {
|
if (pJoinOperator->pTagEqualConditions != NULL) {
|
||||||
|
mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable);
|
||||||
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
|
taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf);
|
||||||
taosArrayDestroy(pJoinOperator->rightTagCols);
|
taosArrayDestroy(pJoinOperator->rightTagCols);
|
||||||
|
|
||||||
|
@ -420,10 +436,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) {
|
static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj* buildTable) {
|
||||||
int32_t buildTableCap = TMIN(taosArrayGetSize(rightRowLocations), 256);
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
|
||||||
SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn);
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) {
|
||||||
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
|
SRowLocation* rightRow = taosArrayGet(rightRowLocations, i);
|
||||||
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf);
|
int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf);
|
||||||
|
@ -436,13 +449,12 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right
|
||||||
taosArrayPush(*ppRows, rightRow);
|
taosArrayPush(*ppRows, rightRow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*ppHashObj = buildTable;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
|
static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows,
|
||||||
const SArray* leftRowLocations, int32_t leftRowIdx,
|
const SArray* leftRowLocations, int32_t leftRowIdx,
|
||||||
int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) {
|
int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) {
|
||||||
*pReachThreshold = false;
|
*pReachThreshold = false;
|
||||||
uint32_t limitRowNum = pOperator->resultInfo.threshold;
|
uint32_t limitRowNum = pOperator->resultInfo.threshold;
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
@ -453,9 +465,9 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
|
for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) {
|
||||||
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
||||||
SArray* pRightRows = NULL;
|
SArray* pRightRows = NULL;
|
||||||
if (rightTableHash != NULL) {
|
if (useBuildTableTSRange) {
|
||||||
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf);
|
int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf);
|
||||||
SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen);
|
SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen);
|
||||||
if (!ppRightRows) {
|
if (!ppRightRows) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -488,20 +500,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) {
|
|
||||||
void* p = NULL;
|
|
||||||
int32_t iter = 0;
|
|
||||||
|
|
||||||
while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) {
|
|
||||||
SArray* rows = (*(SArray**)p);
|
|
||||||
taosArrayDestroy(rows);
|
|
||||||
}
|
|
||||||
|
|
||||||
tSimpleHashCleanup(pBuildTable);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
|
static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks,
|
||||||
SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) {
|
SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) {
|
||||||
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
||||||
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
||||||
blockDataDestroy(pBlock);
|
blockDataDestroy(pBlock);
|
||||||
|
@ -514,8 +514,8 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef
|
||||||
if (rightRowLocations != NULL) {
|
if (rightRowLocations != NULL) {
|
||||||
taosArrayDestroy(rightRowLocations);
|
taosArrayDestroy(rightRowLocations);
|
||||||
}
|
}
|
||||||
if (rightTableHash != NULL) {
|
if (rightUseBuildTable) {
|
||||||
mergeJoinDestoryBuildTable(rightTableHash);
|
tSimpleHashClear(pJoinInfo->rightBuildTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(leftCreatedBlocks);
|
taosArrayDestroy(leftCreatedBlocks);
|
||||||
|
@ -525,7 +525,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef
|
||||||
pJoinInfo->rowCtx.leftRowLocations = NULL;
|
pJoinInfo->rowCtx.leftRowLocations = NULL;
|
||||||
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
|
pJoinInfo->rowCtx.leftCreatedBlocks = NULL;
|
||||||
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
|
pJoinInfo->rowCtx.rightCreatedBlocks = NULL;
|
||||||
pJoinInfo->rowCtx.buildTableTSRange = NULL;
|
pJoinInfo->rowCtx.rightUseBuildTable = false;
|
||||||
pJoinInfo->rowCtx.rightRowLocations = NULL;
|
pJoinInfo->rowCtx.rightRowLocations = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -540,11 +540,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
|
||||||
int32_t leftRowIdx = 0;
|
int32_t leftRowIdx = 0;
|
||||||
int32_t rightRowIdx = 0;
|
int32_t rightRowIdx = 0;
|
||||||
SSHashObj* rightTableHash = NULL;
|
SSHashObj* rightTableHash = NULL;
|
||||||
|
bool rightUseBuildTable = false;
|
||||||
|
|
||||||
if (pJoinInfo->rowCtx.rowRemains) {
|
if (pJoinInfo->rowCtx.rowRemains) {
|
||||||
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
|
leftRowLocations = pJoinInfo->rowCtx.leftRowLocations;
|
||||||
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
|
leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks;
|
||||||
rightTableHash = pJoinInfo->rowCtx.buildTableTSRange;
|
rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable;
|
||||||
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
|
rightRowLocations = pJoinInfo->rowCtx.rightRowLocations;
|
||||||
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
|
rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks;
|
||||||
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
|
leftRowIdx = pJoinInfo->rowCtx.leftRowIdx;
|
||||||
|
@ -561,7 +562,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
|
||||||
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
||||||
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
||||||
if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
|
if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) {
|
||||||
mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash);
|
mergeJoinFillBuildTable(pJoinInfo, rightRowLocations, pJoinInfo->rightBuildTable);
|
||||||
|
rightUseBuildTable = true;
|
||||||
taosArrayDestroy(rightRowLocations);
|
taosArrayDestroy(rightRowLocations);
|
||||||
rightRowLocations = NULL;
|
rightRowLocations = NULL;
|
||||||
}
|
}
|
||||||
|
@ -578,12 +580,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
|
mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx,
|
||||||
rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold);
|
rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!reachThreshold) {
|
if (!reachThreshold) {
|
||||||
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
|
mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks,
|
||||||
rightTableHash, rightRowLocations);
|
rightUseBuildTable, rightRowLocations);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
pJoinInfo->rowCtx.rowRemains = true;
|
pJoinInfo->rowCtx.rowRemains = true;
|
||||||
|
@ -591,7 +593,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
|
||||||
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
|
pJoinInfo->rowCtx.leftRowLocations = leftRowLocations;
|
||||||
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
|
pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks;
|
||||||
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
|
pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks;
|
||||||
pJoinInfo->rowCtx.buildTableTSRange = rightTableHash;
|
pJoinInfo->rowCtx.rightUseBuildTable = true;
|
||||||
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
|
pJoinInfo->rowCtx.rightRowLocations = rightRowLocations;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue