put/get sma schema from bdb
This commit is contained in:
parent
1688a9e002
commit
228292387c
|
@ -1885,7 +1885,7 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) {
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdDestroyWrapper(STSmaWrapper* pSW) {
|
||||
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) {
|
||||
if (pSW && pSW->tSma) {
|
||||
for (uint32_t i = 0; i < pSW->number; ++i) {
|
||||
tdDestroyTSma(pSW->tSma + i, false);
|
||||
|
|
|
@ -38,8 +38,10 @@ typedef struct SMetaCfg {
|
|||
|
||||
typedef struct SMTbCursor SMTbCursor;
|
||||
typedef struct SMCtbCursor SMCtbCursor;
|
||||
typedef struct SMSmaCursor SMSmaCursor;
|
||||
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
typedef STSma SSmaCfg;
|
||||
|
||||
// SMeta operations
|
||||
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
|
||||
|
@ -50,19 +52,24 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
|
|||
int metaCommit(SMeta *pMeta);
|
||||
|
||||
// For Query
|
||||
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||
SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName);
|
||||
|
||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
char *metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
char * metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
|
||||
|
||||
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseSmaCurosr(SMSmaCursor *pSmaCur);
|
||||
const char * metaSmaCursorNext(SMSmaCursor *pSmaCur);
|
||||
|
||||
// Options
|
||||
void metaOptionsInit(SMetaCfg *pMetaCfg);
|
||||
void metaOptionsClear(SMetaCfg *pMetaCfg);
|
||||
|
|
|
@ -33,6 +33,8 @@ int metaOpenDB(SMeta* pMeta);
|
|||
void metaCloseDB(SMeta* pMeta);
|
||||
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg);
|
||||
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
|
||||
int metaSaveSmaToDB(SMeta* pMeta, SSmaCfg* pTbCfg);
|
||||
int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName);
|
||||
|
||||
// SMetaCache
|
||||
int metaOpenCache(SMeta* pMeta);
|
||||
|
|
|
@ -45,6 +45,7 @@ struct SMetaDB {
|
|||
DB *pStbIdx;
|
||||
DB *pNtbIdx;
|
||||
DB *pCtbIdx;
|
||||
DB *pSmaIdx;
|
||||
// ENV
|
||||
DB_ENV *pEvn;
|
||||
};
|
||||
|
@ -63,6 +64,7 @@ static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT
|
|||
static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey);
|
||||
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
|
||||
static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
|
||||
static void metaClearTbCfg(STbCfg *pTbCfg);
|
||||
|
@ -128,11 +130,17 @@ int metaOpenDB(SMeta *pMeta) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (metaOpenBDBIdx(&(pDB->pSmaIdx), pDB->pEvn, "sma.index", pDB->pSmaDB, &metaSmaIdxCb, true) < 0) {
|
||||
metaCloseDB(pMeta);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void metaCloseDB(SMeta *pMeta) {
|
||||
if (pMeta->pDB) {
|
||||
metaCloseBDBIdx(pMeta->pDB->pSmaIdx);
|
||||
metaCloseBDBIdx(pMeta->pDB->pCtbIdx);
|
||||
metaCloseBDBIdx(pMeta->pDB->pNtbIdx);
|
||||
metaCloseBDBIdx(pMeta->pDB->pStbIdx);
|
||||
|
@ -218,6 +226,49 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int metaSaveSmaToDB(SMeta *pMeta, SSmaCfg *pSmaCfg) {
|
||||
char buf[512] = {0}; // TODO: may overflow
|
||||
void *pBuf = NULL;
|
||||
DBT key1 = {0}, value1 = {0};
|
||||
|
||||
{
|
||||
// save sma info
|
||||
pBuf = buf;
|
||||
|
||||
key1.data = pSmaCfg->indexName;
|
||||
key1.size = strlen(key1.data);
|
||||
|
||||
tEncodeTSma(&pBuf, pSmaCfg);
|
||||
|
||||
value1.data = buf;
|
||||
value1.size = POINTER_DISTANCE(pBuf, buf);
|
||||
value1.app_data = pSmaCfg;
|
||||
}
|
||||
|
||||
metaDBWLock(pMeta->pDB);
|
||||
pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0);
|
||||
metaDBULock(pMeta->pDB);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) {
|
||||
// TODO
|
||||
#if 0
|
||||
DBT key = {0};
|
||||
|
||||
key.data = (void *)indexName;
|
||||
key.size = strlen(indexName);
|
||||
|
||||
metaDBWLock(pMeta->pDB);
|
||||
// TODO: No guarantee of consistence.
|
||||
// Use transaction or DB->sync() for some guarantee.
|
||||
pMeta->pDB->pSmaDB->del(pMeta->pDB->pSmaDB, NULL, &key, 0);
|
||||
metaDBULock(pMeta->pDB);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* ------------------------ STATIC METHODS ------------------------ */
|
||||
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW) {
|
||||
int tlen = 0;
|
||||
|
@ -433,6 +484,16 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
|
|||
}
|
||||
}
|
||||
|
||||
static int metaSmaIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
|
||||
SSmaCfg *pSmaCfg = (SSmaCfg *)(pValue->app_data);
|
||||
|
||||
memset(pSKey, 0, sizeof(*pSKey));
|
||||
pSKey->data = &(pSmaCfg->tableUid);
|
||||
pSKey->size = sizeof(pSmaCfg->tableUid);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
|
||||
int tsize = 0;
|
||||
|
||||
|
@ -548,6 +609,36 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
|
|||
return pTbCfg;
|
||||
}
|
||||
|
||||
SSmaCfg *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) {
|
||||
SSmaCfg *pCfg = NULL;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
DBT key = {0};
|
||||
DBT value = {0};
|
||||
int ret;
|
||||
|
||||
// Set key/value
|
||||
key.data = (void *)indexName;
|
||||
key.size = strlen(indexName);
|
||||
|
||||
// Query
|
||||
metaDBRLock(pDB);
|
||||
ret = pDB->pTbDB->get(pDB->pSmaDB, NULL, &key, &value, 0);
|
||||
metaDBULock(pDB);
|
||||
if (ret != 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// Decode
|
||||
pCfg = (SSmaCfg *)malloc(sizeof(SSmaCfg));
|
||||
if (pCfg == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tDecodeTSma(value.data, pCfg);
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline) {
|
||||
uint32_t nCols;
|
||||
SSchemaWrapper *pSW = NULL;
|
||||
|
@ -726,6 +817,61 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
|||
}
|
||||
}
|
||||
|
||||
struct SMSmaCursor {
|
||||
DBC *pCur;
|
||||
tb_uid_t uid;
|
||||
};
|
||||
|
||||
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
|
||||
SMSmaCursor *pCur = NULL;
|
||||
SMetaDB *pDB = pMeta->pDB;
|
||||
int ret;
|
||||
|
||||
pCur = (SMSmaCursor *)calloc(1, sizeof(*pCur));
|
||||
if (pCur == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCur->uid = uid;
|
||||
ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0);
|
||||
if (ret != 0) {
|
||||
free(pCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pCur;
|
||||
}
|
||||
|
||||
void metaCloseSmaCurosr(SMSmaCursor *pCur) {
|
||||
if (pCur) {
|
||||
if (pCur->pCur) {
|
||||
pCur->pCur->close(pCur->pCur);
|
||||
}
|
||||
|
||||
free(pCur);
|
||||
}
|
||||
}
|
||||
|
||||
const char* metaSmaCursorNext(SMSmaCursor *pCur) {
|
||||
DBT skey = {0};
|
||||
DBT pkey = {0};
|
||||
DBT pval = {0};
|
||||
void *pBuf;
|
||||
|
||||
// Set key
|
||||
skey.data = &(pCur->uid);
|
||||
skey.size = sizeof(pCur->uid);
|
||||
|
||||
if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
|
||||
const char* indexName = (const char *)pkey.data;
|
||||
assert(indexName != NULL);
|
||||
return indexName;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void metaDBWLock(SMetaDB *pDB) {
|
||||
#if IMPL_WITH_LOCK
|
||||
pthread_rwlock_wrlock(&(pDB->rwlock));
|
||||
|
|
|
@ -106,3 +106,20 @@ int metaRemoveTableFromIdx(SMeta *pMeta, tb_uid_t uid) {
|
|||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int metaCreateSma(SMeta *pMeta, SSmaCfg *pSmaCfg) {
|
||||
// Validate the tbOptions
|
||||
// if (metaValidateTbCfg(pMeta, pTbCfg) < 0) {
|
||||
// // TODO: handle error
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
// TODO: add atomicity
|
||||
|
||||
if (metaSaveSmaToDB(pMeta, pSmaCfg) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include <tglobal.h>
|
||||
#include <iostream>
|
||||
|
||||
#include <metaDef.h>
|
||||
#include <tmsg.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
|
@ -94,7 +95,93 @@ TEST(testCase, tSmaEncodeDecodeTest) {
|
|||
|
||||
// resource release
|
||||
tdDestroyTSma(&tSma, false);
|
||||
tdDestroyWrapper(&dstTSmaWrapper);
|
||||
tdDestroyTSmaWrapper(&dstTSmaWrapper);
|
||||
}
|
||||
|
||||
TEST(testCase, tSma_DB_Put_Get_Del_Test) {
|
||||
const char *smaIndexName1 = "sma_index_test_1";
|
||||
const char *smaIndexName2 = "sma_index_test_2";
|
||||
const char *smaTestDir = "./smaTest";
|
||||
const uint64_t tbUid = 1234567890;
|
||||
// encode
|
||||
STSma tSma = {0};
|
||||
tSma.version = 0;
|
||||
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||
tSma.interval = 1;
|
||||
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
|
||||
tSma.sliding = 0;
|
||||
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
|
||||
tSma.tableUid = tbUid;
|
||||
tSma.numOfColIds = 2;
|
||||
tSma.numOfFuncIds = 5; // sum/min/max/avg/last
|
||||
tSma.colIds = (col_id_t *)calloc(tSma.numOfColIds, sizeof(col_id_t));
|
||||
tSma.funcIds = (uint16_t *)calloc(tSma.numOfFuncIds, sizeof(uint16_t));
|
||||
|
||||
for (int32_t i = 0; i < tSma.numOfColIds; ++i) {
|
||||
*(tSma.colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
}
|
||||
for (int32_t i = 0; i < tSma.numOfFuncIds; ++i) {
|
||||
*(tSma.funcIds + i) = (i + 2);
|
||||
}
|
||||
|
||||
SMeta * pMeta = NULL;
|
||||
SSmaCfg * pSmaCfg = &tSma;
|
||||
const SMetaCfg *pMetaCfg = &defaultMetaOptions;
|
||||
|
||||
taosRemoveDir(smaTestDir);
|
||||
|
||||
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
|
||||
assert(pMeta != NULL);
|
||||
// save index 1
|
||||
metaSaveSmaToDB(pMeta, pSmaCfg);
|
||||
|
||||
tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN);
|
||||
pSmaCfg->version = 1;
|
||||
pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR;
|
||||
pSmaCfg->interval = 1;
|
||||
pSmaCfg->slidingUnit = TD_TIME_UNIT_MINUTE;
|
||||
pSmaCfg->sliding = 5;
|
||||
|
||||
// save index 2
|
||||
metaSaveSmaToDB(pMeta, pSmaCfg);
|
||||
|
||||
// get value by indexName
|
||||
SSmaCfg *qSmaCfg = NULL;
|
||||
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1);
|
||||
assert(qSmaCfg != NULL);
|
||||
printf("name1 = %s\n", qSmaCfg->indexName);
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
||||
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
||||
tdDestroyTSma(qSmaCfg, true);
|
||||
|
||||
qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2);
|
||||
assert(qSmaCfg != NULL);
|
||||
printf("name2 = %s\n", qSmaCfg->indexName);
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
||||
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
|
||||
tdDestroyTSma(qSmaCfg, true);
|
||||
|
||||
// get value by table uid
|
||||
SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid);
|
||||
assert(pSmaCur != NULL);
|
||||
uint32_t indexCnt = 0;
|
||||
while (1) {
|
||||
const char* indexName = metaSmaCursorNext(pSmaCur);
|
||||
if (indexName == NULL) {
|
||||
break;
|
||||
}
|
||||
printf("indexName = %s\n", indexName);
|
||||
++indexCnt;
|
||||
}
|
||||
EXPECT_EQ(indexCnt, 2);
|
||||
metaCloseSmaCurosr(pSmaCur);
|
||||
|
||||
// resource release
|
||||
metaRemoveSmaFromDb(pMeta, smaIndexName1);
|
||||
metaRemoveSmaFromDb(pMeta, smaIndexName2);
|
||||
|
||||
tdDestroyTSma(&tSma, false);
|
||||
metaClose(pMeta);
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
|
Loading…
Reference in New Issue