Merge pull request #12885 from taosdata/fix/indexMultThread
fix: avoid multi thread read/write crash
This commit is contained in:
commit
8a7f6f4acc
|
@ -34,7 +34,7 @@ if (${BUILD_WITH_INVERTEDINDEX})
|
|||
endif(${BUILD_WITH_INVERTEDINDEX})
|
||||
|
||||
|
||||
#if (${BUILD_TEST})
|
||||
# add_subdirectory(test)
|
||||
#endif(${BUILD_TEST})
|
||||
if (${BUILD_TEST})
|
||||
add_subdirectory(test)
|
||||
endif(${BUILD_TEST})
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@ typedef struct TFileHeader {
|
|||
} TFileHeader;
|
||||
#pragma pack(pop)
|
||||
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
|
||||
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
|
||||
|
||||
typedef struct TFileValue {
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
#include "lucene++/Lucene_c.h"
|
||||
#endif
|
||||
|
||||
#define INDEX_NUM_OF_THREADS 4
|
||||
#define INDEX_NUM_OF_THREADS 1
|
||||
#define INDEX_QUEUE_SIZE 200
|
||||
|
||||
#define INDEX_DATA_BOOL_NULL 0x02
|
||||
|
@ -117,7 +117,6 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
sIdx->path = tstrdup(path);
|
||||
taosThreadMutexInit(&sIdx->mtx, NULL);
|
||||
tsem_init(&sIdx->sem, 0, 0);
|
||||
// taosThreadCondInit(&sIdx->finished, NULL);
|
||||
|
||||
sIdx->refId = indexAddRef(sIdx);
|
||||
indexAcquireRef(sIdx->refId);
|
||||
|
@ -143,13 +142,13 @@ void indexDestroy(void* handle) {
|
|||
return;
|
||||
}
|
||||
void indexClose(SIndex* sIdx) {
|
||||
indexReleaseRef(sIdx->refId);
|
||||
bool ref = 0;
|
||||
if (sIdx->colObj != NULL) {
|
||||
void* iter = taosHashIterate(sIdx->colObj, NULL);
|
||||
while (iter) {
|
||||
IndexCache** pCache = iter;
|
||||
indexCacheForceToMerge((void*)(*pCache));
|
||||
indexInfo("%s wait to merge", (*pCache)->colName);
|
||||
indexWait((void*)(sIdx));
|
||||
iter = taosHashIterate(sIdx->colObj, iter);
|
||||
indexCacheUnRef(*pCache);
|
||||
|
@ -157,7 +156,7 @@ void indexClose(SIndex* sIdx) {
|
|||
taosHashCleanup(sIdx->colObj);
|
||||
sIdx->colObj = NULL;
|
||||
}
|
||||
// taosMsleep(1000 * 5);
|
||||
indexReleaseRef(sIdx->refId);
|
||||
indexRemoveRef(sIdx->refId);
|
||||
}
|
||||
int64_t indexAddRef(void* p) {
|
||||
|
@ -554,8 +553,29 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
|||
taosMemoryFree(value->colVal);
|
||||
value->colVal = NULL;
|
||||
}
|
||||
|
||||
static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
|
||||
ICacheKey key = {.suid = cache->suid, .colName = cache->colName, .nColName = strlen(cache->colName)};
|
||||
int64_t ver = CACHE_VERSION(cache);
|
||||
taosThreadMutexLock(&sIdx->mtx);
|
||||
TFileReader* trd = tfileCacheGet(((IndexTFile*)sIdx->tindex)->cache, &key);
|
||||
if (trd != NULL) {
|
||||
if (ver < trd->header.version) {
|
||||
ver = trd->header.version + 1;
|
||||
} else {
|
||||
ver += 1;
|
||||
}
|
||||
indexInfo("header: %d, ver: %" PRId64 "", trd->header.version, ver);
|
||||
tfileReaderUnRef(trd);
|
||||
} else {
|
||||
indexInfo("not found reader base %p", trd);
|
||||
}
|
||||
taosThreadMutexUnlock(&sIdx->mtx);
|
||||
return ver;
|
||||
}
|
||||
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||
int32_t version = CACHE_VERSION(cache);
|
||||
int64_t version = indexGetAvaialbleVer(sIdx, cache);
|
||||
indexInfo("file name version: %" PRId64 "", version);
|
||||
uint8_t colType = cache->type;
|
||||
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
|
||||
|
@ -575,6 +595,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
if (reader == NULL) {
|
||||
return -1;
|
||||
}
|
||||
indexInfo("success to create tfile, reopen it, %s", reader->ctx->file.buf);
|
||||
|
||||
TFileHeader* header = &reader->header;
|
||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||
|
|
|
@ -335,6 +335,9 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
|
|||
taosThreadCondInit(&cache->finished, NULL);
|
||||
|
||||
indexCacheRef(cache);
|
||||
if (idx != NULL) {
|
||||
indexAcquireRef(idx->refId);
|
||||
}
|
||||
return cache;
|
||||
}
|
||||
void indexCacheDebug(IndexCache* cache) {
|
||||
|
@ -426,13 +429,16 @@ void indexCacheDestroy(void* cache) {
|
|||
if (pCache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
indexMemUnRef(pCache->mem);
|
||||
indexMemUnRef(pCache->imm);
|
||||
taosMemoryFree(pCache->colName);
|
||||
|
||||
taosThreadMutexDestroy(&pCache->mtx);
|
||||
taosThreadCondDestroy(&pCache->finished);
|
||||
|
||||
if (pCache->index != NULL) {
|
||||
indexReleaseRef(((SIndex*)pCache->index)->refId);
|
||||
}
|
||||
taosMemoryFree(pCache);
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
int64_t file_size;
|
||||
taosStatFile(path, &file_size, NULL);
|
||||
ctx->file.size = (int)file_size;
|
||||
|
||||
} else {
|
||||
// ctx->file.pFile = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_READ);
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
p *
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
|
@ -152,10 +151,13 @@ TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
|||
char buf[128] = {0};
|
||||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
assert(sz < sizeof(buf));
|
||||
indexInfo("Try to get key: %s", buf);
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (reader == NULL) {
|
||||
if (reader == NULL || *reader == NULL) {
|
||||
indexInfo("failed to get key: %s", buf);
|
||||
return NULL;
|
||||
}
|
||||
indexInfo("Get key: %s file: %s", buf, (*reader)->ctx->file.buf);
|
||||
tfileReaderRef(*reader);
|
||||
|
||||
return *reader;
|
||||
|
@ -165,9 +167,10 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
|||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
// remove last version index reader
|
||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (p != NULL) {
|
||||
if (p != NULL && *p != NULL) {
|
||||
TFileReader* oldReader = *p;
|
||||
taosHashRemove(tcache->tableCache, buf, sz);
|
||||
indexInfo("found %s, remove file %s", buf, oldReader->ctx->file.buf);
|
||||
oldReader->remove = true;
|
||||
tfileReaderUnRef(oldReader);
|
||||
}
|
||||
|
@ -180,7 +183,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
if (reader == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
reader->ctx = ctx;
|
||||
|
||||
if (0 != tfileReaderVerify(reader)) {
|
||||
|
@ -202,6 +204,7 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
|||
tfileReaderDestroy(reader);
|
||||
return NULL;
|
||||
}
|
||||
reader->remove = false;
|
||||
|
||||
return reader;
|
||||
}
|
||||
|
@ -536,7 +539,7 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr());
|
||||
return NULL;
|
||||
}
|
||||
indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||
indexTrace("open read file name:%s, file size: %d", wc->file.buf, wc->file.size);
|
||||
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
|
|
|
@ -674,10 +674,13 @@ class IndexObj {
|
|||
// opt
|
||||
numOfWrite = 0;
|
||||
numOfRead = 0;
|
||||
indexInit();
|
||||
// indexInit();
|
||||
}
|
||||
int Init(const std::string& dir) {
|
||||
taosRemoveDir(dir.c_str());
|
||||
int Init(const std::string& dir, bool remove = true) {
|
||||
if (remove) {
|
||||
taosRemoveDir(dir.c_str());
|
||||
taosMkDir(dir.c_str());
|
||||
}
|
||||
taosMkDir(dir.c_str());
|
||||
int ret = indexOpen(&opts, dir.c_str(), &idx);
|
||||
if (ret != 0) {
|
||||
|
@ -838,8 +841,11 @@ class IndexEnv2 : public ::testing::Test {
|
|||
initLog();
|
||||
index = new IndexObj();
|
||||
}
|
||||
virtual void TearDown() { delete index; }
|
||||
IndexObj* index;
|
||||
virtual void TearDown() {
|
||||
// taosMsleep(500);
|
||||
delete index;
|
||||
}
|
||||
IndexObj* index;
|
||||
};
|
||||
TEST_F(IndexEnv2, testIndexOpen) {
|
||||
std::string path = TD_TMP_DIR_PATH "test";
|
||||
|
@ -951,6 +957,8 @@ static void single_write_and_search(IndexObj* idx) {
|
|||
target = idx->SearchOne("tag2", "Test");
|
||||
}
|
||||
static void multi_write_and_search(IndexObj* idx) {
|
||||
idx->PutOne("tag1", "Hello");
|
||||
idx->PutOne("tag2", "Test");
|
||||
int target = idx->SearchOne("tag1", "Hello");
|
||||
target = idx->SearchOne("tag2", "Test");
|
||||
idx->WriteMultiMillonData("tag1", "hello world test", 100 * 100);
|
||||
|
@ -992,16 +1000,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
|||
}
|
||||
}
|
||||
|
||||
// TEST_F(IndexEnv2, testIndex_restart) {
|
||||
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
// if (index->Init(path) != 0) {
|
||||
// }
|
||||
// index->SearchOneTarget("tag1", "Hello", 10);
|
||||
// index->SearchOneTarget("tag2", "Test", 10);
|
||||
//}
|
||||
TEST_F(IndexEnv2, testIndex_restart) {
|
||||
std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
if (index->Init(path, false) != 0) {
|
||||
}
|
||||
index->SearchOneTarget("tag1", "Hello", 10);
|
||||
index->SearchOneTarget("tag2", "Test", 10);
|
||||
}
|
||||
// TEST_F(IndexEnv2, testIndex_restart1) {
|
||||
// std::string path = TD_TMP_DIR_PATH "cache_and_tfile";
|
||||
// if (index->Init(path) != 0) {
|
||||
// if (index->Init(path, false) != 0) {
|
||||
// }
|
||||
// index->ReadMultiMillonData("tag1", "coding");
|
||||
// index->SearchOneTarget("tag1", "Hello", 10);
|
||||
|
@ -1018,16 +1026,16 @@ TEST_F(IndexEnv2, testIndex_MultiWrite_and_MultiRead) {
|
|||
// std::cout << "reader sz: " << index->SearchOne("tag1", "Hello") << std::endl;
|
||||
// assert(3 == index->SearchOne("tag1", "Hello"));
|
||||
//}
|
||||
// TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
// std::string path = TD_TMP_DIR_PATH "multi_tag";
|
||||
// if (index->Init(path) != 0) {
|
||||
// }
|
||||
// int64_t st = taosGetTimestampUs();
|
||||
// int32_t num = 1000 * 10000;
|
||||
// index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
// std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// // index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
//}
|
||||
TEST_F(IndexEnv2, testIndexMultiTag) {
|
||||
std::string path = TD_TMP_DIR_PATH "multi_tag";
|
||||
if (index->Init(path) != 0) {
|
||||
}
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t num = 100 * 100;
|
||||
index->WriteMultiMillonData("tag1", "xxxxxxxxxxxxxxx", num);
|
||||
std::cout << "numOfRow: " << num << "\ttime cost:" << taosGetTimestampUs() - st << std::endl;
|
||||
// index->WriteMultiMillonData("tag2", "xxxxxxxxxxxxxxxxxxxxxxxxx", 100 * 10000);
|
||||
}
|
||||
TEST_F(IndexEnv2, testLongComVal1) {
|
||||
std::string path = TD_TMP_DIR_PATH "long_colVal";
|
||||
if (index->Init(path) != 0) {
|
||||
|
|
Loading…
Reference in New Issue