|
|
|
@ -67,7 +67,7 @@ typedef struct {
|
|
|
|
|
*/
|
|
|
|
|
int8_t state; // ETsdbSmaStat
|
|
|
|
|
SHashObj *expiredWindows; // key: skey of time window, value: N/A
|
|
|
|
|
STSma * pSma;
|
|
|
|
|
STSma * pSma; // cache schema
|
|
|
|
|
} SSmaStatItem;
|
|
|
|
|
|
|
|
|
|
struct SSmaStat {
|
|
|
|
@ -81,8 +81,8 @@ struct SSmaStat {
|
|
|
|
|
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg);
|
|
|
|
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
|
|
|
|
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
|
|
|
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
|
|
|
|
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
|
|
|
|
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did);
|
|
|
|
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv);
|
|
|
|
|
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey);
|
|
|
|
|
static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
|
|
|
|
static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat);
|
|
|
|
@ -102,8 +102,8 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
|
|
|
|
|
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
|
|
|
|
|
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
|
|
|
|
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
|
|
|
|
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
|
|
|
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
|
|
|
|
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid);
|
|
|
|
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey);
|
|
|
|
|
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
|
|
|
|
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
|
|
|
|
|
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
|
|
|
@ -111,10 +111,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
|
|
|
|
|
|
|
|
|
// implementation
|
|
|
|
|
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
|
|
|
|
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]);
|
|
|
|
|
snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP,
|
|
|
|
|
TSDB_SMA_DNAME[smaType]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
|
|
|
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) {
|
|
|
|
|
SSmaEnv *pEnv = NULL;
|
|
|
|
|
|
|
|
|
|
pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv));
|
|
|
|
@ -137,12 +138,16 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pEnv->did = did;
|
|
|
|
|
|
|
|
|
|
if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbFreeSmaEnv(pEnv);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
char aname[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
tfsAbsoluteName(pTsdb->pTfs, did, path, aname);
|
|
|
|
|
if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbFreeSmaEnv(pEnv);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
@ -150,14 +155,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
|
|
|
|
return pEnv;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
|
|
|
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) {
|
|
|
|
|
if (!pEnv) {
|
|
|
|
|
terrno = TSDB_CODE_INVALID_PTR;
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (*pEnv == NULL) {
|
|
|
|
|
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
|
|
|
|
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) {
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -296,7 +301,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|
|
|
|
pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv);
|
|
|
|
|
if (pEnv == NULL) {
|
|
|
|
|
char rname[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently
|
|
|
|
|
|
|
|
|
|
SDiskID did = {0};
|
|
|
|
|
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
|
|
|
|
@ -305,14 +309,13 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
|
|
|
|
|
tfsAbsoluteName(pTsdb->pTfs, did, rname, aname);
|
|
|
|
|
|
|
|
|
|
if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbUnlockRepo(pTsdb);
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
tsdbUnlockRepo(pTsdb);
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
@ -339,11 +342,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: decode the msg from Stream Computing module => start
|
|
|
|
|
int64_t indexUid = SMA_TEST_INDEX_UID;
|
|
|
|
|
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
|
|
|
@ -354,6 +352,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) {
|
|
|
|
|
}
|
|
|
|
|
// TODO: decode the msg <= end
|
|
|
|
|
|
|
|
|
|
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType);
|
|
|
|
|
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
|
|
|
|
|
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
|
|
|
@ -660,14 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
|
|
|
|
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid) {
|
|
|
|
|
STsdb *pTsdb = pSmaH->pTsdb;
|
|
|
|
|
ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
|
|
|
|
|
|
|
|
|
|
pSmaH->dFile.fid = fid;
|
|
|
|
|
|
|
|
|
|
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
|
|
|
|
|
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
|
|
|
|
|
pSmaH->dFile.path = strdup(tSmaFile);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
@ -708,8 +710,9 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
|
|
|
|
|
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|
|
|
|
STsdbCfg * pCfg = REPO_CFG(pTsdb);
|
|
|
|
|
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
|
|
|
|
SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
|
|
|
|
|
|
|
|
|
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
|
|
|
|
|
if (pEnv == NULL) {
|
|
|
|
|
terrno = TSDB_CODE_INVALID_PTR;
|
|
|
|
|
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
|
|
|
|
return terrno;
|
|
|
|
@ -727,6 +730,17 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t indexUid = SMA_TEST_INDEX_UID;
|
|
|
|
|
char rPath[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
char aPath[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
|
|
|
|
tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath);
|
|
|
|
|
if (!taosCheckExistFile(aPath)) {
|
|
|
|
|
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Step 1: Judge the storage level and days
|
|
|
|
|
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
|
|
|
|
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
|
|
|
|
@ -735,7 +749,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|
|
|
|
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
|
|
|
|
// - Set and open the DFile or the B+Tree file
|
|
|
|
|
// TODO: tsdbStartTSmaCommit();
|
|
|
|
|
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
|
|
|
|
|
tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid);
|
|
|
|
|
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
|
|
|
|
|
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
|
|
|
|
|
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
|
|
|
@ -822,13 +836,16 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interv
|
|
|
|
|
* @brief Init of tSma FS
|
|
|
|
|
*
|
|
|
|
|
* @param pReadH
|
|
|
|
|
* @param indexUid
|
|
|
|
|
* @param skey
|
|
|
|
|
* @return int32_t
|
|
|
|
|
*/
|
|
|
|
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) {
|
|
|
|
|
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision));
|
|
|
|
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) {
|
|
|
|
|
STsdb *pTsdb = pSmaH->pTsdb;
|
|
|
|
|
|
|
|
|
|
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision));
|
|
|
|
|
char tSmaFile[TSDB_FILENAME_LEN] = {0};
|
|
|
|
|
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid);
|
|
|
|
|
snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid);
|
|
|
|
|
pSmaH->dFile.path = strdup(tSmaFile);
|
|
|
|
|
pSmaH->smaFsIter.iter = 0;
|
|
|
|
|
pSmaH->smaFsIter.fid = fid;
|
|
|
|
@ -887,14 +904,16 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
|
|
|
|
|
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
|
|
|
|
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey,
|
|
|
|
|
int32_t nMaxResult) {
|
|
|
|
|
if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) {
|
|
|
|
|
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
|
|
|
|
|
|
|
|
|
if (!pEnv) {
|
|
|
|
|
terrno = TSDB_CODE_INVALID_PTR;
|
|
|
|
|
tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
|
|
|
|
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
|
|
|
|
|
tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pEnv));
|
|
|
|
|
SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid));
|
|
|
|
|
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) {
|
|
|
|
|
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
|
|
|
|
// it's NULL.
|
|
|
|
@ -926,11 +945,12 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|
|
|
|
tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv));
|
|
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
STSmaReadH tReadH = {0};
|
|
|
|
|
tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
|
|
|
|
|
tsdbCloseDBF(&tReadH.dFile);
|
|
|
|
|
|
|
|
|
|
tsdbInitTSmaFile(&tReadH, querySKey);
|
|
|
|
|
tsdbInitTSmaFile(&tReadH, indexUid, querySKey);
|
|
|
|
|
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
|
|
|
|
|
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
|
|
|
|
|
return TSDB_CODE_FAILED;
|
|
|
|
|