homework-jianmu/source/dnode/vnode/src/tsdb/tsdbMain.c

1145 lines
34 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta, STfs *pTfs);
static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs) {
STsdb *pTsdb = NULL;
// Set default TSDB Options
// if (pTsdbCfg == NULL) {
pTsdbCfg = &defautlTsdbOptions;
// }
// Validate the options
if (tsdbValidateOptions(pTsdbCfg) < 0) {
// TODO: handle error
return NULL;
}
// Create the handle
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs);
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
}
taosMkDir(path);
// Open the TSDB
if (tsdbOpenImpl(pTsdb) < 0) {
// TODO: handle error
return NULL;
}
return pTsdb;
}
void tsdbClose(STsdb *pTsdb) {
if (pTsdb) {
tsdbCloseImpl(pTsdb);
tsdbFree(pTsdb);
}
}
void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta, STfs *pTfs) {
STsdb *pTsdb = NULL;
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
}
pTsdb->path = strdup(path);
pTsdb->vgId = vgId;
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta;
pTsdb->pTfs = pTfs;
pTsdb->fs = tsdbNewFS(pTsdbCfg);
return pTsdb;
}
static void tsdbFree(STsdb *pTsdb) {
if (pTsdb) {
tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
tsdbFreeFS(pTsdb->fs);
taosMemoryFreeClear(pTsdb->path);
taosMemoryFree(pTsdb);
}
}
static int tsdbOpenImpl(STsdb *pTsdb) {
tsdbOpenFS(pTsdb);
tsdbInitSma(pTsdb);
// TODO
return 0;
}
static void tsdbCloseImpl(STsdb *pTsdb) {
tsdbCloseFS(pTsdb);
// TODO
}
int tsdbLockRepo(STsdb *pTsdb) {
int code = taosThreadMutexLock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pTsdb->repoLocked = true;
return 0;
}
int tsdbUnlockRepo(STsdb *pTsdb) {
ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb->repoLocked = false;
int code = taosThreadMutexUnlock(&pTsdb->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
#if 0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// no test file errors here
#include "taosdef.h"
#include "tsdbint.h"
#include "tthread.h"
#include "ttimer.h"
#define IS_VALID_PRECISION(precision) \
(((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO))
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
static void tsdbFreeRepo(STsdbRepo *pRepo);
static void tsdbStartStream(STsdbRepo *pRepo);
static void tsdbStopStream(STsdbRepo *pRepo);
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh);
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx);
// Function declaration
int32_t tsdbCreateRepo(int repoid) {
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
char dataDir[TSDB_FILENAME_LEN] = "\0";
tsdbGetRootDir(repoid, tsdbDir);
if (tfsMkdir(tsdbDir) < 0) {
goto _err;
}
tsdbGetDataDir(repoid, dataDir);
if (tfsMkdir(dataDir) < 0) {
goto _err;
}
// TODO: need to create current file with nothing in
return 0;
_err:
tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno));
return -1;
}
int32_t tsdbDropRepo(int repoid) {
char tsdbDir[TSDB_FILENAME_LEN] = "\0";
tsdbGetRootDir(repoid, tsdbDir);
return tfsRmdir(tsdbDir);
}
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbRepo *pRepo;
STsdbCfg config = *pCfg;
terrno = TSDB_CODE_SUCCESS;
// Check and set default configurations
if (tsdbCheckAndSetDefaultCfg(&config) < 0) {
tsdbError("vgId:%d failed to open TSDB repository since %s", config.tsdbId, tstrerror(terrno));
return NULL;
}
// Create new TSDB object
if ((pRepo = tsdbNewRepo(&config, pAppH)) == NULL) {
tsdbError("vgId:%d failed to open TSDB repository while creating TSDB object since %s", config.tsdbId,
tstrerror(terrno));
return NULL;
}
// Open meta
if (tsdbOpenMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while opening Meta since %s", config.tsdbId, tstrerror(terrno));
tsdbCloseRepo(pRepo, false);
return NULL;
}
if (tsdbOpenBufPool(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while opening buffer pool since %s", config.tsdbId,
tstrerror(terrno));
tsdbCloseRepo(pRepo, false);
return NULL;
}
if (tsdbOpenFS(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno));
tsdbCloseRepo(pRepo, false);
return NULL;
}
// TODO: Restore information from data
if ((!(pRepo->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pRepo) < 0) {
tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno));
tsdbCloseRepo(pRepo, false);
return NULL;
}
pRepo->mergeBuf = NULL;
tsdbStartStream(pRepo);
tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo));
return pRepo;
}
// Note: all working thread and query thread must stopped when calling this function
int tsdbCloseRepo(STsdbRepo *repo, int toCommit) {
if (repo == NULL) return 0;
STsdbRepo *pRepo = repo;
int vgId = REPO_ID(pRepo);
terrno = TSDB_CODE_SUCCESS;
tsdbStopStream(pRepo);
if(pRepo->pthread){
taosDestoryThread(pRepo->pthread);
pRepo->pthread = NULL;
}
if (toCommit) {
tsdbSyncCommit(repo);
}
tsem_wait(&(pRepo->readyToCommit));
tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem);
pRepo->mem = NULL;
pRepo->imem = NULL;
tsdbCloseFS(pRepo);
tsdbCloseBufPool(pRepo);
tsdbCloseMeta(pRepo);
tsdbFreeRepo(pRepo);
tsdbDebug("vgId:%d repository is closed", vgId);
if (terrno != TSDB_CODE_SUCCESS) {
return -1;
} else {
return 0;
}
}
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo) {
ASSERT(repo != NULL);
return &((STsdbRepo *)repo)->config;
}
int tsdbLockRepo(STsdbRepo *pRepo) {
int code = taosThreadMutexLock(&pRepo->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
pRepo->repoLocked = true;
return 0;
}
int tsdbUnlockRepo(STsdbRepo *pRepo) {
ASSERT(IS_REPO_LOCKED(pRepo));
pRepo->repoLocked = false;
int code = taosThreadMutexUnlock(&pRepo->mutex);
if (code != 0) {
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
return 0;
}
int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL);
STsdbCfg *pCfg = &(pRepo->config);
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
ASSERT(pBufBlock != NULL);
if ((pRepo->mem->extraBuffList != NULL) ||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
// trigger commit
if (tsdbAsyncCommit(pRepo) < 0) return -1;
}
return 0;
}
STsdbMeta *tsdbGetMeta(STsdbRepo *pRepo) { return pRepo->tsdbMeta; }
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
int tsdbGetState(STsdbRepo *repo) { return repo->state; }
int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
*totalPoints = pRepo->stat.pointsWritten;
*totalStorage = pRepo->stat.totalStorage;
*compStorage = pRepo->stat.compStorage;
}
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) {
// TODO: think about multithread cases
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
STsdbCfg * pRCfg = &repo->config;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
configChanged = true;
}
if (pRCfg->keep != pCfg->keep) {
configChanged = true;
}
if (pRCfg->keep1 != pCfg->keep1) {
configChanged = true;
}
if (pRCfg->keep2 != pCfg->keep2) {
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
configChanged = true;
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
configChanged = true;
}
if (!configChanged) {
tsdbError("vgId:%d no config changed", REPO_ID(repo));
}
int code = taosThreadMutexLock(&repo->save_mutex);
if (code != 0) {
tsdbError("vgId:%d failed to lock tsdb save config mutex since %s", REPO_ID(repo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
}
STsdbCfg * pSaveCfg = &repo->save_config;
*pSaveCfg = repo->config;
pSaveCfg->compression = pCfg->compression;
pSaveCfg->keep = pCfg->keep;
pSaveCfg->keep1 = pCfg->keep1;
pSaveCfg->keep2 = pCfg->keep2;
pSaveCfg->cacheLastRow = pCfg->cacheLastRow;
pSaveCfg->totalBlocks = pCfg->totalBlocks;
tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
REPO_ID(repo),
pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2,
pRCfg->cacheLastRow, pRCfg->totalBlocks);
tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)",
REPO_ID(repo),
pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2,
pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks);
repo->config_changed = true;
taosThreadMutexUnlock(&repo->save_mutex);
// schedule a commit msg and wait for the new config applied
tsdbSyncCommitConfig(repo);
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
STsdbCfg config = pRepo->config;
STsdbCfg * pRCfg = &pRepo->config;
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1;
ASSERT(pRCfg->tsdbId == pCfg->tsdbId);
ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize);
ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile);
ASSERT(pRCfg->minRowsPerFileBlock == pCfg->minRowsPerFileBlock);
ASSERT(pRCfg->maxRowsPerFileBlock == pCfg->maxRowsPerFileBlock);
ASSERT(pRCfg->precision == pCfg->precision);
bool configChanged = false;
if (pRCfg->compression != pCfg->compression) {
tsdbAlterCompression(pRepo, pCfg->compression);
config.compression = pCfg->compression;
configChanged = true;
}
if (pRCfg->keep != pCfg->keep) {
if (tsdbAlterKeep(pRepo, pCfg->keep) < 0) {
tsdbError("vgId:%d failed to configure repo when alter keep since %s", REPO_ID(pRepo), tstrerror(terrno));
config.keep = pCfg->keep;
return -1;
}
configChanged = true;
}
if (pRCfg->totalBlocks != pCfg->totalBlocks) {
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
config.totalBlocks = pCfg->totalBlocks;
configChanged = true;
}
if (pRCfg->cacheLastRow != pCfg->cacheLastRow) {
config.cacheLastRow = pCfg->cacheLastRow;
configChanged = true;
}
if (configChanged) {
if (tsdbSaveConfig(pRepo->rootDir, &config) < 0) {
tsdbError("vgId:%d failed to configure repository while save config since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
}
return 0;
#endif
}
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
// TODO
return 0;
#if 0
STsdbRepo *pRepo = (STsdbRepo *)repo;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
uint32_t magic = 0;
char * fname = NULL;
struct stat fState;
tsdbDebug("vgId:%d name:%s index:%d eindex:%d", pRepo->config.tsdbId, name, *index, eindex);
ASSERT(*index <= eindex);
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
int fid = (*index) / TSDB_FILE_TYPE_MAX;
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
fname = tsdbGetMetaFileName(pRepo->rootDir);
*index = TSDB_META_FILE_INDEX;
magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta);
sprintf(name, "tsdb/%s", TSDB_META_FILE_NAME);
} else {
return 0;
}
} else {
SFileGroup *pFGroup =
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
if (pFGroup->fileId == fid) {
SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX];
fname = strdup(TSDB_FILE_NAME(pFile));
magic = pFile->info.magic;
char *tfname = strdup(fname);
sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname));
taosMemoryFreeClear(tfname);
} else {
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) {
SFile *pFile = &pFGroup->files[0];
fname = strdup(TSDB_FILE_NAME(pFile));
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
magic = pFile->info.magic;
char *tfname = strdup(fname);
sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname));
taosMemoryFreeClear(tfname);
} else {
return 0;
}
}
}
} else { // get the named file at the specified index. If not there, return 0
fname = taosMemoryMalloc(256);
sprintf(fname, "%s/vnode/vnode%d/%s", tfsGetPrimaryPath(pRepo->pTfs), REPO_ID(pRepo), name);
if (access(fname, F_OK) != 0) {
taosMemoryFreeClear(fname);
return 0;
}
if (*index == TSDB_META_FILE_INDEX) { // get meta file
tsdbGetStoreInfo(fname, &magic, size);
} else {
char tfname[TSDB_FILENAME_LEN] = "\0";
sprintf(tfname, "vnode/vnode%d/tsdb/%s/%s", REPO_ID(pRepo), TSDB_DATA_DIR_NAME, basename(name));
tsdbGetFileInfoImpl(tfname, &magic, size);
}
taosMemoryFreeClear(fname);
return magic;
}
if (stat(fname, &fState) < 0) {
taosMemoryFreeClear(fname);
return 0;
}
*size = fState.st_size;
// magic = *size;
taosMemoryFreeClear(fname);
return magic;
#endif
}
void tsdbGetRootDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid);
}
void tsdbGetDataDir(int repoid, char dirName[]) {
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid);
}
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
// Check tsdbId
if (pCfg->tsdbId < 0) {
tsdbError("vgId:%d invalid vgroup ID", pCfg->tsdbId);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
// Check precision
if (pCfg->precision == -1) {
pCfg->precision = TSDB_DEFAULT_PRECISION;
} else {
if (!IS_VALID_PRECISION(pCfg->precision)) {
tsdbError("vgId:%d invalid precision configuration %d", pCfg->tsdbId, pCfg->precision);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
// Check compression
if (pCfg->compression == -1) {
pCfg->compression = TSDB_DEFAULT_COMPRESSION;
} else {
if (!IS_VALID_COMPRESSION(pCfg->compression)) {
tsdbError("vgId:%d invalid compression configuration %d", pCfg->tsdbId, pCfg->precision);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
// Check daysPerFile
if (pCfg->daysPerFile == -1) {
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
} else {
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) {
tsdbError(
"vgId:%d invalid daysPerFile configuration! daysPerFile %d TSDB_MIN_DAYS_PER_FILE %d TSDB_MAX_DAYS_PER_FILE "
"%d",
pCfg->tsdbId, pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
// Check minRowsPerFileBlock and maxRowsPerFileBlock
if (pCfg->minRowsPerFileBlock == -1) {
pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
} else {
if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
tsdbError(
"vgId:%d invalid minRowsPerFileBlock configuration! minRowsPerFileBlock %d TSDB_MIN_MIN_ROW_FBLOCK %d "
"TSDB_MAX_MIN_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->minRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
if (pCfg->maxRowsPerFileBlock == -1) {
pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
} else {
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
tsdbError(
"vgId:%d invalid maxRowsPerFileBlock configuration! maxRowsPerFileBlock %d TSDB_MIN_MAX_ROW_FBLOCK %d "
"TSDB_MAX_MAX_ROW_FBLOCK %d",
pCfg->tsdbId, pCfg->maxRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d", pCfg->tsdbId,
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
// Check keep
if (pCfg->keep == -1) {
pCfg->keep = TSDB_DEFAULT_KEEP;
} else {
if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) {
tsdbError(
"vgId:%d invalid keep configuration! keep %d TSDB_MIN_KEEP %d "
"TSDB_MAX_KEEP %d",
pCfg->tsdbId, pCfg->keep, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
terrno = TSDB_CODE_TDB_INVALID_CONFIG;
return -1;
}
}
if (pCfg->keep1 == 0) {
pCfg->keep1 = pCfg->keep;
}
if (pCfg->keep2 == 0) {
pCfg->keep2 = pCfg->keep;
}
// update check
if (pCfg->update < TD_ROW_DISCARD_UPDATE || pCfg->update > TD_ROW_PARTIAL_UPDATE)
pCfg->update = TD_ROW_DISCARD_UPDATE;
// update cacheLastRow
if (pCfg->cacheLastRow != 0) {
if (pCfg->cacheLastRow > 3)
pCfg->cacheLastRow = 1;
}
return 0;
}
static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
STsdbRepo *pRepo = (STsdbRepo *)taosMemoryCalloc(1, sizeof(*pRepo));
if (pRepo == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->compactState = 0;
pRepo->config = *pCfg;
if (pAppH) {
pRepo->appH = *pAppH;
}
pRepo->repoLocked = false;
pRepo->pthread = NULL;
int code = taosThreadMutexInit(&(pRepo->mutex), NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
tsdbFreeRepo(pRepo);
return NULL;
}
code = taosThreadMutexInit(&(pRepo->save_mutex), NULL);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
tsdbFreeRepo(pRepo);
return NULL;
}
pRepo->config_changed = false;
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
code = tsem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) {
code = errno;
terrno = TAOS_SYSTEM_ERROR(code);
tsdbFreeRepo(pRepo);
return NULL;
}
pRepo->tsdbMeta = tsdbNewMeta(pCfg);
if (pRepo->tsdbMeta == NULL) {
tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbFreeRepo(pRepo);
return NULL;
}
pRepo->pPool = tsdbNewBufPool(pCfg);
if (pRepo->pPool == NULL) {
tsdbError("vgId:%d failed to create buffer pool since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbFreeRepo(pRepo);
return NULL;
}
pRepo->fs = tsdbNewFS(pCfg);
if (pRepo->fs == NULL) {
tsdbError("vgId:%d failed to TSDB file system since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbFreeRepo(pRepo);
return NULL;
}
return pRepo;
}
static void tsdbFreeRepo(STsdbRepo *pRepo) {
if (pRepo) {
tsdbFreeFS(pRepo->fs);
tsdbFreeBufPool(pRepo->pPool);
tsdbFreeMeta(pRepo->tsdbMeta);
tsdbFreeMergeBuf(pRepo->mergeBuf);
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tsem_destroy(&(pRepo->readyToCommit));
taosThreadMutexDestroy(&pRepo->mutex);
taosMemoryFree(pRepo);
}
}
static void tsdbStartStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql,
tsdbGetTableSchemaImpl(pTable, false, false, -1), 0);
}
}
}
static void tsdbStopStream(STsdbRepo *pRepo) {
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
}
}
}
static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) {
//tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data);
STSchema *pSchema = tsdbGetTableLatestSchema(pTable);
if (pSchema == NULL) {
tsdbError("tsdbGetTableLatestSchema of table %s fail", pTable->name->data);
return 0;
}
SBlock* pBlock;
int numColumns;
int32_t blockIdx;
SDataStatis* pBlockStatis = NULL;
STSRow* row = NULL;
// restore last column data with last schema
int err = 0;
numColumns = schemaNCols(pSchema);
if (numColumns <= pTable->restoreColumnNum) {
pTable->hasRestoreLastColumn = true;
return 0;
}
if (pTable->lastColSVersion != schemaVersion(pSchema)) {
if (tsdbInitColIdCacheWithSchema(pTable, pSchema) < 0) {
return -1;
}
}
row = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (row == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
err = -1;
goto out;
}
memRowSetType(row, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(row), pSchema);
// first load block index info
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
err = -1;
goto out;
}
pBlockStatis = taosMemoryCalloc(numColumns, sizeof(SDataStatis));
if (pBlockStatis == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
err = -1;
goto out;
}
memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis));
for(int32_t i = 0; i < numColumns; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
pBlockStatis[i].colId = pCol->colId;
}
// load block from backward
SBlockIdx *pIdx = pReadh->pBlkIdx;
blockIdx = (int32_t)(pIdx->numOfBlocks - 1);
while (numColumns > pTable->restoreColumnNum && blockIdx >= 0) {
bool loadStatisData = false;
pBlock = pReadh->pBlkInfo->blocks + blockIdx;
blockIdx -= 1;
// load block data
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
err = -1;
goto out;
}
// file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) {
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true;
}
}
for (int16_t i = 0; i < numColumns && numColumns > pTable->restoreColumnNum; ++i) {
STColumn *pCol = schemaColAt(pSchema, i);
// ignore loaded columns
if (pTable->lastCols[i].bytes != 0) {
continue;
}
// ignore block which has no not-null colId column
if (loadStatisData && pBlockStatis[i].numOfNull == pBlock->numOfRows) {
continue;
}
// OK,let's load row from backward to get not-null column
for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) {
SDataCol *pDataCol = pReadh->pDCols[0]->cols + i;
const void* pColData = tdGetColDataOfRow(pDataCol, rowId);
tdAppendColVal(memRowDataBody(row), pColData, pCol->type, pCol->offset);
//SDataCol *pDataCol = readh.pDCols[0]->cols + j;
void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset);
if (isNull(value, pCol->type)) {
continue;
}
int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId);
if (idx == -1) {
tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId);
continue;
}
// save not-null column
uint16_t bytes = IS_VAR_DATA_TYPE(pCol->type) ? varDataTLen(pColData) : pCol->bytes;
SDataCol *pLastCol = &(pTable->lastCols[idx]);
pLastCol->pData = taosMemoryMalloc(bytes);
pLastCol->bytes = bytes;
pLastCol->colId = pCol->colId;
memcpy(pLastCol->pData, value, bytes);
// save row ts(in column 0)
pDataCol = pReadh->pDCols[0]->cols + 0;
pCol = schemaColAt(pSchema, 0);
tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset);
pLastCol->ts = TD_ROW_KEY(row);
pTable->restoreColumnNum += 1;
tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts);
break;
}
}
}
out:
taosTZfree(row);
taosMemoryFreeClear(pBlockStatis);
if (err == 0 && numColumns <= pTable->restoreColumnNum) {
pTable->hasRestoreLastColumn = true;
}
return err;
}
static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) {
ASSERT(pTable->lastRow == NULL);
if (tsdbLoadBlockInfo(pReadh, NULL) < 0) {
return -1;
}
SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1;
if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) {
return -1;
}
// Get the data in row
STSchema *pSchema = tsdbGetTableSchema(pTable);
pTable->lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema));
if (pTable->lastRow == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
memRowSetType(pTable->lastRow, SMEM_ROW_DATA);
tdInitDataRow(memRowDataBody(pTable->lastRow), pSchema);
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
STColumn *pCol = schemaColAt(pSchema, icol);
SDataCol *pDataCol = pReadh->pDCols[0]->cols + icol;
tdAppendColVal(memRowDataBody(pTable->lastRow), tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type,
pCol->offset);
}
return 0;
}
int tsdbRestoreInfo(STsdbRepo *pRepo) {
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbCfg * pCfg = REPO_CFG(pRepo);
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
}
}
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable);
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && lastKey < pIdx->maxKey) {
pTable->lastKey = pIdx->maxKey;
if (CACHE_LAST_ROW(pCfg) && tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
}
// restore NULL columns
if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
}
}
}
tsdbDestroyReadH(&readh);
if (CACHE_LAST_NULL_COLUMN(pCfg)) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
return 0;
}
int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) {
bool cacheLastRow = false, cacheLastCol = false;
SFSIter fsiter;
SReadH readh;
SDFileSet *pSet;
STsdbMeta *pMeta = pRepo->tsdbMeta;
int tableNum = 0;
int maxTableIdx = 0;
int cacheLastRowTableNum = 0;
int cacheLastColTableNum = 0;
bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config));
bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config));
if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) {
tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed");
cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config));
cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config));
}
// calc max table idx and table num
for (int i = 1; i < pMeta->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
tableNum += 1;
maxTableIdx = i;
if (cacheLastCol) {
pTable->restoreColumnNum = 0;
pTable->hasRestoreLastColumn = false;
}
}
// if close last option,need to free data
if (need_free_last_row || need_free_last_col) {
if (need_free_last_col) {
atomic_store_8(&pRepo->hasCachedLastColumn, 0);
}
tsdbInfo("free cache last data since cacheLast option changed");
for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
if (need_free_last_row) {
taosTZfree(pTable->lastRow);
pTable->lastRow = NULL;
}
if (need_free_last_col) {
tsdbFreeLastColumns(pTable);
pTable->hasRestoreLastColumn = false;
}
}
}
if (!cacheLastRow && !cacheLastCol) {
return 0;
}
cacheLastRowTableNum = cacheLastRow ? tableNum : 0;
cacheLastColTableNum = cacheLastCol ? tableNum : 0;
if (tsdbInitReadH(&readh, pRepo) < 0) {
return -1;
}
tsdbFSIterInit(&fsiter, REPO_FS(pRepo), TSDB_FS_ITER_BACKWARD);
while ((pSet = tsdbFSIterNext(&fsiter)) != NULL && (cacheLastRowTableNum > 0 || cacheLastColTableNum > 0)) {
if (tsdbSetAndOpenReadFSet(&readh, pSet) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (tsdbLoadBlockIdx(&readh) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
for (int i = 1; i <= maxTableIdx; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
//tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data);
if (tsdbSetReadTable(&readh, pTable) < 0) {
tsdbDestroyReadH(&readh);
return -1;
}
SBlockIdx *pIdx = readh.pBlkIdx;
if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) {
pTable->lastKey = pIdx->maxKey;
if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
cacheLastRowTableNum -= 1;
}
// restore NULL columns
if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) {
if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) {
tsdbDestroyReadH(&readh);
return -1;
}
if (pTable->hasRestoreLastColumn) {
cacheLastColTableNum -= 1;
}
}
}
}
tsdbDestroyReadH(&readh);
if (cacheLastCol) {
atomic_store_8(&pRepo->hasCachedLastColumn, 1);
}
return 0;
}
#endif