From 0baef335d70fdcd53b86b9600ad19ebd45b5f629 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 11 Oct 2023 16:03:04 +0800 Subject: [PATCH] refactor backend --- source/libs/stream/src/streamSnapshot.c | 125 +++++++++++++++++------- 1 file changed, 89 insertions(+), 36 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 64ff7b7569..a4363f46b1 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -59,6 +59,7 @@ typedef struct SBackendSnapFiles2 { SArray* pFileList; int32_t currFileIdx; SStreamTaskSnap snapInfo; + int8_t inited; } SBackendSnapFile2; struct SStreamSnapHandle { @@ -71,6 +72,7 @@ struct SStreamSnapHandle { int8_t filetype; SArray* pFileList; int32_t currFileIdx; + char* metaPath; SArray* pBackendSnapSet; int32_t currIdx; @@ -310,6 +312,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { } taosArrayDestroy(handle->pBackendSnapSet); } + taosMemoryFree(handle->metaPath); // if (handle->checkpointId == 0) { // // del tmp dir @@ -455,76 +458,126 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path if (pWriter == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } + SBackendSnapFile2 snapFile = {0}; + SStreamSnapHandle* pHandle = &pWriter->handle; + pHandle->pBackendSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); - pFile->path = taosStrdup(path); - SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); + taosArrayPush(pHandle->pBackendSnapSet, &snapFile); + pHandle->currIdx = 0; + pHandle->metaPath = taosStrdup(path); - SBackendFileItem item; - item.name = taosStrdup((char*)ROCKSDB_CURRENT); - item.type = ROCKSDB_CURRENT_TYPE; - taosArrayPush(list, &item); + // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); + // pFile->path = taosStrdup(path); + // SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); - pHandle->pBackendFile = pFile; + // SBackendFileItem item; + // item.name = taosStrdup((char*)ROCKSDB_CURRENT); + // item.type = ROCKSDB_CURRENT_TYPE; + // taosArrayPush(list, &item); - pHandle->pFileList = list; - pHandle->currFileIdx = 0; - pHandle->offset = 0; + // pHandle->pBackendFile = pFile; + + // pHandle->pFileList = list; + // pHandle->currFileIdx = 0; + // pHandle->offset = 0; *ppWriter = pWriter; return 0; } -int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; +int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) { + if (a->streamId != b->streamId || a->taskId != b->taskId || a->chkpId != b->chkpId) { + return 0; + } + return 1; +} +int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, + SBackendSnapFile2* pBackendFile) { + int code = -1; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapHandle* pHandle = &pWriter->handle; - SBanckendFile* pFile = pHandle->pBackendFile; - SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + SStreamTaskSnap snapInfo = pHdr->snapInfo; - if (pHandle->fd == NULL) { - pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pHandle->fd == NULL) { + SStreamTaskSnap* pSnapInfo = &pBackendFile->snapInfo; + + SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); + + if (pBackendFile->fd == 0) { + pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pBackendFile->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, + pHdr->name, tstrerror(code)); } } - if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); + int64_t bytes = taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); if (bytes != pHdr->size) { code = TAOS_SYSTEM_ERROR(terrno); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } - pHandle->offset += bytes; + pBackendFile->offset += bytes; } else { - taosCloseFile(&pHandle->fd); - pHandle->offset = 0; - pHandle->currFileIdx += 1; + taosCloseFile(&pBackendFile->fd); + pBackendFile->offset = 0; + pBackendFile->currFileIdx += 1; SBackendFileItem item; item.name = taosStrdup(pHdr->name); item.type = pHdr->type; - taosArrayPush(pHandle->pFileList, &item); + taosArrayPush(pBackendFile->pFileList, &item); - SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); - if (pHandle->fd == NULL) { + SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); + pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + if (pBackendFile->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, - tstrerror(code)); + qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pBackendFile->path, TD_DIRSEP, + pHdr->name, tstrerror(code)); } - taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); - pHandle->offset += pHdr->size; + taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); + pBackendFile->offset += pHdr->size; } + code = 0; +_EXIT: + return code; +} +int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { + int32_t code = 0; - // impl later - return 0; + SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; + SStreamSnapHandle* pHandle = &pWriter->handle; + SStreamTaskSnap snapInfo = pHdr->snapInfo; + + SBackendSnapFile2* pBackendFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); + if (pBackendFile->inited == 0) { + pBackendFile->snapInfo = snapInfo; + pBackendFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + pBackendFile->currFileIdx = 0; + pBackendFile->offset = 0; + + SBackendFileItem item; + item.name = taosStrdup((char*)ROCKSDB_CURRENT); + item.type = ROCKSDB_CURRENT_TYPE; + taosArrayPush(pBackendFile->pFileList, &item); + + pBackendFile->inited = 1; + return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); + } else { + if (snapInfoEqual(&snapInfo, &pBackendFile->snapInfo)) { + return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); + } else { + SBackendSnapFile2 snapFile = {0}; + taosArrayPush(pHandle->pBackendSnapSet, &snapFile); + pHandle->currIdx += 1; + + return streamSnapWrite(pWriter, pData, nData); + } + } + return code; } int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { SStreamSnapHandle* handle = &pWriter->handle;