diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index d33f018a2c..16b0ae0406 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -32,6 +32,7 @@ typedef struct SBackendFileItem { char* name; int8_t type; int64_t size; + int8_t ref; } SBackendFileItem; typedef struct SBackendFile { char* pCurrent; @@ -74,7 +75,7 @@ struct SStreamSnapHandle { int32_t currFileIdx; char* metaPath; - SArray* pBackendSnapSet; + SArray* pDbSnapSet; int32_t currIdx; }; struct SStreamSnapBlockHdr { @@ -142,9 +143,11 @@ int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { char* buf = taosMemoryCalloc(1, 512); - sprintf(buf, "[current: %s,", pSnapFile->pCurrent); - sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); - sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); + sprintf(buf + strlen(buf), "["); + + if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent); + if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); + if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); if (pSnapFile->pSst) { for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { char* name = taosArrayGetP(pSnapFile->pSst, i); @@ -160,7 +163,8 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { } int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) { - SBackendFileItem item; + SBackendFileItem item = {0}; + item.ref = 1; // current item.name = pSnapFile->pCurrent; item.type = ROCKSDB_CURRENT_TYPE; @@ -270,6 +274,13 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { char* sst = taosArrayGetP(pSnap->pSst, i); taosMemoryFree(sst); } + // unite read/write snap file + for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) { + SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i); + if (pItem->ref == 0) { + taosMemoryFree(pItem->name); + } + } taosArrayDestroy(pSnap->pFileList); taosArrayDestroy(pSnap->pSst); taosCloseFile(&pSnap->fd); @@ -285,7 +296,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk return -1; } - SArray* pBdSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); @@ -293,10 +304,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk SBackendSnapFile2 snapFile = {0}; code = streamBackendSnapInitFile(path, pSnap, &snapFile); ASSERT(code == 0); - taosArrayPush(pBdSnapSet, &snapFile); + taosArrayPush(pDbSnapSet, &snapFile); } - pHandle->pBackendSnapSet = pBdSnapSet; + pHandle->pDbSnapSet = pDbSnapSet; pHandle->currIdx = 0; return 0; @@ -308,40 +319,15 @@ _err: } void streamSnapHandleDestroy(SStreamSnapHandle* handle) { - // SBanckendFile* pFile = handle->pBackendFile; - if (handle->pBackendSnapSet) { - for (int i = 0; i < taosArrayGetSize(handle->pBackendSnapSet); i++) { - SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pBackendSnapSet, i); + if (handle->pDbSnapSet) { + for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) { + SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i); snapFileDebugInfo(pSnapFile); snapFileDestroy(pSnapFile); } - taosArrayDestroy(handle->pBackendSnapSet); + taosArrayDestroy(handle->pDbSnapSet); } taosMemoryFree(handle->metaPath); - - // if (handle->checkpointId == 0) { - // // del tmp dir - // if (pFile && taosIsDir(pFile->path)) { - // taosRemoveDir(pFile->path); - // } - // } else { - // streamBackendDelInUseChkp(handle->handle, handle->checkpointId); - // } - // if (pFile) { - // taosMemoryFree(pFile->pCheckpointMeta); - // taosMemoryFree(pFile->pCurrent); - // taosMemoryFree(pFile->pMainfest); - // taosMemoryFree(pFile->pOptions); - // taosMemoryFree(pFile->path); - // for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - // char* sst = taosArrayGetP(pFile->pSst, i); - // taosMemoryFree(sst); - // } - // taosArrayDestroy(pFile->pSst); - // taosMemoryFree(pFile); - // } - // taosArrayDestroy(handle->pFileList); - // taosCloseFile(&handle->fd); return; } @@ -374,17 +360,17 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si int32_t code = 0; SStreamSnapHandle* pHandle = &pReader->handle; int32_t idx = pHandle->currIdx; - SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, idx); + SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pDbSnapSet, idx); SBackendFileItem* item = NULL; _NEXT: if (pSnapFile->fd == NULL) { if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) { - if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pBackendSnapSet)) { + if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) { pHandle->currIdx += 1; - pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); + pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); goto _NEXT; } else { *ppData = NULL; @@ -466,27 +452,12 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path SBackendSnapFile2 snapFile = {0}; SStreamSnapHandle* pHandle = &pWriter->handle; - pHandle->pBackendSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - taosArrayPush(pHandle->pBackendSnapSet, &snapFile); + taosArrayPush(pHandle->pDbSnapSet, &snapFile); pHandle->currIdx = 0; pHandle->metaPath = taosStrdup(path); - // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); - // pFile->path = taosStrdup(path); - // SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); - - // SBackendFileItem item; - // item.name = taosStrdup((char*)ROCKSDB_CURRENT); - // item.type = ROCKSDB_CURRENT_TYPE; - // taosArrayPush(list, &item); - - // pHandle->pBackendFile = pFile; - - // pHandle->pFileList = list; - // pHandle->currFileIdx = 0; - // pHandle->offset = 0; - *ppWriter = pWriter; return 0; } @@ -530,9 +501,10 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t pBackendFile->offset = 0; pBackendFile->currFileIdx += 1; - SBackendFileItem item; + SBackendFileItem item = {0}; item.name = taosStrdup(pHdr->name); item.type = pHdr->type; + taosArrayPush(pBackendFile->pFileList, &item); SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); @@ -557,26 +529,26 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SStreamSnapHandle* pHandle = &pWriter->handle; SStreamTaskSnap snapInfo = pHdr->snapInfo; - SBackendSnapFile2* pBackendFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); - if (pBackendFile->inited == 0) { - pBackendFile->snapInfo = snapInfo; - pBackendFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); - pBackendFile->currFileIdx = 0; - pBackendFile->offset = 0; + SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); + if (pDbSnapFile->inited == 0) { + pDbSnapFile->snapInfo = snapInfo; + pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + pDbSnapFile->currFileIdx = 0; + pDbSnapFile->offset = 0; - SBackendFileItem item; + SBackendFileItem item = {0}; item.name = taosStrdup((char*)ROCKSDB_CURRENT); item.type = ROCKSDB_CURRENT_TYPE; - taosArrayPush(pBackendFile->pFileList, &item); + taosArrayPush(pDbSnapFile->pFileList, &item); - pBackendFile->inited = 1; - return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); + pDbSnapFile->inited = 1; + return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile); } else { - if (snapInfoEqual(&snapInfo, &pBackendFile->snapInfo)) { - return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); + if (snapInfoEqual(&snapInfo, &pDbSnapFile->snapInfo)) { + return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile); } else { SBackendSnapFile2 snapFile = {0}; - taosArrayPush(pHandle->pBackendSnapSet, &snapFile); + taosArrayPush(pHandle->pDbSnapSet, &snapFile); pHandle->currIdx += 1; return streamSnapWrite(pWriter, pData, nData); @@ -585,8 +557,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa return code; } int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { - SStreamSnapHandle* handle = &pWriter->handle; - streamSnapHandleDestroy(handle); + if (pWriter == NULL) return 0; + streamSnapHandleDestroy(&pWriter->handle); taosMemoryFree(pWriter); return 0;