Merge pull request #9621 from taosdata/feature/index_complete

Feature/index complete
This commit is contained in:
Shengliang Guan 2022-01-06 11:21:03 +08:00 committed by GitHub
commit c71c111da0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 17 deletions

View File

@ -40,11 +40,12 @@ typedef struct IndexCache {
SIndex* index; SIndex* index;
char* colName; char* colName;
int32_t version; int32_t version;
int32_t nTerm; int64_t occupiedMem;
int8_t type; int8_t type;
uint64_t suid; uint64_t suid;
pthread_mutex_t mtx; pthread_mutex_t mtx;
pthread_cond_t finished;
} IndexCache; } IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version) #define CACHE_VERSION(cache) atomic_load_32(&cache->version)

View File

@ -399,6 +399,8 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { return -1; } if (sIdx == NULL) { return -1; }
indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
int64_t st = taosGetTimestampUs();
IndexCache* pCache = (IndexCache*)cache; IndexCache* pCache = (IndexCache*)cache;
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
if (pReader == NULL) { indexWarn("empty tfile reader found"); } if (pReader == NULL) { indexWarn("empty tfile reader found"); }
@ -458,6 +460,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
} }
int ret = indexGenTFile(sIdx, pCache, result); int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyTempResult(result); indexDestroyTempResult(result);
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
indexCacheIteratorDestroy(cacheIter); indexCacheIteratorDestroy(cacheIter);
@ -465,7 +468,14 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
indexCacheUnRef(pCache); indexCacheUnRef(pCache);
return 0;
int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) {
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
} else {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
}
return ret;
} }
void iterateValueDestroy(IterateValue* value, bool destroy) { void iterateValueDestroy(IterateValue* value, bool destroy) {
if (destroy) { if (destroy) {
@ -506,7 +516,10 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
pthread_mutex_unlock(&sIdx->mtx); pthread_mutex_unlock(&sIdx->mtx);
return ret; return ret;
END: END:
tfileWriterClose(tw); if (tw != NULL) {
writerCtxDestroy(tw->ctx, true);
free(tw);
}
return -1; return -1;
} }

View File

@ -21,6 +21,8 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 1024 * 1024 * 2
#define MEM_ESTIMATE_RADIO 1.5
static void indexMemRef(MemTable* tbl); static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl);
@ -50,7 +52,11 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
cache->index = idx; cache->index = idx;
cache->version = 0; cache->version = 0;
cache->suid = suid; cache->suid = suid;
cache->occupiedMem = 0;
pthread_mutex_init(&cache->mtx, NULL); pthread_mutex_init(&cache->mtx, NULL);
pthread_cond_init(&cache->finished, NULL);
indexCacheRef(cache); indexCacheRef(cache);
return cache; return cache;
} }
@ -121,6 +127,7 @@ void indexCacheDestroyImm(IndexCache* cache) {
pthread_mutex_lock(&cache->mtx); pthread_mutex_lock(&cache->mtx);
tbl = cache->imm; tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread cache->imm = NULL; // or throw int bg thread
pthread_cond_broadcast(&cache->finished);
pthread_mutex_unlock(&cache->mtx); pthread_mutex_unlock(&cache->mtx);
indexMemUnRef(tbl); indexMemUnRef(tbl);
@ -133,6 +140,9 @@ void indexCacheDestroy(void* cache) {
indexMemUnRef(pCache->imm); indexMemUnRef(pCache->imm);
free(pCache->colName); free(pCache->colName);
pthread_mutex_destroy(&pCache->mtx);
pthread_cond_destroy(&pCache->finished);
free(pCache); free(pCache);
} }
@ -173,19 +183,19 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
} }
static void indexCacheMakeRoomForWrite(IndexCache* cache) { static void indexCacheMakeRoomForWrite(IndexCache* cache) {
while (true) { while (true) {
if (cache->nTerm < MEM_TERM_LIMIT) { if (cache->occupiedMem * MEM_ESTIMATE_RADIO < MEM_THRESHOLD) {
cache->nTerm += 1;
break; break;
} else if (cache->imm != NULL) { } else if (cache->imm != NULL) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
pthread_mutex_unlock(&cache->mtx); // pthread_mutex_unlock(&cache->mtx);
taosMsleep(50); pthread_cond_wait(&cache->finished, &cache->mtx);
pthread_mutex_lock(&cache->mtx); // taosMsleep(50);
// pthread_mutex_lock(&cache->mtx);
} else { } else {
indexCacheRef(cache); indexCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type); cache->mem = indexInternalCacheCreate(cache->type);
cache->nTerm = 1; cache->occupiedMem = 0;
// sched to merge // sched to merge
// unref cache in bgwork // unref cache in bgwork
indexCacheSchedToMerge(cache); indexCacheSchedToMerge(cache);
@ -211,8 +221,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
ct->operaType = term->operType; ct->operaType = term->operType;
// ugly code, refactor later // ugly code, refactor later
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
pthread_mutex_lock(&pCache->mtx); pthread_mutex_lock(&pCache->mtx);
pCache->occupiedMem += estimate;
indexCacheMakeRoomForWrite(pCache); indexCacheMakeRoomForWrite(pCache);
MemTable* tbl = pCache->mem; MemTable* tbl = pCache->mem;
indexMemRef(tbl); indexMemRef(tbl);
@ -333,7 +344,7 @@ static int32_t compareKey(const void* l, const void* r) {
static MemTable* indexInternalCacheCreate(int8_t type) { static MemTable* indexInternalCacheCreate(int8_t type) {
MemTable* tbl = calloc(1, sizeof(MemTable)); MemTable* tbl = calloc(1, sizeof(MemTable));
indexMemRef(tbl); indexMemRef(tbl);
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey); tbl->mem = tSkipListCreate(MAX_SKIP_LIST_LEVEL, type, MAX_INDEX_KEY_LEN, compareKey, SL_ALLOW_DUP_KEY, getIndexKey);
} }
return tbl; return tbl;

View File

@ -346,9 +346,6 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
} }
static bool tfileIteratorNext(Iterate* iiter) { static bool tfileIteratorNext(Iterate* iiter) {
IterateValue* iv = &iiter->val; IterateValue* iv = &iiter->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
char* colVal = NULL; char* colVal = NULL;

View File

@ -28,7 +28,7 @@
#include "tutil.h" #include "tutil.h"
using namespace std; using namespace std;
#define NUM_OF_THREAD 5 #define NUM_OF_THREAD 10
class DebugInfo { class DebugInfo {
public: public:
@ -882,8 +882,8 @@ static void single_write_and_search(IndexObj* idx) {
static void multi_write_and_search(IndexObj* idx) { static void multi_write_and_search(IndexObj* idx) {
int target = idx->SearchOne("tag1", "Hello"); int target = idx->SearchOne("tag1", "Hello");
target = idx->SearchOne("tag2", "Test"); target = idx->SearchOne("tag2", "Test");
idx->WriteMultiMillonData("tag1", "Hello", 100 * 10000); idx->WriteMultiMillonData("tag1", "hello world test", 100 * 10000);
idx->WriteMultiMillonData("tag2", "Test", 100 * 10000); idx->WriteMultiMillonData("tag2", "world test nothing", 100 * 10000);
} }
TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
std::string path = "/tmp/cache_and_tfile"; std::string path = "/tmp/cache_and_tfile";