feat: add buffer page
This commit is contained in:
parent
e6afcbf07f
commit
1a53a9084e
|
@ -45,13 +45,37 @@ typedef struct SColBufInfo {
|
||||||
char* data;
|
char* data;
|
||||||
} SColBufInfo;
|
} SColBufInfo;
|
||||||
|
|
||||||
|
// One page of the hash-join row buffer.
typedef struct SBufPageInfo {
  int32_t pageSize;  // size in bytes of the allocation behind `data`
  int32_t offset;    // number of bytes of `data` already handed out
  char*   data;      // page storage
} SBufPageInfo;
|
||||||
|
|
||||||
|
// Per-row bookkeeping record. Packed to 1-byte alignment so no padding is
// inserted between the fields.
#pragma pack(push, 1)
typedef struct SBufRowInfo {
  void*    next;    // NOTE(review): presumably links to the next row record - confirm
  uint16_t pageId;  // NOTE(review): presumably the index of the buffer page holding the row - confirm
  int32_t  offset;  // NOTE(review): presumably the byte offset of the row inside that page - confirm
} SBufRowInfo;
#pragma pack(pop)
|
||||||
|
|
||||||
|
typedef struct SResRowData {
|
||||||
|
SBufRowInfo* rows;
|
||||||
|
} SResRowData;
|
||||||
|
|
||||||
typedef struct SJoinTableInfo {
|
typedef struct SJoinTableInfo {
|
||||||
|
SOperatorInfo* downStream;
|
||||||
|
int32_t blkId;
|
||||||
|
SQueryStat inputStat;
|
||||||
|
|
||||||
int32_t keyNum;
|
int32_t keyNum;
|
||||||
SColBufInfo* keyCols;
|
SColBufInfo* keyCols;
|
||||||
char* keyBuf;
|
char* keyBuf;
|
||||||
|
|
||||||
int32_t valNum;
|
int32_t valNum;
|
||||||
SColBufInfo* valCols;
|
SColBufInfo* valCols;
|
||||||
char* valBuf;
|
int32_t valBufSize;
|
||||||
|
bool valVarData;
|
||||||
} SJoinTableInfo;
|
} SJoinTableInfo;
|
||||||
|
|
||||||
typedef struct SHJoinOperatorInfo {
|
typedef struct SHJoinOperatorInfo {
|
||||||
|
@ -62,26 +86,12 @@ typedef struct SHJoinOperatorInfo {
|
||||||
|
|
||||||
SJoinTableInfo* pBuild;
|
SJoinTableInfo* pBuild;
|
||||||
SJoinTableInfo* pProbe;
|
SJoinTableInfo* pProbe;
|
||||||
|
SArray* pRowBufs;
|
||||||
int32_t pLeftKeyNum;
|
|
||||||
SColBufInfo* pLeftKeyInfo;
|
|
||||||
char* pLeftKeyBuf;
|
|
||||||
int32_t pLeftValNum;
|
|
||||||
SColBufInfo* pLeftValInfo;
|
|
||||||
char* pLeftValBuf;
|
|
||||||
|
|
||||||
int32_t pRightKeyNum;
|
|
||||||
SColBufInfo* pRightKeyInfo;
|
|
||||||
char* pRightKeyBuf;
|
|
||||||
int32_t pRightValNum;
|
|
||||||
SColBufInfo* pRightValInfo;
|
|
||||||
char* pRightValBuf;
|
|
||||||
|
|
||||||
SNode* pCondAfterJoin;
|
SNode* pCondAfterJoin;
|
||||||
|
|
||||||
SSHashObj* pKeyHash;
|
SSHashObj* pKeyHash;
|
||||||
|
|
||||||
SQueryStat inputStat[2];
|
|
||||||
|
|
||||||
SHJoinRowCtx rowCtx;
|
SHJoinRowCtx rowCtx;
|
||||||
} SHJoinOperatorInfo;
|
} SHJoinOperatorInfo;
|
||||||
|
|
|
@ -36,9 +36,9 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t bufSize = 0;
|
int64_t bufSize = 0;
|
||||||
for (int32_t i = 0; i < *colNum; ++i) {
|
SNode* pNode = NULL;
|
||||||
SColumnNode* pColNode = (SColumnNode*)nodesListGetNode(pList, i);
|
FOREACH(pNode, pList) {
|
||||||
|
SColumnNode* pColNode = (SColumnNode*)pNode;
|
||||||
(*ppInfo)->slotId = pColNode->slotId;
|
(*ppInfo)->slotId = pColNode->slotId;
|
||||||
(*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
|
(*ppInfo)->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
|
||||||
(*ppInfo)->bytes = pColNode->node.resType.bytes;
|
(*ppInfo)->bytes = pColNode->node.resType.bytes;
|
||||||
|
@ -53,15 +53,101 @@ int32_t initJoinKeyBufInfo(SColBufInfo** ppInfo, int32_t* colNum, SNodeList* pLi
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SNodeList* pKeyList, int32_t idx) {
|
// Counts the column nodes in pList whose dataBlockId equals blkId and stores
// the count in *colNum (0 when nothing matches).
void getJoinValColNum(SNodeList* pList, int32_t blkId, int32_t* colNum) {
  int32_t matched = 0;
  SNode*  pItem = NULL;

  FOREACH(pItem, pList) {
    if (((SColumnNode*)pItem)->dataBlockId == blkId) {
      ++matched;
    }
  }

  *colNum = matched;
}
|
||||||
|
|
||||||
|
// Builds the value-column descriptors of one join input.
//
// Every column node in pList whose dataBlockId matches pTable->blkId gets one
// entry in pTable->valCols (slot id, var-data flag, byte width). The widths are
// summed into pTable->valBufSize and pTable->valVarData is set when any of the
// columns is variable-length.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_MSG when no column belongs to
// this input, or TSDB_CODE_OUT_OF_MEMORY when the descriptor array cannot be
// allocated.
int32_t initJoinValBufInfo(SJoinTableInfo* pTable, SNodeList* pList) {
  getJoinValColNum(pList, pTable->blkId, &pTable->valNum);
  if (pTable->valNum <= 0) {
    qError("fail to get join value column, num:%d", pTable->valNum);
    return TSDB_CODE_INVALID_MSG;
  }

  // Zero-initialized so descriptor fields not set below start out as 0/NULL.
  pTable->valCols = taosMemoryCalloc(pTable->valNum, sizeof(SColBufInfo));
  if (NULL == pTable->valCols) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Start the aggregates from a known state instead of whatever the caller left.
  pTable->valBufSize = 0;
  pTable->valVarData = false;

  int32_t colIdx = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pColNode = (SColumnNode*)pNode;
    if (pColNode->dataBlockId == pTable->blkId) {
      // Each matching column gets its own descriptor; the previous code wrote
      // every column into element 0 and left the rest uninitialized.
      SColBufInfo* pCol = &pTable->valCols[colIdx++];

      pCol->slotId = pColNode->slotId;
      pCol->vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
      pCol->bytes = pColNode->node.resType.bytes;

      if (pCol->vardata) {
        pTable->valVarData = true;
      }
      pTable->valBufSize += pColNode->node.resType.bytes;
    }
  }

  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
|
||||||
|
// Initializes pJoin->tbs[idx] for one join input.
//
// idx 0 uses pJoinNode->pOnLeft as the key list, any other idx uses
// pJoinNode->pOnRight. The downstream operator and its result block id are
// taken from pDownstream[idx], the key and value column descriptors are built,
// and *pStat is copied into the table's inputStat.
//
// Returns TSDB_CODE_SUCCESS or the first error reported by the key/value
// descriptor initializers.
int32_t initJoinTableInfo(SHJoinOperatorInfo* pJoin, SHashJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
  SJoinTableInfo* pTable = &pJoin->tbs[idx];
  SNodeList*      pKeyList = (0 == idx) ? pJoinNode->pOnLeft : pJoinNode->pOnRight;

  pTable->downStream = pDownstream[idx];
  pTable->blkId = pDownstream[idx]->resultDataBlockId;

  int32_t code = initJoinKeyBufInfo(&pTable->keyCols, &pTable->keyNum, pKeyList, &pTable->keyBuf);
  if (code) {
    return code;
  }

  // Reuse `code` (it was declared a second time before) and call the value
  // initializer with the two arguments its definition actually takes.
  code = initJoinValBufInfo(pTable, pJoinNode->pTargets);
  if (code) {
    return code;
  }

  memcpy(&pTable->inputStat, pStat, sizeof(*pStat));

  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
// Selects which input is hashed (build) and which is streamed (probe).
// The choice is currently fixed: tbs[0] probes, tbs[1] builds. pJoinNode is
// not consulted.
void setJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) {
  SJoinTableInfo* pTables = pInfo->tbs;

  pInfo->pBuild = pTables + 1;
  pInfo->pProbe = pTables;
}
|
||||||
|
|
||||||
|
// Allocates one empty page of HASH_JOIN_DEFAULT_PAGE_SIZE bytes and appends it
// to pRowBufs.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when either the page
// or the array slot cannot be allocated. Nothing is leaked on failure.
FORCE_INLINE int32_t addPageToJoinBuf(SArray* pRowBufs) {
  SBufPageInfo page;

  page.pageSize = HASH_JOIN_DEFAULT_PAGE_SIZE;
  page.offset = 0;
  page.data = taosMemoryMalloc(page.pageSize);
  if (NULL == page.data) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The array copies `page` by value. If the push fails the page is not owned
  // by anyone, so release it here instead of reporting success.
  if (NULL == taosArrayPush(pRowBufs, &page)) {
    taosMemoryFree(page.data);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
// Creates the page array of the join (initial capacity 32 entries) and
// allocates its first page.
//
// Returns TSDB_CODE_OUT_OF_MEMORY when the array cannot be created, otherwise
// the result of adding the first page.
int32_t initJoinBufPages(SHJoinOperatorInfo* pInfo) {
  SArray* pPages = taosArrayInit(32, sizeof(SBufPageInfo));

  pInfo->pRowBufs = pPages;
  if (NULL != pPages) {
    return addPageToJoinBuf(pPages);
  }

  return TSDB_CODE_OUT_OF_MEMORY;
}
|
||||||
|
|
||||||
|
|
||||||
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||||
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
SHashJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
|
SHJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SHJoinOperatorInfo));
|
||||||
|
@ -84,17 +170,17 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n
|
||||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||||
pOperator->exprSupp.numOfExprs = numOfCols;
|
pOperator->exprSupp.numOfExprs = numOfCols;
|
||||||
|
|
||||||
initJoinTableInfo(pJoinNode, pJoinNode->pOnLeft, 0);
|
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
|
||||||
initJoinTableInfo(pJoinNode, pJoinNode->pOnRight, 1);
|
initJoinTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
|
||||||
|
|
||||||
code = initJoinColBufInfo(&pInfo->pRightKeyInfo, &pInfo->pRightKeyNum, pJoinNode->pOnRight, &pInfo->pRightKeyBuf);
|
setJoinBuildAndProbeTable(pInfo, pJoinNode);
|
||||||
|
|
||||||
|
code = initJoinBufPages(pInfo);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pInfo->inputStat, pJoinNode->inputStat, sizeof(pJoinNode->inputStat));
|
size_t hashCap = pInfo->pBuild->inputStat.inputRowNum > 0 ? (pInfo->pBuild->inputStat.inputRowNum * 1.5) : 1024;
|
||||||
|
|
||||||
size_t hashCap = pInfo->inputStat[1].inputRowNum > 0 ? (pInfo->inputStat[1].inputRowNum * 1.5) : 1024;
|
|
||||||
pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
pInfo->pKeyHash = tSimpleHashInit(hashCap, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
|
||||||
if (pInfo->pKeyHash == NULL) {
|
if (pInfo->pKeyHash == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -234,32 +320,100 @@ int32_t setColBufInfo(SSDataBlock* pBlock, int32_t colNum, SColBufInfo* pColList
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Serializes row `rowIdx` of the given columns into pBuf, column after column.
//
// Variable-length columns are located through their offset array and copied
// with their full varDataTLen() length; fixed-width columns contribute
// `bytes` bytes taken from position rowIdx of the column data.
//
// colNum   - number of entries in pColList
// rowIdx   - row to copy
// pColList - column descriptors (data/offset must already point at the block)
// pBuf     - destination; the caller must provide enough room
// pBufLen  - optional; receives the number of bytes written when non-NULL
FORCE_INLINE void copyColDataToBuf(int32_t colNum, int32_t rowIdx, SColBufInfo* pColList, char* pBuf, size_t *pBufLen) {
  char  *pData = NULL;
  size_t bufLen = 0;

  for (int32_t i = 0; i < colNum; ++i) {
    if (pColList[i].vardata) {
      pData = pColList[i].data + pColList[i].offset[rowIdx];
      memcpy(pBuf + bufLen, pData, varDataTLen(pData));
      bufLen += varDataTLen(pData);
    } else {
      pData = pColList[i].data + pColList[i].bytes * rowIdx;
      // Copy from the row's own position; copying from the column base would
      // serialize row 0 for every row.
      memcpy(pBuf + bufLen, pData, pColList[i].bytes);
      bufLen += pColList[i].bytes;
    }
  }

  if (pBufLen) {
    *pBufLen = bufLen;
  }
}
|
||||||
|
|
||||||
int32_t addBlockRowsKeyToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
|
// Reserves bufSize bytes from the last page of pPages and returns the start of
// the reserved region in *pBuf. When the last page has too little room left, a
// new page is appended and the reservation is retried there.
//
// Returns TSDB_CODE_SUCCESS, the error from addPageToJoinBuf(), or
// TSDB_CODE_OUT_OF_MEMORY when bufSize does not fit even into an empty page.
FORCE_INLINE int32_t getValBufFromPages(SArray* pPages, int32_t bufSize, char** pBuf) {
  do {
    SBufPageInfo* page = taosArrayGetLast(pPages);
    if (NULL != page) {
      if ((page->pageSize - page->offset) >= bufSize) {
        *pBuf = page->data + page->offset;
        page->offset += bufSize;
        return TSDB_CODE_SUCCESS;
      }

      // An untouched page that still cannot hold the request means no page
      // ever will; stop instead of allocating pages until memory runs out.
      if (0 == page->offset) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    int32_t code = addPageToJoinBuf(pPages);
    if (code) {
      return code;
    }
  } while (true);
}
|
||||||
|
|
||||||
|
// Reserves page-buffer space for one row's value columns and, when the key has
// no hash entry yet (pRes == NULL), inserts a new entry for it.
//
// pJoin  - join operator state; supplies the page array (pRowBufs)
// pHash  - key hash the new entry is inserted into
// pRes   - existing hash entry for the key, or NULL when the key is new
// pTable - table whose keyBuf holds the serialized key
// pBuf   - receives the start of the reserved value buffer
// keyLen - length of the serialized key in pTable->keyBuf
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error reported by
// getValBufFromPages(). The row record allocated for a new key is released
// again on every failure path.
//
// NOTE(review): getJoinValBufSize() is not defined in the visible code;
// pTable->valBufSize looks like the intended size - confirm.
// NOTE(review): the record allocated for a new key is stored uninitialized and
// an existing pRes is not updated here - confirm this is completed elsewhere.
int32_t getJoinValBuf(SHJoinOperatorInfo* pJoin, SSHashObj* pHash, SResRowData* pRes, SJoinTableInfo* pTable, char** pBuf, size_t keyLen) {
  SResRowData res = {0};

  if (NULL == pRes) {
    res.rows = taosMemoryMalloc(sizeof(SBufRowInfo));
    if (NULL == res.rows) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t code = getValBufFromPages(pJoin->pRowBufs, getJoinValBufSize(), pBuf);
  if (code) {
    taosMemoryFree(res.rows);
    return code;
  }

  if (NULL == pRes && tSimpleHashPut(pHash, pTable->keyBuf, keyLen, &res, sizeof(res))) {
    taosMemoryFree(res.rows);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // The success path previously fell off the end of this non-void function.
  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
// Adds row `rowIdx` of pBlock to the build side of the join: binds the value
// column descriptors to the block, reserves a value buffer for the key and
// serializes the row's value columns into it.
//
// pJoin  - join operator state
// pBlock - data block the row comes from
// pKey   - serialized key of the row; the visible caller passes
//          pJoin->pBuild->keyBuf, which is what the lookup below uses
// keyLen - length of the serialized key
// rowIdx - row inside pBlock
//
// Returns TSDB_CODE_SUCCESS or the first error from the helpers.
int32_t addRowToHash(SHJoinOperatorInfo* pJoin, SSDataBlock* pBlock, char* pKey, size_t keyLen, int32_t rowIdx) {
  SJoinTableInfo* pBuild = pJoin->pBuild;
  int32_t code = setColBufInfo(pBlock, pBuild->valNum, pBuild->valCols);
  if (code) {
    return code;
  }

  char *valBuf = NULL;
  SResRowData* pRes = tSimpleHashGet(pJoin->pKeyHash, pBuild->keyBuf, keyLen);

  // getJoinValBuf() takes the operator as its first argument; the call was
  // missing it and therefore did not match the function's signature.
  code = getJoinValBuf(pJoin, pJoin->pKeyHash, pRes, pBuild, &valBuf, keyLen);
  if (code) {
    return code;
  }

  copyColDataToBuf(pBuild->valNum, rowIdx, pBuild->valCols, valBuf, NULL);

  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
|
// Feeds every row of pBlock into the build-side hash: the row's key columns
// are serialized into the build table's key buffer and the row is then added
// under that key. Stops at, and returns, the first non-zero error code.
int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin) {
  SJoinTableInfo* pTb = pJoin->pBuild;
  size_t          keyLen = 0;

  int32_t rc = setColBufInfo(pBlock, pTb->keyNum, pTb->keyCols);

  for (int32_t row = 0; 0 == rc && row < pBlock->info.rows; ++row) {
    copyColDataToBuf(pTb->keyNum, row, pTb->keyCols, pTb->keyBuf, &keyLen);
    rc = addRowToHash(pJoin, pBlock, pTb->keyBuf, keyLen, row);
  }

  return rc;
}
|
||||||
|
|
||||||
int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
|
int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
|
||||||
|
@ -268,12 +422,12 @@ int32_t buildJoinKeyHash(struct SOperatorInfo* pOperator) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
pBlock = pOperator->pDownstream[1]->fpSet.getNextFn(pOperator->pDownstream[1]);
|
pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream);
|
||||||
if (NULL == pBlock) {
|
if (NULL == pBlock) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = addBlockRowsKeyToHash(pBlock, pJoin);
|
code = addBlockRowsToHash(pBlock, pJoin->pKeyHash, pJoin->pBuild);
|
||||||
if (code) {
|
if (code) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue