test insert and query

This commit is contained in:
Cary Xu 2022-03-14 17:38:10 +08:00
parent abbd818fe6
commit 65870538fd
7 changed files with 314 additions and 136 deletions

View File

@ -95,7 +95,6 @@ int tsdbCommit(STsdb *pTsdb);
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
/** /**
@ -107,6 +106,12 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg);
*/ */
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
int32_t nMaxResult);
// STsdbCfg // STsdbCfg
int tsdbOptionsInit(STsdbCfg *); int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);

View File

@ -35,6 +35,7 @@ void tsdbCloseDBF(SDBFile* pDBF);
int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path); int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
void tsdbCloseBDBEnv(DB_ENV* pEnv); void tsdbCloseBDBEnv(DB_ENV* pEnv);
int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize); int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -27,29 +27,18 @@ struct SSmaEnv {
}; };
#define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path) #define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// insert/update interface
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// query interface
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult);
// management interface
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg);
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
void * tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
#if 0 #if 0
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
#endif #endif
// internal func // internal func
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) {
int32_t len = 0; int32_t len = 0;

View File

@ -29,7 +29,7 @@ static void tsdbCloseBDBDb(DB *pDB);
#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code)) #define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code))
int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
// TDBEnv is shared by a group of SDBFile // TDBEnv is shared by a group of SDBFile
if (!pEnv) { if (!pEnv) {
terrno = TSDB_CODE_INVALID_PTR; terrno = TSDB_CODE_INVALID_PTR;
@ -46,18 +46,12 @@ int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) {
return 0; return 0;
} }
static void *tsdbFreeDBF(SDBFile *pDBF) {
if (pDBF) {
free(pDBF);
}
return NULL;
}
void tsdbCloseDBF(SDBFile *pDBF) { void tsdbCloseDBF(SDBFile *pDBF) {
if (pDBF->pDB) { if (pDBF->pDB) {
tsdbCloseBDBDb(pDBF->pDB); tsdbCloseBDBDb(pDBF->pDB);
pDBF->pDB = tsdbFreeDBF(pDBF); pDBF->pDB = NULL;
} }
tfree(pDBF->path);
} }
int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
@ -146,3 +140,34 @@ int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data,
return 0; return 0;
} }
void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) {
void *result = NULL;
DBT key1 = {0};
DBT value1 = {0};
int ret;
// Set key/value
key1.data = key;
key1.size = keySize;
// Query
// TODO: lock
ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0);
// TODO: unlock
if (ret != 0) {
return NULL;
}
result = calloc(1, value1.size);
if (result == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
*valueSize = value1.size;
memcpy(result, value1.data, value1.size);
return result;
}

View File

@ -15,6 +15,7 @@
#include "tsdbDef.h" #include "tsdbDef.h"
#undef SMA_PRINT_DEBUG_LOG
#define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_DAYS 30
#define SMA_STORAGE_TSDB_TIMES 30 #define SMA_STORAGE_TSDB_TIMES 30
#define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_STORAGE_SPLIT_HOURS 24
@ -32,23 +33,22 @@ typedef enum {
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
SDBFile *pDFile; SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
// TODO
} STSmaWriteH; } STSmaWriteH;
typedef struct { typedef struct {
int32_t iter; int32_t iter;
int32_t fid;
} SmaFsIter; } SmaFsIter;
typedef struct { typedef struct {
STsdb * pTsdb; STsdb * pTsdb;
char * pDFile; // TODO: use the real DFile type, not char* SDBFile dFile;
int32_t interval; // interval with the precision of DB int32_t interval; // interval with the precision of DB
int32_t blockSize; // size of SMA block item int32_t blockSize; // size of SMA block item
int8_t storageLevel; int8_t storageLevel;
int8_t days; int8_t days;
SmaFsIter smaFsIter; SmaFsIter smaFsIter;
// TODO
} STSmaReadH; } STSmaReadH;
typedef struct { typedef struct {
@ -69,23 +69,30 @@ struct SSmaStat {
}; };
// declaration of static functions // declaration of static functions
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg);
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
int32_t nMaxResult);
static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg);
static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat);
static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path);
static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv);
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData);
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen);
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision);
static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel);
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin);
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
SSmaEnv *pEnv = NULL; SSmaEnv *pEnv = NULL;
@ -408,25 +415,35 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
} }
/** /**
* @brief Insert TSma data blocks to B+Tree * @brief Insert TSma data blocks to DB File build by B+Tree
* *
* @param bTree * @param pSmaH
* @param smaKey * @param smaKey
* @param keyLen
* @param pData * @param pData
* @param dataLen * @param dataLen
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) {
SDBFile *pDBFile = pSmaH->pDFile; SDBFile *pDBFile = &pSmaH->dFile;
// TODO: insert sma data blocks into B+Tree // TODO: insert sma data blocks into B+Tree
tsdbDebug("insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", pDBFile->path, tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
*(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
if(tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0){ if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
#ifdef SMA_PRINT_DEBUG_LOG
uint32_t valueSize = 0;
void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize);
ASSERT(data != NULL);
for (uint32_t v = 0; v < valueSize; v += 8) {
tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v));
}
#endif
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -458,41 +475,41 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit
} }
} }
switch (intervalUnit) { switch (precision) {
case TD_TIME_UNIT_MILLISEC: case TSDB_TIME_PRECISION_MILLI:
if (TSDB_TIME_PRECISION_MILLI == precision) { if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval;
} else if (TSDB_TIME_PRECISION_MICRO == precision) {
return interval * 1e3;
} else { // nano second
return interval * 1e6;
}
break;
case TD_TIME_UNIT_MICROSEC:
if (TSDB_TIME_PRECISION_MILLI == precision) {
return interval / 1e3; return interval / 1e3;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval;
} else { // nano second
return interval * 1e3;
}
break;
case TD_TIME_UNIT_NANOSEC:
if (TSDB_TIME_PRECISION_MILLI == precision) {
return interval / 1e6; return interval / 1e6;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { } else {
return interval / 1e3;
} else { // nano second
return interval; return interval;
} }
break; break;
default: case TSDB_TIME_PRECISION_MICRO:
if (TSDB_TIME_PRECISION_MILLI == precision) { if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval / 1e3;
} else {
return interval * 1e3; return interval * 1e3;
} else if (TSDB_TIME_PRECISION_MICRO == precision) { }
break;
case TSDB_TIME_PRECISION_NANO:
if (TD_TIME_UNIT_MICROSEC == intervalUnit) {
return interval * 1e3;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval;
} else {
return interval * 1e6; return interval * 1e6;
} else { // nano second }
return interval * 1e9; break;
default: // ms
if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
return interval / 1e3;
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
return interval / 1e6;
} else {
return interval;
} }
break; break;
} }
@ -551,37 +568,38 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
pSmaH->pDFile = (SDBFile *)calloc(1, sizeof(SDBFile *));
if (!pSmaH->pDFile) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) {
if (pSmaH) { if (pSmaH) {
if (pSmaH->pDFile) { tsdbCloseDBF(&pSmaH->dFile);
tsdbCloseDBF(pSmaH->pDFile);
}
} }
} }
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) {
STsdb *pTsdb = pSmaH->pTsdb; STsdb *pTsdb = pSmaH->pTsdb;
ASSERT(pSmaH->pDFile->path == NULL && pSmaH->pDFile->pDB == NULL); ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL);
char tSmaFile[TSDB_FILENAME_LEN] = {0}; char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid);
pSmaH->pDFile->path = strdup(tSmaFile); pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { /**
STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); * @brief
*
* @param pTsdb
* @param interval Interval calculated by DB's precision
* @param storageLevel
* @return int32_t
*/
static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) {
STsdbCfg *pCfg = REPO_CFG(pTsdb);
int32_t daysPerFile = pCfg->daysPerFile; int32_t daysPerFile = pCfg->daysPerFile;
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
int32_t days = 30 * (pSmaH->interval / tsTickPerDay[pCfg->precision]); int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]);
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
} }
@ -600,7 +618,7 @@ static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) {
* @param msg * @param msg
* @return int32_t * @return int32_t
*/ */
int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
@ -624,16 +642,16 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
// Step 1: Judge the storage level and days // Step 1: Judge the storage level and days
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
int32_t daysPerFile = tsdbGetTSmaDays(&tSmaH, storageLevel); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
// - Set and open the DFile or the B+Tree file // - Set and open the DFile or the B+Tree file
// TODO: tsdbStartTSmaCommit(); // TODO: tsdbStartTSmaCommit();
tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid);
if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, tSmaH.pDFile) != 0) { if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) {
tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb),
tSmaH.pDFile->path ? tSmaH.pDFile->path : "path is NULL", tstrerror(terrno)); tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
tsdbDestroyTSmaWriteH(&tSmaH); tsdbDestroyTSmaWriteH(&tSmaH);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -657,12 +675,12 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
char tSmaFile[TSDB_FILENAME_LEN] = {0}; char tSmaFile[TSDB_FILENAME_LEN] = {0};
snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid);
pSmaH->pDFile->path = strdup(tSmaFile); pSmaH->dFile.path = strdup(tSmaFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
STsdbCfg * pCfg = REPO_CFG(pTsdb); STsdbCfg * pCfg = REPO_CFG(pTsdb);
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
STSmaWriteH tSmaH = {0}; STSmaWriteH tSmaH = {0};
@ -698,35 +716,35 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
} }
/** /**
* @brief Init of tSma ReadH * @brief
* *
* @param pSmaH * @param pSmaH
* @param pTsdb * @param pTsdb
* @param param * @param interval
* @param pData * @param intervalUnit
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) {
pSmaH->pTsdb = pTsdb; pSmaH->pTsdb = pTsdb;
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
// pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit);
pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel);
} }
/** /**
* @brief Init of tSma FS * @brief Init of tSma FS
* *
* @param pReadH * @param pReadH
* @param param * @param skey
* @param queryWin
* @return int32_t * @return int32_t
*/ */
static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) {
int32_t storageLevel = 0; // tsdbGetSmaStorageLevel(param->interval, param->intervalUnit); int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision));
int32_t daysPerFile = char tSmaFile[TSDB_FILENAME_LEN] = {0};
storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid);
pReadH->storageLevel = storageLevel; pSmaH->dFile.path = strdup(tSmaFile);
pReadH->days = daysPerFile; pSmaH->smaFsIter.iter = 0;
pReadH->smaFsIter.iter = 0; pSmaH->smaFsIter.fid = fid;
} }
/** /**
@ -738,17 +756,18 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
* @return true * @return true
* @return false * @return false
*/ */
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf; SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf;
int32_t nSmaFs = taosArrayGetSize(smaFs); int32_t nSmaFs = taosArrayGetSize(smaFs);
pReadH->pDFile = NULL; tsdbCloseDBF(&pReadH->dFile);
#if 0
while (pReadH->smaFsIter.iter < nSmaFs) { while (pReadH->smaFsIter.iter < nSmaFs) {
void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter);
if (pSmaFile) { // match(indexName, queryWindow) if (pSmaFile) { // match(indexName, queryWindow)
// TODO: select the file by index_name ... // TODO: select the file by index_name ...
pReadH->pDFile = pSmaFile; pReadH->dFile = pSmaFile;
++pReadH->smaFsIter.iter; ++pReadH->smaFsIter.iter;
break; break;
} }
@ -759,41 +778,82 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) {
tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]");
return true; return true;
} }
#endif
return false; return false;
} }
/** /**
* @brief Return the data between queryWin and fill the pData. * @brief
* *
* @param pTsdb * @param pTsdb Return the data between queryWin and fill the pData.
* @param param
* @param pData * @param pData
* @param queryWin * @param indexUid
* @param interval
* @param intervalUnit
* @param tableUid
* @param colId
* @param pQuerySKey
* @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM.
* @return int32_t * @return int32_t
*/ */
int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
SSmaStatItem *pItem = int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
(SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &pData->indexUid, sizeof(pData->indexUid)); int32_t nMaxResult) {
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
if (pItem == NULL) { if (pItem == NULL) {
// mark all window as expired and notify query module to query raw TS data. // mark all window as expired and notify query module to query raw TS data.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t nQueryWin = 0; #if 0
int32_t nQueryWin = taosArrayGetSize(pQuerySKey);
for (int32_t n = 0; n < nQueryWin; ++n) { for (int32_t n = 0; n < nQueryWin; ++n) {
TSKEY thisWindow = n; TSKEY skey = taosArrayGet(pQuerySKey, n);
if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) { if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired. // TODO: mark this window as expired.
} }
} }
#endif
#if 0
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
// TODO: mark this window as expired.
}
#endif
STSmaReadH tReadH = {0}; STSmaReadH tReadH = {0};
tsdbInitTSmaReadH(&tReadH, pTsdb, pData); tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit);
tsdbCloseDBF(&tReadH.dFile);
tsdbInitTSmaFile(&tReadH, queryWin); tsdbInitTSmaFile(&tReadH, querySkey);
if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) {
tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno));
return TSDB_CODE_FAILED;
}
char smaKey[SMA_KEY_LEN] = {0};
void *pSmaKey = &smaKey;
tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey);
tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
void * result = NULL;
uint32_t valueSize = 0;
if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) {
tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64
" since %s",
REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
*(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno));
tsdbCloseDBF(&tReadH.dFile);
return TSDB_CODE_FAILED;
}
#ifdef SMA_PRINT_DEBUG_LOG
for (uint32_t v = 0; v < valueSize; v += 8) {
tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v));
}
#endif
#if 0
int32_t nResult = 0; int32_t nResult = 0;
int64_t lastKey = 0; int64_t lastKey = 0;
@ -815,8 +875,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *
} }
} }
} }
#endif
// read data from file and fill the result // read data from file and fill the result
tsdbCloseDBF(&tReadH.dFile);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -855,3 +916,54 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif #endif
/**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
* TODO: Who is responsible for resource allocate and release?
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) {
tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
/**
* @brief Insert Time-range-wise Rollup Sma(RSma) data
*
* @param pTsdb
* @param param
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) {
tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}
int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit,
tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey,
nMaxResult)) < 0) {
tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno));
}
return code;
}

View File

@ -34,6 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL);
} }
#if 0
/** /**
* @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine
* *
@ -74,3 +75,5 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) {
} }
return code; return code;
} }
#endif

View File

@ -33,7 +33,7 @@ int main(int argc, char **argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, tSmaEncodeDecodeTest) { TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// encode // encode
STSma tSma = {0}; STSma tSma = {0};
tSma.version = 0; tSma.version = 0;
@ -87,8 +87,9 @@ TEST(testCase, tSmaEncodeDecodeTest) {
tdDestroyTSma(&tSma); tdDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper); tdDestroyTSmaWrapper(&dstTSmaWrapper);
} }
#if 1 #if 1
TEST(testCase, tSma_DB_Put_Get_Del_Test) { TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * smaIndexName2 = "sma_index_test_2"; const char * smaIndexName2 = "sma_index_test_2";
const char * timezone = "Asia/Shanghai"; const char * timezone = "Asia/Shanghai";
@ -220,16 +221,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) {
#endif #endif
#if 1 #if 1
TEST(testCase, tSmaInsertTest) { TEST(testCase, tSma_Data_Insert_Query_Test) {
// prepare meta // step 1: prepare meta
const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName1 = "sma_index_test_1";
const char * timezone = "Asia/Shanghai"; const char * timezone = "Asia/Shanghai";
const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;";
const char * tagsFilter = "I'm tags filter"; const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'";
const char * smaTestDir = "./smaTest"; const char * smaTestDir = "./smaTest";
const tb_uid_t tbUid = 1234567890; const tb_uid_t tbUid = 1234567890;
const int64_t indexUid1 = 2000000001; const int64_t indexUid1 = 2000000001;
const int64_t interval1 = 1;
const int8_t intervalUnit1 = TD_TIME_UNIT_DAY;
const uint32_t nCntTSma = 2; const uint32_t nCntTSma = 2;
TSKEY skey1 = 1646987196;
const int64_t testSmaData1 = 100;
const int64_t testSmaData2 = 200;
// encode // encode
STSma tSma = {0}; STSma tSma = {0};
tSma.version = 0; tSma.version = 0;
@ -261,7 +267,7 @@ TEST(testCase, tSmaInsertTest) {
// save index 1 // save index 1
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
// insert data // step 2: insert data
STSmaDataWrapper *pSmaData = NULL; STSmaDataWrapper *pSmaData = NULL;
STsdb tsdb = {0}; STsdb tsdb = {0};
STsdbCfg * pCfg = &tsdb.config; STsdbCfg * pCfg = &tsdb.config;
@ -276,6 +282,21 @@ TEST(testCase, tSmaInsertTest) {
tsdb.config.update = TD_ROW_OVERWRITE_UPDATE; tsdb.config.update = TD_ROW_OVERWRITE_UPDATE;
tsdb.config.compression = TWO_STAGE_COMP; tsdb.config.compression = TWO_STAGE_COMP;
switch (tsdb.config.precision) {
case TSDB_TIME_PRECISION_MILLI:
skey1 *= 1e3;
break;
case TSDB_TIME_PRECISION_MICRO:
skey1 *= 1e6;
break;
case TSDB_TIME_PRECISION_NANO:
skey1 *= 1e9;
break;
default: // ms
skey1 *= 1e3;
break;
}
char *msg = (char *)calloc(100, 1); char *msg = (char *)calloc(100, 1);
EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
@ -286,21 +307,21 @@ TEST(testCase, tSmaInsertTest) {
void * buf = NULL; void * buf = NULL;
EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
int32_t bufSize = taosTSizeof(buf); int32_t bufSize = taosTSizeof(buf);
int32_t numOfTables = 5; int32_t numOfTables = 10;
col_id_t numOfCols = 10; col_id_t numOfCols = 4096;
EXPECT_GT(numOfCols, 0); EXPECT_GT(numOfCols, 0);
pSmaData = (STSmaDataWrapper *)buf; pSmaData = (STSmaDataWrapper *)buf;
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
pSmaData->skey = 1646987196000; pSmaData->skey = skey1;
pSmaData->interval = 10; pSmaData->interval = interval1;
pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; pSmaData->intervalUnit = intervalUnit1;
pSmaData->indexUid = indexUid1; pSmaData->indexUid = indexUid1;
int32_t len = sizeof(STSmaDataWrapper); int32_t len = sizeof(STSmaDataWrapper);
for (int32_t t = 0; t < numOfTables; ++t) { for (int32_t t = 0; t < numOfTables; ++t) {
STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
pTbData->tableUid = t; pTbData->tableUid = tbUid + t;
int32_t tableDataLen = sizeof(STSmaTbData); int32_t tableDataLen = sizeof(STSmaTbData);
for (col_id_t c = 0; c < numOfCols; ++c) { for (col_id_t c = 0; c < numOfCols; ++c) {
@ -313,8 +334,17 @@ TEST(testCase, tSmaInsertTest) {
} }
STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen); STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen);
pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID; pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID;
pColData->blockSize = ((c & 1) == 0) ? 8 : 16;
// TODO: fill col data // TODO: fill col data
if ((c & 1) == 0) {
pColData->blockSize = 8;
memcpy(pColData->data, &testSmaData1, 8);
} else {
pColData->blockSize = 16;
memcpy(pColData->data, &testSmaData1, 8);
memcpy(POINTER_SHIFT(pColData->data, 8), &testSmaData2, 8);
}
tableDataLen += (sizeof(STSmaColData) + pColData->blockSize); tableDataLen += (sizeof(STSmaColData) + pColData->blockSize);
} }
pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData));
@ -328,6 +358,19 @@ TEST(testCase, tSmaInsertTest) {
// execute // execute
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
// step 3: query
uint32_t checkDataCnt = 0;
for (int32_t t = 0; t < numOfTables; ++t) {
for (col_id_t c = 0; c < numOfCols; ++c) {
EXPECT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
TSDB_CODE_SUCCESS);
++checkDataCnt;
}
}
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
// release data // release data
taosTZfree(buf); taosTZfree(buf);
// release meta // release meta