enh: read/write from disk cache

This commit is contained in:
dapan1121 2023-07-24 11:39:00 +08:00
parent ae8caa8275
commit ebf1a91fb5
2 changed files with 151 additions and 261 deletions

View File

@ -24,19 +24,11 @@ extern "C" {
#pragma pack(push, 1) #pragma pack(push, 1)
// Descriptor of one cached data block, declared under #pragma pack(push, 1).
// Copies of this struct are what each group stores in its block list
// (addBlkToGroupCache pushes &pNewBlk->basic).
//   fileId  - id of the cache file the block is written to / read from
//   blkId   - block id taken from the operator-wide currentBlkId counter; also
//             the key used for the dirty-block hash
//   offset  - byte offset of the block inside that cache file
//   bufSize - number of bytes written to / read from the file for this block
typedef struct SGcBlkBufBasic { typedef struct SGcBlkBufBasic {
uint32_t fileId;
int64_t blkId; int64_t blkId;
int64_t offset; int64_t offset;
int64_t bufSize; int64_t bufSize;
} SGcBlkBufBasic; } SGcBlkBufBasic;
typedef struct SGcBlkBufInfo {
SGcBlkBufBasic basic;
uint32_t fileId;
void* next;
void* pBuf;
SGcDownstreamCtx* pCtx;
SGroupCacheData* pGroup;
} SGcBlkBufInfo;
#pragma pack(pop) #pragma pack(pop)
typedef struct SGroupCacheFileFd { typedef struct SGroupCacheFileFd {
@ -44,25 +36,40 @@ typedef struct SGroupCacheFileFd {
TdFilePtr fd; TdFilePtr fd;
} SGroupCacheFileFd; } SGroupCacheFileFd;
// SGcFileCacheCtx: state of one family of on-disk cache files. It is embedded
// both in SGcDownstreamCtx and in SGcVgroupCtx; the operator's batchFetch flag
// decides which of the two instances is used for a given block.
//   fileSize     - bytes handed out so far in the current file; the next block
//                  offset is obtained by atomically adding the block size to it
//   fileId       - id of the file currently being filled; bumped (and fileSize
//                  reset to 0) by groupCacheSwitchNewFile
//   pCacheFile   - hash from fileId to SGroupCacheFileFd, created on first use
//                  by acquireFdFromFileCtx
//   baseNameLen  - strlen(baseFilename) at init time; acquireFdFromFileCtx
//                  writes the "_<fileId>" suffix starting at this index
//   baseFilename - path prefix of the cache files
typedef struct SGcVgroupCtx { typedef struct SGcFileCacheCtx {
SArray* pTbList;
uint64_t lastUid;
int64_t fileSize; int64_t fileSize;
uint32_t fileId; uint32_t fileId;
SSHashObj* pCacheFile; SHashObj* pCacheFile;
int32_t baseNameLen; int32_t baseNameLen;
char baseFilename[PATH_MAX]; char baseFilename[PATH_MAX];
} SGcFileCacheCtx;
// Per-downstream state of the group cache operator.
typedef struct SGcDownstreamCtx {
// Downstream index; compared against blkCache.writeDownstreamId when blocks
// are flushed and embedded in the cache file base name.
int32_t id;
SRWLatch grpLock;
int64_t fetchSessionId;
SArray* pNewGrpList; // SArray<SGcNewGroupInfo>
// Keyed by vgId, values are SGcVgroupCtx.
SSHashObj* pVgTbHash;
SHashObj* pGrpHash;
SRWLatch blkLock;
SSDataBlock* pBaseBlock;
SArray* pFreeBlock;
// Last uid recorded by handleVgroupTableFetchDone; a repeated uid is ignored there.
int64_t lastBlkUid;
SHashObj* pSessions;
SHashObj* pWaitSessions;
// Cache-file state used for this downstream when the operator runs with
// batchFetch set; otherwise the vgroup-level fileCtx is used instead.
SGcFileCacheCtx fileCtx;
} SGcDownstreamCtx;
typedef struct SGcVgroupCtx {
SArray* pTbList;
uint64_t lastBlkUid;
SGcFileCacheCtx fileCtx;
} SGcVgroupCtx; } SGcVgroupCtx;
typedef struct SGroupSeqBlkList { typedef struct SGcBlkList {
SRWLatch lock;
SArray* pList;
} SGroupSeqBlkList;
typedef struct SGroupBatchBlkList {
SRWLatch lock; SRWLatch lock;
SArray* pList; SArray* pList;
} SGroupBatchBlkList; } SGcBlkList;
typedef struct SGroupCacheData { typedef struct SGroupCacheData {
TdThreadMutex mutex; TdThreadMutex mutex;
@ -72,10 +79,7 @@ typedef struct SGroupCacheData {
SGcVgroupCtx* pVgCtx; SGcVgroupCtx* pVgCtx;
int32_t downstreamIdx; int32_t downstreamIdx;
int32_t vgId; int32_t vgId;
union { SGcBlkList blkList;
SGroupSeqBlkList seqList;
SGroupBatchBlkList batchList;
};
uint32_t fileId; uint32_t fileId;
int64_t startOffset; int64_t startOffset;
} SGroupCacheData; } SGroupCacheData;
@ -103,24 +107,6 @@ typedef struct SGcNewGroupInfo {
SOperatorParam* pParam; SOperatorParam* pParam;
} SGcNewGroupInfo; } SGcNewGroupInfo;
typedef struct SGcDownstreamCtx {
int32_t id;
SRWLatch grpLock;
int64_t fetchSessionId;
SArray* pNewGrpList; // SArray<SGcNewGroupInfo>
SSHashObj* pVgTbHash;
SHashObj* pGrpHash;
SRWLatch blkLock;
SSDataBlock* pBaseBlock;
SArray* pFreeBlock;
int64_t lastBlkUid;
SHashObj* pSessions;
SHashObj* pWaitSessions;
int32_t cacheFileFdNum;
SGroupCacheFileFd cacheFileFd;
char baseFilename[PATH_MAX];
} SGcDownstreamCtx;
typedef struct SGcSessionCtx { typedef struct SGcSessionCtx {
int32_t downstreamIdx; int32_t downstreamIdx;
SGcOperatorParam* pParam; SGcOperatorParam* pParam;
@ -131,6 +117,14 @@ typedef struct SGcSessionCtx {
bool newFetch; bool newFetch;
} SGcSessionCtx; } SGcSessionCtx;
// In-memory entry for a block that is held in the dirty-block hash until it is
// written to a cache file.
typedef struct SGcBlkBufInfo {
// File id, block id, offset and size of the block.
SGcBlkBufBasic basic;
// Next entry of the dirty list (walked as SGcBlkBufInfo* by the flush code).
void* next;
// Serialized block data (filled by blockDataToBuf); written to disk on flush
// and released by freeGcBlkBufInfo.
void* pBuf;
// Downstream and group the block belongs to; the flush code uses them to pick
// the file context to write into.
SGcDownstreamCtx* pCtx;
SGroupCacheData* pGroup;
} SGcBlkBufInfo;
typedef struct SGcExecInfo { typedef struct SGcExecInfo {
int32_t downstreamNum; int32_t downstreamNum;
int64_t* pDownstreamBlkNum; int64_t* pDownstreamBlkNum;

View File

@ -95,17 +95,24 @@ static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) { static int32_t acquireFdFromFileCtx(SGcFileCacheCtx* pFileCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); if (NULL == pFileCtx->pCacheFile) {
pFileCtx->pCacheFile = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
if (NULL == pFileCtx->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
SGroupCacheFileFd* pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) { if (NULL == pTmp) {
sprintf(pVgCtx->baseFilename[pVgCtx->baseNameLen], "_%u", fileId); sprintf(&pFileCtx->baseFilename[pFileCtx->baseNameLen], "_%u", fileId);
SGroupCacheFileFd newVgFd = {0}; SGroupCacheFileFd newVgFd = {0};
tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); taosHashPut(pFileCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd));
pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); pTmp = taosHashGet(pFileCtx->pCacheFile, &fileId, sizeof(fileId));
code = initOpenCacheFile(pTmp, pVgCtx->baseFilename); code = initOpenCacheFile(pTmp, pFileCtx->baseFilename);
if (code) { if (code) {
return code; return code;
} }
@ -117,37 +124,41 @@ static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstre
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// releaseFdToFileCtx: hands a cache-file descriptor back by unlocking its
// per-file mutex. Callers (saveBlocksToDisk, readBlockFromDisk) invoke it once
// they are done with a descriptor obtained from acquireFdFromFileCtx.
// NOTE(review): presumably acquireFdFromFileCtx locks pFd->mutex before
// returning; the locking code is not visible in this view -- confirm.
static FORCE_INLINE void releaseVgroupFileFd(SGroupCacheFileFd* pFd) { static FORCE_INLINE void releaseFdToFileCtx(SGroupCacheFileFd* pFd) {
taosThreadMutexUnlock(&pFd->mutex); taosThreadMutexUnlock(&pFd->mutex);
} }
static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufBasic blkBasic; SGroupCacheFileFd *pFd;
SGcFileCacheCtx* pFileCtx = NULL;
while (NULL != pHead) { while (NULL != pHead) {
SGroupCacheFileFd *pFd; pFileCtx = pGCache->batchFetch ? &pHead->pCtx->fileCtx : &pHead->pGroup->pVgCtx->fileCtx;
code = acquireDownstreamFileFd(pHead->pCtx, &pFd);
code = acquireFdFromFileCtx(pFileCtx, pHead->basic.fileId, &pFd);
if (code) { if (code) {
return code; goto _return;
} }
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET); int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
releaseDownstreamFileFd(); releaseFdToFileCtx(pFd);
return TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _return;
} }
ret = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize); ret = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
if (ret != pHead->basic.bufSize) { if (ret != pHead->basic.bufSize) {
releaseDownstreamFileFd(); releaseFdToFileCtx(pFd);
return TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _return;
} }
releaseDownstreamFileFd();
releaseFdToFileCtx(pFd);
taosWLockLatch(&pHead->pGroup->batchList.lock); qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " written to offset %" PRIu64,
taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic); pHead->basic.fileId, pHead->basic.blkId, pHead->basic.offset, pHead->basic.bufSize);
taosWUnLockLatch(&pHead->pGroup->batchList.lock);
int64_t blkId = pHead->basic.blkId; int64_t blkId = pHead->basic.blkId;
pHead = pHead->next; pHead = pHead->next;
@ -155,56 +166,8 @@ static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBuf
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
} }
return code;
}
_return:
static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheFileFd* pFd = NULL;
while (NULL != pHead) {
code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
if (ret == -1) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
releaseVgroupFileFd(pFd);
if (code != pHead->basic.bufSize) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
taosWLockLatch(&pHead->pGroup->batchList.lock);
taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic);
taosWUnLockLatch(&pHead->pGroup->batchList.lock);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
if (pGCache->batchFetch) {
code = saveBatchBlocksToDisk(pGCache, pHead);
} else {
code = saveSeqBlocksToDisk(pGCache, pHead);
}
atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);
@ -212,16 +175,16 @@ static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamC
} }
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) { if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId), pBufInfo, sizeof(*pBufInfo))) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId)); pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->basic.blkId, sizeof(pBufInfo->basic.blkId));
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufInfo* pWriteHead = NULL; SGcBlkBufInfo* pWriteHead = NULL;
taosWLockLatch(&pCache->dirtyLock); taosWLockLatch(&pCache->dirtyLock);
pCache->blkCacheSize += pBufInfo->bufSize; pCache->blkCacheSize += pBufInfo->basic.bufSize;
qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize); qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);
if (NULL == pCache->pDirtyHead) { if (NULL == pCache->pDirtyHead) {
@ -236,7 +199,7 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
pWriteHead = pCache->pDirtyHead; pWriteHead = pCache->pDirtyHead;
SGcBlkBufInfo* pTmp = pCache->pDirtyHead; SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
while (NULL != pTmp) { while (NULL != pTmp) {
pCache->blkCacheSize -= pTmp->bufSize; pCache->blkCacheSize -= pTmp->basic.bufSize;
if (pCache->blkCacheSize <= pGCache->maxCacheSize) { if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
pCache->pDirtyHead = pTmp->next; pCache->pDirtyHead = pTmp->next;
pTmp->next = NULL; pTmp->next = NULL;
@ -255,6 +218,17 @@ static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstr
return code; return code;
} }
// Rolls the file context over to the next cache file once the current file
// has reached GROUP_CACHE_DEFAULT_MAX_FILE_SIZE; below that size it does nothing.
static FORCE_INLINE void groupCacheSwitchNewFile(SGcFileCacheCtx* pFileCtx) {
  if (pFileCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
    // Offsets are derived from fileSize, so resetting it makes the next block
    // start at offset 0 of the new file id.
    pFileCtx->fileId += 1;
    pFileCtx->fileSize = 0;
  }
}
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) { static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheOperatorInfo* pGCache = pOperator->info;
int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t); int64_t bufSize = blockDataGetSize(pBlock) + sizeof(int32_t) + taosArrayGetSize(pBlock->pDataBlock) * sizeof(int32_t);
@ -265,14 +239,20 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB
} }
blockDataToBuf(pBufInfo->pBuf, pBlock); blockDataToBuf(pBufInfo->pBuf, pBlock);
SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pCtx->fileCtx : &pGroup->pVgCtx->fileCtx;
pBufInfo->next = NULL; pBufInfo->next = NULL;
pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); pBufInfo->basic.blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
pBufInfo->fileId = pGroup->fileId; pBufInfo->basic.fileId = pGCache->batchFetch ? pFileCtx->fileId : pGroup->fileId;
pBufInfo->bufSize = bufSize; pBufInfo->basic.bufSize = bufSize;
pBufInfo->offset = atomic_fetch_add_64(&pGroup->pVgCtx->fileSize, bufSize); pBufInfo->basic.offset = atomic_fetch_add_64(&pFileCtx->fileSize, bufSize);
pBufInfo->pCtx = pCtx; pBufInfo->pCtx = pCtx;
pBufInfo->pGroup = pGroup; pBufInfo->pGroup = pGroup;
if (pGCache->batchFetch) {
groupCacheSwitchNewFile(pFileCtx);
}
int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo); int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo);
return code; return code;
@ -340,98 +320,38 @@ static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int3
return blockDataFromBuf(*ppRes, pBuf); return blockDataFromBuf(*ppRes, pBuf);
} }
// readBlockFromDisk: reads one cached block back from its cache file into a
// freshly allocated buffer.
//   pGCache - operator info; when batchFetch is set the downstream-level file
//             context (pDownstreams[pGrp->downstreamIdx].fileCtx) is used,
//             otherwise the group's vgroup-level one (pGrp->pVgCtx->fileCtx)
//   pGrp    - group the block belongs to
//   pBasic  - fileId / offset / bufSize locating the block on disk
//   ppBuf   - out: buffer of pBasic->bufSize bytes from taosMemoryMalloc; the
//             caller releases it (retrieveBlkFromBufCache uses taosMemoryFree)
// Returns the code from acquireFdFromFileCtx if that fails,
// TAOS_SYSTEM_ERROR(errno) on a failed seek or a short read, and
// TSDB_CODE_OUT_OF_MEMORY if the buffer cannot be allocated. On a short read
// the buffer is freed and *ppBuf cleared. Every path after a successful
// acquire leaves through _return, which releases the descriptor.
// NOTE(review): the qTrace format prints "size" before "offset", but the
// arguments pass pBasic->offset before pBasic->bufSize, so the two values are
// logged swapped; the fields are int64_t while the format uses PRIu64.
static int32_t acquireDownstreamFileFd(SGcDownstreamCtx* pCtx, SGroupCacheFileFd** ppFd) { static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
if (NULL == pCtx->cacheFileFd.fd) { SGroupCacheFileFd *pFileFd = NULL;
pCtx->cacheFileFd.fd = taosOpenFile(pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); SGcFileCacheCtx* pFileCtx = pGCache->batchFetch ? &pGCache->pDownstreams[pGrp->downstreamIdx].fileCtx : &pGrp->pVgCtx->fileCtx;
if (NULL == pCtx->cacheFileFd.fd) { int32_t code = acquireFdFromFileCtx(pFileCtx, pBasic->fileId, &pFileFd);
return TAOS_SYSTEM_ERROR(errno);
}
pCtx->cacheFileFdNum = 1;
taosThreadMutexInit(&pCtx->cacheFileFd.mutex, NULL);
} else {
taosThreadMutexLock(&pCtx->cacheFileFd.mutex);
}
*ppFd = &pCtx->cacheFileFd;
return TSDB_CODE_SUCCESS;
}
static int32_t readBatchBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
TdFilePtr cacheFileFd = NULL;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pGrp->downstreamIdx];
int32_t code = acquireDownstreamFileFd(pCtx, &cacheFileFd);
if (code) { if (code) {
return code; return code;
} }
int32_t ret = taosLSeekFile(cacheFileFd, pBasic->offset, SEEK_SET); int32_t ret = taosLSeekFile(pFileFd->fd, pBasic->offset, SEEK_SET);
if (ret == -1) { if (ret == -1) {
return TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _return;
} }
*ppBuf = taosMemoryMalloc(pBasic->bufSize); *ppBuf = taosMemoryMalloc(pBasic->bufSize);
if (NULL == *ppBuf) { if (NULL == *ppBuf) {
releaseDownstreamFileFd(); code = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY; goto _return;
} }
ret = (int32_t)taosReadFile(cacheFileFd, *ppBuf, pBasic->bufSize); ret = (int32_t)taosReadFile(pFileFd->fd, *ppBuf, pBasic->bufSize);
if (ret != pBasic->bufSize) { if (ret != pBasic->bufSize) {
taosMemoryFreeClear(*ppBuf); taosMemoryFreeClear(*ppBuf);
return TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _return;
} }
releaseDownstreamFileFd(); qTrace("FileId %u, blk %" PRIu64 " size %" PRIu64 " read from offset %" PRIu64,
return code; pBasic->fileId, pBasic->blkId, pBasic->offset, pBasic->bufSize);
}
_return:
static int32_t readSeqBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { releaseFdToFileCtx(pFileFd);
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheFileFd* pFd = NULL;
while (NULL != pHead) {
code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
if (ret == -1) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
releaseVgroupFileFd(pFd);
if (code != pHead->basic.bufSize) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
taosWLockLatch(&pHead->pGroup->batchList.lock);
taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic);
taosWUnLockLatch(&pHead->pGroup->batchList.lock);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
if (pGCache->batchFetch) {
code = readBatchBlocksFromDisk(pGCache, pGrp, pBasic, ppRes);
} else {
code = readSeqBlocksFromDisk(pGCache, pGrp, pBasic, ppRes);
}
return code; return code;
} }
@ -440,7 +360,6 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
SGcBlkCacheInfo* pCache = &pGCache->blkCache; SGcBlkCacheInfo* pCache = &pGCache->blkCache;
void* pBuf = NULL; void* pBuf = NULL;
taosRLockLatch(&pCache->dirtyLock);
SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId)); SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId));
if (NULL == pBufInfo) { if (NULL == pBufInfo) {
code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf); code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf);
@ -452,7 +371,11 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
} }
code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes); code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes);
taosRUnLockLatch(&pCache->dirtyLock); taosHashRelease(pCache->pDirtyBlk, pBufInfo);
if (NULL == pBufInfo) {
taosMemoryFree(pBuf);
}
if (code) { if (code) {
return code; return code;
} }
@ -461,16 +384,16 @@ static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupC
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// initGcVgroupCtx: fills in a vgroup context with its table list and the base
// name of its cache files.
//   pOperator    - operator whose task id (GET_TASKID) goes into the file name
//   pVgCtx       - context to initialise
//   downstreamId - downstream index, part of the file name
//   vgId         - vgroup id, part of the file name
//   pTbList      - stored as-is in pVgCtx->pTbList (callers pass NULL or a
//                  newly created array)
// The base name is "<tsTempDir>/gc_<pid>_<taskId>_<downstreamId>_<vgId>",
// truncated to fit the buffer and always NUL-terminated; its length is kept in
// fileCtx.baseNameLen so a "_<fileId>" suffix can be written at that index.
// NOTE(review): acquireFdFromFileCtx appends that suffix with an unbounded
// sprintf, so a base name truncated here to the full buffer leaves no room for
// it -- confirm the path length is bounded elsewhere.
static FORCE_INLINE void initGcVgroupCtx(SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { static FORCE_INLINE void initGcVgroupCtx(SOperatorInfo* pOperator, SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList; pVgCtx->pTbList = pTbList;
snprintf(pVgCtx->baseFilename, sizeof(pVgCtx->baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId); snprintf(pVgCtx->fileCtx.baseFilename, sizeof(pVgCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId);
pVgCtx->baseFilename[sizeof(pVgCtx->baseFilename) - 1] = 0; pVgCtx->fileCtx.baseFilename[sizeof(pVgCtx->fileCtx.baseFilename) - 1] = 0;
pVgCtx->baseNameLen = strlen(pVgCtx->baseFilename); pVgCtx->fileCtx.baseNameLen = strlen(pVgCtx->fileCtx.baseFilename);
} }
static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { static int32_t addNewGroupToVgHash(SOperatorInfo* pOperator, SSHashObj* pHash, SGcNewGroupInfo* pNew) {
SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx; SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
if (NULL == pVgCtx) { if (NULL == pVgCtx) {
SArray* pList = taosArrayInit(10, sizeof(*pNew)); SArray* pList = taosArrayInit(10, sizeof(*pNew));
@ -479,7 +402,7 @@ static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) {
} }
taosArrayPush(pList, pNew); taosArrayPush(pList, pNew);
SGcVgroupCtx vgCtx = {0}; SGcVgroupCtx vgCtx = {0};
initGcVgroupCtx(&vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); initGcVgroupCtx(pOperator, &vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);
tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx));
pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -504,7 +427,7 @@ static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOp
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i); SGcNewGroupInfo* pNew = taosArrayGet(pCtx->pNewGrpList, i);
if (!pGCache->batchFetch) { if (!pGCache->batchFetch) {
code = addNewGroupToVgHash(pCtx->pVgTbHash, pNew); code = addNewGroupToVgHash(pOperator, pCtx->pVgTbHash, pNew);
if (code) { if (code) {
goto _return; goto _return;
} }
@ -586,29 +509,13 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
taosThreadMutexUnlock(&pGroup->mutex); taosThreadMutexUnlock(&pGroup->mutex);
} }
static int32_t vgroupSwitchNewFile(SGcVgroupCtx* pVgCtx) {
if (NULL != pVgCtx->cacheFileFd) {
if (NULL == pVgCtx->pCacheFile) {
pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pVgCtx->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
tSimpleHashPut(pVgCtx->pCacheFile, &pVgCtx->fileId, sizeof(pVgCtx->fileId), &pVgCtx->cacheFileFd, sizeof(pVgCtx->cacheFileFd));
}
pVgCtx->fileId++;
pVgCtx->fileSize = 0;
return TSDB_CODE_SUCCESS;
}
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastBlkUid == uid) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pCtx->lastBlkUid = uid; pCtx->lastBlkUid = uid;
pGroup->pVgCtx->lastUid = uid; pGroup->pVgCtx->lastBlkUid = uid;
int32_t i = 0; int32_t i = 0;
while (true) { while (true) {
@ -619,15 +526,10 @@ static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheDat
handleGroupFetchDone(pNew->pGroup); handleGroupFetchDone(pNew->pGroup);
} }
if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { groupCacheSwitchNewFile(&pGroup->pVgCtx->fileCtx);
int32_t code = vgroupSwitchNewFile(pGroup->pVgCtx);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
pGroup->fileId = pGroup->pVgCtx->fileId; pGroup->fileId = pGroup->pVgCtx->fileCtx.fileId;
pGroup->startOffset = pGroup->pVgCtx->fileSize; pGroup->startOffset = pGroup->pVgCtx->fileCtx.fileSize;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -637,15 +539,8 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat
taosThreadMutexInit(&pGroup->mutex, NULL); taosThreadMutexInit(&pGroup->mutex, NULL);
pGroup->downstreamIdx = downstreamIdx; pGroup->downstreamIdx = downstreamIdx;
pGroup->vgId = vgId; pGroup->vgId = vgId;
if (batchFetch) { pGroup->fileId = -1;
pGroup->fileId = 0; pGroup->blkList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));
pGroup->batchList.pList = taosArrayInit(10, POINTER_BYTES);
} else {
pGroup->fileId = -1;
pGroup->seqList.startBlkId = -1;
pGroup->seqList.endBlkId = -1;
}
pGroup->startOffset = -1; pGroup->startOffset = -1;
pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId)); pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
} }
@ -691,18 +586,14 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// addBlkToGroupCache: appends the descriptor of a newly cached block to the
// group's block list.
//   batchFetch - selected the element layout before this change; the
//                post-change body no longer reads it
//   pGroup     - group whose blkList receives the entry
//   pNewBlk    - block whose `basic` descriptor is copied into the list
//   pIdx       - out: zero-based index of the new entry in the list
// The push and the index read happen under the list's write latch; the log
// line is emitted after the latch is released. Always returns
// TSDB_CODE_SUCCESS.
// NOTE(review): the result of taosArrayPush is not checked, so a failed push
// would still report success with *pIdx one below the unchanged size.
// NOTE(review): the informational "block added" message is logged via qError.
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int32_t* pIdx) { static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
taosWLockLatch(&pGroup->batchList.lock); taosWLockLatch(&pGroup->blkList.lock);
if (batchFetch) { taosArrayPush(pGroup->blkList.pList, &pNewBlk->basic);
taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic); *pIdx = taosArrayGetSize(pGroup->blkList.pList) - 1;
} else { taosWUnLockLatch(&pGroup->blkList.lock);
taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic.offset);
}
*pIdx = taosArrayGetSize(pGroup->batchList.pList) - 1; qError("block added to group cache, total block num:%" PRId64, *pIdx + 1);
qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList));
taosWUnLockLatch(&pGroup->batchList.lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -835,25 +726,25 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheOperatorInfo* pGCache = pOperator->info;
*got = true; *got = true;
SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; SGcBlkList* pBlkList = &pSession->pGroupData->blkList;
taosRLockLatch(&pBatchList->lock); taosRLockLatch(&pBlkList->lock);
int64_t blkNum = taosArrayGetSize(pBatchList->pList); int64_t blkNum = taosArrayGetSize(pBlkList->pList);
if (pSession->lastBlkId < 0) { if (pSession->lastBlkId < 0) {
if (blkNum > 0) { if (blkNum > 0) {
SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0); SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, 0);
taosRUnLockLatch(&pBatchList->lock); taosRUnLockLatch(&pBlkList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
pSession->lastBlkId = 0; pSession->lastBlkId = 0;
return code; return code;
} }
} else if ((pSession->lastBlkId + 1) < blkNum) { } else if ((pSession->lastBlkId + 1) < blkNum) {
SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1); SGcBlkBufBasic* pBasic = taosArrayGet(pBlkList->pList, pSession->lastBlkId + 1);
taosRUnLockLatch(&pBatchList->lock); taosRUnLockLatch(&pBlkList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic, ppRes);
pSession->lastBlkId++; pSession->lastBlkId++;
return code; return code;
} }
taosRUnLockLatch(&pBatchList->lock); taosRUnLockLatch(&pBlkList->lock);
if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) { if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
*ppRes = NULL; *ppRes = NULL;
@ -950,6 +841,10 @@ _return:
return code; return code;
} }
// Free callback installed on the dirty-block hash via taosHashSetFreeFp.
// `ptr` points at a SGcBlkBufInfo entry; only the data buffer it references
// (pBuf) is released here, the entry storage itself is not touched.
void freeGcBlkBufInfo(void* ptr) {
  taosMemoryFree(((SGcBlkBufInfo*)ptr)->pBuf);
}
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
SGcBlkCacheInfo* pCache = &pInfo->blkCache; SGcBlkCacheInfo* pCache = &pInfo->blkCache;
@ -957,7 +852,7 @@ static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
if (NULL == pCache->pDirtyBlk) { if (NULL == pCache->pDirtyBlk) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(pCache->pDirtyBlk,,,,,,,,); taosHashSetFreeFp(pCache->pDirtyBlk, freeGcBlkBufInfo);
pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pCache->pReadBlk) { if (NULL == pCache->pReadBlk) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1057,7 +952,7 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
if (pInfo->batchFetch) { if (pInfo->batchFetch) {
int32_t defaultVg = 0; int32_t defaultVg = 0;
SGcVgroupCtx vgCtx = {0}; SGcVgroupCtx vgCtx = {0};
initGcVgroupCtx(&vgCtx, pCtx->id, defaultVg, NULL); initGcVgroupCtx(pOperator, &vgCtx, pCtx->id, defaultVg, NULL);
tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx));
} }
@ -1087,8 +982,9 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
snprintf(pCtx->baseFilename, sizeof(pCtx->baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id); snprintf(pCtx->fileCtx.baseFilename, sizeof(pCtx->fileCtx.baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id);
pCtx->baseFilename[sizeof(pCtx->baseFilename) - 1] = 0; pCtx->fileCtx.baseFilename[sizeof(pCtx->fileCtx.baseFilename) - 1] = 0;
pCtx->fileCtx.baseNameLen = strlen(pCtx->fileCtx.baseFilename);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;