From 861d26e356297994f01d5cc51b873d1d19fa8fad Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 Oct 2023 15:24:50 +0800 Subject: [PATCH] refactor backend --- source/libs/stream/src/streamMeta.c | 14 ++------ source/libs/stream/src/streamSnapshot.c | 44 ++++++++++++++----------- 2 files changed, 27 insertions(+), 31 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f4c8021ac6..9d76951adb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -203,7 +203,7 @@ _EXIT: streamBackendCleanup((void*)pBackend); return code; } -int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) { +int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { int8_t compatible = streamMetaCheckBackendCompatible(pMeta); if (compatible == STREAM_STATA_COMPATIBLE) { return 0; @@ -263,7 +263,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - if (streamMetaMayConvertBackendFormat(pMeta) < 0) { + if (streamMetaMayCvtDbFormat(pMeta) < 0) { goto _err; } @@ -759,16 +759,6 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { if (pMeta == NULL) return 0; - // void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); - // while (pIter) { - // STaskDbWrapper* taskBackend = *(STaskDbWrapper**)pIter; - // if (taskBackend != NULL) { - // taskDbRemoveRef(taskBackend); - // } - // pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); - // } - // taosHashClear(pMeta->pTaskBackendUnique); - return streamMetaLoadAllTasks(pMeta); } int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 1717a284bc..112589613f 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -134,6 +134,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { char fullname[256] = {0}; + STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); return taosOpenFile(fullname, opt); } @@ -469,54 +470,53 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) { return 1; } -int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, - SBackendSnapFile2* pBackendFile) { +int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) { int code = -1; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; SStreamTaskSnap snapInfo = pHdr->snapInfo; - SStreamTaskSnap* pSnapInfo = &pBackendFile->snapInfo; + SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo; - SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); + SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); - if (pBackendFile->fd == 0) { - pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pBackendFile->fd == NULL) { + if (pSnapFile->fd == 0) { + pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pSnapFile->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, pHdr->name, tstrerror(code)); } } if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - int64_t bytes = taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); + int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); if (bytes != pHdr->size) { code = TAOS_SYSTEM_ERROR(terrno); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } - pBackendFile->offset += bytes; + pSnapFile->offset += bytes; } else { - taosCloseFile(&pBackendFile->fd); - pBackendFile->offset = 0; - pBackendFile->currFileIdx += 1; + taosCloseFile(&pSnapFile->fd); + pSnapFile->offset = 0; + pSnapFile->currFileIdx += 1; SBackendFileItem item = {0}; item.name = taosStrdup(pHdr->name); item.type = pHdr->type; - taosArrayPush(pBackendFile->pFileList, &item); + taosArrayPush(pSnapFile->pFileList, &item); - SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); - pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pBackendFile->fd == NULL) { + SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); + pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pSnapFile->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pBackendFile->path, TD_DIRSEP, + qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP, pHdr->name, tstrerror(code)); } - taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); - pBackendFile->offset += pHdr->size; + taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); + pSnapFile->offset += pHdr->size; } code = 0; _EXIT: @@ -531,6 +531,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); if (pDbSnapFile->inited == 0) { + char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); + sprintf(path, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, snapInfo.streamId, + snapInfo.taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, snapInfo.chkpId); + + pDbSnapFile->path = path; pDbSnapFile->snapInfo = snapInfo; pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pDbSnapFile->currFileIdx = 0; @@ -539,6 +544,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SBackendFileItem item = {0}; item.name = taosStrdup((char*)ROCKSDB_CURRENT); item.type = ROCKSDB_CURRENT_TYPE; + taosArrayPush(pDbSnapFile->pFileList, &item); pDbSnapFile->inited = 1;