fix: hash join res issue

This commit is contained in:
dapan1121 2023-07-25 09:03:52 +08:00
parent 4f75eb5655
commit fb63294289
7 changed files with 102 additions and 17 deletions

View File

@ -83,6 +83,16 @@ typedef struct SHJoinTableInfo {
bool valColExist;
} SHJoinTableInfo;
typedef struct SHJoinExecInfo {
int64_t buildBlkNum;
int64_t buildBlkRows;
int64_t probeBlkNum;
int64_t probeBlkRows;
int64_t resRows;
int64_t expectRows;
} SHJoinExecInfo;
typedef struct SHJoinOperatorInfo {
int32_t joinType;
SHJoinTableInfo tbs[2];
@ -96,6 +106,7 @@ typedef struct SHJoinOperatorInfo {
SSHashObj* pKeyHash;
bool keyHashBuilt;
SHJoinCtx ctx;
SHJoinExecInfo execInfo;
} SHJoinOperatorInfo;
#ifdef __cplusplus

View File

@ -39,6 +39,7 @@ typedef struct STaskIdInfo {
uint64_t templateId;
char* str;
int32_t vgId;
uint64_t taskId;
} STaskIdInfo;
typedef struct STaskCostInfo {

View File

@ -148,6 +148,8 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR
basic.tableSeq = false;
tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic));
qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList));
}
(*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;

View File

@ -86,7 +86,8 @@ static void destroyGroupCacheOperator(void* param) {
}
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
// TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE);
if (NULL == newFd) {
return TAOS_SYSTEM_ERROR(errno);
}
@ -158,7 +159,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
releaseFdToFileCtx(pFd);
qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64,
pHead->basic.fileId, pHead->basic.blkId, pHead->basic.offset, pHead->basic.bufSize);
pHead->basic.fileId, pHead->basic.blkId, pHead->basic.bufSize, pHead->basic.offset);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
@ -347,7 +348,7 @@ static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheDa
}
qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64,
pBasic->fileId, pBasic->blkId, pBasic->offset, pBasic->bufSize);
pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset);
_return:
@ -387,7 +388,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList;
snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId);
snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d",
tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId);
pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0;
pVgCtx->fileCtx.baseNameLen = strlen(pVgCtx->fileCtx.baseFilename);
@ -857,6 +859,7 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
if (NULL == pCache->pReadBlk) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCache->writeDownstreamId = -1;
return TSDB_CODE_SUCCESS;
}
@ -982,7 +985,8 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id);
snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d",
tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id);
pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);
}
@ -1018,7 +1022,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->maxCacheSize = 104857600;
pInfo->maxCacheSize = 1;
pInfo->grpByUid = pPhyciNode->grpByUid;
pInfo->globalGrp = pPhyciNode->globalGrp;
pInfo->batchFetch = pPhyciNode->batchFetch;

View File

@ -27,6 +27,31 @@
#include "ttypes.h"
#include "hashjoin.h"
static int64_t getSingleKeyRowsNum(SBufRowInfo* pRow) {
int64_t rows = 0;
while (pRow) {
rows++;
pRow = pRow->next;
}
return rows;
}
static int64_t getRowsNumOfKeyHash(SSHashObj* pHash) {
SGroupData* pGroup = NULL;
int32_t iter = 0;
int64_t rowsNum = 0;
while (pGroup = tSimpleHashIterate(pHash, pGroup, &iter)) {
int32_t* pKey = tSimpleHashGetKey(pGroup, NULL);
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
qTrace("build_key:%d, rows:%" PRId64, *pKey, rows);
rowsNum += rows;
}
return rowsNum;
}
static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) {
pTable->keyNum = LIST_LENGTH(pList);
@ -274,6 +299,9 @@ static void destroyHJoinKeyHash(SSHashObj** ppHash) {
static void destroyHashJoinOperator(void* param) {
SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param;
qError("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64,
pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum,
pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows);
destroyHJoinKeyHash(&pJoinOperator->pKeyHash);
@ -348,19 +376,21 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i
}
static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinCtx* pCtx = &pJoin->ctx;
SBufRowInfo* pStart = pCtx->pBuildRow;
int32_t rowNum = 0;
int32_t resNum = pRes->info.rows;
while (pCtx->pBuildRow && resNum < pRes->info.capacity) {
while (pCtx->pBuildRow && (resNum < pRes->info.capacity)) {
rowNum++;
resNum++;
pCtx->pBuildRow = pCtx->pBuildRow->next;
}
pJoin->execInfo.resRows += rowNum;
int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes);
if (code) {
pOperator->pTaskInfo->code = code;
@ -368,7 +398,7 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator,
}
pRes->info.rows = resNum;
pCtx->rowRemains = pCtx->pBuildRow ? true : false;
*allFetched = pCtx->pBuildRow ? false : true;
}
@ -412,23 +442,44 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) {
SHJoinCtx* pCtx = &pJoin->ctx;
SSDataBlock* pRes = pJoin->pRes;
size_t bufLen = 0;
bool allFetched = false;
if (pJoin->ctx.pBuildRow) {
appendHJoinResToBlock(pOperator, pRes);
appendHJoinResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeIdx;
}
return;
} else {
++pCtx->probeIdx;
}
}
for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) {
copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen);
SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
/*
size_t keySize = 0;
int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize);
ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen));
int64_t rows = getSingleKeyRowsNum(pGroup->rows);
pJoin->execInfo.expectRows += rows;
qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows);
*/
if (pGroup) {
pCtx->pBuildRow = pGroup->rows;
appendHJoinResToBlock(pOperator, pRes);
appendHJoinResToBlock(pOperator, pRes, &allFetched);
if (pRes->info.rows >= pRes->info.capacity) {
if (allFetched) {
++pCtx->probeIdx;
}
return;
}
} else {
qTrace("no key matched");
}
}
@ -649,6 +700,9 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) {
break;
}
pJoin->execInfo.buildBlkNum++;
pJoin->execInfo.buildBlkRows += pBlock->info.rows;
code = addBlockRowsToHash(pBlock, pJoin);
if (code) {
return code;
@ -673,6 +727,7 @@ static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock*
pJoin->ctx.probeIdx = 0;
pJoin->ctx.pBuildRow = NULL;
pJoin->ctx.pProbeData = pBlock;
pJoin->ctx.rowRemains = true;
doHashJoinImpl(pOperator);
@ -718,12 +773,14 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
setHJoinDone(pOperator);
goto _return;
}
//qTrace("build table rows:%" PRId64, getRowsNumOfKeyHash(pJoin->pKeyHash));
}
if (pJoin->ctx.rowRemains) {
doHashJoinImpl(pOperator);
if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) {
if (pRes->info.rows >= pRes->info.capacity && pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows > 0) {
@ -737,6 +794,9 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) {
setHJoinDone(pOperator);
break;
}
pJoin->execInfo.probeBlkNum++;
pJoin->execInfo.probeBlkRows += pBlock->info.rows;
code = launchBlockHashJoin(pOperator, pBlock);
if (code) {

View File

@ -54,6 +54,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.taskId = taskId;
pTaskInfo->id.str = taosMemoryMalloc(64);
buildTaskId(taskId, queryId, pTaskInfo->id.str);
return pTaskInfo;

View File

@ -803,18 +803,24 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
pListInfo->oneTableForEachGroup = false;
pListInfo->numOfOuputGroups = 1;
}
STableKeyInfo info = {.groupId = 0};
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pParam->pUidList, i);
STableKeyInfo info = {.uid = *pUid, .groupId = 0};
if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t))) {
if (TSDB_CODE_DUP_KEY == terrno) {
continue;
}
return TSDB_CODE_OUT_OF_MEMORY;
}
info.uid = *pUid;
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t));
qError("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo));
}
@ -3596,4 +3602,4 @@ static void destoryTableCountScanOperator(void* param) {
taosArrayDestroy(pTableCountScanInfo->stbUidList);
taosMemoryFreeClear(param);
}
}