more code
This commit is contained in:
parent
de204a5668
commit
e19a3eaf78
|
@ -16,7 +16,6 @@ SET(TD_GRANT FALSE)
|
|||
SET(TD_SYNC TRUE)
|
||||
SET(TD_MQTT TRUE)
|
||||
SET(TD_TSDB_PLUGINS FALSE)
|
||||
SET(TD_DNODE_PLUGINS FALSE)
|
||||
|
||||
SET(TD_COVER FALSE)
|
||||
SET(TD_MEM_CHECK FALSE)
|
||||
|
|
|
@ -25,10 +25,6 @@ IF (TD_TSDB_PLUGINS)
|
|||
ADD_DEFINITIONS(-D_TSDB_PLUGINS)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_DNODE_PLUGINS)
|
||||
ADD_DEFINITIONS(-D_DNODE_PLUGINS)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_GODLL)
|
||||
ADD_DEFINITIONS(-D_TD_GO_DLL_)
|
||||
ENDIF ()
|
||||
|
|
|
@ -22,28 +22,36 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
static FORCE_INLINE void tdGetMnodeRootDir(char *rootDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/mnode", rootDir);
|
||||
static FORCE_INLINE void tdGetMnodeRootDir(char *baseDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/mnode", baseDir);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetDnodeRootDir(char *rootDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/dnode", rootDir);
|
||||
static FORCE_INLINE void tdGetDnodeRootDir(char *baseDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/dnode", baseDir);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetVnodeRootDir(char *rootDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode", rootDir);
|
||||
static FORCE_INLINE void tdGetVnodeRootDir(char *baseDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode", baseDir);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetVnodeBackRootDir(char *rootDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak", rootDir);
|
||||
static FORCE_INLINE void tdGetVnodeBackRootDir(char *baseDir, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak", baseDir);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetVnodeDir(char *rootDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d", rootDir, vid);
|
||||
static FORCE_INLINE void tdGetVnodeDir(char *baseDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d", baseDir, vid);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetVnodeBackDir(char *rootDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak/vnode%d", rootDir, vid);
|
||||
static FORCE_INLINE void tdGetVnodeBackDir(char *baseDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode_bak/vnode%d", baseDir, vid);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetTsdbRootDir(char *baseDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb", baseDir, vid);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdGetTsdbDataDir(char *baseDir, int vid, char *dirName) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/data", baseDir, vid);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -36,10 +36,6 @@ IF (TD_LINUX)
|
|||
TARGET_LINK_LIBRARIES(taosd balance sync)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_DNODE_PLUGINS)
|
||||
TARGET_LINK_LIBRARIES(taosd dnodePlugins)
|
||||
ENDIF()
|
||||
|
||||
SET(PREPARE_ENV_CMD "prepare_env_cmd")
|
||||
SET(PREPARE_ENV_TARGET "prepare_env_target")
|
||||
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
|
||||
|
|
|
@ -34,8 +34,6 @@
|
|||
#include "dnodeTelemetry.h"
|
||||
#include "tpath.h"
|
||||
|
||||
struct SDnodeTier *pDnodeTier = NULL;
|
||||
|
||||
static int32_t dnodeInitStorage();
|
||||
static void dnodeCleanupStorage();
|
||||
static void dnodeSetRunStatus(SDnodeRunStatus status);
|
||||
|
|
|
@ -12,18 +12,293 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "dnode.h"
|
||||
#include "dnodeInt.h"
|
||||
#include "taosdef.h"
|
||||
|
||||
#ifndef _DNODE_PLUGINS
|
||||
#define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again
|
||||
#define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE)
|
||||
|
||||
SDnodeTier *dnodeNewTier() { return NULL; }
|
||||
void * dnodeCloseTier(SDnodeTier *pDnodeTier) { return NULL; }
|
||||
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) { return 0; }
|
||||
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) { return 0; }
|
||||
int dnodeCheckTiers(SDnodeTier *pDnodeTier) { return 0; }
|
||||
SDisk * dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) { return NULL; }
|
||||
SDisk * dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) { return NULL; }
|
||||
static int dnodeFormatDir(char *idir, char *odir);
|
||||
static int dnodeCheckDisk(char *dirName, int level, int primary);
|
||||
static int dnodeUpdateDiskMeta(SDisk *pDisk);
|
||||
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary);
|
||||
|
||||
#endif
|
||||
SDnodeTier *dnodeNewTier() {
|
||||
SDnodeTier *pDnodeTier = (SDnodeTier *)calloc(1, sizeof(*pDnodeTier));
|
||||
if (pDnodeTier == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int ret = pthread_rwlock_init(&(pDnodeTier->rwlock), NULL);
|
||||
if (ret != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(ret);
|
||||
dnodeCloseTier(pDnodeTier);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pDnodeTier->map = taosHashInit(DNODE_MAX_TIERS * DNODE_MAX_DISKS_PER_TIER * 2,
|
||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (pDnodeTier->map == NULL) {
|
||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||
dnodeCloseTier(pDnodeTier);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pDnodeTier;
|
||||
}
|
||||
|
||||
void *dnodeCloseTier(SDnodeTier *pDnodeTier) {
|
||||
if (pDnodeTier) {
|
||||
if (pDnodeTier->map) {
|
||||
taosHashCleanup(pDnodeTier->map);
|
||||
pDnodeTier->map = NULL;
|
||||
}
|
||||
|
||||
pthread_rwlock_destroy(&(pDnodeTier->rwlock));
|
||||
|
||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||
STier *pTier = pDnodeTier->tiers + i;
|
||||
for (int j = 0; j < pTier->nDisks; j++) {
|
||||
if (pTier->disks[j]) {
|
||||
free(pTier->disks[j]);
|
||||
pTier->disks[j] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
free(pDnodeTier);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) {
|
||||
ASSERT(ndisks > 0);
|
||||
|
||||
for (int i = 0; i < ndisks; i++) {
|
||||
SDiskCfg *pCfg = pDiskCfgs + i;
|
||||
dnodeAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary);
|
||||
}
|
||||
|
||||
if (dnodeCheckTiers(pDnodeTier) < 0) return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) {
|
||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||
STier *pTier = pDnodeTier->tiers + i;
|
||||
|
||||
for (int j = 0; j < pTier->nDisks; j++) {
|
||||
SDisk *pDisk = pTier->disks[j];
|
||||
if (dnodeUpdateDiskMeta(pDisk) < 0) return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int dnodeCheckTiers(SDnodeTier *pDnodeTier) {
|
||||
ASSERT(pDnodeTier->nTiers > 0);
|
||||
if (DNODE_PRIMARY_DISK(pDnodeTier) == NULL) {
|
||||
terrno = TSDB_CODE_DND_LACK_PRIMARY_DISK;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||
if (pDnodeTier->tiers[i].nDisks == 0) {
|
||||
terrno = TSDB_CODE_DND_NO_DISK_AT_TIER;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) {
|
||||
ASSERT(level < pDnodeTier->nTiers);
|
||||
|
||||
STier *pTier = pDnodeTier->tiers + level;
|
||||
SDisk *pDisk = NULL;
|
||||
|
||||
ASSERT(pTier->nDisks > 0);
|
||||
|
||||
for (int i = 0; i < pTier->nDisks; i++) {
|
||||
SDisk *iDisk = pTier->disks[i];
|
||||
if (dnodeUpdateDiskMeta(iDisk) < 0) return NULL;
|
||||
if (DNODE_DISK_AVAIL(iDisk)) {
|
||||
if (pDisk == NULL || pDisk->dmeta.nfiles > iDisk->dmeta.nfiles) {
|
||||
pDisk = iDisk;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pDisk == NULL) {
|
||||
terrno = TSDB_CODE_DND_NO_DISK_SPACE;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) {
|
||||
char fdirName[TSDB_FILENAME_LEN] = "\0";
|
||||
SDiskID *pDiskID = NULL;
|
||||
|
||||
if (dnodeFormatDir(dirName, fdirName) < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void *ptr = taosHashGet(pDnodeTier->map, (void *)fdirName, strnlen(fdirName, TSDB_FILENAME_LEN));
|
||||
if (ptr == NULL) return NULL;
|
||||
pDiskID = (SDiskID *)ptr;
|
||||
|
||||
return dnodeGetDisk(pDnodeTier, pDiskID->level, pDiskID->did);
|
||||
}
|
||||
|
||||
static int dnodeFormatDir(char *idir, char *odir) {
|
||||
wordexp_t wep;
|
||||
|
||||
int code = wordexp(idir, &wep, 0);
|
||||
if (code != 0) {
|
||||
dError("failed to format dir %s since %s", idir, strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (realpath(wep.we_wordv[0], odir) == NULL) {
|
||||
dError("failed to format dir %s since %s", idir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wordfree(&wep);
|
||||
return -1;
|
||||
}
|
||||
|
||||
wordfree(&wep);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int dnodeCheckDisk(char *dirName, int level, int primary) {
|
||||
if (access(dirName, W_OK | R_OK | F_OK) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
struct stat pstat;
|
||||
if (stat(dirName, &pstat) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (S_ISDIR(pstat.st_mode)) {
|
||||
return 0;
|
||||
} else {
|
||||
terrno = TSDB_CODE_DND_DISK_NOT_DIRECTORY;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
static int dnodeUpdateDiskMeta(SDisk *pDisk) {
|
||||
struct statvfs dstat;
|
||||
if (statvfs(pDisk->dir, &dstat) < 0) {
|
||||
dError("failed to get dir %s information since %s", pDisk->dir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pDisk->dmeta.size = dstat.f_bsize * dstat.f_blocks;
|
||||
pDisk->dmeta.free = dstat.f_bsize * dstat.f_bavail;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) {
|
||||
char dirName[TSDB_FILENAME_LEN] = "\0";
|
||||
STier * pTier = NULL;
|
||||
SDiskID diskid = {0};
|
||||
SDisk * pDisk = NULL;
|
||||
|
||||
if (level < 0 || level >= DNODE_MAX_TIERS) {
|
||||
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeFormatDir(dir, dirName) < 0) {
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTier = pDnodeTier->tiers + level;
|
||||
diskid.level = level;
|
||||
|
||||
if (pTier->nDisks >= DNODE_MAX_DISKS_PER_TIER) {
|
||||
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeGetDiskByName(pDnodeTier, dirName) != NULL) {
|
||||
terrno = TSDB_CODE_DND_DISK_ALREADY_EXISTS;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dnodeCheckDisk(dirName, level, primary) < 0) {
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (primary) {
|
||||
if (level != 0) {
|
||||
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
||||
terrno = TSDB_CODE_DND_DUPLICATE_PRIMARY_DISK;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
diskid.did = 0;
|
||||
} else {
|
||||
if (level == 0) {
|
||||
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
||||
diskid.did = pTier->nDisks;
|
||||
} else {
|
||||
diskid.did = pTier->nDisks + 1;
|
||||
if (diskid.did >= DNODE_MAX_DISKS_PER_TIER) {
|
||||
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
diskid.did = pTier->nDisks;
|
||||
}
|
||||
}
|
||||
|
||||
pDisk = (SDisk *)calloc(1, sizeof(SDisk));
|
||||
if (pDisk == NULL) {
|
||||
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
strncpy(pDisk->dir, dirName, TSDB_FILENAME_LEN);
|
||||
|
||||
if (taosHashPut(pDnodeTier->map, (void *)dirName, strnlen(dirName, TSDB_FILENAME_LEN), (void *)(&diskid),
|
||||
sizeof(diskid)) < 0) {
|
||||
free(pDisk);
|
||||
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTier->nDisks++;
|
||||
pTier->disks[diskid.did] = pDisk;
|
||||
pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -18,21 +18,25 @@
|
|||
#define TAOS_RANDOM_FILE_FAIL_TEST
|
||||
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "talgo.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tsdbMain.h"
|
||||
#include "tutil.h"
|
||||
#include "dnode.h"
|
||||
#include "tpath.h"
|
||||
|
||||
struct SDnodeTier *pDnodeTier = NULL;
|
||||
const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||
|
||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||
|
||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||
static void tsdbDestroyFile(SFile *pFile);
|
||||
static int compFGroup(const void *arg1, const void *arg2);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo);
|
||||
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
|
||||
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
|
||||
static void tsdbDestroyFile(SFile *pFile);
|
||||
static int compFGroup(const void *arg1, const void *arg2);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep);
|
||||
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days);
|
||||
static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk);
|
||||
static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName);
|
||||
static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
|
||||
|
@ -74,129 +78,25 @@ void tsdbFreeFileH(STsdbFileH *pFileH) {
|
|||
|
||||
int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||
ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL);
|
||||
char dataDir[TSDB_FILENAME_LEN] = "\0";
|
||||
|
||||
char * tDataDir = NULL;
|
||||
DIR * dir = NULL;
|
||||
int fid = 0;
|
||||
int vid = 0;
|
||||
regex_t regex1, regex2;
|
||||
int code = 0;
|
||||
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||
for (int level = 0; level < pDnodeTier->nTiers; level++) {
|
||||
STier *pTier = pDnodeTier->tiers + level;
|
||||
for (int did = 0; did < pTier->nDisks; did++) {
|
||||
SDisk *pDisk = pTier->disks[did];
|
||||
|
||||
SFileGroup fileGroup = {0};
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
|
||||
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
|
||||
if (tDataDir == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
dir = opendir(tDataDir);
|
||||
if (dir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = regcomp(®ex1, "^v[0-9]+f[0-9]+\\.(head|data|last|stat)$", REG_EXTENDED);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
code = regcomp(®ex2, "^v[0-9]+f[0-9]+\\.(h|d|l|s)$", REG_EXTENDED);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
||||
|
||||
struct dirent *dp = NULL;
|
||||
while ((dp = readdir(dir)) != NULL) {
|
||||
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
||||
|
||||
code = regexec(®ex1, dp->d_name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
sscanf(dp->d_name, "v%df%d", &vid, &fid);
|
||||
if (vid != REPO_ID(pRepo)) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
tdGetTsdbDataDir(pDisk->dir, REPO_ID(pRepo), dataDir);
|
||||
|
||||
if (access(dataDir, F_OK) != 0) {
|
||||
// Skip those disks without data
|
||||
continue;
|
||||
}
|
||||
|
||||
if (fid < mfid) {
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
tsdbGetDataFileName(pRepo->rootDir, pCfg->tsdbId, fid, type, fname);
|
||||
(void)remove(fname);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
|
||||
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
||||
fileGroup.fileId = fid;
|
||||
|
||||
tsdbInitFileGroup(&fileGroup, pRepo);
|
||||
} else if (code == REG_NOMATCH) {
|
||||
code = regexec(®ex2, dp->d_name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
size_t tsize = strlen(tDataDir) + strlen(dp->d_name) + 2;
|
||||
char * fname1 = malloc(tsize);
|
||||
if (fname1 == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
sprintf(fname1, "%s/%s", tDataDir, dp->d_name);
|
||||
|
||||
tsize = tsize + 64;
|
||||
char *fname2 = malloc(tsize);
|
||||
if (fname2 == NULL) {
|
||||
free(fname1);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
sprintf(fname2, "%s/%s_back_%" PRId64, tDataDir, dp->d_name, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
|
||||
|
||||
(void)rename(fname1, fname2);
|
||||
|
||||
tsdbDebug("vgId:%d file %s exists, backup it as %s", REPO_ID(pRepo), fname1, fname2);
|
||||
|
||||
free(fname1);
|
||||
free(fname2);
|
||||
continue;
|
||||
} else if (code == REG_NOMATCH) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
continue;
|
||||
} else {
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
goto _err;
|
||||
tsdbLoadFilesFromDisk(pRepo, pDisk);
|
||||
}
|
||||
|
||||
pFileH->pFGroup[pFileH->nFGroups++] = fileGroup;
|
||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||
tsdbDebug("vgId:%d file group %d is restored, nFGroups %d", REPO_ID(pRepo), fileGroup.fileId, pFileH->nFGroups);
|
||||
}
|
||||
|
||||
regfree(®ex1);
|
||||
regfree(®ex2);
|
||||
taosTFree(tDataDir);
|
||||
closedir(dir);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
|
||||
|
||||
regfree(®ex1);
|
||||
regfree(®ex2);
|
||||
|
||||
taosTFree(tDataDir);
|
||||
if (dir != NULL) closedir(dir);
|
||||
tsdbCloseFileH(pRepo);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||
|
@ -522,37 +422,6 @@ _err:
|
|||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||
uint32_t version;
|
||||
|
||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), fid, type, pFile->fname);
|
||||
|
||||
pFile->fd = -1;
|
||||
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
|
||||
|
||||
if (tsdbLoadFileHeader(pFile, &version) < 0) {
|
||||
tsdbError("vgId:%d failed to load file %s header part since %s", REPO_ID(pRepo), pFile->fname, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pFile->info.size == TSDB_FILE_HEAD_SIZE) {
|
||||
pFile->info.size = lseek(pFile->fd, 0, SEEK_END);
|
||||
}
|
||||
|
||||
if (version != TSDB_FILE_VERSION) {
|
||||
// TODO: deal with error
|
||||
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
|
||||
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
|
||||
}
|
||||
|
||||
tsdbCloseFile(pFile);
|
||||
|
||||
return 0;
|
||||
_err:
|
||||
tsdbDestroyFile(pFile);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
|
||||
|
||||
static int compFGroup(const void *arg1, const void *arg2) {
|
||||
|
@ -578,22 +447,219 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) {
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
if (tsdbInitFile(&pFGroup->files[type], pRepo, pFGroup->fileId, type) < 0) {
|
||||
memset(&pFGroup->files[type].info, 0, sizeof(STsdbFileInfo));
|
||||
pFGroup->files[type].info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pFGroup->state = 1;
|
||||
pRepo->state = TSDB_STATE_BAD_FILE;
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) {
|
||||
return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]);
|
||||
}
|
||||
|
||||
static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) {
|
||||
return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision));
|
||||
}
|
||||
|
||||
static int tsdbLoadFilesFromDisk(STsdbRepo *pRepo, SDisk *pDisk) {
|
||||
char tsdbDataDir[TSDB_FILENAME_LEN] = "\0";
|
||||
char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
|
||||
char fname[TSDB_FILENAME_LEN] = "\0";
|
||||
SHashObj * pFids = NULL;
|
||||
SHashMutableIterator *pIter = NULL;
|
||||
STsdbFileH * pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup fgroup = {0};
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
int mfid = 0;
|
||||
|
||||
tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
|
||||
tdGetTsdbDataDir(pDisk->dir, REPO_ID(pRepo), tsdbDataDir);
|
||||
|
||||
pFids = tsdbGetAllFids(pRepo, tsdbDataDir);
|
||||
if (pFids == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pIter = taosHashCreateIter(pFids);
|
||||
if (pIter == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
||||
|
||||
while (taosHashIterNext(pIter)) {
|
||||
int32_t fid = *(int32_t *)taosHashIterGet(pIter);
|
||||
|
||||
if (fid < mfid) {
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, type, fname);
|
||||
(void)remove(fname);
|
||||
}
|
||||
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, fname);
|
||||
(void)remove(fname);
|
||||
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, fname);
|
||||
(void)remove(fname);
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
tsdbRestoreFileGroup(pRepo, pDisk, fid, &fgroup);
|
||||
pFileH->pFGroup[pFileH->nFGroups++] = fgroup;
|
||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(fgroup), compFGroup);
|
||||
|
||||
// TODO
|
||||
pDisk->dmeta.nfiles++;
|
||||
}
|
||||
|
||||
taosHashDestroyIter(pIter);
|
||||
taosHashCleanup(pFids);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
taosHashDestroyIter(pIter);
|
||||
taosHashCleanup(pFids);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int tsdbRestoreFileGroup(STsdbRepo *pRepo, SDisk *pDisk, int fid, SFileGroup *pFileGroup) {
|
||||
char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
|
||||
char nheadF[TSDB_FILENAME_LEN] = "\0";
|
||||
char nlastF[TSDB_FILENAME_LEN] = "\0";
|
||||
bool newHeadExists = false;
|
||||
bool newLastExists = false;
|
||||
|
||||
uint32_t version = 0;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
memset((void *)pFileGroup, 0, sizeof(*pFileGroup));
|
||||
pFileGroup->fileId = fid;
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
SFile *pFile = pFileGroup->files + type;
|
||||
pFile->fd = -1;
|
||||
}
|
||||
|
||||
tdGetTsdbRootDir(pDisk->dir, REPO_ID(pRepo), tsdbRootDir);
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
SFile *pFile = pFileGroup->files + type;
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_HEAD, pFile->fname);
|
||||
if (access(pFile->fname, F_OK) != 0) {
|
||||
memset(&(pFile->info), 0, sizeof(pFile->info));
|
||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pFileGroup->state = 1;
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NHEAD, nheadF);
|
||||
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), fid, TSDB_FILE_TYPE_NLAST, nlastF);
|
||||
|
||||
if (access(nheadF, F_OK) == 0) {
|
||||
newHeadExists = true;
|
||||
}
|
||||
|
||||
if (access(nlastF, F_OK) == 0) {
|
||||
newLastExists = true;
|
||||
}
|
||||
|
||||
if (newHeadExists) {
|
||||
(void)remove(nheadF);
|
||||
(void)remove(nlastF);
|
||||
} else {
|
||||
if (newLastExists) {
|
||||
(void)rename(nlastF, pFileGroup->files[TSDB_FILE_TYPE_LAST].fname);
|
||||
}
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||
SFile *pFile = pFileGroup->files + type;
|
||||
if (tsdbOpenFile(pFile, O_RDONLY) < 0) {
|
||||
memset(&(pFile->info), 0, sizeof(pFile->info));
|
||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pFileGroup->state = 1;
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (tsdbLoadFileHeader(pFile, &version) < 0) {
|
||||
memset(&(pFile->info), 0, sizeof(pFile->info));
|
||||
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pFileGroup->state = 1;
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
tsdbCloseFile(pFile);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (version != TSDB_FILE_VERSION) {
|
||||
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
|
||||
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
|
||||
}
|
||||
|
||||
tsdbCloseFile(pFile);
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static SHashObj *tsdbGetAllFids(STsdbRepo *pRepo, char *dirName) {
|
||||
DIR * dir = NULL;
|
||||
regex_t regex = {0};
|
||||
int code = 0;
|
||||
int32_t vid, fid;
|
||||
SHashObj *pHash = NULL;
|
||||
|
||||
code = regcomp(®ex, "^v[0-9]+f[0-9]+\\.(head|data|last|h|d|l)$", REG_EXTENDED);
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
dir = opendir(dirName);
|
||||
if (dir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dirName, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pHash = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (pHash == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
struct dirent *dp = NULL;
|
||||
while ((dp = readdir(dir)) != NULL) {
|
||||
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
|
||||
|
||||
code = regexec(®ex, dp->d_name, 0, NULL, 0);
|
||||
if (code == 0) {
|
||||
sscanf(dp->d_name, "v%df%d", &vid, &fid);
|
||||
|
||||
if (vid != REPO_ID(pRepo)) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
continue;
|
||||
}
|
||||
|
||||
taosHashPut(pHash, (void *)(&fid), sizeof(fid), (void *)(&fid), sizeof(fid));
|
||||
} else if (code == REG_NOMATCH) {
|
||||
tsdbError("vgId:%d invalid file %s exists, ignore it", REPO_ID(pRepo), dp->d_name);
|
||||
continue;
|
||||
} else {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
closedir(dir);
|
||||
regfree(®ex);
|
||||
return pHash;
|
||||
|
||||
_err:
|
||||
taosHashCleanup(pHash);
|
||||
if (dir != NULL) closedir(dir);
|
||||
regfree(®ex);
|
||||
return NULL;
|
||||
}
|
|
@ -105,7 +105,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
}
|
||||
|
||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
tdGetVnodeDir(tsDataDir, pVnodeCfg->cfg.vgId, rootDir);
|
||||
if (mkdir(rootDir, 0755) != 0 && errno != EEXIST) {
|
||||
vError("vgId:%d, failed to create vnode, reason:%s dir:%s", pVnodeCfg->cfg.vgId, strerror(errno), rootDir);
|
||||
if (errno == EACCES) {
|
||||
|
@ -138,7 +138,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
tsdbCfg.compression = pVnodeCfg->cfg.compression;
|
||||
|
||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
tdGetTsdbRootDir(tsDataDir, pVnodeCfg->cfg.vgId, tsdbDir);
|
||||
if (tsdbCreateRepo(tsdbDir, &tsdbCfg) < 0) {
|
||||
vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
||||
return TSDB_CODE_VND_INIT_FAILED;
|
||||
|
|
Loading…
Reference in New Issue