diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index 751b76b0ef..6822333d0a 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -83,6 +83,16 @@ typedef struct SHJoinTableInfo { bool valColExist; } SHJoinTableInfo; +typedef struct SHJoinExecInfo { + int64_t buildBlkNum; + int64_t buildBlkRows; + int64_t probeBlkNum; + int64_t probeBlkRows; + int64_t resRows; + int64_t expectRows; +} SHJoinExecInfo; + + typedef struct SHJoinOperatorInfo { int32_t joinType; SHJoinTableInfo tbs[2]; @@ -96,6 +106,7 @@ typedef struct SHJoinOperatorInfo { SSHashObj* pKeyHash; bool keyHashBuilt; SHJoinCtx ctx; + SHJoinExecInfo execInfo; } SHJoinOperatorInfo; #ifdef __cplusplus diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 68acf07aee..590ac349d0 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -39,6 +39,7 @@ typedef struct STaskIdInfo { uint64_t templateId; char* str; int32_t vgId; + uint64_t taskId; } STaskIdInfo; typedef struct STaskCostInfo { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index b323ad7874..49cada2549 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -148,6 +148,8 @@ static FORCE_INLINE int32_t buildBatchExchangeOperatorParam(SOperatorParam** ppR basic.tableSeq = false; tSimpleHashPut(pExc->pBatchs, pVgId, sizeof(*pVgId), &basic, sizeof(basic)); + + qTrace("build downstreamIdx %d batch scan, vgId:%d, uidNum:%" PRId64, downstreamIdx, *pVgId, (int64_t)taosArrayGetSize(pUidList)); } (*ppRes)->opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 3bd071d0d1..a6325e71b7 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -86,7 +86,8 @@ static void destroyGroupCacheOperator(void* param) { } static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) { - TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); +// TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); + TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE); if (NULL == newFd) { return TAOS_SYSTEM_ERROR(errno); } @@ -158,7 +159,7 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC releaseFdToFileCtx(pFd); qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, - pHead->basic.fileId, pHead->basic.blkId, pHead->basic.offset, pHead->basic.bufSize); + pHead->basic.fileId, pHead->basic.blkId, pHead->basic.bufSize, pHead->basic.offset); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; @@ -347,7 +348,7 @@ static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheDa } qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, - pBasic->fileId, pBasic->blkId, pBasic->offset, pBasic->bufSize); + pBasic->fileId, pBasic->blkId, pBasic->bufSize, pBasic->offset); _return: @@ -387,7 +388,8 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { pVgCtx->pTbList = pTbList; - snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId); + snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d_%d", + tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, downstreamId, vgId); pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; pVgCtx->fileCtx.baseNameLen = strlen(pVgCtx->fileCtx.baseFilename); @@ -857,6 +859,7 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { if (NULL == pCache->pReadBlk) { return TSDB_CODE_OUT_OF_MEMORY; } + pCache->writeDownstreamId = -1; return TSDB_CODE_SUCCESS; } @@ -982,7 +985,8 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { return TSDB_CODE_OUT_OF_MEMORY; } - snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id); + snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%" PRIx64 "_%" PRIu64 "_%d", + tsTempDir, getpid(), pOperator->pTaskInfo->id.queryId, pOperator->pTaskInfo->id.taskId, pCtx->id); pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0; pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename); } @@ -1018,7 +1022,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pInfo->maxCacheSize = 104857600; + pInfo->maxCacheSize = 1; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; pInfo->batchFetch = pPhyciNode->batchFetch; diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index dfbc39252e..77588e02f2 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -27,6 +27,31 @@ #include "ttypes.h" #include "hashjoin.h" + +static int64_t getSingleKeyRowsNum(SBufRowInfo* pRow) { + int64_t rows = 0; + while (pRow) { + rows++; + pRow = pRow->next; + } + return rows; +} + +static int64_t getRowsNumOfKeyHash(SSHashObj* pHash) { + SGroupData* pGroup = NULL; + int32_t iter = 0; + int64_t rowsNum = 0; + + while (pGroup = tSimpleHashIterate(pHash, pGroup, &iter)) { + int32_t* pKey = tSimpleHashGetKey(pGroup, NULL); + int64_t rows = getSingleKeyRowsNum(pGroup->rows); + qTrace("build_key:%d, rows:%" PRId64, *pKey, rows); + rowsNum += rows; + } + + return rowsNum; +} + static int32_t initHJoinKeyColsInfo(SHJoinTableInfo* pTable, SNodeList* pList) { pTable->keyNum = LIST_LENGTH(pList); @@ -274,6 +299,9 @@ static void destroyHJoinKeyHash(SSHashObj** ppHash) { static void destroyHashJoinOperator(void* param) { SHJoinOperatorInfo* pJoinOperator = (SHJoinOperatorInfo*)param; + qError("hashJoin exec info, buildBlk:%" PRId64 ", buildRows:%" PRId64 ", probeBlk:%" PRId64 ", probeRows:%" PRId64 ", resRows:%" PRId64, + pJoinOperator->execInfo.buildBlkNum, pJoinOperator->execInfo.buildBlkRows, pJoinOperator->execInfo.probeBlkNum, + pJoinOperator->execInfo.probeBlkRows, pJoinOperator->execInfo.resRows); destroyHJoinKeyHash(&pJoinOperator->pKeyHash); @@ -348,19 +376,21 @@ static FORCE_INLINE int32_t copyHJoinResRowsToBlock(SHJoinOperatorInfo* pJoin, i } -static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { +static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, SSDataBlock* pRes, bool* allFetched) { SHJoinOperatorInfo* pJoin = pOperator->info; SHJoinCtx* pCtx = &pJoin->ctx; SBufRowInfo* pStart = pCtx->pBuildRow; int32_t rowNum = 0; int32_t resNum = pRes->info.rows; - while (pCtx->pBuildRow && resNum < pRes->info.capacity) { + while (pCtx->pBuildRow && (resNum < pRes->info.capacity)) { rowNum++; resNum++; pCtx->pBuildRow = pCtx->pBuildRow->next; } + pJoin->execInfo.resRows += rowNum; + int32_t code = copyHJoinResRowsToBlock(pJoin, rowNum, pStart, pRes); if (code) { pOperator->pTaskInfo->code = code; @@ -368,7 +398,7 @@ static FORCE_INLINE void appendHJoinResToBlock(struct SOperatorInfo* pOperator, } pRes->info.rows = resNum; - pCtx->rowRemains = pCtx->pBuildRow ? true : false; + *allFetched = pCtx->pBuildRow ? false : true; } @@ -412,23 +442,44 @@ static void doHashJoinImpl(struct SOperatorInfo* pOperator) { SHJoinCtx* pCtx = &pJoin->ctx; SSDataBlock* pRes = pJoin->pRes; size_t bufLen = 0; + bool allFetched = false; if (pJoin->ctx.pBuildRow) { - appendHJoinResToBlock(pOperator, pRes); + appendHJoinResToBlock(pOperator, pRes, &allFetched); if (pRes->info.rows >= pRes->info.capacity) { + if (allFetched) { + ++pCtx->probeIdx; + } + return; + } else { + ++pCtx->probeIdx; } } for (; pCtx->probeIdx < pCtx->pProbeData->info.rows; ++pCtx->probeIdx) { copyKeyColsDataToBuf(pProbe, pCtx->probeIdx, &bufLen); SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen); +/* + size_t keySize = 0; + int32_t* pKey = tSimpleHashGetKey(pGroup, &keySize); + ASSERT(keySize == bufLen && 0 == memcmp(pKey, pProbe->keyData, bufLen)); + int64_t rows = getSingleKeyRowsNum(pGroup->rows); + pJoin->execInfo.expectRows += rows; + qTrace("hash_key:%d, rows:%" PRId64, *pKey, rows); +*/ if (pGroup) { pCtx->pBuildRow = pGroup->rows; - appendHJoinResToBlock(pOperator, pRes); + appendHJoinResToBlock(pOperator, pRes, &allFetched); if (pRes->info.rows >= pRes->info.capacity) { + if (allFetched) { + ++pCtx->probeIdx; + } + return; } + } else { + qTrace("no key matched"); } } @@ -649,6 +700,9 @@ static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { break; } + pJoin->execInfo.buildBlkNum++; + pJoin->execInfo.buildBlkRows += pBlock->info.rows; + code = addBlockRowsToHash(pBlock, pJoin); if (code) { return code; @@ -673,6 +727,7 @@ static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* pJoin->ctx.probeIdx = 0; pJoin->ctx.pBuildRow = NULL; pJoin->ctx.pProbeData = pBlock; + pJoin->ctx.rowRemains = true; doHashJoinImpl(pOperator); @@ -718,12 +773,14 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { setHJoinDone(pOperator); goto _return; } + + //qTrace("build table rows:%" PRId64, getRowsNumOfKeyHash(pJoin->pKeyHash)); } if (pJoin->ctx.rowRemains) { doHashJoinImpl(pOperator); - if (pRes->info.rows >= pOperator->resultInfo.threshold && pOperator->exprSupp.pFilterInfo != NULL) { + if (pRes->info.rows >= pRes->info.capacity && pOperator->exprSupp.pFilterInfo != NULL) { doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); } if (pRes->info.rows > 0) { @@ -737,6 +794,9 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { setHJoinDone(pOperator); break; } + + pJoin->execInfo.probeBlkNum++; + pJoin->execInfo.probeBlkRows += pBlock->info.rows; code = launchBlockHashJoin(pOperator, pBlock); if (code) { diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 22d171e74a..96fe1bf2f9 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -54,6 +54,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP pTaskInfo->id.vgId = vgId; pTaskInfo->id.queryId = queryId; + pTaskInfo->id.taskId = taskId; pTaskInfo->id.str = taosMemoryMalloc(64); buildTaskId(taskId, queryId, pTaskInfo->id.str); return pTaskInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f32fe5ba5b..a45621bf71 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -803,18 +803,24 @@ static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) { pListInfo->oneTableForEachGroup = false; pListInfo->numOfOuputGroups = 1; } - + + STableKeyInfo info = {.groupId = 0}; for (int32_t i = 0; i < num; ++i) { uint64_t* pUid = taosArrayGet(pParam->pUidList, i); - STableKeyInfo info = {.uid = *pUid, .groupId = 0}; + if (taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t))) { + if (TSDB_CODE_DUP_KEY == terrno) { + continue; + } + return TSDB_CODE_OUT_OF_MEMORY; + } + + info.uid = *pUid; void* p = taosArrayPush(pListInfo->pTableList, &info); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t)); - qError("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo)); } @@ -3596,4 +3602,4 @@ static void destoryTableCountScanOperator(void* param) { taosArrayDestroy(pTableCountScanInfo->stbUidList); taosMemoryFreeClear(param); -} \ No newline at end of file +}