enh: add disk cache

This commit is contained in:
dapan1121 2023-07-21 17:30:09 +08:00
parent 7221f447df
commit 3725cafe00
2 changed files with 283 additions and 89 deletions

View File

@ -20,35 +20,48 @@ extern "C" {
#endif #endif
#define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 #define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600
#define GROUP_CACHE_MAX_FILE_FDS 10
#pragma pack(push, 1) #pragma pack(push, 1)
typedef struct SGcBlkBufBasic {
int64_t blkId;
int64_t offset;
int64_t bufSize;
uint32_t fileId;
} SGcBlkBufBasic;
typedef struct SGcBlkBufInfo { typedef struct SGcBlkBufInfo {
void* prev; SGcBlkBufBasic basic;
void* next; void* next;
int64_t blkId; void* pBuf;
int64_t offset; SGcDownstreamCtx* pCtx;
int64_t bufSize; SGroupCacheData* pGroup;
void* pBuf;
uint32_t fileId;
} SGcBlkBufInfo; } SGcBlkBufInfo;
#pragma pack(pop) #pragma pack(pop)
typedef struct SGcVgroupFileFd {
TdThreadMutex mutex;
TdFilePtr fd;
} SGcVgroupFileFd;
typedef struct SGcVgroupCtx { typedef struct SGcVgroupCtx {
SArray* pTbList; SArray* pTbList;
uint64_t lastUid; uint64_t lastUid;
int64_t fileSize; int64_t fileSize;
uint32_t fileId; uint32_t fileId;
SSHashObj* pCacheFile;
int32_t baseNameLen;
char baseFilename[PATH_MAX];
} SGcVgroupCtx; } SGcVgroupCtx;
typedef struct SGroupSeqBlkList { typedef struct SGroupSeqBlkList {
int64_t startBlkId; SRWLatch lock;
int64_t endBlkId; SArray* pList;
} SGroupSeqBlkList; } SGroupSeqBlkList;
typedef struct SGroupBatchBlkList { typedef struct SGroupBatchBlkList {
SRWLatch lock; SRWLatch lock;
SArray* pBlkList; SArray* pList;
} SGroupBatchBlkList; } SGroupBatchBlkList;
typedef struct SGroupCacheData { typedef struct SGroupCacheData {
@ -91,6 +104,7 @@ typedef struct SGcNewGroupInfo {
} SGcNewGroupInfo; } SGcNewGroupInfo;
typedef struct SGcDownstreamCtx { typedef struct SGcDownstreamCtx {
int32_t id;
SRWLatch grpLock; SRWLatch grpLock;
int64_t fetchSessionId; int64_t fetchSessionId;
SArray* pNewGrpList; // SArray<SGcNewGroupInfo> SArray* pNewGrpList; // SArray<SGcNewGroupInfo>
@ -101,7 +115,10 @@ typedef struct SGcDownstreamCtx {
SArray* pFreeBlock; SArray* pFreeBlock;
int64_t lastBlkUid; int64_t lastBlkUid;
SHashObj* pSessions; SHashObj* pSessions;
SHashObj* pWaitSessions; SHashObj* pWaitSessions;
int32_t cacheFileFdNum;
TdFilePtr cacheFileFd[GROUP_CACHE_MAX_FILE_FDS];
char baseFilename[PATH_MAX];
} SGcDownstreamCtx; } SGcDownstreamCtx;
typedef struct SGcSessionCtx { typedef struct SGcSessionCtx {
@ -128,12 +145,12 @@ typedef struct SGcCacheFile {
typedef struct SGcBlkCacheInfo { typedef struct SGcBlkCacheInfo {
SRWLatch dirtyLock; SRWLatch dirtyLock;
SSHashObj* pCacheFile;
SHashObj* pDirtyBlk; SHashObj* pDirtyBlk;
SGcBlkBufInfo* pDirtyHead; SGcBlkBufInfo* pDirtyHead;
SGcBlkBufInfo* pDirtyTail; SGcBlkBufInfo* pDirtyTail;
SHashObj* pReadBlk; SHashObj* pReadBlk;
int64_t blkCacheSize; int64_t blkCacheSize;
int32_t writeDownstreamId;
} SGcBlkCacheInfo; } SGcBlkCacheInfo;
typedef struct SGroupCacheOperatorInfo { typedef struct SGroupCacheOperatorInfo {
@ -144,7 +161,6 @@ typedef struct SGroupCacheOperatorInfo {
bool globalGrp; bool globalGrp;
bool grpByUid; bool grpByUid;
bool batchFetch; bool batchFetch;
bool fetchDone;
SGcDownstreamCtx* pDownstreams; SGcDownstreamCtx* pDownstreams;
SGcBlkCacheInfo blkCache; SGcBlkCacheInfo blkCache;
SHashObj* pGrpHash; SHashObj* pGrpHash;

View File

@ -85,30 +85,166 @@ static void destroyGroupCacheOperator(void* param) {
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { static FORCE_INLINE int32_t initOpenCacheFile(SGcVgroupFileFd* pFileFd, char* filename) {
TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
if (NULL == newFd) {
return TAOS_SYSTEM_ERROR(errno);
}
pFileFd->fd = newFd;
taosThreadMutexInit(&pFileFd->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGcVgroupFileFd** ppFd) {
int32_t code = TSDB_CODE_SUCCESS;
SGcVgroupFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId));
if (NULL == pTmp) {
sprintf(pVgCtx->baseFilename[pVgCtx->baseNameLen], "_%u", fileId);
SGcVgroupFileFd newVgFd = {0};
tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd));
pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId));
code = initOpenCacheFile(pTmp, pVgCtx->baseFilename);
if (code) {
return code;
}
}
taosThreadMutexLock(&pTmp->mutex);
*ppFd = pTmp;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void releaseVgroupFileFd(SGcVgroupFileFd* pFd) {
taosThreadMutexUnlock(&pFd->mutex);
}
static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufBasic blkBasic;
while (NULL != pHead) {
if (NULL == pHead->pCtx->cacheFileFd[0]) {
pHead->pCtx->cacheFileFd[0] = taosOpenFile(pHead->pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL);
if (NULL == pHead->pCtx->cacheFileFd[0]) {
return TAOS_SYSTEM_ERROR(errno);
}
pHead->pCtx->cacheFileFdNum = 1;
}
int32_t ret = taosLSeekFile(pHead->pCtx->cacheFileFd[0], pHead->offset, SEEK_SET);
if (ret == -1) {
return TAOS_SYSTEM_ERROR(errno);
}
ret = (int32_t)taosWriteFile(pHead->pCtx->cacheFileFd[0], pHead->pBuf, pHead->bufSize);
if (ret != pHead->bufSize) {
return TAOS_SYSTEM_ERROR(errno);
}
taos
int64_t blkId = pHead->blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
SGcVgroupFileFd* pFd = NULL;
while (NULL != pHead) {
code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
int32_t ret = taosLSeekFile(pFd->fd, pHead->offset, SEEK_SET);
if (ret == -1) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->bufSize);
releaseVgroupFileFd(pFd);
if (code != pHead->bufSize) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
int64_t blkId = pHead->blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
if (pGCache->batchFetch) {
code = saveBatchBlocksToDisk(pGCache, pHead);
} else {
code = saveSeqBlocksToDisk(pGCache, pHead);
}
atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);
return code;
}
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) { if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId)); pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId));
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufInfo* pWriteHead = NULL;
taosWLockLatch(&pCache->dirtyLock); taosWLockLatch(&pCache->dirtyLock);
pCache->blkCacheSize += pBufInfo->bufSize;
qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);
if (NULL == pCache->pDirtyHead) { if (NULL == pCache->pDirtyHead) {
pCache->pDirtyHead = pBufInfo; pCache->pDirtyHead = pBufInfo;
} else { } else {
pBufInfo->prev = pCache->pDirtyTail;
pCache->pDirtyTail->next = pBufInfo; pCache->pDirtyTail->next = pBufInfo;
} }
pCache->pDirtyTail = pBufInfo; pCache->pDirtyTail = pBufInfo;
if (pGCache->maxCacheSize > 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
pWriteHead = pCache->pDirtyHead;
SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
while (NULL != pTmp) {
pCache->blkCacheSize -= pTmp->bufSize;
if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
pCache->pDirtyHead = pTmp->next;
pTmp->next = NULL;
break;
}
pTmp = pTmp->next;
}
}
}
taosWUnLockLatch(&pCache->dirtyLock); taosWUnLockLatch(&pCache->dirtyLock);
int64_t blkCacheSize = atomic_add_fetch_64(&pCache->blkCacheSize, pBufInfo->bufSize); if (NULL != pWriteHead) {
qDebug("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), blkCacheSize); code = saveBlocksToDisk(pGCache, pCtx, pWriteHead);
if (pGCache->maxCacheSize > 0 && blkCacheSize > pGCache->maxCacheSize) {
//TODO
} }
return TSDB_CODE_SUCCESS; return code;
} }
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) { static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
@ -121,16 +257,15 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB
} }
blockDataToBuf(pBufInfo->pBuf, pBlock); blockDataToBuf(pBufInfo->pBuf, pBlock);
pBufInfo->prev = NULL;
pBufInfo->next = NULL; pBufInfo->next = NULL;
pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
pBufInfo->fileId = pGroup->fileId; pBufInfo->fileId = pGroup->fileId;
pBufInfo->offset = pGroup->pVgCtx->fileSize;
pBufInfo->bufSize = bufSize; pBufInfo->bufSize = bufSize;
pBufInfo->offset = atomic_fetch_add_64(&pGroup->pVgCtx->fileSize, bufSize);
pBufInfo->pCtx = pCtx;
pBufInfo->pGroup = pGroup;
pGroup->pVgCtx->fileSize += bufSize; int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo);
int32_t code = addBlkToDirtyBufList(pGCache, &pGCache->blkCache, pBufInfo);
return code; return code;
} }
@ -188,40 +323,54 @@ static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock)
} }
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, SGcBlkBufInfo* pBufInfo, SSDataBlock** ppRes) { static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) {
int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes); int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes);
if (code) { if (code) {
return code; return code;
} }
//TODO OPTIMIZE PERF //TODO OPTIMIZE PERF
return blockDataFromBuf(*ppRes, pBufInfo->pBuf); return blockDataFromBuf(*ppRes, pBuf);
} }
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SGcBlkCacheInfo* pCache = &pGCache->blkCache; SGcBlkCacheInfo* pCache = &pGCache->blkCache;
SGcBlkBufInfo bufInfo;
SGcBlkBufBasic* pBlkBasic = NULL;
void* pBuf = NULL;
taosRLockLatch(&pCache->dirtyLock); taosRLockLatch(&pCache->dirtyLock);
SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId)); SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId));
if (pBufInfo) { if (NULL == pBufInfo) {
code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBufInfo, ppRes); code = readBlockFromDisk(pGCache, pGrp, blkId, nextOffset, &bufInfo.basic, &pBuf);
taosRUnLockLatch(&pCache->dirtyLock);
if (code) { if (code) {
return code; return code;
} }
pBlkBasic = &bufInfo.basic;
*nextOffset = pBufInfo->offset + pBufInfo->bufSize; } else {
pBlkBasic = &pBufInfo->basic;
taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); pBuf = pBufInfo->pBuf;
return TSDB_CODE_SUCCESS;
} }
taosRUnLockLatch(&pCache->dirtyLock);
//TODO READ FROM FILE
code = TSDB_CODE_INVALID_PARA;
qError("block %" PRId64 "not found in dirty block list", blkId);
return code; code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes);
taosRUnLockLatch(&pCache->dirtyLock);
if (code) {
return code;
}
*nextOffset = pBlkBasic->offset + pBlkBasic->bufSize;
taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES);
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void initGcVgroupCtx(SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList;
snprintf(pVgCtx->baseFilename, sizeof(pVgCtx->baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId);
pVgCtx->baseFilename[sizeof(pVgCtx->baseFilename) - 1] = 0;
pVgCtx->baseNameLen = strlen(pVgCtx->baseFilename);
} }
static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) {
@ -232,7 +381,8 @@ static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosArrayPush(pList, pNew); taosArrayPush(pList, pNew);
SGcVgroupCtx vgCtx = {.pTbList = pList, .lastUid = 0, .fileSize = 0, .fileId = 0}; SGcVgroupCtx vgCtx = {0};
initGcVgroupCtx(&vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);
tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx));
pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -331,7 +481,7 @@ static void notifyWaitingSessions(SArray* pWaitQueue) {
static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
pGroup->pBlock = NULL; pGroup->pBlock = NULL;
pGroup->fetchDone = true; atomic_store_8((int8_t*)&pGroup->fetchDone, true);
taosThreadMutexLock(&pGroup->mutex); taosThreadMutexLock(&pGroup->mutex);
notifyWaitingSessions(pGroup->waitQueue); notifyWaitingSessions(pGroup->waitQueue);
@ -339,10 +489,27 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
taosThreadMutexUnlock(&pGroup->mutex); taosThreadMutexUnlock(&pGroup->mutex);
} }
static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { static int32_t vgroupSwitchNewFile(SGcVgroupCtx* pVgCtx) {
if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { if (NULL != pVgCtx->cacheFileFd) {
return; if (NULL == pVgCtx->pCacheFile) {
pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pVgCtx->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
tSimpleHashPut(pVgCtx->pCacheFile, &pVgCtx->fileId, sizeof(pVgCtx->fileId), &pVgCtx->cacheFileFd, sizeof(pVgCtx->cacheFileFd));
} }
pVgCtx->fileId++;
pVgCtx->fileSize = 0;
return TSDB_CODE_SUCCESS;
}
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) {
return TSDB_CODE_SUCCESS;
}
pCtx->lastBlkUid = uid; pCtx->lastBlkUid = uid;
pGroup->pVgCtx->lastUid = uid; pGroup->pVgCtx->lastUid = uid;
@ -356,12 +523,16 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData*
} }
if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
pGroup->pVgCtx->fileId++; int32_t code = vgroupSwitchNewFile(pGroup->pVgCtx);
pGroup->pVgCtx->fileSize = 0; if (TSDB_CODE_SUCCESS != code) {
return code;
}
} }
pGroup->fileId = pGroup->pVgCtx->fileId; pGroup->fileId = pGroup->pVgCtx->fileId;
pGroup->startOffset = pGroup->pVgCtx->fileSize; pGroup->startOffset = pGroup->pVgCtx->fileSize;
return TSDB_CODE_SUCCESS;
} }
@ -369,10 +540,11 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat
taosThreadMutexInit(&pGroup->mutex, NULL); taosThreadMutexInit(&pGroup->mutex, NULL);
pGroup->downstreamIdx = downstreamIdx; pGroup->downstreamIdx = downstreamIdx;
pGroup->vgId = vgId; pGroup->vgId = vgId;
pGroup->fileId = -1;
if (batchFetch) { if (batchFetch) {
pGroup->batchList.pBlkList = taosArrayInit(10, POINTER_BYTES); pGroup->fileId = 0;
pGroup->batchList.pList = taosArrayInit(10, POINTER_BYTES);
} else { } else {
pGroup->fileId = -1;
pGroup->seqList.startBlkId = -1; pGroup->seqList.startBlkId = -1;
pGroup->seqList.endBlkId = -1; pGroup->seqList.endBlkId = -1;
@ -425,11 +597,13 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) { static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) {
if (batchFetch) { if (batchFetch) {
taosWLockLatch(&pGroup->batchList.lock); taosWLockLatch(&pGroup->batchList.lock);
taosArrayPush(pGroup->batchList.pBlkList, &pNewBlk->blkId); taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic);
qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pBlkList)); qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList));
taosWUnLockLatch(&pGroup->batchList.lock); taosWUnLockLatch(&pGroup->batchList.lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosWLockLatch(&pGroup->batchList.lock);
if (pGroup->seqList.endBlkId > 0) { if (pGroup->seqList.endBlkId > 0) {
atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId);
@ -465,7 +639,10 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD
} }
if (!pGCache->batchFetch) { if (!pGCache->batchFetch) {
handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
} }
SGcBlkBufInfo newBlkBuf; SGcBlkBufInfo newBlkBuf;
@ -495,8 +672,6 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes
SGroupCacheOperatorInfo* pGCache = pOperator->info; SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
if (pGCache->batchFetch) { if (pGCache->batchFetch) {
atomic_store_8((int8_t*)&pGCache->fetchDone, true);
SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
SGroupCacheData* pGroup = NULL; SGroupCacheData* pGroup = NULL;
while (pGroup = taosHashIterate(pGrpHash, pGroup)) { while (pGroup = taosHashIterate(pGrpHash, pGroup)) {
@ -574,19 +749,19 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64
if (pGCache->batchFetch) { if (pGCache->batchFetch) {
SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList;
taosRLockLatch(&pBatchList->lock); taosRLockLatch(&pBatchList->lock);
int64_t blkNum = taosArrayGetSize(pBatchList->pBlkList); int64_t blkNum = taosArrayGetSize(pBatchList->pList);
if (pSession->lastBlkId < 0) { if (pSession->lastBlkId < 0) {
if (blkNum > 0) { if (blkNum > 0) {
int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, 0); SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0);
taosRUnLockLatch(&pBatchList->lock); taosRUnLockLatch(&pBatchList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId = 0; pSession->lastBlkId = 0;
return code; return code;
} }
} else if ((pSession->lastBlkId + 1) < blkNum) { } else if ((pSession->lastBlkId + 1) < blkNum) {
int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, pSession->lastBlkId + 1); SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1);
taosRUnLockLatch(&pBatchList->lock); taosRUnLockLatch(&pBatchList->lock);
code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes);
pSession->lastBlkId++; pSession->lastBlkId++;
return code; return code;
} }
@ -704,14 +879,11 @@ _return:
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
SGcBlkCacheInfo* pCache = &pInfo->blkCache; SGcBlkCacheInfo* pCache = &pInfo->blkCache;
pCache->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pCache->pCacheFile) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pCache->pDirtyBlk) { if (NULL == pCache->pDirtyBlk) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
taosHashSetFreeFp(pCache->pDirtyBlk,,,,,,,,);
pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pCache->pReadBlk) { if (NULL == pCache->pReadBlk) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -725,7 +897,7 @@ static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOp
pSession->downstreamIdx = pGcParam->downstreamIdx; pSession->downstreamIdx = pGcParam->downstreamIdx;
pSession->pGroupData = pGroup; pSession->pGroupData = pGroup;
pSession->lastBlkId = -1; pSession->lastBlkId = -1;
pSession->nextOffset = -1; pSession->nextOffset = 0;
} }
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) { static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
@ -801,43 +973,49 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
} }
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
pInfo->pDownstreams[i].fetchSessionId = -1; SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i];
pInfo->pDownstreams[i].lastBlkUid = 0; pCtx->id = i;
pInfo->pDownstreams[i].pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); pCtx->fetchSessionId = -1;
if (NULL == pInfo->pDownstreams[i].pVgTbHash) { pCtx->lastBlkUid = 0;
pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
if (NULL == pCtx->pVgTbHash) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (pInfo->batchFetch) { if (pInfo->batchFetch) {
SGcVgroupCtx vgCtx = {.pTbList = NULL, .lastUid = 0, .fileSize = 0, .fileId = i}; int32_t defaultVg = 0;
int32_t defaultVg = -1; SGcVgroupCtx vgCtx = {0};
tSimpleHashPut(pInfo->pDownstreams[i].pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); initGcVgroupCtx(&vgCtx, pCtx->id, defaultVg, NULL);
tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx));
} }
pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
if (NULL == pInfo->pDownstreams[i].pNewGrpList) { if (NULL == pCtx->pNewGrpList) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if (!pInfo->globalGrp) { if (!pInfo->globalGrp) {
pInfo->pDownstreams[i].pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pGrpHash == NULL) { if (pCtx->pGrpHash == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
pInfo->pDownstreams[i].pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pSessions == NULL) { if (pCtx->pSessions == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES); pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES);
if (NULL == pInfo->pDownstreams[i].pFreeBlock) { if (NULL == pCtx->pFreeBlock) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pInfo->pDownstreams[i].pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (pInfo->pDownstreams[i].pWaitSessions == NULL) { if (pCtx->pWaitSessions == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
snprintf(pCtx->baseFilename, sizeof(pCtx->baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id);
pCtx->baseFilename[sizeof(pCtx->baseFilename) - 1] = 0;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -871,7 +1049,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->maxCacheSize = -1; pInfo->maxCacheSize = 104857600;
pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->grpByUid = pPhyciNode->grpByUid;
pInfo->globalGrp = pPhyciNode->globalGrp; pInfo->globalGrp = pPhyciNode->globalGrp;
pInfo->batchFetch = pPhyciNode->batchFetch; pInfo->batchFetch = pPhyciNode->batchFetch;