diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 6a19b9bd08..ace43813e2 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -27,11 +27,11 @@ typedef struct SGcBlkBufBasic { int64_t blkId; int64_t offset; int64_t bufSize; - uint32_t fileId; } SGcBlkBufBasic; typedef struct SGcBlkBufInfo { SGcBlkBufBasic basic; + uint32_t fileId; void* next; void* pBuf; SGcDownstreamCtx* pCtx; @@ -39,10 +39,10 @@ typedef struct SGcBlkBufInfo { } SGcBlkBufInfo; #pragma pack(pop) -typedef struct SGcVgroupFileFd { +typedef struct SGroupCacheFileFd { TdThreadMutex mutex; TdFilePtr fd; -} SGcVgroupFileFd; +} SGroupCacheFileFd; typedef struct SGcVgroupCtx { SArray* pTbList; @@ -117,7 +117,7 @@ typedef struct SGcDownstreamCtx { SHashObj* pSessions; SHashObj* pWaitSessions; int32_t cacheFileFdNum; - TdFilePtr cacheFileFd[GROUP_CACHE_MAX_FILE_FDS]; + SGroupCacheFileFd cacheFileFd; char baseFilename[PATH_MAX]; } SGcDownstreamCtx; @@ -126,7 +126,6 @@ typedef struct SGcSessionCtx { SGcOperatorParam* pParam; SGroupCacheData* pGroupData; int64_t lastBlkId; - int64_t nextOffset; bool semInit; tsem_t waitSem; bool newFetch; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index a219c4c0a0..1cb0a04603 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -85,7 +85,7 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFreeClear(param); } -static FORCE_INLINE int32_t initOpenCacheFile(SGcVgroupFileFd* pFileFd, char* filename) { +static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) { TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); if (NULL == newFd) { return TAOS_SYSTEM_ERROR(errno); @@ -95,13 +95,13 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGcVgroupFileFd* pFileFd, char* fi return TSDB_CODE_SUCCESS; } -static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGcVgroupFileFd** ppFd) { +static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { int32_t code = TSDB_CODE_SUCCESS; - SGcVgroupFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); + SGroupCacheFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pTmp) { sprintf(pVgCtx->baseFilename[pVgCtx->baseNameLen], "_%u", fileId); - SGcVgroupFileFd newVgFd = {0}; + SGroupCacheFileFd newVgFd = {0}; tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); @@ -117,7 +117,7 @@ static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstre return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void releaseVgroupFileFd(SGcVgroupFileFd* pFd) { +static FORCE_INLINE void releaseVgroupFileFd(SGroupCacheFileFd* pFd) { taosThreadMutexUnlock(&pFd->mutex); } @@ -126,27 +126,30 @@ static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBuf SGcBlkBufBasic blkBasic; while (NULL != pHead) { - if (NULL == pHead->pCtx->cacheFileFd[0]) { - pHead->pCtx->cacheFileFd[0] = taosOpenFile(pHead->pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); - if (NULL == pHead->pCtx->cacheFileFd[0]) { - return TAOS_SYSTEM_ERROR(errno); - } - pHead->pCtx->cacheFileFdNum = 1; + SGroupCacheFileFd *pFd; + code = acquireDownstreamFileFd(pHead->pCtx, &pFd); + if (code) { + return code; } - - int32_t ret = taosLSeekFile(pHead->pCtx->cacheFileFd[0], pHead->offset, SEEK_SET); + + int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); if (ret == -1) { + releaseDownstreamFileFd(); return TAOS_SYSTEM_ERROR(errno); } - ret = (int32_t)taosWriteFile(pHead->pCtx->cacheFileFd[0], pHead->pBuf, pHead->bufSize); - if (ret != pHead->bufSize) { + ret = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); + if (ret != pHead->basic.bufSize) { + releaseDownstreamFileFd(); return TAOS_SYSTEM_ERROR(errno); } + releaseDownstreamFileFd(); - taos - - int64_t blkId = pHead->blkId; + taosWLockLatch(&pHead->pGroup->batchList.lock); + taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); + taosWUnLockLatch(&pHead->pGroup->batchList.lock); + + int64_t blkId = pHead->basic.blkId; pHead = pHead->next; taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); @@ -159,27 +162,32 @@ static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBuf static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { int32_t code = TSDB_CODE_SUCCESS; - SGcVgroupFileFd* pFd = NULL; + SGroupCacheFileFd* pFd = NULL; while (NULL != pHead) { code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd); if (TSDB_CODE_SUCCESS != code) { return code; } - int32_t ret = taosLSeekFile(pFd->fd, pHead->offset, SEEK_SET); + int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); if (ret == -1) { code= TAOS_SYSTEM_ERROR(errno); return code; } - code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->bufSize); + code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); releaseVgroupFileFd(pFd); - if (code != pHead->bufSize) { + if (code != pHead->basic.bufSize) { code= TAOS_SYSTEM_ERROR(errno); return code; } + + taosWLockLatch(&pHead->pGroup->batchList.lock); + taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); + taosWUnLockLatch(&pHead->pGroup->batchList.lock); + - int64_t blkId = pHead->blkId; + int64_t blkId = pHead->basic.blkId; pHead = pHead->next; taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); @@ -332,23 +340,114 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3 return blockDataFromBuf(*ppRes, pBuf); } -static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { +static int32_t acquireDownstreamFileFd(SGcDownstreamCtx* pCtx, SGroupCacheFileFd** ppFd) { + if (NULL == pCtx->cacheFileFd.fd) { + pCtx->cacheFileFd.fd = taosOpenFile(pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); + if (NULL == pCtx->cacheFileFd.fd) { + return TAOS_SYSTEM_ERROR(errno); + } + pCtx->cacheFileFdNum = 1; + taosThreadMutexInit(&pCtx->cacheFileFd.mutex, NULL); + } else { + taosThreadMutexLock(&pCtx->cacheFileFd.mutex); + } + + *ppFd = &pCtx->cacheFileFd; + return TSDB_CODE_SUCCESS; +} + +static int32_t readBatchBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) { + TdFilePtr cacheFileFd = NULL; + SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGrp->downstreamIdx]; + int32_t code = acquireDownstreamFileFd(pCtx, &cacheFileFd); + if (code) { + return code; + } + int32_t ret = taosLSeekFile(cacheFileFd, pBasic->offset, SEEK_SET); + if (ret == -1) { + return TAOS_SYSTEM_ERROR(errno); + } + + *ppBuf = taosMemoryMalloc(pBasic->bufSize); + if (NULL == *ppBuf) { + releaseDownstreamFileFd(); + return TSDB_CODE_OUT_OF_MEMORY; + } + + ret = (int32_t)taosReadFile(cacheFileFd, *ppBuf, pBasic->bufSize); + if (ret != pBasic->bufSize) { + taosMemoryFreeClear(*ppBuf); + return TAOS_SYSTEM_ERROR(errno); + } + + releaseDownstreamFileFd(); + return code; +} + + +static int32_t readSeqBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { + int32_t code = TSDB_CODE_SUCCESS; + + SGroupCacheFileFd* pFd = NULL; + while (NULL != pHead) { + code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); + if (ret == -1) { + code= TAOS_SYSTEM_ERROR(errno); + return code; + } + + code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); + releaseVgroupFileFd(pFd); + if (code != pHead->basic.bufSize) { + code= TAOS_SYSTEM_ERROR(errno); + return code; + } + + taosWLockLatch(&pHead->pGroup->batchList.lock); + taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); + taosWUnLockLatch(&pHead->pGroup->batchList.lock); + + + int64_t blkId = pHead->basic.blkId; + pHead = pHead->next; + + taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + } + + return code; +} + + +static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppRes) { + int32_t code = TSDB_CODE_SUCCESS; + + if (pGCache->batchFetch) { + code = readBatchBlocksFromDisk(pGCache, pGrp, pBasic, ppRes); + } else { + code = readSeqBlocksFromDisk(pGCache, pGrp, pBasic, ppRes); + } + + return code; +} + +static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, SGcBlkBufBasic* pBasic, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGcBlkCacheInfo* pCache = &pGCache->blkCache; - SGcBlkBufInfo bufInfo; - SGcBlkBufBasic* pBlkBasic = NULL; void* pBuf = NULL; taosRLockLatch(&pCache->dirtyLock); - SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId)); + SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId)); if (NULL == pBufInfo) { - code = readBlockFromDisk(pGCache, pGrp, blkId, nextOffset, &bufInfo.basic, &pBuf); + code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf); if (code) { return code; } - pBlkBasic = &bufInfo.basic; } else { - pBlkBasic = &pBufInfo->basic; pBuf = pBufInfo->pBuf; } @@ -358,8 +457,6 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC return code; } - *nextOffset = pBlkBasic->offset + pBlkBasic->bufSize; - taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); return TSDB_CODE_SUCCESS; } @@ -594,26 +691,18 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* return TSDB_CODE_SUCCESS; } -static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) { - if (batchFetch) { - taosWLockLatch(&pGroup->batchList.lock); - taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic); - qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList)); - taosWUnLockLatch(&pGroup->batchList.lock); - return TSDB_CODE_SUCCESS; - } - +static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int32_t* pIdx) { taosWLockLatch(&pGroup->batchList.lock); - - if (pGroup->seqList.endBlkId > 0) { - atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); + if (batchFetch) { + taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic); } else { - atomic_store_64(&pGroup->seqList.startBlkId, pNewBlk->blkId); - atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); + taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic.offset); } - qError("block added to group cache, total block num:%" PRId64, pGroup->seqList.endBlkId - pGroup->seqList.startBlkId + 1); - + *pIdx = taosArrayGetSize(pGroup->batchList.pList) - 1; + + qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList)); + taosWUnLockLatch(&pGroup->batchList.lock); return TSDB_CODE_SUCCESS; } @@ -651,15 +740,15 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD return code; } - code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf); + int64_t blkIdx = 0; + code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &blkIdx); if (code) { return code; } notifyWaitingSessions(pGroup->waitQueue); if (pGroup == pSession->pGroupData) { - pSession->lastBlkId = newBlkBuf.blkId; - pSession->nextOffset = newBlkBuf.offset + newBlkBuf.bufSize; + pSession->lastBlkId = blkIdx; *continueFetch = false; } @@ -746,40 +835,25 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 SGroupCacheOperatorInfo* pGCache = pOperator->info; *got = true; - if (pGCache->batchFetch) { - SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; - taosRLockLatch(&pBatchList->lock); - int64_t blkNum = taosArrayGetSize(pBatchList->pList); - if (pSession->lastBlkId < 0) { - if (blkNum > 0) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0); - taosRUnLockLatch(&pBatchList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = 0; - return code; - } - } else if ((pSession->lastBlkId + 1) < blkNum) { - SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1); + SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; + taosRLockLatch(&pBatchList->lock); + int64_t blkNum = taosArrayGetSize(pBatchList->pList); + if (pSession->lastBlkId < 0) { + if (blkNum > 0) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0); taosRUnLockLatch(&pBatchList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); + pSession->lastBlkId = 0; return code; } + } else if ((pSession->lastBlkId + 1) < blkNum) { + SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBatchList->lock); - } else { - if (pSession->lastBlkId < 0) { - int64_t startBlkId = atomic_load_64(&pSession->pGroupData->seqList.startBlkId); - if (startBlkId > 0) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, startBlkId, &pSession->nextOffset, ppRes); - pSession->lastBlkId = startBlkId; - return code; - } - } else if (pSession->lastBlkId < atomic_load_64(&pSession->pGroupData->seqList.endBlkId)) { - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pSession->lastBlkId + 1, &pSession->nextOffset, ppRes); - pSession->lastBlkId++; - return code; - } + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); + pSession->lastBlkId++; + return code; } + taosRUnLockLatch(&pBatchList->lock); if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { *ppRes = NULL; @@ -897,7 +971,6 @@ static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOp pSession->downstreamIdx = pGcParam->downstreamIdx; pSession->pGroupData = pGroup; pSession->lastBlkId = -1; - pSession->nextOffset = 0; } static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {