From fc26750281911a0930f07740c532607f6250b378 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 8 Dec 2021 17:46:46 +0800 Subject: [PATCH 01/15] more work --- source/dnode/vnode/meta/src/metaBDBImpl.c | 91 +++++++++++++++-------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 9e52a7151d..ac8e9df3f8 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -42,6 +42,7 @@ static void metaCloseBDBEnv(DB_ENV *pEnv); static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName); static void metaCloseBDBDb(DB *pDB); static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf); +static void metaCloseBDBIdx(DB *pIdx); static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); @@ -103,6 +104,12 @@ int metaOpenDB(SMeta *pMeta) { void metaCloseDB(SMeta *pMeta) { if (pMeta->pDB) { + metaCloseBDBIdx(pMeta->pDB->pCtbIdx); + metaCloseBDBIdx(pMeta->pDB->pNtbIdx); + metaCloseBDBIdx(pMeta->pDB->pStbIdx); + metaCloseBDBIdx(pMeta->pDB->pNameIdx); + metaCloseBDBDb(pMeta->pDB->pSchemaDB); + metaCloseBDBDb(pMeta->pDB->pTbDB); metaCloseBDBEnv(pMeta->pDB->pEvn); metaFreeDB(pMeta->pDB); pMeta->pDB = NULL; @@ -110,7 +117,58 @@ void metaCloseDB(SMeta *pMeta) { } int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { - // TODO + tb_uid_t uid; + STSchema *pSchema = NULL; + + if (pTbCfg->type == META_SUPER_TABLE) { + uid = pTbCfg->stbCfg.suid; + } else { + uid = metaGenerateUid(pMeta); + } + + { + // save table info + char buf[512]; + void *pBuf = buf; + DBT key = {0}; + DBT value = {0}; + + key.data = &uid; + key.size = sizeof(uid); + + // metaEncodeTbInfo(&pBuf, pTbCfg); + + value.data = buf; + value.size = POINTER_DISTANCE(pBuf, buf); + + pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0); + } + + // save schema + if (pTbCfg->type == META_SUPER_TABLE) { + pSchema = pTbCfg->stbCfg.pSchema; + } else if (pTbCfg->type == META_NORMAL_TABLE) { + pSchema = pTbCfg->ntbCfg.pSchema; + } + + if (pSchema) { + char buf[512]; + void *pBuf = buf; + DBT key = {0}; + DBT value = {0}; + + // TODO + key.data = NULL; + key.size = 0; + + tdEncodeSchema(&pBuf, pSchema); + + value.data = buf; + value.size = POINTER_DISTANCE(pBuf, buf); + + pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); + } + return 0; } @@ -169,7 +227,7 @@ static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) { int ret; DB *pDB; - ret = db_create(&((pDB)), (pEnv), 0); + ret = db_create(&(pDB), pEnv, 0); if (ret != 0) { BDB_PERR("Failed to create META DB", ret); return -1; @@ -236,35 +294,6 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey } #if 0 -typedef struct { - tb_uid_t uid; - int32_t sver; -} SSchemaKey; - - -static SMetaDB *metaNewDB(); -static void metaFreeDB(SMetaDB *pDB); -static int metaCreateDBEnv(SMetaDB *pDB, const char *path); -static void metaDestroyDBEnv(SMetaDB *pDB); -static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey); -static void * metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey); -static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey); -static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey); -static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema); -static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); -static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg); - -#define META_ASSOCIATE_IDX(pDB, pIdx, cbf) \ - do { \ - int ret = (pDB)->associate((pDB), NULL, (pIdx), (cbf), 0); \ - if (ret != 0) { \ - P_ERROR("Failed to associate META DB", ret); \ - metaCloseDB(pMeta); \ - } \ - } while (0) - - int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { char buf[512]; void * pBuf; From a1e281b706f710c72d60dd24a8344ce4a489396a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 8 Dec 2021 18:39:15 +0800 Subject: [PATCH 02/15] Implement index callback function --- source/dnode/vnode/meta/src/metaBDBImpl.c | 218 ++++++++-------------- 1 file changed, 79 insertions(+), 139 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index ac8e9df3f8..31fe69d595 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -20,6 +20,11 @@ #include "tcoding.h" #include "thash.h" +typedef struct { + tb_uid_t uid; + int32_t sver; +} SSchemaKey; + struct SMetaDB { // DB DB *pTbDB; @@ -47,6 +52,9 @@ static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); +static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); +static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); +static void metaClearTbCfg(STbCfg *pTbCfg); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -118,6 +126,9 @@ void metaCloseDB(SMeta *pMeta) { int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; + char buf[512]; + void * pBuf; + DBT key, value; STSchema *pSchema = NULL; if (pTbCfg->type == META_SUPER_TABLE) { @@ -128,15 +139,14 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { { // save table info - char buf[512]; - void *pBuf = buf; - DBT key = {0}; - DBT value = {0}; + pBuf = buf; + memset(&key, 0, sizeof(key)); + memset(&value, 0, sizeof(key)); key.data = &uid; key.size = sizeof(uid); - // metaEncodeTbInfo(&pBuf, pTbCfg); + metaEncodeTbInfo(&pBuf, pTbCfg); value.data = buf; value.size = POINTER_DISTANCE(pBuf, buf); @@ -152,14 +162,13 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { } if (pSchema) { - char buf[512]; - void *pBuf = buf; - DBT key = {0}; - DBT value = {0}; + pBuf = buf; + memset(&key, 0, sizeof(key)); + memset(&value, 0, sizeof(key)); + SSchemaKey schemaKey = {uid, schemaVersion(pSchema)}; - // TODO - key.data = NULL; - key.size = 0; + key.data = &schemaKey; + key.size = sizeof(schemaKey); tdEncodeSchema(&pBuf, pSchema); @@ -274,129 +283,70 @@ static void metaCloseBDBIdx(DB *pIdx) { } static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO + char *name; + + memset(pSKey, 0, sizeof(*pSKey)); + taosDecodeString(pValue->data, &name); + + pSKey->data = name; + pSKey->size = strlen(name); + pSKey->flags = DB_DBT_APPMALLOC; + return 0; } static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; + STbCfg tbCfg = {0}; + + metaDecodeTbInfo(pValue->data, &tbCfg); + if (tbCfg.type == META_SUPER_TABLE) { + memset(pSKey, 0, sizeof(*pSKey)); + + pSKey->data = pKey->data; + pSKey->size = pKey->size; + + metaClearTbCfg(&tbCfg); + + return 0; + } else { + return DB_DONOTINDEX; + } } static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; + STbCfg tbCfg = {0}; + + metaDecodeTbInfo(pValue->data, &tbCfg); + if (tbCfg.type == META_NORMAL_TABLE) { + memset(pSKey, 0, sizeof(*pSKey)); + + pSKey->data = pKey->data; + pSKey->size = pKey->size; + + metaClearTbCfg(&tbCfg); + + return 0; + } else { + return DB_DONOTINDEX; + } } static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} + STbCfg tbCfg = {0}; -#if 0 -int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { - char buf[512]; - void * pBuf; - DBT key = {0}; - DBT value = {0}; - SSchemaKey schemaKey; - tb_uid_t uid; + metaDecodeTbInfo(pValue->data, &tbCfg); + if (tbCfg.type == META_CHILD_TABLE) { + memset(pSKey, 0, sizeof(*pSKey)); - if (pTbCfg->type == META_SUPER_TABLE) { - // Handle SUPER table - uid = pTbCfg->stbCfg.suid; + pSKey->data = pKey->data; + pSKey->size = pKey->size; - // Same table info - metaSaveTbInfo(pMeta->pDB->pStbDB, uid, pTbCfg); + metaClearTbCfg(&tbCfg); - // save schema - metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema); - - { - // Create a super table DB and corresponding index DB - DB *pStbDB; - DB *pStbIdxDB; - - META_OPEN_DB(pStbDB, pMeta->pDB->pEvn, "meta.db"); - - META_OPEN_DB(pStbIdxDB, pMeta->pDB->pEvn, "index.db"); - - // TODO META_ASSOCIATE_IDX(); - } - } else if (pTbCfg->type == META_CHILD_TABLE) { - // Handle CHILD table - uid = metaGenerateUid(pMeta); - - DB *pCTbDB = taosHashGet(pMeta->pDB->pCtbMap, &(pTbCfg->ctbCfg.suid), sizeof(pTbCfg->ctbCfg.suid)); - if (pCTbDB == NULL) { - ASSERT(0); - } - - metaSaveTbInfo(pCTbDB, uid, pTbCfg); - - } else if (pTbCfg->type == META_NORMAL_TABLE) { - // Handle NORMAL table - uid = metaGenerateUid(pMeta); - - metaSaveTbInfo(pMeta->pDB->pNtbDB, uid, pTbCfg); - - metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema); + return 0; } else { - ASSERT(0); + return DB_DONOTINDEX; } - - return 0; -} - -int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { - // TODO -} - -/* ------------------------ STATIC METHODS ------------------------ */ -static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey) { - int tsize = 0; - - tsize += taosEncodeFixedU64(buf, pSchemaKey->uid); - tsize += taosEncodeFixedI32(buf, pSchemaKey->sver); - - return tsize; -} - -static void *metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey) { - buf = taosDecodeFixedU64(buf, &(pSchemaKey->uid)); - buf = taosDecodeFixedI32(buf, &(pSchemaKey->sver)); - - return buf; -} - -static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} - -static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} - -static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) { - SSchemaKey skey; - char buf[256]; - void * pBuf = buf; - DBT key = {0}; - DBT value = {0}; - - skey.uid = uid; - skey.sver = schemaVersion(pSchema); - - key.data = &skey; - key.size = sizeof(skey); - - tdEncodeSchema(&pBuf, pSchema); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - - pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); } static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { @@ -405,6 +355,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { tsize += taosEncodeString(buf, pTbCfg->name); tsize += taosEncodeFixedU32(buf, pTbCfg->ttl); tsize += taosEncodeFixedU32(buf, pTbCfg->keep); + tsize += taosEncodeFixedU8(buf, pTbCfg->type); if (pTbCfg->type == META_SUPER_TABLE) { tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema); @@ -420,10 +371,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { } static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { - // TODO buf = taosDecodeString(buf, &(pTbCfg->name)); buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl)); buf = taosDecodeFixedU32(buf, &(pTbCfg->keep)); + buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); if (pTbCfg->type == META_SUPER_TABLE) { buf = tdDecodeSchema(buf, &(pTbCfg->stbCfg.pTagSchema)); @@ -437,22 +388,11 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { return buf; } -static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg) { - DBT key = {0}; - DBT value = {0}; - char buf[512]; - void *pBuf = buf; - - key.data = &uid; - key.size = sizeof(uid); - - metaEncodeTbInfo(&pBuf, pTbCfg); - - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - - pDB->put(pDB, NULL, &key, &value, 0); - - return 0; -} -#endif \ No newline at end of file +static void metaClearTbCfg(STbCfg *pTbCfg) { + tfree(pTbCfg->name); + if (pTbCfg->type == META_SUPER_TABLE) { + tdFreeSchema(pTbCfg->stbCfg.pTagSchema); + } else if (pTbCfg->type == META_CHILD_TABLE) { + tfree(pTbCfg->ctbCfg.pTag); + } +} \ No newline at end of file From d8d6c04fd50eb4302c36700acc154efc245d5123 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 16:18:35 +0800 Subject: [PATCH 03/15] modify test case --- source/libs/index/src/index_fst.c | 14 +- source/libs/index/test/indexTests.cpp | 184 ++++++++++++++++++-------- 2 files changed, 138 insertions(+), 60 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 40e35306a2..457b5422a4 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -478,6 +478,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { return 0; } FstSlice *slice = &node->data; + uint8_t *data = fstSliceData(slice, NULL); uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size @@ -485,7 +486,6 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { - (i * oSizes) - oSizes; - uint8_t *data = fstSliceData(slice, NULL); return unpackUint64(data + at, oSizes); } @@ -555,6 +555,7 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack uint64_t at = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) + - 1 // pack size - fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes; @@ -587,7 +588,8 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { FstSlice t = fstSliceCopy(slice, start, end - 1); int32_t len = 0; uint8_t *data = fstSliceData(&t, &len); - for(int i = 0; i < len; i++) { + int i = 0; + for(; i < len; i++) { //uint8_t v = slice->data[slice->start + i]; ////slice->data[slice->start + i]; uint8_t v = data[i]; @@ -595,6 +597,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { return node->nTrans - i - 1; // bug } } + if (i == len) { *null = true; } } } @@ -774,7 +777,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { if (NULL == b) { return b; } - b->wrt = fstCountingWriterCreate(w, false); + b->wrt = fstCountingWriterCreate(w, false); b->unfinished = fstUnFinishedNodesCreate(); b->registry = fstRegistryCreate(10000, 2) ; b->last = fstSliceCreate(NULL, 0); @@ -857,6 +860,7 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) { return OutOfOrdered; } // deep copy or not + fstSliceDestroy(&b->last); b->last = fstSliceCopy(&bs, input->start, input->end); } return Ordered; @@ -1007,8 +1011,7 @@ Fst* fstCreate(FstSlice *slice) { uint64_t fstLen; len -= sizeof(fstLen); taosDecodeFixedU64(buf + len, &fstLen); - //TODO(validat root addr) - // + //TODO(validate root addr) Fst *fst= (Fst *)calloc(1, sizeof(Fst)); if (fst == NULL) { return NULL; } @@ -1023,6 +1026,7 @@ Fst* fstCreate(FstSlice *slice) { fst->meta->len = fstLen; fst->meta->checkSum = checkSum; fst->data = slice; + return fst; FST_CREAT_FAILED: diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 9135a7a173..86f19e8044 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -2,13 +2,79 @@ #include #include #include "index.h" +#include "tutil.h" #include "indexInt.h" #include "index_fst.h" #include "index_fst_util.h" #include "index_fst_counting_writer.h" +class FstWriter { + public: + FstWriter() { + _b = fstBuilderCreate(NULL, 0); + } + bool Put(const std::string &key, uint64_t val) { + FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size()); + bool ok = fstBuilderInsert(_b, skey, val); + fstSliceDestroy(&skey); + return ok; + } + ~FstWriter() { + fstBuilderFinish(_b); + fstBuilderDestroy(_b); + } + private: + FstBuilder *_b; +}; +class FstReadMemory { + public: + FstReadMemory(size_t size) { + _w = fstCountingWriterCreate(NULL, true); + _size = size; + memset((void *)&_s, 0, sizeof(_s)); + } + bool init() { + char *buf = (char *)calloc(1, sizeof(char) * _size); + int nRead = fstCountingWriterRead(_w, (uint8_t *)buf, _size); + if (nRead <= 0) { return false; } + _size = nRead; + _s = fstSliceCreate((uint8_t *)buf, _size); + _fst = fstCreate(&_s); + free(buf); + return _fst != NULL; + } + bool Get(const std::string &key, uint64_t *val) { + FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size()); + bool ok = fstGet(_fst, &skey, val); + fstSliceDestroy(&skey); + return ok; + } + bool GetWithTimeCostUs(const std::string &key, uint64_t *val, uint64_t *elapse) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Get(key, val); + int64_t e = taosGetTimestampUs(); + *elapse = e - s; + return ok; + } + // add later + bool Search(const std::string &key, std::vector &result) { + return true; + } + + ~FstReadMemory() { + fstCountingWriterDestroy(_w); + fstSliceDestroy(&_s); + } + + private: + FstCountingWriter *_w; + Fst *_fst; + FstSlice _s; + size_t _size; + +}; //TEST(IndexTest, index_create_test) { // SIndexOpts *opts = indexOptsCreate(); @@ -62,69 +128,77 @@ // // //} + + +void Performance_fstWriteRecords(FstWriter *b) { + std::string str("aa"); + for (int i = 0; i < 26; i++) { + str[0] = 'a' + i; + str.resize(2); + for(int j = 0; j < 26; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < 10; k++) { + str.push_back('a'); + b->Put(str, k); + } + } + } +} + +void Performance_fstReadRecords(FstReadMemory *m) { + std::string str("a"); + for (int i = 0; i < 500; i++) { + //std::string str("aa"); + str.push_back('a'); + uint64_t out, cost; + bool ok = m->GetWithTimeCostUs(str, &out, &cost); + if (ok == true) { + printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost); + } else { + printf("failed to get(%s)\n", str.c_str()); + } + } +} + int main(int argc, char** argv) { // test write - FstBuilder *b = fstBuilderCreate(NULL, 0); - { - std::string str("aaa"); - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - Output val = 1; - fstBuilderInsert(b, key, val); - } - - //std::string str1("bcd"); - //FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size()); - //Output val2 = 10; // - + FstWriter *fw = new FstWriter; { - - for (size_t i = 1; i < 26; i++) { - std::string str("aaa"); - str[2] = 'a' + i ; - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - Output val = 0; - fstBuilderInsert(b, key, val); - } - - } - fstBuilderFinish(b); - fstBuilderDestroy(b); - - - char buf[64 * 1024] = {0}; - - FstSlice s; - - FstCountingWriter *w = fstCountingWriterCreate(NULL, true); - int nRead = fstCountingWriterRead(w, (uint8_t *)buf, sizeof(buf)); - assert(nRead <= sizeof(buf)); - s = fstSliceCreate((uint8_t *)buf, nRead); - fstCountingWriterDestroy(w); - - - // test reader - - - Fst *fst = fstCreate(&s); - { - std::string str("aax"); - uint64_t out; - - - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - bool ok = fstGet(fst, &key, &out); - if (ok == true) { - printf("val = %d\n", out); - //indexInfo("Get key-value success, %s, %d", str.c_str(), out); - } else { - //indexError("Get key-value failed, %s", str.c_str()); + std::string key("ab"); + int64_t val = 100; + for (int i = 0; i < 26; i++) { + key.push_back('a' + i); + fw->Put(key, val++); } } - fstSliceDestroy(&s); - + delete fw; + FstReadMemory *m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + } + + { + std::string key("ab"); + uint64_t out; + if (m->Get(key, &out)) { + printf("success to get (%s, %" PRId64")\n", key.c_str(), out); + } else { + printf("failed to get(%s)\n", key.c_str()); + } + for (int i = 0; i < 26; i++) { + key.push_back('a' + i); + if (m->Get(key, &out)) { + printf("success to get (%s, %" PRId64")\n", key.c_str(), out); + } else { + printf("failed to get(%s)\n", key.c_str()); + } + } + } + return 1; } From 5f4be73d7ec2c95d7e2566dbc166d6f7eb1cf5ea Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 17:11:45 +0800 Subject: [PATCH 04/15] [TD-11894] rename MD5 func name --- include/util/tmd5.h | 8 ++++---- include/util/tutil.h | 8 ++++---- source/libs/transport/src/rpcMain.c | 24 ++++++++++++------------ source/util/src/tmd5.c | 10 +++++----- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/include/util/tmd5.h b/include/util/tmd5.h index 329f4acf11..f8114ad57b 100644 --- a/include/util/tmd5.h +++ b/include/util/tmd5.h @@ -30,10 +30,10 @@ typedef struct { uint32_t buf[4]; /* scratch buffer */ uint8_t in[64]; /* input buffer */ uint8_t digest[16]; /* actual digest after MD5Final call */ -} MD5_CTX; +} T_MD5_CTX; -void MD5Init(MD5_CTX *mdContext); -void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); -void MD5Final(MD5_CTX *mdContext); +void tMD5Init(T_MD5_CTX *mdContext); +void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); +void tMD5Final(T_MD5_CTX *mdContext); #endif /*_TD_UTIL_MD5_H*/ diff --git a/include/util/tutil.h b/include/util/tutil.h index 8dbcb7e8d5..0bb8f0fbfc 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -45,10 +45,10 @@ char * taosIpStr(uint32_t ipInt); uint32_t ip2uint(const char *const ip_addr); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, inBuf, (unsigned int)inLen); - MD5Final(&context); + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, inBuf, (unsigned int)inLen); + tMD5Final(&context); memcpy(target, context.digest, TSDB_KEY_LEN); } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index a2041c76fc..fbdc94e800 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1527,14 +1527,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { } static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { - MD5_CTX context; + T_MD5_CTX context; int ret = -1; - MD5Init(&context); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Update(&context, (uint8_t *)pMsg, msgLen); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Final(&context); + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Update(&context, (uint8_t *)pMsg, msgLen); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Final(&context); if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; @@ -1542,13 +1542,13 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { } static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { - MD5_CTX context; + T_MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Update(&context, (uint8_t *)pMsg, msgLen); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Final(&context); + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Update(&context, (uint8_t *)pMsg, msgLen); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Final(&context); memcpy(pAuth, context.digest, sizeof(context.digest)); } diff --git a/source/util/src/tmd5.c b/source/util/src/tmd5.c index 9cc4b3b9d5..807f3c8122 100644 --- a/source/util/src/tmd5.c +++ b/source/util/src/tmd5.c @@ -84,8 +84,8 @@ static uint8_t PADDING[64] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x /* The routine MD5Init initializes the message-digest context mdContext. All fields are set to zero. */ -void MD5Init(MD5_CTX *mdContext) { - memset(mdContext, 0, sizeof(MD5_CTX)); +void tMD5Init(T_MD5_CTX *mdContext) { + memset(mdContext, 0, sizeof(T_MD5_CTX)); /* Load magic initialization constants. */ mdContext->buf[0] = (uint32_t)0x67452301; @@ -98,7 +98,7 @@ void MD5Init(MD5_CTX *mdContext) { account for the presence of each of the characters inBuf[0..inLen-1] in the message whose digest is being computed. */ -void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { +void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { uint32_t in[16]; int mdi; unsigned int i, ii; @@ -129,7 +129,7 @@ void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { /* The routine MD5Final terminates the message-digest computation and ends with the desired message digest in mdContext->digest[0...15]. */ -void MD5Final(MD5_CTX *mdContext) { +void tMD5Final(T_MD5_CTX *mdContext) { uint32_t in[16]; int mdi; unsigned int i, ii; @@ -144,7 +144,7 @@ void MD5Final(MD5_CTX *mdContext) { /* pad out to 56 mod 64 */ padLen = (mdi < 56) ? (56 - mdi) : (120 - mdi); - MD5Update(mdContext, PADDING, padLen); + tMD5Update(mdContext, PADDING, padLen); /* append length in bits and transform */ for (i = 0, ii = 0; i < 14; i++, ii += 4) From 8faba0f5ccd1f5e4449f826e4ba85b0a1f027fc4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Dec 2021 17:21:45 +0800 Subject: [PATCH 05/15] more work --- .../dnode/vnode/impl/test/vnodeApiTests.cpp | 2 +- source/dnode/vnode/meta/src/metaBDBImpl.c | 64 +++++++++---------- 2 files changed, 31 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index 493fe4448b..e65363d689 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -45,7 +45,7 @@ static SKVRow createBasicTag() { tdInitKVRowBuilder(&rb); - for (int i = 10; i < 12; i++) { + for (int i = 0; i < 2; i++) { void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo")); varDataLen(pVal) = strlen("foo"); memcpy(varDataVal(pVal), "foo", strlen("foo")); diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 31fe69d595..48fa73a8f2 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -44,9 +44,9 @@ static SMetaDB *metaNewDB(); static void metaFreeDB(SMetaDB *pDB); static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path); static void metaCloseBDBEnv(DB_ENV *pEnv); -static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName); +static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); static void metaCloseBDBDb(DB *pDB); -static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf); +static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup); static void metaCloseBDBIdx(DB *pIdx); static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); @@ -76,33 +76,33 @@ int metaOpenDB(SMeta *pMeta) { } // Open DBs - if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db") < 0) { + if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db", false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db") < 0) { + if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db", false) < 0) { metaCloseDB(pMeta); return -1; } // Open Indices - if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNameIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaStbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "stb.index", pDB->pTbDB, &metaStbIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNtbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "ntb.index", pDB->pTbDB, &metaNtbIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaCtbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "ctb.index", pDB->pTbDB, &metaCtbIdxCb, true) < 0) { metaCloseDB(pMeta); return -1; } @@ -150,6 +150,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { value.data = buf; value.size = POINTER_DISTANCE(pBuf, buf); + value.app_data = pTbCfg; pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0); } @@ -232,7 +233,7 @@ static void metaCloseBDBEnv(DB_ENV *pEnv) { } } -static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) { +static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) { int ret; DB *pDB; @@ -242,6 +243,14 @@ static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) { return -1; } + if (isDup) { + ret = pDB->set_flags(pDB, DB_DUPSORT); + if (ret != 0) { + BDB_PERR("Failed to set DB flags", ret); + return -1; + } + } + ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0); if (ret) { BDB_PERR("Failed to open META DB", ret); @@ -259,11 +268,11 @@ static void metaCloseBDBDb(DB *pDB) { } } -static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf) { +static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) { DB *pIdx; int ret; - if (metaOpenBDBDb(ppIdx, pEnv, pFName) < 0) { + if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) { return -1; } @@ -283,30 +292,24 @@ static void metaCloseBDBIdx(DB *pIdx) { } static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - char *name; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); memset(pSKey, 0, sizeof(*pSKey)); - taosDecodeString(pValue->data, &name); - pSKey->data = name; - pSKey->size = strlen(name); - pSKey->flags = DB_DBT_APPMALLOC; + pSKey->data = pTbCfg->name; + pSKey->size = strlen(pTbCfg->name); return 0; } static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - STbCfg tbCfg = {0}; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - metaDecodeTbInfo(pValue->data, &tbCfg); - if (tbCfg.type == META_SUPER_TABLE) { + if (pTbCfg->type == META_SUPER_TABLE) { memset(pSKey, 0, sizeof(*pSKey)); - pSKey->data = pKey->data; pSKey->size = pKey->size; - metaClearTbCfg(&tbCfg); - return 0; } else { return DB_DONOTINDEX; @@ -314,17 +317,13 @@ static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey } static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - STbCfg tbCfg = {0}; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - metaDecodeTbInfo(pValue->data, &tbCfg); - if (tbCfg.type == META_NORMAL_TABLE) { + if (pTbCfg->type == META_NORMAL_TABLE) { memset(pSKey, 0, sizeof(*pSKey)); - pSKey->data = pKey->data; pSKey->size = pKey->size; - metaClearTbCfg(&tbCfg); - return 0; } else { return DB_DONOTINDEX; @@ -332,17 +331,14 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey } static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - STbCfg tbCfg = {0}; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); - metaDecodeTbInfo(pValue->data, &tbCfg); - if (tbCfg.type == META_CHILD_TABLE) { + if (pTbCfg->type == META_CHILD_TABLE) { + // Set index key memset(pSKey, 0, sizeof(*pSKey)); - pSKey->data = pKey->data; pSKey->size = pKey->size; - metaClearTbCfg(&tbCfg); - return 0; } else { return DB_DONOTINDEX; From 516cddca72a3c250cc7f177ae54de8bc4e92183f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Dec 2021 17:31:39 +0800 Subject: [PATCH 06/15] create 1000000 tables --- source/dnode/vnode/impl/test/vnodeApiTests.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index e65363d689..df784181b7 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -120,7 +120,7 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) { { // Create some child tables - int ntables = 100000; + int ntables = 1000000; int batch = 10; for (int i = 0; i < ntables / batch; i++) { SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *)); From cf5e188d06d6178afafde4d327e73d928cc426de Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 17:39:11 +0800 Subject: [PATCH 07/15] add test case --- source/libs/index/test/indexTests.cpp | 29 +++++++++++++++++---------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 86f19e8044..475775c01e 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -132,13 +132,13 @@ class FstReadMemory { void Performance_fstWriteRecords(FstWriter *b) { std::string str("aa"); - for (int i = 0; i < 26; i++) { + for (int i = 0; i < 100; i++) { str[0] = 'a' + i; str.resize(2); - for(int j = 0; j < 26; j++) { + for(int j = 0; j < 100; j++) { str[1] = 'a' + j; str.resize(2); - for (int k = 0; k < 10; k++) { + for (int k = 0; k < 100; k++) { str.push_back('a'); b->Put(str, k); } @@ -161,16 +161,17 @@ void Performance_fstReadRecords(FstReadMemory *m) { } } -int main(int argc, char** argv) { - // test write - // +void validateFst() { + r + int val = 100; + int count = 100; FstWriter *fw = new FstWriter; { std::string key("ab"); int64_t val = 100; - for (int i = 0; i < 26; i++) { + for (int i = 0; i < count; i++) { key.push_back('a' + i); - fw->Put(key, val++); + fw->Put(key, val + i); } } delete fw; @@ -188,16 +189,22 @@ int main(int argc, char** argv) { } else { printf("failed to get(%s)\n", key.c_str()); } - for (int i = 0; i < 26; i++) { + for (int i = 0; i < count; i++) { key.push_back('a' + i); - if (m->Get(key, &out)) { + if (m->Get(key, &out) ) { + assert(val + i == out); printf("success to get (%s, %" PRId64")\n", key.c_str(), out); } else { printf("failed to get(%s)\n", key.c_str()); } } } - + delete m; + +} +int main(int argc, char** argv) { + // test write + validateFst(); return 1; } From dad17aa71e15717ad2779cfba9cf53dc15201653 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Dec 2021 18:10:07 +0800 Subject: [PATCH 08/15] more work --- source/dnode/vnode/meta/src/metaBDBImpl.c | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 48fa73a8f2..dbb26143cc 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -332,12 +332,24 @@ static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); + DBT * pDbt; if (pTbCfg->type == META_CHILD_TABLE) { + pDbt = calloc(2, sizeof(DBT)); + + // First key is suid + pDbt[0].data = &(pTbCfg->ctbCfg.suid); + pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); + + // Second key is the first tag + pDbt[1].data = NULL; + pDbt[1].size = 0; + // Set index key memset(pSKey, 0, sizeof(*pSKey)); - pSKey->data = pKey->data; - pSKey->size = pKey->size; + pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; + pSKey->data = pDbt; + pSKey->size = 2; return 0; } else { From d9662eb67fffa8e5785f024300da7063f705f0ae Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Dec 2021 18:21:30 +0800 Subject: [PATCH 09/15] more --- source/dnode/vnode/meta/src/metaBDBImpl.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index dbb26143cc..3c1ccc72dc 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -342,8 +342,9 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); // Second key is the first tag - pDbt[1].data = NULL; - pDbt[1].size = 0; + void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, 0); + pDbt[1].data = varDataVal(pTagVal); + pDbt[1].size = varDataLen(pTagVal); // Set index key memset(pSKey, 0, sizeof(*pSKey)); From cf93f4c23952f41230817f8f2815a305152f08fc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 9 Dec 2021 18:32:38 +0800 Subject: [PATCH 10/15] add unit test for wal --- include/libs/wal/wal.h | 35 +++-- include/util/tfile.h | 2 + source/libs/wal/CMakeLists.txt | 5 + source/libs/wal/inc/walInt.h | 66 ++++++++- source/libs/wal/src/walIndex.c | 27 ++-- source/libs/wal/src/walMeta.c | 193 +++++++++++++++++++++++++++ source/libs/wal/src/walMgmt.c | 55 +++----- source/libs/wal/src/walRead.c | 48 ++++++- source/libs/wal/src/walWrite.c | 171 ++++++++++++++++-------- source/libs/wal/test/CMakeLists.txt | 20 +++ source/libs/wal/test/walMetaTest.cpp | 157 ++++++++++++++++++++++ source/libs/wal/test/walTests.cpp | 137 ------------------- 12 files changed, 662 insertions(+), 254 deletions(-) create mode 100644 source/libs/wal/src/walMeta.c create mode 100644 source/libs/wal/test/CMakeLists.txt create mode 100644 source/libs/wal/test/walMetaTest.cpp delete mode 100644 source/libs/wal/test/walTests.cpp diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b514648bbd..a72765583e 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -55,12 +55,14 @@ typedef struct { uint32_t signature; uint32_t cksumHead; uint32_t cksumBody; - //char cont[]; + char cont[]; } SWalHead; typedef struct { int32_t vgId; int32_t fsyncPeriod; // millisecond + int32_t rollPeriod; + int64_t segSize; EWalType walLevel; // wal level } SWalCfg; @@ -87,10 +89,14 @@ typedef struct SWal { // cfg int32_t vgId; int32_t fsyncPeriod; // millisecond - int32_t fsyncSeq; int32_t rollPeriod; // second int64_t segSize; + int64_t rtSize; EWalType level; + //total size + int64_t totSize; + //fsync seq + int32_t fsyncSeq; //reference int64_t refId; //current tfd @@ -98,25 +104,32 @@ typedef struct SWal { int64_t curIdxTfd; //current version int64_t curVersion; - int64_t curLogOffset; + //current file version - int64_t curFileFirstVersion; - int64_t curFileLastVersion; - //wal fileset version + //int64_t curFileFirstVersion; + //int64_t curFileLastVersion; + + //wal lifecycle int64_t firstVersion; int64_t snapshotVersion; + int64_t commitVersion; int64_t lastVersion; - int64_t lastFileName; + + //last file + //int64_t lastFileName; + //roll status int64_t lastRollSeq; - int64_t lastFileWriteSize; + //int64_t lastFileWriteSize; + + //file set + int32_t fileCursor; + SArray* fileInfoSet; //ctl int32_t curStatus; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; - //file set - SArray* fileSet; //reusable write head SWalHead head; } SWal; // WAL HANDLE @@ -133,7 +146,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); // apis for lifecycle management diff --git a/include/util/tfile.h b/include/util/tfile.h index 3d0e2177ac..af4c19e7d1 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_FILE_H #define _TD_UTIL_FILE_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index e5697415f1..4d2dd97c87 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -8,6 +8,11 @@ target_include_directories( target_link_libraries( wal + PUBLIC cjson PUBLIC os PUBLIC util ) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 285d7e2576..ae655d61da 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -23,9 +23,73 @@ extern "C" { #endif -int walGetFile(SWal* pWal, int32_t version); +//meta section begin +typedef struct WalFileInfo { + int64_t firstVer; + int64_t lastVer; + int64_t createTs; + int64_t closeTs; + int64_t fileSize; +} WalFileInfo; + +static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { + WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; + WalFileInfo* pInfoRight = (WalFileInfo*)pRight; + return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer); +} + +static inline int64_t walGetLastFileSize(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->fileSize; +} + +static inline int64_t walGetLastFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileLastVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileOffset(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->fileSize; +} + +static inline bool walCurFileClosed(SWal* pWal) { + return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor; +} + +static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { + return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); +} + +static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); +} + +static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); +} + +int walReadMeta(SWal* pWal); +int walWriteMeta(SWal* pWal); +int walRollFileInfo(SWal* pWal); + +char* walFileInfoSerialize(SWal* pWal); +SArray* walFileInfoDeserialize(const char* bytes); +//meta section end int64_t walGetSeq(); +int walSeekVer(SWal *pWal, int64_t ver); +int walRoll(SWal *pWal); #ifdef __cplusplus } diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index 1aa64b34b5..b4d66226d6 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t logTfd = pWal->curLogTfd; //seek position - int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { @@ -43,7 +43,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { if (code != 0) { } - pWal->curLogOffset = readBuf[1]; + /*pWal->curLogOffset = readBuf[1];*/ pWal->curVersion = ver; return code; } @@ -60,27 +60,27 @@ static int walChangeFile(SWal *pWal, int64_t ver) { if(code != 0) { //TODO } + WalFileInfo tmpInfo; + tmpInfo.firstVer = ver; //bsearch in fileSet - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - int64_t fname = *pRet; - if(fname < pWal->lastFileName) { + int64_t fileFirstVer = pRet->firstVer; + //closed + if(taosArrayGetLast(pWal->fileInfoSet) != pRet) { pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = pRet[1]-1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenRead(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenRead(fnameStr); } else { pWal->curStatus |= WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = -1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); } - pWal->curFileFirstVersion = fname; pWal->curLogTfd = logTfd; pWal->curIdxTfd = idxTfd; return code; @@ -102,8 +102,7 @@ int walSeekVer(SWal *pWal, int64_t ver) { if(ver < pWal->snapshotVersion) { //TODO: seek snapshotted log, invalid in some cases } - if(ver < pWal->curFileFirstVersion || - (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { walChangeFile(pWal, ver); } walSeekFilePos(pWal, ver); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c new file mode 100644 index 0000000000..2eec4328e6 --- /dev/null +++ b/source/libs/wal/src/walMeta.c @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tref.h" +#include "tfile.h" +#include "cJSON.h" +#include "walInt.h" + +#include +#include + +int walRollFileInfo(SWal* pWal) { + int64_t ts = taosGetTimestampSec(); + + SArray* pArray = pWal->fileInfoSet; + if(taosArrayGetSize(pArray) != 0) { + WalFileInfo *pInfo = taosArrayGetLast(pArray); + pInfo->lastVer = pWal->lastVersion; + pInfo->closeTs = ts; + } + + WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo)); + if(pNewInfo == NULL) { + return -1; + } + pNewInfo->firstVer = pWal->lastVersion + 1; + pNewInfo->lastVer = -1; + pNewInfo->createTs = ts; + pNewInfo->closeTs = -1; + pNewInfo->fileSize = 0; + taosArrayPush(pWal->fileInfoSet, pNewInfo); + return 0; +} + +char* walFileInfoSerialize(SWal* pWal) { + char buf[30]; + if(pWal == NULL || pWal->fileInfoSet == NULL) return 0; + int sz = pWal->fileInfoSet->size; + cJSON* root = cJSON_CreateArray(); + cJSON* field; + if(root == NULL) { + //TODO + return NULL; + } + WalFileInfo* pData = pWal->fileInfoSet->pData; + for(int i = 0; i < sz; i++) { + WalFileInfo* pInfo = &pData[i]; + cJSON_AddItemToArray(root, field = cJSON_CreateObject()); + if(field == NULL) { + cJSON_Delete(root); + return NULL; + } + //cjson only support int32_t or double + //string are used to prohibit the loss of precision + sprintf(buf, "%ld", pInfo->firstVer); + cJSON_AddStringToObject(field, "firstVer", buf); + sprintf(buf, "%ld", pInfo->lastVer); + cJSON_AddStringToObject(field, "lastVer", buf); + sprintf(buf, "%ld", pInfo->createTs); + cJSON_AddStringToObject(field, "createTs", buf); + sprintf(buf, "%ld", pInfo->closeTs); + cJSON_AddStringToObject(field, "closeTs", buf); + sprintf(buf, "%ld", pInfo->fileSize); + cJSON_AddStringToObject(field, "fileSize", buf); + } + return cJSON_Print(root); +} + +SArray* walFileInfoDeserialize(const char* bytes) { + cJSON *root, *pInfoJson, *pField; + root = cJSON_Parse(bytes); + int sz = cJSON_GetArraySize(root); + //deserialize + SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo)); + WalFileInfo *pData = pArray->pData; + for(int i = 0; i < sz; i++) { + cJSON* pInfoJson = cJSON_GetArrayItem(root, i); + WalFileInfo* pInfo = &pData[i]; + pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); + pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); + pInfo->lastVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "createTs"); + pInfo->createTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "closeTs"); + pInfo->closeTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "fileSize"); + pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); + } + taosArraySetSize(pArray, sz); + return pArray; +} + +static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +} + +static int walFindCurMetaVer(SWal* pWal) { + const char * pattern = "^meta-ver[0-9]+$"; + regex_t walMetaRegexPattern; + regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED); + + DIR *dir = opendir(pWal->path); + if(dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + + //find existing meta-ver[x].json + int metaVer = -1; + while((ent = readdir(dir)) != NULL) { + char *name = basename(ent->d_name); + int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); + if(code == 0) { + sscanf(name, "meta-ver%d", &metaVer); + break; + } + } + return metaVer; +} + +int walWriteMeta(SWal* pWal) { + int metaVer = walFindCurMetaVer(pWal); + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer+1, fnameStr); + int metaTfd = tfOpenCreateWrite(fnameStr); + if(metaTfd < 0) { + return -1; + } + char* serialized = walFileInfoSerialize(pWal); + int len = strlen(serialized); + if(len != tfWrite(metaTfd, serialized, len)) { + //TODO:clean file + return -1; + } + + tfClose(metaTfd); + //delete old file + if(metaVer > -1) { + walBuildMetaName(pWal, metaVer, fnameStr); + remove(fnameStr); + } + return 0; +} + +int walReadMeta(SWal* pWal) { + ASSERT(pWal->fileInfoSet->size == 0); + //find existing meta file + int metaVer = walFindCurMetaVer(pWal); + if(metaVer == -1) { + return 0; + } + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer, fnameStr); + //read metafile + struct stat statbuf; + stat(fnameStr, &statbuf); + int size = statbuf.st_size; + char* buf = malloc(size + 5); + if(buf == NULL) { + return -1; + } + int tfd = tfOpenRead(fnameStr); + if(tfRead(tfd, buf, size) != size) { + free(buf); + return -1; + } + //load into fileInfoSet + pWal->fileInfoSet = walFileInfoDeserialize(buf); + if(pWal->fileInfoSet == NULL) { + free(buf); + return -1; + } + free(buf); + return 0; +} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index bc2e687069..acb173b17b 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -64,43 +64,31 @@ int32_t walInit() { void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refSetId); + atomic_store_8(&tsWal.inited, 0); wInfo("wal module is cleaned up"); } -static int walLoadFileset(SWal *pWal) { - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent* ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - name[WAL_NOSUFFIX_LEN] = 0; - //validate file name by regex matching - if(1 /* TODO:regex match */) { - int64_t fnameInt64 = atoll(name); - taosArrayPush(pWal->fileSet, &fnameInt64); - } - } - taosArraySort(pWal->fileSet, compareInt64Val); - return 0; -} - SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - - pWal->vgId = pCfg->vgId; pWal->curLogTfd = -1; pWal->curIdxTfd = -1; - pWal->level = pCfg->walLevel; - pWal->fsyncPeriod = pCfg->fsyncPeriod; + //set config + pWal->vgId = pCfg->vgId; + pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->rollPeriod = pCfg->rollPeriod; + pWal->segSize = pCfg->segSize; + pWal->level = pCfg->walLevel; + + //init status + pWal->lastVersion = -1; + pWal->lastRollSeq = -1; + + //init write buffer memset(&pWal->head, 0, sizeof(SWalHead)); pWal->head.sver = 0; @@ -120,7 +108,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walFreeObj(pWal); return NULL; } - walLoadFileset(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); @@ -153,8 +140,8 @@ void walClose(SWal *pWal) { pthread_mutex_lock(&pWal->mutex); tfClose(pWal->curLogTfd); tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + /*taosArrayDestroy(pWal->fileInfoSet);*/ + /*pWal->fileInfoSet = NULL;*/ pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); } @@ -164,8 +151,8 @@ static int32_t walInitObj(SWal *pWal) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - pWal->fileSet = taosArrayInit(0, sizeof(int64_t)); - if(pWal->fileSet == NULL) { + pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo)); + if(pWal->fileInfoSet == NULL) { wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } @@ -180,8 +167,10 @@ static void walFreeObj(void *wal) { tfClose(pWal->curLogTfd); tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + taosArrayDestroy(pWal->fileInfoSet); + pWal->fileInfoSet = NULL; + taosArrayDestroy(pWal->fileInfoSet); + pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -210,7 +199,7 @@ static void walFsyncAll() { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->curLogTfd); if (code != 0) { - wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); } } pWal = taosIterateRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b475183b7b..90ec5528c4 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -13,16 +13,56 @@ * along with this program. If not, see . */ -#include "wal.h" +#include "walInt.h" +#include "tfile.h" #include "tchecksum.h" -static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) { - return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) && - taosCheckChecksum(body, bodyLen, pHead->cksumBody); +static inline int walValidHeadCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead); } +static inline int walValidBodyCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody); +} + +static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); +} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { + int code; + code = walSeekVer(pWal, ver); + if(code != 0) { + return code; + } + if(*ppHead == NULL) { + void* ptr = realloc(*ppHead, sizeof(SWalHead)); + if(ptr == NULL) { + return -1; + } + *ppHead = ptr; + } + if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidHeadCksum(*ppHead) != 0) { + return -1; + } + void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len); + if(ptr == NULL) { + free(*ppHead); + *ppHead = NULL; + return -1; + } + if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidBodyCksum(*ppHead) != 0) { + return -1; + } + return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 69c83a9912..3c65698938 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,29 +21,42 @@ #include "tfile.h" #include "walInt.h" +static void walFtruncate(SWal *pWal, int64_t ver); + int32_t walCommit(SWal *pWal, int64_t ver) { + ASSERT(pWal->snapshotVersion <= pWal->commitVersion); + ASSERT(pWal->commitVersion <= pWal->lastVersion); + ASSERT(ver >= pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + pWal->commitVersion = ver; return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { //TODO: ftruncate + ASSERT(ver > pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + //seek position + walSeekVer(pWal, ver); + walFtruncate(pWal, ver); return 0; } int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { pWal->snapshotVersion = ver; + WalFileInfo tmp; + tmp.firstVer = ver; //mark files safe to delete - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); - if(pRet != pWal->fileSet->pData) { - //delete files until less than retention size - - //find first file that exceeds retention time - - } + WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + //iterate files, until the searched result + //if totSize > rtSize, delete + //if createTs > retentionTs, delete + + //save snapshot ver, commit ver + + //make new array, remove files - //delete files living longer than retention limit - //remove file from fileset return 0; } @@ -138,105 +151,123 @@ void walRemoveAllOldFiles(void *handle) { } #endif -static int walRoll(SWal *pWal) { +int walRoll(SWal *pWal) { int code = 0; - code = tfClose(pWal->curIdxTfd); - if(code != 0) { - return code; + if(pWal->curIdxTfd != -1) { + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + return -1; + } } - code = tfClose(pWal->curLogTfd); - if(code != 0) { - return code; + if(pWal->curLogTfd != -1) { + code = tfClose(pWal->curLogTfd); + if(code != 0) { + return -1; + } } int64_t idxTfd, logTfd; //create new file int64_t newFileFirstVersion = pWal->lastVersion + 1; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion); + walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion); + if(idxTfd < 0) { + ASSERT(0); + return -1; + } + walBuildLogName(pWal, newFileFirstVersion, fnameStr); logTfd = tfOpenCreateWrite(fnameStr); + if(logTfd < 0) { + ASSERT(0); + return -1; + } + code = walRollFileInfo(pWal); + if(code != 0) { + ASSERT(0); + return -1; + } - taosArrayPush(pWal->fileSet, &newFileFirstVersion); - //switch file pWal->curIdxTfd = idxTfd; pWal->curLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = newFileFirstVersion; - pWal->curVersion = newFileFirstVersion; - pWal->curLogOffset = 0; pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; - pWal->lastFileName = newFileFirstVersion; - pWal->lastFileWriteSize = 0; pWal->lastRollSeq = walGetSeq(); return 0; } int walChangeFileToLast(SWal *pWal) { int64_t idxTfd, logTfd; - int64_t* pRet = taosArrayGetLast(pWal->fileSet); + WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); ASSERT(pRet != NULL); - int64_t fname = *pRet; + int64_t fileFirstVer = pRet->firstVer; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + if(idxTfd < 0) { + return -1; + } + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); + if(logTfd < 0) { + return -1; + } //switch file pWal->curIdxTfd = idxTfd; pWal->curLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = fname; - pWal->curVersion = fname; - pWal->curLogOffset = 0; + pWal->curVersion = fileFirstVer; pWal->curStatus = WAL_CUR_FILE_WRITABLE; return 0; } -int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { +static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { int code = 0; //get index file if(!tfValid(pWal->curIdxTfd)) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + + WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno)); + return code; } int64_t writeBuf[2] = { ver, offset }; int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); if(size != sizeof(writeBuf)) { - //TODO: + return -1; } return 0; } -int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { if (pWal == NULL) return -1; + int code = 0; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; if (index == pWal->lastVersion + 1) { - int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(passed > pWal->rollPeriod) { - walRoll(pWal); - } else if(pWal->lastFileWriteSize > pWal->segSize) { - walRoll(pWal); + if(taosArrayGetSize(pWal->fileInfoSet) == 0) { + code = walRoll(pWal); + ASSERT(code == 0); } else { - walChangeFileToLast(pWal); + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) { + walRoll(pWal); + } else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) { + walRoll(pWal); + } } } else { //reject skip log or rewrite log //must truncate explicitly first return -1; } - if (!tfValid(pWal->curLogTfd)) return 0; + /*if (!tfValid(pWal->curLogTfd)) return 0;*/ pWal->head.version = index; - int32_t code = 0; pWal->head.signature = WAL_SIGNATURE; pWal->head.len = bodyLen; @@ -250,19 +281,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + } + code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); + if(code != 0) { + //TODO } - walWriteIndex(pWal, index, pWal->curLogOffset); - pWal->curLogOffset += sizeof(SWalHead) + bodyLen; //set status pWal->lastVersion = index; + walGetCurFileInfo(pWal)->lastVer = index; + walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; pthread_mutex_unlock(&pWal->mutex); @@ -273,9 +308,9 @@ void walFsync(SWal *pWal, bool forceFsync) { if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion); + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); if (tfFsync(pWal->curLogTfd) < 0) { - wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } } @@ -348,8 +383,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { } #endif -static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { - tfFtruncate(tfd, offset); +static int walValidateOffset(SWal* pWal, int64_t ver) { + int code = 0; + SWalHead *pHead = NULL; + code = (int)walRead(pWal, &pHead, ver); + if(pHead->version != ver) { + return -1; + } + return 0; +} + +static int64_t walGetOffset(SWal* pWal, int64_t ver) { + int code = walSeekVer(pWal, ver); + if(code != 0) { + return -1; + } + + code = walValidateOffset(pWal, ver); + if(code != 0) { + return -1; + } + + return 0; +} + +static void walFtruncate(SWal *pWal, int64_t ver) { + int64_t tfd = pWal->curLogTfd; + tfFtruncate(tfd, ver); + tfFsync(tfd); + tfd = pWal->curIdxTfd; + tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); tfFsync(tfd); } diff --git a/source/libs/wal/test/CMakeLists.txt b/source/libs/wal/test/CMakeLists.txt new file mode 100644 index 0000000000..1c0a3a162a --- /dev/null +++ b/source/libs/wal/test/CMakeLists.txt @@ -0,0 +1,20 @@ +add_executable(walTest "") +target_sources(walTest + PRIVATE + "walMetaTest.cpp" +) +target_include_directories(walTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/wal" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries(walTest + wal + gtest_main +) +enable_testing() +add_test( + NAME wal_test + COMMAND walTest +) diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp new file mode 100644 index 0000000000..4c0533d389 --- /dev/null +++ b/source/libs/wal/test/walMetaTest.cpp @@ -0,0 +1,157 @@ +#include +#include +#include +#include + +#include "tfile.h" +#include "walInt.h" + +class WalCleanEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + code = tfInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + tfCleanup(); + } + + void SetUp() override { + taosRemoveDir(pathName); + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +class WalKeepEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + code = tfInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + tfCleanup(); + } + + void SetUp() override { + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +TEST_F(WalCleanEnv, createNew) { + walRollFileInfo(pWal); + ASSERT(pWal->fileInfoSet != NULL); + ASSERT_EQ(pWal->fileInfoSet->size, 1); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + ASSERT_EQ(pInfo->firstVer, 0); + ASSERT_EQ(pInfo->lastVer, -1); + ASSERT_EQ(pInfo->closeTs, -1); + ASSERT_EQ(pInfo->fileSize, 0); +} + +TEST_F(WalCleanEnv, serialize) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + char*ss = walFileInfoSerialize(pWal); + printf("%s\n", ss); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalCleanEnv, removeOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalKeepEnv, readOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); + char*oldss = walFileInfoSerialize(pWal); + + TearDown(); + SetUp(); + code = walReadMeta(pWal); + ASSERT(code == 0); + char* newss = walFileInfoSerialize(pWal); + + int len = strlen(oldss); + ASSERT_EQ(len, strlen(newss)); + for(int i = 0; i < len; i++) { + EXPECT_EQ(oldss[i], newss[i]); + } +} + +TEST_F(WalKeepEnv, write) { + const char* ranStr = "tvapq02tcp"; + const int len = strlen(ranStr); + int code; + for(int i = 0; i < 10; i++) { + code = walWrite(pWal, i, i+1, (void*)ranStr, len); + ASSERT_EQ(code, 0); + code = walWrite(pWal, i+2, i, (void*)ranStr, len); + ASSERT_EQ(code, -1); + } + code = walWriteMeta(pWal); + ASSERT_EQ(code, 0); +} diff --git a/source/libs/wal/test/walTests.cpp b/source/libs/wal/test/walTests.cpp deleted file mode 100644 index 505728fbe4..0000000000 --- a/source/libs/wal/test/walTests.cpp +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "tlog.h" -#include "twal.h" -#include "tfile.h" - -int64_t ver = 0; -void *pWal = NULL; - -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { - // do nothing - SWalHead *pHead = data; - - if (pHead->version > ver) - ver = pHead->version; - - walWrite(pWal, pHead); - - return 0; -} - -int main(int argc, char *argv[]) { - char path[128] = "/tmp/wal"; - int level = 2; - int total = 5; - int rows = 10000; - int size = 128; - int keep = 0; - - for (int i=1; iversion = ++ver; - pHead->len = size; - walWrite(pWal, pHead); - } - - printf("renew a wal, i:%d\n", i); - walRenew(pWal); - } - - printf("%d wal files are written\n", total); - - int64_t index = 0; - char name[256]; - - while (1) { - int code = walGetWalFile(pWal, name, &index); - if (code == -1) { - printf("failed to get wal file, index:%" PRId64 "\n", index); - break; - } - - printf("index:%" PRId64 " wal:%s\n", index, name); - if (code == 0) break; - } - - getchar(); - - walClose(pWal); - walCleanUp(); - tfCleanup(); - - return 0; -} From d502191a59e9193c4cbd7edb0f29841d741875e9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 19:30:03 +0800 Subject: [PATCH 11/15] update test case --- source/libs/index/test/indexTests.cpp | 35 ++++++++++++++++++--------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 475775c01e..82b5ca1e24 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -130,25 +130,28 @@ class FstReadMemory { -void Performance_fstWriteRecords(FstWriter *b) { +int Performance_fstWriteRecords(FstWriter *b) { std::string str("aa"); - for (int i = 0; i < 100; i++) { + int L = 100, M = 100, N = 10; + for (int i = 0; i < L; i++) { str[0] = 'a' + i; str.resize(2); - for(int j = 0; j < 100; j++) { + for(int j = 0; j < M; j++) { str[1] = 'a' + j; str.resize(2); - for (int k = 0; k < 100; k++) { + for (int k = 0; k < N; k++) { str.push_back('a'); b->Put(str, k); + printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str()); } } } + return L * M * N; } void Performance_fstReadRecords(FstReadMemory *m) { std::string str("a"); - for (int i = 0; i < 500; i++) { + for (int i = 0; i < 50; i++) { //std::string str("aa"); str.push_back('a'); uint64_t out, cost; @@ -160,22 +163,32 @@ void Performance_fstReadRecords(FstReadMemory *m) { } } } +void checkFstPerf() { + FstWriter *fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int num = Performance_fstWriteRecords(fw); + int64_t e = taosGetTimestampUs(); + + printf("write %d record cost %" PRId64"us\n", num, e - s); + +} + void validateFst() { - r int val = 100; int count = 100; FstWriter *fw = new FstWriter; + // write { std::string key("ab"); - int64_t val = 100; for (int i = 0; i < count; i++) { key.push_back('a' + i); - fw->Put(key, val + i); + fw->Put(key, val - i); } } delete fw; + // read FstReadMemory *m = new FstReadMemory(1024 * 64); if (m->init() == false) { std::cout << "init readMemory failed" << std::endl; @@ -192,7 +205,7 @@ void validateFst() { for (int i = 0; i < count; i++) { key.push_back('a' + i); if (m->Get(key, &out) ) { - assert(val + i == out); + assert(val - i == out); printf("success to get (%s, %" PRId64")\n", key.c_str(), out); } else { printf("failed to get(%s)\n", key.c_str()); @@ -203,9 +216,7 @@ void validateFst() { } int main(int argc, char** argv) { - // test write - validateFst(); - + checkFstPerf(); return 1; } From 984f3023537b90409c0d050603dc562c57bc3961 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 20:08:40 +0800 Subject: [PATCH 12/15] update test case --- source/libs/index/src/index_fst.c | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 457b5422a4..cdd62c5f11 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -297,9 +297,10 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil // any value greater than or equal to the number of transitions in // this node indicates an absent transition. uint8_t *index = (uint8_t *)malloc(sizeof(uint8_t) * 256); - for (uint8_t i = 0; i < 256; i++) { - index[i] = 255; - } + memset(index, 255, sizeof(uint8_t) * 256); + ///for (uint8_t i = 0; i < 256; i++) { + // index[i] = 255; + ///} for (size_t i = 0; i < sz; i++) { FstTransition *t = taosArrayGet(node->trans, i); index[t->inp] = i; @@ -1126,6 +1127,7 @@ FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) { return b; } + bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice) { int comp = fstSliceCompare(slice, &bound->data); if (bound->type == Included) { @@ -1378,7 +1380,9 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { } void fstStreamBuilderDestroy(FstStreamBuilder *b) { fstSliceDestroy(&b->min->data); + tfree(b->min); fstSliceDestroy(&b->max->data); + tfree(b->max); free(b); } FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { From 210db65b7d86a810be64ed6a239138c3185f6533 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 9 Dec 2021 20:24:04 +0800 Subject: [PATCH 13/15] fix error resp on large kvs set --- source/libs/index/src/index_fst.c | 2 +- source/libs/index/test/indexTests.cpp | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index cdd62c5f11..403b4a9122 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -304,9 +304,9 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil for (size_t i = 0; i < sz; i++) { FstTransition *t = taosArrayGet(node->trans, i); index[t->inp] = i; - fstCountingWriterWrite(w, (char *)index, sizeof(index)); //fstPackDeltaIn(w, addr, t->addr, tSize); } + fstCountingWriterWrite(w, (char *)index, 256); free(index); } fstCountingWriterWrite(w, (char *)&packSizes, 1); diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 82b5ca1e24..928c3875b0 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -168,9 +168,18 @@ void checkFstPerf() { int64_t s = taosGetTimestampUs(); int num = Performance_fstWriteRecords(fw); int64_t e = taosGetTimestampUs(); - printf("write %d record cost %" PRId64"us\n", num, e - s); - + delete fw; + + FstReadMemory *m = new FstReadMemory(1024 * 64); + if (m->init()) { + uint64_t val; + if(m->Get("aaaaaaa", &val)) { + std::cout << "succes to Get val: " << val << std::endl; + } else { + std::cout << "failed to Get " << std::endl; + } + } } From 03a54b5a35f6e0cb9910fd6abc843a272a29d058 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 10 Dec 2021 10:30:57 +0800 Subject: [PATCH 14/15] make tfinit called once --- include/libs/wal/wal.h | 26 +++++------- include/util/tarray.h | 7 ++++ source/libs/wal/inc/walInt.h | 10 ++--- source/libs/wal/src/walIndex.c | 34 +++++++++------- source/libs/wal/src/walMgmt.c | 22 ++++++---- source/libs/wal/src/walRead.c | 4 +- source/libs/wal/src/walWrite.c | 61 +++++++++++++++++++--------- source/libs/wal/test/walMetaTest.cpp | 7 ---- source/util/src/tarray.c | 10 +++++ source/util/src/tfile.c | 6 +++ 10 files changed, 115 insertions(+), 72 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a72765583e..f84297670b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -91,7 +91,8 @@ typedef struct SWal { int32_t fsyncPeriod; // millisecond int32_t rollPeriod; // second int64_t segSize; - int64_t rtSize; + int64_t retentionSize; + int32_t retentionPeriod; EWalType level; //total size int64_t totSize; @@ -99,31 +100,24 @@ typedef struct SWal { int32_t fsyncSeq; //reference int64_t refId; - //current tfd - int64_t curLogTfd; - int64_t curIdxTfd; + //write tfd + int64_t writeLogTfd; + int64_t writeIdxTfd; + //read tfd + int64_t readLogTfd; + int64_t readIdxTfd; //current version int64_t curVersion; - - //current file version - //int64_t curFileFirstVersion; - //int64_t curFileLastVersion; - //wal lifecycle int64_t firstVersion; int64_t snapshotVersion; int64_t commitVersion; int64_t lastVersion; - - //last file - //int64_t lastFileName; - //roll status int64_t lastRollSeq; - //int64_t lastFileWriteSize; - //file set - int32_t fileCursor; + int32_t writeCur; + int32_t readCur; SArray* fileInfoSet; //ctl int32_t curStatus; diff --git a/include/util/tarray.h b/include/util/tarray.h index 5807c980e0..25862a7119 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData); */ void taosArraySet(SArray* pArray, size_t index, void* pData); +/** + * remove some data entry from front + * @param pArray + * @param cnt + */ +void taosArrayPopFrontBatch(SArray* pArray, size_t cnt); + /** * remove data entry of the given index * @param pArray diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index ae655d61da..a8b2d2fc7f 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -49,26 +49,26 @@ static inline int64_t walGetLastFileFirstVer(SWal* pWal) { } static inline int64_t walGetCurFileFirstVer(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileLastVer(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileOffset(SWal* pWal) { - WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->fileSize; } static inline bool walCurFileClosed(SWal* pWal) { - return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor; + return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; } static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { - return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); } static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index b4d66226d6..bf51be4346 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -23,25 +23,25 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int code = 0; - int64_t idxTfd = pWal->curIdxTfd; - int64_t logTfd = pWal->curLogTfd; + int64_t idxTfd = pWal->writeIdxTfd; + int64_t logTfd = pWal->writeLogTfd; //seek position int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { - + return -1; } int64_t readBuf[2]; code = tfRead(idxTfd, readBuf, sizeof(readBuf)); if(code != 0) { - + return -1; } //TODO:deserialize ASSERT(readBuf[0] == ver); code = tfLseek(logTfd, readBuf[1], SEEK_CUR); if (code != 0) { - + return -1; } /*pWal->curLogOffset = readBuf[1];*/ pWal->curVersion = ver; @@ -52,11 +52,11 @@ static int walChangeFile(SWal *pWal, int64_t ver) { int code = 0; int64_t idxTfd, logTfd; char fnameStr[WAL_FILE_LEN]; - code = tfClose(pWal->curLogTfd); + code = tfClose(pWal->writeLogTfd); if(code != 0) { //TODO } - code = tfClose(pWal->curIdxTfd); + code = tfClose(pWal->writeIdxTfd); if(code != 0) { //TODO } @@ -81,14 +81,14 @@ static int walChangeFile(SWal *pWal, int64_t ver) { logTfd = tfOpenReadWrite(fnameStr); } - pWal->curLogTfd = logTfd; - pWal->curIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; return code; } int walSeekVer(SWal *pWal, int64_t ver) { - if((!(pWal->curStatus & WAL_CUR_FAILED)) - && ver == pWal->curVersion) { + int code; + if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) { return 0; } if(ver > pWal->lastVersion) { @@ -103,9 +103,15 @@ int walSeekVer(SWal *pWal, int64_t ver) { //TODO: seek snapshotted log, invalid in some cases } if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { - walChangeFile(pWal, ver); + code = walChangeFile(pWal, ver); + if(code != 0) { + return -1; + } } - walSeekFilePos(pWal, ver); - + code = walSeekFilePos(pWal, ver); + if(code != 0) { + return -1; + } + return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index acb173b17b..4104e0e6ce 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -48,9 +48,15 @@ int32_t walInit() { int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); if(old == 1) return 0; + int code = tfInit(); + if(code != 0) { + wError("failed to init tfile since %s", tstrerror(code)); + atomic_store_8(&tsWal.inited, 0); + return code; + } tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - int code = walCreateThread(); + code = walCreateThread(); if (code != 0) { wError("failed to init wal module since %s", tstrerror(code)); atomic_store_8(&tsWal.inited, 0); @@ -74,8 +80,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - pWal->curLogTfd = -1; - pWal->curIdxTfd = -1; + pWal->writeLogTfd = -1; + pWal->writeIdxTfd = -1; //set config pWal->vgId = pCfg->vgId; @@ -138,8 +144,8 @@ void walClose(SWal *pWal) { if (pWal == NULL) return; pthread_mutex_lock(&pWal->mutex); - tfClose(pWal->curLogTfd); - tfClose(pWal->curIdxTfd); + tfClose(pWal->writeLogTfd); + tfClose(pWal->writeIdxTfd); /*taosArrayDestroy(pWal->fileInfoSet);*/ /*pWal->fileInfoSet = NULL;*/ pthread_mutex_unlock(&pWal->mutex); @@ -165,8 +171,8 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); - tfClose(pWal->curLogTfd); - tfClose(pWal->curIdxTfd); + tfClose(pWal->writeLogTfd); + tfClose(pWal->writeIdxTfd); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; taosArrayDestroy(pWal->fileInfoSet); @@ -197,7 +203,7 @@ static void walFsyncAll() { while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); - int32_t code = tfFsync(pWal->curLogTfd); + int32_t code = tfFsync(pWal->writeLogTfd); if (code != 0) { wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 90ec5528c4..e9f5bcbc5d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -42,7 +42,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { } *ppHead = ptr; } - if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { + if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { return -1; } //TODO: endian compatibility processing after read @@ -55,7 +55,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { *ppHead = NULL; return -1; } - if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { + if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { return -1; } //TODO: endian compatibility processing after read diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 3c65698938..0c4989300f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -44,18 +44,39 @@ int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { pWal->snapshotVersion = ver; + int ts = taosGetTimestampSec(); + int deleteCnt = 0; + int64_t newTotSize = pWal->totSize; WalFileInfo tmp; tmp.firstVer = ver; //mark files safe to delete WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); //iterate files, until the searched result - //if totSize > rtSize, delete - //if createTs > retentionTs, delete + for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { + if(pWal->totSize > pWal->retentionSize || + iter->closeTs + pWal->retentionPeriod > ts) { + //delete according to file size or close time + deleteCnt++; + newTotSize -= iter->fileSize; + } + } + char fnameStr[WAL_FILE_LEN]; + //remove file + for(int i = 0; i < deleteCnt; i++) { + WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); + walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); + } //save snapshot ver, commit ver + //make new array, remove files + taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); + pWal->totSize = newTotSize; return 0; } @@ -153,14 +174,14 @@ void walRemoveAllOldFiles(void *handle) { int walRoll(SWal *pWal) { int code = 0; - if(pWal->curIdxTfd != -1) { - code = tfClose(pWal->curIdxTfd); + if(pWal->writeIdxTfd != -1) { + code = tfClose(pWal->writeIdxTfd); if(code != 0) { return -1; } } - if(pWal->curLogTfd != -1) { - code = tfClose(pWal->curLogTfd); + if(pWal->writeLogTfd != -1) { + code = tfClose(pWal->writeLogTfd); if(code != 0) { return -1; } @@ -188,8 +209,8 @@ int walRoll(SWal *pWal) { } //switch file - pWal->curIdxTfd = idxTfd; - pWal->curLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; //change status pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; @@ -215,8 +236,8 @@ int walChangeFileToLast(SWal *pWal) { return -1; } //switch file - pWal->curIdxTfd = idxTfd; - pWal->curLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; //change status pWal->curVersion = fileFirstVer; pWal->curStatus = WAL_CUR_FILE_WRITABLE; @@ -226,15 +247,14 @@ int walChangeFileToLast(SWal *pWal) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { int code = 0; //get index file - if(!tfValid(pWal->curIdxTfd)) { + if(!tfValid(pWal->writeIdxTfd)) { code = TAOS_SYSTEM_ERROR(errno); - WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno)); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); return code; } int64_t writeBuf[2] = { ver, offset }; - int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); + int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf)); if(size != sizeof(writeBuf)) { return -1; } @@ -278,13 +298,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i pthread_mutex_lock(&pWal->mutex); - if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } - if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { + if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); @@ -296,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i //set status pWal->lastVersion = index; + pWal->totSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; @@ -305,11 +326,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } void walFsync(SWal *pWal, bool forceFsync) { - if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; + if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); - if (tfFsync(pWal->curLogTfd) < 0) { + if (tfFsync(pWal->writeLogTfd) < 0) { wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } @@ -408,10 +429,10 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) { } static void walFtruncate(SWal *pWal, int64_t ver) { - int64_t tfd = pWal->curLogTfd; + int64_t tfd = pWal->writeLogTfd; tfFtruncate(tfd, ver); tfFsync(tfd); - tfd = pWal->curIdxTfd; + tfd = pWal->writeIdxTfd; tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); tfFsync(tfd); } diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 4c0533d389..46328bb626 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -3,7 +3,6 @@ #include #include -#include "tfile.h" #include "walInt.h" class WalCleanEnv : public ::testing::Test { @@ -11,13 +10,10 @@ class WalCleanEnv : public ::testing::Test { static void SetUpTestCase() { int code = walInit(); ASSERT(code == 0); - code = tfInit(); - ASSERT(code == 0); } static void TearDownTestCase() { walCleanUp(); - tfCleanup(); } void SetUp() override { @@ -45,13 +41,10 @@ class WalKeepEnv : public ::testing::Test { static void SetUpTestCase() { int code = walInit(); ASSERT(code == 0); - code = tfInit(); - ASSERT(code == 0); } static void TearDownTestCase() { walCleanUp(); - tfCleanup(); } void SetUp() override { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 007ce06829..ff52477a6f 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) { memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize); } +void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { + assert(cnt <= pArray->size); + pArray->size = pArray->size - cnt; + if(pArray->size == 0) { + pArray->size = 0; + return; + } + memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); +} + void taosArrayRemove(SArray* pArray, size_t index) { assert(index < pArray->size); diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 5d4789aae6..313f1d97af 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -22,20 +22,26 @@ static int32_t tsFileRsetId = -1; +static int8_t tfInited = 0; + static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); } int32_t tfInit() { + int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1); + if(old == 1) return 0; tsFileRsetId = taosOpenRef(2000, tfCloseFile); if (tsFileRsetId > 0) { return 0; } else { + atomic_store_8(&tfInited, 0); return -1; } } void tfCleanup() { + atomic_store_8(&tfInited, 0); if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); tsFileRsetId = -1; } From bd408504bb98d1f122758182831470140ba5f791 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 10 Dec 2021 11:31:36 +0800 Subject: [PATCH 15/15] make wal meta serialized and flushed into disk --- source/libs/wal/inc/walInt.h | 4 +- source/libs/wal/src/walMeta.c | 80 ++++++++++++++++++---------- source/libs/wal/src/walMgmt.c | 2 - source/libs/wal/test/walMetaTest.cpp | 7 ++- 4 files changed, 58 insertions(+), 35 deletions(-) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index a8b2d2fc7f..f56f240904 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -83,8 +83,8 @@ int walReadMeta(SWal* pWal); int walWriteMeta(SWal* pWal); int walRollFileInfo(SWal* pWal); -char* walFileInfoSerialize(SWal* pWal); -SArray* walFileInfoDeserialize(const char* bytes); +char* walMetaSerialize(SWal* pWal); +int walMetaDeserialize(SWal* pWal, const char* bytes); //meta section end int64_t walGetSeq(); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 2eec4328e6..65085bb96d 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -47,49 +47,74 @@ int walRollFileInfo(SWal* pWal) { return 0; } -char* walFileInfoSerialize(SWal* pWal) { +char* walMetaSerialize(SWal* pWal) { char buf[30]; if(pWal == NULL || pWal->fileInfoSet == NULL) return 0; int sz = pWal->fileInfoSet->size; - cJSON* root = cJSON_CreateArray(); - cJSON* field; - if(root == NULL) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON* pMeta = cJSON_CreateObject(); + cJSON* pFiles = cJSON_CreateArray(); + cJSON* pField; + if(pRoot == NULL || pMeta == NULL || pFiles == NULL) { //TODO return NULL; } + cJSON_AddItemToObject(pRoot, "meta", pMeta); + sprintf(buf, "%" PRId64, pWal->firstVersion); + cJSON_AddStringToObject(pMeta, "firstVer", buf); + sprintf(buf, "%" PRId64, pWal->snapshotVersion); + cJSON_AddStringToObject(pMeta, "snapshotVer", buf); + sprintf(buf, "%" PRId64, pWal->commitVersion); + cJSON_AddStringToObject(pMeta, "commitVer", buf); + sprintf(buf, "%" PRId64, pWal->lastVersion); + cJSON_AddStringToObject(pMeta, "lastVer", buf); + + cJSON_AddItemToObject(pRoot, "files", pFiles); WalFileInfo* pData = pWal->fileInfoSet->pData; for(int i = 0; i < sz; i++) { WalFileInfo* pInfo = &pData[i]; - cJSON_AddItemToArray(root, field = cJSON_CreateObject()); - if(field == NULL) { - cJSON_Delete(root); + cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()); + if(pField == NULL) { + cJSON_Delete(pRoot); return NULL; } //cjson only support int32_t or double //string are used to prohibit the loss of precision - sprintf(buf, "%ld", pInfo->firstVer); - cJSON_AddStringToObject(field, "firstVer", buf); - sprintf(buf, "%ld", pInfo->lastVer); - cJSON_AddStringToObject(field, "lastVer", buf); - sprintf(buf, "%ld", pInfo->createTs); - cJSON_AddStringToObject(field, "createTs", buf); - sprintf(buf, "%ld", pInfo->closeTs); - cJSON_AddStringToObject(field, "closeTs", buf); - sprintf(buf, "%ld", pInfo->fileSize); - cJSON_AddStringToObject(field, "fileSize", buf); + sprintf(buf, "%" PRId64, pInfo->firstVer); + cJSON_AddStringToObject(pField, "firstVer", buf); + sprintf(buf, "%" PRId64, pInfo->lastVer); + cJSON_AddStringToObject(pField, "lastVer", buf); + sprintf(buf, "%" PRId64, pInfo->createTs); + cJSON_AddStringToObject(pField, "createTs", buf); + sprintf(buf, "%" PRId64, pInfo->closeTs); + cJSON_AddStringToObject(pField, "closeTs", buf); + sprintf(buf, "%" PRId64, pInfo->fileSize); + cJSON_AddStringToObject(pField, "fileSize", buf); } - return cJSON_Print(root); + return cJSON_Print(pRoot); } -SArray* walFileInfoDeserialize(const char* bytes) { - cJSON *root, *pInfoJson, *pField; - root = cJSON_Parse(bytes); - int sz = cJSON_GetArraySize(root); +int walMetaDeserialize(SWal* pWal, const char* bytes) { + ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0); + cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField; + pRoot = cJSON_Parse(bytes); + pMeta = cJSON_GetObjectItem(pRoot, "meta"); + pField = cJSON_GetObjectItem(pMeta, "firstVer"); + pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); + pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "commitVer"); + pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "lastVer"); + pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); + + pFiles = cJSON_GetObjectItem(pRoot, "files"); + int sz = cJSON_GetArraySize(pFiles); //deserialize SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo)); WalFileInfo *pData = pArray->pData; for(int i = 0; i < sz; i++) { - cJSON* pInfoJson = cJSON_GetArrayItem(root, i); + cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); WalFileInfo* pInfo = &pData[i]; pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); @@ -103,7 +128,8 @@ SArray* walFileInfoDeserialize(const char* bytes) { pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); } taosArraySetSize(pArray, sz); - return pArray; + pWal->fileInfoSet = pArray; + return 0; } static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { @@ -144,7 +170,7 @@ int walWriteMeta(SWal* pWal) { if(metaTfd < 0) { return -1; } - char* serialized = walFileInfoSerialize(pWal); + char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); if(len != tfWrite(metaTfd, serialized, len)) { //TODO:clean file @@ -183,8 +209,8 @@ int walReadMeta(SWal* pWal) { return -1; } //load into fileInfoSet - pWal->fileInfoSet = walFileInfoDeserialize(buf); - if(pWal->fileInfoSet == NULL) { + int code = walMetaDeserialize(pWal, buf); + if(code != 0) { free(buf); return -1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 4104e0e6ce..6bf9008917 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -175,8 +175,6 @@ static void walFreeObj(void *wal) { tfClose(pWal->writeIdxTfd); taosArrayDestroy(pWal->fileInfoSet); pWal->fileInfoSet = NULL; - taosArrayDestroy(pWal->fileInfoSet); - pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 46328bb626..96258662a1 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -92,7 +92,7 @@ TEST_F(WalCleanEnv, serialize) { ASSERT(code == 0); code = walRollFileInfo(pWal); ASSERT(code == 0); - char*ss = walFileInfoSerialize(pWal); + char*ss = walMetaSerialize(pWal); printf("%s\n", ss); code = walWriteMeta(pWal); ASSERT(code == 0); @@ -113,20 +113,19 @@ TEST_F(WalCleanEnv, removeOldMeta) { TEST_F(WalKeepEnv, readOldMeta) { int code = walRollFileInfo(pWal); ASSERT(code == 0); - ASSERT(pWal->fileInfoSet != NULL); code = walWriteMeta(pWal); ASSERT(code == 0); code = walRollFileInfo(pWal); ASSERT(code == 0); code = walWriteMeta(pWal); ASSERT(code == 0); - char*oldss = walFileInfoSerialize(pWal); + char*oldss = walMetaSerialize(pWal); TearDown(); SetUp(); code = walReadMeta(pWal); ASSERT(code == 0); - char* newss = walFileInfoSerialize(pWal); + char* newss = walMetaSerialize(pWal); int len = strlen(oldss); ASSERT_EQ(len, strlen(newss));