This commit is contained in:
Hongze Cheng 2020-06-16 09:12:21 +00:00
parent 32dcbfe52a
commit ab64511bea
2 changed files with 93 additions and 98 deletions

View File

@ -111,7 +111,13 @@ typedef struct {
} SMemTable; } SMemTable;
// ------------------ tsdbFile.c // ------------------ tsdbFile.c
typedef enum { TSDB_FILE_TYPE_HEAD = 0, TSDB_FILE_TYPE_DATA, TSDB_FILE_TYPE_LAST, TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; typedef enum {
TSDB_FILE_TYPE_HEAD = 0,
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST
} TSDB_FILE_TYPE;
typedef struct { typedef struct {
uint32_t offset; uint32_t offset;
@ -123,8 +129,9 @@ typedef struct {
} STsdbFileInfo; } STsdbFileInfo;
typedef struct { typedef struct {
char* fname; char* fname;
int fd; int fd;
STsdbFileInfo info; STsdbFileInfo info;
} SFile; } SFile;
@ -136,6 +143,8 @@ typedef struct {
} SFileGroup; } SFileGroup;
typedef struct { typedef struct {
pthread_rwlock_t fhlock;
int maxFGroups; int maxFGroups;
int nFGroups; int nFGroups;
SFileGroup* pFGroup; SFileGroup* pFGroup;

View File

@ -29,13 +29,7 @@
#include "tutil.h" #include "tutil.h"
#include "ttime.h" #include "ttime.h"
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".h", ".l"};
".head",
".data",
".last",
".h",
".l"
};
// ---------------- INTERNAL FUNCTIONS ---------------- // ---------------- INTERNAL FUNCTIONS ----------------
STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
@ -45,6 +39,13 @@ STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) {
goto _err; goto _err;
} }
int code = pthread_rwlock_init(&(pFileH->fhlock));
if (code != 0) {
tsdbError("vgId:%d failed to init file handle lock since %s", pCfg->tsdbId, strerror(code));
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile); pFileH->maxFGroups = TSDB_MAX_FILE(pCfg->keep, pCfg->daysPerFile);
pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup)); pFileH->pFGroup = (SFileGroup *)calloc(pFileH->maxFGroups, sizeof(SFileGroup));
@ -62,6 +63,7 @@ _err:
void tsdbFreeFileH(STsdbFileH *pFileH) { void tsdbFreeFileH(STsdbFileH *pFileH) {
if (pFileH) { if (pFileH) {
pthread_rwlock_destroy(&pFileH->fhlock);
tfree(pFileH->pFGroup); tfree(pFileH->pFGroup);
free(pFileH); free(pFileH);
} }
@ -74,7 +76,9 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
DIR * dir = NULL; DIR * dir = NULL;
int fid = 0; int fid = 0;
tsdbGetDataDirName(pRepo->rootDir); STsdbFileH pFileH = pRepo->tsdbFileH;
tDataDir = tsdbGetDataDirName(pRepo->rootDir);
if (tDataDir == NULL) { if (tDataDir == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err; goto _err;
@ -91,9 +95,15 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) {
while ((dp = readdir(dir)) != NULL) { while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
sscanf(dp->d_name, "f%d", &fid); sscanf(dp->d_name, "f%d", &fid);
// if (tsdbOpenFGroup(pFileH, dataDir, fid) < 0) {
// break; SFileGroup fileGroup = {0};
// }
if (tsdbSearchFGroup(pFileH, fid, TD_EQ) != NULL) continue;
for (int type = TSDB_FILE_TYPE_HEAD; type <= TSDB_FILE_TYPE_LAST; type++) {
}
for (int type = TSDB_FILE_TYPE_NHEAD; type <= TSDB_FILE_TYPE_NLAST; type++) {
}
} }
tfree(tDataDir); tfree(tDataDir);
@ -106,10 +116,12 @@ _err:
return -1; return -1;
} }
void tsdbCloseFileH(STsdbFileH *pFileH) { void tsdbCloseFileH(STsdbRepo *pRepo) {
if (pFileH) { STsdbFileH *pFileH = pRepo->tsdbFileH;
tfree(pFileH->fGroup);
free(pFileH); for (int i = 0; i < pFileH->nFGroups; i++) {
// TODO
} }
} }
@ -146,7 +158,7 @@ _err:
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
SFileGroup *pGroup = SFileGroup *pGroup =
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc);
if (pGroup == NULL) return -1; if (pGroup == NULL) return -1;
// Remove from disk // Remove from disk
@ -198,7 +210,7 @@ void tsdbSeekFileGroupIter(SFileGroupIter *pIter, int fid) {
} }
int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE; int flags = (pIter->direction == TSDB_FGROUP_ITER_FORWARD) ? TD_GE : TD_LE;
void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), compFGroupKey, flags); void *ptr = taosbsearch(&fid, pIter->base, pIter->numOfFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags);
if (ptr == NULL) { if (ptr == NULL) {
pIter->pFileGroup = NULL; pIter->pFileGroup = NULL;
} else { } else {
@ -226,81 +238,38 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
return ret; return ret;
} }
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) { char *tsdbGetFileName(char *dataDir, int fileId, int type) {
SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx); int tlen = strlen(dataDir) + strlen(tsdbFileSuffix[type]) + 24;
SCompBlock *pStartBlock = NULL;
SCompBlock *pBlock = NULL;
int numOfBlocks = pSuperBlock->numOfSubBlocks;
if (numOfBlocks == 1) char *fname = (char *)malloc(tlen);
pStartBlock = pSuperBlock; if (fname == NULL) {
else terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset); return -1;
int maxNumOfCols = 0;
pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols;
pBlock++;
} }
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols); sprintf(fname, "%s/v%df%d%s", dataDir, fileId, tsdbFileSuffix[type]);
if (pCompData == NULL) return -1;
// Load data from the block
// if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData));
// Write data block to the file
{
// TODO
}
if (pCompData) free(pCompData);
return 0;
}
int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
if (fid == pFGroup->fileId) {
return 0;
} else {
return fid > pFGroup->fileId? 1:-1;
}
}
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix);
return 0; return 0;
} }
int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function int tsdbOpenFile(SFile *pFile, int oflag) {
if (TSDB_IS_FILE_OPENED(pFile)) return -1; ASSERT(!TSDB_IS_FILE_OPENED(pFile));
pFile->fd = open(pFile->fname, oflag, 0755); pFile->fd = open(pFile->fname, oflag, 0755);
if (pFile->fd < 0) return -1; if (pFile->fd < 0) {
tsdbError("failed to open file %s since %s", pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
return 0; return 0;
} }
int tsdbCloseFile(SFile *pFile) { void tsdbCloseFile(SFile *pFile) {
int ret = close(pFile->fd); if (TSDB_IS_FILE_OPENED(pFile)) {
pFile->fd = -1; close(pFile->fd);
return ret; pFile->fd = -1;
}
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
if (pGroup == NULL) return NULL;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbOpenFile(&(pGroup->files[type]), O_RDWR);
} }
return pGroup;
} }
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) { int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) {
@ -331,11 +300,9 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile)
return 0; return 0;
} }
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) {
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { void *ptr =
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc);
return NULL;
void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
if (ptr == NULL) return NULL; if (ptr == NULL) return NULL;
return (SFileGroup *)ptr; return (SFileGroup *)ptr;
} }
@ -362,20 +329,39 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile
return 0; return 0;
} }
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) { // static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0; // if (tsdbSearchFGroup(pFileH, fid) != NULL) return 0;
SFileGroup fGroup = {0}; // SFileGroup fGroup = {0};
fGroup.fileId = fid; // fGroup.fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { // for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1; // if (tsdbInitFile(dataDir, fid, tsdbFileSuffix[type], &fGroup.files[type]) < 0) return -1;
} // }
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; // pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); // qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return 0; // return 0;
} // }
static int compFGroup(const void *arg1, const void *arg2) { static int compFGroup(const void *arg1, const void *arg2) {
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; int val1 = ((SFileGroup *)arg1)->fileId;
int val2 = ((SFileGroup *)arg2)->fileId;
if (val1 < val2) {
return -1;
} else if (val1 > val2) {
return 1;
} else {
return 0;
}
}
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
if (fid == pFGroup->fileId) {
return 0;
} else {
return fid > pFGroup->fileId ? 1 : -1;
}
} }