diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 1278fa5553..6a19b9bd08 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -20,35 +20,48 @@ extern "C" { #endif #define GROUP_CACHE_DEFAULT_MAX_FILE_SIZE 104857600 +#define GROUP_CACHE_MAX_FILE_FDS 10 #pragma pack(push, 1) +typedef struct SGcBlkBufBasic { + int64_t blkId; + int64_t offset; + int64_t bufSize; + uint32_t fileId; +} SGcBlkBufBasic; + typedef struct SGcBlkBufInfo { - void* prev; - void* next; - int64_t blkId; - int64_t offset; - int64_t bufSize; - void* pBuf; - uint32_t fileId; + SGcBlkBufBasic basic; + void* next; + void* pBuf; + SGcDownstreamCtx* pCtx; + SGroupCacheData* pGroup; } SGcBlkBufInfo; #pragma pack(pop) +typedef struct SGcVgroupFileFd { + TdThreadMutex mutex; + TdFilePtr fd; +} SGcVgroupFileFd; typedef struct SGcVgroupCtx { - SArray* pTbList; - uint64_t lastUid; - int64_t fileSize; - uint32_t fileId; + SArray* pTbList; + uint64_t lastUid; + int64_t fileSize; + uint32_t fileId; + SSHashObj* pCacheFile; + int32_t baseNameLen; + char baseFilename[PATH_MAX]; } SGcVgroupCtx; typedef struct SGroupSeqBlkList { - int64_t startBlkId; - int64_t endBlkId; + SRWLatch lock; + SArray* pList; } SGroupSeqBlkList; typedef struct SGroupBatchBlkList { SRWLatch lock; - SArray* pBlkList; + SArray* pList; } SGroupBatchBlkList; typedef struct SGroupCacheData { @@ -91,6 +104,7 @@ typedef struct SGcNewGroupInfo { } SGcNewGroupInfo; typedef struct SGcDownstreamCtx { + int32_t id; SRWLatch grpLock; int64_t fetchSessionId; SArray* pNewGrpList; // SArray @@ -101,7 +115,10 @@ typedef struct SGcDownstreamCtx { SArray* pFreeBlock; int64_t lastBlkUid; SHashObj* pSessions; - SHashObj* pWaitSessions; + SHashObj* pWaitSessions; + int32_t cacheFileFdNum; + TdFilePtr cacheFileFd[GROUP_CACHE_MAX_FILE_FDS]; + char baseFilename[PATH_MAX]; } SGcDownstreamCtx; typedef struct SGcSessionCtx { @@ -128,12 +145,12 @@ typedef struct SGcCacheFile { typedef struct SGcBlkCacheInfo { SRWLatch dirtyLock; - SSHashObj* pCacheFile; SHashObj* pDirtyBlk; SGcBlkBufInfo* pDirtyHead; SGcBlkBufInfo* pDirtyTail; SHashObj* pReadBlk; int64_t blkCacheSize; + int32_t writeDownstreamId; } SGcBlkCacheInfo; typedef struct SGroupCacheOperatorInfo { @@ -144,7 +161,6 @@ typedef struct SGroupCacheOperatorInfo { bool globalGrp; bool grpByUid; bool batchFetch; - bool fetchDone; SGcDownstreamCtx* pDownstreams; SGcBlkCacheInfo blkCache; SHashObj* pGrpHash; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index f56fc98285..a219c4c0a0 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -85,30 +85,166 @@ static void destroyGroupCacheOperator(void* param) { taosMemoryFreeClear(param); } -static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { +static FORCE_INLINE int32_t initOpenCacheFile(SGcVgroupFileFd* pFileFd, char* filename) { + TdFilePtr newFd = taosOpenFile(filename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); + if (NULL == newFd) { + return TAOS_SYSTEM_ERROR(errno); + } + pFileFd->fd = newFd; + taosThreadMutexInit(&pFileFd->mutex, NULL); + return TSDB_CODE_SUCCESS; +} + +static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGcVgroupFileFd** ppFd) { + int32_t code = TSDB_CODE_SUCCESS; + SGcVgroupFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); + if (NULL == pTmp) { + sprintf(pVgCtx->baseFilename[pVgCtx->baseNameLen], "_%u", fileId); + + SGcVgroupFileFd newVgFd = {0}; + tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd)); + pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId)); + + code = initOpenCacheFile(pTmp, pVgCtx->baseFilename); + if (code) { + return code; + } + } + + taosThreadMutexLock(&pTmp->mutex); + *ppFd = pTmp; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE void releaseVgroupFileFd(SGcVgroupFileFd* pFd) { + taosThreadMutexUnlock(&pFd->mutex); +} + +static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { + int32_t code = TSDB_CODE_SUCCESS; + SGcBlkBufBasic blkBasic; + + while (NULL != pHead) { + if (NULL == pHead->pCtx->cacheFileFd[0]) { + pHead->pCtx->cacheFileFd[0] = taosOpenFile(pHead->pCtx->baseFilename, TD_FILE_CREATE|TD_FILE_READ|TD_FILE_WRITE|TD_FILE_AUTO_DEL); + if (NULL == pHead->pCtx->cacheFileFd[0]) { + return TAOS_SYSTEM_ERROR(errno); + } + pHead->pCtx->cacheFileFdNum = 1; + } + + int32_t ret = taosLSeekFile(pHead->pCtx->cacheFileFd[0], pHead->offset, SEEK_SET); + if (ret == -1) { + return TAOS_SYSTEM_ERROR(errno); + } + + ret = (int32_t)taosWriteFile(pHead->pCtx->cacheFileFd[0], pHead->pBuf, pHead->bufSize); + if (ret != pHead->bufSize) { + return TAOS_SYSTEM_ERROR(errno); + } + + taos + + int64_t blkId = pHead->blkId; + pHead = pHead->next; + + taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + } + + return code; +} + + +static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) { + int32_t code = TSDB_CODE_SUCCESS; + + SGcVgroupFileFd* pFd = NULL; + while (NULL != pHead) { + code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + + int32_t ret = taosLSeekFile(pFd->fd, pHead->offset, SEEK_SET); + if (ret == -1) { + code= TAOS_SYSTEM_ERROR(errno); + return code; + } + + code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->bufSize); + releaseVgroupFileFd(pFd); + if (code != pHead->bufSize) { + code= TAOS_SYSTEM_ERROR(errno); + return code; + } + + int64_t blkId = pHead->blkId; + pHead = pHead->next; + + taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId)); + } + + return code; +} + + +static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) { + int32_t code = TSDB_CODE_SUCCESS; + + if (pGCache->batchFetch) { + code = saveBatchBlocksToDisk(pGCache, pHead); + } else { + code = saveSeqBlocksToDisk(pGCache, pHead); + } + + atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1); + + return code; +} + +static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) { if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) { return TSDB_CODE_OUT_OF_MEMORY; } pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId)); + int32_t code = TSDB_CODE_SUCCESS; + SGcBlkBufInfo* pWriteHead = NULL; + taosWLockLatch(&pCache->dirtyLock); + pCache->blkCacheSize += pBufInfo->bufSize; + qError("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize); + if (NULL == pCache->pDirtyHead) { pCache->pDirtyHead = pBufInfo; } else { - pBufInfo->prev = pCache->pDirtyTail; pCache->pDirtyTail->next = pBufInfo; } pCache->pDirtyTail = pBufInfo; + + if (pGCache->maxCacheSize > 0 && pCache->blkCacheSize > pGCache->maxCacheSize) { + if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) { + pWriteHead = pCache->pDirtyHead; + SGcBlkBufInfo* pTmp = pCache->pDirtyHead; + while (NULL != pTmp) { + pCache->blkCacheSize -= pTmp->bufSize; + if (pCache->blkCacheSize <= pGCache->maxCacheSize) { + pCache->pDirtyHead = pTmp->next; + pTmp->next = NULL; + break; + } + pTmp = pTmp->next; + } + } + } taosWUnLockLatch(&pCache->dirtyLock); - - int64_t blkCacheSize = atomic_add_fetch_64(&pCache->blkCacheSize, pBufInfo->bufSize); - qDebug("group cache block cache num:%d size:%" PRId64 , taosHashGetSize(pCache->pDirtyBlk), blkCacheSize); - - if (pGCache->maxCacheSize > 0 && blkCacheSize > pGCache->maxCacheSize) { - //TODO + + if (NULL != pWriteHead) { + code = saveBlocksToDisk(pGCache, pCtx, pWriteHead); } - return TSDB_CODE_SUCCESS; + return code; } static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) { @@ -121,16 +257,15 @@ static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pB } blockDataToBuf(pBufInfo->pBuf, pBlock); - pBufInfo->prev = NULL; pBufInfo->next = NULL; pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1); pBufInfo->fileId = pGroup->fileId; - pBufInfo->offset = pGroup->pVgCtx->fileSize; pBufInfo->bufSize = bufSize; + pBufInfo->offset = atomic_fetch_add_64(&pGroup->pVgCtx->fileSize, bufSize); + pBufInfo->pCtx = pCtx; + pBufInfo->pGroup = pGroup; - pGroup->pVgCtx->fileSize += bufSize; - - int32_t code = addBlkToDirtyBufList(pGCache, &pGCache->blkCache, pBufInfo); + int32_t code = addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo); return code; } @@ -188,40 +323,54 @@ static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) } -static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, SGcBlkBufInfo* pBufInfo, SSDataBlock** ppRes) { +static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) { int32_t code = acquireBaseBlockFromList(&pGCache->pDownstreams[downstreamIdx], ppRes); if (code) { return code; } //TODO OPTIMIZE PERF - return blockDataFromBuf(*ppRes, pBufInfo->pBuf); + return blockDataFromBuf(*ppRes, pBuf); } static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, int64_t blkId, int64_t* nextOffset, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; SGcBlkCacheInfo* pCache = &pGCache->blkCache; + SGcBlkBufInfo bufInfo; + SGcBlkBufBasic* pBlkBasic = NULL; + void* pBuf = NULL; taosRLockLatch(&pCache->dirtyLock); SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &blkId, sizeof(blkId)); - if (pBufInfo) { - code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBufInfo, ppRes); - taosRUnLockLatch(&pCache->dirtyLock); + if (NULL == pBufInfo) { + code = readBlockFromDisk(pGCache, pGrp, blkId, nextOffset, &bufInfo.basic, &pBuf); if (code) { return code; } - - *nextOffset = pBufInfo->offset + pBufInfo->bufSize; - - taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); - return TSDB_CODE_SUCCESS; + pBlkBasic = &bufInfo.basic; + } else { + pBlkBasic = &pBufInfo->basic; + pBuf = pBufInfo->pBuf; } - taosRUnLockLatch(&pCache->dirtyLock); - - //TODO READ FROM FILE - code = TSDB_CODE_INVALID_PARA; - qError("block %" PRId64 "not found in dirty block list", blkId); - return code; + code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes); + taosRUnLockLatch(&pCache->dirtyLock); + if (code) { + return code; + } + + *nextOffset = pBlkBasic->offset + pBlkBasic->bufSize; + + taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES); + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE void initGcVgroupCtx(SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) { + pVgCtx->pTbList = pTbList; + + snprintf(pVgCtx->baseFilename, sizeof(pVgCtx->baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId); + pVgCtx->baseFilename[sizeof(pVgCtx->baseFilename) - 1] = 0; + + pVgCtx->baseNameLen = strlen(pVgCtx->baseFilename); } static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { @@ -232,7 +381,8 @@ static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) { return TSDB_CODE_OUT_OF_MEMORY; } taosArrayPush(pList, pNew); - SGcVgroupCtx vgCtx = {.pTbList = pList, .lastUid = 0, .fileSize = 0, .fileId = 0}; + SGcVgroupCtx vgCtx = {0}; + initGcVgroupCtx(&vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList); tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx)); pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId)); return TSDB_CODE_SUCCESS; @@ -331,7 +481,7 @@ static void notifyWaitingSessions(SArray* pWaitQueue) { static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { pGroup->pBlock = NULL; - pGroup->fetchDone = true; + atomic_store_8((int8_t*)&pGroup->fetchDone, true); taosThreadMutexLock(&pGroup->mutex); notifyWaitingSessions(pGroup->waitQueue); @@ -339,10 +489,27 @@ static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) { taosThreadMutexUnlock(&pGroup->mutex); } -static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { - if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { - return; +static int32_t vgroupSwitchNewFile(SGcVgroupCtx* pVgCtx) { + if (NULL != pVgCtx->cacheFileFd) { + if (NULL == pVgCtx->pCacheFile) { + pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pVgCtx->pCacheFile) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + tSimpleHashPut(pVgCtx->pCacheFile, &pVgCtx->fileId, sizeof(pVgCtx->fileId), &pVgCtx->cacheFileFd, sizeof(pVgCtx->cacheFileFd)); } + + pVgCtx->fileId++; + pVgCtx->fileSize = 0; + return TSDB_CODE_SUCCESS; +} + +static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) { + if (pCtx->lastBlkUid == uid || pGroup->pVgCtx->lastUid == uid) { + return TSDB_CODE_SUCCESS; + } + pCtx->lastBlkUid = uid; pGroup->pVgCtx->lastUid = uid; @@ -356,12 +523,16 @@ static void handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* } if (pGroup->pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) { - pGroup->pVgCtx->fileId++; - pGroup->pVgCtx->fileSize = 0; + int32_t code = vgroupSwitchNewFile(pGroup->pVgCtx); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } pGroup->fileId = pGroup->pVgCtx->fileId; pGroup->startOffset = pGroup->pVgCtx->fileSize; + + return TSDB_CODE_SUCCESS; } @@ -369,10 +540,11 @@ static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheDat taosThreadMutexInit(&pGroup->mutex, NULL); pGroup->downstreamIdx = downstreamIdx; pGroup->vgId = vgId; - pGroup->fileId = -1; if (batchFetch) { - pGroup->batchList.pBlkList = taosArrayInit(10, POINTER_BYTES); + pGroup->fileId = 0; + pGroup->batchList.pList = taosArrayInit(10, POINTER_BYTES); } else { + pGroup->fileId = -1; pGroup->seqList.startBlkId = -1; pGroup->seqList.endBlkId = -1; @@ -425,11 +597,13 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk) { if (batchFetch) { taosWLockLatch(&pGroup->batchList.lock); - taosArrayPush(pGroup->batchList.pBlkList, &pNewBlk->blkId); - qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pBlkList)); + taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic); + qError("block added to group cache, total block num:%" PRId64, (int64_t)taosArrayGetSize(pGroup->batchList.pList)); taosWUnLockLatch(&pGroup->batchList.lock); return TSDB_CODE_SUCCESS; } + + taosWLockLatch(&pGroup->batchList.lock); if (pGroup->seqList.endBlkId > 0) { atomic_store_64(&pGroup->seqList.endBlkId, pNewBlk->blkId); @@ -465,7 +639,10 @@ static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSD } if (!pGCache->batchFetch) { - handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); + code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } SGcBlkBufInfo newBlkBuf; @@ -495,8 +672,6 @@ static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSes SGroupCacheOperatorInfo* pGCache = pOperator->info; SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx]; if (pGCache->batchFetch) { - atomic_store_8((int8_t*)&pGCache->fetchDone, true); - SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash; SGroupCacheData* pGroup = NULL; while (pGroup = taosHashIterate(pGrpHash, pGroup)) { @@ -574,19 +749,19 @@ static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64 if (pGCache->batchFetch) { SGroupBatchBlkList* pBatchList = &pSession->pGroupData->batchList; taosRLockLatch(&pBatchList->lock); - int64_t blkNum = taosArrayGetSize(pBatchList->pBlkList); + int64_t blkNum = taosArrayGetSize(pBatchList->pList); if (pSession->lastBlkId < 0) { if (blkNum > 0) { - int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, 0); + SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, 0); taosRUnLockLatch(&pBatchList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes); pSession->lastBlkId = 0; return code; } } else if ((pSession->lastBlkId + 1) < blkNum) { - int64_t* pIdx = taosArrayGet(pBatchList->pBlkList, pSession->lastBlkId + 1); + SGcBlkBufBasic* pBasic = taosArrayGet(pBatchList->pList, pSession->lastBlkId + 1); taosRUnLockLatch(&pBatchList->lock); - code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, *pIdx, &pSession->nextOffset, ppRes); + code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, pBasic->blkId, &pSession->nextOffset, ppRes); pSession->lastBlkId++; return code; } @@ -704,14 +879,11 @@ _return: static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) { SGcBlkCacheInfo* pCache = &pInfo->blkCache; - pCache->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pCache->pCacheFile) { - return TSDB_CODE_OUT_OF_MEMORY; - } pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == pCache->pDirtyBlk) { return TSDB_CODE_OUT_OF_MEMORY; } + taosHashSetFreeFp(pCache->pDirtyBlk,,,,,,,,); pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (NULL == pCache->pReadBlk) { return TSDB_CODE_OUT_OF_MEMORY; @@ -725,7 +897,7 @@ static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOp pSession->downstreamIdx = pGcParam->downstreamIdx; pSession->pGroupData = pGroup; pSession->lastBlkId = -1; - pSession->nextOffset = -1; + pSession->nextOffset = 0; } static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) { @@ -801,43 +973,49 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { } for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - pInfo->pDownstreams[i].fetchSessionId = -1; - pInfo->pDownstreams[i].lastBlkUid = 0; - pInfo->pDownstreams[i].pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); - if (NULL == pInfo->pDownstreams[i].pVgTbHash) { + SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i]; + pCtx->id = i; + pCtx->fetchSessionId = -1; + pCtx->lastBlkUid = 0; + pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pCtx->pVgTbHash) { return TSDB_CODE_OUT_OF_MEMORY; } if (pInfo->batchFetch) { - SGcVgroupCtx vgCtx = {.pTbList = NULL, .lastUid = 0, .fileSize = 0, .fileId = i}; - int32_t defaultVg = -1; - tSimpleHashPut(pInfo->pDownstreams[i].pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); + int32_t defaultVg = 0; + SGcVgroupCtx vgCtx = {0}; + initGcVgroupCtx(&vgCtx, pCtx->id, defaultVg, NULL); + tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx)); } - pInfo->pDownstreams[i].pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); - if (NULL == pInfo->pDownstreams[i].pNewGrpList) { + pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo)); + if (NULL == pCtx->pNewGrpList) { return TSDB_CODE_OUT_OF_MEMORY; } if (!pInfo->globalGrp) { - pInfo->pDownstreams[i].pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (pInfo->pDownstreams[i].pGrpHash == NULL) { + pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (pCtx->pGrpHash == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } } - pInfo->pDownstreams[i].pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pInfo->pDownstreams[i].pSessions == NULL) { + pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pCtx->pSessions == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pInfo->pDownstreams[i].pFreeBlock = taosArrayInit(10, POINTER_BYTES); - if (NULL == pInfo->pDownstreams[i].pFreeBlock) { + pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES); + if (NULL == pCtx->pFreeBlock) { return TSDB_CODE_OUT_OF_MEMORY; } - pInfo->pDownstreams[i].pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pInfo->pDownstreams[i].pWaitSessions == NULL) { + pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pCtx->pWaitSessions == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + + snprintf(pCtx->baseFilename, sizeof(pCtx->baseFilename) - 1, "%s/gc_%d_%s_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), pCtx->id); + pCtx->baseFilename[sizeof(pCtx->baseFilename) - 1] = 0; } return TSDB_CODE_SUCCESS; @@ -871,7 +1049,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pInfo->maxCacheSize = -1; + pInfo->maxCacheSize = 104857600; pInfo->grpByUid = pPhyciNode->grpByUid; pInfo->globalGrp = pPhyciNode->globalGrp; pInfo->batchFetch = pPhyciNode->batchFetch;