diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 96a04ea5ed..ee18396b61 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -78,9 +78,9 @@ STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo); // --------- TSDB REPOSITORY DEFINITION int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg); -int32_t tsdbDropRepo(TSDB_REPO_T *repo); +int32_t tsdbDropRepo(char *rootDir); TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH); -int32_t tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); +void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit); int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg); // --------- TSDB TABLE DEFINITION diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index c26beba21c..fa1b98ec94 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -50,7 +50,7 @@ typedef struct STable { uint64_t suid; struct STable* pSuper; // super table pointer uint8_t numOfSchemas; - STSchema schema[TSDB_MAX_TABLE_SCHEMAS]; + STSchema* schema[TSDB_MAX_TABLE_SCHEMAS]; STSchema* tagSchema; SKVRow tagVal; void* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index @@ -316,9 +316,9 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); void tsdbFreeFileH(STsdbFileH* pFileH); -int* tsdbOpenFileH(STsdbRepo* pRepo); +int tsdbOpenFileH(STsdbRepo* pRepo); void tsdbCloseFileH(STsdbRepo* pRepo); -SFileGroup* tsdbCreateFGroupIfNeed(STsdbFileH* pFileH, char* dataDir, int fid, int maxTables); +SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables); void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction); void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid); SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter); @@ -327,6 +327,7 @@ void tsdbCloseFile(SFile* pFile); int tsdbCreateFile(SFile* pFile, STsdbRepo* pRepo, int fid, int type); SFileGroup* tsdbSearchFGroup(STsdbFileH* pFileH, int fid, int flags); void tsdbFitRetention(STsdbRepo* pRepo); +int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state @@ -345,14 +346,38 @@ void tsdbFitRetention(STsdbRepo* pRepo); #define helperRepo(h) (h)->pRepo #define helperState(h) (h)->state +int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo); +void tsdbDestroyHelper(SRWHelper* pHelper); +void tsdbResetHelper(SRWHelper* pHelper); +int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup); +int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError); +void tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo); +int tsdbWriteDataBlock(SRWHelper* pHelper, SDataCols* pDataCols); +int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper); +int tsdbWriteCompInfo(SRWHelper* pHelper); +int tsdbWriteCompIdx(SRWHelper* pHelper); +int tsdbLoadCompIdx(SRWHelper* pHelper, void* target); +int tsdbLoadCompInfo(SRWHelper* pHelper, void* target); +int tsdbloadcompdata(srwhelper* phelper, scompblock* pcompblock, void* target); +void tsdbGetDataStatis(SRWHelper* pHelper, SDataStatis* pStatis, int numOfCols); +int tsdbLoadBlockDataCols(SRWHelper* pHelper, SDataCols* pDataCols, int blkIdx, int16_t* colIds, int numOfColIds); +int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* target); +int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); +void* tsdbEncodeSFileInfo(void* buf, const STsdbFileInfo* pInfo); +void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); + // ------------------ tsdbMain.c #define REPO_ID(r) (r)->config.tsdbId #define IS_REPO_LOCKED(r) (r)->repoLocked -char* tsdbGetMetaFileName(char* rootDir); -char* tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type); -int tsdbLockRepo(STsdbRepo* pRepo); -int tsdbUnlockRepo(STsdbRepo* pRepo); +char* tsdbGetMetaFileName(char* rootDir); +char* tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type); +int tsdbLockRepo(STsdbRepo* pRepo); +int tsdbUnlockRepo(STsdbRepo* pRepo); +char* tsdbGetDataDirName(char* rootDir); +STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo); +STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo); #if 0 @@ -389,8 +414,6 @@ void tsdbAdjustCacheBlocks(STsdbCache *pCache); int32_t tsdbGetMetaFileName(char *rootDir, char *fname); int tsdbUpdateFileHeader(SFile *pFile, uint32_t version); - -int compFGroupKey(const void *key, const void *fgroup); #endif #ifdef __cplusplus diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 25f8e032ed..8af9441227 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -31,11 +31,13 @@ const char *tsdbFileSuffix[] = {".head", ".data", ".last"}; -static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); -static void tsdbDestroyFile(SFile *pFile); -static int compFGroup(const void *arg1, const void *arg2); -static int keyFGroupCompFunc(const void *key, const void *fgroup); -static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup); +static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type); +static void tsdbDestroyFile(SFile *pFile); +static int compFGroup(const void *arg1, const void *arg2); +static int keyFGroupCompFunc(const void *key, const void *fgroup); +static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup); +static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo); +static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -75,7 +77,7 @@ void tsdbFreeFileH(STsdbFileH *pFileH) { } } -int *tsdbOpenFileH(STsdbRepo *pRepo) { +int tsdbOpenFileH(STsdbRepo *pRepo) { ASSERT(pRepo != NULL && pRepo->tsdbFileH != NULL); char *tDataDir = NULL; @@ -83,7 +85,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { int fid = 0; SFileGroup fileGroup = {0}; - STsdbFileH pFileH = pRepo->tsdbFileH; + STsdbFileH *pFileH = pRepo->tsdbFileH; tDataDir = tsdbGetDataDirName(pRepo->rootDir); if (tDataDir == NULL) { @@ -91,7 +93,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { goto _err; } - DIR *dir = opendir(tDataDir); + dir = opendir(tDataDir); if (dir == NULL) { tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), tDataDir, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -105,7 +107,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { if (tsdbSearchFGroup(pRepo->tsdbFileH, fid, TD_EQ) != NULL) return 0; - fileGroup = {0}; + memset((void *)(&fileGroup), 0, sizeof(SFileGroup)); fileGroup.fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) { @@ -116,7 +118,7 @@ int *tsdbOpenFileH(STsdbRepo *pRepo) { tsdbTrace("vgId:%d file group %d init", REPO_ID(pRepo), fid); - pFileH->[pFileH->nFGroups++] = fileGroup; + pFileH->pFGroup[pFileH->nFGroups++] = fileGroup; qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); } @@ -128,7 +130,7 @@ _err: for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]); tfree(tDataDir); - if (dir != NULL) closedir(tDataDir); + if (dir != NULL) closedir(dir); tsdbCloseFileH(pRepo); return -1; } @@ -139,28 +141,30 @@ void tsdbCloseFileH(STsdbRepo *pRepo) { for (int i = 0; i < pFileH->nFGroups; i++) { SFileGroup *pFGroup = pFileH->pFGroup + i; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - tsdbDestroyFile(pFGroup->files[type]); + tsdbDestroyFile(&pFGroup->files[type]); } } } -SFileGroup *tsdbCreateFGroupIfNeed(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { - if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL; +SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) { + STsdbFileH *pFileH = pRepo->tsdbFileH; + + if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL; SFileGroup fGroup; SFileGroup *pFGroup = &fGroup; - SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid); + SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pGroup == NULL) { // if not exists, create one pFGroup->fileId = fid; for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], &(pFGroup->files[type])) < 0) + if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0) goto _err; } - pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; - qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); - return tsdbSearchFGroup(pFileH, fid); + pFileH->pFGroup[pFileH->nFGroups++] = fGroup; + qsort((void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), compFGroup); + return tsdbSearchFGroup(pFileH, fid, TD_EQ); } return pGroup; @@ -172,15 +176,15 @@ _err: void tsdbInitFileGroupIter(STsdbFileH *pFileH, SFileGroupIter *pIter, int direction) { // TODO pIter->direction = direction; - pIter->base = pFileH->fGroup; - pIter->numOfFGroups = pFileH->numOfFGroups; - if (pFileH->numOfFGroups == 0) { + pIter->base = pFileH->pFGroup; + pIter->numOfFGroups = pFileH->nFGroups; + if (pFileH->nFGroups == 0) { pIter->pFileGroup = NULL; } else { if (direction == TSDB_FGROUP_ITER_FORWARD) { - pIter->pFileGroup = pFileH->fGroup; + pIter->pFileGroup = pFileH->pFGroup; } else { - pIter->pFileGroup = pFileH->fGroup + pFileH->numOfFGroups - 1; + pIter->pFileGroup = pFileH->pFGroup + pFileH->nFGroups - 1; } } } @@ -274,7 +278,7 @@ _err: SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid, int flags) { void *ptr = - taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc); + taosbsearch((void *)(&fid), (void *)(pFileH->pFGroup), pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, flags); if (ptr == NULL) return NULL; return (SFileGroup *)ptr; } @@ -289,13 +293,35 @@ void tsdbFitRetention(STsdbRepo *pRepo) { pthread_rwlock_wrlock(&(pFileH->fhlock)); - while (pFileH->numOfFGroups > 0 && pGroup[0].fileId < mfid) { - tsdbRemoveFileGroup(pFileH, pGroup); + while (pFileH->nFGroups > 0 && pGroup[0].fileId < mfid) { + tsdbRemoveFileGroup(pRepo, pGroup); } - pthread_rwlock_unlock(&(pFileH->fhlock)) + pthread_rwlock_unlock(&(pFileH->fhlock)); } +int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { + char buf[TSDB_FILE_HEAD_SIZE] = "\0"; + + void *pBuf = (void *)buf; + pBuf = taosEncodeFixedU32(pBuf, version); + pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info)); + + taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); + + if (lseek(pFile->fd, 0, SEEK_SET) < 0) { + tsdbError("failed to lseek file %s since %s", pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if (twrite(pFile->fd, (void *)buf, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { + tsdbError("failed to write %d bytes to file %s since %s", TSDB_FILE_HEAD_SIZE, pFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + return 0; +} // ---------------- LOCAL FUNCTIONS ---------------- static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { @@ -378,4 +404,26 @@ static void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) { remove(fileGroup.files[type].fname); tsdbDestroyFile(&fileGroup.files[type]); } +} + +static void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) { + buf = taosEncodeFixedU32(buf, pInfo->offset); + buf = taosEncodeFixedU32(buf, pInfo->len); + buf = taosEncodeFixedU64(buf, pInfo->size); + buf = taosEncodeFixedU64(buf, pInfo->tombSize); + buf = taosEncodeFixedU32(buf, pInfo->totalBlocks); + buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks); + + return buf; +} + +static void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->len)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); + + return buf; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 4ecd87b250..fe7214f01e 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -29,6 +29,10 @@ #define TSDB_DATA_DIR_NAME "data" #define TSDB_META_FILE_NAME "meta" #define TSDB_META_FILE_INDEX 10000000 +#define IS_VALID_PRECISION(precision) \ + (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) +#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP +#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) typedef struct { int32_t totalLen; @@ -42,6 +46,24 @@ typedef struct { SSubmitBlk *pBlock; } SSubmitMsgIter; +static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); +static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg); +static int32_t tsdbUnsetRepoEnv(char *rootDir); +static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg); +static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg); +static char * tsdbGetCfgFname(char *rootDir); +static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg); +static void tsdbFreeRepo(STsdbRepo *pRepo); +static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); +static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows); +static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter); +static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); +static int tsdbRestoreInfo(STsdbRepo *pRepo); +static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); +static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression); +static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep); +static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables); + // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { if (mkdir(rootDir, 0755) < 0) { @@ -122,8 +144,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { tsdbCloseBufPool(pRepo); tsdbCloseMeta(pRepo); tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo)); - - return 0; } int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { @@ -136,7 +156,6 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * } SSubmitBlk *pBlock = NULL; - int32_t code = TSDB_CODE_SUCCESS; int32_t affectedrows = 0; TSKEY now = taosGetTimestamp(pRepo->config.precision); @@ -156,7 +175,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ // STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; uint32_t magic = 0; - char fname[256] = "\0"; + char *fname = NULL; struct stat fState; @@ -169,9 +188,9 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ if (name[0] == 0) { // get the file from index or after, but not larger than eindex int fid = (*index) / 3; - if (pFileH->numOfFGroups == 0 || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) { + if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { - tsdbGetMetaFileName(pRepo->rootDir, fname); + fname = tsdbGetMetaFileName(pRepo->rootDir); *index = TSDB_META_FILE_INDEX; } else { tfree(sdup); @@ -179,7 +198,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ } } else { SFileGroup *pFGroup = - taosbsearch(&fid, pFileH->fGroup, pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE); + taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), compFGroupKey, TD_GE); if (pFGroup->fileId == fid) { strcpy(fname, pFGroup->files[(*index) % 3].fname); } else { @@ -195,10 +214,10 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ strcpy(name, fname + strlen(prefix)); } else { // get the named file at the specified index. If not there, return 0 if (*index == TSDB_META_FILE_INDEX) { // get meta file - tsdbGetMetaFileName(pRepo->rootDir, fname); + fname = tsdbGetMetaFileName(pRepo->rootDir); } else { int fid = (*index) / 3; - SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid); + SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); if (pFGroup == NULL) { // not found tfree(sdup); return 0; @@ -218,6 +237,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_ *size = fState.st_size; magic = *size; + tfree(fname); return magic; } @@ -229,7 +249,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TALBE_UID(pTable), TABLE_TID(pTable), pTable->sql, - tsdbGetTableSchema(pMeta, pTable)); + tsdbGetTableSchema(pTable)); } } } @@ -270,7 +290,7 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) { tsdbAlterMaxTables(pRepo, pCfg->maxTables); } - if (configChanged) tsdbSaveConfig(pRepo); + if (configChanged) tsdbSaveConfig(pRepo->rootDir, &pRepo->config); return TSDB_CODE_SUCCESS; } @@ -302,7 +322,7 @@ char *tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type) { char *fname = malloc(tlen); if (fname == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; + return NULL; } sprintf(fname, "%s/%s/v%df%d.%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]); @@ -331,6 +351,18 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { return 0; } +char *tsdbGetDataDirName(char *rootDir) { + int tlen = strlen(rootDir) + strlen(TSDB_DATA_DIR_NAME) + 2; + char *fname = calloc(1, tlen); + if (fname == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + snprintf(fname, tlen, "%s/%s", rootDir, TSDB_DATA_DIR_NAME); + return fname; +} + STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; } STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; } STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; } @@ -413,7 +445,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { } if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) { - tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d" pCfg->tsdbId, + tsdbError("vgId:%d invalid configuration! minRowsPerFileBlock %d maxRowsPerFileBlock %d", pCfg->tsdbId, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock); goto _err; } @@ -488,7 +520,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { if (fd < 0) { tsdbError("vgId:%d failed to open file %s since %s", pCfg->tsdbId, fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - goto _err + goto _err; } if (twrite(fd, (void *)pCfg, sizeof(STsdbCfg)) < sizeof(STsdbCfg)) { @@ -560,17 +592,6 @@ static char *tsdbGetCfgFname(char *rootDir) { return fname; } -static char *tsdbGetDataDirName(char *rootDir) { - int tlen = strlen(rootDir) + strlen(TSDB_DATA_DIR_NAME) + 2; - char *fname = calloc(1, tlen); - if (fname == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - snprintf(fname, tlen, "%s/%s", rootDir, TSDB_DATA_DIR_NAME); - return fname; -} static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(STsdbRepo)); @@ -626,8 +647,8 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { tsdbFreeFileH(pRepo->tsdbFileH); tsdbFreeBufPool(pRepo->pPool); tsdbFreeMeta(pRepo->tsdbMeta); - tsdbFreeMemTable(pRepo->mem); - tsdbFreeMemTable(pRepo->imem); + // tsdbFreeMemTable(pRepo->mem); + // tsdbFreeMemTable(pRepo->imem); tfree(pRepo->rootDir); pthread_mutex_destroy(&pRepo->mutex); free(pRepo); @@ -660,7 +681,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY STsdbMeta *pMeta = pRepo->tsdbMeta; int64_t points = 0; - STable *pTable == tsdbGetTableByUid(pMeta, pBlock->uid); + STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid); if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) { tsdbError("vgId:%d failed to get table to insert data, uid " PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid, pBlock->tid); @@ -676,7 +697,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY // Check schema version int32_t tversion = pBlock->sversion; - STSchema *pSchema = tsdbGetTableSchema(pMeta, pTable); + STSchema *pSchema = tsdbGetTableSchema(pTable); ASSERT(pSchema != NULL); int16_t nversion = schemaVersion(pSchema); if (tversion > nversion) { @@ -701,9 +722,9 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY tsdbClearTableCfg(pTableCfg); rpcFreeCont(msg); - pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion); + pSchema = tsdbGetTableSchemaByVersion(pTable, tversion); } else if (tversion < nversion) { - pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, tversion); + pSchema = tsdbGetTableSchemaByVersion(pTable, tversion); if (pSchema == NULL) { tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion); @@ -829,8 +850,8 @@ static void tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) { pRepo->tsdbFileH->maxFGroups = maxFiles; } else { pRepo->config.keep = keep; - pRepo->tsdbFileH->fGroup = realloc(pRepo->tsdbFileH->fGroup, sizeof(SFileGroup)); - if (pRepo->tsdbFileH->fGroup == NULL) { + pRepo->tsdbFileH->pFGroup = realloc(pRepo->tsdbFileH->pFGroup, sizeof(SFileGroup)); + if (pRepo->tsdbFileH->pFGroup == NULL) { // TODO: deal with the error } pRepo->tsdbFileH->maxFGroups = maxFiles; @@ -846,7 +867,6 @@ static void tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) { STsdbMeta *pMeta = pRepo->tsdbMeta; - pMeta->maxTables = maxTables; pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *)); memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables)); pRepo->config.maxTables = maxTables; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 0c8bca9441..f8bb3850e2 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -92,7 +92,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { pRepo->mem->tData[TABLE_TID(pTable)] = pTableData; } - ASSERT(pTableData != NULL) && pTableData->uid == TALBE_UID(pTable); + ASSERT((pTableData != NULL) && pTableData->uid == TALBE_UID(pTable)); if (tSkipListPut(pTableData->pData, pNode) == NULL) { tsdbFreeBytes(pRepo, (void *)pNode, bytes); @@ -107,7 +107,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData)); STSchema *pSchema = tsdbGetTableSchema(pTable); - if (schemaNCols(pSchema) > pMemTable->maxCols) pMemTable->maxCols = schemaNCols; + if (schemaNCols(pSchema) > pMemTable->maxCols) pMemTable->maxCols = schemaNCols(pSchema); if (schemaTLen(pSchema) > pMemTable->maxRowBytes) pMemTable->maxRowBytes = schemaTLen(pSchema); } @@ -167,6 +167,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) { tsdbRefMemTable(pRepo, *pIMem); if (tsdbUnlockRepo(pRepo) < 0) return -1; + + return 0; } // ---------------- LOCAL FUNCTIONS ---------------- @@ -174,11 +176,11 @@ static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo) { ASSERT(pRepo != NULL); if (pRepo->mem == NULL) return NULL; - SListNode *pNode = listTail(pRepo->mem); + SListNode *pNode = listTail(pRepo->mem->bufBlockList); if (pNode == NULL) return NULL; STsdbBufBlock *pBufBlock = NULL; - tdListNodeGetData(pMemTable->bufBlockList, pNode, (void *)(&pBufBlock)); + tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void *)(&pBufBlock)); return pBufBlock; } @@ -189,7 +191,7 @@ static void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int code = 0; if (pBufBlock != NULL && pBufBlock->remain < bytes) { - if (listNEles(pRepo->mem) >= pCfg->totalBlocks / 2) { // need to commit mem + if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem if (pRepo->imem) { code = pthread_join(pRepo->commitThread, NULL); if (code != 0) { @@ -358,6 +360,7 @@ static void *tsdbCommitData(void *arg) { STsdbCfg * pCfg = &pRepo->config; SDataCols * pDataCols = NULL; SCommitIter *iters = NULL; + SRWHelper whelper = {0}; ASSERT(pRepo->commit == 1); ASSERT(pMem != NULL); @@ -379,7 +382,7 @@ static void *tsdbCommitData(void *arg) { if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + REPO_ID(pRepo), pMem->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); goto _exit; } diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index f96b0ae1da..2be68cd5de 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -42,7 +42,6 @@ static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid); static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup); static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); -static void tsdbClearTableCfg(STableCfg *config); static void * tsdbEncodeTableName(void *buf, tstr *name); static void * tsdbDecodeTableName(void *buf, tstr **name); static void * tsdbEncodeTable(void *buf, STable *pTable); @@ -122,7 +121,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) { tsdbTrace("vgId:%d, table %s is dropped! tid:%d, uid:%" PRId64, pRepo->config.tsdbId, varDataVal(pTable->name), tableId.tid, tableId.uid); - if (tsdbRemoveTableFromMeta(pMeta, pTable, true) < 0) return -1; + tsdbRemoveTableFromMeta(pMeta, pTable, true); return 0; } @@ -132,7 +131,7 @@ void *tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, i STsdbMeta *pMeta = tsdbGetMeta(repo); STable * pTable = tsdbGetTableByUid(pMeta, id->uid); - STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable); + STSchema *pSchema = tsdbGetTableTagSchema(pTable); STColumn *pCol = tdGetColOfID(pSchema, colId); if (pCol == NULL) { return NULL; // No matched tag volumn @@ -255,7 +254,7 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { return -1; } if (TABLE_TID(pTable) != htonl(pMsg->tid)) { - terrno = TSDB_CODE_TDB_INVALID_TABLE_ID + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return -1; } @@ -457,7 +456,7 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) { isChanged = true; } - STSchema *pTSchema = tsdbGetTableSchema(pMeta, pTable); + STSchema *pTSchema = tsdbGetTableSchema(pTable); if (schemaVersion(pTSchema) < schemaVersion(pCfg->schema)) { if (pTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) { pTable->schema[pTable->numOfSchemas++] = tdDupSchema(pCfg->schema); @@ -475,8 +474,8 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) { if (isChanged) { char *buf = malloc(1024 * 1024); int bufLen = 0; - tsdbEncodeTable(pTable, buf, &bufLen); - tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen); + tsdbEncodeTable(buf, pTable); + // tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen); free(buf); } @@ -966,7 +965,7 @@ static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) { return 0; } -static void tsdbClearTableCfg(STableCfg *config) { +void tsdbClearTableCfg(STableCfg *config) { if (config) { if (config->schema) tdFreeSchema(config->schema); if (config->tagSchema) tdFreeSchema(config->tagSchema); @@ -985,7 +984,7 @@ static void *tsdbEncodeTableName(void *buf, tstr *name) { memcpy(pBuf, name->data, name->len); pBuf = POINTER_SHIFT(pBuf, name->len); - return POINTER_DISTANCE(pBuf, buf); + return pBuf; } static void *tsdbDecodeTableName(void *buf, tstr **name) { diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index d1a41ce550..de81606a08 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -20,6 +20,39 @@ #include "tscompression.h" #include "tsdbMain.h" +static bool tsdbShouldCreateNewLast(SRWHelper *pHelper); +static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, int rowsToWrite, + SCompBlock *pCompBlock, bool isLast, bool isSuperBlock); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols); +static int compTSKEY(const void *key1, const void *key2); +static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize); +static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx, int rowsAdded); +static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx); +static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey); +static void tsdbResetHelperFileImpl(SRWHelper *pHelper); +static int tsdbInitHelperFile(SRWHelper *pHelper); +static void tsdbDestroyHelperFile(SRWHelper *pHelper); +static void tsdbResetHelperTableImpl(SRWHelper *pHelper); +static void tsdbResetHelperTable(SRWHelper *pHelper); +static void tsdbInitHelperTable(SRWHelper *pHelper); +static void tsdbDestroyHelperTable(SRWHelper *pHelper); +static void tsdbResetHelperBlockImpl(SRWHelper *pHelper); +static void tsdbResetHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelperBlock(SRWHelper *pHelper); +static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type); +static int comparColIdCompCol(const void *arg1, const void *arg2); +static int comparColIdDataCol(const void *arg1, const void *arg2); +static int tsdbLoadSingleColumnData(int fd, SCompBlock *pCompBlock, SCompCol *pCompCol, void *buf); +static int tsdbLoadSingleBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, int16_t *colIds, int numOfColIds, + SDataCols *pDataCols); +static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, char *content, int32_t len, int8_t comp, int numOfRows, + int maxPoints, char *buffer, int bufferSize); +static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols); +static void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx); +static void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx); + // ---------------------- INTERNAL FUNCTIONS ---------------------- int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) { return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER); @@ -69,7 +102,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD]; pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA]; pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST]; - if (TSDB_HELPER_TYPE(pHelper) == TSDB_WRITE_HELPER) { + if (helperType(pHelper) == TSDB_WRITE_HELPER) { char *fnameDup = strdup(pHelper->files.headF.fname); if (fnameDup == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -437,7 +470,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) { return 0; } -int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) { +int tsdbloadcompdata(srwhelper *phelper, scompblock *pcompblock, void *target) { ASSERT(pCompBlock->numOfSubBlocks <= 1); int fd = (pCompBlock->last) ? pHelper->files.lastF.fd : pHelper->files.dataF.fd; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 736be5e788..d6bab10913 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -589,7 +589,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock); } - tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj)); + tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) { SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo; @@ -836,7 +836,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, char* pData = NULL; // the schema version info is embeded in SDataRow - STSchema* pSchema = tsdbGetTableSchemaByVersion(pMeta, pTable, dataRowVersion(row)); + STSchema* pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row)); int32_t numOfRowCols = schemaNCols(pSchema); int32_t i = 0, j = 0; diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h index 9483c1cc35..fa2517e19a 100644 --- a/src/util/inc/tref.h +++ b/src/util/inc/tref.h @@ -31,7 +31,7 @@ typedef void (*_ref_fn_t)(const void* pObj); _ref_fn_t end; \ } _ref_func = {.begin = (s), .end = (e)}; -#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)); +#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)) #define T_REF_INC_WITH_CB(x, p) \ do { \ @@ -41,7 +41,7 @@ typedef void (*_ref_fn_t)(const void* pObj); } \ } while (0) -#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)); +#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)) #define T_REF_DEC_WITH_CB(x, p) \ do { \