make it compilable
This commit is contained in:
parent
44963201ab
commit
d6d1532b0a
|
@ -0,0 +1,101 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TD_TDISK_H
|
||||||
|
#define TD_TDISK_H
|
||||||
|
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "hash.h"
|
||||||
|
#include "hash.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int level;
|
||||||
|
int did;
|
||||||
|
} SDiskID;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t size;
|
||||||
|
uint64_t free;
|
||||||
|
uint64_t nfiles;
|
||||||
|
} SDiskMeta;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char dir[TSDB_FILENAME_LEN];
|
||||||
|
SDiskMeta dmeta;
|
||||||
|
} SDisk;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int level;
|
||||||
|
int nDisks;
|
||||||
|
SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
|
||||||
|
} STier;
|
||||||
|
|
||||||
|
typedef struct SDnodeTier {
|
||||||
|
pthread_mutex_t lock;
|
||||||
|
int nTiers;
|
||||||
|
STier tiers[TSDB_MAX_TIERS];
|
||||||
|
SHashObj * map;
|
||||||
|
} SDnodeTier;
|
||||||
|
|
||||||
|
extern struct SDnodeTier *tsDnodeTier;
|
||||||
|
#define DNODE_PRIMARY_DISK(pDnodeTier) (pDnodeTier)->tiers[0].disks[0]
|
||||||
|
|
||||||
|
static FORCE_INLINE int dnodeLockTiers(SDnodeTier *pDnodeTier) {
|
||||||
|
int code = pthread_mutex_lock(&(pDnodeTier->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int dnodeUnLockTiers(SDnodeTier *pDnodeTier) {
|
||||||
|
int code = pthread_mutex_unlock(&(pDnodeTier->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE SDisk *dnodeGetDisk(SDnodeTier *pDnodeTier, int level, int did) {
|
||||||
|
if (level < 0 || level >= pDnodeTier->nTiers) return NULL;
|
||||||
|
|
||||||
|
if (did < 0 || did >= pDnodeTier->tiers[level].nDisks) return NULL;
|
||||||
|
|
||||||
|
return pDnodeTier->tiers[level].disks[did];
|
||||||
|
}
|
||||||
|
|
||||||
|
SDnodeTier *dnodeNewTier();
|
||||||
|
void * dnodeCloseTier(SDnodeTier *pDnodeTier);
|
||||||
|
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks);
|
||||||
|
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier);
|
||||||
|
int dnodeCheckTiers(SDnodeTier *pDnodeTier);
|
||||||
|
SDisk * dnodeAssignDisk(SDnodeTier *pDnodeTier, int level);
|
||||||
|
SDisk * dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName);
|
||||||
|
void dnodeIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock);
|
||||||
|
void dnodeDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,337 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
#include "os.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "tdisk.h"
|
||||||
|
#include "tulog.h"
|
||||||
|
|
||||||
|
#define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again
|
||||||
|
#define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE)
|
||||||
|
|
||||||
|
static int dnodeFormatDir(char *idir, char *odir);
|
||||||
|
static int dnodeCheckDisk(char *dirName, int level, int primary);
|
||||||
|
static int dnodeUpdateDiskMeta(SDisk *pDisk);
|
||||||
|
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary);
|
||||||
|
|
||||||
|
struct SDnodeTier *tsDnodeTier = NULL;
|
||||||
|
|
||||||
|
SDnodeTier *dnodeNewTier() {
|
||||||
|
SDnodeTier *pDnodeTier = (SDnodeTier *)calloc(1, sizeof(*pDnodeTier));
|
||||||
|
if (pDnodeTier == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ret = pthread_mutex_init(&(pDnodeTier->lock), NULL);
|
||||||
|
if (ret != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(ret);
|
||||||
|
dnodeCloseTier(pDnodeTier);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDnodeTier->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
|
||||||
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
if (pDnodeTier->map == NULL) {
|
||||||
|
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
||||||
|
dnodeCloseTier(pDnodeTier);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pDnodeTier;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *dnodeCloseTier(SDnodeTier *pDnodeTier) {
|
||||||
|
if (pDnodeTier) {
|
||||||
|
if (pDnodeTier->map) {
|
||||||
|
taosHashCleanup(pDnodeTier->map);
|
||||||
|
pDnodeTier->map = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&(pDnodeTier->lock));
|
||||||
|
|
||||||
|
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||||
|
STier *pTier = pDnodeTier->tiers + i;
|
||||||
|
for (int j = 0; j < pTier->nDisks; j++) {
|
||||||
|
if (pTier->disks[j]) {
|
||||||
|
free(pTier->disks[j]);
|
||||||
|
pTier->disks[j] = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(pDnodeTier);
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) {
|
||||||
|
ASSERT(ndisks > 0);
|
||||||
|
|
||||||
|
for (int i = 0; i < ndisks; i++) {
|
||||||
|
SDiskCfg *pCfg = pDiskCfgs + i;
|
||||||
|
dnodeAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnodeCheckTiers(pDnodeTier) < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) {
|
||||||
|
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||||
|
STier *pTier = pDnodeTier->tiers + i;
|
||||||
|
|
||||||
|
for (int j = 0; j < pTier->nDisks; j++) {
|
||||||
|
SDisk *pDisk = pTier->disks[j];
|
||||||
|
if (dnodeUpdateDiskMeta(pDisk) < 0) return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int dnodeCheckTiers(SDnodeTier *pDnodeTier) {
|
||||||
|
ASSERT(pDnodeTier->nTiers > 0);
|
||||||
|
if (DNODE_PRIMARY_DISK(pDnodeTier) == NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_LACK_PRIMARY_DISK;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
||||||
|
if (pDnodeTier->tiers[i].nDisks == 0) {
|
||||||
|
terrno = TSDB_CODE_DND_NO_DISK_AT_TIER;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) {
|
||||||
|
ASSERT(level < pDnodeTier->nTiers);
|
||||||
|
|
||||||
|
STier *pTier = pDnodeTier->tiers + level;
|
||||||
|
SDisk *pDisk = NULL;
|
||||||
|
|
||||||
|
ASSERT(pTier->nDisks > 0);
|
||||||
|
|
||||||
|
dnodeLockTiers(pDnodeTier);
|
||||||
|
|
||||||
|
for (int i = 0; i < pTier->nDisks; i++) {
|
||||||
|
SDisk *iDisk = pTier->disks[i];
|
||||||
|
if (dnodeUpdateDiskMeta(iDisk) < 0) return NULL;
|
||||||
|
if (DNODE_DISK_AVAIL(iDisk)) {
|
||||||
|
if (pDisk == NULL || pDisk->dmeta.nfiles > iDisk->dmeta.nfiles) {
|
||||||
|
pDisk = iDisk;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDisk == NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_NO_DISK_SPACE;
|
||||||
|
dnodeUnLockTiers(pDnodeTier);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
dnodeIncDiskFiles(pDnodeTier, pDisk, false);
|
||||||
|
|
||||||
|
dnodeUnLockTiers(pDnodeTier);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) {
|
||||||
|
char fdirName[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
SDiskID *pDiskID = NULL;
|
||||||
|
|
||||||
|
if (dnodeFormatDir(dirName, fdirName) < 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *ptr = taosHashGet(pDnodeTier->map, (void *)fdirName, strnlen(fdirName, TSDB_FILENAME_LEN));
|
||||||
|
if (ptr == NULL) return NULL;
|
||||||
|
pDiskID = (SDiskID *)ptr;
|
||||||
|
|
||||||
|
return dnodeGetDisk(pDnodeTier, pDiskID->level, pDiskID->did);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeIncDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) {
|
||||||
|
if (lock) {
|
||||||
|
dnodeLockTiers(pDnodeTier);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDisk->dmeta.nfiles++;
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
dnodeUnLockTiers(pDnodeTier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dnodeDecDiskFiles(SDnodeTier *pDnodeTier, SDisk *pDisk, bool lock) {
|
||||||
|
if (lock) {
|
||||||
|
dnodeLockTiers(pDnodeTier);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDisk->dmeta.nfiles--;
|
||||||
|
|
||||||
|
if (lock) {
|
||||||
|
dnodeUnLockTiers(pDnodeTier);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dnodeFormatDir(char *idir, char *odir) {
|
||||||
|
wordexp_t wep;
|
||||||
|
|
||||||
|
int code = wordexp(idir, &wep, 0);
|
||||||
|
if (code != 0) {
|
||||||
|
uError("failed to format dir %s since %s", idir, strerror(code));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (realpath(wep.we_wordv[0], odir) == NULL) {
|
||||||
|
uError("failed to format dir %s since %s", idir, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
wordfree(&wep);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
wordfree(&wep);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dnodeCheckDisk(char *dirName, int level, int primary) {
|
||||||
|
if (access(dirName, W_OK | R_OK | F_OK) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct stat pstat;
|
||||||
|
if (stat(dirName, &pstat) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (S_ISDIR(pstat.st_mode)) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_DND_DISK_NOT_DIRECTORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dnodeUpdateDiskMeta(SDisk *pDisk) {
|
||||||
|
struct statvfs dstat;
|
||||||
|
if (statvfs(pDisk->dir, &dstat) < 0) {
|
||||||
|
uError("failed to get dir %s information since %s", pDisk->dir, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDisk->dmeta.size = dstat.f_bsize * dstat.f_blocks;
|
||||||
|
pDisk->dmeta.free = dstat.f_bsize * dstat.f_bavail;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) {
|
||||||
|
char dirName[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
STier * pTier = NULL;
|
||||||
|
SDiskID diskid = {0};
|
||||||
|
SDisk * pDisk = NULL;
|
||||||
|
|
||||||
|
if (level < 0 || level >= TSDB_MAX_TIERS) {
|
||||||
|
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnodeFormatDir(dir, dirName) < 0) {
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier = pDnodeTier->tiers + level;
|
||||||
|
diskid.level = level;
|
||||||
|
|
||||||
|
if (pTier->nDisks >= TSDB_MAX_DISKS_PER_TIER) {
|
||||||
|
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnodeGetDiskByName(pDnodeTier, dirName) != NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_DISK_ALREADY_EXISTS;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dnodeCheckDisk(dirName, level, primary) < 0) {
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (primary) {
|
||||||
|
if (level != 0) {
|
||||||
|
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_DUPLICATE_PRIMARY_DISK;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
diskid.did = 0;
|
||||||
|
} else {
|
||||||
|
if (level == 0) {
|
||||||
|
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
||||||
|
diskid.did = pTier->nDisks;
|
||||||
|
} else {
|
||||||
|
diskid.did = pTier->nDisks + 1;
|
||||||
|
if (diskid.did >= TSDB_MAX_DISKS_PER_TIER) {
|
||||||
|
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
diskid.did = pTier->nDisks;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pDisk = (SDisk *)calloc(1, sizeof(SDisk));
|
||||||
|
if (pDisk == NULL) {
|
||||||
|
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
strncpy(pDisk->dir, dirName, TSDB_FILENAME_LEN);
|
||||||
|
|
||||||
|
if (taosHashPut(pDnodeTier->map, (void *)dirName, strnlen(dirName, TSDB_FILENAME_LEN), (void *)(&diskid),
|
||||||
|
sizeof(diskid)) < 0) {
|
||||||
|
free(pDisk);
|
||||||
|
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||||
|
uError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier->nDisks++;
|
||||||
|
pTier->disks[diskid.did] = pDisk;
|
||||||
|
pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -33,6 +33,7 @@
|
||||||
#include "dnodeShell.h"
|
#include "dnodeShell.h"
|
||||||
#include "dnodeTelemetry.h"
|
#include "dnodeTelemetry.h"
|
||||||
#include "tpath.h"
|
#include "tpath.h"
|
||||||
|
#include "tdisk.h"
|
||||||
|
|
||||||
static int32_t dnodeInitStorage();
|
static int32_t dnodeInitStorage();
|
||||||
static void dnodeCleanupStorage();
|
static void dnodeCleanupStorage();
|
||||||
|
|
|
@ -1,304 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "dnode.h"
|
|
||||||
#include "dnodeInt.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
|
|
||||||
#define DISK_MIN_FREE_SPACE 30 * 1024 * 1024 // disk free space less than 100M will not create new file again
|
|
||||||
#define DNODE_DISK_AVAIL(pDisk) ((pDisk)->dmeta.free > DISK_MIN_FREE_SPACE)
|
|
||||||
|
|
||||||
static int dnodeFormatDir(char *idir, char *odir);
|
|
||||||
static int dnodeCheckDisk(char *dirName, int level, int primary);
|
|
||||||
static int dnodeUpdateDiskMeta(SDisk *pDisk);
|
|
||||||
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary);
|
|
||||||
|
|
||||||
SDnodeTier *dnodeNewTier() {
|
|
||||||
SDnodeTier *pDnodeTier = (SDnodeTier *)calloc(1, sizeof(*pDnodeTier));
|
|
||||||
if (pDnodeTier == NULL) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ret = pthread_rwlock_init(&(pDnodeTier->rwlock), NULL);
|
|
||||||
if (ret != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(ret);
|
|
||||||
dnodeCloseTier(pDnodeTier);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDnodeTier->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
|
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
||||||
if (pDnodeTier->map == NULL) {
|
|
||||||
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
|
|
||||||
dnodeCloseTier(pDnodeTier);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pDnodeTier;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *dnodeCloseTier(SDnodeTier *pDnodeTier) {
|
|
||||||
if (pDnodeTier) {
|
|
||||||
if (pDnodeTier->map) {
|
|
||||||
taosHashCleanup(pDnodeTier->map);
|
|
||||||
pDnodeTier->map = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_destroy(&(pDnodeTier->rwlock));
|
|
||||||
|
|
||||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
|
||||||
STier *pTier = pDnodeTier->tiers + i;
|
|
||||||
for (int j = 0; j < pTier->nDisks; j++) {
|
|
||||||
if (pTier->disks[j]) {
|
|
||||||
free(pTier->disks[j]);
|
|
||||||
pTier->disks[j] = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
free(pDnodeTier);
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks) {
|
|
||||||
ASSERT(ndisks > 0);
|
|
||||||
|
|
||||||
for (int i = 0; i < ndisks; i++) {
|
|
||||||
SDiskCfg *pCfg = pDiskCfgs + i;
|
|
||||||
dnodeAddDisk(pDnodeTier, pCfg->dir, pCfg->level, pCfg->primary);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dnodeCheckTiers(pDnodeTier) < 0) return -1;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier) {
|
|
||||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
|
||||||
STier *pTier = pDnodeTier->tiers + i;
|
|
||||||
|
|
||||||
for (int j = 0; j < pTier->nDisks; j++) {
|
|
||||||
SDisk *pDisk = pTier->disks[j];
|
|
||||||
if (dnodeUpdateDiskMeta(pDisk) < 0) return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int dnodeCheckTiers(SDnodeTier *pDnodeTier) {
|
|
||||||
ASSERT(pDnodeTier->nTiers > 0);
|
|
||||||
if (DNODE_PRIMARY_DISK(pDnodeTier) == NULL) {
|
|
||||||
terrno = TSDB_CODE_DND_LACK_PRIMARY_DISK;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < pDnodeTier->nTiers; i++) {
|
|
||||||
if (pDnodeTier->tiers[i].nDisks == 0) {
|
|
||||||
terrno = TSDB_CODE_DND_NO_DISK_AT_TIER;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDisk *dnodeAssignDisk(SDnodeTier *pDnodeTier, int level) {
|
|
||||||
ASSERT(level < pDnodeTier->nTiers);
|
|
||||||
|
|
||||||
STier *pTier = pDnodeTier->tiers + level;
|
|
||||||
SDisk *pDisk = NULL;
|
|
||||||
|
|
||||||
ASSERT(pTier->nDisks > 0);
|
|
||||||
|
|
||||||
for (int i = 0; i < pTier->nDisks; i++) {
|
|
||||||
SDisk *iDisk = pTier->disks[i];
|
|
||||||
if (dnodeUpdateDiskMeta(iDisk) < 0) return NULL;
|
|
||||||
if (DNODE_DISK_AVAIL(iDisk)) {
|
|
||||||
if (pDisk == NULL || pDisk->dmeta.nfiles > iDisk->dmeta.nfiles) {
|
|
||||||
pDisk = iDisk;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pDisk == NULL) {
|
|
||||||
terrno = TSDB_CODE_DND_NO_DISK_SPACE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDisk *dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName) {
|
|
||||||
char fdirName[TSDB_FILENAME_LEN] = "\0";
|
|
||||||
SDiskID *pDiskID = NULL;
|
|
||||||
|
|
||||||
if (dnodeFormatDir(dirName, fdirName) < 0) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *ptr = taosHashGet(pDnodeTier->map, (void *)fdirName, strnlen(fdirName, TSDB_FILENAME_LEN));
|
|
||||||
if (ptr == NULL) return NULL;
|
|
||||||
pDiskID = (SDiskID *)ptr;
|
|
||||||
|
|
||||||
return dnodeGetDisk(pDnodeTier, pDiskID->level, pDiskID->did);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int dnodeFormatDir(char *idir, char *odir) {
|
|
||||||
wordexp_t wep;
|
|
||||||
|
|
||||||
int code = wordexp(idir, &wep, 0);
|
|
||||||
if (code != 0) {
|
|
||||||
dError("failed to format dir %s since %s", idir, strerror(code));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (realpath(wep.we_wordv[0], odir) == NULL) {
|
|
||||||
dError("failed to format dir %s since %s", idir, strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
wordfree(&wep);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
wordfree(&wep);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int dnodeCheckDisk(char *dirName, int level, int primary) {
|
|
||||||
if (access(dirName, W_OK | R_OK | F_OK) != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct stat pstat;
|
|
||||||
if (stat(dirName, &pstat) < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (S_ISDIR(pstat.st_mode)) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_DND_DISK_NOT_DIRECTORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int dnodeUpdateDiskMeta(SDisk *pDisk) {
|
|
||||||
struct statvfs dstat;
|
|
||||||
if (statvfs(pDisk->dir, &dstat) < 0) {
|
|
||||||
dError("failed to get dir %s information since %s", pDisk->dir, strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDisk->dmeta.size = dstat.f_bsize * dstat.f_blocks;
|
|
||||||
pDisk->dmeta.free = dstat.f_bsize * dstat.f_bavail;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int dnodeAddDisk(SDnodeTier *pDnodeTier, char *dir, int level, int primary) {
|
|
||||||
char dirName[TSDB_FILENAME_LEN] = "\0";
|
|
||||||
STier * pTier = NULL;
|
|
||||||
SDiskID diskid = {0};
|
|
||||||
SDisk * pDisk = NULL;
|
|
||||||
|
|
||||||
if (level < 0 || level >= TSDB_MAX_TIERS) {
|
|
||||||
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dnodeFormatDir(dir, dirName) < 0) {
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier = pDnodeTier->tiers + level;
|
|
||||||
diskid.level = level;
|
|
||||||
|
|
||||||
if (pTier->nDisks >= TSDB_MAX_DISKS_PER_TIER) {
|
|
||||||
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dnodeGetDiskByName(pDnodeTier, dirName) != NULL) {
|
|
||||||
terrno = TSDB_CODE_DND_DISK_ALREADY_EXISTS;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (dnodeCheckDisk(dirName, level, primary) < 0) {
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (primary) {
|
|
||||||
if (level != 0) {
|
|
||||||
terrno = TSDB_CODE_DND_INVALID_DISK_TIER;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
|
||||||
terrno = TSDB_CODE_DND_DUPLICATE_PRIMARY_DISK;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
diskid.did = 0;
|
|
||||||
} else {
|
|
||||||
if (level == 0) {
|
|
||||||
if (DNODE_PRIMARY_DISK(pDnodeTier) != NULL) {
|
|
||||||
diskid.did = pTier->nDisks;
|
|
||||||
} else {
|
|
||||||
diskid.did = pTier->nDisks + 1;
|
|
||||||
if (diskid.did >= TSDB_MAX_DISKS_PER_TIER) {
|
|
||||||
terrno = TSDB_CODE_DND_TOO_MANY_DISKS;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
diskid.did = pTier->nDisks;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pDisk = (SDisk *)calloc(1, sizeof(SDisk));
|
|
||||||
if (pDisk == NULL) {
|
|
||||||
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
strncpy(pDisk->dir, dirName, TSDB_FILENAME_LEN);
|
|
||||||
|
|
||||||
if (taosHashPut(pDnodeTier->map, (void *)dirName, strnlen(dirName, TSDB_FILENAME_LEN), (void *)(&diskid),
|
|
||||||
sizeof(diskid)) < 0) {
|
|
||||||
free(pDisk);
|
|
||||||
terrno = TSDB_CODE_DND_OUT_OF_MEMORY;
|
|
||||||
dError("failed to add disk %s to tier %d level since %s", dir, level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier->nDisks++;
|
|
||||||
pTier->disks[diskid.did] = pDisk;
|
|
||||||
pDnodeTier->nTiers = MAX(pDnodeTier->nTiers, level);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
|
@ -73,81 +73,6 @@ void dnodeDelayReprocessMnodeWriteMsg(void *pMsg);
|
||||||
|
|
||||||
void dnodeSendStatusMsgToMnode();
|
void dnodeSendStatusMsgToMnode();
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int level;
|
|
||||||
int did;
|
|
||||||
} SDiskID;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t size;
|
|
||||||
uint64_t free;
|
|
||||||
uint64_t nfiles;
|
|
||||||
} SDiskMeta;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char dir[TSDB_FILENAME_LEN];
|
|
||||||
SDiskMeta dmeta;
|
|
||||||
} SDisk;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int level;
|
|
||||||
int nDisks;
|
|
||||||
SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
|
|
||||||
} STier;
|
|
||||||
|
|
||||||
typedef struct SDnodeTier {
|
|
||||||
pthread_rwlock_t rwlock;
|
|
||||||
int nTiers;
|
|
||||||
STier tiers[TSDB_MAX_TIERS];
|
|
||||||
SHashObj * map;
|
|
||||||
} SDnodeTier;
|
|
||||||
|
|
||||||
extern struct SDnodeTier *tsDnodeTier;
|
|
||||||
#define DNODE_PRIMARY_DISK(pDnodeTier) (pDnodeTier)->tiers[0].disks[0]
|
|
||||||
|
|
||||||
static FORCE_INLINE int dnodeRLockTiers(SDnodeTier *pDnodeTier) {
|
|
||||||
int code = pthread_rwlock_rdlock(&(pDnodeTier->rwlock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int dnodeWLockTiers(SDnodeTier *pDnodeTier) {
|
|
||||||
int code = pthread_rwlock_wrlock(&(pDnodeTier->rwlock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int dnodeUnLockTiers(SDnodeTier *pDnodeTier) {
|
|
||||||
int code = pthread_rwlock_unlock(&(pDnodeTier->rwlock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE SDisk *dnodeGetDisk(SDnodeTier *pDnodeTier, int level, int did) {
|
|
||||||
if (level < 0 || level >= pDnodeTier->nTiers) return NULL;
|
|
||||||
|
|
||||||
if (did < 0 || did >= pDnodeTier->tiers[level].nDisks) return NULL;
|
|
||||||
|
|
||||||
return pDnodeTier->tiers[level].disks[did];
|
|
||||||
}
|
|
||||||
|
|
||||||
SDnodeTier *dnodeNewTier();
|
|
||||||
void * dnodeCloseTier(SDnodeTier *pDnodeTier);
|
|
||||||
int dnodeAddDisks(SDnodeTier *pDnodeTier, SDiskCfg *pDiskCfgs, int ndisks);
|
|
||||||
int dnodeUpdateTiersInfo(SDnodeTier *pDnodeTier);
|
|
||||||
int dnodeCheckTiers(SDnodeTier *pDnodeTier);
|
|
||||||
SDisk * dnodeAssignDisk(SDnodeTier *pDnodeTier, int level);
|
|
||||||
SDisk * dnodeGetDiskByName(SDnodeTier *pDnodeTier, char *dirName);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tskiplist.h"
|
#include "tskiplist.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "tdisk.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -474,13 +475,13 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
|
||||||
void tsdbFreeFileH(STsdbFileH* pFileH);
|
void tsdbFreeFileH(STsdbFileH* pFileH);
|
||||||
int tsdbOpenFileH(STsdbRepo* pRepo);
|
int tsdbOpenFileH(STsdbRepo* pRepo);
|
||||||
void tsdbCloseFileH(STsdbRepo* pRepo);
|
void tsdbCloseFileH(STsdbRepo* pRepo);
|
||||||
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid);
|
SFileGroup* tsdbCreateFGroup(STsdbRepo* pRepo, int fid);
|
||||||
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
|
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
|
||||||
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
|
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
|
||||||
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
|
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
|
||||||
int tsdbOpenFile(SFile* pFile, int oflag);
|
int tsdbOpenFile(SFile* pFile, int oflag);
|
||||||
void tsdbCloseFile(SFile* pFile);
|
void tsdbCloseFile(SFile* pFile);
|
||||||
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type);
|
int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type, SDisk* pDisk);
|
||||||
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
|
SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags);
|
||||||
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, int mfid);
|
void tsdbRemoveFilesBeyondRetention(STsdbRepo* pRepo, int mfid);
|
||||||
int tsdbUpdateFileHeader(SFile* pFile);
|
int tsdbUpdateFileHeader(SFile* pFile);
|
||||||
|
|
|
@ -23,10 +23,9 @@
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tsdbMain.h"
|
#include "tsdbMain.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "dnode.h"
|
|
||||||
#include "tpath.h"
|
#include "tpath.h"
|
||||||
|
#include "tdisk.h"
|
||||||
|
|
||||||
struct SDnodeTier *tsDnodeTier = NULL;
|
|
||||||
const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
const char * tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
|
||||||
|
|
||||||
static void tsdbDestroyFile(SFile *pFile);
|
static void tsdbDestroyFile(SFile *pFile);
|
||||||
|
@ -109,43 +108,35 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
SFileGroup *tsdbCreateFGroup(STsdbRepo *pRepo, int fid) {
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
STsdbCfg * pCfg = &(pRepo->config);
|
SFileGroup fGroup = {0};
|
||||||
|
|
||||||
if (pFileH->nFGroups >= pFileH->maxFGroups) {
|
ASSERT(tsdbSearchFGroup(pFileH, fid, TD_EQ) == NULL);
|
||||||
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
|
|
||||||
if (pFileH->pFGroup[0].fileId < mfid) {
|
// TODO: think about if (level == 0) is correct
|
||||||
pthread_rwlock_wrlock(&pFileH->fhlock);
|
SDisk *pDisk = dnodeAssignDisk(tsDnodeTier, 0);
|
||||||
tsdbRemoveFileGroup(pRepo, &(pFileH->pFGroup[0]));
|
if (pDisk == NULL) {
|
||||||
pthread_rwlock_unlock(&pFileH->fhlock);
|
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
}
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pFileH->nFGroups < pFileH->maxFGroups);
|
fGroup.fileId = fid;
|
||||||
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
SFileGroup fGroup;
|
if (tsdbCreateFile(&(fGroup.files[type]), pRepo, fid, type, pDisk) < 0) goto _err;
|
||||||
SFileGroup *pFGroup = &fGroup;
|
|
||||||
|
|
||||||
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
|
||||||
if (pGroup == NULL) { // if not exists, create one
|
|
||||||
pFGroup->fileId = fid;
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0)
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&pFileH->fhlock);
|
|
||||||
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
|
||||||
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
|
||||||
pthread_rwlock_unlock(&pFileH->fhlock);
|
|
||||||
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pGroup;
|
pthread_rwlock_wrlock(&pFileH->fhlock);
|
||||||
|
pFileH->pFGroup[pFileH->nFGroups++] = fGroup;
|
||||||
|
qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup);
|
||||||
|
pthread_rwlock_unlock(&pFileH->fhlock);
|
||||||
|
return tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]);
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
tsdbDestroyFile(&(fGroup.files[type]));
|
||||||
|
}
|
||||||
|
dnodeDecDiskFiles(tsDnodeTier, pDisk, true);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +231,7 @@ void tsdbCloseFile(SFile *pFile) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type, SDisk *pDisk) {
|
||||||
memset((void *)pFile, 0, sizeof(SFile));
|
memset((void *)pFile, 0, sizeof(SFile));
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
|
|
||||||
|
@ -348,7 +339,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||||
|
|
||||||
SFileGroup fileGroup = *pFGroup;
|
SFileGroup fileGroup = *pFGroup;
|
||||||
tsdbGetBaseDirFromFile(fileGroup.files[0].fname, baseDir);
|
tsdbGetBaseDirFromFile(fileGroup.files[0].fname, baseDir);
|
||||||
pDisk = dnodeGetDiskByName(baseDir);
|
pDisk = dnodeGetDiskByName(tsDnodeTier, baseDir);
|
||||||
ASSERT(pDisk != NULL);
|
ASSERT(pDisk != NULL);
|
||||||
|
|
||||||
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
int nFilesLeft = pFileH->nFGroups - (int)(POINTER_DISTANCE(pFGroup, pFileH->pFGroup) / sizeof(SFileGroup) + 1);
|
||||||
|
|
|
@ -472,7 +472,6 @@ static void *tsdbCommitData(void *arg) {
|
||||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
SCommitIter *iters = NULL;
|
SCommitIter *iters = NULL;
|
||||||
SRWHelper whelper = {0};
|
SRWHelper whelper = {0};
|
||||||
STsdbFileH * pFileH = pRepo->tsdbFileH;
|
|
||||||
TSKEY minKey = 0, maxKey = 0;
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
ASSERT(pRepo->commit == 1);
|
ASSERT(pRepo->commit == 1);
|
||||||
ASSERT(pMem != NULL);
|
ASSERT(pMem != NULL);
|
||||||
|
@ -605,7 +604,6 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *mi
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
char * dataDir = NULL;
|
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
SFileGroup *pGroup = NULL;
|
SFileGroup *pGroup = NULL;
|
||||||
|
@ -623,19 +621,11 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
|
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
|
||||||
// file group not exists
|
pGroup = tsdbCreateFGroup(pRepo, fid);
|
||||||
}
|
if (pGroup == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
// Create and open files for commit
|
return -1;
|
||||||
dataDir = tsdbGetDataDirName(pRepo->rootDir);
|
}
|
||||||
if (dataDir == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
|
|
||||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open files for write/read
|
// Open files for write/read
|
||||||
|
@ -695,7 +685,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTFree(dataDir);
|
|
||||||
tsdbCloseHelperFile(pHelper, 0, pGroup);
|
tsdbCloseHelperFile(pHelper, 0, pGroup);
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
@ -717,7 +706,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
taosTFree(dataDir);
|
|
||||||
tsdbCloseHelperFile(pHelper, 1, NULL);
|
tsdbCloseHelperFile(pHelper, 1, NULL);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
#include "tscompression.h"
|
#include "tscompression.h"
|
||||||
#include "tsdbMain.h"
|
#include "tsdbMain.h"
|
||||||
|
#include "tpath.h"
|
||||||
|
|
||||||
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
#define TSDB_GET_COMPCOL_LEN(nCols) (sizeof(SCompData) + sizeof(SCompCol) * (nCols) + sizeof(TSCKSUM))
|
||||||
#define TSDB_KEY_COL_OFFSET 0
|
#define TSDB_KEY_COL_OFFSET 0
|
||||||
|
@ -104,18 +105,22 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
ASSERT(pHelper != NULL && pGroup != NULL);
|
ASSERT(pHelper != NULL && pGroup != NULL);
|
||||||
SFile * pFile = NULL;
|
SFile * pFile = NULL;
|
||||||
STsdbRepo *pRepo = pHelper->pRepo;
|
STsdbRepo *pRepo = pHelper->pRepo;
|
||||||
|
char baseDir[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
char tsdbRootDir[TSDB_FILENAME_LEN] = "\0";
|
||||||
|
|
||||||
// Clear the helper object
|
// Clear the helper object
|
||||||
tsdbResetHelper(pHelper);
|
tsdbResetHelper(pHelper);
|
||||||
|
|
||||||
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
|
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
|
||||||
|
|
||||||
|
tsdbGetBaseDirFromFile(pGroup->files[0].fname, baseDir);
|
||||||
|
tdGetTsdbRootDir(baseDir, REPO_ID(pRepo), tsdbRootDir);
|
||||||
// Set the files
|
// Set the files
|
||||||
pHelper->files.fGroup = *pGroup;
|
pHelper->files.fGroup = *pGroup;
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
|
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NHEAD,
|
||||||
helperNewHeadF(pHelper)->fname);
|
helperNewHeadF(pHelper)->fname);
|
||||||
tsdbGetDataFileName(pRepo->rootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
tsdbGetDataFileName(tsdbRootDir, REPO_ID(pRepo), pGroup->fileId, TSDB_FILE_TYPE_NLAST,
|
||||||
helperNewLastF(pHelper)->fname);
|
helperNewLastF(pHelper)->fname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,5 +11,5 @@ AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
IF (TD_LINUX)
|
IF (TD_LINUX)
|
||||||
ADD_LIBRARY(vnode ${SRC})
|
ADD_LIBRARY(vnode ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(vnode tsdb tcq)
|
TARGET_LINK_LIBRARIES(vnode tsdb tcq common)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
|
@ -30,8 +30,8 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "dnode.h"
|
|
||||||
#include "tpath.h"
|
#include "tpath.h"
|
||||||
|
#include "tdisk.h"
|
||||||
|
|
||||||
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
|
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue