more TDB integration
This commit is contained in:
parent
317d4ff2eb
commit
61d0180b85
|
@ -16,8 +16,20 @@
|
||||||
#include "metaDef.h"
|
#include "metaDef.h"
|
||||||
|
|
||||||
#include "tdbInt.h"
|
#include "tdbInt.h"
|
||||||
|
typedef struct SPoolMem {
|
||||||
|
int64_t size;
|
||||||
|
struct SPoolMem *prev;
|
||||||
|
struct SPoolMem *next;
|
||||||
|
} SPoolMem;
|
||||||
|
|
||||||
|
static SPoolMem *openPool();
|
||||||
|
static void clearPool(SPoolMem *pPool);
|
||||||
|
static void closePool(SPoolMem *pPool);
|
||||||
|
static void *poolMalloc(void *arg, size_t size);
|
||||||
|
static void poolFree(void *arg, void *ptr);
|
||||||
|
|
||||||
struct SMetaDB {
|
struct SMetaDB {
|
||||||
|
TXN txn;
|
||||||
TENV *pEnv;
|
TENV *pEnv;
|
||||||
TDB *pTbDB;
|
TDB *pTbDB;
|
||||||
TDB *pSchemaDB;
|
TDB *pSchemaDB;
|
||||||
|
@ -25,6 +37,7 @@ struct SMetaDB {
|
||||||
TDB *pStbIdx;
|
TDB *pStbIdx;
|
||||||
TDB *pNtbIdx;
|
TDB *pNtbIdx;
|
||||||
TDB *pCtbIdx;
|
TDB *pCtbIdx;
|
||||||
|
SPoolMem *pPool;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct __attribute__((__packed__)) {
|
typedef struct __attribute__((__packed__)) {
|
||||||
|
@ -167,6 +180,8 @@ int metaOpenDB(SMeta *pMeta) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMetaDb->pPool = openPool();
|
||||||
|
tdbTxnOpen(&pMetaDb->txn, 0, poolMalloc, poolFree, pMetaDb->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
tdbBegin(pMetaDb->pEnv, NULL);
|
tdbBegin(pMetaDb->pEnv, NULL);
|
||||||
|
|
||||||
pMeta->pDB = pMetaDb;
|
pMeta->pDB = pMetaDb;
|
||||||
|
@ -214,7 +229,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
pVal = pBuf = buf;
|
pVal = pBuf = buf;
|
||||||
metaEncodeTbInfo(&pBuf, pTbCfg);
|
metaEncodeTbInfo(&pBuf, pTbCfg);
|
||||||
vLen = POINTER_DISTANCE(pBuf, buf);
|
vLen = POINTER_DISTANCE(pBuf, buf);
|
||||||
ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pTbDB, pKey, kLen, pVal, vLen, &pMetaDb->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -236,7 +251,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
pVal = pBuf = buf;
|
pVal = pBuf = buf;
|
||||||
metaEncodeSchemaEx(&pBuf, &schemaWrapper);
|
metaEncodeSchemaEx(&pBuf, &schemaWrapper);
|
||||||
vLen = POINTER_DISTANCE(pBuf, buf);
|
vLen = POINTER_DISTANCE(pBuf, buf);
|
||||||
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pSchemaDB, pKey, kLen, pVal, vLen, &pMeta->pDB->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -250,7 +265,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
kLen = nameLen + 1 + sizeof(uid);
|
kLen = nameLen + 1 + sizeof(uid);
|
||||||
pVal = NULL;
|
pVal = NULL;
|
||||||
vLen = 0;
|
vLen = 0;
|
||||||
ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pNameIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -261,7 +276,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
kLen = sizeof(uid);
|
kLen = sizeof(uid);
|
||||||
pVal = NULL;
|
pVal = NULL;
|
||||||
vLen = 0;
|
vLen = 0;
|
||||||
ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pStbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -272,7 +287,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
kLen = sizeof(ctbIdxKey);
|
kLen = sizeof(ctbIdxKey);
|
||||||
pVal = NULL;
|
pVal = NULL;
|
||||||
vLen = 0;
|
vLen = 0;
|
||||||
ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pCtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -281,12 +296,16 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
|
||||||
kLen = sizeof(uid);
|
kLen = sizeof(uid);
|
||||||
pVal = NULL;
|
pVal = NULL;
|
||||||
vLen = 0;
|
vLen = 0;
|
||||||
ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, NULL);
|
ret = tdbDbInsert(pMetaDb->pNtbIdx, pKey, kLen, pVal, vLen, &pMetaDb->txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMeta->pDB->pPool->size > 0) {
|
||||||
|
metaCommit(pMeta);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,6 +724,82 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaCommit(SMeta *pMeta) {
|
int metaCommit(SMeta *pMeta) {
|
||||||
tdbCommit(pMeta->pDB->pEnv, NULL);
|
TXN *pTxn = &pMeta->pDB->txn;
|
||||||
|
|
||||||
|
// Commit current txn
|
||||||
|
tdbCommit(pMeta->pDB->pEnv, pTxn);
|
||||||
|
tdbTxnClose(pTxn);
|
||||||
|
clearPool(pMeta->pDB->pPool);
|
||||||
|
|
||||||
|
// start a new txn
|
||||||
|
tdbTxnOpen(&pMeta->pDB->txn, 0, poolMalloc, poolFree, pMeta->pDB->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
tdbBegin(pMeta->pDB->pEnv, pTxn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SPoolMem *openPool() {
|
||||||
|
SPoolMem *pPool = (SPoolMem *)tdbOsMalloc(sizeof(*pPool));
|
||||||
|
|
||||||
|
pPool->prev = pPool->next = pPool;
|
||||||
|
pPool->size = 0;
|
||||||
|
|
||||||
|
return pPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clearPool(SPoolMem *pPool) {
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
do {
|
||||||
|
pMem = pPool->next;
|
||||||
|
|
||||||
|
if (pMem == pPool) break;
|
||||||
|
|
||||||
|
pMem->next->prev = pMem->prev;
|
||||||
|
pMem->prev->next = pMem->next;
|
||||||
|
pPool->size -= pMem->size;
|
||||||
|
|
||||||
|
tdbOsFree(pMem);
|
||||||
|
} while (1);
|
||||||
|
|
||||||
|
assert(pPool->size == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void closePool(SPoolMem *pPool) {
|
||||||
|
clearPool(pPool);
|
||||||
|
tdbOsFree(pPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *poolMalloc(void *arg, size_t size) {
|
||||||
|
void *ptr = NULL;
|
||||||
|
SPoolMem *pPool = (SPoolMem *)arg;
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size);
|
||||||
|
if (pMem == NULL) {
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
pMem->size = sizeof(*pMem) + size;
|
||||||
|
pMem->next = pPool->next;
|
||||||
|
pMem->prev = pPool;
|
||||||
|
|
||||||
|
pPool->next->prev = pMem;
|
||||||
|
pPool->next = pMem;
|
||||||
|
pPool->size += pMem->size;
|
||||||
|
|
||||||
|
ptr = (void *)(&pMem[1]);
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void poolFree(void *arg, void *ptr) {
|
||||||
|
SPoolMem *pPool = (SPoolMem *)arg;
|
||||||
|
SPoolMem *pMem;
|
||||||
|
|
||||||
|
pMem = &(((SPoolMem *)ptr)[-1]);
|
||||||
|
|
||||||
|
pMem->next->prev = pMem->prev;
|
||||||
|
pMem->prev->next = pMem->next;
|
||||||
|
pPool->size -= pMem->size;
|
||||||
|
|
||||||
|
tdbOsFree(pMem);
|
||||||
|
}
|
||||||
|
|
|
@ -27,5 +27,5 @@ void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
|
||||||
|
|
||||||
tb_uid_t metaGenerateUid(SMeta *pMeta) {
|
tb_uid_t metaGenerateUid(SMeta *pMeta) {
|
||||||
// Generate a new table UID
|
// Generate a new table UID
|
||||||
return tGenIdPI32();
|
return tGenIdPI64();
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ int vnodeSyncCommit(SVnode *pVnode) {
|
||||||
static int vnodeCommit(void *arg) {
|
static int vnodeCommit(void *arg) {
|
||||||
SVnode *pVnode = (SVnode *)arg;
|
SVnode *pVnode = (SVnode *)arg;
|
||||||
|
|
||||||
metaCommit(pVnode->pMeta);
|
// metaCommit(pVnode->pMeta);
|
||||||
tqCommit(pVnode->pTq);
|
tqCommit(pVnode->pTq);
|
||||||
tsdbCommit(pVnode->pTsdb);
|
tsdbCommit(pVnode->pTsdb);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue