refactor code
This commit is contained in:
parent
9920b43be5
commit
db474b2078
|
@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy);
|
|||
|
||||
extern void* indexQhandle;
|
||||
|
||||
typedef struct TFileCacheKey {
|
||||
uint64_t suid;
|
||||
uint8_t colType;
|
||||
char* colName;
|
||||
int32_t nColName;
|
||||
} ICacheKey;
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void*);
|
||||
|
||||
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
||||
|
||||
#define indexFatal(...) \
|
||||
do { \
|
||||
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
|
||||
|
|
|
@ -42,6 +42,7 @@ typedef struct IndexCache {
|
|||
int32_t version;
|
||||
int32_t nTerm;
|
||||
int8_t type;
|
||||
uint64_t suid;
|
||||
|
||||
pthread_mutex_t mtx;
|
||||
} IndexCache;
|
||||
|
@ -58,7 +59,7 @@ typedef struct CacheTerm {
|
|||
} CacheTerm;
|
||||
//
|
||||
|
||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
|
||||
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
|
||||
|
||||
void indexCacheDestroy(void* cache);
|
||||
|
||||
|
|
|
@ -49,13 +49,6 @@ typedef struct TFileValue {
|
|||
int32_t offset;
|
||||
} TFileValue;
|
||||
|
||||
typedef struct TFileCacheKey {
|
||||
uint64_t suid;
|
||||
uint8_t colType;
|
||||
char* colName;
|
||||
int32_t nColName;
|
||||
} TFileCacheKey;
|
||||
|
||||
// table cache
|
||||
// refactor to LRU cache later
|
||||
typedef struct TFileCache {
|
||||
|
@ -103,10 +96,10 @@ typedef struct TFileReaderOpt {
|
|||
// tfile cache, manage tindex reader
|
||||
TFileCache* tfileCacheCreate(const char* path);
|
||||
void tfileCacheDestroy(TFileCache* tcache);
|
||||
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key);
|
||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader);
|
||||
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
|
||||
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName);
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
||||
|
||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
||||
|
|
|
@ -34,7 +34,7 @@ extern "C" {
|
|||
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
|
||||
do { \
|
||||
type c = var; \
|
||||
assert(sizeof(var) == sizeof(type)); \
|
||||
assert(sizeof(type) == sizeof(c)); \
|
||||
memcpy((void*)buf, (void*)&c, sizeof(c)); \
|
||||
buf += sizeof(c); \
|
||||
} while (0)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
#include "index_tfile.h"
|
||||
#include "index_util.h"
|
||||
#include "tdef.h"
|
||||
#include "tsched.h"
|
||||
|
||||
|
@ -130,18 +131,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
|||
// TODO(yihao): reduce the lock range
|
||||
pthread_mutex_lock(&index->mtx);
|
||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
|
||||
char buf[128] = {0};
|
||||
ICacheKey key = {.suid = p->suid, .colName = p->colName};
|
||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||
|
||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||
if (cache == NULL) {
|
||||
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
|
||||
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
|
||||
IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
|
||||
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock(&index->mtx);
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||
|
||||
char buf[128] = {0};
|
||||
ICacheKey key = {.suid = p->suid, .colName = p->colName};
|
||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||
|
||||
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||
assert(*cache != NULL);
|
||||
int ret = indexCachePut(*cache, p, uid);
|
||||
if (ret != 0) { return ret; }
|
||||
|
@ -296,7 +307,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
|||
// Get col info
|
||||
IndexCache* cache = NULL;
|
||||
pthread_mutex_lock(&sIdx->mtx);
|
||||
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
|
||||
|
||||
char buf[128] = {0};
|
||||
ICacheKey key = {.suid = term->suid, .colName = term->colName};
|
||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||
|
||||
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
||||
if (pCache == NULL) {
|
||||
pthread_mutex_unlock(&sIdx->mtx);
|
||||
return -1;
|
||||
|
@ -385,10 +401,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
|||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
||||
if (pReader == NULL) { indexWarn("empty pReader found"); }
|
||||
// handle flush
|
||||
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
||||
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
||||
if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
|
||||
|
||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||
|
||||
|
@ -468,7 +486,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
int32_t version = CACHE_VERSION(cache);
|
||||
uint8_t colType = cache->type;
|
||||
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType);
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
|
||||
if (tw == NULL) {
|
||||
indexError("failed to open file to write");
|
||||
return -1;
|
||||
|
@ -481,14 +499,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
}
|
||||
tfileWriterClose(tw);
|
||||
|
||||
TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName);
|
||||
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
||||
|
||||
char buf[128] = {0};
|
||||
TFileHeader* header = &reader->header;
|
||||
ICacheKey key = {
|
||||
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
||||
|
||||
char buf[128] = {0};
|
||||
TFileHeader* header = &reader->header;
|
||||
TFileCacheKey key = {.suid = header->suid,
|
||||
.colName = header->colName,
|
||||
.nColName = strlen(header->colName),
|
||||
.colType = header->colType};
|
||||
pthread_mutex_lock(&sIdx->mtx);
|
||||
|
||||
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
||||
|
@ -499,3 +516,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
|||
END:
|
||||
tfileWriterClose(tw);
|
||||
}
|
||||
|
||||
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
|
||||
char* p = buf;
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||
return buf - p;
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera);
|
|||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||
|
||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
||||
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
||||
if (cache == NULL) {
|
||||
indexError("failed to create index cache");
|
||||
|
@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
|||
cache->type = type;
|
||||
cache->index = idx;
|
||||
cache->version = 0;
|
||||
|
||||
cache->suid = suid;
|
||||
pthread_mutex_init(&cache->mtx, NULL);
|
||||
indexCacheRef(cache);
|
||||
return cache;
|
||||
|
|
|
@ -51,7 +51,6 @@ static void tfileDestroyFileName(void* elem);
|
|||
static int tfileCompare(const void* a, const void* b);
|
||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
||||
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||
|
@ -80,18 +79,18 @@ TFileCache* tfileCacheCreate(const char* path) {
|
|||
goto End;
|
||||
}
|
||||
|
||||
char buf[128] = {0};
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
TFileHeader* header = &reader->header;
|
||||
TFileCacheKey key = {.suid = header->suid,
|
||||
.colName = header->colName,
|
||||
.nColName = strlen(header->colName),
|
||||
.colType = header->colType};
|
||||
tfileSerialCacheKey(&key, buf);
|
||||
char buf[128] = {0};
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
TFileHeader* header = &reader->header;
|
||||
ICacheKey key = {.suid = header->suid,
|
||||
.colName = header->colName,
|
||||
.nColName = strlen(header->colName),
|
||||
.colType = header->colType};
|
||||
|
||||
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||
assert(sz < sizeof(buf));
|
||||
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||
tfileReaderRef(reader);
|
||||
// indexTable
|
||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
||||
}
|
||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||
return tcache;
|
||||
|
@ -117,30 +116,30 @@ void tfileCacheDestroy(TFileCache* tcache) {
|
|||
free(tcache);
|
||||
}
|
||||
|
||||
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
|
||||
char buf[128] = {0};
|
||||
tfileSerialCacheKey(key, buf);
|
||||
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
||||
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
||||
char buf[128] = {0};
|
||||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
assert(sz < sizeof(buf));
|
||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (reader == NULL) { return NULL; }
|
||||
tfileReaderRef(*reader);
|
||||
|
||||
return *reader;
|
||||
}
|
||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
|
||||
char buf[128] = {0};
|
||||
tfileSerialCacheKey(key, buf);
|
||||
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
||||
char buf[128] = {0};
|
||||
int32_t sz = indexSerialCacheKey(key, buf);
|
||||
// remove last version index reader
|
||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||
if (p != NULL) {
|
||||
TFileReader* oldReader = *p;
|
||||
taosHashRemove(tcache->tableCache, buf, strlen(buf));
|
||||
taosHashRemove(tcache->tableCache, buf, sz);
|
||||
oldReader->remove = true;
|
||||
tfileReaderUnRef(oldReader);
|
||||
}
|
||||
|
||||
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||
tfileReaderRef(reader);
|
||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
||||
return;
|
||||
}
|
||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||
|
@ -230,8 +229,6 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
|||
|
||||
TFileReader* reader = tfileReaderCreate(wc);
|
||||
return reader;
|
||||
|
||||
// tfileSerialCacheKey(&key, buf);
|
||||
}
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
||||
// char pathBuf[128] = {0};
|
||||
|
@ -325,15 +322,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
|||
tfileWriterClose(tw);
|
||||
return -1;
|
||||
}
|
||||
// write fst
|
||||
|
||||
// write data
|
||||
indexError("--------Begin----------------");
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
// TODO, fst batch write later
|
||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||
if (tfileWriteData(tw, v) == 0) {
|
||||
//
|
||||
if (tfileWriteData(tw, v) != 0) {
|
||||
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||
(int)taosArrayGetSize(v->tableId));
|
||||
} else {
|
||||
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||
(int)taosArrayGetSize(v->tableId));
|
||||
}
|
||||
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
|
||||
}
|
||||
indexError("--------End----------------");
|
||||
fstBuilderFinish(tw->fb);
|
||||
|
@ -369,9 +370,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
|||
if (tfile == NULL) { return ret; }
|
||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||
|
||||
SIndexTerm* term = query->term;
|
||||
TFileCacheKey key = {
|
||||
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||
SIndexTerm* term = query->term;
|
||||
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||
if (reader == NULL) { return 0; }
|
||||
|
||||
|
@ -453,9 +453,9 @@ void tfileIteratorDestroy(Iterate* iter) {
|
|||
free(iter);
|
||||
}
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
|
||||
if (tf == NULL) { return NULL; }
|
||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
return tfileCacheGet(tf->cache, &key);
|
||||
}
|
||||
|
||||
|
@ -650,10 +650,3 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
|
|||
}
|
||||
return -1;
|
||||
}
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
|
||||
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||
}
|
||||
|
|
|
@ -477,7 +477,7 @@ class CacheObj {
|
|||
public:
|
||||
CacheObj() {
|
||||
// TODO
|
||||
cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY);
|
||||
cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY);
|
||||
}
|
||||
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
|
||||
int ret = indexCachePut(cache, term, uid);
|
||||
|
|
Loading…
Reference in New Issue