From f0da893bfb6fc5450cec971c6e8d9846935084fc Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 23 May 2022 21:59:47 +0800 Subject: [PATCH] fix: avoid multi thread read/write crash --- source/libs/index/CMakeLists.txt | 6 +-- source/libs/index/inc/indexTfile.h | 2 +- source/libs/index/src/index.c | 31 +++++++++-- source/libs/index/src/indexCache.c | 8 ++- .../libs/index/src/indexFstCountingWriter.c | 1 + source/libs/index/src/indexTfile.c | 13 +++-- source/libs/index/test/indexTests.cc | 54 +++++++++++-------- 7 files changed, 77 insertions(+), 38 deletions(-) diff --git a/source/libs/index/CMakeLists.txt b/source/libs/index/CMakeLists.txt index d5fd574aad..7dc66e4789 100644 --- a/source/libs/index/CMakeLists.txt +++ b/source/libs/index/CMakeLists.txt @@ -31,7 +31,7 @@ if (${BUILD_WITH_INVERTEDINDEX}) endif(${BUILD_WITH_INVERTEDINDEX}) -#if (${BUILD_TEST}) -# add_subdirectory(test) -#endif(${BUILD_TEST}) +if (${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/index/inc/indexTfile.h b/source/libs/index/inc/indexTfile.h index 9712e4b30f..85ed397b0a 100644 --- a/source/libs/index/inc/indexTfile.h +++ b/source/libs/index/inc/indexTfile.h @@ -40,7 +40,7 @@ typedef struct TFileHeader { } TFileHeader; #pragma pack(pop) -#define TFILE_HEADER_SIZE (sizeof(TFileHeader)) +#define TFILE_HEADER_SIZE (sizeof(TFileHeader)) #define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t)) typedef struct TFileValue { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 2141e90bbd..6add788a89 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -29,7 +29,7 @@ #include "lucene++/Lucene_c.h" #endif -#define INDEX_NUM_OF_THREADS 4 +#define INDEX_NUM_OF_THREADS 1 #define INDEX_QUEUE_SIZE 200 #define INDEX_DATA_BOOL_NULL 0x02 @@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { sIdx->path = tstrdup(path); taosThreadMutexInit(&sIdx->mtx, NULL); tsem_init(&sIdx->sem, 0, 0); - // taosThreadCondInit(&sIdx->finished, NULL); sIdx->refId = indexAddRef(sIdx); indexAcquireRef(sIdx->refId); @@ -143,13 +142,13 @@ void indexDestroy(void* handle) { return; } void indexClose(SIndex* sIdx) { - indexReleaseRef(sIdx->refId); bool ref = 0; if (sIdx->colObj != NULL) { void* iter = taosHashIterate(sIdx->colObj, NULL); while (iter) { IndexCache** pCache = iter; indexCacheForceToMerge((void*)(*pCache)); + indexInfo("%s wait to merge", (*pCache)->colName); indexWait((void*)(sIdx)); iter = taosHashIterate(sIdx->colObj, iter); indexCacheUnRef(*pCache); @@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) { taosHashCleanup(sIdx->colObj); sIdx->colObj = NULL; } - // taosMsleep(1000 * 5); + indexReleaseRef(sIdx->refId); indexRemoveRef(sIdx->refId); } int64_t indexAddRef(void* p) { @@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) { taosMemoryFree(value->colVal); value->colVal = NULL; } + +static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) { + ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)}; + int64_t ver = CACHE_VERSION(cache); + taosThreadMutexLock(&sIdx->mtx); + TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key); + if (trd != NULL) { + if (ver < trd->header.version) { + ver = trd->header.version + 1; + } else { + ver += 1; + } + indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver); + tfileReaderUnRef(trd); + } else { + indexInfo("not found reader base %p", trd); + } + taosThreadMutexUnlock(&sIdx->mtx); + return ver; +} static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { - int32_t version = CACHE_VERSION(cache); + int64_t version = indexGetAvaialbleVer(sIdx, cache); + indexInfo("file name version: %" PRId64 "", version); uint8_t colType = cache->type; TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType); @@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { if (reader == NULL) { return -1; } + indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf); TFileHeader* header = &reader->header; ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)}; diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 232eca9304..d704e3876e 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in taosThreadCondInit(&cache->finished, NULL); indexCacheRef(cache); + if (idx != NULL) { + indexAcquireRef(idx->refId); + } return cache; } void indexCacheDebug(IndexCache* cache) { @@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) { if (pCache == NULL) { return; } + indexMemUnRef(pCache->mem); indexMemUnRef(pCache->imm); taosMemoryFree(pCache->colName); taosThreadMutexDestroy(&pCache->mtx); taosThreadCondDestroy(&pCache->finished); - + if (pCache->index != NULL) { + indexReleaseRef(((SIndex*)pCache->index)->refId); + } taosMemoryFree(pCache); } diff --git a/source/libs/index/src/indexFstCountingWriter.c b/source/libs/index/src/indexFstCountingWriter.c index 1d4395aff6..8ba5173602 100644 --- a/source/libs/index/src/indexFstCountingWriter.c +++ b/source/libs/index/src/indexFstCountingWriter.c @@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int int64_t file_size; taosStatFile(path, &file_size, NULL); ctx->file.size = (int)file_size; + } else { // ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); ctx->file.pFile = taosOpenFile(path, TD_FILE_READ); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index f5b3cbf227..3d85646bd2 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -1,6 +1,5 @@ /* * Copyright (c) 2019 TAOS Data, Inc. -p * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. @@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) { char buf[128] = {0}; int32_t sz = indexSerialCacheKey(key, buf); assert(sz < sizeof(buf)); + indexInfo("Try to get key: %s", buf); TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz); - if (reader == NULL) { + if (reader == NULL || *reader == NULL) { + indexInfo("failed to get key: %s", buf); return NULL; } + indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf); tfileReaderRef(*reader); return *reader; @@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) { int32_t sz = indexSerialCacheKey(key, buf); // remove last version index reader TFileReader** p = taosHashGet(tcache->tableCache, buf, sz); - if (p != NULL) { + if (p != NULL && *p != NULL) { TFileReader* oldReader = *p; taosHashRemove(tcache->tableCache, buf, sz); + indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf); oldReader->remove = true; tfileReaderUnRef(oldReader); } @@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { if (reader == NULL) { return NULL; } - reader->ctx = ctx; if (0 != tfileReaderVerify(reader)) { @@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) { tfileReaderDestroy(reader); return NULL; } + reader->remove = false; return reader; } @@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); return NULL; } - indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); + indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); TFileReader* reader = tfileReaderCreate(wc); return reader; diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 11a25a798f..f848cee86b 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -674,10 +674,13 @@ class IndexObj { // opt numOfWrite = 0; numOfRead = 0; - indexInit(); + // indexInit(); } - int Init(const std::string& dir) { - taosRemoveDir(dir.c_str()); + int Init(const std::string& dir, bool remove = true) { + if (remove) { + taosRemoveDir(dir.c_str()); + taosMkDir(dir.c_str()); + } taosMkDir(dir.c_str()); int ret = indexOpen(&opts, dir.c_str(), &idx); if (ret != 0) { @@ -838,8 +841,11 @@ class IndexEnv2 : public ::testing::Test { initLog(); index = new IndexObj(); } - virtual void TearDown() { delete index; } - IndexObj* index; + virtual void TearDown() { + // taosMsleep(500); + delete index; + } + IndexObj* index; }; TEST_F(IndexEnv2, testIndexOpen) { std::string path = TD_TMP_DIR_PATH "test"; @@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) { target = idx->SearchOne("tag2", "Test"); } static void multi_write_and_search(IndexObj* idx) { + idx->PutOne("tag1", "Hello"); + idx->PutOne("tag2", "Test"); int target = idx->SearchOne("tag1", "Hello"); target = idx->SearchOne("tag2", "Test"); idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100); @@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { } } -// TEST_F(IndexEnv2, testIndex_restart) { -// std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; -// if (index->Init(path) != 0) { -// } -// index->SearchOneTarget("tag1", "Hello", 10); -// index->SearchOneTarget("tag2", "Test", 10); -//} +TEST_F(IndexEnv2, testIndex_restart) { + std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; + if (index->Init(path, false) != 0) { + } + index->SearchOneTarget("tag1", "Hello", 10); + index->SearchOneTarget("tag2", "Test", 10); +} // TEST_F(IndexEnv2, testIndex_restart1) { // std::string path = TD_TMP_DIR_PATH "cache_and_tfile"; -// if (index->Init(path) != 0) { +// if (index->Init(path, false) != 0) { // } // index->ReadMultiMillonData("tag1", "coding"); // index->SearchOneTarget("tag1", "Hello", 10); @@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) { // std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl; // assert(3 == index->SearchOne("tag1", "Hello")); //} -// TEST_F(IndexEnv2, testIndexMultiTag) { -// std::string path = TD_TMP_DIR_PATH "multi_tag"; -// if (index->Init(path) != 0) { -// } -// int64_t st = taosGetTimestampUs(); -// int32_t num = 1000 * 10000; -// index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); -// std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; -// // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); -//} +TEST_F(IndexEnv2, testIndexMultiTag) { + std::string path = TD_TMP_DIR_PATH "multi_tag"; + if (index->Init(path) != 0) { + } + int64_t st = taosGetTimestampUs(); + int32_t num = 100 * 100; + index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num); + std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl; + // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000); +} TEST_F(IndexEnv2, testLongComVal1) { std::string path = TD_TMP_DIR_PATH "long_colVal"; if (index->Init(path) != 0) {