From 2c1fc501ff4b333130e7683431a1a423ea6e23fb Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 26 Jun 2023 12:28:42 +0000 Subject: [PATCH] add checkpoint --- source/libs/stream/src/streamSnapshot.c | 96 +++++++++++++++++-------- 1 file changed, 66 insertions(+), 30 deletions(-) diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 049939b463..bb976748ff 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -81,7 +81,14 @@ static int64_t kBlockSize = 64 * 1024; int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path); void streamSnapHandleDestroy(SStreamSnapHandle* handle); -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { +// static void streamBuildFname(char* path, char* file, char* fullname) + +#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \ + do { \ + sprintf(fullname, "%s/%s", path, file); \ + } while (0) + +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { // impl later int32_t code = 0; @@ -91,8 +98,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { } SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); - handle->checkpointId = 0; - handle->seraial = 0; + pHandle->checkpointId = 0; + pHandle->seraial = 0; pFile->path = taosStrdup(path); pFile->pSst = taosArrayInit(16, sizeof(void*)); @@ -156,19 +163,24 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path) { taosStatFile(pFile->pCheckpointMeta, &item.size, NULL); taosArrayPush(list, &item); - handle->pBackendFile = pFile; + pHandle->pBackendFile = pFile; - handle->currFileIdx = 0; - handle->pFileList = list; - handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_READ); - if (handle->fd == NULL) { + pHandle->currFileIdx = 0; + pHandle->pFileList = list; + + char fullname[256] = {0}; + char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname); + + pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); + if (pHandle->fd == NULL) { goto _err; } - handle->seraial = 0; - handle->offset = 0; + pHandle->seraial = 0; + pHandle->offset = 0; return 0; _err: - streamSnapHandleDestroy(handle); + streamSnapHandleDestroy(pHandle); code = -1; return code; @@ -219,7 +231,9 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si // impl later int32_t code = 0; SStreamSnapHandle* pHandle = &pReader->handle; - SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + SBanckendFile* pFile = pHandle->pBackendFile; + + SBackendFileItem* item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); @@ -234,6 +248,7 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); + pHandle->offset = 0; pHandle->currFileIdx += 1; } } else { @@ -242,7 +257,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } item = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); - pHandle->fd = taosOpenFile(item->name, TD_FILE_READ); + char fullname[256] = {0}; + STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, item->name, fullname); + pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); + nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; } @@ -266,7 +284,7 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna if (pWriter == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - SStreamSnapHandle* handle = &pWriter->handle; + SStreamSnapHandle* pHandle = &pWriter->handle; const char* path = NULL; SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); @@ -278,12 +296,12 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, SStreamSna item.type = ROCKSDB_CURRENT_TYPE; taosArrayPush(list, &item); - handle->pBackendFile = pFile; + pHandle->pBackendFile = pFile; - handle->pFileList = list; - handle->currFileIdx = 0; - handle->offset = 0; - handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, handle->currFileIdx), TD_FILE_WRITE); + pHandle->pFileList = list; + pHandle->currFileIdx = 0; + pHandle->offset = 0; + pHandle->fd = taosOpenFile(taosArrayGet(pHandle->pFileList, pHandle->currFileIdx), TD_FILE_WRITE); *ppWriter = pWriter; return 0; } @@ -292,29 +310,33 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa int32_t code = 0; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; - SStreamSnapHandle* handle = &pWriter->handle; - SBackendFileItem* pItem = taosArrayGetP(handle->pFileList, handle->currFileIdx); + SStreamSnapHandle* pHandle = &pWriter->handle; + SBanckendFile* pFile = pHandle->pBackendFile; + SBackendFileItem* pItem = taosArrayGetP(pHandle->pFileList, pHandle->currFileIdx); if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { - if (taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset) != pHdr->size) { + if (taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset) != pHdr->size) { code = TAOS_SYSTEM_ERROR(terrno); qError("stream snap failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); return code; } - handle->offset += pHdr->size; + pHandle->offset += pHdr->size; } else { - taosCloseFile(&handle->fd); + taosCloseFile(&pHandle->fd); + pHandle->offset = 0; + pHandle->currFileIdx += 1; SBackendFileItem item; item.name = taosStrdup(pHdr->name); item.type = pHdr->type; - taosArrayPush(handle->pFileList, &item); + taosArrayPush(pHandle->pFileList, &item); - handle->offset = 0; - handle->currFileIdx += 1; - handle->fd = taosOpenFile(taosArrayGet(handle->pFileList, taosArrayGetSize(handle->pFileList) - 1), TD_FILE_WRITE); + char fullname[256] = {0}; + char* name = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, taosArrayGetSize(pHandle->pFileList) - 1))->name; + STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, name, fullname); + pHandle->fd = taosOpenFile(fullname, TD_FILE_WRITE); - taosPWriteFile(handle->fd, pHdr->data, pHdr->size, handle->offset); - handle->offset += pHdr->size; + taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); + pHandle->offset += pHdr->size; } // impl later @@ -322,6 +344,20 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa } int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { SStreamSnapHandle* handle = &pWriter->handle; + if (qDebugFlag & DEBUG_DEBUG) { + char* buf = (char*)taosMemoryMalloc(1024); + int n = sprintf(buf, "["); + for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { + SBackendFileItem* item = taosArrayGet(handle->pFileList, i); + if (i != taosArrayGetSize(handle->pFileList) - 1) { + n += sprintf(buf + n, "%s %" PRId64 ",", item->name, item->size); + } else { + n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size); + } + } + qDebug("stream snap get file list, %s", buf); + taosMemoryFree(buf); + } for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) { SBackendFileItem* item = taosArrayGet(handle->pFileList, i); taosMemoryFree(item->name);