diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index dfdc144750..69bce7bbeb 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -20,6 +20,10 @@ #include "tcoding.h" #include "thash.h" +#define IMPL_WITH_LOCK 1 +// #if IMPL_WITH_LOCK +// #endif + typedef struct { tb_uid_t uid; int32_t sver; @@ -27,6 +31,9 @@ typedef struct { } SSchemaKey; struct SMetaDB { +#if IMPL_WITH_LOCK + pthread_rwlock_t rwlock; +#endif // DB DB *pTbDB; DB *pSchemaDB; @@ -58,6 +65,9 @@ static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); static void metaClearTbCfg(STbCfg *pTbCfg); static int metaEncodeSchema(void **buf, SSchemaWrapper *pSW); static void * metaDecodeSchema(void *buf, SSchemaWrapper *pSW); +static void metaDBWLock(SMetaDB *pDB); +static void metaDBRLock(SMetaDB *pDB); +static void metaDBULock(SMetaDB *pDB); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -131,7 +141,8 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; char buf[512]; void * pBuf; - DBT key, value; + DBT key1, value1; + DBT key2, value2; SSchema *pSchema = NULL; if (pTbCfg->type == META_SUPER_TABLE) { @@ -143,19 +154,17 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { { // save table info pBuf = buf; - memset(&key, 0, sizeof(key)); - memset(&value, 0, sizeof(key)); + memset(&key1, 0, sizeof(key1)); + memset(&value1, 0, sizeof(key1)); - key.data = &uid; - key.size = sizeof(uid); + key1.data = &uid; + key1.size = sizeof(uid); metaEncodeTbInfo(&pBuf, pTbCfg); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - value.app_data = pTbCfg; - - pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0); + value1.data = buf; + value1.size = POINTER_DISTANCE(pBuf, buf); + value1.app_data = pTbCfg; } // save schema @@ -170,22 +179,25 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { if (pSchema) { pBuf = buf; - memset(&key, 0, sizeof(key)); - memset(&value, 0, sizeof(key)); + memset(&key2, 0, sizeof(key2)); + memset(&value2, 0, sizeof(key2)); SSchemaKey schemaKey = {uid, 0 /*TODO*/, 0}; - key.data = &schemaKey; - key.size = sizeof(schemaKey); + key2.data = &schemaKey; + key2.size = sizeof(schemaKey); SSchemaWrapper sw = {.nCols = ncols, .pSchema = pSchema}; metaEncodeSchema(&pBuf, &sw); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - - pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); + value2.data = buf; + value2.size = POINTER_DISTANCE(pBuf, buf); } + metaDBWLock(pMeta->pDB); + pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key1, &value1, 0); + pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key2, &value2, 0); + metaDBULock(pMeta->pDB); + return 0; } @@ -234,11 +246,18 @@ static SMetaDB *metaNewDB() { return NULL; } +#if IMPL_WITH_LOCK + pthread_rwlock_init(&pDB->rwlock, NULL); +#endif + return pDB; } static void metaFreeDB(SMetaDB *pDB) { if (pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_destroy(&pDB->rwlock); +#endif free(pDB); } } @@ -467,7 +486,9 @@ STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { key.size = sizeof(uid); // Query + metaDBRLock(pDB); ret = pDB->pTbDB->get(pDB->pTbDB, NULL, &key, &value, 0); + metaDBULock(pDB); if (ret != 0) { return NULL; } @@ -496,7 +517,9 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { key.size = strlen(tbname); // Query + metaDBRLock(pDB); ret = pDB->pNameIdx->pget(pDB->pNameIdx, NULL, &key, &pkey, &pvalue, 0); + metaDBULock(pDB); if (ret != 0) { return NULL; } @@ -529,7 +552,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo key.size = sizeof(schemaKey); // Query + metaDBRLock(pDB); ret = pDB->pSchemaDB->get(pDB->pSchemaDB, NULL, &key, &value, 0); + metaDBULock(pDB); if (ret != 0) { printf("failed to query schema DB since %s================\n", db_strerror(ret)); return NULL; @@ -687,4 +712,22 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) { } else { return 0; } -} \ No newline at end of file +} + +static void metaDBWLock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_wrlock(&(pDB->rwlock)); +#endif +} + +static void metaDBRLock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_rdlock(&(pDB->rwlock)); +#endif +} + +static void metaDBULock(SMetaDB *pDB) { +#if IMPL_WITH_LOCK + pthread_rwlock_unlock(&(pDB->rwlock)); +#endif +}