From f9d83c044ddfa316aae92d1f624840e74d29e23f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 10 Jan 2022 23:42:58 +0800 Subject: [PATCH 1/8] profile performance --- source/dnode/vnode/meta/src/metaIdx.c | 10 +-- source/libs/index/src/index.c | 76 ++++++++++--------- source/libs/index/src/index_cache.c | 22 +++++- source/libs/index/src/index_fst.c | 1 + .../index/src/index_fst_counting_writer.c | 14 +++- source/libs/index/src/index_tfile.c | 16 +++- 6 files changed, 85 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/meta/src/metaIdx.c b/source/dnode/vnode/meta/src/metaIdx.c index 828bd12088..3da56fc394 100644 --- a/source/dnode/vnode/meta/src/metaIdx.c +++ b/source/dnode/vnode/meta/src/metaIdx.c @@ -49,9 +49,7 @@ int metaOpenIdx(SMeta *pMeta) { #ifdef USE_INVERTED_INDEX SIndexOpts opts; - if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { - return -1; - } + if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { return -1; } #endif return 0; @@ -67,16 +65,14 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */ #ifdef USE_INVERTED_INDEX SIndexOpts opts; - if (indexClose(pMeta->pIdx->pIdx) != 0) { - return -1; - } + if (indexClose(pMeta->pIdx->pIdx) != 0) { return -1; } #endif } int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) { #ifdef USE_INVERTED_INDEX - if (pTbCfgs - type == META_CHILD_TABLE) { + if (pTbCfgs->type == META_CHILD_TABLE) { char buf[8] = {0}; int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId; sprintf(buf, "%d", colId); // colname diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 19e9375491..0c222eae1a 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -59,6 +59,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); +// merge cache and tfile by opera type +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); +static void indexMergeSameKey(SArray* result, TFileValue* tv); + int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { pthread_once(&isInit, indexInit); SIndex* sIdx = calloc(1, sizeof(SIndex)); @@ -385,6 +389,27 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { taosArrayPush(result, &tv); } } +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) { + // opt + char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; + // design merge-algorithm later, too complicated to handle all kind of situation + TFileValue* tfv = tfileValueCreate(colVal); + if (cv != NULL) { + if (cv->type == ADD_VALUE) { + taosArrayAddAll(tfv->tableId, cv->val); + } else if (cv->type == DEL_VALUE) { + } else if (cv->type == UPDATE_VALUE) { + } else { + // do nothing + } + } + if (tv != NULL) { + // opt later + taosArrayAddAll(tfv->tableId, tv->val); + } + + indexMergeSameKey(result, tfv); +} static void indexDestroyTempResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { @@ -411,51 +436,30 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - while (cn == true && tn == true) { - IterateValue* cv = cacheIter->getValue(cacheIter); - IterateValue* tv = tfileIter->getValue(tfileIter); + while (cn == true || tn == true) { + IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; + IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; - // dump value - int comp = strcmp(cv->colVal, tv->colVal); + int comp = 0; + if (cn == true && tn == true) { + comp = strcmp(cv->colVal, tv->colVal); + } else if (cn == true) { + comp = -1; + } else { + comp = 1; + } if (comp == 0) { - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - taosArrayAddAll(tfv->tableId, tv->val); - indexMergeSameKey(result, tfv); - + indexMergeCacheAndTFile(result, cv, tv); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); - continue; } else if (comp < 0) { - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - - indexMergeSameKey(result, tfv); - // copy to final Result; + indexMergeCacheAndTFile(result, cv, NULL); cn = cacheIter->next(cacheIter); } else { - TFileValue* tfv = tfileValueCreate(tv->colVal); - taosArrayAddAll(tfv->tableId, tv->val); - - indexMergeSameKey(result, tfv); - // copy to final result + indexMergeCacheAndTFile(result, NULL, tv); tn = tfileIter->next(tfileIter); } } - while (cn == true) { - IterateValue* cv = cacheIter->getValue(cacheIter); - TFileValue* tfv = tfileValueCreate(cv->colVal); - taosArrayAddAll(tfv->tableId, cv->val); - indexMergeSameKey(result, tfv); - cn = cacheIter->next(cacheIter); - } - while (tn == true) { - IterateValue* tv = tfileIter->getValue(tfileIter); - TFileValue* tfv = tfileValueCreate(tv->colVal); - taosArrayAddAll(tfv->tableId, tv->val); - indexMergeSameKey(result, tfv); - tn = tfileIter->next(tfileIter); - } int ret = indexGenTFile(sIdx, pCache, result); indexDestroyTempResult(result); @@ -503,7 +507,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { tfileWriterClose(tw); TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName); - if (reader == NULL) { goto END; } + if (reader == NULL) { return -1; } TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 294c8192e8..48566a8674 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -217,9 +217,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) { // set value ct->uid = uid; ct->operaType = term->operType; - // ugly code, refactor later int64_t estimate = sizeof(ct) + strlen(ct->colVal); + pthread_mutex_lock(&pCache->mtx); pCache->occupiedMem += estimate; indexCacheMakeRoomForWrite(pCache); @@ -331,7 +331,6 @@ static char* indexCacheTermGet(const void* pData) { static int32_t indexCacheTermCompare(const void* l, const void* r) { CacheTerm* lt = (CacheTerm*)l; CacheTerm* rt = (CacheTerm*)r; - // compare colVal int32_t cmp = strcmp(lt->colVal, rt->colVal); if (cmp == 0) { return rt->version - lt->version; } @@ -359,17 +358,32 @@ static bool indexCacheIteratorNext(Iterate* itera) { IterateValue* iv = &itera->val; iterateValueDestroy(iv, false); + // IterateValue* iv = &itera->val; + // IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))}; + bool next = tSkipListIterNext(iter); if (next) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); + // equal func + // if (iv->colVal != NULL && ct->colVal != NULL) { + // if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) } + //} else { + // tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1); + // tIterval.colVal = tstrdup(ct->colVal); + //} iv->type = ct->operaType; - iv->colVal = calloc(1, strlen(ct->colVal) + 1); - memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); + iv->colVal = tstrdup(ct->colVal); + // iv->colVal = calloc(1, strlen(ct->colVal) + 1); + // memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); taosArrayPush(iv->val, &ct->uid); } + // IterateValue* iv = &itera->val; + // iterateValueDestroy(iv, true); + //*iv = tIterVal; + return next; } diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 4f782cef26..5299a7dc5f 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -936,6 +936,7 @@ Fst* fstCreate(FstSlice* slice) { len -= sizeof(checkSum); taosDecodeFixedU32(buf + len, &checkSum); if (taosCheckChecksum(buf, len, checkSum)) { + indexError("index file is corrupted"); // verify fst return NULL; } diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 6db5555aa6..b57f639726 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -60,9 +60,10 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off return nRead; } static int writeCtxGetSize(WriterCtx* ctx) { - if (ctx->type == TFile && ctx->file.readOnly) { - // refactor later - return ctx->file.size; + if (ctx->type == TFile) { + struct stat fstat; + stat(ctx->file.buf, &fstat); + return fstat.st_size; } return 0; } @@ -88,7 +89,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int if (readOnly == false) { // ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.fd = tfOpenCreateWriteAppend(path); - + tfFtruncate(ctx->file.fd, 0); struct stat fstat; stat(path, &fstat); ctx->file.size = fstat.st_size; @@ -138,6 +139,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) { munmap(ctx->file.ptr, ctx->file.size); #endif } + if (ctx->file.readOnly == false) { + struct stat fstat; + stat(ctx->file.buf, &fstat); + // indexError("write file size: %d", (int)(fstat.st_size)); + } if (remove) { unlink(ctx->file.buf); } } free(ctx); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 4b76402560..98fede4f7b 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -147,21 +147,22 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { reader->ctx = ctx; if (0 != tfileReaderVerify(reader)) { - tfileReaderDestroy(reader); indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + tfileReaderDestroy(reader); return NULL; } // T_REF_INC(reader); if (0 != tfileReaderLoadHeader(reader)) { - tfileReaderDestroy(reader); indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); + tfileReaderDestroy(reader); return NULL; } if (0 != tfileReaderLoadFst(reader)) { + indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid, + reader->header.colName, errno); tfileReaderDestroy(reader); - indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName); return NULL; } @@ -303,6 +304,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { } else { // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, // (int)taosArrayGetSize(v->tableId)); + + // indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx)); } } fstBuilderFinish(tw->fb); @@ -485,7 +488,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) { static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) { int32_t fstOffset = offset + sizeof(tw->header.fstOffset); tw->header.fstOffset = fstOffset; + if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; } + indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx)); tw->offset += sizeof(fstOffset); return 0; } @@ -495,8 +500,11 @@ static int tfileWriteHeader(TFileWriter* writer) { TFileHeader* header = &writer->header; memcpy(buf, (char*)header, sizeof(buf)); + indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx)); int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf)); if (sizeof(buf) != nwrite) { return -1; } + + indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx)); writer->offset = nwrite; return 0; } @@ -521,6 +529,8 @@ static int tfileWriteFooter(TFileWriter* write) { void* pBuf = (void*)buf; taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber); int nwrite = write->ctx->write(write->ctx, buf, strlen(buf)); + + indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx)); assert(nwrite == sizeof(tfileMagicNumber)); return nwrite; } From f4bb5d06c82f807a8074b8b1af3003ec524338a7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 10 Jan 2022 17:33:25 -0800 Subject: [PATCH 2/8] minor changes --- source/dnode/mnode/impl/src/mndDb.c | 5 +- source/dnode/mnode/impl/src/mndTrans.c | 4 +- source/dnode/mnode/impl/src/mndVgroup.c | 14 ++--- source/dnode/mnode/impl/test/db/db.cpp | 19 ++---- tests/script/jenkins/basic.txt | 9 +-- tests/script/{general => sim}/db/basic1.sim | 0 tests/script/sim/db/error1.sim | 61 +++++++++++++++++++ tests/script/{unique => sim}/dnode/basic1.sim | 0 .../script/{general => sim}/table/basic1.sim | 0 tests/script/{general => sim}/user/basic1.sim | 0 10 files changed, 82 insertions(+), 30 deletions(-) rename tests/script/{general => sim}/db/basic1.sim (100%) create mode 100644 tests/script/sim/db/error1.sim rename tests/script/{unique => sim}/dnode/basic1.sim (100%) rename tests/script/{general => sim}/table/basic1.sim (100%) rename tests/script/{general => sim}/user/basic1.sim (100%) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index f2f8931aa1..85b7fbbb42 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -463,7 +463,7 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) { pCreate->commitTime = htonl(pCreate->commitTime); pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); - mDebug("db:%s, start to create", pCreate->db); + mDebug("db:%s, start to create, vgroups:%d", pCreate->db, pCreate->numOfVgroups); SDbObj *pDb = mndAcquireDb(pMnode, pCreate->db); if (pDb != NULL) { @@ -476,6 +476,9 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); return -1; } + } else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) { + mError("db:%s, failed to create since %s", pCreate->db, terrstr()); + return -1; } SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 4a42133ce3..058178260f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -809,7 +809,7 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage from undoLog to rollback", pTrans->id); continueExec = true; } else { - mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); continueExec = false; } @@ -825,7 +825,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); + mError("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { pTrans->failedTimes++; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 4b7b370371..e0d6d3dd42 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -273,15 +273,10 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 SDnodeObj *pDnode = pObj; SArray *pArray = p1; - pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); - int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - if (online && pDnode->numOfSupportVnodes > 0) { - taosArrayPush(pArray, pDnode); - } - - bool isMnode = mndIsMnode(pMnode, pDnode->id); + bool isMnode = mndIsMnode(pMnode, pDnode->id); + pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id); mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes, pDnode->numOfSupportVnodes, isMnode, online); @@ -290,6 +285,9 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 pDnode->numOfVnodes++; } + if (online && pDnode->numOfSupportVnodes > 0) { + taosArrayPush(pArray, pDnode); + } return true; } @@ -311,7 +309,7 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) { static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) { float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes; float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes; - return d1Score > d2Score ? 1 : 0; + return d1Score >= d2Score ? 1 : 0; } static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 4f0ba9b0e7..5d5947b644 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -13,28 +13,17 @@ class MndTestDb : public ::testing::Test { protected: - static void SetUpTestSuite() { - test.Init("/tmp/mnode_test_db", 9030); - const char* fqdn = "localhost"; - const char* firstEp = "localhost:9030"; + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); } + static void TearDownTestSuite() { test.Cleanup(); } - server2.Start("/tmp/mnode_test_db2", fqdn, 9031, firstEp); - } - static void TearDownTestSuite() { - server2.Stop(); - test.Cleanup(); - } - - static Testbase test; - static TestServer server2; + static Testbase test; public: void SetUp() override {} void TearDown() override {} }; -Testbase MndTestDb::test; -TestServer MndTestDb::server2; +Testbase MndTestDb::test; TEST_F(MndTestDb, 01_ShowDb) { test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index f960abb4e0..bc0c3a4f64 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -2,15 +2,16 @@ #======================b1-start=============== # ---- user -./test.sh -f general/user/basic1.sim +./test.sh -f sim/user/basic1.sim # ---- db -./test.sh -f general/db/basic1.sim +./test.sh -f sim/db/basic1.sim +./test.sh -f sim/db/error1.sim # ---- table -./test.sh -f general/table/basic1.sim +./test.sh -f sim/table/basic1.sim # ---- dnode -./test.sh -f unique/dnode/basic1.sim +./test.sh -f sim/dnode/basic1.sim #======================b1-end=============== diff --git a/tests/script/general/db/basic1.sim b/tests/script/sim/db/basic1.sim similarity index 100% rename from tests/script/general/db/basic1.sim rename to tests/script/sim/db/basic1.sim diff --git a/tests/script/sim/db/error1.sim b/tests/script/sim/db/error1.sim new file mode 100644 index 0000000000..6698ca6f67 --- /dev/null +++ b/tests/script/sim/db/error1.sim @@ -0,0 +1,61 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +system sh/deploy.sh -n dnode2 -i 2 +system sh/exec.sh -n dnode2 -s start +sql connect + +print ========== create dnodes +sql create dnode $hostname port 7200 + +$x = 0 +create1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql show dnodes +if $data4_2 != ready then + goto create1 +endi + +print ========== stop dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGKILL + +print =============== create database +sql_error create database d1 vgroups 4 + +print ========== start dnode2 +system sh/exec.sh -n dnode2 -s start + +print =============== re-create database +$x = 0 +re-create1: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql create database d1 vgroups 2 -x re-create1 + +sql show databases +if $rows != 1 then + return -1 +endi + +if $data00 != d1 then + return -1 +endi + +if $data02 != 2 then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/script/unique/dnode/basic1.sim b/tests/script/sim/dnode/basic1.sim similarity index 100% rename from tests/script/unique/dnode/basic1.sim rename to tests/script/sim/dnode/basic1.sim diff --git a/tests/script/general/table/basic1.sim b/tests/script/sim/table/basic1.sim similarity index 100% rename from tests/script/general/table/basic1.sim rename to tests/script/sim/table/basic1.sim diff --git a/tests/script/general/user/basic1.sim b/tests/script/sim/user/basic1.sim similarity index 100% rename from tests/script/general/user/basic1.sim rename to tests/script/sim/user/basic1.sim From 1d5a97cffb1eabc5022b36aac3b16eab077af413 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 10 Jan 2022 18:48:19 -0800 Subject: [PATCH 3/8] minor changes --- source/dnode/mnode/impl/src/mndStb.c | 6 ++-- source/dnode/mnode/impl/src/mndTrans.c | 2 +- tests/script/sim/db/error1.sim | 40 +++++++++++++++++++++++++- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7d77e29d74..ad8c16f826 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -204,7 +204,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) { SSdb *pSdb = pMnode->pSdb; SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName); - if (pStb == NULL) { + if (pStb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_STB_NOT_EXIST; } return pStb; @@ -513,9 +513,11 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) { return 0; } else { terrno = TSDB_CODE_MND_STB_ALREADY_EXIST; - mError("db:%s, failed to create since %s", pCreate->name, terrstr()); + mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; } + } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { + mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); } // topic should have different name with stb diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 058178260f..2301df65d7 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -825,7 +825,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); + mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { pTrans->failedTimes++; diff --git a/tests/script/sim/db/error1.sim b/tests/script/sim/db/error1.sim index 6698ca6f67..bf9e04c017 100644 --- a/tests/script/sim/db/error1.sim +++ b/tests/script/sim/db/error1.sim @@ -58,4 +58,42 @@ if $data03 != 0 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +print ========== stop dnode2 +system sh/exec.sh -n dnode2 -s stop -x SIGKILL + +print =============== create database +sql_error drop database d1 + +print ========== start dnode2 +system sh/exec.sh -n dnode2 -s start + +print =============== re-create database +$x = 0 +re-create2: + $x = $x + 1 + sleep 1000 + if $x == 10 then + return -1 + endi + +sql create database d1 vgroups 5 -x re-create2 + +sql show databases +if $rows != 1 then + return -1 +endi + +if $data00 != d1 then + return -1 +endi + +if $data02 != 5 then + return -1 +endi + +if $data03 != 0 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file From 553cc16cbd32de3b33efbe4bb5edcf959dfe8573 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 10 Jan 2022 18:53:25 -0800 Subject: [PATCH 4/8] rename --- tests/script/{general/db/basic.sim => sim/db/basic6.sim} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/script/{general/db/basic.sim => sim/db/basic6.sim} (100%) diff --git a/tests/script/general/db/basic.sim b/tests/script/sim/db/basic6.sim similarity index 100% rename from tests/script/general/db/basic.sim rename to tests/script/sim/db/basic6.sim From 47f1558e7c4c503cb2ccc7b0b22fbb44909e737b Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 11 Jan 2022 15:40:10 +0800 Subject: [PATCH 5/8] add wal handle meta corrupt --- include/libs/wal/wal.h | 2 + source/libs/wal/inc/walInt.h | 25 +++- source/libs/wal/src/walMeta.c | 180 +++++++++++++++++++++++---- source/libs/wal/src/walSeek.c | 76 ++++++----- source/libs/wal/src/walWrite.c | 25 ++-- source/libs/wal/test/walMetaTest.cpp | 95 ++++++++++++++ source/util/src/tarray.c | 2 +- source/util/test/arrayTest.cpp | 32 +++++ 8 files changed, 361 insertions(+), 76 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 67d2009d3b..51aaa7d903 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -71,6 +71,7 @@ extern int32_t wDebugFlag; #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) +#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_CUR_FAILED 1 @@ -98,6 +99,7 @@ typedef struct { } SWalCfg; typedef struct { + uint64_t magic; uint32_t cksumHead; uint32_t cksumBody; SWalReadHead head; diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 7631593dd8..4624e05f10 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -17,10 +17,10 @@ #define _TD_WAL_INT_H_ #include "compare.h" -#include "tchecksum.h" -#include "wal.h" - #include "taoserror.h" +#include "tchecksum.h" +#include "tcoding.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -40,6 +40,19 @@ typedef struct WalIdxEntry { int64_t offset; } SWalIdxEntry; +static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) { + int tlen; + tlen += taosEncodeFixedI64(buf, pIdxEntry->ver); + tlen += taosEncodeFixedI64(buf, pIdxEntry->offset); + return 0; +} + +static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) { + buf = taosDecodeFixedI64(buf, &pIdxEntry->ver); + buf = taosDecodeFixedI64(buf, &pIdxEntry->offset); + return buf; +} + static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft; SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight; @@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes); // meta section end // seek section -int walChangeFile(SWal* pWal, int64_t ver); -int walChangeFileToLast(SWal* pWal); +int walChangeWrite(SWal* pWal, int64_t ver); +int walSetWrite(SWal* pWal); // seek section end int64_t walGetSeq(); -int walSeekVer(SWal* pWal, int64_t ver); +int walSeekWriteVer(SWal* pWal, int64_t ver); int walRoll(SWal* pWal); #ifdef __cplusplus diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 270a26bf80..ab3aa02f4a 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -17,7 +17,6 @@ #include "cJSON.h" #include "os.h" #include "taoserror.h" -#include "tfile.h" #include "tref.h" #include "walInt.h" @@ -34,13 +33,74 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } +static inline int64_t walScanLogGetLastVer(SWal* pWal) { + ASSERT(pWal->fileInfoSet != NULL); + int sz = taosArrayGetSize(pWal->fileInfoSet); + ASSERT(sz > 0); + for (int i = 0; i < sz; i++) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); + + } + SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1); + char fnameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr); + + struct stat statbuf; + stat(fnameStr, &statbuf); + int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size); + + FileFd fd = taosOpenFileRead(fnameStr); + if (fd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + uint64_t magic = WAL_MAGIC; + + char* buf = malloc(readSize + 5); + if (buf == NULL) { + taosCloseFile(fd); + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; + return -1; + } + + if (readSize != taosReadFile(fd, buf, readSize)) { + free(buf); + taosCloseFile(fd); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + char* found = strstr(buf, (const char*)&magic); + if (found == NULL) { + ASSERT(false); + // file has to be deleted + free(buf); + taosCloseFile(fd); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } + char *another; + while((another = strstr(found + 1, (const char*)&magic)) != NULL) { + // read and validate + SWalHead *logContent = (SWalHead*)another; + if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { + found = another; + } + } + taosCloseFile(fd); + SWalHead *lastEntry = (SWalHead*)found; + + return lastEntry->head.version; +} + int walCheckAndRepairMeta(SWal* pWal) { // load log files, get first/snapshot/last version info const char* logPattern = "^[0-9]+.log$"; const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; - SArray* pLogArray = taosArrayInit(8, sizeof(int64_t)); + SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo)); regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -51,19 +111,84 @@ int walCheckAndRepairMeta(SWal* pWal) { return -1; } + // scan log files and build new meta struct dirent* ent; while ((ent = readdir(dir)) != NULL) { char* name = basename(ent->d_name); int code = regexec(&logRegPattern, name, 0, NULL, 0); if (code == 0) { - int64_t firstVer; - sscanf(name, "%" PRId64 ".log", &firstVer); - taosArrayPush(pLogArray, &firstVer); + SWalFileInfo fileInfo; + memset(&fileInfo, -1, sizeof(SWalFileInfo)); + sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); + FileFd fd = taosOpenFileRead(ent->d_name); + //get lastVer + //get size + taosArrayPush(pLogInfoArray, &fileInfo); } } - // load meta - // if not match, or meta missing + regfree(&logRegPattern); + regfree(&idxRegPattern); + + taosArraySort(pLogInfoArray, compareWalFileInfo); + int oldSz = 0; + if (pWal->fileInfoSet) { + oldSz = taosArrayGetSize(pWal->fileInfoSet); + } + int newSz = taosArrayGetSize(pLogInfoArray); + // case 1. meta file not exist / cannot be parsed + if (pWal->fileInfoSet == NULL && newSz != 0) { + // recover fileInfo set + pWal->fileInfoSet = pLogInfoArray; + if (newSz != 0) { + // recover meta version + pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer; + pWal->writeCur = newSz - 1; + } + // recover file size + } else if (oldSz < newSz) { + for (int i = oldSz; i < newSz; i++) { + SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); + taosArrayPush(pWal->fileInfoSet, pFileInfo); + } + pWal->writeCur = newSz - 1; + } + + if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) { + pWal->vers.lastVer = walScanLogGetLastVer(pWal); + ASSERT(pWal->vers.lastVer != -1); + } + + // case 2. versions in meta not match log + // or some log not included in meta + // (e.g. program killed) + // + // case 3. other corrupt cases + // +#if 0 + int sz = taosArrayGetSize(pLogInfoArray); + for (int i = 0; i < sz; i++) { + SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i); + if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) { + //repair + } + + if (i > 0) { + SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1); + if (pLastFileInfo->lastVer != pFileInfo->firstVer) { + + } + } + } +#endif + + int code = walSaveMeta(pWal); + if (code < 0) { + return -1; + } + + // get last version of this file + // // rebuild meta return 0; } @@ -87,6 +212,7 @@ int walRollFileInfo(SWal* pWal) { // TODO: change to emplace back SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo)); if (pNewInfo == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } pNewInfo->firstVer = pWal->vers.lastVer + 1; @@ -94,7 +220,7 @@ int walRollFileInfo(SWal* pWal) { pNewInfo->createTs = ts; pNewInfo->closeTs = -1; pNewInfo->fileSize = 0; - taosArrayPush(pWal->fileInfoSet, pNewInfo); + taosArrayPush(pArray, pNewInfo); free(pNewInfo); return 0; } @@ -108,7 +234,16 @@ char* walMetaSerialize(SWal* pWal) { cJSON* pFiles = cJSON_CreateArray(); cJSON* pField; if (pRoot == NULL || pMeta == NULL || pFiles == NULL) { - // TODO + if(pRoot) { + cJSON_Delete(pRoot); + } + if(pMeta) { + cJSON_Delete(pMeta); + } + if(pFiles) { + cJSON_Delete(pFiles); + } + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return NULL; } cJSON_AddItemToObject(pRoot, "meta", pMeta); @@ -221,18 +356,18 @@ int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer + 1, fnameStr); - int metaTfd = tfOpenCreateWrite(fnameStr); - if (metaTfd < 0) { + FileFd metaFd = taosOpenFileCreateWrite(fnameStr); + if (metaFd < 0) { return -1; } char* serialized = walMetaSerialize(pWal); int len = strlen(serialized); - if (len != tfWrite(metaTfd, serialized, len)) { + if (len != taosWriteFile(metaFd, serialized, len)) { // TODO:clean file return -1; } - tfClose(metaTfd); + taosCloseFile(metaFd); // delete old file if (metaVer > -1) { walBuildMetaName(pWal, metaVer, fnameStr); @@ -247,7 +382,7 @@ int walLoadMeta(SWal* pWal) { // find existing meta file int metaVer = walFindCurMetaVer(pWal); if (metaVer == -1) { - return 0; + return -1; } char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer, fnameStr); @@ -257,23 +392,20 @@ int walLoadMeta(SWal* pWal) { int size = statbuf.st_size; char* buf = malloc(size + 5); if (buf == NULL) { + terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } memset(buf, 0, size + 5); - int tfd = tfOpenRead(fnameStr); - if (tfRead(tfd, buf, size) != size) { - tfClose(tfd); + FileFd fd = taosOpenFileRead(fnameStr); + if (taosReadFile(fd, buf, size) != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(fd); free(buf); return -1; } // load into fileInfoSet int code = walMetaDeserialize(pWal, buf); - if (code != 0) { - tfClose(tfd); - free(buf); - return -1; - } - tfClose(tfd); + taosCloseFile(fd); free(buf); - return 0; + return code; } diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 1d9f7bdf4d..91b172444b 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -20,7 +20,7 @@ #include "tref.h" #include "walInt.h" -static int walSeekFilePos(SWal* pWal, int64_t ver) { +static int walSeekWritePos(SWal* pWal, int64_t ver) { int code = 0; int64_t idxTfd = pWal->writeIdxTfd; @@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { return -1; } ASSERT(entry.ver == ver); - code = tfLseek(logTfd, entry.offset, SEEK_CUR); + code = tfLseek(logTfd, entry.offset, SEEK_SET); if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { return code; } -int walChangeFileToLast(SWal* pWal) { +int walSetWrite(SWal* pWal) { int64_t idxTfd, logTfd; SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); ASSERT(pRet != NULL); @@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) { char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, fileFirstVer, fnameStr); - idxTfd = tfOpenReadWrite(fnameStr); + idxTfd = tfOpenCreateWriteAppend(fnameStr); if (idxTfd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } walBuildLogName(pWal, fileFirstVer, fnameStr); - logTfd = tfOpenReadWrite(fnameStr); + logTfd = tfOpenCreateWriteAppend(fnameStr); if (logTfd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) { return 0; } -int walChangeFile(SWal* pWal, int64_t ver) { +int walChangeWrite(SWal* pWal, int64_t ver) { int code = 0; int64_t idxTfd, logTfd; char fnameStr[WAL_FILE_LEN]; - code = tfClose(pWal->writeLogTfd); - if (code != 0) { - // TODO - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (pWal->writeLogTfd != -1) { + code = tfClose(pWal->writeLogTfd); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } } - code = tfClose(pWal->writeIdxTfd); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (pWal->writeIdxTfd != -1) { + code = tfClose(pWal->writeIdxTfd); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } } + SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; // bsearch in fileSet - SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - ASSERT(pRet != NULL); - int64_t fileFirstVer = pRet->firstVer; - // closed - if (taosArrayGetLast(pWal->fileInfoSet) != pRet) { - walBuildIdxName(pWal, fileFirstVer, fnameStr); - idxTfd = tfOpenRead(fnameStr); - walBuildLogName(pWal, fileFirstVer, fnameStr); - logTfd = tfOpenRead(fnameStr); - } else { - walBuildIdxName(pWal, fileFirstVer, fnameStr); - idxTfd = tfOpenReadWrite(fnameStr); - walBuildLogName(pWal, fileFirstVer, fnameStr); - logTfd = tfOpenReadWrite(fnameStr); + int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(idx != -1); + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx); + /*ASSERT(pFileInfo != NULL);*/ + + int64_t fileFirstVer = pFileInfo->firstVer; + walBuildIdxName(pWal, fileFirstVer, fnameStr); + idxTfd = tfOpenCreateWriteAppend(fnameStr); + if (idxTfd < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + pWal->writeIdxTfd = -1; + return -1; + } + walBuildLogName(pWal, fileFirstVer, fnameStr); + logTfd = tfOpenCreateWriteAppend(fnameStr); + if (logTfd < 0) { + tfClose(idxTfd); + terrno = TAOS_SYSTEM_ERROR(errno); + pWal->writeLogTfd = -1; + return -1; } pWal->writeLogTfd = logTfd; pWal->writeIdxTfd = idxTfd; + pWal->writeCur = idx; return fileFirstVer; } -int walSeekVer(SWal* pWal, int64_t ver) { +int walSeekWriteVer(SWal* pWal, int64_t ver) { int code; if (ver == pWal->vers.lastVer) { return 0; @@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) { return -1; } if (ver < pWal->vers.snapshotVer) { + } if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { - code = walChangeFile(pWal, ver); + code = walChangeWrite(pWal, ver); if (code != 0) { return -1; } } - code = walSeekFilePos(pWal, ver); + code = walSeekWritePos(pWal, ver); if (code != 0) { return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 975f232e3d..2bc328b4e2 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // find correct file if (ver < walGetLastFileFirstVer(pWal)) { - // close current files - tfClose(pWal->writeIdxTfd); - tfClose(pWal->writeLogTfd); - // open old files - code = walChangeFile(pWal, ver); - if (code != 0) { + // change current files + code = walChangeWrite(pWal, ver); + if (code < 0) { return -1; } @@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) { } // iterate files, until the searched result for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) { + if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize) + || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) { // delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; - ; pWal->totSize = newTotSize; pWal->vers.verInSnapshotting = -1; // save snapshot ver, commit ver int code = walSaveMeta(pWal); - if (code != 0) { + if (code < 0) { return -1; } @@ -225,18 +222,17 @@ int walRoll(SWal *pWal) { walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWriteAppend(fnameStr); if (idxTfd < 0) { - ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } walBuildLogName(pWal, newFileFirstVersion, fnameStr); logTfd = tfOpenCreateWriteAppend(fnameStr); if (logTfd < 0) { - ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } code = walRollFileInfo(pWal); if (code != 0) { - ASSERT(0); return -1; } @@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ASSERT(pWal->writeCur >= 0); pthread_mutex_lock(&pWal->mutex); + if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) { - walChangeFileToLast(pWal); + walSetWrite(pWal); + tfLseek(pWal->writeLogTfd, 0, SEEK_END); + tfLseek(pWal->writeIdxTfd, 0, SEEK_END); } pWal->writeHead.head.version = index; diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index cd082a3a43..a95c75b11d 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test { const char* pathName = "/tmp/wal_test"; }; +class WalRetentionEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { walCleanUp(); } + + void walResetEnv() { + TearDown(); + taosRemoveDir(pathName); + SetUp(); + } + + void SetUp() override { + SWalCfg cfg; + cfg.rollPeriod = -1, + cfg.segSize = -1, + cfg.retentionPeriod = -1, + cfg.retentionSize = 0, + cfg.rollPeriod = 0, + cfg.vgId = 0, + cfg.level = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, &cfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + TEST_F(WalCleanEnv, createNew) { walRollFileInfo(pWal); ASSERT(pWal->fileInfoSet != NULL); @@ -283,3 +320,61 @@ TEST_F(WalKeepEnv, readHandleRead) { } } } + +TEST_F(WalRetentionEnv, repairMeta1) { + walResetEnv(); + int code; + + int i; + for (i = 0; i < 100; i++) { + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, i); + int len = strlen(newStr); + code = walWrite(pWal, i, 0, newStr, len); + ASSERT_EQ(code, 0); + } + + TearDown(); + + //getchar(); + char buf[100]; + sprintf(buf, "%s/meta-ver%d", pathName, 0); + remove(buf); + SetUp(); + + ASSERT_EQ(pWal->vers.lastVer, 99); + + SWalReadHandle* pRead = walOpenReadHandle(pWal); + ASSERT(pRead != NULL); + + for (int i = 0; i < 1000; i++) { + int ver = rand() % 100; + code = walReadWithHandle(pRead, ver); + ASSERT_EQ(code, 0); + + // printf("rrbody: \n"); + // for(int i = 0; i < pRead->pHead->head.len; i++) { + // printf("%d ", pRead->pHead->head.body[i]); + //} + // printf("\n"); + + ASSERT_EQ(pRead->pHead->head.version, ver); + ASSERT_EQ(pRead->curVersion, ver + 1); + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, ver); + int len = strlen(newStr); + ASSERT_EQ(pRead->pHead->head.len, len); + for (int j = 0; j < len; j++) { + EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); + } + } + + for (i = 100; i < 200; i++) { + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, i); + int len = strlen(newStr); + code = walWrite(pWal, i, 0, newStr, len); + ASSERT_EQ(code, 0); + } + +} diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index cc8d6646b6..09aeff5ff3 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) { void* item = taosArraySearch(pArray, key, comparFn, flags); - return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize; + return item == NULL ? -1 : (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize; } void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) { diff --git a/source/util/test/arrayTest.cpp b/source/util/test/arrayTest.cpp index 94b08ca6d7..939d4a701d 100644 --- a/source/util/test/arrayTest.cpp +++ b/source/util/test/arrayTest.cpp @@ -4,6 +4,7 @@ #include #include "tarray.h" +#include "tcompare.h" namespace { @@ -48,3 +49,34 @@ static void remove_batch_test() { TEST(arrayTest, array_list_test) { remove_batch_test(); } + +TEST(arrayTest, array_search_test) { + SArray *pa = (SArray*) taosArrayInit(4, sizeof(int32_t)); + + for(int32_t i = 10; i < 20; ++i) { + int32_t a = i; + taosArrayPush(pa, &a); + } + + for(int i = 0; i < 30; i++) { + int32_t k = i; + int32_t* pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_GE); + int32_t idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_GE); + + if(pRet == NULL) { + ASSERT_EQ(idx, -1); + } else { + ASSERT_EQ(taosArrayGet(pa, idx), pRet); + } + + pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_LE); + idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_LE); + + if(pRet == NULL) { + ASSERT_EQ(idx, -1); + } else { + ASSERT_EQ(taosArrayGet(pa, idx), pRet); + } + + } +} From e3ee88244e8fd08a0f90bf499613112ff425d3f3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 10 Jan 2022 23:42:25 -0800 Subject: [PATCH 6/8] TD-12868 --- source/common/src/tglobal.c | 2 +- tests/script/sh/deploy.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 9a20fadbfb..463b107aa7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -37,7 +37,7 @@ uint16_t tsServerPort = 6030; int32_t tsStatusInterval = 1; // second int8_t tsEnableTelemetryReporting = 0; char tsEmail[TSDB_FQDN_LEN] = {0}; -int32_t tsNumOfSupportVnodes = 16; +int32_t tsNumOfSupportVnodes = 128; // common int32_t tsRpcTimer = 300; diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index fcc11ca213..e00363b28f 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -120,7 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG echo "fqdn ${HOSTNAME}" >> $TAOS_CFG echo "serverPort ${NODE}" >> $TAOS_CFG -echo "supportVnodes 16" >> $TAOS_CFG +echo "supportVnodes 128" >> $TAOS_CFG echo "dataDir $DATA_DIR" >> $TAOS_CFG echo "logDir $LOG_DIR" >> $TAOS_CFG echo "debugFlag 0" >> $TAOS_CFG From c22de779ac3956c267bcd9e913c015f072665359 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 11 Jan 2022 16:55:40 +0800 Subject: [PATCH 7/8] add tmemmem --- source/libs/wal/src/walMeta.c | 52 ++++++++++++++++++++-------- source/libs/wal/src/walMgmt.c | 1 + source/libs/wal/test/walMetaTest.cpp | 3 ++ 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ab3aa02f4a..1c08f88dc6 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "cJSON.h" #include "os.h" #include "taoserror.h" @@ -33,6 +32,24 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } +void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { + char* limit; + + if (nlen == 0 || hlen < nlen) { + return false; + } + + limit = haystack + hlen - nlen + 1; + while ((haystack = (char*)memchr( + haystack, needle[0], limit - haystack)) != NULL) { + if (memcmp(haystack, needle, nlen) == 0) { + return haystack; + } + haystack++; + } + return NULL; +} + static inline int64_t walScanLogGetLastVer(SWal* pWal) { ASSERT(pWal->fileInfoSet != NULL); int sz = taosArrayGetSize(pWal->fileInfoSet); @@ -47,7 +64,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { struct stat statbuf; stat(fnameStr, &statbuf); - int readSize = MIN(WAL_MAX_SIZE, statbuf.st_size); + int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size); FileFd fd = taosOpenFileRead(fnameStr); if (fd < 0) { @@ -64,6 +81,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } + taosLSeekFile(fd, -readSize, SEEK_END); if (readSize != taosReadFile(fd, buf, readSize)) { free(buf); taosCloseFile(fd); @@ -71,21 +89,25 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { return -1; } - char* found = strstr(buf, (const char*)&magic); - if (found == NULL) { - ASSERT(false); - // file has to be deleted - free(buf); - taosCloseFile(fd); - terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - return -1; - } - char *another; - while((another = strstr(found + 1, (const char*)&magic)) != NULL) { + char* haystack = buf; + char* found = NULL; + char *candidate = NULL; + while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { // read and validate - SWalHead *logContent = (SWalHead*)another; + SWalHead *logContent = (SWalHead*)candidate; if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) { - found = another; + found = candidate; + } + haystack = candidate + 1; + } + if (found == buf) { + SWalHead *logContent = (SWalHead*)found; + if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) { + // file has to be deleted + free(buf); + taosCloseFile(fd); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; } } taosCloseFile(fd); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index d12acb52c6..93ec4693a3 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { // init write buffer memset(&pWal->writeHead, 0, sizeof(SWalHead)); pWal->writeHead.head.headVer = WAL_HEAD_VER; + pWal->writeHead.magic = WAL_MAGIC; if (pthread_mutex_init(&pWal->mutex, NULL) < 0) { taosArrayDestroy(pWal->fileInfoSet); diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index a95c75b11d..5774eea8c0 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -340,7 +340,10 @@ TEST_F(WalRetentionEnv, repairMeta1) { char buf[100]; sprintf(buf, "%s/meta-ver%d", pathName, 0); remove(buf); + sprintf(buf, "%s/meta-ver%d", pathName, 1); + remove(buf); SetUp(); + //getchar(); ASSERT_EQ(pWal->vers.lastVer, 99); From a6303b07d38ca8cdbd6507ea6119310281eaa6e8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 11 Jan 2022 17:55:53 +0800 Subject: [PATCH 8/8] force repair --- source/libs/wal/src/walMeta.c | 40 +++++++++++++--------------- source/libs/wal/src/walMgmt.c | 5 +++- source/libs/wal/test/walMetaTest.cpp | 22 +++++++++++++++ tests/script/sim/db/basic1.sim | 3 +-- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 1c08f88dc6..cac80c0a5f 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -65,6 +65,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { struct stat statbuf; stat(fnameStr, &statbuf); int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size); + pLastFileInfo->fileSize = statbuf.st_size; FileFd fd = taosOpenFileRead(fnameStr); if (fd < 0) { @@ -91,7 +92,7 @@ static inline int64_t walScanLogGetLastVer(SWal* pWal) { char* haystack = buf; char* found = NULL; - char *candidate = NULL; + char *candidate; while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) { // read and validate SWalHead *logContent = (SWalHead*)candidate; @@ -142,7 +143,6 @@ int walCheckAndRepairMeta(SWal* pWal) { SWalFileInfo fileInfo; memset(&fileInfo, -1, sizeof(SWalFileInfo)); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); - FileFd fd = taosOpenFileRead(ent->d_name); //get lastVer //get size taosArrayPush(pLogInfoArray, &fileInfo); @@ -159,28 +159,25 @@ int walCheckAndRepairMeta(SWal* pWal) { } int newSz = taosArrayGetSize(pLogInfoArray); // case 1. meta file not exist / cannot be parsed - if (pWal->fileInfoSet == NULL && newSz != 0) { - // recover fileInfo set - pWal->fileInfoSet = pLogInfoArray; - if (newSz != 0) { - // recover meta version - pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer; - pWal->writeCur = newSz - 1; - } - // recover file size - } else if (oldSz < newSz) { + if (oldSz < newSz) { for (int i = oldSz; i < newSz; i++) { SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i); taosArrayPush(pWal->fileInfoSet, pFileInfo); } + pWal->writeCur = newSz - 1; + pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer; + pWal->vers.lastVer = walScanLogGetLastVer(pWal); + ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer; + ASSERT(pWal->vers.lastVer != -1); + + int code = walSaveMeta(pWal); + if (code < 0) { + taosArrayDestroy(pLogInfoArray); + return -1; + } } - if (pWal->fileInfoSet && taosArrayGetSize(pWal->fileInfoSet) != 0) { - pWal->vers.lastVer = walScanLogGetLastVer(pWal); - ASSERT(pWal->vers.lastVer != -1); - } - // case 2. versions in meta not match log // or some log not included in meta // (e.g. program killed) @@ -204,14 +201,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } #endif - int code = walSaveMeta(pWal); - if (code < 0) { - return -1; - } // get last version of this file // // rebuild meta + taosArrayDestroy(pLogInfoArray); return 0; } @@ -419,6 +413,10 @@ int walLoadMeta(SWal* pWal) { } memset(buf, 0, size + 5); FileFd fd = taosOpenFileRead(fnameStr); + if (fd < 0) { + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + return -1; + } if (taosReadFile(fd, buf, size) != size) { terrno = TAOS_SYSTEM_ERROR(errno); taosCloseFile(fd); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 93ec4693a3..d5c28d9d9b 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -122,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { return NULL; } - if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) { + walLoadMeta(pWal); + + if (walCheckAndRepairMeta(pWal) < 0) { taosRemoveRef(tsWal.refSetId, pWal->refId); pthread_mutex_destroy(&pWal->mutex); taosArrayDestroy(pWal->fileInfoSet); @@ -131,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } if (walCheckAndRepairIdx(pWal) < 0) { + } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index 5774eea8c0..b65a200ca1 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -380,4 +380,26 @@ TEST_F(WalRetentionEnv, repairMeta1) { ASSERT_EQ(code, 0); } + for (int i = 0; i < 1000; i++) { + int ver = rand() % 200; + code = walReadWithHandle(pRead, ver); + ASSERT_EQ(code, 0); + + // printf("rrbody: \n"); + // for(int i = 0; i < pRead->pHead->head.len; i++) { + // printf("%d ", pRead->pHead->head.body[i]); + //} + // printf("\n"); + + ASSERT_EQ(pRead->pHead->head.version, ver); + ASSERT_EQ(pRead->curVersion, ver + 1); + char newStr[100]; + sprintf(newStr, "%s-%d", ranStr, ver); + int len = strlen(newStr); + ASSERT_EQ(pRead->pHead->head.len, len); + for (int j = 0; j < len; j++) { + EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]); + } + } + } diff --git a/tests/script/sim/db/basic1.sim b/tests/script/sim/db/basic1.sim index 52af7d93ea..33af1c5b59 100644 --- a/tests/script/sim/db/basic1.sim +++ b/tests/script/sim/db/basic1.sim @@ -85,7 +85,6 @@ if $data02 != 2 then return -1 endi -return system sh/exec.sh -n dnode1 -s stop -x SIGKILL system sh/exec.sh -n dnode1 -s start @@ -104,4 +103,4 @@ if $rows != 2 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT