diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b514648bbd..f84297670b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -55,12 +55,14 @@ typedef struct { uint32_t signature; uint32_t cksumHead; uint32_t cksumBody; - //char cont[]; + char cont[]; } SWalHead; typedef struct { int32_t vgId; int32_t fsyncPeriod; // millisecond + int32_t rollPeriod; + int64_t segSize; EWalType walLevel; // wal level } SWalCfg; @@ -87,36 +89,41 @@ typedef struct SWal { // cfg int32_t vgId; int32_t fsyncPeriod; // millisecond - int32_t fsyncSeq; int32_t rollPeriod; // second int64_t segSize; + int64_t retentionSize; + int32_t retentionPeriod; EWalType level; + //total size + int64_t totSize; + //fsync seq + int32_t fsyncSeq; //reference int64_t refId; - //current tfd - int64_t curLogTfd; - int64_t curIdxTfd; + //write tfd + int64_t writeLogTfd; + int64_t writeIdxTfd; + //read tfd + int64_t readLogTfd; + int64_t readIdxTfd; //current version int64_t curVersion; - int64_t curLogOffset; - //current file version - int64_t curFileFirstVersion; - int64_t curFileLastVersion; - //wal fileset version + //wal lifecycle int64_t firstVersion; int64_t snapshotVersion; + int64_t commitVersion; int64_t lastVersion; - int64_t lastFileName; //roll status int64_t lastRollSeq; - int64_t lastFileWriteSize; + //file set + int32_t writeCur; + int32_t readCur; + SArray* fileInfoSet; //ctl int32_t curStatus; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; - //file set - SArray* fileSet; //reusable write head SWalHead head; } SWal; // WAL HANDLE @@ -133,7 +140,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); // apis for lifecycle management diff --git a/include/util/tarray.h b/include/util/tarray.h index 5807c980e0..25862a7119 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData); */ void taosArraySet(SArray* pArray, size_t index, void* pData); +/** + * remove some data entry from front + * @param pArray + * @param cnt + */ +void taosArrayPopFrontBatch(SArray* pArray, size_t cnt); + /** * remove data entry of the given index * @param pArray diff --git a/include/util/tfile.h b/include/util/tfile.h index 3d0e2177ac..af4c19e7d1 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_FILE_H #define _TD_UTIL_FILE_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/include/util/tmd5.h b/include/util/tmd5.h index 329f4acf11..f8114ad57b 100644 --- a/include/util/tmd5.h +++ b/include/util/tmd5.h @@ -30,10 +30,10 @@ typedef struct { uint32_t buf[4]; /* scratch buffer */ uint8_t in[64]; /* input buffer */ uint8_t digest[16]; /* actual digest after MD5Final call */ -} MD5_CTX; +} T_MD5_CTX; -void MD5Init(MD5_CTX *mdContext); -void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); -void MD5Final(MD5_CTX *mdContext); +void tMD5Init(T_MD5_CTX *mdContext); +void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen); +void tMD5Final(T_MD5_CTX *mdContext); #endif /*_TD_UTIL_MD5_H*/ diff --git a/include/util/tutil.h b/include/util/tutil.h index 41eb2ca26b..e4e66901d3 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -48,10 +48,10 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str); int32_t taosGetFqdnPortFromEp(const char *ep, char *fqdn, uint16_t *port); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { - MD5_CTX context; - MD5Init(&context); - MD5Update(&context, inBuf, (uint32_t)inLen); - MD5Final(&context); + T_MD5_CTX context; + tMD5Init(&context); + tMD5Update(&context, inBuf, (unsigned int)inLen); + tMD5Final(&context); memcpy(target, context.digest, TSDB_KEY_LEN); } diff --git a/source/dnode/vnode/impl/test/vnodeApiTests.cpp b/source/dnode/vnode/impl/test/vnodeApiTests.cpp index 493fe4448b..df784181b7 100644 --- a/source/dnode/vnode/impl/test/vnodeApiTests.cpp +++ b/source/dnode/vnode/impl/test/vnodeApiTests.cpp @@ -45,7 +45,7 @@ static SKVRow createBasicTag() { tdInitKVRowBuilder(&rb); - for (int i = 10; i < 12; i++) { + for (int i = 0; i < 2; i++) { void *pVal = malloc(sizeof(VarDataLenT) + strlen("foo")); varDataLen(pVal) = strlen("foo"); memcpy(varDataVal(pVal), "foo", strlen("foo")); @@ -120,7 +120,7 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) { { // Create some child tables - int ntables = 100000; + int ntables = 1000000; int batch = 10; for (int i = 0; i < ntables / batch; i++) { SArray *pMsgs = (SArray *)taosArrayInit(batch, sizeof(SRpcMsg *)); diff --git a/source/dnode/vnode/meta/src/metaBDBImpl.c b/source/dnode/vnode/meta/src/metaBDBImpl.c index 9e52a7151d..3c1ccc72dc 100644 --- a/source/dnode/vnode/meta/src/metaBDBImpl.c +++ b/source/dnode/vnode/meta/src/metaBDBImpl.c @@ -20,6 +20,11 @@ #include "tcoding.h" #include "thash.h" +typedef struct { + tb_uid_t uid; + int32_t sver; +} SSchemaKey; + struct SMetaDB { // DB DB *pTbDB; @@ -39,13 +44,17 @@ static SMetaDB *metaNewDB(); static void metaFreeDB(SMetaDB *pDB); static int metaOpenBDBEnv(DB_ENV **ppEnv, const char *path); static void metaCloseBDBEnv(DB_ENV *pEnv); -static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName); +static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); static void metaCloseBDBDb(DB *pDB); -static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf); +static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup); +static void metaCloseBDBIdx(DB *pIdx); static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey); +static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); +static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); +static void metaClearTbCfg(STbCfg *pTbCfg); #define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) @@ -67,33 +76,33 @@ int metaOpenDB(SMeta *pMeta) { } // Open DBs - if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db") < 0) { + if (metaOpenBDBDb(&(pDB->pTbDB), pDB->pEvn, "meta.db", false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db") < 0) { + if (metaOpenBDBDb(&(pDB->pSchemaDB), pDB->pEvn, "meta.db", false) < 0) { metaCloseDB(pMeta); return -1; } // Open Indices - if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNameIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pNameIdx), pDB->pEvn, "name.index", pDB->pTbDB, &metaNameIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaStbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pStbIdx), pDB->pEvn, "stb.index", pDB->pTbDB, &metaStbIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaNtbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pNtbIdx), pDB->pEvn, "ntb.index", pDB->pTbDB, &metaNtbIdxCb, false) < 0) { metaCloseDB(pMeta); return -1; } - if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "index.db", pDB->pTbDB, &metaCtbIdxCb) < 0) { + if (metaOpenBDBIdx(&(pDB->pCtbIdx), pDB->pEvn, "ctb.index", pDB->pTbDB, &metaCtbIdxCb, true) < 0) { metaCloseDB(pMeta); return -1; } @@ -103,6 +112,12 @@ int metaOpenDB(SMeta *pMeta) { void metaCloseDB(SMeta *pMeta) { if (pMeta->pDB) { + metaCloseBDBIdx(pMeta->pDB->pCtbIdx); + metaCloseBDBIdx(pMeta->pDB->pNtbIdx); + metaCloseBDBIdx(pMeta->pDB->pStbIdx); + metaCloseBDBIdx(pMeta->pDB->pNameIdx); + metaCloseBDBDb(pMeta->pDB->pSchemaDB); + metaCloseBDBDb(pMeta->pDB->pTbDB); metaCloseBDBEnv(pMeta->pDB->pEvn); metaFreeDB(pMeta->pDB); pMeta->pDB = NULL; @@ -110,7 +125,60 @@ void metaCloseDB(SMeta *pMeta) { } int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { - // TODO + tb_uid_t uid; + char buf[512]; + void * pBuf; + DBT key, value; + STSchema *pSchema = NULL; + + if (pTbCfg->type == META_SUPER_TABLE) { + uid = pTbCfg->stbCfg.suid; + } else { + uid = metaGenerateUid(pMeta); + } + + { + // save table info + pBuf = buf; + memset(&key, 0, sizeof(key)); + memset(&value, 0, sizeof(key)); + + key.data = &uid; + key.size = sizeof(uid); + + metaEncodeTbInfo(&pBuf, pTbCfg); + + value.data = buf; + value.size = POINTER_DISTANCE(pBuf, buf); + value.app_data = pTbCfg; + + pMeta->pDB->pTbDB->put(pMeta->pDB->pTbDB, NULL, &key, &value, 0); + } + + // save schema + if (pTbCfg->type == META_SUPER_TABLE) { + pSchema = pTbCfg->stbCfg.pSchema; + } else if (pTbCfg->type == META_NORMAL_TABLE) { + pSchema = pTbCfg->ntbCfg.pSchema; + } + + if (pSchema) { + pBuf = buf; + memset(&key, 0, sizeof(key)); + memset(&value, 0, sizeof(key)); + SSchemaKey schemaKey = {uid, schemaVersion(pSchema)}; + + key.data = &schemaKey; + key.size = sizeof(schemaKey); + + tdEncodeSchema(&pBuf, pSchema); + + value.data = buf; + value.size = POINTER_DISTANCE(pBuf, buf); + + pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); + } + return 0; } @@ -165,16 +233,24 @@ static void metaCloseBDBEnv(DB_ENV *pEnv) { } } -static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName) { +static int metaOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) { int ret; DB *pDB; - ret = db_create(&((pDB)), (pEnv), 0); + ret = db_create(&(pDB), pEnv, 0); if (ret != 0) { BDB_PERR("Failed to create META DB", ret); return -1; } + if (isDup) { + ret = pDB->set_flags(pDB, DB_DUPSORT); + if (ret != 0) { + BDB_PERR("Failed to set DB flags", ret); + return -1; + } + } + ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0); if (ret) { BDB_PERR("Failed to open META DB", ret); @@ -192,11 +268,11 @@ static void metaCloseBDBDb(DB *pDB) { } } -static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf) { +static int metaOpenBDBIdx(DB **ppIdx, DB_ENV *pEnv, const char *pFName, DB *pDB, bdbIdxCbPtr cbf, bool isDup) { DB *pIdx; int ret; - if (metaOpenBDBDb(ppIdx, pEnv, pFName) < 0) { + if (metaOpenBDBDb(ppIdx, pEnv, pFName, isDup) < 0) { return -1; } @@ -216,158 +292,70 @@ static void metaCloseBDBIdx(DB *pIdx) { } static int metaNameIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); + + memset(pSKey, 0, sizeof(*pSKey)); + + pSKey->data = pTbCfg->name; + pSKey->size = strlen(pTbCfg->name); + return 0; } static int metaStbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); + + if (pTbCfg->type == META_SUPER_TABLE) { + memset(pSKey, 0, sizeof(*pSKey)); + pSKey->data = pKey->data; + pSKey->size = pKey->size; + + return 0; + } else { + return DB_DONOTINDEX; + } } static int metaNtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); + + if (pTbCfg->type == META_NORMAL_TABLE) { + memset(pSKey, 0, sizeof(*pSKey)); + pSKey->data = pKey->data; + pSKey->size = pKey->size; + + return 0; + } else { + return DB_DONOTINDEX; + } } static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} + STbCfg *pTbCfg = (STbCfg *)(pValue->app_data); + DBT * pDbt; -#if 0 -typedef struct { - tb_uid_t uid; - int32_t sver; -} SSchemaKey; + if (pTbCfg->type == META_CHILD_TABLE) { + pDbt = calloc(2, sizeof(DBT)); + // First key is suid + pDbt[0].data = &(pTbCfg->ctbCfg.suid); + pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); -static SMetaDB *metaNewDB(); -static void metaFreeDB(SMetaDB *pDB); -static int metaCreateDBEnv(SMetaDB *pDB, const char *path); -static void metaDestroyDBEnv(SMetaDB *pDB); -static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey); -static void * metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey); -static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey); -static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey); -static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema); -static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg); -static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg); -static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg); + // Second key is the first tag + void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, 0); + pDbt[1].data = varDataVal(pTagVal); + pDbt[1].size = varDataLen(pTagVal); -#define META_ASSOCIATE_IDX(pDB, pIdx, cbf) \ - do { \ - int ret = (pDB)->associate((pDB), NULL, (pIdx), (cbf), 0); \ - if (ret != 0) { \ - P_ERROR("Failed to associate META DB", ret); \ - metaCloseDB(pMeta); \ - } \ - } while (0) + // Set index key + memset(pSKey, 0, sizeof(*pSKey)); + pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; + pSKey->data = pDbt; + pSKey->size = 2; - -int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { - char buf[512]; - void * pBuf; - DBT key = {0}; - DBT value = {0}; - SSchemaKey schemaKey; - tb_uid_t uid; - - if (pTbCfg->type == META_SUPER_TABLE) { - // Handle SUPER table - uid = pTbCfg->stbCfg.suid; - - // Same table info - metaSaveTbInfo(pMeta->pDB->pStbDB, uid, pTbCfg); - - // save schema - metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema); - - { - // Create a super table DB and corresponding index DB - DB *pStbDB; - DB *pStbIdxDB; - - META_OPEN_DB(pStbDB, pMeta->pDB->pEvn, "meta.db"); - - META_OPEN_DB(pStbIdxDB, pMeta->pDB->pEvn, "index.db"); - - // TODO META_ASSOCIATE_IDX(); - } - } else if (pTbCfg->type == META_CHILD_TABLE) { - // Handle CHILD table - uid = metaGenerateUid(pMeta); - - DB *pCTbDB = taosHashGet(pMeta->pDB->pCtbMap, &(pTbCfg->ctbCfg.suid), sizeof(pTbCfg->ctbCfg.suid)); - if (pCTbDB == NULL) { - ASSERT(0); - } - - metaSaveTbInfo(pCTbDB, uid, pTbCfg); - - } else if (pTbCfg->type == META_NORMAL_TABLE) { - // Handle NORMAL table - uid = metaGenerateUid(pMeta); - - metaSaveTbInfo(pMeta->pDB->pNtbDB, uid, pTbCfg); - - metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema); + return 0; } else { - ASSERT(0); + return DB_DONOTINDEX; } - - return 0; -} - -int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { - // TODO -} - -/* ------------------------ STATIC METHODS ------------------------ */ -static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey) { - int tsize = 0; - - tsize += taosEncodeFixedU64(buf, pSchemaKey->uid); - tsize += taosEncodeFixedI32(buf, pSchemaKey->sver); - - return tsize; -} - -static void *metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey) { - buf = taosDecodeFixedU64(buf, &(pSchemaKey->uid)); - buf = taosDecodeFixedI32(buf, &(pSchemaKey->sver)); - - return buf; -} - -static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} - -static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) { - // TODO - return 0; -} - -static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) { - SSchemaKey skey; - char buf[256]; - void * pBuf = buf; - DBT key = {0}; - DBT value = {0}; - - skey.uid = uid; - skey.sver = schemaVersion(pSchema); - - key.data = &skey; - key.size = sizeof(skey); - - tdEncodeSchema(&pBuf, pSchema); - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - - pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0); } static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { @@ -376,6 +364,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { tsize += taosEncodeString(buf, pTbCfg->name); tsize += taosEncodeFixedU32(buf, pTbCfg->ttl); tsize += taosEncodeFixedU32(buf, pTbCfg->keep); + tsize += taosEncodeFixedU8(buf, pTbCfg->type); if (pTbCfg->type == META_SUPER_TABLE) { tsize += tdEncodeSchema(buf, pTbCfg->stbCfg.pTagSchema); @@ -391,10 +380,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) { } static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { - // TODO buf = taosDecodeString(buf, &(pTbCfg->name)); buf = taosDecodeFixedU32(buf, &(pTbCfg->ttl)); buf = taosDecodeFixedU32(buf, &(pTbCfg->keep)); + buf = taosDecodeFixedU8(buf, &(pTbCfg->type)); if (pTbCfg->type == META_SUPER_TABLE) { buf = tdDecodeSchema(buf, &(pTbCfg->stbCfg.pTagSchema)); @@ -408,22 +397,11 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) { return buf; } -static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg) { - DBT key = {0}; - DBT value = {0}; - char buf[512]; - void *pBuf = buf; - - key.data = &uid; - key.size = sizeof(uid); - - metaEncodeTbInfo(&pBuf, pTbCfg); - - value.data = buf; - value.size = POINTER_DISTANCE(pBuf, buf); - - pDB->put(pDB, NULL, &key, &value, 0); - - return 0; -} -#endif \ No newline at end of file +static void metaClearTbCfg(STbCfg *pTbCfg) { + tfree(pTbCfg->name); + if (pTbCfg->type == META_SUPER_TABLE) { + tdFreeSchema(pTbCfg->stbCfg.pTagSchema); + } else if (pTbCfg->type == META_CHILD_TABLE) { + tfree(pTbCfg->ctbCfg.pTag); + } +} \ No newline at end of file diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 40e35306a2..403b4a9122 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -297,15 +297,16 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil // any value greater than or equal to the number of transitions in // this node indicates an absent transition. uint8_t *index = (uint8_t *)malloc(sizeof(uint8_t) * 256); - for (uint8_t i = 0; i < 256; i++) { - index[i] = 255; - } + memset(index, 255, sizeof(uint8_t) * 256); + ///for (uint8_t i = 0; i < 256; i++) { + // index[i] = 255; + ///} for (size_t i = 0; i < sz; i++) { FstTransition *t = taosArrayGet(node->trans, i); index[t->inp] = i; - fstCountingWriterWrite(w, (char *)index, sizeof(index)); //fstPackDeltaIn(w, addr, t->addr, tSize); } + fstCountingWriterWrite(w, (char *)index, 256); free(index); } fstCountingWriterWrite(w, (char *)&packSizes, 1); @@ -478,6 +479,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { return 0; } FstSlice *slice = &node->data; + uint8_t *data = fstSliceData(slice, NULL); uint64_t at = node->start - fstStateNtransLen(s) - 1 // pack size @@ -485,7 +487,6 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) { - (i * oSizes) - oSizes; - uint8_t *data = fstSliceData(slice, NULL); return unpackUint64(data + at, oSizes); } @@ -555,6 +556,7 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack uint64_t at = FST_SLICE_LEN(slice) - 1 - fstStateNtransLen(s) + - 1 // pack size - fstStateTotalTransSize(s, version, sizes, nTrans) - (nTrans * oSizes) - oSizes; @@ -587,7 +589,8 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { FstSlice t = fstSliceCopy(slice, start, end - 1); int32_t len = 0; uint8_t *data = fstSliceData(&t, &len); - for(int i = 0; i < len; i++) { + int i = 0; + for(; i < len; i++) { //uint8_t v = slice->data[slice->start + i]; ////slice->data[slice->start + i]; uint8_t v = data[i]; @@ -595,6 +598,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) { return node->nTrans - i - 1; // bug } } + if (i == len) { *null = true; } } } @@ -774,7 +778,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { if (NULL == b) { return b; } - b->wrt = fstCountingWriterCreate(w, false); + b->wrt = fstCountingWriterCreate(w, false); b->unfinished = fstUnFinishedNodesCreate(); b->registry = fstRegistryCreate(10000, 2) ; b->last = fstSliceCreate(NULL, 0); @@ -857,6 +861,7 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) { return OutOfOrdered; } // deep copy or not + fstSliceDestroy(&b->last); b->last = fstSliceCopy(&bs, input->start, input->end); } return Ordered; @@ -1007,8 +1012,7 @@ Fst* fstCreate(FstSlice *slice) { uint64_t fstLen; len -= sizeof(fstLen); taosDecodeFixedU64(buf + len, &fstLen); - //TODO(validat root addr) - // + //TODO(validate root addr) Fst *fst= (Fst *)calloc(1, sizeof(Fst)); if (fst == NULL) { return NULL; } @@ -1023,6 +1027,7 @@ Fst* fstCreate(FstSlice *slice) { fst->meta->len = fstLen; fst->meta->checkSum = checkSum; fst->data = slice; + return fst; FST_CREAT_FAILED: @@ -1122,6 +1127,7 @@ FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) { return b; } + bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice) { int comp = fstSliceCompare(slice, &bound->data); if (bound->type == Included) { @@ -1374,7 +1380,9 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) { } void fstStreamBuilderDestroy(FstStreamBuilder *b) { fstSliceDestroy(&b->min->data); + tfree(b->min); fstSliceDestroy(&b->max->data); + tfree(b->max); free(b); } FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, RangeType type) { diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 9135a7a173..928c3875b0 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -2,13 +2,79 @@ #include #include #include "index.h" +#include "tutil.h" #include "indexInt.h" #include "index_fst.h" #include "index_fst_util.h" #include "index_fst_counting_writer.h" +class FstWriter { + public: + FstWriter() { + _b = fstBuilderCreate(NULL, 0); + } + bool Put(const std::string &key, uint64_t val) { + FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size()); + bool ok = fstBuilderInsert(_b, skey, val); + fstSliceDestroy(&skey); + return ok; + } + ~FstWriter() { + fstBuilderFinish(_b); + fstBuilderDestroy(_b); + } + private: + FstBuilder *_b; +}; +class FstReadMemory { + public: + FstReadMemory(size_t size) { + _w = fstCountingWriterCreate(NULL, true); + _size = size; + memset((void *)&_s, 0, sizeof(_s)); + } + bool init() { + char *buf = (char *)calloc(1, sizeof(char) * _size); + int nRead = fstCountingWriterRead(_w, (uint8_t *)buf, _size); + if (nRead <= 0) { return false; } + _size = nRead; + _s = fstSliceCreate((uint8_t *)buf, _size); + _fst = fstCreate(&_s); + free(buf); + return _fst != NULL; + } + bool Get(const std::string &key, uint64_t *val) { + FstSlice skey = fstSliceCreate((uint8_t *)key.c_str(), key.size()); + bool ok = fstGet(_fst, &skey, val); + fstSliceDestroy(&skey); + return ok; + } + bool GetWithTimeCostUs(const std::string &key, uint64_t *val, uint64_t *elapse) { + int64_t s = taosGetTimestampUs(); + bool ok = this->Get(key, val); + int64_t e = taosGetTimestampUs(); + *elapse = e - s; + return ok; + } + // add later + bool Search(const std::string &key, std::vector &result) { + return true; + } + + ~FstReadMemory() { + fstCountingWriterDestroy(_w); + fstSliceDestroy(&_s); + } + + private: + FstCountingWriter *_w; + Fst *_fst; + FstSlice _s; + size_t _size; + +}; //TEST(IndexTest, index_create_test) { // SIndexOpts *opts = indexOptsCreate(); @@ -62,69 +128,104 @@ // // //} -int main(int argc, char** argv) { - // test write - FstBuilder *b = fstBuilderCreate(NULL, 0); - { - std::string str("aaa"); - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - Output val = 1; - fstBuilderInsert(b, key, val); + + +int Performance_fstWriteRecords(FstWriter *b) { + std::string str("aa"); + int L = 100, M = 100, N = 10; + for (int i = 0; i < L; i++) { + str[0] = 'a' + i; + str.resize(2); + for(int j = 0; j < M; j++) { + str[1] = 'a' + j; + str.resize(2); + for (int k = 0; k < N; k++) { + str.push_back('a'); + b->Put(str, k); + printf("(%d, %d, %d, %s)\n", i, j, k, str.c_str()); + } + } } + return L * M * N; +} - //std::string str1("bcd"); - //FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size()); - //Output val2 = 10; - // - - { - - for (size_t i = 1; i < 26; i++) { - std::string str("aaa"); - str[2] = 'a' + i ; - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - Output val = 0; - fstBuilderInsert(b, key, val); - } - - } - fstBuilderFinish(b); - fstBuilderDestroy(b); +void Performance_fstReadRecords(FstReadMemory *m) { + std::string str("a"); + for (int i = 0; i < 50; i++) { + //std::string str("aa"); + str.push_back('a'); + uint64_t out, cost; + bool ok = m->GetWithTimeCostUs(str, &out, &cost); + if (ok == true) { + printf("success to get (%s, %" PRId64"), time cost: %" PRId64")\n", str.c_str(), out, cost); + } else { + printf("failed to get(%s)\n", str.c_str()); + } + } +} +void checkFstPerf() { + FstWriter *fw = new FstWriter; + int64_t s = taosGetTimestampUs(); + int num = Performance_fstWriteRecords(fw); + int64_t e = taosGetTimestampUs(); + printf("write %d record cost %" PRId64"us\n", num, e - s); + delete fw; - - char buf[64 * 1024] = {0}; - - FstSlice s; - - FstCountingWriter *w = fstCountingWriterCreate(NULL, true); - int nRead = fstCountingWriterRead(w, (uint8_t *)buf, sizeof(buf)); - assert(nRead <= sizeof(buf)); - s = fstSliceCreate((uint8_t *)buf, nRead); - fstCountingWriterDestroy(w); - - - // test reader - - - Fst *fst = fstCreate(&s); - { - std::string str("aax"); - uint64_t out; - - - FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); - bool ok = fstGet(fst, &key, &out); - if (ok == true) { - printf("val = %d\n", out); - //indexInfo("Get key-value success, %s, %d", str.c_str(), out); + FstReadMemory *m = new FstReadMemory(1024 * 64); + if (m->init()) { + uint64_t val; + if(m->Get("aaaaaaa", &val)) { + std::cout << "succes to Get val: " << val << std::endl; } else { - //indexError("Get key-value failed, %s", str.c_str()); + std::cout << "failed to Get " << std::endl; + } + } +} + + +void validateFst() { + int val = 100; + int count = 100; + FstWriter *fw = new FstWriter; + // write + { + std::string key("ab"); + for (int i = 0; i < count; i++) { + key.push_back('a' + i); + fw->Put(key, val - i); } } - fstSliceDestroy(&s); - + delete fw; - + // read + FstReadMemory *m = new FstReadMemory(1024 * 64); + if (m->init() == false) { + std::cout << "init readMemory failed" << std::endl; + } + + { + std::string key("ab"); + uint64_t out; + if (m->Get(key, &out)) { + printf("success to get (%s, %" PRId64")\n", key.c_str(), out); + } else { + printf("failed to get(%s)\n", key.c_str()); + } + for (int i = 0; i < count; i++) { + key.push_back('a' + i); + if (m->Get(key, &out) ) { + assert(val - i == out); + printf("success to get (%s, %" PRId64")\n", key.c_str(), out); + } else { + printf("failed to get(%s)\n", key.c_str()); + } + } + } + delete m; + +} +int main(int argc, char** argv) { + checkFstPerf(); return 1; } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index e392351366..911e8472ab 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1523,14 +1523,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { } static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { - MD5_CTX context; + T_MD5_CTX context; int ret = -1; - MD5Init(&context); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Update(&context, (uint8_t *)pMsg, msgLen); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Final(&context); + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Update(&context, (uint8_t *)pMsg, msgLen); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Final(&context); if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; @@ -1538,13 +1538,13 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) { } static void rpcBuildAuthHead(void *pMsg, int msgLen, void *pAuth, void *pKey) { - MD5_CTX context; + T_MD5_CTX context; - MD5Init(&context); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Update(&context, (uint8_t *)pMsg, msgLen); - MD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); - MD5Final(&context); + tMD5Init(&context); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Update(&context, (uint8_t *)pMsg, msgLen); + tMD5Update(&context, (uint8_t *)pKey, TSDB_KEY_LEN); + tMD5Final(&context); memcpy(pAuth, context.digest, sizeof(context.digest)); } diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index e5697415f1..4d2dd97c87 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -8,6 +8,11 @@ target_include_directories( target_link_libraries( wal + PUBLIC cjson PUBLIC os PUBLIC util ) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 285d7e2576..f56f240904 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -23,9 +23,73 @@ extern "C" { #endif -int walGetFile(SWal* pWal, int32_t version); +//meta section begin +typedef struct WalFileInfo { + int64_t firstVer; + int64_t lastVer; + int64_t createTs; + int64_t closeTs; + int64_t fileSize; +} WalFileInfo; + +static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { + WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; + WalFileInfo* pInfoRight = (WalFileInfo*)pRight; + return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer); +} + +static inline int64_t walGetLastFileSize(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->fileSize; +} + +static inline int64_t walGetLastFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileLastVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileOffset(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); + return pInfo->fileSize; +} + +static inline bool walCurFileClosed(SWal* pWal) { + return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; +} + +static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { + return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); +} + +static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); +} + +static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); +} + +int walReadMeta(SWal* pWal); +int walWriteMeta(SWal* pWal); +int walRollFileInfo(SWal* pWal); + +char* walMetaSerialize(SWal* pWal); +int walMetaDeserialize(SWal* pWal, const char* bytes); +//meta section end int64_t walGetSeq(); +int walSeekVer(SWal *pWal, int64_t ver); +int walRoll(SWal *pWal); #ifdef __cplusplus } diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index 1aa64b34b5..bf51be4346 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -23,27 +23,27 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int code = 0; - int64_t idxTfd = pWal->curIdxTfd; - int64_t logTfd = pWal->curLogTfd; + int64_t idxTfd = pWal->writeIdxTfd; + int64_t logTfd = pWal->writeLogTfd; //seek position - int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { - + return -1; } int64_t readBuf[2]; code = tfRead(idxTfd, readBuf, sizeof(readBuf)); if(code != 0) { - + return -1; } //TODO:deserialize ASSERT(readBuf[0] == ver); code = tfLseek(logTfd, readBuf[1], SEEK_CUR); if (code != 0) { - + return -1; } - pWal->curLogOffset = readBuf[1]; + /*pWal->curLogOffset = readBuf[1];*/ pWal->curVersion = ver; return code; } @@ -52,43 +52,43 @@ static int walChangeFile(SWal *pWal, int64_t ver) { int code = 0; int64_t idxTfd, logTfd; char fnameStr[WAL_FILE_LEN]; - code = tfClose(pWal->curLogTfd); + code = tfClose(pWal->writeLogTfd); if(code != 0) { //TODO } - code = tfClose(pWal->curIdxTfd); + code = tfClose(pWal->writeIdxTfd); if(code != 0) { //TODO } + WalFileInfo tmpInfo; + tmpInfo.firstVer = ver; //bsearch in fileSet - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - int64_t fname = *pRet; - if(fname < pWal->lastFileName) { + int64_t fileFirstVer = pRet->firstVer; + //closed + if(taosArrayGetLast(pWal->fileInfoSet) != pRet) { pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = pRet[1]-1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenRead(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenRead(fnameStr); } else { pWal->curStatus |= WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = -1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); } - pWal->curFileFirstVersion = fname; - pWal->curLogTfd = logTfd; - pWal->curIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; return code; } int walSeekVer(SWal *pWal, int64_t ver) { - if((!(pWal->curStatus & WAL_CUR_FAILED)) - && ver == pWal->curVersion) { + int code; + if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) { return 0; } if(ver > pWal->lastVersion) { @@ -102,11 +102,16 @@ int walSeekVer(SWal *pWal, int64_t ver) { if(ver < pWal->snapshotVersion) { //TODO: seek snapshotted log, invalid in some cases } - if(ver < pWal->curFileFirstVersion || - (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { - walChangeFile(pWal, ver); + if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { + code = walChangeFile(pWal, ver); + if(code != 0) { + return -1; + } } - walSeekFilePos(pWal, ver); - + code = walSeekFilePos(pWal, ver); + if(code != 0) { + return -1; + } + return 0; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c new file mode 100644 index 0000000000..65085bb96d --- /dev/null +++ b/source/libs/wal/src/walMeta.c @@ -0,0 +1,219 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tref.h" +#include "tfile.h" +#include "cJSON.h" +#include "walInt.h" + +#include +#include + +int walRollFileInfo(SWal* pWal) { + int64_t ts = taosGetTimestampSec(); + + SArray* pArray = pWal->fileInfoSet; + if(taosArrayGetSize(pArray) != 0) { + WalFileInfo *pInfo = taosArrayGetLast(pArray); + pInfo->lastVer = pWal->lastVersion; + pInfo->closeTs = ts; + } + + WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo)); + if(pNewInfo == NULL) { + return -1; + } + pNewInfo->firstVer = pWal->lastVersion + 1; + pNewInfo->lastVer = -1; + pNewInfo->createTs = ts; + pNewInfo->closeTs = -1; + pNewInfo->fileSize = 0; + taosArrayPush(pWal->fileInfoSet, pNewInfo); + return 0; +} + +char* walMetaSerialize(SWal* pWal) { + char buf[30]; + if(pWal == NULL || pWal->fileInfoSet == NULL) return 0; + int sz = pWal->fileInfoSet->size; + cJSON* pRoot = cJSON_CreateObject(); + cJSON* pMeta = cJSON_CreateObject(); + cJSON* pFiles = cJSON_CreateArray(); + cJSON* pField; + if(pRoot == NULL || pMeta == NULL || pFiles == NULL) { + //TODO + return NULL; + } + cJSON_AddItemToObject(pRoot, "meta", pMeta); + sprintf(buf, "%" PRId64, pWal->firstVersion); + cJSON_AddStringToObject(pMeta, "firstVer", buf); + sprintf(buf, "%" PRId64, pWal->snapshotVersion); + cJSON_AddStringToObject(pMeta, "snapshotVer", buf); + sprintf(buf, "%" PRId64, pWal->commitVersion); + cJSON_AddStringToObject(pMeta, "commitVer", buf); + sprintf(buf, "%" PRId64, pWal->lastVersion); + cJSON_AddStringToObject(pMeta, "lastVer", buf); + + cJSON_AddItemToObject(pRoot, "files", pFiles); + WalFileInfo* pData = pWal->fileInfoSet->pData; + for(int i = 0; i < sz; i++) { + WalFileInfo* pInfo = &pData[i]; + cJSON_AddItemToArray(pFiles, pField = cJSON_CreateObject()); + if(pField == NULL) { + cJSON_Delete(pRoot); + return NULL; + } + //cjson only support int32_t or double + //string are used to prohibit the loss of precision + sprintf(buf, "%" PRId64, pInfo->firstVer); + cJSON_AddStringToObject(pField, "firstVer", buf); + sprintf(buf, "%" PRId64, pInfo->lastVer); + cJSON_AddStringToObject(pField, "lastVer", buf); + sprintf(buf, "%" PRId64, pInfo->createTs); + cJSON_AddStringToObject(pField, "createTs", buf); + sprintf(buf, "%" PRId64, pInfo->closeTs); + cJSON_AddStringToObject(pField, "closeTs", buf); + sprintf(buf, "%" PRId64, pInfo->fileSize); + cJSON_AddStringToObject(pField, "fileSize", buf); + } + return cJSON_Print(pRoot); +} + +int walMetaDeserialize(SWal* pWal, const char* bytes) { + ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0); + cJSON *pRoot, *pMeta, *pFiles, *pInfoJson, *pField; + pRoot = cJSON_Parse(bytes); + pMeta = cJSON_GetObjectItem(pRoot, "meta"); + pField = cJSON_GetObjectItem(pMeta, "firstVer"); + pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); + pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "commitVer"); + pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pMeta, "lastVer"); + pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); + + pFiles = cJSON_GetObjectItem(pRoot, "files"); + int sz = cJSON_GetArraySize(pFiles); + //deserialize + SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo)); + WalFileInfo *pData = pArray->pData; + for(int i = 0; i < sz; i++) { + cJSON* pInfoJson = cJSON_GetArrayItem(pFiles, i); + WalFileInfo* pInfo = &pData[i]; + pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); + pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); + pInfo->lastVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "createTs"); + pInfo->createTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "closeTs"); + pInfo->closeTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "fileSize"); + pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); + } + taosArraySetSize(pArray, sz); + pWal->fileInfoSet = pArray; + return 0; +} + +static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +} + +static int walFindCurMetaVer(SWal* pWal) { + const char * pattern = "^meta-ver[0-9]+$"; + regex_t walMetaRegexPattern; + regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED); + + DIR *dir = opendir(pWal->path); + if(dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + + //find existing meta-ver[x].json + int metaVer = -1; + while((ent = readdir(dir)) != NULL) { + char *name = basename(ent->d_name); + int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); + if(code == 0) { + sscanf(name, "meta-ver%d", &metaVer); + break; + } + } + return metaVer; +} + +int walWriteMeta(SWal* pWal) { + int metaVer = walFindCurMetaVer(pWal); + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer+1, fnameStr); + int metaTfd = tfOpenCreateWrite(fnameStr); + if(metaTfd < 0) { + return -1; + } + char* serialized = walMetaSerialize(pWal); + int len = strlen(serialized); + if(len != tfWrite(metaTfd, serialized, len)) { + //TODO:clean file + return -1; + } + + tfClose(metaTfd); + //delete old file + if(metaVer > -1) { + walBuildMetaName(pWal, metaVer, fnameStr); + remove(fnameStr); + } + return 0; +} + +int walReadMeta(SWal* pWal) { + ASSERT(pWal->fileInfoSet->size == 0); + //find existing meta file + int metaVer = walFindCurMetaVer(pWal); + if(metaVer == -1) { + return 0; + } + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer, fnameStr); + //read metafile + struct stat statbuf; + stat(fnameStr, &statbuf); + int size = statbuf.st_size; + char* buf = malloc(size + 5); + if(buf == NULL) { + return -1; + } + int tfd = tfOpenRead(fnameStr); + if(tfRead(tfd, buf, size) != size) { + free(buf); + return -1; + } + //load into fileInfoSet + int code = walMetaDeserialize(pWal, buf); + if(code != 0) { + free(buf); + return -1; + } + free(buf); + return 0; +} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index bc2e687069..6bf9008917 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -48,9 +48,15 @@ int32_t walInit() { int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); if(old == 1) return 0; + int code = tfInit(); + if(code != 0) { + wError("failed to init tfile since %s", tstrerror(code)); + atomic_store_8(&tsWal.inited, 0); + return code; + } tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); - int code = walCreateThread(); + code = walCreateThread(); if (code != 0) { wError("failed to init wal module since %s", tstrerror(code)); atomic_store_8(&tsWal.inited, 0); @@ -64,43 +70,31 @@ int32_t walInit() { void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refSetId); + atomic_store_8(&tsWal.inited, 0); wInfo("wal module is cleaned up"); } -static int walLoadFileset(SWal *pWal) { - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent* ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - name[WAL_NOSUFFIX_LEN] = 0; - //validate file name by regex matching - if(1 /* TODO:regex match */) { - int64_t fnameInt64 = atoll(name); - taosArrayPush(pWal->fileSet, &fnameInt64); - } - } - taosArraySort(pWal->fileSet, compareInt64Val); - return 0; -} - SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } + pWal->writeLogTfd = -1; + pWal->writeIdxTfd = -1; + //set config pWal->vgId = pCfg->vgId; - pWal->curLogTfd = -1; - pWal->curIdxTfd = -1; - pWal->level = pCfg->walLevel; pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->rollPeriod = pCfg->rollPeriod; + pWal->segSize = pCfg->segSize; + pWal->level = pCfg->walLevel; + //init status + pWal->lastVersion = -1; + pWal->lastRollSeq = -1; + + //init write buffer memset(&pWal->head, 0, sizeof(SWalHead)); pWal->head.sver = 0; @@ -120,7 +114,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walFreeObj(pWal); return NULL; } - walLoadFileset(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); @@ -151,10 +144,10 @@ void walClose(SWal *pWal) { if (pWal == NULL) return; pthread_mutex_lock(&pWal->mutex); - tfClose(pWal->curLogTfd); - tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + tfClose(pWal->writeLogTfd); + tfClose(pWal->writeIdxTfd); + /*taosArrayDestroy(pWal->fileInfoSet);*/ + /*pWal->fileInfoSet = NULL;*/ pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); } @@ -164,8 +157,8 @@ static int32_t walInitObj(SWal *pWal) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - pWal->fileSet = taosArrayInit(0, sizeof(int64_t)); - if(pWal->fileSet == NULL) { + pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo)); + if(pWal->fileInfoSet == NULL) { wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } @@ -178,10 +171,10 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); - tfClose(pWal->curLogTfd); - tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + tfClose(pWal->writeLogTfd); + tfClose(pWal->writeIdxTfd); + taosArrayDestroy(pWal->fileInfoSet); + pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -208,9 +201,9 @@ static void walFsyncAll() { while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); - int32_t code = tfFsync(pWal->curLogTfd); + int32_t code = tfFsync(pWal->writeLogTfd); if (code != 0) { - wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); } } pWal = taosIterateRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b475183b7b..e9f5bcbc5d 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -13,16 +13,56 @@ * along with this program. If not, see . */ -#include "wal.h" +#include "walInt.h" +#include "tfile.h" #include "tchecksum.h" -static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) { - return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) && - taosCheckChecksum(body, bodyLen, pHead->cksumBody); +static inline int walValidHeadCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead); } +static inline int walValidBodyCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody); +} + +static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); +} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { + int code; + code = walSeekVer(pWal, ver); + if(code != 0) { + return code; + } + if(*ppHead == NULL) { + void* ptr = realloc(*ppHead, sizeof(SWalHead)); + if(ptr == NULL) { + return -1; + } + *ppHead = ptr; + } + if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidHeadCksum(*ppHead) != 0) { + return -1; + } + void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len); + if(ptr == NULL) { + free(*ppHead); + *ppHead = NULL; + return -1; + } + if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidBodyCksum(*ppHead) != 0) { + return -1; + } + return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 69c83a9912..0c4989300f 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,29 +21,63 @@ #include "tfile.h" #include "walInt.h" +static void walFtruncate(SWal *pWal, int64_t ver); + int32_t walCommit(SWal *pWal, int64_t ver) { + ASSERT(pWal->snapshotVersion <= pWal->commitVersion); + ASSERT(pWal->commitVersion <= pWal->lastVersion); + ASSERT(ver >= pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + pWal->commitVersion = ver; return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { //TODO: ftruncate + ASSERT(ver > pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + //seek position + walSeekVer(pWal, ver); + walFtruncate(pWal, ver); return 0; } int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { pWal->snapshotVersion = ver; + int ts = taosGetTimestampSec(); + int deleteCnt = 0; + int64_t newTotSize = pWal->totSize; + WalFileInfo tmp; + tmp.firstVer = ver; //mark files safe to delete - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); - if(pRet != pWal->fileSet->pData) { - //delete files until less than retention size - - //find first file that exceeds retention time - + WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + //iterate files, until the searched result + for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { + if(pWal->totSize > pWal->retentionSize || + iter->closeTs + pWal->retentionPeriod > ts) { + //delete according to file size or close time + deleteCnt++; + newTotSize -= iter->fileSize; + } + } + char fnameStr[WAL_FILE_LEN]; + //remove file + for(int i = 0; i < deleteCnt; i++) { + WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); + walBuildIdxName(pWal, pInfo->firstVer, fnameStr); + remove(fnameStr); } - //delete files living longer than retention limit - //remove file from fileset + //save snapshot ver, commit ver + + + //make new array, remove files + taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); + pWal->totSize = newTotSize; + return 0; } @@ -138,105 +172,122 @@ void walRemoveAllOldFiles(void *handle) { } #endif -static int walRoll(SWal *pWal) { +int walRoll(SWal *pWal) { int code = 0; - code = tfClose(pWal->curIdxTfd); - if(code != 0) { - return code; + if(pWal->writeIdxTfd != -1) { + code = tfClose(pWal->writeIdxTfd); + if(code != 0) { + return -1; + } } - code = tfClose(pWal->curLogTfd); - if(code != 0) { - return code; + if(pWal->writeLogTfd != -1) { + code = tfClose(pWal->writeLogTfd); + if(code != 0) { + return -1; + } } int64_t idxTfd, logTfd; //create new file int64_t newFileFirstVersion = pWal->lastVersion + 1; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion); + walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion); + if(idxTfd < 0) { + ASSERT(0); + return -1; + } + walBuildLogName(pWal, newFileFirstVersion, fnameStr); logTfd = tfOpenCreateWrite(fnameStr); + if(logTfd < 0) { + ASSERT(0); + return -1; + } + code = walRollFileInfo(pWal); + if(code != 0) { + ASSERT(0); + return -1; + } - taosArrayPush(pWal->fileSet, &newFileFirstVersion); - //switch file - pWal->curIdxTfd = idxTfd; - pWal->curLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = newFileFirstVersion; - pWal->curVersion = newFileFirstVersion; - pWal->curLogOffset = 0; pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; - pWal->lastFileName = newFileFirstVersion; - pWal->lastFileWriteSize = 0; pWal->lastRollSeq = walGetSeq(); return 0; } int walChangeFileToLast(SWal *pWal) { int64_t idxTfd, logTfd; - int64_t* pRet = taosArrayGetLast(pWal->fileSet); + WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); ASSERT(pRet != NULL); - int64_t fname = *pRet; + int64_t fileFirstVer = pRet->firstVer; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + if(idxTfd < 0) { + return -1; + } + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); + if(logTfd < 0) { + return -1; + } //switch file - pWal->curIdxTfd = idxTfd; - pWal->curLogTfd = logTfd; + pWal->writeIdxTfd = idxTfd; + pWal->writeLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = fname; - pWal->curVersion = fname; - pWal->curLogOffset = 0; + pWal->curVersion = fileFirstVer; pWal->curStatus = WAL_CUR_FILE_WRITABLE; return 0; } -int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { +static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { int code = 0; //get index file - if(!tfValid(pWal->curIdxTfd)) { + if(!tfValid(pWal->writeIdxTfd)) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + return code; } int64_t writeBuf[2] = { ver, offset }; - int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); + int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf)); if(size != sizeof(writeBuf)) { - //TODO: + return -1; } return 0; } -int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { if (pWal == NULL) return -1; + int code = 0; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; if (index == pWal->lastVersion + 1) { - int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(passed > pWal->rollPeriod) { - walRoll(pWal); - } else if(pWal->lastFileWriteSize > pWal->segSize) { - walRoll(pWal); + if(taosArrayGetSize(pWal->fileInfoSet) == 0) { + code = walRoll(pWal); + ASSERT(code == 0); } else { - walChangeFileToLast(pWal); + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) { + walRoll(pWal); + } else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) { + walRoll(pWal); + } } } else { //reject skip log or rewrite log //must truncate explicitly first return -1; } - if (!tfValid(pWal->curLogTfd)) return 0; + /*if (!tfValid(pWal->curLogTfd)) return 0;*/ pWal->head.version = index; - int32_t code = 0; pWal->head.signature = WAL_SIGNATURE; pWal->head.len = bodyLen; @@ -247,22 +298,27 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t pthread_mutex_lock(&pWal->mutex); - if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } - if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { + if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + } + code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); + if(code != 0) { + //TODO } - walWriteIndex(pWal, index, pWal->curLogOffset); - pWal->curLogOffset += sizeof(SWalHead) + bodyLen; //set status pWal->lastVersion = index; + pWal->totSize += sizeof(SWalHead) + bodyLen; + walGetCurFileInfo(pWal)->lastVer = index; + walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; pthread_mutex_unlock(&pWal->mutex); @@ -270,12 +326,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t } void walFsync(SWal *pWal, bool forceFsync) { - if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; + if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion); - if (tfFsync(pWal->curLogTfd) < 0) { - wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); + if (tfFsync(pWal->writeLogTfd) < 0) { + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } } @@ -348,8 +404,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { } #endif -static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { - tfFtruncate(tfd, offset); +static int walValidateOffset(SWal* pWal, int64_t ver) { + int code = 0; + SWalHead *pHead = NULL; + code = (int)walRead(pWal, &pHead, ver); + if(pHead->version != ver) { + return -1; + } + return 0; +} + +static int64_t walGetOffset(SWal* pWal, int64_t ver) { + int code = walSeekVer(pWal, ver); + if(code != 0) { + return -1; + } + + code = walValidateOffset(pWal, ver); + if(code != 0) { + return -1; + } + + return 0; +} + +static void walFtruncate(SWal *pWal, int64_t ver) { + int64_t tfd = pWal->writeLogTfd; + tfFtruncate(tfd, ver); + tfFsync(tfd); + tfd = pWal->writeIdxTfd; + tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); tfFsync(tfd); } diff --git a/source/libs/wal/test/CMakeLists.txt b/source/libs/wal/test/CMakeLists.txt new file mode 100644 index 0000000000..1c0a3a162a --- /dev/null +++ b/source/libs/wal/test/CMakeLists.txt @@ -0,0 +1,20 @@ +add_executable(walTest "") +target_sources(walTest + PRIVATE + "walMetaTest.cpp" +) +target_include_directories(walTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/wal" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries(walTest + wal + gtest_main +) +enable_testing() +add_test( + NAME wal_test + COMMAND walTest +) diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp new file mode 100644 index 0000000000..96258662a1 --- /dev/null +++ b/source/libs/wal/test/walMetaTest.cpp @@ -0,0 +1,149 @@ +#include +#include +#include +#include + +#include "walInt.h" + +class WalCleanEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + } + + void SetUp() override { + taosRemoveDir(pathName); + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +class WalKeepEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + } + + void SetUp() override { + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +TEST_F(WalCleanEnv, createNew) { + walRollFileInfo(pWal); + ASSERT(pWal->fileInfoSet != NULL); + ASSERT_EQ(pWal->fileInfoSet->size, 1); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + ASSERT_EQ(pInfo->firstVer, 0); + ASSERT_EQ(pInfo->lastVer, -1); + ASSERT_EQ(pInfo->closeTs, -1); + ASSERT_EQ(pInfo->fileSize, 0); +} + +TEST_F(WalCleanEnv, serialize) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + char*ss = walMetaSerialize(pWal); + printf("%s\n", ss); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalCleanEnv, removeOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalKeepEnv, readOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); + char*oldss = walMetaSerialize(pWal); + + TearDown(); + SetUp(); + code = walReadMeta(pWal); + ASSERT(code == 0); + char* newss = walMetaSerialize(pWal); + + int len = strlen(oldss); + ASSERT_EQ(len, strlen(newss)); + for(int i = 0; i < len; i++) { + EXPECT_EQ(oldss[i], newss[i]); + } +} + +TEST_F(WalKeepEnv, write) { + const char* ranStr = "tvapq02tcp"; + const int len = strlen(ranStr); + int code; + for(int i = 0; i < 10; i++) { + code = walWrite(pWal, i, i+1, (void*)ranStr, len); + ASSERT_EQ(code, 0); + code = walWrite(pWal, i+2, i, (void*)ranStr, len); + ASSERT_EQ(code, -1); + } + code = walWriteMeta(pWal); + ASSERT_EQ(code, 0); +} diff --git a/source/libs/wal/test/walTests.cpp b/source/libs/wal/test/walTests.cpp deleted file mode 100644 index 505728fbe4..0000000000 --- a/source/libs/wal/test/walTests.cpp +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "tlog.h" -#include "twal.h" -#include "tfile.h" - -int64_t ver = 0; -void *pWal = NULL; - -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { - // do nothing - SWalHead *pHead = data; - - if (pHead->version > ver) - ver = pHead->version; - - walWrite(pWal, pHead); - - return 0; -} - -int main(int argc, char *argv[]) { - char path[128] = "/tmp/wal"; - int level = 2; - int total = 5; - int rows = 10000; - int size = 128; - int keep = 0; - - for (int i=1; iversion = ++ver; - pHead->len = size; - walWrite(pWal, pHead); - } - - printf("renew a wal, i:%d\n", i); - walRenew(pWal); - } - - printf("%d wal files are written\n", total); - - int64_t index = 0; - char name[256]; - - while (1) { - int code = walGetWalFile(pWal, name, &index); - if (code == -1) { - printf("failed to get wal file, index:%" PRId64 "\n", index); - break; - } - - printf("index:%" PRId64 " wal:%s\n", index, name); - if (code == 0) break; - } - - getchar(); - - walClose(pWal); - walCleanUp(); - tfCleanup(); - - return 0; -} diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 007ce06829..ff52477a6f 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) { memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize); } +void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { + assert(cnt <= pArray->size); + pArray->size = pArray->size - cnt; + if(pArray->size == 0) { + pArray->size = 0; + return; + } + memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); +} + void taosArrayRemove(SArray* pArray, size_t index) { assert(index < pArray->size); diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 5d4789aae6..313f1d97af 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -22,20 +22,26 @@ static int32_t tsFileRsetId = -1; +static int8_t tfInited = 0; + static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); } int32_t tfInit() { + int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1); + if(old == 1) return 0; tsFileRsetId = taosOpenRef(2000, tfCloseFile); if (tsFileRsetId > 0) { return 0; } else { + atomic_store_8(&tfInited, 0); return -1; } } void tfCleanup() { + atomic_store_8(&tfInited, 0); if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); tsFileRsetId = -1; } diff --git a/source/util/src/tmd5.c b/source/util/src/tmd5.c index 9cc4b3b9d5..807f3c8122 100644 --- a/source/util/src/tmd5.c +++ b/source/util/src/tmd5.c @@ -84,8 +84,8 @@ static uint8_t PADDING[64] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x /* The routine MD5Init initializes the message-digest context mdContext. All fields are set to zero. */ -void MD5Init(MD5_CTX *mdContext) { - memset(mdContext, 0, sizeof(MD5_CTX)); +void tMD5Init(T_MD5_CTX *mdContext) { + memset(mdContext, 0, sizeof(T_MD5_CTX)); /* Load magic initialization constants. */ mdContext->buf[0] = (uint32_t)0x67452301; @@ -98,7 +98,7 @@ void MD5Init(MD5_CTX *mdContext) { account for the presence of each of the characters inBuf[0..inLen-1] in the message whose digest is being computed. */ -void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { +void tMD5Update(T_MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { uint32_t in[16]; int mdi; unsigned int i, ii; @@ -129,7 +129,7 @@ void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) { /* The routine MD5Final terminates the message-digest computation and ends with the desired message digest in mdContext->digest[0...15]. */ -void MD5Final(MD5_CTX *mdContext) { +void tMD5Final(T_MD5_CTX *mdContext) { uint32_t in[16]; int mdi; unsigned int i, ii; @@ -144,7 +144,7 @@ void MD5Final(MD5_CTX *mdContext) { /* pad out to 56 mod 64 */ padLen = (mdi < 56) ? (56 - mdi) : (120 - mdi); - MD5Update(mdContext, PADDING, padLen); + tMD5Update(mdContext, PADDING, padLen); /* append length in bits and transform */ for (i = 0, ii = 0; i < 14; i++, ii += 4)