impl meta with lock

This commit is contained in:
Hongze Cheng 2022-01-26 08:52:25 +00:00
parent 350cf232ce
commit b31b131833
1 changed files with 62 additions and 19 deletions

View File

@ -20,6 +20,10 @@
#include "tcoding.h" #include "tcoding.h"
#include "thash.h" #include "thash.h"
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
// #endif
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int32_t sver; int32_t sver;
@ -27,6 +31,9 @@ typedef struct {
} SSchemaKey; } SSchemaKey;
struct SMetaDB { struct SMetaDB {
#if IMPL_WITH_LOCK
pthread_rwlock_t rwlock;
#endif
// DB // DB
DB *pTbDB; DB *pTbDB;
DB *pSchemaDB; DB *pSchemaDB;
@ -58,6 +65,9 @@ static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static void metaClearTbCfg(STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg);
static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW);
static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW);
static void metaDBWLock(SMetaDB *pDB);
static void metaDBRLock(SMetaDB *pDB);
static void metaDBULock(SMetaDB *pDB);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
@ -131,7 +141,8 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
tb_uid_t uid; tb_uid_t uid;
char buf[512]; char buf[512];
void * pBuf; void * pBuf;
DBT key, value; DBT key1, value1;
DBT key2, value2;
SSchema *pSchema = NULL; SSchema *pSchema = NULL;
if (pTbCfg->type == META_SUPER_TABLE) { if (pTbCfg->type == META_SUPER_TABLE) {
@ -143,19 +154,17 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
{ {
// save table info // save table info
pBuf = buf; pBuf = buf;
memset(&key, 0, sizeof(key)); memset(&key1, 0, sizeof(key1));
memset(&value, 0, sizeof(key)); memset(&value1, 0, sizeof(key1));
key.data = &uid; key1.data = &uid;
key.size = sizeof(uid); key1.size = sizeof(uid);
metaEncodeTbInfo(&pBuf, pTbCfg); metaEncodeTbInfo(&pBuf, pTbCfg);
value.data = buf; value1.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf); value1.size = POINTER_DISTANCE(pBuf, buf);
value.app_data = pTbCfg; value1.app_data = pTbCfg;
pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0);
} }
// save schema // save schema
@ -170,22 +179,25 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if (pSchema) { if (pSchema) {
pBuf = buf; pBuf = buf;
memset(&key, 0, sizeof(key)); memset(&key2, 0, sizeof(key2));
memset(&value, 0, sizeof(key)); memset(&value2, 0, sizeof(key2));
SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0}; SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0};
key.data = &schemaKey; key2.data = &schemaKey;
key.size = sizeof(schemaKey); key2.size = sizeof(schemaKey);
SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema};
metaEncodeSchema(&pBuf, &sw); metaEncodeSchema(&pBuf, &sw);
value.data = buf; value2.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf); value2.size = POINTER_DISTANCE(pBuf, buf);
pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0);
} }
metaDBWLock(pMeta->pDB);
pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key1, &value1, 0);
pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key2, &value2, 0);
metaDBULock(pMeta->pDB);
return 0; return 0;
} }
@ -234,11 +246,18 @@ static SMetaDB *metaNewDB() {
return NULL; return NULL;
} }
#if IMPL_WITH_LOCK
pthread_rwlock_init(&pDB->rwlock, NULL);
#endif
return pDB; return pDB;
} }
static void metaFreeDB(SMetaDB *pDB) { static void metaFreeDB(SMetaDB *pDB) {
if (pDB) { if (pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_destroy(&pDB->rwlock);
#endif
free(pDB); free(pDB);
} }
} }
@ -467,7 +486,9 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
key.size = sizeof(uid); key.size = sizeof(uid);
// Query // Query
metaDBRLock(pDB);
ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0); ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0);
metaDBULock(pDB);
if (ret != 0) { if (ret != 0) {
return NULL; return NULL;
} }
@ -496,7 +517,9 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
key.size = strlen(tbname); key.size = strlen(tbname);
// Query // Query
metaDBRLock(pDB);
ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0); ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0);
metaDBULock(pDB);
if (ret != 0) { if (ret != 0) {
return NULL; return NULL;
} }
@ -529,7 +552,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
key.size = sizeof(schemaKey); key.size = sizeof(schemaKey);
// Query // Query
metaDBRLock(pDB);
ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0);
metaDBULock(pDB);
if (ret != 0) { if (ret != 0) {
printf("failed to query schema DB since %s================\n", db_strerror(ret)); printf("failed to query schema DB since %s================\n", db_strerror(ret));
return NULL; return NULL;
@ -687,4 +712,22 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
} else { } else {
return 0; return 0;
} }
} }
static void metaDBWLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_wrlock(&(pDB->rwlock));
#endif
}
static void metaDBRLock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_rdlock(&(pDB->rwlock));
#endif
}
static void metaDBULock(SMetaDB *pDB) {
#if IMPL_WITH_LOCK
pthread_rwlock_unlock(&(pDB->rwlock));
#endif
}