diff --git a/include/common/cos_cp.h b/include/common/cos_cp.h new file mode 100644 index 0000000000..5e80ddde11 --- /dev/null +++ b/include/common/cos_cp.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_COMMON_COS_CP_H_ +#define _TD_COMMON_COS_CP_H_ + +#include "os.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef enum { + COS_CP_TYPE_UPLOAD, // upload + COS_CP_TYPE_DOWNLOAD // download +} ECpType; + +typedef struct { + int32_t index; // the index of part, start from 0 + int64_t offset; // the offset point of part + int64_t size; // the size of part + int completed; // COS_TRUE completed, COS_FALSE uncompleted + char* etag; // the etag of part, for upload + uint64_t crc64; +} SCheckpointPart; + +typedef struct { + char* md5; // the md5 of checkout content + ECpType cp_type; // 1 upload, 2 download + TdFilePtr* thefile; // the handle of checkpoint file + + char* file_path; // local file path + int64_t file_size; // local file size, for upload + int32_t file_last_modified; // local file last modified time, for upload + char* file_md5; // the md5 of the local file content, for upload, reserved + + char* object_name; // object name + int64_t object_size; // object size, for download + char* object_last_modified; // object last modified time, for download + char* object_etag; // object etag, for download + + char* upload_id; // upload id + + int part_num; // the total number of parts + int64_t part_size; // the part size, byte + SCheckpointPart* parts; // the parts of local or object, from 0 +} SCheckpoint; + +void cos_cp_get_path(char const* filepath, char* cp_path); +TdFilePtr cos_cp_open(char const* cp_path, SCheckpoint* checkpoint); +void cos_cp_close(TdFilePtr fd); +void cos_cp_remove(char const* filepath); +bool cos_cp_exist(char const* filepath); + +int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint); +int32_t cos_cp_dump(SCheckpoint* checkpoint); +void cos_cp_get_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t consume_bytes); +void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64); +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, + char const* upload_id, int64_t part_size); +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime); + +void cos_cp_build_download(SCheckpoint* checkpoint, char const* filepath, char const* object_name, int64_t object_size, + char const* object_lmtime, char const* object_etag, int64_t part_size); +bool cos_cp_is_valid_download(SCheckpoint* checkpoint, char const* object_name, int64_t object_size, + char const* object_lmtime, char const* object_etag); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_COMMON_COS_CP_H_*/ diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 7c8676e9f5..c24c962ac6 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1,6 +1,7 @@ #define ALLOW_FORBID_FUNC #include "cos.h" +#include "cos_cp.h" extern char tsS3Endpoint[]; extern char tsS3AccessKeyId[]; @@ -86,7 +87,7 @@ typedef struct { char err_msg[128]; S3Status status; uint64_t content_length; - char * buf; + char *buf; int64_t buf_pos; } TS3SizeCBD; @@ -270,7 +271,7 @@ typedef struct list_parts_callback_data { typedef struct MultipartPartData { put_object_callback_data put_object_data; int seq; - UploadManager * manager; + UploadManager *manager; } MultipartPartData; static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) { @@ -317,7 +318,7 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti MultipartPartData *data = (MultipartPartData *)callbackData; int seq = data->seq; - const char * etag = properties->eTag; + const char *etag = properties->eTag; data->manager->etags[seq - 1] = strdup(etag); data->manager->next_etags_pos = seq; return S3StatusOK; @@ -446,14 +447,28 @@ static int try_get_parts_info(const char *bucketName, const char *key, UploadMan return 0; } */ + +static int32_t s3PutObjectFromFileWithoutCp(const char *file, int64_t size, int32_t lmtime, const char *object) { + int32_t code = 0; + + return code; +} + +static int32_t s3PutObjectFromFileWithCp(const char *file, int64_t size, int32_t lmtime, const char *object) { + int32_t code = 0; + + return code; +} + int32_t s3PutObjectFromFile2(const char *file, const char *object) { int32_t code = 0; + int32_t lmtime = 0; const char *key = object; // const char *uploadId = 0; - const char * filename = 0; + const char *filename = 0; uint64_t contentLength = 0; - const char * cacheControl = 0, *contentType = 0, *md5 = 0; - const char * contentDispositionFilename = 0, *contentEncoding = 0; + const char *cacheControl = 0, *contentType = 0, *md5 = 0; + const char *contentDispositionFilename = 0, *contentEncoding = 0; int64_t expires = -1; S3CannedAcl cannedAcl = S3CannedAclPrivate; int metaPropertiesCount = 0; @@ -468,7 +483,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) { // data.noStatus = noStatus; // uError("ERROR: %s stat file %s: ", __func__, file); - if (taosStatFile(file, &contentLength, NULL, NULL) < 0) { + if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) { uError("ERROR: %s Failed to stat file %s: ", __func__, file); code = TAOS_SYSTEM_ERROR(errno); return code; @@ -648,7 +663,7 @@ typedef struct list_bucket_callback_data { char nextMarker[1024]; int keyCount; int allDetails; - SArray * objectArray; + SArray *objectArray; } list_bucket_callback_data; static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount, @@ -693,11 +708,11 @@ static void s3FreeObjectKey(void *pItem) { static SArray *getListByPrefix(const char *prefix) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &listBucketCallback}; - const char * marker = 0, *delimiter = 0; + const char *marker = 0, *delimiter = 0; int maxkeys = 0, allDetails = 0; list_bucket_callback_data data; data.objectArray = taosArrayInit(32, sizeof(void *)); @@ -738,7 +753,7 @@ static SArray *getListByPrefix(const char *prefix) { void s3DeleteObjects(const char *object_name[], int nobject) { S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3ResponseHandler responseHandler = {0, &responseCompleteCallback}; for (int i = 0; i < nobject; ++i) { @@ -789,7 +804,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback}, &getObjectDataCallback}; @@ -827,7 +842,7 @@ int32_t s3GetObjectToFile(const char *object_name, char *fileName) { const char *ifMatch = 0, *ifNotMatch = 0; S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret, - 0, awsRegionG}; + 0, awsRegionG}; S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch}; S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback}, &getObjectCallback}; @@ -858,7 +873,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) { if (objectArray == NULL) return -1; for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) { - char * object = taosArrayGetP(objectArray, i); + char *object = taosArrayGetP(objectArray, i); const char *tmp = strchr(object, '/'); tmp = (tmp == NULL) ? object : tmp + 1; char fileName[PATH_MAX] = {0}; @@ -949,12 +964,12 @@ static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) { int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket, object, file; - cos_table_t * resp_headers; + cos_table_t *resp_headers; // int traffic_limit = 0; cos_pool_create(&p, NULL); @@ -985,14 +1000,14 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) { int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; - cos_request_options_t * options = NULL; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; cos_string_t bucket, object, file; - cos_table_t * resp_headers; + cos_table_t *resp_headers; int traffic_limit = 0; - cos_table_t * headers = NULL; + cos_table_t *headers = NULL; cos_resumable_clt_params_t *clt_params = NULL; cos_pool_create(&p, NULL); @@ -1025,11 +1040,11 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) { } void s3DeleteObjectsByPrefix(const char *prefix_str) { - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; cos_request_options_t *options = NULL; int is_cname = 0; cos_string_t bucket; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_string_t prefix; cos_pool_create(&p, NULL); @@ -1044,10 +1059,10 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) { } void s3DeleteObjects(const char *object_name[], int nobject) { - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; cos_string_t bucket; - cos_table_t * resp_headers = NULL; + cos_table_t *resp_headers = NULL; cos_request_options_t *options = NULL; cos_list_t object_list; cos_list_t deleted_object_list; @@ -1081,14 +1096,14 @@ void s3DeleteObjects(const char *object_name[], int nobject) { bool s3Exists(const char *object_name) { bool ret = false; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; - cos_request_options_t * options = NULL; + cos_status_t *s = NULL; + cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers; - cos_table_t * headers = NULL; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; cos_object_exist_status_e object_exist; cos_pool_create(&p, NULL); @@ -1115,15 +1130,15 @@ bool s3Exists(const char *object_name) { bool s3Get(const char *object_name, const char *path) { bool ret = false; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; cos_string_t file; - cos_table_t * resp_headers = NULL; - cos_table_t * headers = NULL; + cos_table_t *resp_headers = NULL; + cos_table_t *headers = NULL; int traffic_limit = 0; //创建内存池 @@ -1159,15 +1174,15 @@ bool s3Get(const char *object_name, const char *path) { int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) { (void)check; int32_t code = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers; - cos_table_t * headers = NULL; - cos_buf_t * content = NULL; + cos_table_t *resp_headers; + cos_table_t *headers = NULL; + cos_buf_t *content = NULL; // cos_string_t file; // int traffic_limit = 0; char range_buf[64]; @@ -1261,7 +1276,7 @@ void s3EvictCache(const char *path, long object_size) { terrno = TAOS_SYSTEM_ERROR(errno); vError("failed to open %s since %s", dir_name, terrstr()); } - SArray * evict_files = taosArrayInit(16, sizeof(SEvictFile)); + SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile)); tdbDirEntryPtr pDirEntry; while ((pDirEntry = taosReadDir(pDir)) != NULL) { char *name = taosGetDirEntryName(pDirEntry); @@ -1303,13 +1318,13 @@ void s3EvictCache(const char *path, long object_size) { long s3Size(const char *object_name) { long size = 0; - cos_pool_t * p = NULL; + cos_pool_t *p = NULL; int is_cname = 0; - cos_status_t * s = NULL; + cos_status_t *s = NULL; cos_request_options_t *options = NULL; cos_string_t bucket; cos_string_t object; - cos_table_t * resp_headers = NULL; + cos_table_t *resp_headers = NULL; //创建内存池 cos_pool_create(&p, NULL); diff --git a/source/common/src/cos_cp.c b/source/common/src/cos_cp.c new file mode 100644 index 0000000000..8899c2d9f2 --- /dev/null +++ b/source/common/src/cos_cp.c @@ -0,0 +1,17 @@ +#define ALLOW_FORBID_FUNC + +#include "cos_cp.h" + +void cos_cp_get_path(char const* filepath, char* cp_path) {} +TdFilePtr cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) { return NULL; } +void cos_cp_close(TdFilePtr fd) {} +void cos_cp_remove(char const* filepath) {} +bool cos_cp_exist(char const* filepath) { return true; } + +int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) { return 0; } +int32_t cos_cp_dump(SCheckpoint* checkpoint) { return 0; } +void cos_cp_get_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t consume_bytes) {} +void cos_cp_update(SCheckpoint* checkpoint, int32_t part_index, char const* etag, uint64_t crc64) {} +void cos_cp_build_upload(SCheckpoint* checkpoint, char const* filepath, int64_t size, int32_t mtime, + char const* upload_id, int64_t part_size) {} +bool cos_cp_is_valid_upload(SCheckpoint* checkpoint, int64_t size, int32_t mtime) { return true; }