Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-10 16:06:08 +08:00
parent ad119ea4c0
commit a3fdb6dd59
4 changed files with 90 additions and 60 deletions

View File

@ -70,7 +70,7 @@ static void s3PrintError(const char *func, S3Status status, char error_details[]
if (status < S3StatusErrorAccessDenied) {
uError("%s: %s", __func__, S3_get_status_name(status));
} else {
uError("%s: %s, %s", __func__, S3_get_status_name(status), error_details);
uError("%s: %s, %s, %d", __func__, S3_get_status_name(status), error_details, status);
}
}
@ -85,7 +85,7 @@ typedef struct {
char err_msg[128];
S3Status status;
uint64_t content_length;
char *buf;
char * buf;
int64_t buf_pos;
} TS3SizeCBD;
@ -271,7 +271,7 @@ typedef struct MultipartPartData {
S3Status status;
put_object_callback_data put_object_data;
int seq;
UploadManager *manager;
UploadManager * manager;
} MultipartPartData;
static int putObjectDataCallback(int bufferSize, char *buffer, void *callbackData) {
@ -316,7 +316,7 @@ S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properti
MultipartPartData *data = (MultipartPartData *)callbackData;
int seq = data->seq;
const char *etag = properties->eTag;
const char * etag = properties->eTag;
data->manager->etags[seq - 1] = strdup(etag);
data->manager->next_etags_pos = seq;
return S3StatusOK;
@ -449,10 +449,10 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
int32_t code = 0;
const char *key = object;
// const char *uploadId = 0;
const char *filename = 0;
const char * filename = 0;
uint64_t contentLength = 0;
const char *cacheControl = 0, *contentType = 0, *md5 = 0;
const char *contentDispositionFilename = 0, *contentEncoding = 0;
const char * cacheControl = 0, *contentType = 0, *md5 = 0;
const char * contentDispositionFilename = 0, *contentEncoding = 0;
int64_t expires = -1;
S3CannedAcl cannedAcl = S3CannedAclPrivate;
int metaPropertiesCount = 0;
@ -466,6 +466,7 @@ int32_t s3PutObjectFromFile2(const char *file, const char *object) {
data.gb = 0;
data.noStatus = noStatus;
uError("ERROR: %s stat file %s: ", __func__, file);
if (taosStatFile(file, &contentLength, NULL, NULL) < 0) {
uError("ERROR: %s Failed to stat file %s: ", __func__, file);
code = TAOS_SYSTEM_ERROR(errno);
@ -637,7 +638,7 @@ typedef struct list_bucket_callback_data {
char nextMarker[1024];
int keyCount;
int allDetails;
SArray *objectArray;
SArray * objectArray;
} list_bucket_callback_data;
static S3Status listBucketCallback(int isTruncated, const char *nextMarker, int contentsCount,
@ -682,11 +683,11 @@ static void s3FreeObjectKey(void *pItem) {
static SArray *getListByPrefix(const char *prefix) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3ListBucketHandler listBucketHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&listBucketCallback};
const char *marker = 0, *delimiter = 0;
const char * marker = 0, *delimiter = 0;
int maxkeys = 0, allDetails = 0;
list_bucket_callback_data data;
data.objectArray = taosArrayInit(32, sizeof(void *));
@ -725,7 +726,7 @@ static SArray *getListByPrefix(const char *prefix) {
void s3DeleteObjects(const char *object_name[], int nobject) {
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3ResponseHandler responseHandler = {0, &responseCompleteCallback};
for (int i = 0; i < nobject; ++i) {
@ -776,7 +777,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallback, &responseCompleteCallback},
&getObjectDataCallback};
@ -814,7 +815,7 @@ int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
const char *ifMatch = 0, *ifNotMatch = 0;
S3BucketContext bucketContext = {0, tsS3BucketName, protocolG, uriStyleG, tsS3AccessKeyId, tsS3AccessKeySecret,
0, awsRegionG};
0, awsRegionG};
S3GetConditions getConditions = {ifModifiedSince, ifNotModifiedSince, ifMatch, ifNotMatch};
S3GetObjectHandler getObjectHandler = {{&responsePropertiesCallbackNull, &responseCompleteCallback},
&getObjectCallback};
@ -845,7 +846,7 @@ int32_t s3GetObjectsByPrefix(const char *prefix, const char *path) {
if (objectArray == NULL) return -1;
for (size_t i = 0; i < taosArrayGetSize(objectArray); i++) {
char *object = taosArrayGetP(objectArray, i);
char * object = taosArrayGetP(objectArray, i);
const char *tmp = strchr(object, '/');
tmp = (tmp == NULL) ? object : tmp + 1;
char fileName[PATH_MAX] = {0};
@ -934,12 +935,12 @@ static void s3InitRequestOptions(cos_request_options_t *options, int is_cname) {
int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
int32_t code = 0;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_status_t * s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket, object, file;
cos_table_t *resp_headers;
cos_table_t * resp_headers;
// int traffic_limit = 0;
cos_pool_create(&p, NULL);
@ -970,14 +971,14 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
int32_t code = 0;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_status_t * s = NULL;
cos_request_options_t * options = NULL;
cos_string_t bucket, object, file;
cos_table_t *resp_headers;
cos_table_t * resp_headers;
int traffic_limit = 0;
cos_table_t *headers = NULL;
cos_table_t * headers = NULL;
cos_resumable_clt_params_t *clt_params = NULL;
cos_pool_create(&p, NULL);
@ -1010,11 +1011,11 @@ int32_t s3PutObjectFromFile2(const char *file_str, const char *object_str) {
}
void s3DeleteObjectsByPrefix(const char *prefix_str) {
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
cos_request_options_t *options = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_status_t *s = NULL;
cos_status_t * s = NULL;
cos_string_t prefix;
cos_pool_create(&p, NULL);
@ -1029,10 +1030,10 @@ void s3DeleteObjectsByPrefix(const char *prefix_str) {
}
void s3DeleteObjects(const char *object_name[], int nobject) {
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_table_t *resp_headers = NULL;
cos_table_t * resp_headers = NULL;
cos_request_options_t *options = NULL;
cos_list_t object_list;
cos_list_t deleted_object_list;
@ -1066,14 +1067,14 @@ void s3DeleteObjects(const char *object_name[], int nobject) {
bool s3Exists(const char *object_name) {
bool ret = false;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_request_options_t *options = NULL;
cos_status_t * s = NULL;
cos_request_options_t * options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_table_t * resp_headers;
cos_table_t * headers = NULL;
cos_object_exist_status_e object_exist;
cos_pool_create(&p, NULL);
@ -1100,15 +1101,15 @@ bool s3Exists(const char *object_name) {
bool s3Get(const char *object_name, const char *path) {
bool ret = false;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_status_t * s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_string_t file;
cos_table_t *resp_headers = NULL;
cos_table_t *headers = NULL;
cos_table_t * resp_headers = NULL;
cos_table_t * headers = NULL;
int traffic_limit = 0;
//创建内存池
@ -1144,15 +1145,15 @@ bool s3Get(const char *object_name, const char *path) {
int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t block_size, bool check, uint8_t **ppBlock) {
(void)check;
int32_t code = 0;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_status_t * s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers;
cos_table_t *headers = NULL;
cos_buf_t *content = NULL;
cos_table_t * resp_headers;
cos_table_t * headers = NULL;
cos_buf_t * content = NULL;
// cos_string_t file;
// int traffic_limit = 0;
char range_buf[64];
@ -1246,7 +1247,7 @@ void s3EvictCache(const char *path, long object_size) {
terrno = TAOS_SYSTEM_ERROR(errno);
vError("failed to open %s since %s", dir_name, terrstr());
}
SArray *evict_files = taosArrayInit(16, sizeof(SEvictFile));
SArray * evict_files = taosArrayInit(16, sizeof(SEvictFile));
tdbDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char *name = taosGetDirEntryName(pDirEntry);
@ -1288,13 +1289,13 @@ void s3EvictCache(const char *path, long object_size) {
long s3Size(const char *object_name) {
long size = 0;
cos_pool_t *p = NULL;
cos_pool_t * p = NULL;
int is_cname = 0;
cos_status_t *s = NULL;
cos_status_t * s = NULL;
cos_request_options_t *options = NULL;
cos_string_t bucket;
cos_string_t object;
cos_table_t *resp_headers = NULL;
cos_table_t * resp_headers = NULL;
//创建内存池
cos_pool_create(&p, NULL);

View File

@ -254,5 +254,5 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadPath(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj);
#endif

View File

@ -1736,7 +1736,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return 0;
}
int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) {
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) {
STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type;
@ -3363,13 +3363,27 @@ _err:
return code >= 0 ? 0 : -1;
}
int32_t isBkdDataMeta(char* name) {
const char* pCurrent = "CURRENT";
int32_t currLen = strlen(pCurrent);
const char* pManifest = "MANIFEST-";
int32_t maniLen = strlen(pManifest);
if (strlen(name) >= maniLen && strncmp(name, pManifest, maniLen) == 0) {
return 1;
} else if (strlen(name) == currLen && strcmp(name, pCurrent) == 0) {
return 1;
}
return 0;
}
int32_t compareHashTableImpl(SHashObj* p1, SHashObj* p2, SArray* diff) {
int32_t code = 0;
size_t len = 0;
void* pIter = taosHashIterate(p2, NULL);
while (pIter) {
char* name = taosHashGetKey(pIter, &len);
if (!taosHashGet(p1, name, len)) {
if (!isBkdDataMeta(name) && !taosHashGet(p1, name, len)) {
char* p = taosStrdup(name);
taosArrayPush(diff, &p);
}
@ -3431,13 +3445,22 @@ int32_t dbChkpGetDelta(SDbChkp* p, int64_t chkpId, SArray* list) {
continue;
}
}
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
char *name = taosHashGetKey(pIter, NULL);
stError("curr file list: %s", name);
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
if (p->init == 0) {
void* pIter = taosHashIterate(p->pSstTbl[1 - p->idx], NULL);
while (pIter) {
size_t len;
char* name = taosHashGetKey(pIter, &len);
if (name != NULL && len != 0) {
taosArrayPush(p->pAdd, &name);
if (name != NULL && !isBkdDataMeta(name)) {
char* fname = taosStrdup(name);
taosArrayPush(p->pAdd, &fname);
}
pIter = taosHashIterate(p->pSstTbl[1 - p->idx], pIter);
}
@ -3538,12 +3561,12 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
goto _ERROR;
}
code = taosMkDir(dstDir);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr());
goto _ERROR;
}
// code = taosMkDir(dstDir);
// if (code != 0) {
// terrno = TAOS_SYSTEM_ERROR(errno);
// stError("failed to mkdir srcDir %s, reason: %s", dstDir, terrstr());
// goto _ERROR;
// }
// clear current file
memset(dstBuf, 0, len);
@ -3563,7 +3586,9 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, filename);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, filename);
taosCopyFile(srcBuf, dstBuf);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
}
}
// del file in $name
for (int i = 0; i < taosArrayGetSize(p->pDel); i++) {
@ -3580,14 +3605,18 @@ int32_t dbChkpDumpTo(SDbChkp* p, char* dname) {
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pCurrent);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pCurrent);
taosCopyFile(srcBuf, dstBuf);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
}
// copy manifest file to dst dir
memset(srcBuf, 0, len);
memset(dstBuf, 0, len);
sprintf(srcBuf, "%s%s%s", srcDir, TD_DIRSEP, p->pManifest);
sprintf(dstBuf, "%s%s%s", dstDir, TD_DIRSEP, p->pManifest);
taosCopyFile(srcBuf, dstBuf);
if (taosCopyFile(srcBuf, dstBuf) < 0) {
stError("failed to copy file from %s to %s", srcBuf, dstBuf);
}
// clear delta data buf
taosArrayClearP(p->pAdd, taosMemoryFree);

View File

@ -341,12 +341,12 @@ int32_t doUploadChkp(void* param) {
char* path = NULL;
int32_t code = 0;
if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) {
stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId);
}
taosMemoryFree(path);