terrno/cos: use new macro to return terrno
This commit is contained in:
parent
c819ac1abe
commit
d93d5325e0
|
@ -60,8 +60,8 @@ typedef struct {
|
||||||
} SCheckpoint;
|
} SCheckpoint;
|
||||||
|
|
||||||
int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint);
|
int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint);
|
||||||
void cos_cp_close(TdFilePtr fd);
|
int32_t cos_cp_close(TdFilePtr fd);
|
||||||
void cos_cp_remove(char const* filepath);
|
int32_t cos_cp_remove(char const* filepath);
|
||||||
|
|
||||||
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint);
|
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint);
|
||||||
int32_t cos_cp_dump(SCheckpoint* checkpoint);
|
int32_t cos_cp_dump(SCheckpoint* checkpoint);
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
#include "cos.h"
|
#include "cos.h"
|
||||||
#include "cos_cp.h"
|
#include "cos_cp.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
extern char tsS3Endpoint[];
|
extern char tsS3Endpoint[];
|
||||||
extern char tsS3AccessKeyId[];
|
extern char tsS3AccessKeyId[];
|
||||||
|
@ -40,7 +41,7 @@ int32_t s3Begin() {
|
||||||
|
|
||||||
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
|
if ((status = S3_initialize("s3", verifyPeerG | S3_INIT_ALL, hostname)) != S3StatusOK) {
|
||||||
uError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
|
uError("Failed to initialize libs3: %s\n", S3_get_status_name(status));
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
protocolG = !tsS3Https;
|
protocolG = !tsS3Https;
|
||||||
|
@ -48,12 +49,12 @@ int32_t s3Begin() {
|
||||||
uriStyleG = S3UriStyleVirtualHost;
|
uriStyleG = S3UriStyleVirtualHost;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void s3End() { S3_deinitialize(); }
|
void s3End() { S3_deinitialize(); }
|
||||||
|
|
||||||
int32_t s3Init() { return 0; /*s3Begin();*/ }
|
int32_t s3Init() { TAOS_RETURN(TSDB_CODE_SUCCESS); /*s3Begin();*/ }
|
||||||
|
|
||||||
void s3CleanUp() { /*s3End();*/
|
void s3CleanUp() { /*s3End();*/
|
||||||
}
|
}
|
||||||
|
@ -61,17 +62,17 @@ void s3CleanUp() { /*s3End();*/
|
||||||
static int32_t s3ListBucket(char const *bucketname);
|
static int32_t s3ListBucket(char const *bucketname);
|
||||||
|
|
||||||
int32_t s3CheckCfg() {
|
int32_t s3CheckCfg() {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
if (!tsS3Enabled) {
|
if (!tsS3Enabled) {
|
||||||
fprintf(stderr, "s3 not configured.\n");
|
fprintf(stderr, "s3 not configured.\n");
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
code = s3Begin();
|
code = s3Begin();
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "failed to initialize s3.\n");
|
fprintf(stderr, "failed to initialize s3.\n");
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// test put
|
// test put
|
||||||
|
@ -91,20 +92,17 @@ int32_t s3CheckCfg() {
|
||||||
|
|
||||||
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
TdFilePtr fp = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
fprintf(stderr, "failed to open test file: %s.\n", path);
|
fprintf(stderr, "failed to open test file: %s.\n", path);
|
||||||
// uError("ERROR: %s Failed to open %s", __func__, path);
|
// uError("ERROR: %s Failed to open %s", __func__, path);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
}
|
}
|
||||||
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
|
if (taosWriteFile(fp, testdata, strlen(testdata)) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
fprintf(stderr, "failed to write test file: %s.\n", path);
|
fprintf(stderr, "failed to write test file: %s.\n", path);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
}
|
}
|
||||||
if (taosFsyncFile(fp) < 0) {
|
if (taosFsyncFile(fp) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
fprintf(stderr, "failed to fsync test file: %s.\n", path);
|
fprintf(stderr, "failed to fsync test file: %s.\n", path);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
}
|
}
|
||||||
taosCloseFile(&fp);
|
taosCloseFile(&fp);
|
||||||
|
|
||||||
|
@ -112,7 +110,7 @@ int32_t s3CheckCfg() {
|
||||||
code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16);
|
code = s3PutObjectFromFileOffset(path, objectname[0], 0, 16);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "put object %s : failed.\n", objectname[0]);
|
fprintf(stderr, "put object %s : failed.\n", objectname[0]);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
|
fprintf(stderr, "put object %s: success.\n\n", objectname[0]);
|
||||||
|
|
||||||
|
@ -121,7 +119,7 @@ int32_t s3CheckCfg() {
|
||||||
code = s3ListBucket(tsS3BucketName);
|
code = s3ListBucket(tsS3BucketName);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
|
fprintf(stderr, "listing bucket %s : failed.\n", tsS3BucketName);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
|
fprintf(stderr, "listing bucket %s: success.\n\n", tsS3BucketName);
|
||||||
|
|
||||||
|
@ -134,7 +132,7 @@ int32_t s3CheckCfg() {
|
||||||
code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock);
|
code = s3GetObjectBlock(objectname[0], c_offset, c_len, true, &pBlock);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "get object %s : failed.\n", objectname[0]);
|
fprintf(stderr, "get object %s : failed.\n", objectname[0]);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
char buf[7] = {0};
|
char buf[7] = {0};
|
||||||
memcpy(buf, pBlock, c_len);
|
memcpy(buf, pBlock, c_len);
|
||||||
|
@ -147,14 +145,14 @@ int32_t s3CheckCfg() {
|
||||||
code = s3DeleteObjects(objectname, 1);
|
code = s3DeleteObjects(objectname, 1);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
|
fprintf(stderr, "delete object %s : failed.\n", objectname[0]);
|
||||||
goto _exit;
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
}
|
}
|
||||||
fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
|
fprintf(stderr, "delete object %s: success.\n\n", objectname[0]);
|
||||||
|
|
||||||
s3End();
|
s3End();
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int should_retry() {
|
static int should_retry() {
|
||||||
|
@ -169,7 +167,7 @@ static int should_retry() {
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status,
|
static void s3PrintError(const char *filename, int lineno, const char *funcname, S3Status status,
|
||||||
|
@ -202,7 +200,6 @@ static S3Status responsePropertiesCallbackNull(const S3ResponseProperties *prope
|
||||||
}
|
}
|
||||||
|
|
||||||
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
|
static S3Status responsePropertiesCallback(const S3ResponseProperties *properties, void *callbackData) {
|
||||||
//(void)callbackData;
|
|
||||||
TS3SizeCBD *cbd = callbackData;
|
TS3SizeCBD *cbd = callbackData;
|
||||||
if (properties->contentLength > 0) {
|
if (properties->contentLength > 0) {
|
||||||
cbd->content_length = properties->contentLength;
|
cbd->content_length = properties->contentLength;
|
||||||
|
@ -249,7 +246,7 @@ static int32_t s3ListBucket(char const *bucketname) {
|
||||||
|
|
||||||
SArray *objectArray = getListByPrefix("s3");
|
SArray *objectArray = getListByPrefix("s3");
|
||||||
if (objectArray == NULL) {
|
if (objectArray == NULL) {
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
const char **object_name = TARRAY_DATA(objectArray);
|
const char **object_name = TARRAY_DATA(objectArray);
|
||||||
|
@ -262,7 +259,7 @@ static int32_t s3ListBucket(char const *bucketname) {
|
||||||
|
|
||||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct growbuffer {
|
typedef struct growbuffer {
|
||||||
|
@ -607,13 +604,13 @@ static int32_t s3PutObjectFromFileSimple(S3BucketContext *bucket_context, char c
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name,
|
static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, char const *object_name,
|
||||||
int64_t contentLength, S3PutProperties *put_prop,
|
int64_t contentLength, S3PutProperties *put_prop,
|
||||||
put_object_callback_data *data) {
|
put_object_callback_data *data) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
uint64_t totalContentLength = contentLength;
|
uint64_t totalContentLength = contentLength;
|
||||||
uint64_t todoContentLength = contentLength;
|
uint64_t todoContentLength = contentLength;
|
||||||
UploadManager manager = {0};
|
UploadManager manager = {0};
|
||||||
|
@ -647,8 +644,7 @@ static int32_t s3PutObjectFromFileWithoutCp(S3BucketContext *bucket_context, cha
|
||||||
|
|
||||||
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
upload:
|
upload:
|
||||||
|
@ -672,8 +668,7 @@ upload:
|
||||||
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
||||||
if (partData.put_object_data.status != S3StatusOK) {
|
if (partData.put_object_data.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
contentLength -= chunk_size;
|
contentLength -= chunk_size;
|
||||||
todoContentLength -= chunk_size;
|
todoContentLength -= chunk_size;
|
||||||
|
@ -686,8 +681,7 @@ upload:
|
||||||
int n;
|
int n;
|
||||||
for (i = 0; i < totalSeq; i++) {
|
for (i = 0; i < totalSeq; i++) {
|
||||||
if (!manager.etags[i]) {
|
if (!manager.etags[i]) {
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
n = snprintf(buf, sizeof(buf),
|
n = snprintf(buf, sizeof(buf),
|
||||||
"<Part><PartNumber>%d</PartNumber>"
|
"<Part><PartNumber>%d</PartNumber>"
|
||||||
|
@ -704,11 +698,13 @@ upload:
|
||||||
} while (S3_status_is_retryable(manager.status) && should_retry());
|
} while (S3_status_is_retryable(manager.status) && should_retry());
|
||||||
if (manager.status != S3StatusOK) {
|
if (manager.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
clean:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
if (manager.upload_id) {
|
if (manager.upload_id) {
|
||||||
taosMemoryFree(manager.upload_id);
|
taosMemoryFree(manager.upload_id);
|
||||||
}
|
}
|
||||||
|
@ -718,13 +714,13 @@ clean:
|
||||||
growbuffer_destroy(manager.gb);
|
growbuffer_destroy(manager.gb);
|
||||||
taosMemoryFree(manager.etags);
|
taosMemoryFree(manager.etags);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
|
static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const char *file, int32_t lmtime,
|
||||||
char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
|
char const *object_name, int64_t contentLength, S3PutProperties *put_prop,
|
||||||
put_object_callback_data *data) {
|
put_object_callback_data *data) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
uint64_t totalContentLength = contentLength;
|
uint64_t totalContentLength = contentLength;
|
||||||
// uint64_t todoContentLength = contentLength;
|
// uint64_t todoContentLength = contentLength;
|
||||||
|
@ -750,7 +746,7 @@ static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const
|
||||||
manager.upload_id = strdup(cp.upload_id);
|
manager.upload_id = strdup(cp.upload_id);
|
||||||
need_init_upload = false;
|
need_init_upload = false;
|
||||||
} else {
|
} else {
|
||||||
cos_cp_remove(file_cp_path);
|
TAOS_CHECK_GOTO(cos_cp_remove(file_cp_path), &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -764,15 +760,14 @@ static int32_t s3PutObjectFromFileWithCp(S3BucketContext *bucket_context, const
|
||||||
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
if (manager.upload_id == 0 || manager.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
code = TAOS_SYSTEM_ERROR(EIO);
|
||||||
goto clean;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size);
|
cos_cp_build_upload(&cp, file, contentLength, lmtime, manager.upload_id, chunk_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (cos_cp_open(file_cp_path, &cp)) {
|
if (cos_cp_open(file_cp_path, &cp)) {
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int part_num = 0;
|
int part_num = 0;
|
||||||
|
@ -802,8 +797,7 @@ upload:
|
||||||
|
|
||||||
if (i > 0 && cp.parts[i - 1].completed) {
|
if (i > 0 && cp.parts[i - 1].completed) {
|
||||||
if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) {
|
if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -827,25 +821,23 @@ upload:
|
||||||
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
} while (S3_status_is_retryable(partData.put_object_data.status) && should_retry());
|
||||||
if (partData.put_object_data.status != S3StatusOK) {
|
if (partData.put_object_data.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, partData.put_object_data.status, partData.put_object_data.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
|
|
||||||
//(void)cos_cp_dump(&cp);
|
//(void)cos_cp_dump(&cp);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!manager.etags[seq - 1]) {
|
if (!manager.etags[seq - 1]) {
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0);
|
cos_cp_update(&cp, cp.parts[seq - 1].index, manager.etags[seq - 1], 0);
|
||||||
(void)cos_cp_dump(&cp);
|
TAOS_CHECK_GOTO(cos_cp_dump(&cp), &lino, _exit);
|
||||||
|
|
||||||
contentLength -= chunk_size;
|
contentLength -= chunk_size;
|
||||||
// todoContentLength -= chunk_size;
|
// todoContentLength -= chunk_size;
|
||||||
}
|
}
|
||||||
|
|
||||||
cos_cp_close(cp.thefile);
|
TAOS_CHECK_GOTO(cos_cp_close(cp.thefile), &lino, _exit);
|
||||||
cp.thefile = 0;
|
cp.thefile = 0;
|
||||||
|
|
||||||
int size = 0;
|
int size = 0;
|
||||||
|
@ -869,20 +861,23 @@ upload:
|
||||||
} while (S3_status_is_retryable(manager.status) && should_retry());
|
} while (S3_status_is_retryable(manager.status) && should_retry());
|
||||||
if (manager.status != S3StatusOK) {
|
if (manager.status != S3StatusOK) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, manager.status, manager.err_msg);
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(EIO), &lino, _exit);
|
||||||
goto clean;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cos_cp_remove(file_cp_path);
|
TAOS_CHECK_GOTO(cos_cp_remove(file_cp_path), &lino, _exit);
|
||||||
|
|
||||||
clean:
|
_exit:
|
||||||
/*
|
/*
|
||||||
if (parts) {
|
if (parts) {
|
||||||
taosMemoryFree(parts);
|
taosMemoryFree(parts);
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
if (code) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
if (cp.thefile) {
|
if (cp.thefile) {
|
||||||
cos_cp_close(cp.thefile);
|
(void)cos_cp_close(cp.thefile);
|
||||||
}
|
}
|
||||||
if (cp.parts) {
|
if (cp.parts) {
|
||||||
taosMemoryFree(cp.parts);
|
taosMemoryFree(cp.parts);
|
||||||
|
@ -899,7 +894,7 @@ clean:
|
||||||
taosMemoryFree(manager.etags);
|
taosMemoryFree(manager.etags);
|
||||||
growbuffer_destroy(manager.gb);
|
growbuffer_destroy(manager.gb);
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
|
int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t withcp) {
|
||||||
|
@ -917,15 +912,13 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
|
||||||
put_object_callback_data data = {0};
|
put_object_callback_data data = {0};
|
||||||
|
|
||||||
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
|
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
|
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
|
||||||
return code;
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
|
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
uError("ERROR: %s Failed to open file %s: ", __func__, file);
|
uError("ERROR: %s Failed to open file %s: ", __func__, file);
|
||||||
return code;
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
||||||
|
@ -956,7 +949,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object_name, int8_t w
|
||||||
growbuffer_destroy(data.gb);
|
growbuffer_destroy(data.gb);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int64_t offset, int64_t size) {
|
||||||
|
@ -974,22 +967,19 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int
|
||||||
put_object_callback_data data = {0};
|
put_object_callback_data data = {0};
|
||||||
|
|
||||||
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
|
if (taosStatFile(file, &contentLength, &lmtime, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
|
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
|
||||||
return code;
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
contentLength = size;
|
contentLength = size;
|
||||||
|
|
||||||
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
|
if (!(data.infileFD = taosOpenFile(file, TD_FILE_READ))) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
uError("ERROR: %s Failed to open file %s: ", __func__, file);
|
uError("ERROR: %s Failed to open file %s: ", __func__, file);
|
||||||
return code;
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
}
|
}
|
||||||
if (taosLSeekFile(data.infileFD, offset, SEEK_SET) < 0) {
|
if (taosLSeekFile(data.infileFD, offset, SEEK_SET) < 0) {
|
||||||
taosCloseFile(&data.infileFD);
|
taosCloseFile(&data.infileFD);
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
data.totalContentLength = data.totalOriginalContentLength = data.contentLength = data.originalContentLength =
|
||||||
|
@ -1016,7 +1006,7 @@ int32_t s3PutObjectFromFileOffset(const char *file, const char *object_name, int
|
||||||
growbuffer_destroy(data.gb);
|
growbuffer_destroy(data.gb);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct list_bucket_callback_data {
|
typedef struct list_bucket_callback_data {
|
||||||
|
@ -1130,11 +1120,11 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) {
|
||||||
|
|
||||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
||||||
code = -1;
|
code = TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void s3DeleteObjectsByPrefix(const char *prefix) {
|
void s3DeleteObjectsByPrefix(const char *prefix) {
|
||||||
|
@ -1187,17 +1177,19 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
|
||||||
|
|
||||||
if (cbd.status != S3StatusOK) {
|
if (cbd.status != S3StatusOK) {
|
||||||
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
|
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
|
||||||
return TAOS_SYSTEM_ERROR(EIO);
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (check && cbd.buf_pos != size) {
|
if (check && cbd.buf_pos != size) {
|
||||||
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
|
uError("%s: %d/%s(%s)", __func__, cbd.status, S3_get_status_name(cbd.status), cbd.err_msg);
|
||||||
return TAOS_SYSTEM_ERROR(EIO);
|
|
||||||
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppBlock = cbd.buf;
|
*ppBlock = cbd.buf;
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
static S3Status getObjectCallback(int bufferSize, const char *buffer, void *callbackData) {
|
||||||
|
@ -1218,9 +1210,8 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
uError("[s3] open file error, errno:%d, fileName:%s", TAOS_SYSTEM_ERROR(errno), fileName);
|
||||||
uError("[s3] open file error, errno:%d, fileName:%s", terrno, fileName);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TS3GetData cbd = {0};
|
TS3GetData cbd = {0};
|
||||||
|
@ -1232,16 +1223,17 @@ int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
|
||||||
if (cbd.status != S3StatusOK) {
|
if (cbd.status != S3StatusOK) {
|
||||||
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
uError("%s: %d(%s)", __func__, cbd.status, cbd.err_msg);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return TAOS_SYSTEM_ERROR(EIO);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
||||||
SArray *objectArray = getListByPrefix(prefix);
|
SArray *objectArray = getListByPrefix(prefix);
|
||||||
if (objectArray == NULL) return -1;
|
if (objectArray == NULL) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
|
|
||||||
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
|
||||||
char *object = taosArrayGetP(objectArray, i);
|
char *object = taosArrayGetP(objectArray, i);
|
||||||
|
@ -1255,7 +1247,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
|
||||||
}
|
}
|
||||||
if (s3GetObjectToFile(object, fileName) != 0) {
|
if (s3GetObjectToFile(object, fileName) != 0) {
|
||||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
taosArrayDestroyEx(objectArray, s3FreeObjectKey);
|
||||||
|
@ -1279,7 +1271,7 @@ long s3Size(const char *object_name) {
|
||||||
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
if ((cbd.status != S3StatusOK) && (cbd.status != S3StatusErrorPreconditionFailed)) {
|
||||||
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
s3PrintError(__FILE__, __LINE__, __func__, cbd.status, cbd.err_msg);
|
||||||
|
|
||||||
return -1;
|
TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
}
|
}
|
||||||
|
|
||||||
size = cbd.content_length;
|
size = cbd.content_length;
|
||||||
|
@ -1306,7 +1298,7 @@ int32_t s3Init() {
|
||||||
// set log output, default stderr
|
// set log output, default stderr
|
||||||
cos_log_set_output(NULL);
|
cos_log_set_output(NULL);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void s3CleanUp() { cos_http_io_deinitialize(); }
|
void s3CleanUp() { cos_http_io_deinitialize(); }
|
||||||
|
@ -1363,10 +1355,10 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
|
||||||
cos_pool_destroy(p);
|
cos_pool_destroy(p);
|
||||||
|
|
||||||
if (s->code != 200) {
|
if (s->code != 200) {
|
||||||
return code = s->code;
|
TAOS_RETURN(s->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) {
|
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_t withcp) {
|
||||||
|
@ -1405,10 +1397,10 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str, int8_
|
||||||
cos_pool_destroy(p);
|
cos_pool_destroy(p);
|
||||||
|
|
||||||
if (s->code != 200) {
|
if (s->code != 200) {
|
||||||
return code = s->code;
|
TAOS_RETURN(s->code);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void s3DeleteObjectsByPrefix(const char *prefix_str) {
|
void s3DeleteObjectsByPrefix(const char *prefix_str) {
|
||||||
|
@ -1465,7 +1457,7 @@ int32_t s3DeleteObjects(const char *object_name[], int nobject) {
|
||||||
cos_warn_log("delete objects failed\n");
|
cos_warn_log("delete objects failed\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool s3Exists(const char *object_name) {
|
bool s3Exists(const char *object_name) {
|
||||||
|
@ -1588,10 +1580,9 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
|
||||||
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
|
s = cos_get_object_to_buffer(options, &bucket, &object, headers, NULL, &download_buffer, &resp_headers);
|
||||||
log_status(s);
|
log_status(s);
|
||||||
if (!cos_status_is_ok(s)) {
|
if (!cos_status_is_ok(s)) {
|
||||||
vError("s3: %d(%s)", s->code, s->error_msg);
|
uError("s3: %d(%s)", s->code, s->error_msg);
|
||||||
vError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
|
||||||
code = TAOS_SYSTEM_ERROR(EIO);
|
TAOS_RETURN(TAOS_SYSTEM_ERROR(EIO));
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// print_headers(resp_headers);
|
// print_headers(resp_headers);
|
||||||
|
@ -1614,7 +1605,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_
|
||||||
|
|
||||||
*ppBlock = buf;
|
*ppBlock = buf;
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -9,42 +9,35 @@ int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) {
|
||||||
|
|
||||||
TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE /* | TD_FILE_TRUNC*/ | TD_FILE_WRITE_THROUGH);
|
TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE /* | TD_FILE_TRUNC*/ | TD_FILE_WRITE_THROUGH);
|
||||||
if (!fd) {
|
if (!fd) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
uError("%s Failed to open %s", __func__, cp_path);
|
||||||
uError("ERROR: %s Failed to open %s", __func__, cp_path);
|
TAOS_CHECK_RETURN(TAOS_SYSTEM_ERROR(errno));
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
checkpoint->thefile = fd;
|
checkpoint->thefile = fd;
|
||||||
|
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cos_cp_close(TdFilePtr fd) { taosCloseFile(&fd); }
|
int32_t cos_cp_close(TdFilePtr fd) { return taosCloseFile(&fd); }
|
||||||
void cos_cp_remove(char const* filepath) { (void)taosRemoveFile(filepath); }
|
int32_t cos_cp_remove(char const* filepath) { return taosRemoveFile(filepath); }
|
||||||
|
|
||||||
static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
|
static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
cJSON const* item2 = NULL;
|
cJSON const* item2 = NULL;
|
||||||
|
|
||||||
cJSON* json = cJSON_Parse(cp_body);
|
cJSON* json = cJSON_Parse(cp_body);
|
||||||
if (NULL == json) {
|
if (NULL == json) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TAOS_CHECK_GOTO(TSDB_CODE_FILE_CORRUPTED, &lino, _exit);
|
||||||
uError("ERROR: %s Failed to parse json", __func__);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON const* item = cJSON_GetObjectItem(json, "ver");
|
cJSON const* item = cJSON_GetObjectItem(json, "ver");
|
||||||
if (!cJSON_IsNumber(item) || item->valuedouble != 1) {
|
if (!cJSON_IsNumber(item) || item->valuedouble != 1) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TAOS_CHECK_GOTO(TSDB_CODE_FILE_CORRUPTED, &lino, _exit);
|
||||||
uError("ERROR: %s Failed to parse json ver: %f", __func__, item->valuedouble);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
item = cJSON_GetObjectItem(json, "type");
|
item = cJSON_GetObjectItem(json, "type");
|
||||||
if (!cJSON_IsNumber(item)) {
|
if (!cJSON_IsNumber(item)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TAOS_CHECK_GOTO(TSDB_CODE_FILE_CORRUPTED, &lino, _exit);
|
||||||
uError("ERROR: %s Failed to parse json", __func__);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
cp->cp_type = item->valuedouble;
|
cp->cp_type = item->valuedouble;
|
||||||
|
|
||||||
|
@ -155,40 +148,35 @@ static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
if (json) cJSON_Delete(json);
|
if (json) cJSON_Delete(json);
|
||||||
if (cp_body) taosMemoryFree(cp_body);
|
if (cp_body) taosMemoryFree(cp_body);
|
||||||
|
TAOS_RETURN(code);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) {
|
int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
TdFilePtr fd = taosOpenFile(filepath, TD_FILE_READ);
|
TdFilePtr fd = taosOpenFile(filepath, TD_FILE_READ);
|
||||||
if (!fd) {
|
if (!fd) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
uError("ERROR: %s Failed to open %s", __func__, filepath);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t size = -1;
|
int64_t size = -1;
|
||||||
code = taosStatFile(filepath, &size, NULL, NULL);
|
TAOS_CHECK_GOTO(taosStatFile(filepath, &size, NULL, NULL), &lino, _exit);
|
||||||
if (code) {
|
|
||||||
uError("ERROR: %s Failed to stat %s", __func__, filepath);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* cp_body = taosMemoryMalloc(size + 1);
|
char* cp_body = taosMemoryMalloc(size + 1);
|
||||||
|
if (!cp_body) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t n = taosReadFile(fd, cp_body, size);
|
int64_t n = taosReadFile(fd, cp_body, size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
uError("ERROR: %s Failed to read %s", __func__, filepath);
|
|
||||||
goto _exit;
|
|
||||||
} else if (n != size) {
|
} else if (n != size) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TAOS_CHECK_GOTO(TSDB_CODE_FILE_CORRUPTED, &lino, _exit);
|
||||||
uError("ERROR: %s Failed to read %s %" PRId64 "/%" PRId64, __func__, filepath, n, size);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
taosCloseFile(&fd);
|
taosCloseFile(&fd);
|
||||||
cp_body[size] = '\0';
|
cp_body[size] = '\0';
|
||||||
|
@ -196,184 +184,161 @@ int32_t cos_cp_load(char const* filepath, SCheckpoint* checkpoint) {
|
||||||
return cos_cp_parse_body(cp_body, checkpoint);
|
return cos_cp_parse_body(cp_body, checkpoint);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
if (fd) {
|
if (fd) {
|
||||||
taosCloseFile(&fd);
|
taosCloseFile(&fd);
|
||||||
}
|
}
|
||||||
if (cp_body) {
|
if (cp_body) {
|
||||||
taosMemoryFree(cp_body);
|
taosMemoryFree(cp_body);
|
||||||
}
|
}
|
||||||
|
TAOS_RETURN(code);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t cos_cp_save_json(cJSON const* json, SCheckpoint* checkpoint) {
|
static int32_t cos_cp_save_json(cJSON const* json, SCheckpoint* checkpoint) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
|
|
||||||
char* data = cJSON_PrintUnformatted(json);
|
char* data = cJSON_PrintUnformatted(json);
|
||||||
if (NULL == data) {
|
if (!data) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
TdFilePtr fp = checkpoint->thefile;
|
TdFilePtr fp = checkpoint->thefile;
|
||||||
if (taosFtruncateFile(fp, 0) < 0) {
|
if (taosFtruncateFile(fp, 0) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
if (taosLSeekFile(fp, 0, SEEK_SET) < 0) {
|
if (taosLSeekFile(fp, 0, SEEK_SET) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
if (taosWriteFile(fp, data, strlen(data)) < 0) {
|
if (taosWriteFile(fp, data, strlen(data)) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosFsyncFile(fp) < 0) {
|
if (taosFsyncFile(fp) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cos_cp_dump(SCheckpoint* cp) {
|
int32_t cos_cp_dump(SCheckpoint* cp) {
|
||||||
int32_t code = 0;
|
int32_t code = 0, lino = 0;
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
cJSON* ojson = NULL;
|
cJSON* ojson = NULL;
|
||||||
cJSON* json = cJSON_CreateObject();
|
cJSON* json = cJSON_CreateObject();
|
||||||
if (!json) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!json) {
|
||||||
|
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == cJSON_AddNumberToObject(json, "ver", 1)) {
|
if (NULL == cJSON_AddNumberToObject(json, "ver", 1)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == cJSON_AddNumberToObject(json, "type", cp->cp_type)) {
|
if (NULL == cJSON_AddNumberToObject(json, "type", cp->cp_type)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == cJSON_AddStringToObject(json, "md5", cp->md5)) {
|
if (NULL == cJSON_AddStringToObject(json, "md5", cp->md5)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == cJSON_AddStringToObject(json, "upload_id", cp->upload_id)) {
|
if (NULL == cJSON_AddStringToObject(json, "upload_id", cp->upload_id)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (COS_CP_TYPE_UPLOAD == cp->cp_type) {
|
if (COS_CP_TYPE_UPLOAD == cp->cp_type) {
|
||||||
ojson = cJSON_AddObjectToObject(json, "file");
|
ojson = cJSON_AddObjectToObject(json, "file");
|
||||||
if (!ojson) {
|
if (!ojson) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->file_size)) {
|
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->file_size)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(ojson, "lastmodified", cp->file_last_modified)) {
|
if (NULL == cJSON_AddNumberToObject(ojson, "lastmodified", cp->file_last_modified)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(ojson, "path", cp->file_path)) {
|
if (NULL == cJSON_AddStringToObject(ojson, "path", cp->file_path)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(ojson, "file_md5", cp->file_md5)) {
|
if (NULL == cJSON_AddStringToObject(ojson, "file_md5", cp->file_md5)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
} else if (COS_CP_TYPE_DOWNLOAD == cp->cp_type) {
|
} else if (COS_CP_TYPE_DOWNLOAD == cp->cp_type) {
|
||||||
ojson = cJSON_AddObjectToObject(json, "object");
|
ojson = cJSON_AddObjectToObject(json, "object");
|
||||||
if (!ojson) {
|
if (!ojson) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(ojson, "object_size", cp->object_size)) {
|
if (NULL == cJSON_AddNumberToObject(ojson, "object_size", cp->object_size)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(ojson, "object_name", cp->object_name)) {
|
if (NULL == cJSON_AddStringToObject(ojson, "object_name", cp->object_name)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(ojson, "object_last_modified", cp->object_last_modified)) {
|
if (NULL == cJSON_AddStringToObject(ojson, "object_last_modified", cp->object_last_modified)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(ojson, "object_etag", cp->object_etag)) {
|
if (NULL == cJSON_AddStringToObject(ojson, "object_etag", cp->object_etag)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ojson = cJSON_AddObjectToObject(json, "cpparts");
|
ojson = cJSON_AddObjectToObject(json, "cpparts");
|
||||||
if (!ojson) {
|
if (!ojson) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(ojson, "number", cp->part_num)) {
|
if (NULL == cJSON_AddNumberToObject(ojson, "number", cp->part_num)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->part_size)) {
|
if (NULL == cJSON_AddNumberToObject(ojson, "size", cp->part_size)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* ajson = cJSON_AddArrayToObject(ojson, "parts");
|
cJSON* ajson = cJSON_AddArrayToObject(ojson, "parts");
|
||||||
if (!ajson) {
|
if (!ajson) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
for (int i = 0; i < cp->part_num; ++i) {
|
for (int i = 0; i < cp->part_num; ++i) {
|
||||||
cJSON* item = cJSON_CreateObject();
|
cJSON* item = cJSON_CreateObject();
|
||||||
if (!item) {
|
if (!item) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
cJSON_AddItemToArray(ajson, item);
|
cJSON_AddItemToArray(ajson, item);
|
||||||
|
|
||||||
if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) {
|
if (NULL == cJSON_AddNumberToObject(item, "index", cp->parts[i].index)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(item, "offset", cp->parts[i].offset)) {
|
if (NULL == cJSON_AddNumberToObject(item, "offset", cp->parts[i].offset)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(item, "size", cp->parts[i].size)) {
|
if (NULL == cJSON_AddNumberToObject(item, "size", cp->parts[i].size)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(item, "completed", cp->parts[i].completed)) {
|
if (NULL == cJSON_AddNumberToObject(item, "completed", cp->parts[i].completed)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddNumberToObject(item, "crc64", cp->parts[i].crc64)) {
|
if (NULL == cJSON_AddNumberToObject(item, "crc64", cp->parts[i].crc64)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
if (NULL == cJSON_AddStringToObject(item, "etag", cp->parts[i].etag)) {
|
if (NULL == cJSON_AddStringToObject(item, "etag", cp->parts[i].etag)) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = cos_cp_save_json(json, cp);
|
code = cos_cp_save_json(json, cp);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TAOS_CHECK_GOTO(code, &lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
cJSON_Delete(json);
|
cJSON_Delete(json);
|
||||||
return code;
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes) {}
|
void cos_cp_get_undo_parts(SCheckpoint* checkpoint, int* part_num, SCheckpointPart* parts, int64_t* consume_bytes) {}
|
||||||
|
|
Loading…
Reference in New Issue