az: interfaces for stream checkpoints
This commit is contained in:
parent
4d0ca2c4d4
commit
d081c73a17
|
@ -365,13 +365,121 @@ void azDeleteObjectsByPrefix(const char *prefix) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
|
||||
int32_t azPutObjectFromFile2(const char *file, const char *object, int8_t withcp) {
|
||||
int32_t code = 0, lino = 0;
|
||||
uint64_t contentLength = 0;
|
||||
|
||||
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) { return 0; }
|
||||
if (taosStatFile(file, (int64_t *)&contentLength, NULL, NULL) < 0) {
|
||||
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
|
||||
TAOS_RETURN(terrno);
|
||||
}
|
||||
|
||||
int32_t azGetObjectToFile(const char *object_name, const char *fileName) { return 0; }
|
||||
code = azPutObjectFromFileOffset(file, object, 0, contentLength);
|
||||
if (code != 0) {
|
||||
uError("ERROR: %s Failed to put file %s: ", __func__, file);
|
||||
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||
}
|
||||
|
||||
int32_t azDeleteObjects(const char *object_name[], int nobject) { return 0; }
|
||||
_exit:
|
||||
if (code) {
|
||||
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t azGetObjectToFile(const char *object_name, const char *fileName) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
std::string accountName = tsS3AccessKeyId[0];
|
||||
std::string accountKey = tsS3AccessKeySecret[0];
|
||||
std::string accountURL = tsS3Hostname[0];
|
||||
accountURL = "https://" + accountURL;
|
||||
|
||||
try {
|
||||
auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);
|
||||
|
||||
StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
|
||||
|
||||
BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);
|
||||
|
||||
std::string containerName = tsS3BucketName;
|
||||
auto containerClient = blobServiceClient.GetBlobContainerClient(containerName);
|
||||
|
||||
TDBlockBlobClient blobClient(containerClient.GetBlobClient(object_name));
|
||||
|
||||
auto res = blobClient.DownloadTo(fileName);
|
||||
if (res.Value.ContentRange.Length.Value() <= 0) {
|
||||
code = TAOS_SYSTEM_ERROR(EIO);
|
||||
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
} catch (const Azure::Core::RequestFailedException &e) {
|
||||
uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast<int>(e.StatusCode),
|
||||
e.ReasonPhrase.c_str());
|
||||
code = TAOS_SYSTEM_ERROR(EIO);
|
||||
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t azGetObjectsByPrefix(const char *prefix, const char *path) {
|
||||
const std::string delimiter = "/";
|
||||
std::string accountName = tsS3AccessKeyId[0];
|
||||
std::string accountKey = tsS3AccessKeySecret[0];
|
||||
std::string accountURL = tsS3Hostname[0];
|
||||
accountURL = "https://" + accountURL;
|
||||
|
||||
try {
|
||||
auto sharedKeyCredential = std::make_shared<StorageSharedKeyCredential>(accountName, accountKey);
|
||||
|
||||
StorageSharedKeyCredential *pSharedKeyCredential = new StorageSharedKeyCredential(accountName, accountKey);
|
||||
|
||||
BlobServiceClient blobServiceClient(accountURL, sharedKeyCredential);
|
||||
|
||||
std::string containerName = tsS3BucketName;
|
||||
auto containerClient = blobServiceClient.GetBlobContainerClient(containerName);
|
||||
|
||||
Azure::Storage::Blobs::ListBlobsOptions options;
|
||||
options.Prefix = prefix;
|
||||
|
||||
std::set<std::string> listBlobs;
|
||||
for (auto pageResult = containerClient.ListBlobs(options); pageResult.HasPage(); pageResult.MoveToNextPage()) {
|
||||
for (const auto &blob : pageResult.Blobs) {
|
||||
listBlobs.insert(blob.Name);
|
||||
}
|
||||
}
|
||||
|
||||
for (auto blobName : listBlobs) {
|
||||
const char *tmp = strchr(blobName.c_str(), '/');
|
||||
tmp = (tmp == NULL) ? blobName.c_str() : tmp + 1;
|
||||
char fileName[PATH_MAX] = {0};
|
||||
if (path[strlen(path) - 1] != TD_DIRSEP_CHAR) {
|
||||
(void)snprintf(fileName, PATH_MAX, "%s%s%s", path, TD_DIRSEP, tmp);
|
||||
} else {
|
||||
(void)snprintf(fileName, PATH_MAX, "%s%s", path, tmp);
|
||||
}
|
||||
if (!azGetObjectToFile(blobName.c_str(), fileName)) {
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
}
|
||||
} catch (const Azure::Core::RequestFailedException &e) {
|
||||
uError("%s failed at line %d since %d(%s)", __func__, __LINE__, static_cast<int>(e.StatusCode),
|
||||
e.ReasonPhrase.c_str());
|
||||
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t azDeleteObjects(const char *object_name[], int nobject) {
|
||||
for (int i = 0; i < nobject; ++i) {
|
||||
azDeleteObjectsByPrefix(object_name[i]);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
|
|
Loading…
Reference in New Issue