diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index ace43813e2..bbb6c06f3d 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -24,19 +24,11 @@ extern "C" { #pragma pack(push, 1) typedef struct SGcBlkBufBasic { + uint32_t fileId; int64_t blkId; int64_t offset; int64_t bufSize; } SGcBlkBufBasic; - -typedef struct SGcBlkBufInfo { - SGcBlkBufBasic basic; - uint32_t fileId; - void* next; - void* pBuf; - SGcDownstreamCtx* pCtx; - SGroupCacheData* pGroup; -} SGcBlkBufInfo; #pragma pack(pop) typedef struct SGroupCacheFileFd { @@ -44,25 +36,40 @@ typedef struct SGroupCacheFileFd { TdFilePtr fd; } SGroupCacheFileFd; -typedef struct SGcVgroupCtx { - SArray* pTbList; - uint64_t lastUid; +typedef struct SGcFileCacheCtx { int64_t fileSize; uint32_t fileId; - SSHashObj* pCacheFile; + SHashObj* pCacheFile; int32_t baseNameLen; char baseFilename[PATH_MAX]; +} SGcFileCacheCtx; + +typedef struct SGcDownstreamCtx { + int32_t id; + SRWLatch grpLock; + int64_t fetchSessionId; + SArray* pNewGrpList; // SArray + SSHashObj* pVgTbHash; + SHashObj* pGrpHash; + SRWLatch blkLock; + SSDataBlock* pBaseBlock; + SArray* pFreeBlock; + int64_t lastBlkUid; + SHashObj* pSessions; + SHashObj* pWaitSessions; + SGcFileCacheCtx fileCtx; +} SGcDownstreamCtx; + +typedef struct SGcVgroupCtx { + SArray* pTbList; + uint64_t lastBlkUid; + SGcFileCacheCtx fileCtx; } SGcVgroupCtx; -typedef struct SGroupSeqBlkList { - SRWLatch lock; - SArray* pList; -} SGroupSeqBlkList; - -typedef struct SGroupBatchBlkList { +typedef struct SGcBlkList { SRWLatch lock; SArray* pList; -} SGroupBatchBlkList; +} SGcBlkList; typedef struct SGroupCacheData { TdThreadMutex mutex; @@ -72,10 +79,7 @@ typedef struct SGroupCacheData { SGcVgroupCtx* pVgCtx; int32_t downstreamIdx; int32_t vgId; - union { - SGroupSeqBlkList seqList; - SGroupBatchBlkList batchList; - }; + SGcBlkList blkList; uint32_t fileId; int64_t startOffset; } SGroupCacheData; @@ -103,24 +107,6 @@ typedef struct SGcNewGroupInfo { SOperatorParam* pParam; } SGcNewGroupInfo; -typedef struct SGcDownstreamCtx { - int32_t id; - SRWLatch grpLock; - int64_t fetchSessionId; - SArray* pNewGrpList; // SArray - SSHashObj* pVgTbHash; - SHashObj* pGrpHash; - SRWLatch blkLock; - SSDataBlock* pBaseBlock; - SArray* pFreeBlock; - int64_t lastBlkUid; - SHashObj* pSessions; - SHashObj* pWaitSessions; - int32_t cacheFileFdNum; - SGroupCacheFileFd cacheFileFd; - char baseFilename[PATH_MAX]; -} SGcDownstreamCtx; - typedef struct SGcSessionCtx { int32_t downstreamIdx; SGcOperatorParam* pParam; @@ -131,6 +117,14 @@ typedef struct SGcSessionCtx { bool newFetch; } SGcSessionCtx; +typedef struct SGcBlkBufInfo { + SGcBlkBufBasic basic; + void* next; + void* pBuf; + SGcDownstreamCtx* pCtx; + SGroupCacheData* pGroup; +} SGcBlkBufInfo; + typedef struct SGcExecInfo { int32_t downstreamNum; int64_t* pDownstreamBlkNum; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 1cb0a04603..3bd071d0d1 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -95,17 +95,24 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* return TSDB_CODE_SUCCESS; } -static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { +static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { int32_t code = TSDB_CODE_SUCCESS; - SGroupCacheFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); + if (NULL == pFileCtx->pCacheFile) { + pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + if (NULL == pFileCtx->pCacheFile) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SGroupCacheFileFd* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { - sprintf(pVgCtx->baseFilename[pVgCtx->baseNameLen], "_%u", fileId); + sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId); SGroupCacheFileFd newVgFd = {0}; - tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); - pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); + taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); + pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId)); - code = initOpenCacheFile(pTmp, pVgCtx->baseFilename); + code = initOpenCacheFile(pTmp, pFileCtx->baseFilename); if (code) { return code; } @@ -117,37 +124,41 @@ static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstre return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void releaseVgroupFileFd(SGroupCacheFileFd* pFd) { +static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) { taosThreadMutexUnlock(&pFd->mutex); } -static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { +static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { int32_t code = TSDB_CODE_SUCCESS; - SGcBlkBufBasic blkBasic; + SGroupCacheFileFd *pFd; + SGcFileCacheCtx* pFileCtx = NULL; while (NULL != pHead) { - SGroupCacheFileFd *pFd; - code = acquireDownstreamFileFd(pHead->pCtx, &pFd); + pFileCtx = pGCache->batchFetch ? &pHead->pCtx->fileCtx : &pHead->pGroup->pVgCtx->fileCtx; + + code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd); if (code) { - return code; + goto _return; } int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); if (ret == -1) { - releaseDownstreamFileFd(); - return TAOS_SYSTEM_ERROR(errno); + releaseFdToFileCtx(pFd); + code = TAOS_SYSTEM_ERROR(errno); + goto _return; } ret = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); if (ret != pHead->basic.bufSize) { - releaseDownstreamFileFd(); - return TAOS_SYSTEM_ERROR(errno); + releaseFdToFileCtx(pFd); + code = TAOS_SYSTEM_ERROR(errno); + goto _return; } - releaseDownstreamFileFd(); + + releaseFdToFileCtx(pFd); - taosWLockLatch(&pHead->pGroup->batchList.lock); - taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); - taosWUnLockLatch(&pHead->pGroup->batchList.lock); + qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64, + pHead->basic.fileId, pHead->basic.blkId, pHead->basic.offset, pHead->basic.bufSize); int64_t blkId = pHead->basic.blkId; pHead = pHead->next; @@ -155,56 +166,8 @@ static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBuf taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); } - return code; -} - -static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { - int32_t code = TSDB_CODE_SUCCESS; - - SGroupCacheFileFd* pFd = NULL; - while (NULL != pHead) { - code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); - if (ret == -1) { - code= TAOS_SYSTEM_ERROR(errno); - return code; - } - - code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); - releaseVgroupFileFd(pFd); - if (code != pHead->basic.bufSize) { - code= TAOS_SYSTEM_ERROR(errno); - return code; - } - - taosWLockLatch(&pHead->pGroup->batchList.lock); - taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); - taosWUnLockLatch(&pHead->pGroup->batchList.lock); - - - int64_t blkId = pHead->basic.blkId; - pHead = pHead->next; - - taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); - } - - return code; -} - - -static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { - int32_t code = TSDB_CODE_SUCCESS; - - if (pGCache->batchFetch) { - code = saveBatchBlocksToDisk(pGCache, pHead); - } else { - code = saveSeqBlocksToDisk(pGCache, pHead); - } +_return: atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); @@ -212,16 +175,16 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC } static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { - if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) { + if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) { return TSDB_CODE_OUT_OF_MEMORY; } - pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId)); + pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId)); int32_t code = TSDB_CODE_SUCCESS; SGcBlkBufInfo* pWriteHead = NULL; taosWLockLatch(&pCache->dirtyLock); - pCache->blkCacheSize += pBufInfo->bufSize; + pCache->blkCacheSize += pBufInfo->basic.bufSize; qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize); if (NULL == pCache->pDirtyHead) { @@ -236,7 +199,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr pWriteHead = pCache->pDirtyHead; SGcBlkBufInfo* pTmp = pCache->pDirtyHead; while (NULL != pTmp) { - pCache->blkCacheSize -= pTmp->bufSize; + pCache->blkCacheSize -= pTmp->basic.bufSize; if (pCache->blkCacheSize <= pGCache->maxCacheSize) { pCache->pDirtyHead = pTmp->next; pTmp->next = NULL; @@ -255,6 +218,17 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr return code; } + +static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx) { + if (pFileCtx->fileSize < GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { + return; + } + + pFileCtx->fileId++; + pFileCtx->fileSize = 0; +} + + static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) { SGroupCacheOperatorInfo* pGCache = pOperator->info; int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t); @@ -265,14 +239,20 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB } blockDataToBuf(pBufInfo->pBuf, pBlock); + SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx; + pBufInfo->next = NULL; - pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); - pBufInfo->fileId = pGroup->fileId; - pBufInfo->bufSize = bufSize; - pBufInfo->offset = atomic_fetch_add_64(&pGroup->pVgCtx->fileSize, bufSize); + pBufInfo->basic.blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); + pBufInfo->basic.fileId = pGCache->batchFetch ? pFileCtx->fileId : pGroup->fileId; + pBufInfo->basic.bufSize = bufSize; + pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize); pBufInfo->pCtx = pCtx; pBufInfo->pGroup = pGroup; + if (pGCache->batchFetch) { + groupCacheSwitchNewFile(pFileCtx); + } + int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo); return code; @@ -340,98 +320,38 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3 return blockDataFromBuf(*ppRes, pBuf); } -static int32_t acquireDownstreamFileFd(SGcDownstreamCtx* pCtx, SGroupCacheFileFd** ppFd) { - if (NULL == pCtx->cacheFileFd.fd) { - pCtx->cacheFileFd.fd = taosOpenFile(pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); - if (NULL == pCtx->cacheFileFd.fd) { - return TAOS_SYSTEM_ERROR(errno); - } - pCtx->cacheFileFdNum = 1; - taosThreadMutexInit(&pCtx->cacheFileFd.mutex, NULL); - } else { - taosThreadMutexLock(&pCtx->cacheFileFd.mutex); - } - - *ppFd = &pCtx->cacheFileFd; - return TSDB_CODE_SUCCESS; -} - -static int32_t readBatchBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) { - TdFilePtr cacheFileFd = NULL; - SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGrp->downstreamIdx]; - int32_t code = acquireDownstreamFileFd(pCtx, &cacheFileFd); +static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) { + SGroupCacheFileFd *pFileFd = NULL; + SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx; + int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd); if (code) { return code; } - int32_t ret = taosLSeekFile(cacheFileFd, pBasic->offset, SEEK_SET); + int32_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET); if (ret == -1) { - return TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + goto _return; } *ppBuf = taosMemoryMalloc(pBasic->bufSize); if (NULL == *ppBuf) { - releaseDownstreamFileFd(); - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _return; } - ret = (int32_t)taosReadFile(cacheFileFd, *ppBuf, pBasic->bufSize); + ret = (int32_t)taosReadFile(pFileFd->fd, *ppBuf, pBasic->bufSize); if (ret != pBasic->bufSize) { taosMemoryFreeClear(*ppBuf); - return TAOS_SYSTEM_ERROR(errno); + code = TAOS_SYSTEM_ERROR(errno); + goto _return; } - releaseDownstreamFileFd(); - return code; -} + qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64, + pBasic->fileId, pBasic->blkId, pBasic->offset, pBasic->bufSize); +_return: -static int32_t readSeqBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { - int32_t code = TSDB_CODE_SUCCESS; - - SGroupCacheFileFd* pFd = NULL; - while (NULL != pHead) { - code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - - int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); - if (ret == -1) { - code= TAOS_SYSTEM_ERROR(errno); - return code; - } - - code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); - releaseVgroupFileFd(pFd); - if (code != pHead->basic.bufSize) { - code= TAOS_SYSTEM_ERROR(errno); - return code; - } - - taosWLockLatch(&pHead->pGroup->batchList.lock); - taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); - taosWUnLockLatch(&pHead->pGroup->batchList.lock); - - - int64_t blkId = pHead->basic.blkId; - pHead = pHead->next; - - taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); - } - - return code; -} - - -static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppRes) { - int32_t code = TSDB_CODE_SUCCESS; - - if (pGCache->batchFetch) { - code = readBatchBlocksFromDisk(pGCache, pGrp, pBasic, ppRes); - } else { - code = readSeqBlocksFromDisk(pGCache, pGrp, pBasic, ppRes); - } - + releaseFdToFileCtx(pFileFd); return code; } @@ -440,7 +360,6 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC SGcBlkCacheInfo* pCache = &pGCache->blkCache; void* pBuf = NULL; - taosRLockLatch(&pCache->dirtyLock); SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId)); if (NULL == pBufInfo) { code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf); @@ -452,7 +371,11 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC } code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes); - taosRUnLockLatch(&pCache->dirtyLock); + taosHashRelease(pCache->pDirtyBlk, pBufInfo); + if (NULL == pBufInfo) { + taosMemoryFree(pBuf); + } + if (code) { return code; } @@ -461,16 +384,16 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void initGcVgroupCtx(SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { +static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { pVgCtx->pTbList = pTbList; - snprintf(pVgCtx->baseFilename, sizeof(pVgCtx->baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId); - pVgCtx->baseFilename[sizeof(pVgCtx->baseFilename) - 1] = 0; + snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId); + pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0; - pVgCtx->baseNameLen = strlen(pVgCtx->baseFilename); + pVgCtx->fileCtx.baseNameLen = strlen(pVgCtx->fileCtx.baseFilename); } -static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { +static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, SGcNewGroupInfo* pNew) { SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx; if (NULL == pVgCtx) { SArray* pList = taosArrayInit(10, sizeof(*pNew)); @@ -479,7 +402,7 @@ static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { } taosArrayPush(pList, pNew); SGcVgroupCtx vgCtx = {0}; - initGcVgroupCtx(&vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); + initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); return TSDB_CODE_SUCCESS; @@ -504,7 +427,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp for (int32_t i = 0; i < num; ++i) { SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); if (!pGCache->batchFetch) { - code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew); + code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew); if (code) { goto _return; } @@ -586,29 +509,13 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { taosThreadMutexUnlock(&pGroup->mutex); } -static int32_t vgroupSwitchNewFile(SGcVgroupCtx* pVgCtx) { - if (NULL != pVgCtx->cacheFileFd) { - if (NULL == pVgCtx->pCacheFile) { - pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pVgCtx->pCacheFile) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - tSimpleHashPut(pVgCtx->pCacheFile, &pVgCtx->fileId, sizeof(pVgCtx->fileId), &pVgCtx->cacheFileFd, sizeof(pVgCtx->cacheFileFd)); - } - - pVgCtx->fileId++; - pVgCtx->fileSize = 0; - return TSDB_CODE_SUCCESS; -} - static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { - if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { + if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) { return TSDB_CODE_SUCCESS; } pCtx->lastBlkUid = uid; - pGroup->pVgCtx->lastUid = uid; + pGroup->pVgCtx->lastBlkUid = uid; int32_t i = 0; while (true) { @@ -619,15 +526,10 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat handleGroupFetchDone(pNew->pGroup); } - if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { - int32_t code = vgroupSwitchNewFile(pGroup->pVgCtx); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - } + groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx); - pGroup->fileId = pGroup->pVgCtx->fileId; - pGroup->startOffset = pGroup->pVgCtx->fileSize; + pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId; + pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize; return TSDB_CODE_SUCCESS; } @@ -637,15 +539,8 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat taosThreadMutexInit(&pGroup->mutex, NULL); pGroup->downstreamIdx = downstreamIdx; pGroup->vgId = vgId; - if (batchFetch) { - pGroup->fileId = 0; - pGroup->batchList.pList = taosArrayInit(10, POINTER_BYTES); - } else { - pGroup->fileId = -1; - pGroup->seqList.startBlkId = -1; - pGroup->seqList.endBlkId = -1; - - } + pGroup->fileId = -1; + pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic)); pGroup->startOffset = -1; pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); } @@ -691,18 +586,14 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* return TSDB_CODE_SUCCESS; } -static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int32_t* pIdx) { - taosWLockLatch(&pGroup->batchList.lock); - if (batchFetch) { - taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic); - } else { - taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic.offset); - } +static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) { + taosWLockLatch(&pGroup->blkList.lock); + taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic); + *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1; + taosWUnLockLatch(&pGroup->blkList.lock); - *pIdx = taosArrayGetSize(pGroup->batchList.pList) - 1; + qError("block added to group cache, total block num:%" PRId64, *pIdx + 1); - qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList)); - taosWUnLockLatch(&pGroup->batchList.lock); return TSDB_CODE_SUCCESS; } @@ -835,25 +726,25 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 SGroupCacheOperatorInfo* pGCache = pOperator->info; *got = true; - SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; - taosRLockLatch(&pBatchList->lock); - int64_t blkNum = taosArrayGetSize(pBatchList->pList); + SGcBlkList* pBlkList = &pSession->pGroupData->blkList; + taosRLockLatch(&pBlkList->lock); + int64_t blkNum = taosArrayGetSize(pBlkList->pList); if (pSession->lastBlkId < 0) { if (blkNum > 0) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0); - taosRUnLockLatch(&pBatchList->lock); + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0); + taosRUnLockLatch(&pBlkList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); pSession->lastBlkId = 0; return code; } } else if ((pSession->lastBlkId + 1) < blkNum) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1); - taosRUnLockLatch(&pBatchList->lock); + SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1); + taosRUnLockLatch(&pBlkList->lock); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); pSession->lastBlkId++; return code; } - taosRUnLockLatch(&pBatchList->lock); + taosRUnLockLatch(&pBlkList->lock); if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { *ppRes = NULL; @@ -950,6 +841,10 @@ _return: return code; } +void freeGcBlkBufInfo(void* ptr) { + SGcBlkBufInfo* pBlk = (SGcBlkBufInfo*)ptr; + taosMemoryFree(pBlk->pBuf); +} static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { SGcBlkCacheInfo* pCache = &pInfo->blkCache; @@ -957,7 +852,7 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { if (NULL == pCache->pDirtyBlk) { return TSDB_CODE_OUT_OF_MEMORY; } - taosHashSetFreeFp(pCache->pDirtyBlk,,,,,,,,); + taosHashSetFreeFp(pCache->pDirtyBlk, freeGcBlkBufInfo); pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == pCache->pReadBlk) { return TSDB_CODE_OUT_OF_MEMORY; @@ -1057,7 +952,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { if (pInfo->batchFetch) { int32_t defaultVg = 0; SGcVgroupCtx vgCtx = {0}; - initGcVgroupCtx(&vgCtx, pCtx->id, defaultVg, NULL); + initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL); tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); } @@ -1087,8 +982,9 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { return TSDB_CODE_OUT_OF_MEMORY; } - snprintf(pCtx->baseFilename, sizeof(pCtx->baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id); - pCtx->baseFilename[sizeof(pCtx->baseFilename) - 1] = 0; + snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id); + pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0; + pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename); } return TSDB_CODE_SUCCESS;