add tsdbBDBImpl files
This commit is contained in:
parent
959bda646b
commit
421d087a8f
|
@ -61,6 +61,12 @@ typedef enum {
|
||||||
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
|
TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired
|
||||||
} ETsdbSmaStat;
|
} ETsdbSmaStat;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA
|
||||||
|
TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA
|
||||||
|
TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA
|
||||||
|
} ETsdbSmaType;
|
||||||
|
|
||||||
extern char *qtypeStr[];
|
extern char *qtypeStr[];
|
||||||
|
|
||||||
#define TSDB_PORT_HTTP 11
|
#define TSDB_PORT_HTTP 11
|
||||||
|
|
|
@ -96,6 +96,8 @@ int tsdbCommit(STsdb *pTsdb);
|
||||||
*/
|
*/
|
||||||
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
|
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
|
||||||
|
|
||||||
|
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
|
* @brief Insert RSma(Time-range-wise Rollup SMA) data.
|
||||||
*
|
*
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_DB_DEF_H_
|
||||||
|
#define _TD_TSDB_DB_DEF_H_
|
||||||
|
|
||||||
|
#include "db.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct SDBFile SDBFile;
|
||||||
|
typedef DB_ENV* TDBEnv;
|
||||||
|
|
||||||
|
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
|
||||||
|
void tsdbCloseDBF(SDBFile* pDBF);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_TSDB_DB_DEF_H_*/
|
|
@ -43,6 +43,8 @@ extern "C" {
|
||||||
|
|
||||||
struct STsdb {
|
struct STsdb {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
bool repoLocked;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
char * path;
|
char * path;
|
||||||
STsdbCfg config;
|
STsdbCfg config;
|
||||||
STsdbMemTable * mem;
|
STsdbMemTable * mem;
|
||||||
|
@ -52,12 +54,18 @@ struct STsdb {
|
||||||
STsdbFS * fs;
|
STsdbFS * fs;
|
||||||
SMeta * pMeta;
|
SMeta * pMeta;
|
||||||
STfs * pTfs;
|
STfs * pTfs;
|
||||||
SSmaStat * pSmaStat;
|
SSmaEnv * pTSmaEnv;
|
||||||
|
SSmaEnv * pRSmaEnv;
|
||||||
|
// SSmaStat * pSmaStat;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REPO_ID(r) ((r)->vgId)
|
#define REPO_ID(r) ((r)->vgId)
|
||||||
#define REPO_CFG(r) (&(r)->config)
|
#define REPO_CFG(r) (&(r)->config)
|
||||||
#define REPO_FS(r) (r)->fs
|
#define REPO_FS(r) (r)->fs
|
||||||
|
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||||
|
|
||||||
|
int tsdbLockRepo(STsdb *pTsdb);
|
||||||
|
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||||
|
|
||||||
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
|
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
|
||||||
return pTable->pSchema;
|
return pTable->pSchema;
|
||||||
|
|
|
@ -329,21 +329,23 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
|
||||||
// =============== SDFileSet
|
// =============== SDFileSet
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int fid;
|
int fid;
|
||||||
int8_t state; // -128~127
|
int8_t state; // -128~127
|
||||||
uint8_t ver; // 0~255, DFileSet version
|
uint8_t ver; // 0~255, DFileSet version
|
||||||
uint16_t reserve;
|
uint16_t reserve;
|
||||||
SDFile files[TSDB_FILE_MAX];
|
SDFile files[TSDB_FILE_MAX];
|
||||||
} SDFileSet;
|
} SDFileSet;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int fid;
|
int fid;
|
||||||
int8_t state;
|
int8_t state;
|
||||||
uint8_t ver;
|
uint8_t ver;
|
||||||
|
uint16_t reserve;
|
||||||
#if 0
|
#if 0
|
||||||
SDFInfo info;
|
SDFInfo info;
|
||||||
#endif
|
#endif
|
||||||
STfsFile f;
|
STfsFile f;
|
||||||
TdFilePtr pFile;
|
TdFilePtr pFile;
|
||||||
|
|
||||||
} SSFile; // files split by days with fid
|
} SSFile; // files split by days with fid
|
||||||
|
|
||||||
#define TSDB_LATEST_FSET_VER 0
|
#define TSDB_LATEST_FSET_VER 0
|
||||||
|
|
|
@ -17,27 +17,40 @@
|
||||||
#define _TD_TSDB_SMA_H_
|
#define _TD_TSDB_SMA_H_
|
||||||
|
|
||||||
typedef struct SSmaStat SSmaStat;
|
typedef struct SSmaStat SSmaStat;
|
||||||
|
typedef struct SSmaEnv SSmaEnv;
|
||||||
|
|
||||||
|
|
||||||
|
struct SSmaEnv {
|
||||||
|
pthread_rwlock_t lock;
|
||||||
|
char * path;
|
||||||
|
SSmaStat * pStat;
|
||||||
|
};
|
||||||
|
|
||||||
|
#define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
|
#define SMA_ENV_PATH(env) ((env)->path)
|
||||||
|
#define SMA_ENV_STAT(env) ((env)->pStat)
|
||||||
|
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
||||||
|
|
||||||
// insert/update interface
|
// insert/update interface
|
||||||
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||||
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
|
||||||
|
|
||||||
|
|
||||||
// query interface
|
// query interface
|
||||||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
||||||
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
|
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
|
||||||
|
|
||||||
// management interface
|
// management interface
|
||||||
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg);
|
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg);
|
||||||
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
|
void * tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
#if 0
|
#if 0
|
||||||
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
|
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
|
||||||
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// internal func
|
// internal func
|
||||||
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
|
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
len += taosEncodeFixedI64(pData, tableUid);
|
len += taosEncodeFixedI64(pData, tableUid);
|
||||||
|
@ -46,4 +59,31 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId,
|
||||||
return len;
|
return len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbRLockSma(SSmaEnv *pEnv) {
|
||||||
|
int code = pthread_rwlock_rdlock(&(pEnv->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbWLockSma(SSmaEnv *pEnv) {
|
||||||
|
int code = pthread_rwlock_wrlock(&(pEnv->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) {
|
||||||
|
int code = pthread_rwlock_unlock(&(pEnv->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#endif /* _TD_TSDB_SMA_H_ */
|
#endif /* _TD_TSDB_SMA_H_ */
|
|
@ -12,3 +12,115 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define ALLOW_FORBID_FUNC
|
||||||
|
#include "db.h"
|
||||||
|
|
||||||
|
#include "tcoding.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tsdbDBDef.h"
|
||||||
|
|
||||||
|
#define IMPL_WITH_LOCK 1
|
||||||
|
|
||||||
|
struct SDBFile {
|
||||||
|
DB * pDB;
|
||||||
|
char *path;
|
||||||
|
};
|
||||||
|
|
||||||
|
static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path);
|
||||||
|
static void tsdbCloseBDBEnv(DB_ENV *pEnv);
|
||||||
|
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup);
|
||||||
|
static void tsdbCloseBDBDb(DB *pDB);
|
||||||
|
|
||||||
|
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
|
||||||
|
|
||||||
|
int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
|
||||||
|
// TDBEnv is shared by a group of SDBFile
|
||||||
|
ASSERT(pEnv != NULL);
|
||||||
|
|
||||||
|
// Open DBF
|
||||||
|
if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) {
|
||||||
|
tsdbCloseBDBDb(pDBF->pDB);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tsdbFreeDBF(SDBFile *pDBF) {
|
||||||
|
if (pDBF) {
|
||||||
|
free(pDBF);
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbCloseDBF(SDBFile *pDBF) {
|
||||||
|
if (pDBF->pDB) {
|
||||||
|
tsdbCloseBDBDb(pDBF->pDB);
|
||||||
|
pDBF->pDB = tsdbFreeDBF(pDBF);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
|
||||||
|
int ret = 0;
|
||||||
|
DB_ENV *pEnv = NULL;
|
||||||
|
|
||||||
|
if (path == NULL) return 0;
|
||||||
|
|
||||||
|
ret = db_env_create(&pEnv, 0);
|
||||||
|
if (ret != 0) {
|
||||||
|
BDB_PERR("Failed to create tsdb env", ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
|
||||||
|
if (ret != 0) {
|
||||||
|
BDB_PERR("Failed to open tsdb env", ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppEnv = pEnv;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tsdbCloseBDBEnv(DB_ENV *pEnv) {
|
||||||
|
if (pEnv) {
|
||||||
|
pEnv->close(pEnv, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) {
|
||||||
|
int ret;
|
||||||
|
DB *pDB;
|
||||||
|
|
||||||
|
ret = db_create(&(pDB), pEnv, 0);
|
||||||
|
if (ret != 0) {
|
||||||
|
BDB_PERR("Failed to create DBP", ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isDup) {
|
||||||
|
ret = pDB->set_flags(pDB, DB_DUPSORT);
|
||||||
|
if (ret != 0) {
|
||||||
|
BDB_PERR("Failed to set DB flags", ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0);
|
||||||
|
if (ret) {
|
||||||
|
BDB_PERR("Failed to open DBF", ret);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppDB = pDB;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tsdbCloseBDBDb(DB *pDB) {
|
||||||
|
if (pDB) {
|
||||||
|
pDB->close(pDB, 0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -80,6 +80,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
||||||
pTsdb->pmaf = pMAF;
|
pTsdb->pmaf = pMAF;
|
||||||
pTsdb->pMeta = pMeta;
|
pTsdb->pMeta = pMeta;
|
||||||
pTsdb->pTfs = pTfs;
|
pTsdb->pTfs = pTfs;
|
||||||
|
pTsdb->pTSmaEnv = NULL;
|
||||||
|
pTsdb->pRSmaEnv = NULL;
|
||||||
|
|
||||||
pTsdb->fs = tsdbNewFS(pTsdbCfg);
|
pTsdb->fs = tsdbNewFS(pTsdbCfg);
|
||||||
|
|
||||||
|
@ -88,8 +90,9 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
||||||
|
|
||||||
static void tsdbFree(STsdb *pTsdb) {
|
static void tsdbFree(STsdb *pTsdb) {
|
||||||
if (pTsdb) {
|
if (pTsdb) {
|
||||||
|
tsdbFreeSmaEnv(pTsdb->pRSmaEnv);
|
||||||
|
tsdbFreeSmaEnv(pTsdb->pTSmaEnv);
|
||||||
tsdbFreeFS(pTsdb->fs);
|
tsdbFreeFS(pTsdb->fs);
|
||||||
tsdbDestroySmaState(pTsdb->pSmaStat);
|
|
||||||
tfree(pTsdb->path);
|
tfree(pTsdb->path);
|
||||||
free(pTsdb);
|
free(pTsdb);
|
||||||
}
|
}
|
||||||
|
@ -105,6 +108,30 @@ static void tsdbCloseImpl(STsdb *pTsdb) {
|
||||||
tsdbCloseFS(pTsdb);
|
tsdbCloseFS(pTsdb);
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbLockRepo(STsdb *pTsdb) {
|
||||||
|
int code = pthread_mutex_lock(&pTsdb->mutex);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pTsdb->repoLocked = true;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbUnlockRepo(STsdb *pTsdb) {
|
||||||
|
ASSERT(IS_REPO_LOCKED(pTsdb));
|
||||||
|
pTsdb->repoLocked = false;
|
||||||
|
int code = pthread_mutex_unlock(&pTsdb->mutex);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
/*
|
/*
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
|
|
||||||
#define SMA_STORAGE_TSDB_DAYS 30
|
#define SMA_STORAGE_TSDB_DAYS 30
|
||||||
|
#define SMA_STORAGE_TSDB_TIMES 30
|
||||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
#define SMA_STORAGE_SPLIT_HOURS 24
|
||||||
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
|
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
|
||||||
|
|
||||||
|
@ -68,19 +69,104 @@ struct SSmaStat {
|
||||||
};
|
};
|
||||||
|
|
||||||
// declaration of static functions
|
// declaration of static functions
|
||||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
|
||||||
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
|
||||||
static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit);
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
|
||||||
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
|
||||||
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen);
|
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
||||||
|
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
||||||
|
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
|
||||||
|
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
|
||||||
|
static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen);
|
||||||
|
|
||||||
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
|
||||||
|
static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel);
|
||||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
||||||
|
|
||||||
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
||||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
|
||||||
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
|
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
|
||||||
|
|
||||||
|
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
||||||
|
SSmaEnv *pEnv = NULL;
|
||||||
|
|
||||||
|
pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv));
|
||||||
|
if (pEnv == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int code = pthread_rwlock_init(&(pEnv->lock), NULL);
|
||||||
|
if (code) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
free(pEnv);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(path && (strlen(path) > 0));
|
||||||
|
pEnv->path = strdup(path);
|
||||||
|
if (pEnv->path == NULL) {
|
||||||
|
tsdbFreeSmaEnv(pEnv);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbFreeSmaEnv(pEnv);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pEnv;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
||||||
|
if (!pEnv) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pEnv && *pEnv) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbLockRepo(pTsdb) != 0) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*pEnv == NULL) {
|
||||||
|
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
||||||
|
tsdbUnlockRepo(pTsdb);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbUnlockRepo(pTsdb) != 0) {
|
||||||
|
tsdbFreeSmaEnv(*pEnv);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Release resources allocated for its member fields, not including itself.
|
||||||
|
*
|
||||||
|
* @param pSmaEnv
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) {
|
||||||
|
if (pSmaEnv) {
|
||||||
|
tsdbDestroySmaState(pSmaEnv->pStat);
|
||||||
|
tfree(pSmaEnv->pStat);
|
||||||
|
tfree(pSmaEnv->path);
|
||||||
|
pthread_rwlock_destroy(&(pSmaEnv->lock));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) {
|
||||||
|
tsdbDestroySmaEnv(pSmaEnv);
|
||||||
|
tfree(pSmaEnv);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) {
|
||||||
ASSERT(pSmaStat != NULL);
|
ASSERT(pSmaStat != NULL);
|
||||||
|
|
||||||
|
@ -125,6 +211,12 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
|
||||||
return pItem;
|
return pItem;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Release resources allocated for its member fields, not including itself.
|
||||||
|
*
|
||||||
|
* @param pSmaStat
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
||||||
if (pSmaStat) {
|
if (pSmaStat) {
|
||||||
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
|
||||||
|
@ -135,30 +227,42 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
||||||
item = taosHashIterate(pSmaStat->smaStatItems, item);
|
item = taosHashIterate(pSmaStat->smaStatItems, item);
|
||||||
}
|
}
|
||||||
taosHashCleanup(pSmaStat->smaStatItems);
|
taosHashCleanup(pSmaStat->smaStatItems);
|
||||||
free(pSmaStat);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Update expired window according to msg from stream computing module.
|
* @brief Update expired window according to msg from stream computing module.
|
||||||
*
|
*
|
||||||
* @param pTsdb
|
* @param pTsdb
|
||||||
* @param msg
|
* @param smaType ETsdbSmaType
|
||||||
* @return int32_t
|
* @param msg
|
||||||
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
if (msg == NULL) {
|
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||||
|
SSmaEnv * pEnv = NULL;
|
||||||
|
|
||||||
|
if (!msg || !pTsdb->pMeta) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// lazy mode
|
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||||
if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) {
|
pEnv = pTsdb->pTSmaEnv;
|
||||||
|
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||||
|
pEnv = pTsdb->pRSmaEnv;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
|
||||||
|
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: decode the msg => start
|
// TODO: decode the msg => start
|
||||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||||
const char * indexName = SMA_TEST_INDEX_NAME;
|
// const char * indexName = SMA_TEST_INDEX_NAME;
|
||||||
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10;
|
||||||
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
|
TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE];
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
@ -167,9 +271,9 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: decode the msg <= end
|
// TODO: decode the msg <= end
|
||||||
SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems;
|
SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv);
|
||||||
|
|
||||||
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName));
|
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid));
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
|
@ -188,7 +292,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
pItem->pSma = pSma;
|
pItem->pSma = pSma;
|
||||||
|
|
||||||
// TODO: change indexName to indexUid
|
// TODO: change indexName to indexUid
|
||||||
if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) {
|
if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
|
||||||
// If error occurs during put smaStatItem, free the resources of pItem
|
// If error occurs during put smaStatItem, free the resources of pItem
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
free(pItem);
|
free(pItem);
|
||||||
|
@ -207,7 +311,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
// windows failed to put into hash table.
|
// windows failed to put into hash table.
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
tfree(pItem->pSma);
|
tfree(pItem->pSma);
|
||||||
taosHashRemove(pItemsHash, indexName, sizeof(indexName));
|
taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -215,11 +319,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) {
|
static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) {
|
||||||
SSmaStatItem *pItem = NULL;
|
SSmaStatItem *pItem = NULL;
|
||||||
|
|
||||||
if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) {
|
// TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode?
|
||||||
pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid));
|
if (pStat && pStat->smaStatItems) {
|
||||||
|
pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pItem != NULL) {
|
if (pItem != NULL) {
|
||||||
|
@ -241,7 +346,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey
|
||||||
* @param intervalUnit
|
* @param intervalUnit
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) {
|
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
|
||||||
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
|
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
|
||||||
switch (intervalUnit) {
|
switch (intervalUnit) {
|
||||||
case TD_TIME_UNIT_HOUR:
|
case TD_TIME_UNIT_HOUR:
|
||||||
|
@ -422,12 +527,24 @@ static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWra
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
|
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
|
||||||
// TODO
|
STsdb *pTsdb = pSmaH->pTsdb;
|
||||||
|
|
||||||
pSmaH->pDFile = "tSma_interval_file_name";
|
pSmaH->pDFile = "tSma_interval_file_name";
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) {
|
||||||
|
STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb);
|
||||||
|
int32_t daysPerFile = pCfg->daysPerFile;
|
||||||
|
|
||||||
|
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
|
||||||
|
int32_t days = 30 * (pSmaH->interval / tsTickPerDay[pCfg->precision]);
|
||||||
|
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
|
||||||
|
}
|
||||||
|
|
||||||
|
return daysPerFile;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Insert/Update Time-range-wise SMA data.
|
* @brief Insert/Update Time-range-wise SMA data.
|
||||||
|
@ -454,23 +571,26 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 1: Judge the storage level
|
if (!pTsdb->pTSmaEnv) {
|
||||||
int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit);
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Step 1: Judge the storage level and days
|
||||||
|
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||||
|
int32_t daysPerFile = tsdbGetTSmaDays(&tSmaH, storageLevel);
|
||||||
|
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
||||||
|
|
||||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||||
// - Set and open the DFile or the B+Tree file
|
// - Set and open the DFile or the B+Tree file
|
||||||
|
|
||||||
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
|
||||||
|
|
||||||
// Save all the TSma data to one file
|
|
||||||
// TODO: tsdbStartTSmaCommit();
|
// TODO: tsdbStartTSmaCommit();
|
||||||
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
|
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
|
||||||
|
|
||||||
tsdbInsertTSmaDataSection(&tSmaH, pData);
|
tsdbInsertTSmaDataSection(&tSmaH, pData);
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
// reset the SSmaStat
|
// reset the SSmaStat
|
||||||
tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey);
|
tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -496,7 +616,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 1: Judge the storage level
|
// Step 1: Judge the storage level
|
||||||
int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit);
|
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||||
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
|
int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile;
|
||||||
|
|
||||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||||
|
@ -511,7 +631,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
// reset the SSmaStat
|
// reset the SSmaStat
|
||||||
tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey);
|
tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -540,7 +660,7 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapp
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
|
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
|
||||||
int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit);
|
int32_t storageLevel = 0; // tsdbGetSmaStorageLevel(param->interval, param->intervalUnit);
|
||||||
int32_t daysPerFile =
|
int32_t daysPerFile =
|
||||||
storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile;
|
storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile;
|
||||||
pReadH->storageLevel = storageLevel;
|
pReadH->storageLevel = storageLevel;
|
||||||
|
@ -594,7 +714,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
|
||||||
*/
|
*/
|
||||||
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) {
|
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) {
|
||||||
SSmaStatItem *pItem =
|
SSmaStatItem *pItem =
|
||||||
(SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid));
|
(SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &pData->indexUid, sizeof(pData->indexUid));
|
||||||
if (pItem == NULL) {
|
if (pItem == NULL) {
|
||||||
// mark all window as expired and notify query module to query raw TS data.
|
// mark all window as expired and notify query module to query raw TS data.
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -51,6 +51,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
|
||||||
|
tsdbWarn("vgId:%d update expired window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Insert Time-range-wise Rollup Sma(RSma) data
|
* @brief Insert Time-range-wise Rollup Sma(RSma) data
|
||||||
*
|
*
|
||||||
|
|
|
@ -221,12 +221,60 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
TEST(testCase, tSmaInsertTest) {
|
TEST(testCase, tSmaInsertTest) {
|
||||||
|
// prepare meta
|
||||||
|
const char * smaIndexName1 = "sma_index_test_1";
|
||||||
|
const char * smaIndexName2 = "sma_index_test_2";
|
||||||
|
const char * timezone = "Asia/Shanghai";
|
||||||
|
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
|
||||||
|
const char * tagsFilter = "I'm tags filter";
|
||||||
|
const char * smaTestDir = "./smaTest";
|
||||||
|
const tb_uid_t tbUid = 1234567890;
|
||||||
|
const int64_t indexUid1 = 2000000001;
|
||||||
|
const int64_t indexUid2 = 2000000002;
|
||||||
|
const uint32_t nCntTSma = 2;
|
||||||
|
// encode
|
||||||
|
STSma tSma = {0};
|
||||||
|
tSma.version = 0;
|
||||||
|
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||||
|
tSma.interval = 1;
|
||||||
|
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
|
||||||
|
tSma.sliding = 0;
|
||||||
|
tSma.indexUid = indexUid1;
|
||||||
|
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
|
||||||
|
tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN);
|
||||||
|
tSma.tableUid = tbUid;
|
||||||
|
|
||||||
|
tSma.exprLen = strlen(expr);
|
||||||
|
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
|
||||||
|
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
|
||||||
|
|
||||||
|
tSma.tagsFilterLen = strlen(tagsFilter);
|
||||||
|
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
|
||||||
|
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
|
||||||
|
|
||||||
|
SMeta * pMeta = NULL;
|
||||||
|
STSma * pSmaCfg = &tSma;
|
||||||
|
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
|
||||||
|
|
||||||
|
taosRemoveDir(smaTestDir);
|
||||||
|
|
||||||
|
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
|
||||||
|
assert(pMeta != NULL);
|
||||||
|
// save index 1
|
||||||
|
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||||
|
|
||||||
|
|
||||||
|
// insert data
|
||||||
const int64_t indexUid = 2000000002;
|
const int64_t indexUid = 2000000002;
|
||||||
STSmaDataWrapper *pSmaData = NULL;
|
STSmaDataWrapper *pSmaData = NULL;
|
||||||
STsdb tsdb = {0};
|
STsdb tsdb = {0};
|
||||||
STsdbCfg * pCfg = &tsdb.config;
|
STsdbCfg * pCfg = &tsdb.config;
|
||||||
|
|
||||||
pCfg->daysPerFile = 1;
|
pCfg->daysPerFile = 1;
|
||||||
|
tsdb.pMeta = pMeta;
|
||||||
|
|
||||||
|
char *msg = (char *)calloc(100, 1);
|
||||||
|
EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||||
|
|
||||||
// init
|
// init
|
||||||
int32_t allocCnt = 0;
|
int32_t allocCnt = 0;
|
||||||
|
@ -277,8 +325,11 @@ TEST(testCase, tSmaInsertTest) {
|
||||||
// execute
|
// execute
|
||||||
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
// release
|
// release data
|
||||||
taosTZfree(buf);
|
taosTZfree(buf);
|
||||||
|
// release meta
|
||||||
|
tdDestroyTSma(&tSma);
|
||||||
|
metaClose(pMeta);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue