more code

This commit is contained in:
Hongze Cheng 2023-04-10 17:52:51 +08:00
parent a0039fe271
commit 6bc0d17605
5 changed files with 254 additions and 55 deletions

View File

@ -155,7 +155,7 @@ static int32_t commit_delete_data(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
ASSERTS(0, "not implemented yet"); // ASSERTS(0, "TODO: Not implemented yet");
int64_t nDel = 0; int64_t nDel = 0;
SMemTable *pMem = pCommitter->pTsdb->imem; SMemTable *pMem = pCommitter->pTsdb->imem;
@ -284,7 +284,9 @@ static int32_t close_committer(SCommitter *pCommiter, int32_t eno) {
int32_t code = 0; int32_t code = 0;
int32_t lino; int32_t lino;
// code = tsdbFSBegin(pCommiter->pTsdb, pCommiter->aFileOp); code = tsdbFileSystemEditBegin(pCommiter->pTsdb->pFS, //
pCommiter->aFileOp, //
TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
@ -329,57 +331,77 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", //
TD_VID(pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else { } else {
tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, TD_VID(pTsdb->pVnode), __func__, pMem->nRow, tsdbInfo("vgId:%d %s done, nRow:%" PRId64 " nDel:%" PRId64, //
TD_VID(pTsdb->pVnode), //
__func__, //
pMem->nRow, //
pMem->nDel); pMem->nDel);
} }
return code; return code;
} }
// int32_t tsdbCommitCommit(STsdb *pTsdb) { int32_t tsdbCommitCommit(STsdb *pTsdb) {
// int32_t code = 0; int32_t code = 0;
// int32_t lino = 0; int32_t lino = 0;
// SMemTable *pMemTable = pTsdb->imem; SMemTable *pMemTable = pTsdb->imem;
// // lock // lock
// taosThreadRwlockWrlock(&pTsdb->rwLock); taosThreadRwlockWrlock(&pTsdb->rwLock);
// code = tsdbFSCommit(pTsdb); code = tsdbFileSystemEditCommit(pTsdb->pFS, //
// if (code) { TSDB_FS_EDIT_COMMIT);
// taosThreadRwlockUnlock(&pTsdb->rwLock); if (code) {
// TSDB_CHECK_CODE(code, lino, _exit); taosThreadRwlockUnlock(&pTsdb->rwLock);
// } TSDB_CHECK_CODE(code, lino, _exit);
}
// pTsdb->imem = NULL; pTsdb->imem = NULL;
// // unlock // unlock
// taosThreadRwlockUnlock(&pTsdb->rwLock); taosThreadRwlockUnlock(&pTsdb->rwLock);
// if (pMemTable) { if (pMemTable) {
// tsdbUnrefMemTable(pMemTable, NULL, true); tsdbUnrefMemTable(pMemTable, NULL, true);
// } }
// _exit: _exit:
// if (code) { if (code) {
// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", //
// } else { TD_VID(pTsdb->pVnode), //
// tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode)); __func__, //
// } lino, //
// return code; tstrerror(code));
// } } else {
tsdbInfo("vgId:%d %s done", //
TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
// int32_t tsdbCommitAbort(STsdb *pTsdb) { int32_t tsdbCommitAbort(STsdb *pTsdb) {
// int32_t code = 0; int32_t code = 0;
// int32_t lino = 0; int32_t lino = 0;
// code = tsdbFSRollback(pTsdb); code = tsdbFileSystemEditAbort(pTsdb->pFS, //
// TSDB_CHECK_CODE(code, lino, _exit); TSDB_FS_EDIT_COMMIT);
TSDB_CHECK_CODE(code, lino, _exit);
// _exit: _exit:
// if (code) { if (code) {
// tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d, %s failed at line %d since %s", //
// } else { TD_VID(pTsdb->pVnode), //
// tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode)); __func__, //
// } lino, //
// return code; tstrerror(code));
// } } else {
tsdbInfo("vgId:%d %s done", //
TD_VID(pTsdb->pVnode), //
__func__);
}
return code;
}

View File

@ -26,7 +26,7 @@ static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) {
} }
ppFS[0]->pTsdb = pTsdb; ppFS[0]->pTsdb = pTsdb;
tsem_init(&ppFS[0]->can_edit, 0, 1); tsem_init(&ppFS[0]->canEdit, 0, 1);
return 0; return 0;
} }
@ -34,7 +34,7 @@ static int32_t create_file_system(STsdb *pTsdb, struct STFileSystem **ppFS) {
static int32_t destroy_file_system(struct STFileSystem **ppFS) { static int32_t destroy_file_system(struct STFileSystem **ppFS) {
if (ppFS[0]) { if (ppFS[0]) {
taosArrayDestroy(ppFS[0]->aFileSet); taosArrayDestroy(ppFS[0]->aFileSet);
tsem_destroy(&ppFS[0]->can_edit); tsem_destroy(&ppFS[0]->canEdit);
taosMemoryFree(ppFS[0]); taosMemoryFree(ppFS[0]);
ppFS[0] = NULL; ppFS[0] = NULL;
} }
@ -42,27 +42,193 @@ static int32_t destroy_file_system(struct STFileSystem **ppFS) {
} }
static int32_t get_current_json(STsdb *pTsdb, char fname[]) { static int32_t get_current_json(STsdb *pTsdb, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json"); if (pTsdb->pVnode->pTfs) {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%s%s", //
tfsGetPrimaryPath(pTsdb->pVnode->pTfs), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
"current.json");
} else {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s", //
pTsdb->path, //
TD_DIRSEP, //
"current.json");
}
return 0; return 0;
} }
static int32_t get_current_temp(STsdb *pTsdb, char fname[], tsdb_fs_edit_t etype) { static int32_t get_current_temp(STsdb *pTsdb, char fname[], tsdb_fs_edit_t etype) {
switch (etype) { switch (etype) {
case TSDB_FS_EDIT_COMMIT: case TSDB_FS_EDIT_COMMIT:
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.commit"); if (pTsdb->pVnode->pTfs) {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%s%s", //
tfsGetPrimaryPath(pTsdb->pVnode->pTfs), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
"current.json.commit");
} else {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s", //
pTsdb->path, //
TD_DIRSEP, //
"current.json.commit");
}
break; break;
default: default:
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", pTsdb->path, TD_DIRSEP, "current.json.t"); if (pTsdb->pVnode->pTfs) {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s%s%s", //
tfsGetPrimaryPath(pTsdb->pVnode->pTfs), //
TD_DIRSEP, //
pTsdb->path, //
TD_DIRSEP, //
"current.json.t");
} else {
snprintf(fname, //
TSDB_FILENAME_LEN, //
"%s%s%s", //
pTsdb->path, //
TD_DIRSEP, //
"current.json.t");
}
break; break;
} }
return 0; return 0;
} }
static int32_t fs_to_json_str(struct STFileSystem *pFS, char **ppData) {
int32_t code = 0;
int32_t lino;
cJSON *pJson = cJSON_CreateObject();
if (pJson == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
/* format version */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"format", //
1 /* TODO */),
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
/* next edit id */
TSDB_CHECK_NULL( //
cJSON_AddNumberToObject(pJson, //
"next edit id", //
pFS->nextEditId),
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
/* file sets */
cJSON *aFileSetJson;
TSDB_CHECK_NULL( //
aFileSetJson = cJSON_AddArrayToObject(pJson, "file sets"), //
code, //
lino, //
_exit, //
TSDB_CODE_OUT_OF_MEMORY);
for (int32_t i = 0; i < taosArrayGetSize(pFS->aFileSet); i++) {
struct SFileSet *pFileSet = taosArrayGet(pFS->aFileSet, i);
code = tsdbFileSetToJson(aFileSetJson, pFileSet);
TSDB_CHECK_CODE(code, lino, _exit);
}
ppData[0] = cJSON_Print(pJson);
if (ppData[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
cJSON_Delete(pJson);
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", //
TD_VID(pFS->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
}
return code;
}
static int32_t fs_from_json_str(const char *pData, struct STFileSystem *pFS) {
int32_t code = 0;
int32_t lino;
ASSERTS(0, "TODO: Not implemented yet");
_exit:
return code;
}
static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) { static int32_t save_fs_to_file(struct STFileSystem *pFS, const char *fname) {
cJSON *pJson = NULL; int32_t code = 0;
ASSERTS(0, "TODO: Not implemented yet"); int32_t lino;
return 0; char *pData = NULL;
// to json string
code = fs_to_json_str(pFS, &pData);
TSDB_CHECK_CODE(code, lino, _exit);
TdFilePtr fd = taosOpenFile(fname, //
TD_FILE_WRITE //
| TD_FILE_CREATE //
| TD_FILE_TRUNC);
if (fd == NULL) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t n = taosWriteFile(fd, pData, strlen(pData) + 1);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(code);
taosCloseFile(&fd);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (taosFsyncFile(fd) < 0) {
code = TAOS_SYSTEM_ERROR(code);
taosCloseFile(&fd);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosCloseFile(&fd);
_exit:
if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", //
TD_VID(pFS->pTsdb->pVnode), //
__func__, //
lino, //
tstrerror(code));
} else {
tsdbDebug("vgId:%d %s success", //
TD_VID(pFS->pTsdb->pVnode), //
__func__);
}
if (pData) {
taosMemoryFree(pData);
}
return code;
} }
static int32_t load_fs_from_file(const char *fname, struct STFileSystem *pFS) { static int32_t load_fs_from_file(const char *fname, struct STFileSystem *pFS) {
@ -231,7 +397,7 @@ int32_t tsdbFileSystemEditBegin(struct STFileSystem *pFS, const SArray *aFileOp,
get_current_temp(pFS->pTsdb, fname, etype); get_current_temp(pFS->pTsdb, fname, etype);
tsem_wait(&pFS->can_edit); tsem_wait(&pFS->canEdit);
code = write_fs_to_file(pFS, fname); code = write_fs_to_file(pFS, fname);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -254,7 +420,7 @@ _exit:
int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) { int32_t tsdbFileSystemEditCommit(struct STFileSystem *pFS, tsdb_fs_edit_t etype) {
int32_t code = commit_edit(pFS, etype); int32_t code = commit_edit(pFS, etype);
tsem_post(&pFS->can_edit); tsem_post(&pFS->canEdit);
if (code) { if (code) {
tsdbError("vgId:%d %s failed since %s", // tsdbError("vgId:%d %s failed since %s", //
TD_VID(pFS->pTsdb->pVnode), // TD_VID(pFS->pTsdb->pVnode), //
@ -279,6 +445,6 @@ int32_t tsdbFileSystemEditAbort(struct STFileSystem *pFS, tsdb_fs_edit_t etype)
etype); etype);
} else { } else {
} }
tsem_post(&pFS->can_edit); tsem_post(&pFS->canEdit);
return code; return code;
} }

View File

@ -25,8 +25,8 @@ extern "C" {
/* Exposed Handle */ /* Exposed Handle */
struct STFileSystem { struct STFileSystem {
STsdb *pTsdb; STsdb *pTsdb;
tsem_t can_edit; tsem_t canEdit;
int64_t eidt_id; int64_t nextEditId;
SArray *aFileSet; // SArray<struct SFileSet> SArray *aFileSet; // SArray<struct SFileSet>
}; };

View File

@ -14,3 +14,12 @@
*/ */
#include "dev.h" #include "dev.h"
int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet) {
int32_t code = 0;
ASSERTS(0, "TODO: Not implemented yet");
_exit:
return code;
}

View File

@ -52,6 +52,8 @@ struct SFileSet {
} lStt[TSDB_STT_FILE_LEVEL_MAX]; } lStt[TSDB_STT_FILE_LEVEL_MAX];
}; };
int32_t tsdbFileSetToJson(SJson *pJson, const struct SFileSet *pSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif