From f7a8ef266cdaf56a63fee13691eae66b593e3ed2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 4 Nov 2021 10:30:23 +0800 Subject: [PATCH] more --- include/server/vnode/meta/impl/metaImpl.h | 2 +- source/dnode/vnode/meta/src/metaDB.c | 66 +++++++++++++++++++++-- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/include/server/vnode/meta/impl/metaImpl.h b/include/server/vnode/meta/impl/metaImpl.h index e6cf2de901..c9506bc102 100644 --- a/include/server/vnode/meta/impl/metaImpl.h +++ b/include/server/vnode/meta/impl/metaImpl.h @@ -53,7 +53,7 @@ typedef struct { // normal table options typedef struct { - SSchema* pSchame; + STSchema* pSchame; } SNTbOptions; struct STbOptions { diff --git a/source/dnode/vnode/meta/src/metaDB.c b/source/dnode/vnode/meta/src/metaDB.c index 580608e851..e4c9d8ce97 100644 --- a/source/dnode/vnode/meta/src/metaDB.c +++ b/source/dnode/vnode/meta/src/metaDB.c @@ -15,6 +15,12 @@ #include "metaDef.h" +static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema); +static void metaGetSchemaDBKey(char key[], tb_uid_t uid, int sversion); +static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid); + +#define SCHEMA_KEY_LEN (sizeof(tb_uid_t) + sizeof(int)) + #define META_OPEN_DB_IMPL(pDB, options, dir, err) \ do { \ pDB = rocksdb_open(options, dir, &err); \ @@ -115,22 +121,22 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) { switch (pTbOptions->type) { case META_NORMAL_TABLE: // save schemaDB - rocksdb_put(pMeta->pDB->schemaDb, wopt, NULL /* TODO */, NULL /* TODO */, NULL /* TODO */, NULL /* TODO */, &err); + metaSaveSchemaDB(pMeta, uid, pTbOptions->ntbOptions.pSchame); break; case META_SUPER_TABLE: // save schemaDB - rocksdb_put(pMeta->pDB->schemaDb, wopt, NULL /* TODO */, NULL /* TODO */, NULL /* TODO */, NULL /* TODO */, &err); + metaSaveSchemaDB(pMeta, uid, pTbOptions->stbOptions.pSchema); // save mapDB (really need?) rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&uid), sizeof(uid), "", 0, &err); break; case META_CHILD_TABLE: // save tagDB - rocksdb_put(pMeta->pDB->tagDb, wopt, NULL /* TODO */, 0 /* TODO */, NULL /* TODO */, 0 /* TODO */, &err); + rocksdb_put(pMeta->pDB->tagDb, wopt, (char *)(&uid), sizeof(uid), pTbOptions->ctbOptions.tags, + kvRowLen(pTbOptions->ctbOptions.tags), &err); // save mapDB - rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&(pTbOptions->ctbOptions.suid)), sizeof(tb_uid_t), NULL /* TODO */, - 0 /* TODO */, &err); + metaSaveMapDB(pMeta, pTbOptions->ctbOptions.suid, uid); break; default: ASSERT(0); @@ -143,5 +149,55 @@ int metaSaveTableToDB(SMeta *pMeta, const STbOptions *pTbOptions) { int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { /* TODO */ + return 0; +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static void metaSaveSchemaDB(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) { + char key[64]; + char pBuf[1024]; + char * ppBuf = pBuf; + size_t vsize; + char * err = NULL; + + rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); + + metaGetSchemaDBKey(key, uid, schemaVersion(pSchema)); + vsize = tdEncodeSchema((void **)(&ppBuf), pSchema); + rocksdb_put(pMeta->pDB->schemaDb, wopt, key, SCHEMA_KEY_LEN, pBuf, vsize, &err); + + rocksdb_writeoptions_destroy(wopt); +} + +static void metaGetSchemaDBKey(char *key, tb_uid_t uid, int sversion) { + *(tb_uid_t *)key = uid; + *(int *)POINTER_SHIFT(key, sizeof(tb_uid_t)) = sversion; +} + +static int metaSaveMapDB(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid) { + size_t vlen; + char * val; + char * err = NULL; + + rocksdb_readoptions_t *ropt = rocksdb_readoptions_create(); + val = rocksdb_get(pMeta->pDB->mapDb, ropt, (char *)(&suid), sizeof(suid), &vlen, &err); + rocksdb_readoptions_destroy(ropt); + + void *nval = malloc(vlen + sizeof(uid)); + if (nval == NULL) { + return -1; + } + + if (vlen) { + memcpy(nval, val, vlen); + } + memcpy(POINTER_SHIFT(nval, vlen), (void *)(&uid), sizeof(uid)); + + rocksdb_writeoptions_t *wopt = rocksdb_writeoptions_create(); + + rocksdb_put(pMeta->pDB->mapDb, wopt, (char *)(&suid), sizeof(suid), nval, vlen + sizeof(uid), &err); + + rocksdb_writeoptions_destroy(wopt); + return 0; } \ No newline at end of file