From d081c73a17c3516e0f7f53d20f54ca6a252bd557 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 11 Oct 2024 14:27:58 +0800 Subject: [PATCH] az: interfaces for stream checkpoints --- source/libs/azure/src/az.cpp | 116 +++++++++++++++++++++++++++++++++-- 1 file changed, 112 insertions(+), 4 deletions(-) diff --git a/source/libs/azure/src/az.cpp b/source/libs/azure/src/az.cpp index 83cba1c877..453740224d 100644 --- a/source/libs/azure/src/az.cpp +++ b/source/libs/azure/src/az.cpp @@ -365,13 +365,121 @@ void azDeleteObjectsByPrefix(const char *prefix) { } } -int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; } +int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { + int32_t code = 0, lino = 0; + uint64_t contentLength = 0; -int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; } + if (taosStatFile(file, (int64_t *)&contentLength, NULL, NULL) < 0) { + uError("ERROR: %s Failed to stat file %s: ", __func__, file); + TAOS_RETURN(terrno); + } -int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; } + code = azPutObjectFromFileOffset(file, object, 0, contentLength); + if (code != 0) { + uError("ERROR: %s Failed to put file %s: ", __func__, file); + TAOS_CHECK_GOTO(code, &lino, _exit); + } -int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; } +_exit: + if (code) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + + return 0; +} + +int32_t azGetObjectToFile(const char *object_name, const char *fileName) { + int32_t code = TSDB_CODE_SUCCESS; + std::string accountName = tsS3AccessKeyId[0]; + std::string accountKey = tsS3AccessKeySecret[0]; + std::string accountURL = tsS3Hostname[0]; + accountURL = "https://" + accountURL; + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = tsS3BucketName; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name)); + + auto res = blobClient.DownloadTo(fileName); + if (res.Value.ContentRange.Length.Value() <= 0) { + code = TAOS_SYSTEM_ERROR(EIO); + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + TAOS_RETURN(code); + } + } catch (const Azure::Core::RequestFailedException &e) { + uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast(e.StatusCode), + e.ReasonPhrase.c_str()); + code = TAOS_SYSTEM_ERROR(EIO); + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + TAOS_RETURN(code); + } + + TAOS_RETURN(code); +} + +int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { + const std::string delimiter = "/"; + std::string accountName = tsS3AccessKeyId[0]; + std::string accountKey = tsS3AccessKeySecret[0]; + std::string accountURL = tsS3Hostname[0]; + accountURL = "https://" + accountURL; + + try { + auto sharedKeyCredential = std::make_shared(accountName, accountKey); + + StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey); + + BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential); + + std::string containerName = tsS3BucketName; + auto containerClient = blobServiceClient.GetBlobContainerClient(containerName); + + Azure::Storage::Blobs::ListBlobsOptions options; + options.Prefix = prefix; + + std::set listBlobs; + for (auto pageResult = containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) { + for (const auto &blob : pageResult.Blobs) { + listBlobs.insert(blob.Name); + } + } + + for (auto blobName : listBlobs) { + const char *tmp = strchr(blobName.c_str(), '/'); + tmp = (tmp == NULL) ? blobName.c_str() : tmp + 1; + char fileName[PATH_MAX] = {0}; + if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) { + (void)snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp); + } else { + (void)snprintf(fileName, PATH_MAX, "%s%s", path, tmp); + } + if (!azGetObjectToFile(blobName.c_str(), fileName)) { + TAOS_RETURN(TSDB_CODE_FAILED); + } + } + } catch (const Azure::Core::RequestFailedException &e) { + uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast(e.StatusCode), + e.ReasonPhrase.c_str()); + TAOS_RETURN(TSDB_CODE_FAILED); + } + + return 0; +} + +int32_t azDeleteObjects(const char *object_name[], int nobject) { + for (int i = 0; i < nobject; ++i) { + azDeleteObjectsByPrefix(object_name[i]); + } + + return 0; +} #else