tsdb/cos: not remove cp files

This commit is contained in:
Minglei Jin 2023-12-11 19:36:06 +08:00
parent 2dbb718101
commit fe824c51e2
3 changed files with 30 additions and 11 deletions

View File

@ -744,9 +744,13 @@ clean:
taosMemoryFree(parts); taosMemoryFree(parts);
} }
*/ */
if (cp.thefile) {
cos_cp_close(cp.thefile);
}
if (cp.parts) { if (cp.parts) {
taosMemoryFree(cp.parts); taosMemoryFree(cp.parts);
} }
if (manager.upload_id) { if (manager.upload_id) {
taosMemoryFree(manager.upload_id); taosMemoryFree(manager.upload_id);
} }

View File

@ -7,7 +7,7 @@
int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) { int32_t cos_cp_open(char const* cp_path, SCheckpoint* checkpoint) {
int32_t code = 0; int32_t code = 0;
TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH); TdFilePtr fd = taosOpenFile(cp_path, TD_FILE_WRITE | TD_FILE_CREATE /* | TD_FILE_TRUNC*/ | TD_FILE_WRITE_THROUGH);
if (!fd) { if (!fd) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
uError("ERROR: %s Failed to open %s", __func__, cp_path); uError("ERROR: %s Failed to open %s", __func__, cp_path);
@ -53,6 +53,11 @@ static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
memcpy(cp->md5, item->valuestring, strlen(item->valuestring)); memcpy(cp->md5, item->valuestring, strlen(item->valuestring));
} }
item = cJSON_GetObjectItem(json, "upload_id");
if (cJSON_IsString(item)) {
strncpy(cp->upload_id, item->valuestring, 128);
}
item2 = cJSON_GetObjectItem(json, "file"); item2 = cJSON_GetObjectItem(json, "file");
if (cJSON_IsObject(item2)) { if (cJSON_IsObject(item2)) {
item = cJSON_GetObjectItem(item2, "size"); item = cJSON_GetObjectItem(item2, "size");
@ -111,39 +116,39 @@ static int32_t cos_cp_parse_body(char* cp_body, SCheckpoint* cp) {
cp->part_size = item->valuedouble; cp->part_size = item->valuedouble;
} }
item2 = cJSON_GetObjectItem(json, "parts"); item2 = cJSON_GetObjectItem(item2, "parts");
if (cJSON_IsArray(item2) && cp->part_num > 0) { if (cJSON_IsArray(item2) && cp->part_num > 0) {
cJSON_ArrayForEach(item, item2) { cJSON_ArrayForEach(item, item2) {
cJSON const* item3 = cJSON_GetObjectItem(item, "index"); cJSON const* item3 = cJSON_GetObjectItem(item, "index");
int32_t index = 0; int32_t index = 0;
if (cJSON_IsNumber(item3)) { if (cJSON_IsNumber(item3)) {
index = item->valuedouble; index = item3->valuedouble;
cp->parts[index].index = index; cp->parts[index].index = index;
} }
item3 = cJSON_GetObjectItem(item, "offset"); item3 = cJSON_GetObjectItem(item, "offset");
if (cJSON_IsNumber(item3)) { if (cJSON_IsNumber(item3)) {
cp->parts[index].offset = item->valuedouble; cp->parts[index].offset = item3->valuedouble;
} }
item3 = cJSON_GetObjectItem(item, "size"); item3 = cJSON_GetObjectItem(item, "size");
if (cJSON_IsNumber(item3)) { if (cJSON_IsNumber(item3)) {
cp->parts[index].size = item->valuedouble; cp->parts[index].size = item3->valuedouble;
} }
item3 = cJSON_GetObjectItem(item, "completed"); item3 = cJSON_GetObjectItem(item, "completed");
if (cJSON_IsNumber(item3)) { if (cJSON_IsNumber(item3)) {
cp->parts[index].completed = item->valuedouble; cp->parts[index].completed = item3->valuedouble;
} }
item3 = cJSON_GetObjectItem(item, "crc64"); item3 = cJSON_GetObjectItem(item, "crc64");
if (cJSON_IsNumber(item3)) { if (cJSON_IsNumber(item3)) {
cp->parts[index].crc64 = item->valuedouble; cp->parts[index].crc64 = item3->valuedouble;
} }
item3 = cJSON_GetObjectItem(item, "etag"); item3 = cJSON_GetObjectItem(item, "etag");
if (cJSON_IsString(item)) { if (cJSON_IsString(item3)) {
strncpy(cp->parts[index].etag, item->valuestring, 128); strncpy(cp->parts[index].etag, item3->valuestring, 128);
} }
} }
} }
@ -214,6 +219,10 @@ static int32_t cos_cp_save_json(cJSON const* json, SCheckpoint* checkpoint) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _exit; goto _exit;
} }
if (taosLSeekFile(fp, 0, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _exit;
}
if (taosWriteFile(fp, data, strlen(data)) < 0) { if (taosWriteFile(fp, data, strlen(data)) < 0) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
goto _exit; goto _exit;
@ -252,6 +261,11 @@ int32_t cos_cp_dump(SCheckpoint* cp) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
if (NULL == cJSON_AddStringToObject(json, "upload_id", cp->upload_id)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (COS_CP_TYPE_UPLOAD == cp->cp_type) { if (COS_CP_TYPE_UPLOAD == cp->cp_type) {
ojson = cJSON_AddObjectToObject(json, "file"); ojson = cJSON_AddObjectToObject(json, "file");
if (!ojson) { if (!ojson) {

View File

@ -529,7 +529,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) { for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
if (taosIsDir(file->aname)) continue; if (taosIsDir(file->aname)) continue;
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL &&
strncmp(file->aname + strlen(file->aname) - 3, ".cp", 3)) {
int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs); int32_t nlevel = tfsGetLevel(fs->tsdb->pVnode->pTfs);
remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1); remove_file(file->aname, nlevel > 1 && file->did.level == nlevel - 1);
} }