Merge pull request #23891 from taosdata/enh/TD-27119

enh(cos/put): put object with cp
This commit is contained in:
wade zhang 2023-12-12 08:35:26 +08:00 committed by GitHub
commit e2e2488fa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 892 additions and 209 deletions

View File

@ -34,7 +34,7 @@ extern int32_t tsS3UploadDelaySec;
int32_t s3Init();
void s3CleanUp();
int32_t s3PutObjectFromFile(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object);
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp);
void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name);

83
include/common/cos_cp.h Normal file
View File

@ -0,0 +1,83 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_COS_CP_H_
#define _TD_COMMON_COS_CP_H_
#include "os.h"
#include "tdef.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
COS_CP_TYPE_UPLOAD, // upload
COS_CP_TYPE_DOWNLOAD // download
} ECpType;
typedef struct {
int32_t index; // the index of part, start from 0
int64_t offset; // the offset point of part
int64_t size; // the size of part
int completed; // COS_TRUE completed, COS_FALSE uncompleted
char etag[128]; // the etag of part, for upload
uint64_t crc64;
} SCheckpointPart;
typedef struct {
ECpType cp_type; // 0 upload, 1 download
char md5[64]; // the md5 of checkout content
TdFilePtr thefile; // the handle of checkpoint file
char file_path[TSDB_FILENAME_LEN]; // local file path
int64_t file_size; // local file size, for upload
int32_t file_last_modified; // local file last modified time, for upload
char file_md5[64]; // md5 of the local file content, for upload, reserved
char object_name[128]; // object name
int64_t object_size; // object size, for download
char object_last_modified[64]; // object last modified time, for download
char object_etag[128]; // object etag, for download
char upload_id[128]; // upload id
int part_num; // the total number of parts
int64_t part_size; // the part size, byte
SCheckpointPart* parts; // the parts of local or object, from 0
} SCheckpoint;
int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint);
void cos_cp_close(TdFilePtr fd);
void cos_cp_remove(char const* filepath);
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint);
int32_t cos_cp_dump(SCheckpoint* checkpoint);
void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes);
void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64);
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
char const* upload_id, int64_t part_size);
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime);
void cos_cp_build_download(SCheckpoint* checkpoint, char const* filepath, char const* object_name, int64_t object_size,
char const* object_lmtime, char const* object_etag, int64_t part_size);
bool cos_cp_is_valid_download(SCheckpoint* checkpoint, char const* object_name, int64_t object_size,
char const* object_lmtime, char const* object_etag);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_COS_CP_H_*/

View File

@ -1,6 +1,8 @@
#define ALLOW_FORBID_FUNC
#include "cos.h"
#include "cos_cp.h"
#include "tdef.h"
extern char tsS3Endpoint[];
extern char tsS3AccessKeyId[];
@ -86,7 +88,7 @@ typedef struct {
char err_msg[128];
S3Status status;
uint64_t content_length;
char * buf;
char *buf;
int64_t buf_pos;
} TS3SizeCBD;
@ -270,7 +272,7 @@ typedef struct list_parts_callback_data {
typedef struct MultipartPartData {
put_object_callback_data put_object_data;
int seq;
UploadManager * manager;
UploadManager *manager;
} MultipartPartData;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
@ -317,12 +319,23 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
const char * etag = properties->eTag;
const char *etag = properties->eTag;
data->manager->etags[seq - 1] = strdup(etag);
data->manager->next_etags_pos = seq;
return S3StatusOK;
}
S3Status MultipartResponseProperiesCallbackWithCp(const S3ResponseProperties *properties, void *callbackData) {
responsePropertiesCallbackNull(properties, callbackData);
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
const char *etag = properties->eTag;
data->manager->etags[seq - 1] = strdup(etag);
// data->manager->next_etags_pos = seq;
return S3StatusOK;
}
static int multipartPutXmlCallback(int bufferSize, char *buffer, void *callbackData) {
UploadManager *manager = (UploadManager *)callbackData;
int ret = 0;
@ -446,37 +459,335 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan
return 0;
}
*/
int32_t s3PutObjectFromFile2(const char *file, const char *object) {
int32_t code = 0;
const char *key = object;
// const char *uploadId = 0;
const char * filename = 0;
static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char const *object_name, int64_t size,
S3PutProperties *put_prop, put_object_callback_data *data) {
int32_t code = 0;
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&putObjectDataCallback};
do {
S3_put_object(bucket_context, object_name, size, put_prop, 0, 0, &putObjectHandler, data);
} while (S3_status_is_retryable(data->status) && should_retry());
if (data->status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, data->status, data->err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data->contentLength) {
uError("%s Failed to read remaining %llu bytes from input", __func__, (unsigned long long)data->contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
return code;
}
static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name,
int64_t contentLength, S3PutProperties *put_prop,
put_object_callback_data *data) {
int32_t code = 0;
uint64_t totalContentLength = contentLength;
uint64_t todoContentLength = contentLength;
UploadManager manager = {0};
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
manager.next_etags_pos = 0;
do {
S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
upload:
todoContentLength -= chunk_size * manager.next_etags_pos;
for (int seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = *data;
}
partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
// printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
put_prop->md5 = 0;
do {
S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
contentLength -= chunk_size;
todoContentLength -= chunk_size;
}
int i;
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (i = 0; i < totalSeq; i++) {
if (!manager.etags[i]) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
i + 1, manager.etags[i]);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
clean:
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (i = 0; i < manager.next_etags_pos; i++) {
taosMemoryFree(manager.etags[i]);
}
growbuffer_destroy(manager.gb);
taosMemoryFree(manager.etags);
return code;
}
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
put_object_callback_data *data) {
int32_t code = 0;
uint64_t totalContentLength = contentLength;
// uint64_t todoContentLength = contentLength;
UploadManager manager = {0};
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
}
bool need_init_upload = true;
char file_cp_path[TSDB_FILENAME_LEN];
snprintf(file_cp_path, TSDB_FILENAME_LEN, "%s.cp", file);
SCheckpoint cp = {0};
cp.parts = taosMemoryCalloc(max_part_num, sizeof(SCheckpointPart));
if (taosCheckExistFile(file_cp_path)) {
if (!cos_cp_load(file_cp_path, &cp) && cos_cp_is_valid_upload(&cp, contentLength, lmtime)) {
manager.upload_id = strdup(cp.upload_id);
need_init_upload = false;
} else {
cos_cp_remove(file_cp_path);
}
}
if (need_init_upload) {
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
do {
S3_initiate_multipart(bucket_context, object_name, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size);
}
if (cos_cp_open(file_cp_path, &cp)) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
int part_num = 0;
int64_t consume_bytes = 0;
// SCheckpointPart *parts = taosMemoryCalloc(cp.part_num, sizeof(SCheckpointPart));
// cos_cp_get_undo_parts(&cp, &part_num, parts, &consume_bytes);
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallbackWithCp, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
manager.next_etags_pos = 0;
upload:
// todoContentLength -= chunk_size * manager.next_etags_pos;
for (int i = 0; i < cp.part_num; ++i) {
if (cp.parts[i].completed) {
continue;
}
int seq = cp.parts[i].index + 1;
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = *data;
}
partContentLength = cp.parts[i].size;
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
// partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
put_prop->md5 = 0;
do {
S3_upload_part(bucket_context, object_name, put_prop, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
//(void)cos_cp_dump(&cp);
goto clean;
}
if (!manager.etags[seq - 1]) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0);
(void)cos_cp_dump(&cp);
contentLength -= chunk_size;
// todoContentLength -= chunk_size;
}
cos_cp_close(cp.thefile);
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (int i = 0; i < cp.part_num; ++i) {
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
// i + 1, manager.etags[i]);
cp.parts[i].index + 1, cp.parts[i].etag);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(bucket_context, object_name, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
cos_cp_remove(file_cp_path);
clean:
/*
if (parts) {
taosMemoryFree(parts);
}
*/
if (cp.thefile) {
cos_cp_close(cp.thefile);
}
if (cp.parts) {
taosMemoryFree(cp.parts);
}
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (int i = 0; i < cp.part_num; ++i) {
if (manager.etags[i]) {
taosMemoryFree(manager.etags[i]);
}
}
taosMemoryFree(manager.etags);
growbuffer_destroy(manager.gb);
return code;
}
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
int32_t code = 0;
int32_t lmtime = 0;
const char *filename = 0;
uint64_t contentLength = 0;
const char * cacheControl = 0, *contentType = 0, *md5 = 0;
const char * contentDispositionFilename = 0, *contentEncoding = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
const char *contentDispositionFilename = 0, *contentEncoding = 0;
int64_t expires = -1;
S3CannedAcl cannedAcl = S3CannedAclPrivate;
int metaPropertiesCount = 0;
S3NameValue metaProperties[S3_MAX_METADATA_COUNT];
char useServerSideEncryption = 0;
put_object_callback_data data = {0};
// int noStatus = 0;
// data.infile = 0;
// data.gb = 0;
// data.infileFD = NULL;
// data.noStatus = noStatus;
// uError("ERROR: %s stat file %s: ", __func__, file);
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
return code;
}
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
uError("ERROR: %s Failed to open file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to open file %s: ", __func__, file);
return code;
}
@ -493,143 +804,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
metaProperties, useServerSideEncryption};
if (contentLength <= MULTIPART_CHUNK_SIZE) {
S3PutObjectHandler putObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&putObjectDataCallback};
do {
S3_put_object(&bucketContext, key, contentLength, &putProperties, 0, 0, &putObjectHandler, &data);
} while (S3_status_is_retryable(data.status) && should_retry());
if (data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, data.status, data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
} else if (data.contentLength) {
uError("ERROR: %s Failed to read remaining %llu bytes from input", __func__,
(unsigned long long)data.contentLength);
code = TAOS_SYSTEM_ERROR(EIO);
}
code = s3PutObjectFromFileSimple(&bucketContext, object_name, contentLength, &putProperties, &data);
} else {
uint64_t totalContentLength = contentLength;
uint64_t todoContentLength = contentLength;
UploadManager manager = {0};
// manager.upload_id = 0;
// manager.gb = 0;
// div round up
int seq;
uint64_t chunk_size = MULTIPART_CHUNK_SIZE >> 3;
int totalSeq = (contentLength + chunk_size - 1) / chunk_size;
const int max_part_num = 10000;
if (totalSeq > max_part_num) {
chunk_size = (contentLength + max_part_num - contentLength % max_part_num) / max_part_num;
totalSeq = (contentLength + chunk_size - 1) / chunk_size;
if (withcp) {
code = s3PutObjectFromFileWithCp(&bucketContext, file, lmtime, object_name, contentLength, &putProperties, &data);
} else {
code = s3PutObjectFromFileWithoutCp(&bucketContext, object_name, contentLength, &putProperties, &data);
}
MultipartPartData partData;
memset(&partData, 0, sizeof(MultipartPartData));
int partContentLength = 0;
S3MultipartInitialHandler handler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&initial_multipart_callback};
S3PutObjectHandler putObjectHandler = {{&MultipartResponseProperiesCallback, &responseCompleteCallback},
&putObjectDataCallback};
S3MultipartCommitHandler commit_handler = {
{&responsePropertiesCallbackNull, &responseCompleteCallback}, &multipartPutXmlCallback, 0};
manager.etags = (char **)taosMemoryCalloc(totalSeq, sizeof(char *));
manager.next_etags_pos = 0;
/*
if (uploadId) {
manager.upload_id = strdup(uploadId);
manager.remaining = contentLength;
if (!try_get_parts_info(tsS3BucketName, key, &manager)) {
fseek(data.infile, -(manager.remaining), 2);
taosLSeekFile(data.infileFD, -(manager.remaining), SEEK_END);
contentLength = manager.remaining;
goto upload;
} else {
goto clean;
}
}
*/
do {
S3_initiate_multipart(&bucketContext, key, 0, &handler, 0, timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
upload:
todoContentLength -= chunk_size * manager.next_etags_pos;
for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++) {
partData.manager = &manager;
partData.seq = seq;
if (partData.put_object_data.gb == NULL) {
partData.put_object_data = data;
}
partContentLength = ((contentLength > chunk_size) ? chunk_size : contentLength);
// printf("%s Part Seq %d, length=%d\n", srcSize ? "Copying" : "Sending", seq, partContentLength);
partData.put_object_data.contentLength = partContentLength;
partData.put_object_data.originalContentLength = partContentLength;
partData.put_object_data.totalContentLength = todoContentLength;
partData.put_object_data.totalOriginalContentLength = totalContentLength;
putProperties.md5 = 0;
do {
S3_upload_part(&bucketContext, key, &putProperties, &putObjectHandler, seq, manager.upload_id,
partContentLength, 0, timeoutMsG, &partData);
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
if (partData.put_object_data.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
contentLength -= chunk_size;
todoContentLength -= chunk_size;
}
int i;
int size = 0;
size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>", strlen("<CompleteMultipartUpload>"));
char buf[256];
int n;
for (i = 0; i < totalSeq; i++) {
if (!manager.etags[i]) {
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
n = snprintf(buf, sizeof(buf),
"<Part><PartNumber>%d</PartNumber>"
"<ETag>%s</ETag></Part>",
i + 1, manager.etags[i]);
size += growbuffer_append(&(manager.gb), buf, n);
}
size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>", strlen("</CompleteMultipartUpload>"));
manager.remaining = size;
do {
S3_complete_multipart_upload(&bucketContext, key, &commit_handler, manager.upload_id, manager.remaining, 0,
timeoutMsG, &manager);
} while (S3_status_is_retryable(manager.status) && should_retry());
if (manager.status != S3StatusOK) {
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
code = TAOS_SYSTEM_ERROR(EIO);
goto clean;
}
clean:
if (manager.upload_id) {
taosMemoryFree(manager.upload_id);
}
for (i = 0; i < manager.next_etags_pos; i++) {
taosMemoryFree(manager.etags[i]);
}
growbuffer_destroy(manager.gb);
taosMemoryFree(manager.etags);
}
if (data.infileFD) {
@ -648,7 +829,7 @@ typedef struct list_bucket_callback_data {
char nextMarker[1024];
int keyCount;
int allDetails;
SArray * objectArray;
SArray *objectArray;
} list_bucket_callback_data;
static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,
@ -693,11 +874,11 @@ static void s3FreeObjectKey(void *pItem) {
static SArray *getListByPrefix(const char *prefix) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&listBucketCallback};
const char * marker = 0, *delimiter = 0;
const char *marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
data.objectArray = taosArrayInit(32, sizeof(void *));
@ -738,7 +919,7 @@ static SArray *getListByPrefix(const char *prefix) {
void s3DeleteObjects(const char *object_name[], int nobject) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
for (int i = 0; i < nobject; ++i) {
@ -789,7 +970,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
&getObjectDataCallback};
@ -827,7 +1008,7 @@ int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&getObjectCallback};
@ -858,7 +1039,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
if (objectArray == NULL) return -1;
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
char * object = taosArrayGetP(objectArray, i);
char *object = taosArrayGetP(objectArray, i);
const char *tmp = strchr(object, '/');
tmp = (tmp == NULL) ? object : tmp + 1;
char fileName[PATH_MAX] = {0};
@ -949,12 +1130,12 @@ static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) {
int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
int32_t code = 0;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t * resp_headers;
cos_table_t *resp_headers;
// int traffic_limit = 0;
cos_pool_create(&p, NULL);
@ -983,18 +1164,19 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
return code;
}
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) {
int32_t code = 0;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_request_options_t * options = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t * resp_headers;
cos_table_t *resp_headers;
int traffic_limit = 0;
cos_table_t * headers = NULL;
cos_table_t *headers = NULL;
cos_resumable_clt_params_t *clt_params = NULL;
(void)withcp;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
@ -1025,11 +1207,11 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
}
void s3DeleteObjectsByPrefix(const char *prefix_str) {
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
cos_request_options_t *options = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_status_t * s = NULL;
cos_status_t *s = NULL;
cos_string_t prefix;
cos_pool_create(&p, NULL);
@ -1044,10 +1226,10 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) {
}
void s3DeleteObjects(const char *object_name[], int nobject) {
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_table_t * resp_headers = NULL;
cos_table_t *resp_headers = NULL;
cos_request_options_t *options = NULL;
cos_list_t object_list;
cos_list_t deleted_object_list;
@ -1081,14 +1263,14 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
bool s3Exists(const char *object_name) {
bool ret = false;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_request_options_t * options = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t * resp_headers;
cos_table_t * headers = NULL;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_object_exist_status_e object_exist;
cos_pool_create(&p, NULL);
@ -1115,15 +1297,15 @@ bool s3Exists(const char *object_name) {
bool s3Get(const char *object_name, const char *path) {
bool ret = false;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_string_t file;
cos_table_t * resp_headers = NULL;
cos_table_t * headers = NULL;
cos_table_t *resp_headers = NULL;
cos_table_t *headers = NULL;
int traffic_limit = 0;
//创建内存池
@ -1159,15 +1341,15 @@ bool s3Get(const char *object_name, const char *path) {
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) {
(void)check;
int32_t code = 0;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t * resp_headers;
cos_table_t * headers = NULL;
cos_buf_t * content = NULL;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_buf_t *content = NULL;
// cos_string_t file;
// int traffic_limit = 0;
char range_buf[64];
@ -1261,7 +1443,7 @@ void s3EvictCache(const char *path, long object_size) {
terrno = TAOS_SYSTEM_ERROR(errno);
vError("failed to open %s since %s", dir_name, terrstr());
}
SArray * evict_files = taosArrayInit(16, sizeof(SEvictFile));
SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile));
tdbDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *name = taosGetDirEntryName(pDirEntry);
@ -1303,13 +1485,13 @@ void s3EvictCache(const char *path, long object_size) {
long s3Size(const char *object_name) {
long size = 0;
cos_pool_t * p = NULL;
cos_pool_t *p = NULL;
int is_cname = 0;
cos_status_t * s = NULL;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t * resp_headers = NULL;
cos_table_t *resp_headers = NULL;
//创建内存池
cos_pool_create(&p, NULL);
@ -1344,7 +1526,7 @@ long s3Size(const char *object_name) {
int32_t s3Init() { return 0; }
void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object) { return 0; }
int32_t s3PutObjectFromFile2(const char *file, const char *object, int8_t withcp) { return 0; }
void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; }

417
source/common/src/cos_cp.c Normal file
View File

@ -0,0 +1,417 @@
#define ALLOW_FORBID_FUNC
#include "cos_cp.h"
#include "cJSON.h"
#include "tutil.h"
int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) {
int32_t code = 0;
TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE /* | TD_FILE_TRUNC*/ | TD_FILE_WRITE_THROUGH);
if (!fd) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to open %s", __func__, cp_path);
return code;
}
checkpoint->thefile = fd;
return code;
}
void cos_cp_close(TdFilePtr fd) { taosCloseFile(&fd); }
void cos_cp_remove(char const* filepath) { taosRemoveFile(filepath); }
static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
int32_t code = 0;
cJSON const* item2 = NULL;
cJSON* json = cJSON_Parse(cp_body);
if (NULL == json) {
code = TSDB_CODE_FILE_CORRUPTED;
uError("ERROR: %s Failed to parse json", __func__);
goto _exit;
}
cJSON const* item = cJSON_GetObjectItem(json, "ver");
if (!cJSON_IsNumber(item) || item->valuedouble != 1) {
code = TSDB_CODE_FILE_CORRUPTED;
uError("ERROR: %s Failed to parse json ver: %f", __func__, item->valuedouble);
goto _exit;
}
item = cJSON_GetObjectItem(json, "type");
if (!cJSON_IsNumber(item)) {
code = TSDB_CODE_FILE_CORRUPTED;
uError("ERROR: %s Failed to parse json", __func__);
goto _exit;
}
cp->cp_type = item->valuedouble;
item = cJSON_GetObjectItem(json, "md5");
if (cJSON_IsString(item)) {
memcpy(cp->md5, item->valuestring, strlen(item->valuestring));
}
item = cJSON_GetObjectItem(json, "upload_id");
if (cJSON_IsString(item)) {
strncpy(cp->upload_id, item->valuestring, 128);
}
item2 = cJSON_GetObjectItem(json, "file");
if (cJSON_IsObject(item2)) {
item = cJSON_GetObjectItem(item2, "size");
if (cJSON_IsNumber(item)) {
cp->file_size = item->valuedouble;
}
item = cJSON_GetObjectItem(item2, "lastmodified");
if (cJSON_IsNumber(item)) {
cp->file_last_modified = item->valuedouble;
}
item = cJSON_GetObjectItem(item2, "path");
if (cJSON_IsString(item)) {
strncpy(cp->file_path, item->valuestring, TSDB_FILENAME_LEN);
}
item = cJSON_GetObjectItem(item2, "file_md5");
if (cJSON_IsString(item)) {
strncpy(cp->file_md5, item->valuestring, 64);
}
}
item2 = cJSON_GetObjectItem(json, "object");
if (cJSON_IsObject(item2)) {
item = cJSON_GetObjectItem(item2, "object_size");
if (cJSON_IsNumber(item)) {
cp->object_size = item->valuedouble;
}
item = cJSON_GetObjectItem(item2, "object_name");
if (cJSON_IsString(item)) {
strncpy(cp->object_name, item->valuestring, 128);
}
item = cJSON_GetObjectItem(item2, "object_last_modified");
if (cJSON_IsString(item)) {
strncpy(cp->object_last_modified, item->valuestring, 64);
}
item = cJSON_GetObjectItem(item2, "object_etag");
if (cJSON_IsString(item)) {
strncpy(cp->object_etag, item->valuestring, 128);
}
}
item2 = cJSON_GetObjectItem(json, "cpparts");
if (cJSON_IsObject(item2)) {
item = cJSON_GetObjectItem(item2, "number");
if (cJSON_IsNumber(item)) {
cp->part_num = item->valuedouble;
}
item = cJSON_GetObjectItem(item2, "size");
if (cJSON_IsNumber(item)) {
cp->part_size = item->valuedouble;
}
item2 = cJSON_GetObjectItem(item2, "parts");
if (cJSON_IsArray(item2) && cp->part_num > 0) {
cJSON_ArrayForEach(item, item2) {
cJSON const* item3 = cJSON_GetObjectItem(item, "index");
int32_t index = 0;
if (cJSON_IsNumber(item3)) {
index = item3->valuedouble;
cp->parts[index].index = index;
}
item3 = cJSON_GetObjectItem(item, "offset");
if (cJSON_IsNumber(item3)) {
cp->parts[index].offset = item3->valuedouble;
}
item3 = cJSON_GetObjectItem(item, "size");
if (cJSON_IsNumber(item3)) {
cp->parts[index].size = item3->valuedouble;
}
item3 = cJSON_GetObjectItem(item, "completed");
if (cJSON_IsNumber(item3)) {
cp->parts[index].completed = item3->valuedouble;
}
item3 = cJSON_GetObjectItem(item, "crc64");
if (cJSON_IsNumber(item3)) {
cp->parts[index].crc64 = item3->valuedouble;
}
item3 = cJSON_GetObjectItem(item, "etag");
if (cJSON_IsString(item3)) {
strncpy(cp->parts[index].etag, item3->valuestring, 128);
}
}
}
}
_exit:
if (json) cJSON_Delete(json);
if (cp_body) taosMemoryFree(cp_body);
return code;
}
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) {
int32_t code = 0;
TdFilePtr fd = taosOpenFile(filepath, TD_FILE_READ);
if (!fd) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to open %s", __func__, filepath);
goto _exit;
}
int64_t size = -1;
code = taosStatFile(filepath, &size, NULL, NULL);
if (code) {
uError("ERROR: %s Failed to stat %s", __func__, filepath);
goto _exit;
}
char* cp_body = taosMemoryMalloc(size + 1);
int64_t n = taosReadFile(fd, cp_body, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to read %s", __func__, filepath);
goto _exit;
} else if (n != size) {
code = TSDB_CODE_FILE_CORRUPTED;
uError("ERROR: %s Failed to read %s %" PRId64 "/%" PRId64, __func__, filepath, n, size);
goto _exit;
}
taosCloseFile(&fd);
cp_body[size] = '\0';
return cos_cp_parse_body(cp_body, checkpoint);
_exit:
if (fd) {
taosCloseFile(&fd);
}
if (cp_body) {
taosMemoryFree(cp_body);
}
return code;
}
static int32_t cos_cp_save_json(cJSON const* json, SCheckpoint* checkpoint) {
int32_t code = 0;
char* data = cJSON_PrintUnformatted(json);
if (NULL == data) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TdFilePtr fp = checkpoint->thefile;
if (taosFtruncateFile(fp, 0) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
if (taosLSeekFile(fp, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
if (taosWriteFile(fp, data, strlen(data)) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
if (taosFsyncFile(fp) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
_exit:
taosMemoryFree(data);
return code;
}
int32_t cos_cp_dump(SCheckpoint* cp) {
int32_t code = 0;
int32_t lino = 0;
cJSON* ojson = NULL;
cJSON* json = cJSON_CreateObject();
if (!json) return TSDB_CODE_OUT_OF_MEMORY;
if (NULL == cJSON_AddNumberToObject(json, "ver", 1)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(json, "type", cp->cp_type)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(json, "md5", cp->md5)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(json, "upload_id", cp->upload_id)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (COS_CP_TYPE_UPLOAD == cp->cp_type) {
ojson = cJSON_AddObjectToObject(json, "file");
if (!ojson) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->file_size)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(ojson, "lastmodified", cp->file_last_modified)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(ojson, "path", cp->file_path)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(ojson, "file_md5", cp->file_md5)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
} else if (COS_CP_TYPE_DOWNLOAD == cp->cp_type) {
ojson = cJSON_AddObjectToObject(json, "object");
if (!ojson) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(ojson, "object_size", cp->object_size)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(ojson, "object_name", cp->object_name)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(ojson, "object_last_modified", cp->object_last_modified)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(ojson, "object_etag", cp->object_etag)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
ojson = cJSON_AddObjectToObject(json, "cpparts");
if (!ojson) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(ojson, "number", cp->part_num)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->part_size)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
cJSON* ajson = cJSON_AddArrayToObject(ojson, "parts");
if (!ajson) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int i = 0; i < cp->part_num; ++i) {
cJSON* item = cJSON_CreateObject();
if (!item) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
cJSON_AddItemToArray(ajson, item);
if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(item, "offset", cp->parts[i].offset)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(item, "size", cp->parts[i].size)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(item, "completed", cp->parts[i].completed)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddNumberToObject(item, "crc64", cp->parts[i].crc64)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (NULL == cJSON_AddStringToObject(item, "etag", cp->parts[i].etag)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
code = cos_cp_save_json(json, cp);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
cJSON_Delete(json);
return code;
}
void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes) {}
void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) {
checkpoint->parts[part_index].completed = 1;
strncpy(checkpoint->parts[part_index].etag, etag, 128);
checkpoint->parts[part_index].crc64 = crc64;
}
void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime,
char const* upload_id, int64_t part_size) {
int i = 0;
checkpoint->cp_type = COS_CP_TYPE_UPLOAD;
strncpy(checkpoint->file_path, filepath, TSDB_FILENAME_LEN);
checkpoint->file_size = size;
checkpoint->file_last_modified = mtime;
strncpy(checkpoint->upload_id, upload_id, 128);
checkpoint->part_size = part_size;
for (; i * part_size < size; i++) {
checkpoint->parts[i].index = i;
checkpoint->parts[i].offset = i * part_size;
checkpoint->parts[i].size = TMIN(part_size, (size - i * part_size));
checkpoint->parts[i].completed = 0;
checkpoint->parts[i].etag[0] = '\0';
}
checkpoint->part_num = i;
}
static bool cos_cp_verify_md5(SCheckpoint* cp) { return true; }
bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) {
if (cos_cp_verify_md5(checkpoint) && checkpoint->file_size == size && checkpoint->file_last_modified == mtime) {
return true;
}
return false;
}

View File

@ -53,11 +53,10 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
}
static int32_t tsdbOpenBCache(STsdb *pTsdb) {
int32_t code = 0;
// SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * tsS3BlockSize * szPage, 0, .5);
int32_t code = 0;
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
int64_t szBlock = tsS3BlockSize <= 1024 ? 1024 : tsS3BlockSize;
SLRUCache *pCache = taosLRUCacheInit((int64_t)tsS3BlockCacheSize * szBlock * szPage, 0, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
@ -67,8 +66,9 @@ static int32_t tsdbOpenBCache(STsdb *pTsdb) {
taosThreadMutexInit(&pTsdb->bMutex, NULL);
_err:
pTsdb->bCache = pCache;
_err:
return code;
}

View File

@ -529,7 +529,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
if (taosIsDir(file->aname)) continue;
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL &&
strncmp(file->aname + strlen(file->aname) - 3, ".cp", 3)) {
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1);
}
@ -1113,7 +1114,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TFileSetRangeArray *pRa
TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t ever = VERSION_MAX;
if (pHash) {
int32_t fid = fset->fid;
int32_t fid = fset->fid;
STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
if (u) {
ever = u->sver - 1;
@ -1145,10 +1146,10 @@ int32_t tsdbFSDestroyCopyRangedSnapshot(TFileSetArray **fsetArr) { return tsdbFS
int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ever, TFileSetRangeArray *pRanges,
TFileSetRangeArray **fsrArr) {
int32_t code = 0;
STFileSet *fset;
int32_t code = 0;
STFileSet *fset;
STFileSetRange *fsr1 = NULL;
SHashObj *pHash = NULL;
SHashObj *pHash = NULL;
fsrArr[0] = taosMemoryCalloc(1, sizeof(*fsrArr[0]));
if (fsrArr[0] == NULL) {
@ -1171,7 +1172,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
int64_t ever1 = ever;
if (pHash) {
int32_t fid = fset->fid;
int32_t fid = fset->fid;
STFileSetRange *u = taosHashGet(pHash, &fid, sizeof(fid));
if (u) {
sver1 = u->sver;

View File

@ -112,7 +112,7 @@ static int32_t tsdbCopyFileS3(SRTNer *rtner, const STFileObj *from, const STFile
TSDB_CHECK_CODE(code, lino, _exit);
char *object_name = taosDirEntryBaseName(fname);
code = s3PutObjectFromFile2(from->fname, object_name);
code = s3PutObjectFromFile2(from->fname, object_name, 1);
TSDB_CHECK_CODE(code, lino, _exit);
taosCloseFile(&fdFrom);

View File

@ -515,7 +515,7 @@ static int uploadCheckpointToS3(char* id, char* path) {
char object[PATH_MAX] = {0};
snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
if (s3PutObjectFromFile2(filename, object) != 0) {
if (s3PutObjectFromFile2(filename, object, 0) != 0) {
taosCloseDir(&pDir);
return -1;
}