refactor and put sma file in smaUid dir
This commit is contained in:
parent
51ea336f08
commit
8a709fdbf3
|
@ -63,6 +63,7 @@ struct STsdb {
|
|||
#define REPO_ID(r) ((r)->vgId)
|
||||
#define REPO_CFG(r) (&(r)->config)
|
||||
#define REPO_FS(r) (r)->fs
|
||||
#define REPO_TFS(r) (r)->pTfs
|
||||
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||
#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv)
|
||||
|
||||
|
|
|
@ -42,23 +42,23 @@ typedef struct {
|
|||
typedef struct {
|
||||
STsdbFSMeta meta; // FS meta
|
||||
SArray * df; // data file array
|
||||
SArray * sf; // sma data file array v2(t|r)1900.index_name_1
|
||||
SArray * sf; // sma data file array v2f1900.index_name_1
|
||||
} SFSStatus;
|
||||
|
||||
/**
|
||||
* @brief Directory structure of .tsma data files.
|
||||
*
|
||||
* /vnode2/tsdb $ tree .sma/
|
||||
* .sma/
|
||||
* ├── v2t100.index_name_1
|
||||
* ├── v2t101.index_name_1
|
||||
* ├── v2t102.index_name_1
|
||||
* ├── v2t1900.index_name_3
|
||||
* ├── v2t1901.index_name_3
|
||||
* ├── v2t1902.index_name_3
|
||||
* ├── v2t200.index_name_2
|
||||
* ├── v2t201.index_name_2
|
||||
* └── v2t202.index_name_2
|
||||
* /vnode2/tsdb $ tree tsma/
|
||||
* tsma/
|
||||
* ├── v2f100.index_name_1
|
||||
* ├── v2f101.index_name_1
|
||||
* ├── v2f102.index_name_1
|
||||
* ├── v2f1900.index_name_3
|
||||
* ├── v2f1901.index_name_3
|
||||
* ├── v2f1902.index_name_3
|
||||
* ├── v2f200.index_name_2
|
||||
* ├── v2f201.index_name_2
|
||||
* └── v2f202.index_name_2
|
||||
*
|
||||
* 0 directories, 9 files
|
||||
*/
|
||||
|
|
|
@ -21,12 +21,14 @@ typedef struct SSmaEnv SSmaEnv;
|
|||
|
||||
struct SSmaEnv {
|
||||
pthread_rwlock_t lock;
|
||||
TDBEnv dbEnv;
|
||||
char * path;
|
||||
SDiskID did;
|
||||
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
|
||||
char * path; // relative path
|
||||
SSmaStat * pStat;
|
||||
};
|
||||
|
||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||
#define SMA_ENV_DID(env) ((env)->did)
|
||||
#define SMA_ENV_ENV(env) ((env)->dbEnv)
|
||||
#define SMA_ENV_PATH(env) ((env)->path)
|
||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||
|
|
|
@ -67,7 +67,7 @@ typedef struct {
|
|||
*/
|
||||
int8_t state; // ETsdbSmaStat
|
||||
SHashObj *expiredWindows; // key: skey of time window, value: N/A
|
||||
STSma * pSma;
|
||||
STSma * pSma; // cache schema
|
||||
} SSmaStatItem;
|
||||
|
||||
struct SSmaStat {
|
||||
|
@ -81,8 +81,8 @@ struct SSmaStat {
|
|||
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
|
||||
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
|
||||
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
||||
|
@ -102,8 +102,8 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
|
|||
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
|
||||
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
||||
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
|
||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
|
||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid);
|
||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
|
||||
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
|
||||
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||
|
@ -111,10 +111,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
|||
|
||||
// implementation
|
||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]);
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
|
||||
TSDB_SMA_DNAME[smaType]);
|
||||
}
|
||||
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) {
|
||||
SSmaEnv *pEnv = NULL;
|
||||
|
||||
pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv));
|
||||
|
@ -137,12 +138,16 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pEnv->did = did;
|
||||
|
||||
if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
|
||||
tsdbFreeSmaEnv(pEnv);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) {
|
||||
char aname[TSDB_FILENAME_LEN] = {0};
|
||||
tfsAbsoluteName(pTsdb->pTfs, did, path, aname);
|
||||
if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
|
||||
tsdbFreeSmaEnv(pEnv);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -150,14 +155,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
|||
return pEnv;
|
||||
}
|
||||
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
||||
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) {
|
||||
if (!pEnv) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (*pEnv == NULL) {
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
@ -296,7 +301,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|||
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv);
|
||||
if (pEnv == NULL) {
|
||||
char rname[TSDB_FILENAME_LEN] = {0};
|
||||
char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently
|
||||
|
||||
SDiskID did = {0};
|
||||
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
|
||||
|
@ -305,14 +309,13 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
|
||||
tfsAbsoluteName(pTsdb->pTfs, did, rname, aname);
|
||||
|
||||
if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -339,11 +342,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// TODO: decode the msg from Stream Computing module => start
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
||||
|
@ -354,6 +352,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
|||
}
|
||||
// TODO: decode the msg <= end
|
||||
|
||||
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType);
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
||||
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
||||
|
@ -660,14 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
|
||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid) {
|
||||
STsdb *pTsdb = pSmaH->pTsdb;
|
||||
ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
|
||||
|
||||
pSmaH->dFile.fid = fid;
|
||||
|
||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
|
||||
pSmaH->dFile.path = strdup(tSmaFile);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -708,8 +710,9 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
|
|||
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||
STsdbCfg * pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
|
||||
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
|
@ -727,6 +730,17 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
char rPath[TSDB_FILENAME_LEN] = {0};
|
||||
char aPath[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
||||
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
|
||||
if (!taosCheckExistFile(aPath)) {
|
||||
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 1: Judge the storage level and days
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
|
||||
|
@ -735,7 +749,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||
// - Set and open the DFile or the B+Tree file
|
||||
// TODO: tsdbStartTSmaCommit();
|
||||
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
|
||||
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
|
||||
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
|
||||
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
|
||||
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
||||
|
@ -822,13 +836,16 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interv
|
|||
* @brief Init of tSma FS
|
||||
*
|
||||
* @param pReadH
|
||||
* @param indexUid
|
||||
* @param skey
|
||||
* @return int32_t
|
||||
*/
|
||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) {
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision));
|
||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
|
||||
STsdb *pTsdb = pSmaH->pTsdb;
|
||||
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision));
|
||||
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid);
|
||||
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
|
||||
pSmaH->dFile.path = strdup(tSmaFile);
|
||||
pSmaH->smaFsIter.iter = 0;
|
||||
pSmaH->smaFsIter.fid = fid;
|
||||
|
@ -887,14 +904,16 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
|
|||
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
||||
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
|
||||
int32_t nMaxResult) {
|
||||
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
|
||||
if (!pEnv) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
|
||||
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pEnv));
|
||||
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
|
||||
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) {
|
||||
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
||||
// it's NULL.
|
||||
|
@ -926,11 +945,12 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
||||
|
||||
#endif
|
||||
|
||||
STSmaReadH tReadH = {0};
|
||||
tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
|
||||
tsdbCloseDBF(&tReadH.dFile);
|
||||
|
||||
tsdbInitTSmaFile(&tReadH, querySKey);
|
||||
tsdbInitTSmaFile(&tReadH, indexUid, querySKey);
|
||||
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
|
||||
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
|
@ -33,6 +33,7 @@ int main(int argc, char **argv) {
|
|||
return RUN_ALL_TESTS();
|
||||
}
|
||||
|
||||
#if 1
|
||||
TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
||||
// encode
|
||||
STSma tSma = {0};
|
||||
|
@ -88,6 +89,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
|||
tdDestroyTSma(&tSma);
|
||||
tdDestroyTSmaWrapper(&dstTSmaWrapper);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
||||
|
|
Loading…
Reference in New Issue