diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h
index 67d2009d3b..51aaa7d903 100644
--- a/include/libs/wal/wal.h
+++ b/include/libs/wal/wal.h
@@ -71,6 +71,7 @@ extern int32_t wDebugFlag;
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
+#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1
@@ -98,6 +99,7 @@ typedef struct {
} SWalCfg;
typedef struct {
+ uint64_t magic;
uint32_t cksumHead;
uint32_t cksumBody;
SWalReadHead head;
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index 9a20fadbfb..463b107aa7 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -37,7 +37,7 @@ uint16_t tsServerPort = 6030;
int32_t tsStatusInterval = 1; // second
int8_t tsEnableTelemetryReporting = 0;
char tsEmail[TSDB_FQDN_LEN] = {0};
-int32_t tsNumOfSupportVnodes = 16;
+int32_t tsNumOfSupportVnodes = 128;
// common
int32_t tsRpcTimer = 300;
diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c
index f2f8931aa1..85b7fbbb42 100644
--- a/source/dnode/mnode/impl/src/mndDb.c
+++ b/source/dnode/mnode/impl/src/mndDb.c
@@ -463,7 +463,7 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
- mDebug("db:%s, start to create", pCreate->db);
+ mDebug("db:%s, start to create, vgroups:%d", pCreate->db, pCreate->numOfVgroups);
SDbObj *pDb = mndAcquireDb(pMnode, pCreate->db);
if (pDb != NULL) {
@@ -476,6 +476,9 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
}
+ } else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) {
+ mError("db:%s, failed to create since %s", pCreate->db, terrstr());
+ return -1;
}
SUserObj *pOperUser = mndAcquireUser(pMnode, pReq->user);
diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c
index 7d77e29d74..ad8c16f826 100644
--- a/source/dnode/mnode/impl/src/mndStb.c
+++ b/source/dnode/mnode/impl/src/mndStb.c
@@ -204,7 +204,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName) {
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = sdbAcquire(pSdb, SDB_STB, stbName);
- if (pStb == NULL) {
+ if (pStb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
}
return pStb;
@@ -513,9 +513,11 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
return 0;
} else {
terrno = TSDB_CODE_MND_STB_ALREADY_EXIST;
- mError("db:%s, failed to create since %s", pCreate->name, terrstr());
+ mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
return -1;
}
+ } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
+ mError("stb:%s, failed to create since %s", pCreate->name, terrstr());
}
// topic should have different name with stb
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 4a42133ce3..2301df65d7 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -809,7 +809,7 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
continueExec = true;
} else {
- mDebug("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
+ mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
continueExec = false;
}
diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c
index 4b7b370371..e0d6d3dd42 100644
--- a/source/dnode/mnode/impl/src/mndVgroup.c
+++ b/source/dnode/mnode/impl/src/mndVgroup.c
@@ -273,15 +273,10 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
SDnodeObj *pDnode = pObj;
SArray *pArray = p1;
- pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
-
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
- if (online && pDnode->numOfSupportVnodes > 0) {
- taosArrayPush(pArray, pDnode);
- }
-
- bool isMnode = mndIsMnode(pMnode, pDnode->id);
+ bool isMnode = mndIsMnode(pMnode, pDnode->id);
+ pDnode->numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
mDebug("dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d", pDnode->id, pDnode->numOfVnodes,
pDnode->numOfSupportVnodes, isMnode, online);
@@ -290,6 +285,9 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
pDnode->numOfVnodes++;
}
+ if (online && pDnode->numOfSupportVnodes > 0) {
+ taosArrayPush(pArray, pDnode);
+ }
return true;
}
@@ -311,7 +309,7 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
static int32_t mndCompareDnodeVnodes(SDnodeObj *pDnode1, SDnodeObj *pDnode2) {
float d1Score = (float)pDnode1->numOfVnodes / pDnode1->numOfSupportVnodes;
float d2Score = (float)pDnode2->numOfVnodes / pDnode2->numOfSupportVnodes;
- return d1Score > d2Score ? 1 : 0;
+ return d1Score >= d2Score ? 1 : 0;
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp
index 4f0ba9b0e7..5d5947b644 100644
--- a/source/dnode/mnode/impl/test/db/db.cpp
+++ b/source/dnode/mnode/impl/test/db/db.cpp
@@ -13,28 +13,17 @@
class MndTestDb : public ::testing::Test {
protected:
- static void SetUpTestSuite() {
- test.Init("/tmp/mnode_test_db", 9030);
- const char* fqdn = "localhost";
- const char* firstEp = "localhost:9030";
+ static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); }
+ static void TearDownTestSuite() { test.Cleanup(); }
- server2.Start("/tmp/mnode_test_db2", fqdn, 9031, firstEp);
- }
- static void TearDownTestSuite() {
- server2.Stop();
- test.Cleanup();
- }
-
- static Testbase test;
- static TestServer server2;
+ static Testbase test;
public:
void SetUp() override {}
void TearDown() override {}
};
-Testbase MndTestDb::test;
-TestServer MndTestDb::server2;
+Testbase MndTestDb::test;
TEST_F(MndTestDb, 01_ShowDb) {
test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, "");
diff --git a/source/dnode/vnode/meta/src/metaIdx.c b/source/dnode/vnode/meta/src/metaIdx.c
index 828bd12088..3da56fc394 100644
--- a/source/dnode/vnode/meta/src/metaIdx.c
+++ b/source/dnode/vnode/meta/src/metaIdx.c
@@ -49,9 +49,7 @@ int metaOpenIdx(SMeta *pMeta) {
#ifdef USE_INVERTED_INDEX
SIndexOpts opts;
- if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
- return -1;
- }
+ if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { return -1; }
#endif
return 0;
@@ -67,16 +65,14 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
#ifdef USE_INVERTED_INDEX
SIndexOpts opts;
- if (indexClose(pMeta->pIdx->pIdx) != 0) {
- return -1;
- }
+ if (indexClose(pMeta->pIdx->pIdx) != 0) { return -1; }
#endif
}
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
#ifdef USE_INVERTED_INDEX
- if (pTbCfgs - type == META_CHILD_TABLE) {
+ if (pTbCfgs->type == META_CHILD_TABLE) {
char buf[8] = {0};
int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
sprintf(buf, "%d", colId); // colname
diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c
index 19e9375491..0c222eae1a 100644
--- a/source/libs/index/src/index.c
+++ b/source/libs/index/src/index.c
@@ -59,6 +59,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
+// merge cache and tfile by opera type
+static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
+static void indexMergeSameKey(SArray* result, TFileValue* tv);
+
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
pthread_once(&isInit, indexInit);
SIndex* sIdx = calloc(1, sizeof(SIndex));
@@ -385,6 +389,27 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
taosArrayPush(result, &tv);
}
}
+static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) {
+ // opt
+ char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
+ // design merge-algorithm later, too complicated to handle all kind of situation
+ TFileValue* tfv = tfileValueCreate(colVal);
+ if (cv != NULL) {
+ if (cv->type == ADD_VALUE) {
+ taosArrayAddAll(tfv->tableId, cv->val);
+ } else if (cv->type == DEL_VALUE) {
+ } else if (cv->type == UPDATE_VALUE) {
+ } else {
+ // do nothing
+ }
+ }
+ if (tv != NULL) {
+ // opt later
+ taosArrayAddAll(tfv->tableId, tv->val);
+ }
+
+ indexMergeSameKey(result, tfv);
+}
static void indexDestroyTempResult(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
for (size_t i = 0; i < sz; i++) {
@@ -411,51 +436,30 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
- while (cn == true && tn == true) {
- IterateValue* cv = cacheIter->getValue(cacheIter);
- IterateValue* tv = tfileIter->getValue(tfileIter);
+ while (cn == true || tn == true) {
+ IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
+ IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
- // dump value
- int comp = strcmp(cv->colVal, tv->colVal);
+ int comp = 0;
+ if (cn == true && tn == true) {
+ comp = strcmp(cv->colVal, tv->colVal);
+ } else if (cn == true) {
+ comp = -1;
+ } else {
+ comp = 1;
+ }
if (comp == 0) {
- TFileValue* tfv = tfileValueCreate(cv->colVal);
- taosArrayAddAll(tfv->tableId, cv->val);
- taosArrayAddAll(tfv->tableId, tv->val);
- indexMergeSameKey(result, tfv);
-
+ indexMergeCacheAndTFile(result, cv, tv);
cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter);
- continue;
} else if (comp < 0) {
- TFileValue* tfv = tfileValueCreate(cv->colVal);
- taosArrayAddAll(tfv->tableId, cv->val);
-
- indexMergeSameKey(result, tfv);
- // copy to final Result;
+ indexMergeCacheAndTFile(result, cv, NULL);
cn = cacheIter->next(cacheIter);
} else {
- TFileValue* tfv = tfileValueCreate(tv->colVal);
- taosArrayAddAll(tfv->tableId, tv->val);
-
- indexMergeSameKey(result, tfv);
- // copy to final result
+ indexMergeCacheAndTFile(result, NULL, tv);
tn = tfileIter->next(tfileIter);
}
}
- while (cn == true) {
- IterateValue* cv = cacheIter->getValue(cacheIter);
- TFileValue* tfv = tfileValueCreate(cv->colVal);
- taosArrayAddAll(tfv->tableId, cv->val);
- indexMergeSameKey(result, tfv);
- cn = cacheIter->next(cacheIter);
- }
- while (tn == true) {
- IterateValue* tv = tfileIter->getValue(tfileIter);
- TFileValue* tfv = tfileValueCreate(tv->colVal);
- taosArrayAddAll(tfv->tableId, tv->val);
- indexMergeSameKey(result, tfv);
- tn = tfileIter->next(tfileIter);
- }
int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyTempResult(result);
@@ -503,7 +507,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose(tw);
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
- if (reader == NULL) { goto END; }
+ if (reader == NULL) { return -1; }
TFileHeader* header = &reader->header;
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c
index 294c8192e8..48566a8674 100644
--- a/source/libs/index/src/index_cache.c
+++ b/source/libs/index/src/index_cache.c
@@ -217,9 +217,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// set value
ct->uid = uid;
ct->operaType = term->operType;
-
// ugly code, refactor later
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
+
pthread_mutex_lock(&pCache->mtx);
pCache->occupiedMem += estimate;
indexCacheMakeRoomForWrite(pCache);
@@ -331,7 +331,6 @@ static char* indexCacheTermGet(const void* pData) {
static int32_t indexCacheTermCompare(const void* l, const void* r) {
CacheTerm* lt = (CacheTerm*)l;
CacheTerm* rt = (CacheTerm*)r;
-
// compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) { return rt->version - lt->version; }
@@ -359,17 +358,32 @@ static bool indexCacheIteratorNext(Iterate* itera) {
IterateValue* iv = &itera->val;
iterateValueDestroy(iv, false);
+ // IterateValue* iv = &itera->val;
+ // IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
+
bool next = tSkipListIterNext(iter);
if (next) {
SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
+ // equal func
+ // if (iv->colVal != NULL && ct->colVal != NULL) {
+ // if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) }
+ //} else {
+ // tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1);
+ // tIterval.colVal = tstrdup(ct->colVal);
+ //}
iv->type = ct->operaType;
- iv->colVal = calloc(1, strlen(ct->colVal) + 1);
- memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
+ iv->colVal = tstrdup(ct->colVal);
+ // iv->colVal = calloc(1, strlen(ct->colVal) + 1);
+ // memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
taosArrayPush(iv->val, &ct->uid);
}
+ // IterateValue* iv = &itera->val;
+ // iterateValueDestroy(iv, true);
+ //*iv = tIterVal;
+
return next;
}
diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c
index 4f782cef26..5299a7dc5f 100644
--- a/source/libs/index/src/index_fst.c
+++ b/source/libs/index/src/index_fst.c
@@ -936,6 +936,7 @@ Fst* fstCreate(FstSlice* slice) {
len -= sizeof(checkSum);
taosDecodeFixedU32(buf + len, &checkSum);
if (taosCheckChecksum(buf, len, checkSum)) {
+ indexError("index file is corrupted");
// verify fst
return NULL;
}
diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c
index 6db5555aa6..b57f639726 100644
--- a/source/libs/index/src/index_fst_counting_writer.c
+++ b/source/libs/index/src/index_fst_counting_writer.c
@@ -60,9 +60,10 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
return nRead;
}
static int writeCtxGetSize(WriterCtx* ctx) {
- if (ctx->type == TFile && ctx->file.readOnly) {
- // refactor later
- return ctx->file.size;
+ if (ctx->type == TFile) {
+ struct stat fstat;
+ stat(ctx->file.buf, &fstat);
+ return fstat.st_size;
}
return 0;
}
@@ -88,7 +89,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if (readOnly == false) {
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx->file.fd = tfOpenCreateWriteAppend(path);
-
+ tfFtruncate(ctx->file.fd, 0);
struct stat fstat;
stat(path, &fstat);
ctx->file.size = fstat.st_size;
@@ -138,6 +139,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
munmap(ctx->file.ptr, ctx->file.size);
#endif
}
+ if (ctx->file.readOnly == false) {
+ struct stat fstat;
+ stat(ctx->file.buf, &fstat);
+ // indexError("write file size: %d", (int)(fstat.st_size));
+ }
if (remove) { unlink(ctx->file.buf); }
}
free(ctx);
diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c
index 4b76402560..98fede4f7b 100644
--- a/source/libs/index/src/index_tfile.c
+++ b/source/libs/index/src/index_tfile.c
@@ -147,21 +147,22 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
reader->ctx = ctx;
if (0 != tfileReaderVerify(reader)) {
- tfileReaderDestroy(reader);
indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
+ tfileReaderDestroy(reader);
return NULL;
}
// T_REF_INC(reader);
if (0 != tfileReaderLoadHeader(reader)) {
- tfileReaderDestroy(reader);
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
reader->header.colName);
+ tfileReaderDestroy(reader);
return NULL;
}
if (0 != tfileReaderLoadFst(reader)) {
+ indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid,
+ reader->header.colName, errno);
tfileReaderDestroy(reader);
- indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
return NULL;
}
@@ -303,6 +304,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
} else {
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
+
+ // indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
}
}
fstBuilderFinish(tw->fb);
@@ -485,7 +488,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset;
+
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
+ indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));
tw->offset += sizeof(fstOffset);
return 0;
}
@@ -495,8 +500,11 @@ static int tfileWriteHeader(TFileWriter* writer) {
TFileHeader* header = &writer->header;
memcpy(buf, (char*)header, sizeof(buf));
+ indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { return -1; }
+
+ indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));
writer->offset = nwrite;
return 0;
}
@@ -521,6 +529,8 @@ static int tfileWriteFooter(TFileWriter* write) {
void* pBuf = (void*)buf;
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
+
+ indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
assert(nwrite == sizeof(tfileMagicNumber));
return nwrite;
}
diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h
index 7631593dd8..4624e05f10 100644
--- a/source/libs/wal/inc/walInt.h
+++ b/source/libs/wal/inc/walInt.h
@@ -17,10 +17,10 @@
#define _TD_WAL_INT_H_
#include "compare.h"
-#include "tchecksum.h"
-#include "wal.h"
-
#include "taoserror.h"
+#include "tchecksum.h"
+#include "tcoding.h"
+#include "wal.h"
#ifdef __cplusplus
extern "C" {
@@ -40,6 +40,19 @@ typedef struct WalIdxEntry {
int64_t offset;
} SWalIdxEntry;
+static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
+ int tlen;
+ tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
+ tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
+ return 0;
+}
+
+static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
+ buf = taosDecodeFixedI64(buf, &pIdxEntry->ver);
+ buf = taosDecodeFixedI64(buf, &pIdxEntry->offset);
+ return buf;
+}
+
static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) {
SWalFileInfo* pInfoLeft = (SWalFileInfo*)pLeft;
SWalFileInfo* pInfoRight = (SWalFileInfo*)pRight;
@@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// meta section end
// seek section
-int walChangeFile(SWal* pWal, int64_t ver);
-int walChangeFileToLast(SWal* pWal);
+int walChangeWrite(SWal* pWal, int64_t ver);
+int walSetWrite(SWal* pWal);
// seek section end
int64_t walGetSeq();
-int walSeekVer(SWal* pWal, int64_t ver);
+int walSeekWriteVer(SWal* pWal, int64_t ver);
int walRoll(SWal* pWal);
#ifdef __cplusplus
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index 270a26bf80..cac80c0a5f 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -13,11 +13,9 @@
* along with this program. If not, see .
*/
-#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
-#include "tfile.h"
#include "tref.h"
#include "walInt.h"
@@ -34,13 +32,98 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
+void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
+ char* limit;
+
+ if (nlen == 0 || hlen < nlen) {
+ return false;
+ }
+
+ limit = haystack + hlen - nlen + 1;
+ while ((haystack = (char*)memchr(
+ haystack, needle[0], limit - haystack)) != NULL) {
+ if (memcmp(haystack, needle, nlen) == 0) {
+ return haystack;
+ }
+ haystack++;
+ }
+ return NULL;
+}
+
+static inline int64_t walScanLogGetLastVer(SWal* pWal) {
+ ASSERT(pWal->fileInfoSet != NULL);
+ int sz = taosArrayGetSize(pWal->fileInfoSet);
+ ASSERT(sz > 0);
+ for (int i = 0; i < sz; i++) {
+ SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
+
+ }
+ SWalFileInfo *pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz-1);
+ char fnameStr[WAL_FILE_LEN];
+ walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
+
+ struct stat statbuf;
+ stat(fnameStr, &statbuf);
+ int readSize = MIN(WAL_MAX_SIZE + 2, statbuf.st_size);
+ pLastFileInfo->fileSize = statbuf.st_size;
+
+ FileFd fd = taosOpenFileRead(fnameStr);
+ if (fd < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ uint64_t magic = WAL_MAGIC;
+
+ char* buf = malloc(readSize + 5);
+ if (buf == NULL) {
+ taosCloseFile(fd);
+ terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
+ return -1;
+ }
+
+ taosLSeekFile(fd, -readSize, SEEK_END);
+ if (readSize != taosReadFile(fd, buf, readSize)) {
+ free(buf);
+ taosCloseFile(fd);
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ char* haystack = buf;
+ char* found = NULL;
+ char *candidate;
+ while((candidate = tmemmem(haystack, readSize - (haystack - buf), (char*)&magic, sizeof(uint64_t))) != NULL) {
+ // read and validate
+ SWalHead *logContent = (SWalHead*)candidate;
+ if (walValidHeadCksum(logContent) == 0 && walValidBodyCksum(logContent) == 0) {
+ found = candidate;
+ }
+ haystack = candidate + 1;
+ }
+ if (found == buf) {
+ SWalHead *logContent = (SWalHead*)found;
+ if (walValidHeadCksum(logContent) != 0 || walValidBodyCksum(logContent) != 0) {
+ // file has to be deleted
+ free(buf);
+ taosCloseFile(fd);
+ terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
+ return -1;
+ }
+ }
+ taosCloseFile(fd);
+ SWalHead *lastEntry = (SWalHead*)found;
+
+ return lastEntry->head.version;
+}
+
int walCheckAndRepairMeta(SWal* pWal) {
// load log files, get first/snapshot/last version info
const char* logPattern = "^[0-9]+.log$";
const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern;
regex_t idxRegPattern;
- SArray* pLogArray = taosArrayInit(8, sizeof(int64_t));
+ SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo));
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
@@ -51,20 +134,78 @@ int walCheckAndRepairMeta(SWal* pWal) {
return -1;
}
+ // scan log files and build new meta
struct dirent* ent;
while ((ent = readdir(dir)) != NULL) {
char* name = basename(ent->d_name);
int code = regexec(&logRegPattern, name, 0, NULL, 0);
if (code == 0) {
- int64_t firstVer;
- sscanf(name, "%" PRId64 ".log", &firstVer);
- taosArrayPush(pLogArray, &firstVer);
+ SWalFileInfo fileInfo;
+ memset(&fileInfo, -1, sizeof(SWalFileInfo));
+ sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
+ //get lastVer
+ //get size
+ taosArrayPush(pLogInfoArray, &fileInfo);
}
}
- // load meta
- // if not match, or meta missing
+ regfree(&logRegPattern);
+ regfree(&idxRegPattern);
+
+ taosArraySort(pLogInfoArray, compareWalFileInfo);
+ int oldSz = 0;
+ if (pWal->fileInfoSet) {
+ oldSz = taosArrayGetSize(pWal->fileInfoSet);
+ }
+ int newSz = taosArrayGetSize(pLogInfoArray);
+ // case 1. meta file not exist / cannot be parsed
+ if (oldSz < newSz) {
+ for (int i = oldSz; i < newSz; i++) {
+ SWalFileInfo *pFileInfo = taosArrayGet(pLogInfoArray, i);
+ taosArrayPush(pWal->fileInfoSet, pFileInfo);
+ }
+
+ pWal->writeCur = newSz - 1;
+ pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pLogInfoArray, 0))->firstVer;
+ pWal->vers.lastVer = walScanLogGetLastVer(pWal);
+ ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = pWal->vers.lastVer;
+ ASSERT(pWal->vers.lastVer != -1);
+
+ int code = walSaveMeta(pWal);
+ if (code < 0) {
+ taosArrayDestroy(pLogInfoArray);
+ return -1;
+ }
+ }
+
+ // case 2. versions in meta not match log
+ // or some log not included in meta
+ // (e.g. program killed)
+ //
+ // case 3. other corrupt cases
+ //
+#if 0
+ int sz = taosArrayGetSize(pLogInfoArray);
+ for (int i = 0; i < sz; i++) {
+ SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
+ if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) {
+ //repair
+ }
+
+ if (i > 0) {
+ SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1);
+ if (pLastFileInfo->lastVer != pFileInfo->firstVer) {
+
+ }
+ }
+ }
+#endif
+
+
+ // get last version of this file
+ //
// rebuild meta
+ taosArrayDestroy(pLogInfoArray);
return 0;
}
@@ -87,6 +228,7 @@ int walRollFileInfo(SWal* pWal) {
// TODO: change to emplace back
SWalFileInfo* pNewInfo = malloc(sizeof(SWalFileInfo));
if (pNewInfo == NULL) {
+ terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
pNewInfo->firstVer = pWal->vers.lastVer + 1;
@@ -94,7 +236,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo->createTs = ts;
pNewInfo->closeTs = -1;
pNewInfo->fileSize = 0;
- taosArrayPush(pWal->fileInfoSet, pNewInfo);
+ taosArrayPush(pArray, pNewInfo);
free(pNewInfo);
return 0;
}
@@ -108,7 +250,16 @@ char* walMetaSerialize(SWal* pWal) {
cJSON* pFiles = cJSON_CreateArray();
cJSON* pField;
if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
- // TODO
+ if(pRoot) {
+ cJSON_Delete(pRoot);
+ }
+ if(pMeta) {
+ cJSON_Delete(pMeta);
+ }
+ if(pFiles) {
+ cJSON_Delete(pFiles);
+ }
+ terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return NULL;
}
cJSON_AddItemToObject(pRoot, "meta", pMeta);
@@ -221,18 +372,18 @@ int walSaveMeta(SWal* pWal) {
int metaVer = walFindCurMetaVer(pWal);
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer + 1, fnameStr);
- int metaTfd = tfOpenCreateWrite(fnameStr);
- if (metaTfd < 0) {
+ FileFd metaFd = taosOpenFileCreateWrite(fnameStr);
+ if (metaFd < 0) {
return -1;
}
char* serialized = walMetaSerialize(pWal);
int len = strlen(serialized);
- if (len != tfWrite(metaTfd, serialized, len)) {
+ if (len != taosWriteFile(metaFd, serialized, len)) {
// TODO:clean file
return -1;
}
- tfClose(metaTfd);
+ taosCloseFile(metaFd);
// delete old file
if (metaVer > -1) {
walBuildMetaName(pWal, metaVer, fnameStr);
@@ -247,7 +398,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file
int metaVer = walFindCurMetaVer(pWal);
if (metaVer == -1) {
- return 0;
+ return -1;
}
char fnameStr[WAL_FILE_LEN];
walBuildMetaName(pWal, metaVer, fnameStr);
@@ -257,23 +408,24 @@ int walLoadMeta(SWal* pWal) {
int size = statbuf.st_size;
char* buf = malloc(size + 5);
if (buf == NULL) {
+ terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1;
}
memset(buf, 0, size + 5);
- int tfd = tfOpenRead(fnameStr);
- if (tfRead(tfd, buf, size) != size) {
- tfClose(tfd);
+ FileFd fd = taosOpenFileRead(fnameStr);
+ if (fd < 0) {
+ terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
+ return -1;
+ }
+ if (taosReadFile(fd, buf, size) != size) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ taosCloseFile(fd);
free(buf);
return -1;
}
// load into fileInfoSet
int code = walMetaDeserialize(pWal, buf);
- if (code != 0) {
- tfClose(tfd);
- free(buf);
- return -1;
- }
- tfClose(tfd);
+ taosCloseFile(fd);
free(buf);
- return 0;
+ return code;
}
diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c
index d12acb52c6..d5c28d9d9b 100644
--- a/source/libs/wal/src/walMgmt.c
+++ b/source/libs/wal/src/walMgmt.c
@@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// init write buffer
memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->writeHead.head.headVer = WAL_HEAD_VER;
+ pWal->writeHead.magic = WAL_MAGIC;
if (pthread_mutex_init(&pWal->mutex, NULL) < 0) {
taosArrayDestroy(pWal->fileInfoSet);
@@ -121,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return NULL;
}
- if (walLoadMeta(pWal) < 0 && walCheckAndRepairMeta(pWal) < 0) {
+ walLoadMeta(pWal);
+
+ if (walCheckAndRepairMeta(pWal) < 0) {
taosRemoveRef(tsWal.refSetId, pWal->refId);
pthread_mutex_destroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
@@ -130,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
if (walCheckAndRepairIdx(pWal) < 0) {
+
}
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c
index 1d9f7bdf4d..91b172444b 100644
--- a/source/libs/wal/src/walSeek.c
+++ b/source/libs/wal/src/walSeek.c
@@ -20,7 +20,7 @@
#include "tref.h"
#include "walInt.h"
-static int walSeekFilePos(SWal* pWal, int64_t ver) {
+static int walSeekWritePos(SWal* pWal, int64_t ver) {
int code = 0;
int64_t idxTfd = pWal->writeIdxTfd;
@@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return -1;
}
ASSERT(entry.ver == ver);
- code = tfLseek(logTfd, entry.offset, SEEK_CUR);
+ code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return code;
}
-int walChangeFileToLast(SWal* pWal) {
+int walSetWrite(SWal* pWal) {
int64_t idxTfd, logTfd;
SWalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet);
ASSERT(pRet != NULL);
@@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) {
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, fileFirstVer, fnameStr);
- idxTfd = tfOpenReadWrite(fnameStr);
+ idxTfd = tfOpenCreateWriteAppend(fnameStr);
if (idxTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
walBuildLogName(pWal, fileFirstVer, fnameStr);
- logTfd = tfOpenReadWrite(fnameStr);
+ logTfd = tfOpenCreateWriteAppend(fnameStr);
if (logTfd < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) {
return 0;
}
-int walChangeFile(SWal* pWal, int64_t ver) {
+int walChangeWrite(SWal* pWal, int64_t ver) {
int code = 0;
int64_t idxTfd, logTfd;
char fnameStr[WAL_FILE_LEN];
- code = tfClose(pWal->writeLogTfd);
- if (code != 0) {
- // TODO
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
+ if (pWal->writeLogTfd != -1) {
+ code = tfClose(pWal->writeLogTfd);
+ if (code != 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
}
- code = tfClose(pWal->writeIdxTfd);
- if (code != 0) {
- terrno = TAOS_SYSTEM_ERROR(errno);
- return -1;
+ if (pWal->writeIdxTfd != -1) {
+ code = tfClose(pWal->writeIdxTfd);
+ if (code != 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
}
+
SWalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
// bsearch in fileSet
- SWalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
- ASSERT(pRet != NULL);
- int64_t fileFirstVer = pRet->firstVer;
- // closed
- if (taosArrayGetLast(pWal->fileInfoSet) != pRet) {
- walBuildIdxName(pWal, fileFirstVer, fnameStr);
- idxTfd = tfOpenRead(fnameStr);
- walBuildLogName(pWal, fileFirstVer, fnameStr);
- logTfd = tfOpenRead(fnameStr);
- } else {
- walBuildIdxName(pWal, fileFirstVer, fnameStr);
- idxTfd = tfOpenReadWrite(fnameStr);
- walBuildLogName(pWal, fileFirstVer, fnameStr);
- logTfd = tfOpenReadWrite(fnameStr);
+ int32_t idx = taosArraySearchIdx(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
+ ASSERT(idx != -1);
+ SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, idx);
+ /*ASSERT(pFileInfo != NULL);*/
+
+ int64_t fileFirstVer = pFileInfo->firstVer;
+ walBuildIdxName(pWal, fileFirstVer, fnameStr);
+ idxTfd = tfOpenCreateWriteAppend(fnameStr);
+ if (idxTfd < 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ pWal->writeIdxTfd = -1;
+ return -1;
+ }
+ walBuildLogName(pWal, fileFirstVer, fnameStr);
+ logTfd = tfOpenCreateWriteAppend(fnameStr);
+ if (logTfd < 0) {
+ tfClose(idxTfd);
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ pWal->writeLogTfd = -1;
+ return -1;
}
pWal->writeLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd;
+ pWal->writeCur = idx;
return fileFirstVer;
}
-int walSeekVer(SWal* pWal, int64_t ver) {
+int walSeekWriteVer(SWal* pWal, int64_t ver) {
int code;
if (ver == pWal->vers.lastVer) {
return 0;
@@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) {
return -1;
}
if (ver < pWal->vers.snapshotVer) {
+
}
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
- code = walChangeFile(pWal, ver);
+ code = walChangeWrite(pWal, ver);
if (code != 0) {
return -1;
}
}
- code = walSeekFilePos(pWal, ver);
+ code = walSeekWritePos(pWal, ver);
if (code != 0) {
return -1;
}
diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c
index 975f232e3d..2bc328b4e2 100644
--- a/source/libs/wal/src/walWrite.c
+++ b/source/libs/wal/src/walWrite.c
@@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// find correct file
if (ver < walGetLastFileFirstVer(pWal)) {
- // close current files
- tfClose(pWal->writeIdxTfd);
- tfClose(pWal->writeLogTfd);
- // open old files
- code = walChangeFile(pWal, ver);
- if (code != 0) {
+ // change current files
+ code = walChangeWrite(pWal, ver);
+ if (code < 0) {
return -1;
}
@@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) {
}
// iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
- if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) {
+ if ((pWal->cfg.retentionSize != -1 && pWal->totSize > pWal->cfg.retentionSize)
+ || (pWal->cfg.retentionPeriod != -1 && iter->closeTs + pWal->cfg.retentionPeriod > ts)) {
// delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
@@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
}
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
- ;
pWal->totSize = newTotSize;
pWal->vers.verInSnapshotting = -1;
// save snapshot ver, commit ver
int code = walSaveMeta(pWal);
- if (code != 0) {
+ if (code < 0) {
return -1;
}
@@ -225,18 +222,17 @@ int walRoll(SWal *pWal) {
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWriteAppend(fnameStr);
if (idxTfd < 0) {
- ASSERT(0);
+ terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
logTfd = tfOpenCreateWriteAppend(fnameStr);
if (logTfd < 0) {
- ASSERT(0);
+ terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
code = walRollFileInfo(pWal);
if (code != 0) {
- ASSERT(0);
return -1;
}
@@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
ASSERT(pWal->writeCur >= 0);
pthread_mutex_lock(&pWal->mutex);
+
if (pWal->writeIdxTfd == -1 || pWal->writeLogTfd == -1) {
- walChangeFileToLast(pWal);
+ walSetWrite(pWal);
+ tfLseek(pWal->writeLogTfd, 0, SEEK_END);
+ tfLseek(pWal->writeIdxTfd, 0, SEEK_END);
}
pWal->writeHead.head.version = index;
diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp
index cd082a3a43..b65a200ca1 100644
--- a/source/libs/wal/test/walMetaTest.cpp
+++ b/source/libs/wal/test/walMetaTest.cpp
@@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test {
const char* pathName = "/tmp/wal_test";
};
+class WalRetentionEnv : public ::testing::Test {
+ protected:
+ static void SetUpTestCase() {
+ int code = walInit();
+ ASSERT(code == 0);
+ }
+
+ static void TearDownTestCase() { walCleanUp(); }
+
+ void walResetEnv() {
+ TearDown();
+ taosRemoveDir(pathName);
+ SetUp();
+ }
+
+ void SetUp() override {
+ SWalCfg cfg;
+ cfg.rollPeriod = -1,
+ cfg.segSize = -1,
+ cfg.retentionPeriod = -1,
+ cfg.retentionSize = 0,
+ cfg.rollPeriod = 0,
+ cfg.vgId = 0,
+ cfg.level = TAOS_WAL_FSYNC;
+ pWal = walOpen(pathName, &cfg);
+ ASSERT(pWal != NULL);
+ }
+
+ void TearDown() override {
+ walClose(pWal);
+ pWal = NULL;
+ }
+
+ SWal* pWal = NULL;
+ const char* pathName = "/tmp/wal_test";
+};
+
TEST_F(WalCleanEnv, createNew) {
walRollFileInfo(pWal);
ASSERT(pWal->fileInfoSet != NULL);
@@ -283,3 +320,86 @@ TEST_F(WalKeepEnv, readHandleRead) {
}
}
}
+
+TEST_F(WalRetentionEnv, repairMeta1) {
+ walResetEnv();
+ int code;
+
+ int i;
+ for (i = 0; i < 100; i++) {
+ char newStr[100];
+ sprintf(newStr, "%s-%d", ranStr, i);
+ int len = strlen(newStr);
+ code = walWrite(pWal, i, 0, newStr, len);
+ ASSERT_EQ(code, 0);
+ }
+
+ TearDown();
+
+ //getchar();
+ char buf[100];
+ sprintf(buf, "%s/meta-ver%d", pathName, 0);
+ remove(buf);
+ sprintf(buf, "%s/meta-ver%d", pathName, 1);
+ remove(buf);
+ SetUp();
+ //getchar();
+
+ ASSERT_EQ(pWal->vers.lastVer, 99);
+
+ SWalReadHandle* pRead = walOpenReadHandle(pWal);
+ ASSERT(pRead != NULL);
+
+ for (int i = 0; i < 1000; i++) {
+ int ver = rand() % 100;
+ code = walReadWithHandle(pRead, ver);
+ ASSERT_EQ(code, 0);
+
+ // printf("rrbody: \n");
+ // for(int i = 0; i < pRead->pHead->head.len; i++) {
+ // printf("%d ", pRead->pHead->head.body[i]);
+ //}
+ // printf("\n");
+
+ ASSERT_EQ(pRead->pHead->head.version, ver);
+ ASSERT_EQ(pRead->curVersion, ver + 1);
+ char newStr[100];
+ sprintf(newStr, "%s-%d", ranStr, ver);
+ int len = strlen(newStr);
+ ASSERT_EQ(pRead->pHead->head.len, len);
+ for (int j = 0; j < len; j++) {
+ EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
+ }
+ }
+
+ for (i = 100; i < 200; i++) {
+ char newStr[100];
+ sprintf(newStr, "%s-%d", ranStr, i);
+ int len = strlen(newStr);
+ code = walWrite(pWal, i, 0, newStr, len);
+ ASSERT_EQ(code, 0);
+ }
+
+ for (int i = 0; i < 1000; i++) {
+ int ver = rand() % 200;
+ code = walReadWithHandle(pRead, ver);
+ ASSERT_EQ(code, 0);
+
+ // printf("rrbody: \n");
+ // for(int i = 0; i < pRead->pHead->head.len; i++) {
+ // printf("%d ", pRead->pHead->head.body[i]);
+ //}
+ // printf("\n");
+
+ ASSERT_EQ(pRead->pHead->head.version, ver);
+ ASSERT_EQ(pRead->curVersion, ver + 1);
+ char newStr[100];
+ sprintf(newStr, "%s-%d", ranStr, ver);
+ int len = strlen(newStr);
+ ASSERT_EQ(pRead->pHead->head.len, len);
+ for (int j = 0; j < len; j++) {
+ EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
+ }
+ }
+
+}
diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c
index cc8d6646b6..09aeff5ff3 100644
--- a/source/util/src/tarray.c
+++ b/source/util/src/tarray.c
@@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t comparFn, int flags) {
void* item = taosArraySearch(pArray, key, comparFn, flags);
- return (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
+ return item == NULL ? -1 : (int32_t)((char*)item - (char*)pArray->pData) / pArray->elemSize;
}
void taosArraySortString(SArray* pArray, __compar_fn_t comparFn) {
diff --git a/source/util/test/arrayTest.cpp b/source/util/test/arrayTest.cpp
index 94b08ca6d7..939d4a701d 100644
--- a/source/util/test/arrayTest.cpp
+++ b/source/util/test/arrayTest.cpp
@@ -4,6 +4,7 @@
#include
#include "tarray.h"
+#include "tcompare.h"
namespace {
@@ -48,3 +49,34 @@ static void remove_batch_test() {
TEST(arrayTest, array_list_test) {
remove_batch_test();
}
+
+TEST(arrayTest, array_search_test) {
+ SArray *pa = (SArray*) taosArrayInit(4, sizeof(int32_t));
+
+ for(int32_t i = 10; i < 20; ++i) {
+ int32_t a = i;
+ taosArrayPush(pa, &a);
+ }
+
+ for(int i = 0; i < 30; i++) {
+ int32_t k = i;
+ int32_t* pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_GE);
+ int32_t idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_GE);
+
+ if(pRet == NULL) {
+ ASSERT_EQ(idx, -1);
+ } else {
+ ASSERT_EQ(taosArrayGet(pa, idx), pRet);
+ }
+
+ pRet = (int32_t*)taosArraySearch(pa, &k, compareInt32Val, TD_LE);
+ idx = taosArraySearchIdx(pa, &k, compareInt32Val, TD_LE);
+
+ if(pRet == NULL) {
+ ASSERT_EQ(idx, -1);
+ } else {
+ ASSERT_EQ(taosArrayGet(pa, idx), pRet);
+ }
+
+ }
+}
diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt
index f960abb4e0..bc0c3a4f64 100644
--- a/tests/script/jenkins/basic.txt
+++ b/tests/script/jenkins/basic.txt
@@ -2,15 +2,16 @@
#======================b1-start===============
# ---- user
-./test.sh -f general/user/basic1.sim
+./test.sh -f sim/user/basic1.sim
# ---- db
-./test.sh -f general/db/basic1.sim
+./test.sh -f sim/db/basic1.sim
+./test.sh -f sim/db/error1.sim
# ---- table
-./test.sh -f general/table/basic1.sim
+./test.sh -f sim/table/basic1.sim
# ---- dnode
-./test.sh -f unique/dnode/basic1.sim
+./test.sh -f sim/dnode/basic1.sim
#======================b1-end===============
diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh
index fcc11ca213..e00363b28f 100755
--- a/tests/script/sh/deploy.sh
+++ b/tests/script/sh/deploy.sh
@@ -120,7 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo "secondEp ${HOSTNAME}:7200" >> $TAOS_CFG
echo "fqdn ${HOSTNAME}" >> $TAOS_CFG
echo "serverPort ${NODE}" >> $TAOS_CFG
-echo "supportVnodes 16" >> $TAOS_CFG
+echo "supportVnodes 128" >> $TAOS_CFG
echo "dataDir $DATA_DIR" >> $TAOS_CFG
echo "logDir $LOG_DIR" >> $TAOS_CFG
echo "debugFlag 0" >> $TAOS_CFG
diff --git a/tests/script/general/db/basic1.sim b/tests/script/sim/db/basic1.sim
similarity index 96%
rename from tests/script/general/db/basic1.sim
rename to tests/script/sim/db/basic1.sim
index 52af7d93ea..33af1c5b59 100644
--- a/tests/script/general/db/basic1.sim
+++ b/tests/script/sim/db/basic1.sim
@@ -85,7 +85,6 @@ if $data02 != 2 then
return -1
endi
-return
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/exec.sh -n dnode1 -s start
@@ -104,4 +103,4 @@ if $rows != 2 then
return -1
endi
-system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/script/general/db/basic.sim b/tests/script/sim/db/basic6.sim
similarity index 100%
rename from tests/script/general/db/basic.sim
rename to tests/script/sim/db/basic6.sim
diff --git a/tests/script/sim/db/error1.sim b/tests/script/sim/db/error1.sim
new file mode 100644
index 0000000000..bf9e04c017
--- /dev/null
+++ b/tests/script/sim/db/error1.sim
@@ -0,0 +1,99 @@
+system sh/stop_dnodes.sh
+system sh/deploy.sh -n dnode1 -i 1
+system sh/exec.sh -n dnode1 -s start
+system sh/deploy.sh -n dnode2 -i 2
+system sh/exec.sh -n dnode2 -s start
+sql connect
+
+print ========== create dnodes
+sql create dnode $hostname port 7200
+
+$x = 0
+create1:
+ $x = $x + 1
+ sleep 1000
+ if $x == 10 then
+ return -1
+ endi
+
+sql show dnodes
+if $data4_2 != ready then
+ goto create1
+endi
+
+print ========== stop dnode2
+system sh/exec.sh -n dnode2 -s stop -x SIGKILL
+
+print =============== create database
+sql_error create database d1 vgroups 4
+
+print ========== start dnode2
+system sh/exec.sh -n dnode2 -s start
+
+print =============== re-create database
+$x = 0
+re-create1:
+ $x = $x + 1
+ sleep 1000
+ if $x == 10 then
+ return -1
+ endi
+
+sql create database d1 vgroups 2 -x re-create1
+
+sql show databases
+if $rows != 1 then
+ return -1
+endi
+
+if $data00 != d1 then
+ return -1
+endi
+
+if $data02 != 2 then
+ return -1
+endi
+
+if $data03 != 0 then
+ return -1
+endi
+
+print ========== stop dnode2
+system sh/exec.sh -n dnode2 -s stop -x SIGKILL
+
+print =============== create database
+sql_error drop database d1
+
+print ========== start dnode2
+system sh/exec.sh -n dnode2 -s start
+
+print =============== re-create database
+$x = 0
+re-create2:
+ $x = $x + 1
+ sleep 1000
+ if $x == 10 then
+ return -1
+ endi
+
+sql create database d1 vgroups 5 -x re-create2
+
+sql show databases
+if $rows != 1 then
+ return -1
+endi
+
+if $data00 != d1 then
+ return -1
+endi
+
+if $data02 != 5 then
+ return -1
+endi
+
+if $data03 != 0 then
+ return -1
+endi
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
+system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
diff --git a/tests/script/unique/dnode/basic1.sim b/tests/script/sim/dnode/basic1.sim
similarity index 100%
rename from tests/script/unique/dnode/basic1.sim
rename to tests/script/sim/dnode/basic1.sim
diff --git a/tests/script/general/table/basic1.sim b/tests/script/sim/table/basic1.sim
similarity index 100%
rename from tests/script/general/table/basic1.sim
rename to tests/script/sim/table/basic1.sim
diff --git a/tests/script/general/user/basic1.sim b/tests/script/sim/user/basic1.sim
similarity index 100%
rename from tests/script/general/user/basic1.sim
rename to tests/script/sim/user/basic1.sim