add file size statistics

This commit is contained in:
Hongze Cheng 2020-08-08 17:14:57 +08:00
parent bd7cb46ae4
commit 222ebe3602
4 changed files with 42 additions and 14 deletions

View File

@ -154,11 +154,19 @@ typedef enum {
TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_HEAD = 0,
TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_MAX, TSDB_FILE_TYPE_STAT,
TSDB_FILE_TYPE_NHEAD, TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST TSDB_FILE_TYPE_NDATA,
TSDB_FILE_TYPE_NLAST,
TSDB_FILE_TYPE_NSTAT
} TSDB_FILE_TYPE; } TSDB_FILE_TYPE;
#ifndef TDINTERNAL
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
#else
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
#endif
typedef struct { typedef struct {
uint32_t magic; uint32_t magic;
uint32_t len; uint32_t len;
@ -497,7 +505,7 @@ int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
void tsdbDestroyHelper(SRWHelper* pHelper); void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper); void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey); int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);

View File

@ -20,7 +20,7 @@
#include "tutil.h" #include "tutil.h"
#define TAOS_RANDOM_FILE_FAIL_TEST #define TAOS_RANDOM_FILE_FAIL_TEST
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"}; const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile); static void tsdbDestroyFile(SFile *pFile);

View File

@ -679,7 +679,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
} }
taosTFree(dataDir); taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0); tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock)); pthread_rwlock_wrlock(&(pFileH->fhlock));
@ -701,7 +701,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
_err: _err:
taosTFree(dataDir); taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1); tsdbCloseHelperFile(pHelper, 1, NULL);
return -1; return -1;
} }

View File

@ -91,7 +91,7 @@ void tsdbResetHelper(SRWHelper *pHelper) {
tsdbResetHelperTableImpl(pHelper); tsdbResetHelperTableImpl(pHelper);
// Reset the file part // Reset the file part
tsdbCloseHelperFile(pHelper, false); tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper); tsdbResetHelperFileImpl(pHelper);
pHelper->state = TSDB_HELPER_CLEAR_STATE; pHelper->state = TSDB_HELPER_CLEAR_STATE;
@ -120,6 +120,14 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1; if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1;
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1; if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1;
// NOTE: For data file compatibility
if (helperDataF(pHelper)->info.size == TSDB_FILE_HEAD_SIZE) {
helperDataF(pHelper)->info.size = (uint64_t)lseek(helperDataF(pHelper)->fd, 0, SEEK_END);
}
if (helperLastF(pHelper)->info.size == TSDB_FILE_HEAD_SIZE) {
helperLastF(pHelper)->info.size = (uint64_t)lseek(helperLastF(pHelper)->fd, 0, SEEK_END);
}
// Create and open .h // Create and open .h
pFile = helperNewHeadF(pHelper); pFile = helperNewHeadF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1; if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
@ -146,7 +154,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
return 0; return 0;
} }
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) {
SFile *pFile = NULL; SFile *pFile = NULL;
pFile = helperHeadF(pHelper); pFile = helperHeadF(pHelper);
@ -157,10 +165,11 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) { if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (!hasError) { if (!hasError) {
tsdbUpdateFileHeader(pFile); tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else { } else {
// TODO: shrink back to origin ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_DATA].info.size);
} }
fsync(pFile->fd);
} }
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
} }
@ -170,10 +179,11 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) { if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (!hasError) { if (!hasError) {
tsdbUpdateFileHeader(pFile); tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else { } else {
// TODO: shrink back to origin ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_LAST].info.size);
} }
fsync(pFile->fd);
} }
tsdbCloseFile(pFile); tsdbCloseFile(pFile);
} }
@ -390,6 +400,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len); void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len);
pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx)); pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx));
pFile->info.size += pIdx->len;
ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
} }
return 0; return 0;
@ -420,7 +433,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1; return -1;
} }
pFile->info.offset = offset; ASSERT(offset == pFile->info.size);
if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) { if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len, tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
@ -429,6 +442,10 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1; return -1;
} }
pFile->info.offset = offset;
pFile->info.size += pFile->info.len;
ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0; return 0;
} }
@ -803,6 +820,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
pCompBlock->keyLast); pCompBlock->keyLast);
pFile->info.size += pCompBlock->len;
ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0; return 0;
_err: _err:
@ -1016,7 +1036,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
} }
static void tsdbDestroyHelperFile(SRWHelper *pHelper) { static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
tsdbCloseHelperFile(pHelper, false); tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper); tsdbResetHelperFileImpl(pHelper);
taosTZfree(pHelper->idxH.pIdxArray); taosTZfree(pHelper->idxH.pIdxArray);
taosTZfree(pHelper->pWIdx); taosTZfree(pHelper->pWIdx);