This commit is contained in:
Hongze Cheng 2020-06-18 06:27:08 +00:00
parent 05bc6db540
commit 65552dc480
1 changed files with 65 additions and 9 deletions

View File

@ -65,6 +65,8 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
static int keyFGroupCompFunc(const void *key, const void *fgroup); static int keyFGroupCompFunc(const void *key, const void *fgroup);
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
// Function declaration // Function declaration
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
@ -141,8 +143,10 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
STsdbRepo *pRepo = (STsdbRepo *)repo; STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbAsyncCommit(pRepo); if (toCommit) {
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); tsdbAsyncCommit(pRepo);
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
}
tsdbCloseFileH(pRepo); tsdbCloseFileH(pRepo);
tsdbCloseBufPool(pRepo); tsdbCloseBufPool(pRepo);
@ -179,7 +183,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0; uint32_t magic = 0;
char *fname = NULL; char * fname = NULL;
struct stat fState; struct stat fState;
@ -513,6 +517,8 @@ static int32_t tsdbUnsetRepoEnv(char *rootDir) {
static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
int fd = -1; int fd = -1;
char *fname = NULL; char *fname = NULL;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
char *pBuf = buf;
fname = tsdbGetCfgFname(rootDir); fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) { if (fname == NULL) {
@ -527,8 +533,13 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err; goto _err;
} }
if (twrite(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) { int tlen = tsdbEncodeCfg((void *)(&pBuf), pCfg);
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, sizeof(STsdbCfg), fname, ASSERT(tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE);
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
if (twrite(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", pCfg->tsdbId, TSDB_FILE_HEAD_SIZE, fname,
strerror(errno)); strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
@ -553,6 +564,7 @@ _err:
static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) { static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
char *fname = NULL; char *fname = NULL;
int fd = -1; int fd = -1;
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
fname = tsdbGetCfgFname(rootDir); fname = tsdbGetCfgFname(rootDir);
if (fname == NULL) { if (fname == NULL) {
@ -567,12 +579,20 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg) {
goto _err; goto _err;
} }
if (tread(fd, (void *)pCfg, sizeof(*pCfg)) < sizeof(*pCfg)) { if (tread(fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("failed to read %d bytes from file %s since %s", sizeof(*pCfg), fname, strerror(errno)); tsdbError("failed to read %d bytes from file %s since %s", TSDB_FILE_HEAD_SIZE, fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
tsdbError("file %s is corrupted", fname);
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
goto _err;
}
tsdbDecodeCfg(buf, pCfg);
tfree(fname); tfree(fname);
close(fd); close(fd);
@ -596,7 +616,6 @@ static char *tsdbGetCfgFname(char *rootDir) {
return fname; return fname;
} }
static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo)); STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo));
if (pRepo == NULL) { if (pRepo == NULL) {
@ -888,6 +907,42 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
} }
} }
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
int tlen = 0;
tlen += taosEncodeVariantI32(buf, pCfg->tsdbId);
tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize);
tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks);
tlen += taosEncodeVariantI32(buf, pCfg->maxTables);
tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile);
tlen += taosEncodeVariantI32(buf, pCfg->keep);
tlen += taosEncodeVariantI32(buf, pCfg->keep1);
tlen += taosEncodeVariantI32(buf, pCfg->keep2);
tlen += taosEncodeVariantI32(buf, pCfg->minRowsPerFileBlock);
tlen += taosEncodeVariantI32(buf, pCfg->maxRowsPerFileBlock);
tlen += taosEncodeFixedI8(buf, pCfg->precision);
tlen += taosEncodeFixedI8(buf, pCfg->compression);
return tlen;
}
static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId));
buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize));
buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks));
buf = taosDecodeVariantI32(buf, &(pCfg->maxTables));
buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile));
buf = taosDecodeVariantI32(buf, &(pCfg->keep));
buf = taosDecodeVariantI32(buf, &(pCfg->keep1));
buf = taosDecodeVariantI32(buf, &(pCfg->keep2));
buf = taosDecodeVariantI32(buf, &(pCfg->minRowsPerFileBlock));
buf = taosDecodeVariantI32(buf, &(pCfg->maxRowsPerFileBlock));
buf = taosDecodeFixedI8(buf, &(pCfg->precision));
buf = taosDecodeFixedI8(buf, &(pCfg->compression));
return buf;
}
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// TODO // TODO
// STsdbCache *pCache = pRepo->tsdbCache; // STsdbCache *pCache = pRepo->tsdbCache;
@ -915,7 +970,8 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
// pRepo->config.totalBlocks = totalBlocks; // pRepo->config.totalBlocks = totalBlocks;
// tsdbUnLockRepo((TsdbRepoT *)pRepo); // tsdbUnLockRepo((TsdbRepoT *)pRepo);
// tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks, totalBlocks); // tsdbTrace("vgId:%d, tsdb total cache blocks changed from %d to %d", pRepo->config.tsdbId, oldNumOfBlocks,
// totalBlocks);
return 0; return 0;
} }