commit
76b8a76c1d
|
@ -108,8 +108,17 @@ void iterateValueDestroy(IterateValue* iv, bool destroy);
|
||||||
|
|
||||||
extern void* indexQhandle;
|
extern void* indexQhandle;
|
||||||
|
|
||||||
|
typedef struct TFileCacheKey {
|
||||||
|
uint64_t suid;
|
||||||
|
uint8_t colType;
|
||||||
|
char* colName;
|
||||||
|
int32_t nColName;
|
||||||
|
} ICacheKey;
|
||||||
|
|
||||||
int indexFlushCacheTFile(SIndex* sIdx, void*);
|
int indexFlushCacheTFile(SIndex* sIdx, void*);
|
||||||
|
|
||||||
|
int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
|
||||||
|
|
||||||
#define indexFatal(...) \
|
#define indexFatal(...) \
|
||||||
do { \
|
do { \
|
||||||
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
|
if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); } \
|
||||||
|
|
|
@ -42,6 +42,7 @@ typedef struct IndexCache {
|
||||||
int32_t version;
|
int32_t version;
|
||||||
int32_t nTerm;
|
int32_t nTerm;
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
uint64_t suid;
|
||||||
|
|
||||||
pthread_mutex_t mtx;
|
pthread_mutex_t mtx;
|
||||||
} IndexCache;
|
} IndexCache;
|
||||||
|
@ -58,7 +59,7 @@ typedef struct CacheTerm {
|
||||||
} CacheTerm;
|
} CacheTerm;
|
||||||
//
|
//
|
||||||
|
|
||||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
|
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
|
||||||
|
|
||||||
void indexCacheDestroy(void* cache);
|
void indexCacheDestroy(void* cache);
|
||||||
|
|
||||||
|
|
|
@ -49,13 +49,6 @@ typedef struct TFileValue {
|
||||||
int32_t offset;
|
int32_t offset;
|
||||||
} TFileValue;
|
} TFileValue;
|
||||||
|
|
||||||
typedef struct TFileCacheKey {
|
|
||||||
uint64_t suid;
|
|
||||||
uint8_t colType;
|
|
||||||
char* colName;
|
|
||||||
int32_t nColName;
|
|
||||||
} TFileCacheKey;
|
|
||||||
|
|
||||||
// table cache
|
// table cache
|
||||||
// refactor to LRU cache later
|
// refactor to LRU cache later
|
||||||
typedef struct TFileCache {
|
typedef struct TFileCache {
|
||||||
|
@ -103,10 +96,10 @@ typedef struct TFileReaderOpt {
|
||||||
// tfile cache, manage tindex reader
|
// tfile cache, manage tindex reader
|
||||||
TFileCache* tfileCacheCreate(const char* path);
|
TFileCache* tfileCacheCreate(const char* path);
|
||||||
void tfileCacheDestroy(TFileCache* tcache);
|
void tfileCacheDestroy(TFileCache* tcache);
|
||||||
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key);
|
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key);
|
||||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader);
|
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader);
|
||||||
|
|
||||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName);
|
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
|
||||||
|
|
||||||
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
|
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
|
||||||
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
TFileReader* tfileReaderCreate(WriterCtx* ctx);
|
||||||
|
@ -124,6 +117,7 @@ int tfileWriterFinish(TFileWriter* tw);
|
||||||
|
|
||||||
//
|
//
|
||||||
IndexTFile* indexTFileCreate(const char* path);
|
IndexTFile* indexTFileCreate(const char* path);
|
||||||
|
void indexTFileDestroy(IndexTFile* tfile);
|
||||||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
||||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
|
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,7 @@ extern "C" {
|
||||||
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
|
#define SERIALIZE_VAR_TO_BUF(buf, var, type) \
|
||||||
do { \
|
do { \
|
||||||
type c = var; \
|
type c = var; \
|
||||||
assert(sizeof(var) == sizeof(type)); \
|
assert(sizeof(type) == sizeof(c)); \
|
||||||
memcpy((void*)buf, (void*)&c, sizeof(c)); \
|
memcpy((void*)buf, (void*)&c, sizeof(c)); \
|
||||||
buf += sizeof(c); \
|
buf += sizeof(c); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "indexInt.h"
|
#include "indexInt.h"
|
||||||
#include "index_cache.h"
|
#include "index_cache.h"
|
||||||
#include "index_tfile.h"
|
#include "index_tfile.h"
|
||||||
|
#include "index_util.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
|
|
||||||
|
@ -102,6 +103,7 @@ void indexClose(SIndex* sIdx) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(sIdx->colObj);
|
taosHashCleanup(sIdx->colObj);
|
||||||
pthread_mutex_destroy(&sIdx->mtx);
|
pthread_mutex_destroy(&sIdx->mtx);
|
||||||
|
indexTFileDestroy(sIdx->tindex);
|
||||||
#endif
|
#endif
|
||||||
free(sIdx->path);
|
free(sIdx->path);
|
||||||
free(sIdx);
|
free(sIdx);
|
||||||
|
@ -130,18 +132,28 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
|
||||||
// TODO(yihao): reduce the lock range
|
// TODO(yihao): reduce the lock range
|
||||||
pthread_mutex_lock(&index->mtx);
|
pthread_mutex_lock(&index->mtx);
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
ICacheKey key = {.suid = p->suid, .colName = p->colName};
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
IndexCache* pCache = indexCacheCreate(index, p->colName, p->colType);
|
IndexCache* pCache = indexCacheCreate(index, p->suid, p->colName, p->colType);
|
||||||
taosHashPut(index->colObj, p->colName, p->nColName, &pCache, sizeof(void*));
|
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&index->mtx);
|
pthread_mutex_unlock(&index->mtx);
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
|
||||||
SIndexTerm* p = taosArrayGetP(fVals, i);
|
SIndexTerm* p = taosArrayGetP(fVals, i);
|
||||||
IndexCache** cache = taosHashGet(index->colObj, p->colName, p->nColName);
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
ICacheKey key = {.suid = p->suid, .colName = p->colName};
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
|
IndexCache** cache = taosHashGet(index->colObj, buf, sz);
|
||||||
assert(*cache != NULL);
|
assert(*cache != NULL);
|
||||||
int ret = indexCachePut(*cache, p, uid);
|
int ret = indexCachePut(*cache, p, uid);
|
||||||
if (ret != 0) { return ret; }
|
if (ret != 0) { return ret; }
|
||||||
|
@ -296,7 +308,12 @@ static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* query, SArray** result
|
||||||
// Get col info
|
// Get col info
|
||||||
IndexCache* cache = NULL;
|
IndexCache* cache = NULL;
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
IndexCache** pCache = taosHashGet(sIdx->colObj, colName, nColName);
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
ICacheKey key = {.suid = term->suid, .colName = term->colName};
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
|
||||||
|
IndexCache** pCache = taosHashGet(sIdx->colObj, buf, sz);
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
pthread_mutex_unlock(&sIdx->mtx);
|
pthread_mutex_unlock(&sIdx->mtx);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -360,6 +377,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
|
||||||
if (sz > 0) {
|
if (sz > 0) {
|
||||||
// TODO(yihao): remove duplicate tableid
|
// TODO(yihao): remove duplicate tableid
|
||||||
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
||||||
|
// indexError("merge colVal: %s", lv->colVal);
|
||||||
if (strcmp(lv->colVal, tv->colVal) == 0) {
|
if (strcmp(lv->colVal, tv->colVal) == 0) {
|
||||||
taosArrayAddAll(lv->tableId, tv->tableId);
|
taosArrayAddAll(lv->tableId, tv->tableId);
|
||||||
tfileValueDestroy(tv);
|
tfileValueDestroy(tv);
|
||||||
|
@ -368,6 +386,7 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosArrayPush(result, &tv);
|
taosArrayPush(result, &tv);
|
||||||
|
// indexError("merge colVal: %s", tv->colVal);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void indexDestroyTempResult(SArray* result) {
|
static void indexDestroyTempResult(SArray* result) {
|
||||||
|
@ -383,10 +402,12 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||||
|
|
||||||
IndexCache* pCache = (IndexCache*)cache;
|
IndexCache* pCache = (IndexCache*)cache;
|
||||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
|
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName);
|
||||||
|
if (pReader == NULL) { indexWarn("empty pReader found"); }
|
||||||
// handle flush
|
// handle flush
|
||||||
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
||||||
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
||||||
|
if (tfileIter == NULL) { indexWarn("empty tfile reader iterator"); }
|
||||||
|
|
||||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||||
|
|
||||||
|
@ -459,14 +480,14 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||||
} else {
|
} else {
|
||||||
if (value->val != NULL) { taosArrayClear(value->val); }
|
if (value->val != NULL) { taosArrayClear(value->val); }
|
||||||
}
|
}
|
||||||
// free(value->colVal);
|
free(value->colVal);
|
||||||
value->colVal = NULL;
|
value->colVal = NULL;
|
||||||
}
|
}
|
||||||
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
int32_t version = CACHE_VERSION(cache);
|
int32_t version = CACHE_VERSION(cache);
|
||||||
uint8_t colType = cache->type;
|
uint8_t colType = cache->type;
|
||||||
|
|
||||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, cache->colName, colType);
|
TFileWriter* tw = tfileWriterOpen(sIdx->path, cache->suid, version, cache->colName, colType);
|
||||||
if (tw == NULL) {
|
if (tw == NULL) {
|
||||||
indexError("failed to open file to write");
|
indexError("failed to open file to write");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -479,14 +500,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
}
|
}
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderOpen(sIdx->path, sIdx->suid, version, cache->colName);
|
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
||||||
|
|
||||||
|
char buf[128] = {0};
|
||||||
|
TFileHeader* header = &reader->header;
|
||||||
|
ICacheKey key = {
|
||||||
|
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
|
||||||
|
|
||||||
char buf[128] = {0};
|
|
||||||
TFileHeader* header = &reader->header;
|
|
||||||
TFileCacheKey key = {.suid = header->suid,
|
|
||||||
.colName = header->colName,
|
|
||||||
.nColName = strlen(header->colName),
|
|
||||||
.colType = header->colType};
|
|
||||||
pthread_mutex_lock(&sIdx->mtx);
|
pthread_mutex_lock(&sIdx->mtx);
|
||||||
|
|
||||||
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
IndexTFile* ifile = (IndexTFile*)sIdx->tindex;
|
||||||
|
@ -497,3 +517,13 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
END:
|
END:
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t indexSerialCacheKey(ICacheKey* key, char* buf) {
|
||||||
|
char* p = buf;
|
||||||
|
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||||
|
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||||
|
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||||
|
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||||
|
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||||
|
return buf - p;
|
||||||
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
#define MAX_INDEX_KEY_LEN 256 // test only, change later
|
||||||
|
|
||||||
#define MEM_TERM_LIMIT 10000 * 10
|
#define MEM_TERM_LIMIT 10 * 10000
|
||||||
// ref index_cache.h:22
|
// ref index_cache.h:22
|
||||||
//#define CACHE_KEY_LEN(p) \
|
//#define CACHE_KEY_LEN(p) \
|
||||||
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
|
||||||
|
@ -40,7 +40,7 @@ static bool indexCacheIteratorNext(Iterate* itera);
|
||||||
|
|
||||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
static IterateValue* indexCacheIteratorGetValue(Iterate* iter);
|
||||||
|
|
||||||
IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type) {
|
||||||
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
IndexCache* cache = calloc(1, sizeof(IndexCache));
|
||||||
if (cache == NULL) {
|
if (cache == NULL) {
|
||||||
indexError("failed to create index cache");
|
indexError("failed to create index cache");
|
||||||
|
@ -53,7 +53,7 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type) {
|
||||||
cache->type = type;
|
cache->type = type;
|
||||||
cache->index = idx;
|
cache->index = idx;
|
||||||
cache->version = 0;
|
cache->version = 0;
|
||||||
|
cache->suid = suid;
|
||||||
pthread_mutex_init(&cache->mtx, NULL);
|
pthread_mutex_init(&cache->mtx, NULL);
|
||||||
indexCacheRef(cache);
|
indexCacheRef(cache);
|
||||||
return cache;
|
return cache;
|
||||||
|
@ -150,6 +150,7 @@ Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||||
|
|
||||||
MemTable* tbl = cache->imm;
|
MemTable* tbl = cache->imm;
|
||||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
iiter->val.colVal = NULL;
|
||||||
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
iiter->iter = tbl != NULL ? tSkipListCreateIter(tbl->mem) : NULL;
|
||||||
iiter->next = indexCacheIteratorNext;
|
iiter->next = indexCacheIteratorNext;
|
||||||
iiter->getValue = indexCacheIteratorGetValue;
|
iiter->getValue = indexCacheIteratorGetValue;
|
||||||
|
@ -353,6 +354,9 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
SSkipListIterator* iter = itera->iter;
|
SSkipListIterator* iter = itera->iter;
|
||||||
if (iter == NULL) { return false; }
|
if (iter == NULL) { return false; }
|
||||||
IterateValue* iv = &itera->val;
|
IterateValue* iv = &itera->val;
|
||||||
|
if (iv->colVal != NULL && iv->val != NULL) {
|
||||||
|
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
|
||||||
|
}
|
||||||
iterateValueDestroy(iv, false);
|
iterateValueDestroy(iv, false);
|
||||||
|
|
||||||
bool next = tSkipListIterNext(iter);
|
bool next = tSkipListIterNext(iter);
|
||||||
|
|
|
@ -319,7 +319,7 @@ void fstStateSetCommInput(FstState* s, uint8_t inp) {
|
||||||
assert(s->state == OneTransNext || s->state == OneTrans);
|
assert(s->state == OneTransNext || s->state == OneTrans);
|
||||||
|
|
||||||
uint8_t val;
|
uint8_t val;
|
||||||
COMMON_INDEX(inp, 0x111111, val);
|
COMMON_INDEX(inp, 0b111111, val);
|
||||||
s->val = (s->val & fstStateDict[s->state].val) | val;
|
s->val = (s->val & fstStateDict[s->state].val) | val;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -369,7 +369,7 @@ uint8_t fstStateInput(FstState* s, FstNode* node) {
|
||||||
bool null = false;
|
bool null = false;
|
||||||
uint8_t inp = fstStateCommInput(s, &null);
|
uint8_t inp = fstStateCommInput(s, &null);
|
||||||
uint8_t* data = fstSliceData(slice, NULL);
|
uint8_t* data = fstSliceData(slice, NULL);
|
||||||
return null == false ? inp : data[-1];
|
return null == false ? inp : data[node->start - 1];
|
||||||
}
|
}
|
||||||
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
uint8_t fstStateInputForAnyTrans(FstState* s, FstNode* node, uint64_t i) {
|
||||||
assert(s->state == AnyTrans);
|
assert(s->state == AnyTrans);
|
||||||
|
@ -1062,6 +1062,7 @@ Output fstEmptyFinalOutput(Fst* fst, bool* null) {
|
||||||
} else {
|
} else {
|
||||||
*null = true;
|
*null = true;
|
||||||
}
|
}
|
||||||
|
fstNodeDestroy(node);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1286,6 +1287,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
|
||||||
StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState);
|
StreamWithStateResult* result = swsResultCreate(&slice, fOutput, tState);
|
||||||
free(buf);
|
free(buf);
|
||||||
fstSliceDestroy(&slice);
|
fstSliceDestroy(&slice);
|
||||||
|
taosArrayDestroy(nodes);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
free(buf);
|
free(buf);
|
||||||
|
|
|
@ -51,7 +51,6 @@ static void tfileDestroyFileName(void* elem);
|
||||||
static int tfileCompare(const void* a, const void* b);
|
static int tfileCompare(const void* a, const void* b);
|
||||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
||||||
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
static void tfileGenFileName(char* filename, uint64_t suid, int colId, int version);
|
||||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
|
||||||
|
|
||||||
TFileCache* tfileCacheCreate(const char* path) {
|
TFileCache* tfileCacheCreate(const char* path) {
|
||||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||||
|
@ -80,18 +79,18 @@ TFileCache* tfileCacheCreate(const char* path) {
|
||||||
goto End;
|
goto End;
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
TFileCacheKey key = {.suid = header->suid,
|
ICacheKey key = {.suid = header->suid,
|
||||||
.colName = header->colName,
|
.colName = header->colName,
|
||||||
.nColName = strlen(header->colName),
|
.nColName = strlen(header->colName),
|
||||||
.colType = header->colType};
|
.colType = header->colType};
|
||||||
tfileSerialCacheKey(&key, buf);
|
|
||||||
|
|
||||||
|
int32_t sz = indexSerialCacheKey(&key, buf);
|
||||||
|
assert(sz < sizeof(buf));
|
||||||
|
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||||
tfileReaderRef(reader);
|
tfileReaderRef(reader);
|
||||||
// indexTable
|
|
||||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(files, tfileDestroyFileName);
|
taosArrayDestroyEx(files, tfileDestroyFileName);
|
||||||
return tcache;
|
return tcache;
|
||||||
|
@ -117,30 +116,30 @@ void tfileCacheDestroy(TFileCache* tcache) {
|
||||||
free(tcache);
|
free(tcache);
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileReader* tfileCacheGet(TFileCache* tcache, TFileCacheKey* key) {
|
TFileReader* tfileCacheGet(TFileCache* tcache, ICacheKey* key) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
tfileSerialCacheKey(key, buf);
|
int32_t sz = indexSerialCacheKey(key, buf);
|
||||||
|
assert(sz < sizeof(buf));
|
||||||
TFileReader** reader = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
TFileReader** reader = taosHashGet(tcache->tableCache, buf, sz);
|
||||||
if (reader == NULL) { return NULL; }
|
if (reader == NULL) { return NULL; }
|
||||||
tfileReaderRef(*reader);
|
tfileReaderRef(*reader);
|
||||||
|
|
||||||
return *reader;
|
return *reader;
|
||||||
}
|
}
|
||||||
void tfileCachePut(TFileCache* tcache, TFileCacheKey* key, TFileReader* reader) {
|
void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* reader) {
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
tfileSerialCacheKey(key, buf);
|
int32_t sz = indexSerialCacheKey(key, buf);
|
||||||
// remove last version index reader
|
// remove last version index reader
|
||||||
TFileReader** p = taosHashGet(tcache->tableCache, buf, strlen(buf));
|
TFileReader** p = taosHashGet(tcache->tableCache, buf, sz);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
TFileReader* oldReader = *p;
|
TFileReader* oldReader = *p;
|
||||||
taosHashRemove(tcache->tableCache, buf, strlen(buf));
|
taosHashRemove(tcache->tableCache, buf, sz);
|
||||||
oldReader->remove = true;
|
oldReader->remove = true;
|
||||||
tfileReaderUnRef(oldReader);
|
tfileReaderUnRef(oldReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
|
||||||
tfileReaderRef(reader);
|
tfileReaderRef(reader);
|
||||||
taosHashPut(tcache->tableCache, buf, strlen(buf), &reader, sizeof(void*));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
|
@ -230,8 +229,6 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderCreate(wc);
|
TFileReader* reader = tfileReaderCreate(wc);
|
||||||
return reader;
|
return reader;
|
||||||
|
|
||||||
// tfileSerialCacheKey(&key, buf);
|
|
||||||
}
|
}
|
||||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
||||||
// char pathBuf[128] = {0};
|
// char pathBuf[128] = {0};
|
||||||
|
@ -325,15 +322,19 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// write fst
|
|
||||||
|
// write data
|
||||||
indexError("--------Begin----------------");
|
indexError("--------Begin----------------");
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
// TODO, fst batch write later
|
// TODO, fst batch write later
|
||||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||||
if (tfileWriteData(tw, v) == 0) {
|
if (tfileWriteData(tw, v) != 0) {
|
||||||
//
|
indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||||
|
(int)taosArrayGetSize(v->tableId));
|
||||||
|
} else {
|
||||||
|
indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||||
|
(int)taosArrayGetSize(v->tableId));
|
||||||
}
|
}
|
||||||
indexError("data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId));
|
|
||||||
}
|
}
|
||||||
indexError("--------End----------------");
|
indexError("--------End----------------");
|
||||||
fstBuilderFinish(tw->fb);
|
fstBuilderFinish(tw->fb);
|
||||||
|
@ -359,7 +360,7 @@ IndexTFile* indexTFileCreate(const char* path) {
|
||||||
tfile->cache = tfileCacheCreate(path);
|
tfile->cache = tfileCacheCreate(path);
|
||||||
return tfile;
|
return tfile;
|
||||||
}
|
}
|
||||||
void IndexTFileDestroy(IndexTFile* tfile) {
|
void indexTFileDestroy(IndexTFile* tfile) {
|
||||||
tfileCacheDestroy(tfile->cache);
|
tfileCacheDestroy(tfile->cache);
|
||||||
free(tfile);
|
free(tfile);
|
||||||
}
|
}
|
||||||
|
@ -369,9 +370,8 @@ int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result) {
|
||||||
if (tfile == NULL) { return ret; }
|
if (tfile == NULL) { return ret; }
|
||||||
IndexTFile* pTfile = (IndexTFile*)tfile;
|
IndexTFile* pTfile = (IndexTFile*)tfile;
|
||||||
|
|
||||||
SIndexTerm* term = query->term;
|
SIndexTerm* term = query->term;
|
||||||
TFileCacheKey key = {
|
ICacheKey key = {.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
||||||
.suid = term->suid, .colType = term->colType, .colName = term->colName, .nColName = term->nColName};
|
|
||||||
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
TFileReader* reader = tfileCacheGet(pTfile->cache, &key);
|
||||||
if (reader == NULL) { return 0; }
|
if (reader == NULL) { return 0; }
|
||||||
|
|
||||||
|
@ -385,8 +385,10 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
||||||
}
|
}
|
||||||
static bool tfileIteratorNext(Iterate* iiter) {
|
static bool tfileIteratorNext(Iterate* iiter) {
|
||||||
IterateValue* iv = &iiter->val;
|
IterateValue* iv = &iiter->val;
|
||||||
|
if (iv->colVal != NULL && iv->val != NULL) {
|
||||||
|
// indexError("value in fst: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
|
||||||
|
}
|
||||||
iterateValueDestroy(iv, false);
|
iterateValueDestroy(iv, false);
|
||||||
// SArray* tblIds = iv->val;
|
|
||||||
|
|
||||||
char* colVal = NULL;
|
char* colVal = NULL;
|
||||||
uint64_t offset = 0;
|
uint64_t offset = 0;
|
||||||
|
@ -406,14 +408,14 @@ static bool tfileIteratorNext(Iterate* iiter) {
|
||||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
||||||
|
|
||||||
iv->colVal = colVal;
|
iv->colVal = colVal;
|
||||||
|
return true;
|
||||||
// std::string key(ch, sz);
|
// std::string key(ch, sz);
|
||||||
}
|
}
|
||||||
|
|
||||||
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
|
static IterateValue* tifileIterateGetValue(Iterate* iter) { return &iter->val; }
|
||||||
|
|
||||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||||
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
TFileFstIter* tIter = calloc(1, sizeof(TFileFstIter));
|
||||||
if (tIter == NULL) { return NULL; }
|
if (tIter == NULL) { return NULL; }
|
||||||
|
|
||||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||||
|
@ -435,6 +437,7 @@ Iterate* tfileIteratorCreate(TFileReader* reader) {
|
||||||
iter->next = tfileIteratorNext;
|
iter->next = tfileIteratorNext;
|
||||||
iter->getValue = tifileIterateGetValue;
|
iter->getValue = tifileIterateGetValue;
|
||||||
iter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
iter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||||
|
iter->val.colVal = NULL;
|
||||||
return iter;
|
return iter;
|
||||||
}
|
}
|
||||||
void tfileIteratorDestroy(Iterate* iter) {
|
void tfileIteratorDestroy(Iterate* iter) {
|
||||||
|
@ -447,13 +450,14 @@ void tfileIteratorDestroy(Iterate* iter) {
|
||||||
streamWithStateDestroy(tIter->st);
|
streamWithStateDestroy(tIter->st);
|
||||||
fstStreamBuilderDestroy(tIter->fb);
|
fstStreamBuilderDestroy(tIter->fb);
|
||||||
automCtxDestroy(tIter->ctx);
|
automCtxDestroy(tIter->ctx);
|
||||||
|
free(tIter);
|
||||||
|
|
||||||
free(iter);
|
free(iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName) {
|
||||||
if (tf == NULL) { return NULL; }
|
if (tf == NULL) { return NULL; }
|
||||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
ICacheKey key = {.suid = suid, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||||
return tfileCacheGet(tf->cache, &key);
|
return tfileCacheGet(tf->cache, &key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,7 +484,7 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
||||||
TFileValue* tfileValueCreate(char* val) {
|
TFileValue* tfileValueCreate(char* val) {
|
||||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||||
if (tf == NULL) { return NULL; }
|
if (tf == NULL) { return NULL; }
|
||||||
tf->colVal = val;
|
tf->colVal = tstrdup(val);
|
||||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||||
return tf;
|
return tf;
|
||||||
}
|
}
|
||||||
|
@ -491,6 +495,7 @@ int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||||
}
|
}
|
||||||
void tfileValueDestroy(TFileValue* tf) {
|
void tfileValueDestroy(TFileValue* tf) {
|
||||||
taosArrayDestroy(tf->tableId);
|
taosArrayDestroy(tf->tableId);
|
||||||
|
free(tf->colVal);
|
||||||
free(tf);
|
free(tf);
|
||||||
}
|
}
|
||||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||||
|
@ -648,10 +653,3 @@ static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId,
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
|
|
||||||
// SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
|
||||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
|
||||||
// SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
|
||||||
// SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
|
||||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
|
||||||
}
|
|
||||||
|
|
|
@ -24,8 +24,13 @@ class FstWriter {
|
||||||
_b = fstBuilderCreate(_wc, 0);
|
_b = fstBuilderCreate(_wc, 0);
|
||||||
}
|
}
|
||||||
bool Put(const std::string& key, uint64_t val) {
|
bool Put(const std::string& key, uint64_t val) {
|
||||||
|
// char buf[128] = {0};
|
||||||
|
// int len = 0;
|
||||||
|
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
|
||||||
|
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
|
||||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||||
bool ok = fstBuilderInsert(_b, skey, val);
|
bool ok = fstBuilderInsert(_b, skey, val);
|
||||||
|
|
||||||
fstSliceDestroy(&skey);
|
fstSliceDestroy(&skey);
|
||||||
return ok;
|
return ok;
|
||||||
}
|
}
|
||||||
|
@ -61,6 +66,11 @@ class FstReadMemory {
|
||||||
return _fst != NULL;
|
return _fst != NULL;
|
||||||
}
|
}
|
||||||
bool Get(const std::string& key, uint64_t* val) {
|
bool Get(const std::string& key, uint64_t* val) {
|
||||||
|
// char buf[128] = {0};
|
||||||
|
// int len = 0;
|
||||||
|
// taosMbsToUcs4(key.c_str(), key.size(), buf, 128, &len);
|
||||||
|
// FstSlice skey = fstSliceCreate((uint8_t*)buf, len);
|
||||||
|
|
||||||
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
FstSlice skey = fstSliceCreate((uint8_t*)key.c_str(), key.size());
|
||||||
bool ok = fstGet(_fst, &skey, val);
|
bool ok = fstGet(_fst, &skey, val);
|
||||||
fstSliceDestroy(&skey);
|
fstSliceDestroy(&skey);
|
||||||
|
@ -135,15 +145,109 @@ int Performance_fstWriteRecords(FstWriter* b) {
|
||||||
}
|
}
|
||||||
return L * M * N;
|
return L * M * N;
|
||||||
}
|
}
|
||||||
|
void Performance_fstReadRecords(FstReadMemory* m) {
|
||||||
|
std::string str("aa");
|
||||||
|
for (int i = 0; i < M; i++) {
|
||||||
|
str[0] = 'a' + i;
|
||||||
|
str.resize(2);
|
||||||
|
for (int j = 0; j < N; j++) {
|
||||||
|
str[1] = 'a' + j;
|
||||||
|
str.resize(2);
|
||||||
|
for (int k = 0; k < L; k++) {
|
||||||
|
str.push_back('a');
|
||||||
|
uint64_t val, cost;
|
||||||
|
if (m->GetWithTimeCostUs(str, &val, &cost)) {
|
||||||
|
printf("succes to get kv(%s, %" PRId64 "), cost: %" PRId64 "\n", str.c_str(), val, cost);
|
||||||
|
} else {
|
||||||
|
printf("failed to get key: %s\n", str.c_str());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void checkMillonWriteAndReadOfFst() {
|
||||||
|
tfInit();
|
||||||
|
FstWriter* fw = new FstWriter;
|
||||||
|
Performance_fstWriteRecords(fw);
|
||||||
|
delete fw;
|
||||||
|
FstReadMemory* fr = new FstReadMemory(1024 * 64 * 1024);
|
||||||
|
|
||||||
|
if (fr->init()) { printf("success to init fst read"); }
|
||||||
|
|
||||||
|
Performance_fstReadRecords(fr);
|
||||||
|
tfCleanup();
|
||||||
|
delete fr;
|
||||||
|
}
|
||||||
|
void checkFstLongTerm() {
|
||||||
|
tfInit();
|
||||||
|
FstWriter* fw = new FstWriter;
|
||||||
|
// Performance_fstWriteRecords(fw);
|
||||||
|
|
||||||
|
fw->Put("A B", 1);
|
||||||
|
fw->Put("C", 2);
|
||||||
|
fw->Put("a", 3);
|
||||||
|
delete fw;
|
||||||
|
|
||||||
|
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||||
|
if (m->init() == false) {
|
||||||
|
std::cout << "init readMemory failed" << std::endl;
|
||||||
|
delete m;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
{
|
||||||
|
uint64_t val = 0;
|
||||||
|
if (m->Get("A B", &val)) {
|
||||||
|
std::cout << "success to Get: " << val << std::endl;
|
||||||
|
} else {
|
||||||
|
std::cout << "failed to Get:" << val << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
uint64_t val = 0;
|
||||||
|
if (m->Get("C", &val)) {
|
||||||
|
std::cout << "success to Get: " << val << std::endl;
|
||||||
|
} else {
|
||||||
|
std::cout << "failed to Get:" << val << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
uint64_t val = 0;
|
||||||
|
if (m->Get("a", &val)) {
|
||||||
|
std::cout << "success to Get: " << val << std::endl;
|
||||||
|
} else {
|
||||||
|
std::cout << "failed to Get:" << val << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// prefix search
|
||||||
|
// std::vector<uint64_t> result;
|
||||||
|
|
||||||
|
// AutomationCtx* ctx = automCtxCreate((void*)"ab", AUTOMATION_ALWAYS);
|
||||||
|
// m->Search(ctx, result);
|
||||||
|
// std::cout << "size: " << result.size() << std::endl;
|
||||||
|
// assert(result.size() == count);
|
||||||
|
// for (int i = 0; i < result.size(); i++) {
|
||||||
|
// assert(result[i] == i); // check result
|
||||||
|
//}
|
||||||
|
tfCleanup();
|
||||||
|
// free(ctx);
|
||||||
|
// delete m;
|
||||||
|
}
|
||||||
void checkFstCheckIterator() {
|
void checkFstCheckIterator() {
|
||||||
tfInit();
|
tfInit();
|
||||||
FstWriter* fw = new FstWriter;
|
FstWriter* fw = new FstWriter;
|
||||||
int64_t s = taosGetTimestampUs();
|
int64_t s = taosGetTimestampUs();
|
||||||
int count = 2;
|
int count = 2;
|
||||||
Performance_fstWriteRecords(fw);
|
// Performance_fstWriteRecords(fw);
|
||||||
int64_t e = taosGetTimestampUs();
|
int64_t e = taosGetTimestampUs();
|
||||||
|
|
||||||
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
|
std::cout << "insert data count : " << count << "elapas time: " << e - s << std::endl;
|
||||||
|
|
||||||
|
fw->Put("Hello world", 1);
|
||||||
|
fw->Put("hello world", 2);
|
||||||
|
fw->Put("hello worle", 3);
|
||||||
|
fw->Put("hello worlf", 4);
|
||||||
delete fw;
|
delete fw;
|
||||||
|
|
||||||
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
FstReadMemory* m = new FstReadMemory(1024 * 64);
|
||||||
|
@ -171,7 +275,7 @@ void checkFstCheckIterator() {
|
||||||
|
|
||||||
void fst_get(Fst* fst) {
|
void fst_get(Fst* fst) {
|
||||||
for (int i = 0; i < 10000; i++) {
|
for (int i = 0; i < 10000; i++) {
|
||||||
std::string term = "Hello";
|
std::string term = "Hello World";
|
||||||
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
|
FstSlice key = fstSliceCreate((uint8_t*)term.c_str(), term.size());
|
||||||
uint64_t offset = 0;
|
uint64_t offset = 0;
|
||||||
bool ret = fstGet(fst, &key, &offset);
|
bool ret = fstGet(fst, &key, &offset);
|
||||||
|
@ -189,7 +293,7 @@ void validateTFile(char* arg) {
|
||||||
|
|
||||||
std::thread threads[NUM_OF_THREAD];
|
std::thread threads[NUM_OF_THREAD];
|
||||||
// std::vector<std::thread> threads;
|
// std::vector<std::thread> threads;
|
||||||
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1");
|
TFileReader* reader = tfileReaderOpen(arg, 0, 999992, "tag1");
|
||||||
|
|
||||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
threads[i] = std::thread(fst_get, reader->fst);
|
threads[i] = std::thread(fst_get, reader->fst);
|
||||||
|
@ -203,9 +307,12 @@ void validateTFile(char* arg) {
|
||||||
tfCleanup();
|
tfCleanup();
|
||||||
}
|
}
|
||||||
int main(int argc, char* argv[]) {
|
int main(int argc, char* argv[]) {
|
||||||
if (argc > 1) { validateTFile(argv[1]); }
|
// tool to check all kind of fst test
|
||||||
|
// if (argc > 1) { validateTFile(argv[1]); }
|
||||||
// checkFstCheckIterator();
|
// checkFstCheckIterator();
|
||||||
|
// checkFstLongTerm();
|
||||||
// checkFstPrefixSearch();
|
// checkFstPrefixSearch();
|
||||||
|
|
||||||
|
checkMillonWriteAndReadOfFst();
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -457,7 +457,10 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
||||||
// taosArrayPush(data, &v4);
|
// taosArrayPush(data, &v4);
|
||||||
|
|
||||||
fObj->Put(data);
|
fObj->Put(data);
|
||||||
for (size_t i = 0; i < taosArrayGetSize(data); i++) { destroyTFileValue(taosArrayGetP(data, i)); }
|
for (size_t i = 0; i < taosArrayGetSize(data); i++) {
|
||||||
|
// data
|
||||||
|
destroyTFileValue(taosArrayGetP(data, i));
|
||||||
|
}
|
||||||
taosArrayDestroy(data);
|
taosArrayDestroy(data);
|
||||||
|
|
||||||
std::string colName("voltage");
|
std::string colName("voltage");
|
||||||
|
@ -470,6 +473,7 @@ TEST_F(IndexTFileEnv, test_tfile_write) {
|
||||||
fObj->Get(&query, result);
|
fObj->Get(&query, result);
|
||||||
assert(taosArrayGetSize(result) == 200);
|
assert(taosArrayGetSize(result) == 200);
|
||||||
indexTermDestroy(term);
|
indexTermDestroy(term);
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
|
||||||
// tfileWriterDestroy(twrite);
|
// tfileWriterDestroy(twrite);
|
||||||
}
|
}
|
||||||
|
@ -477,7 +481,7 @@ class CacheObj {
|
||||||
public:
|
public:
|
||||||
CacheObj() {
|
CacheObj() {
|
||||||
// TODO
|
// TODO
|
||||||
cache = indexCacheCreate(NULL, "voltage", TSDB_DATA_TYPE_BINARY);
|
cache = indexCacheCreate(NULL, 0, "voltage", TSDB_DATA_TYPE_BINARY);
|
||||||
}
|
}
|
||||||
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
|
int Put(SIndexTerm* term, int16_t colId, int32_t version, uint64_t uid) {
|
||||||
int ret = indexCachePut(cache, term, uid);
|
int ret = indexCachePut(cache, term, uid);
|
||||||
|
@ -534,6 +538,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
// indexTermDestry(term);
|
// indexTermDestry(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
@ -541,24 +546,28 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v3");
|
std::string colVal("v3");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
coj->Debug();
|
coj->Debug();
|
||||||
std::cout << "--------first----------" << std::endl;
|
std::cout << "--------first----------" << std::endl;
|
||||||
|
@ -567,12 +576,14 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v4");
|
std::string colVal("v4");
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, othColId, version++, suid++);
|
coj->Put(term, othColId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
coj->Debug();
|
coj->Debug();
|
||||||
std::cout << "--------second----------" << std::endl;
|
std::cout << "--------second----------" << std::endl;
|
||||||
|
@ -583,6 +594,7 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
coj->Put(term, colId, version++, suid++);
|
coj->Put(term, colId, version++, suid++);
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
coj->Debug();
|
coj->Debug();
|
||||||
|
@ -598,6 +610,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
coj->Get(&query, colId, 10000, ret, &valType);
|
coj->Get(&query, colId, 10000, ret, &valType);
|
||||||
std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
|
std::cout << "size : " << taosArrayGetSize(ret) << std::endl;
|
||||||
assert(taosArrayGetSize(ret) == 4);
|
assert(taosArrayGetSize(ret) == 4);
|
||||||
|
taosArrayDestroy(ret);
|
||||||
|
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
std::string colVal("v2");
|
std::string colVal("v2");
|
||||||
|
@ -609,6 +624,9 @@ TEST_F(IndexCacheEnv, cache_test) {
|
||||||
|
|
||||||
coj->Get(&query, colId, 10000, ret, &valType);
|
coj->Get(&query, colId, 10000, ret, &valType);
|
||||||
assert(taosArrayGetSize(ret) == 1);
|
assert(taosArrayGetSize(ret) == 1);
|
||||||
|
taosArrayDestroy(ret);
|
||||||
|
|
||||||
|
indexTermDestroy(term);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
class IndexObj {
|
class IndexObj {
|
||||||
|
@ -678,13 +696,16 @@ class IndexObj {
|
||||||
|
|
||||||
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
|
||||||
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
|
if (Search(mq, result) == 0) { std::cout << "search one successfully" << std::endl; }
|
||||||
return taosArrayGetSize(result);
|
int sz = taosArrayGetSize(result);
|
||||||
|
indexMultiTermQueryDestroy(mq);
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
return sz;
|
||||||
// assert(taosArrayGetSize(result) == targetSize);
|
// assert(taosArrayGetSize(result) == targetSize);
|
||||||
}
|
}
|
||||||
void PutOne(const std::string& colName, const std::string& colVal) {
|
void PutOne(const std::string& colName, const std::string& colVal) {
|
||||||
|
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||||
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||||
colVal.c_str(), colVal.size());
|
colVal.c_str(), colVal.size());
|
||||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
|
||||||
indexMultiTermAdd(terms, term);
|
indexMultiTermAdd(terms, term);
|
||||||
Put(terms, 10);
|
Put(terms, 10);
|
||||||
indexMultiTermDestroy(terms);
|
indexMultiTermDestroy(terms);
|
||||||
|
@ -783,18 +804,21 @@ TEST_F(IndexEnv2, testIndexOpen) {
|
||||||
index->Search(mq, result);
|
index->Search(mq, result);
|
||||||
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
|
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
|
||||||
assert(taosArrayGetSize(result) == 400);
|
assert(taosArrayGetSize(result) == 400);
|
||||||
|
taosArrayDestroy(result);
|
||||||
|
indexMultiTermQueryDestroy(mq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
TEST_F(IndexEnv2, testIndex_TrigeFlush) {
|
||||||
std::string path = "/tmp/test";
|
std::string path = "/tmp/test1";
|
||||||
if (index->Init(path) != 0) {
|
if (index->Init(path) != 0) {
|
||||||
// r
|
// r
|
||||||
std::cout << "failed to init" << std::endl;
|
std::cout << "failed to init" << std::endl;
|
||||||
}
|
}
|
||||||
int numOfTable = 100 * 10000;
|
int numOfTable = 100 * 10000;
|
||||||
index->WriteMillonData("tag1", "Hello", numOfTable);
|
index->WriteMillonData("tag1", "Hello Wolrd", numOfTable);
|
||||||
int target = index->SearchOne("tag1", "Hello");
|
int target = index->SearchOne("tag1", "Hello Wolrd");
|
||||||
|
std::cout << "Get Index: " << target << std::endl;
|
||||||
assert(numOfTable == target);
|
assert(numOfTable == target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -821,14 +845,6 @@ TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) {
|
||||||
threads[i].join();
|
threads[i].join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
TEST_F(IndexEnv2, testIndex_multi_thread_write) {
|
|
||||||
std::string path = "/tmp";
|
|
||||||
if (index->Init(path) != 0) {}
|
|
||||||
}
|
|
||||||
TEST_F(IndexEnv2, testIndex_multi_thread_read) {
|
|
||||||
std::string path = "/tmp";
|
|
||||||
if (index->Init(path) != 0) {}
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(IndexEnv2, testIndex_restart) {
|
TEST_F(IndexEnv2, testIndex_restart) {
|
||||||
std::string path = "/tmp";
|
std::string path = "/tmp";
|
||||||
|
|
Loading…
Reference in New Issue