more code

This commit is contained in:
Hongze Cheng 2025-02-08 14:34:22 +08:00
parent f2dafd9362
commit 499cf0c294
4 changed files with 62 additions and 25 deletions

View File

@ -1813,12 +1813,14 @@ int32_t tsdbAllocateDisk(STsdb *tsdb, int32_t fid, const char *label, SDiskID *d
int32_t expectedLevel = tsdbFidLevel(fid, &tsdb->keepCfg, taosGetTimestampSec()); int32_t expectedLevel = tsdbFidLevel(fid, &tsdb->keepCfg, taosGetTimestampSec());
code = tfsAllocDisk(tfs, expectedLevel, label, &did); code = tfsAllocDisk(tfs, expectedLevel, label, &did);
if (code) { if (code) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
return code; return code;
} }
if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) { if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
} }
if (diskId) { if (diskId) {
@ -1840,7 +1842,8 @@ int32_t tsdbAllocateDiskAtLevel(STsdb *tsdb, int32_t fid, int32_t level, const c
} }
if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) { if (tfsMkdirRecurAt(tfs, tsdb->path, did) != 0) {
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(tsdb->pVnode), __func__, __FILE__, __LINE__,
tstrerror(code));
} }
if (diskId) { if (diskId) {

View File

@ -50,10 +50,11 @@ typedef struct {
typedef struct { typedef struct {
TdThreadSpinlock lock; TdThreadSpinlock lock;
int32_t level; int32_t level;
int32_t nextid; // next disk id to allocate int32_t nextid;
int32_t ndisk; // # of disks mounted to this tier int32_t ndisk; // # of disks mounted to this tier
int32_t nAvailDisks; // # of Available disks int32_t nAvailDisks; // # of Available disks
STfsDisk *disks[TFS_MAX_DISKS_PER_TIER]; STfsDisk *disks[TFS_MAX_DISKS_PER_TIER];
SHashObj *hash; // label -> nextid
SDiskSize size; SDiskSize size;
} STfsTier; } STfsTier;
@ -87,7 +88,7 @@ int32_t tfsInitTier(STfsTier *pTier, int32_t level);
void tfsDestroyTier(STfsTier *pTier); void tfsDestroyTier(STfsTier *pTier);
int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk); int32_t tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg, STfsDisk **ppDisk);
void tfsUpdateTierSize(STfsTier *pTier); void tfsUpdateTierSize(STfsTier *pTier);
int32_t tfsAllocDiskOnTier(STfsTier *pTier); int32_t tfsAllocDiskOnTier(STfsTier *pTier, const char *label);
void tfsPosNextId(STfsTier *pTier); void tfsPosNextId(STfsTier *pTier);
#define tfsLockTier(pTier) taosThreadSpinLock(&(pTier)->lock) #define tfsLockTier(pTier) taosThreadSpinLock(&(pTier)->lock)

View File

@ -156,7 +156,7 @@ int32_t tfsAllocDiskAtLevel(STfs *pTfs, int32_t level, const char *label, SDiskI
pDiskId->level = level; pDiskId->level = level;
pDiskId->id = -1; pDiskId->id = -1;
pDiskId->id = tfsAllocDiskOnTier(&pTfs->tiers[pDiskId->level]); pDiskId->id = tfsAllocDiskOnTier(&pTfs->tiers[pDiskId->level], label);
if (pDiskId->id < 0) { if (pDiskId->id < 0) {
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK); TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
} }
@ -165,10 +165,8 @@ int32_t tfsAllocDiskAtLevel(STfs *pTfs, int32_t level, const char *label, SDiskI
} }
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, const char *label, SDiskID *pDiskId) { int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, const char *label, SDiskID *pDiskId) {
if (expLevel >= pTfs->nlevel) { if (expLevel >= pTfs->nlevel || expLevel < 0) {
expLevel = pTfs->nlevel - 1; expLevel = pTfs->nlevel - 1;
} else if (expLevel < 0) {
expLevel = 0;
} }
for (; expLevel >= 0; expLevel--) { for (; expLevel >= 0; expLevel--) {

View File

@ -18,6 +18,26 @@
extern int64_t tsMinDiskFreeSize; extern int64_t tsMinDiskFreeSize;
typedef struct {
int32_t nextid;
} SDiskCursor;
static SDiskCursor *tfsDiskCursorNew(int32_t nextid) {
SDiskCursor *pCursor = (SDiskCursor *)taosMemoryMalloc(sizeof(SDiskCursor));
if (pCursor == NULL) {
return NULL;
}
pCursor->nextid = nextid;
return pCursor;
}
static void tfsDiskCursorFree(void *p) {
if (p) {
SDiskCursor *pCursor = *(SDiskCursor **)p;
taosMemoryFree(pCursor);
}
}
int32_t tfsInitTier(STfsTier *pTier, int32_t level) { int32_t tfsInitTier(STfsTier *pTier, int32_t level) {
(void)memset(pTier, 0, sizeof(STfsTier)); (void)memset(pTier, 0, sizeof(STfsTier));
@ -26,10 +46,17 @@ int32_t tfsInitTier(STfsTier *pTier, int32_t level) {
} }
pTier->level = level; pTier->level = level;
pTier->hash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pTier->hash == NULL) {
TAOS_RETURN(terrno);
}
taosHashSetFreeFp(pTier->hash, tfsDiskCursorFree);
return 0; return 0;
} }
void tfsDestroyTier(STfsTier *pTier) { void tfsDestroyTier(STfsTier *pTier) {
taosHashCleanup(pTier->hash);
pTier->hash = NULL;
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; id++) { for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; id++) {
pTier->disks[id] = tfsFreeDisk(pTier->disks[id]); pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
} }
@ -108,7 +135,7 @@ void tfsUpdateTierSize(STfsTier *pTier) {
} }
// Round-Robin to allocate disk on a tier // Round-Robin to allocate disk on a tier
int32_t tfsAllocDiskOnTier(STfsTier *pTier) { int32_t tfsAllocDiskOnTier(STfsTier *pTier, const char *label) {
TAOS_UNUSED(tfsLockTier(pTier)); TAOS_UNUSED(tfsLockTier(pTier));
if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) { if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
@ -116,11 +143,31 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK); TAOS_RETURN(TSDB_CODE_FS_NO_VALID_DISK);
} }
// get the existing cursor or create a new one
SDiskCursor *pCursor = NULL;
void *p = taosHashGet(pTier->hash, label, strlen(label));
if (p) {
pCursor = *(SDiskCursor **)p;
} else {
pCursor = tfsDiskCursorNew(pTier->nextid);
if (pCursor == NULL) {
TAOS_UNUSED(tfsUnLockTier(pTier));
TAOS_RETURN(terrno);
}
int32_t code = taosHashPut(pTier->hash, label, strlen(label), &pCursor, sizeof(pCursor));
if (code != 0) {
TAOS_UNUSED(tfsUnLockTier(pTier));
TAOS_RETURN(code);
}
}
// do allocation
int32_t retId = -1; int32_t retId = -1;
int64_t avail = 0; int64_t avail = 0;
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) { for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
#if 1 // round-robin int32_t diskId = (pCursor->nextid + id) % pTier->ndisk;
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
STfsDisk *pDisk = pTier->disks[diskId]; STfsDisk *pDisk = pTier->disks[diskId];
if (pDisk == NULL) continue; if (pDisk == NULL) continue;
@ -139,20 +186,8 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
retId = diskId; retId = diskId;
terrno = 0; terrno = 0;
pTier->nextid = (diskId + 1) % pTier->ndisk; pCursor->nextid = (diskId + 1) % pTier->ndisk;
break; break;
#else // select the disk with the most available space
STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue;
if (pDisk->size.avail < tsMinDiskFreeSize) continue;
if (pDisk->size.avail > avail) {
avail = pDisk->size.avail;
retId = id;
terrno = 0;
}
#endif
} }
TAOS_UNUSED(tfsUnLockTier(pTier)); TAOS_UNUSED(tfsUnLockTier(pTier));