Merge pull request #9422 from taosdata/feature/index_cache
complete index write/search
This commit is contained in:
commit
968212911a
|
@ -51,6 +51,8 @@ struct SIndex {
|
|||
int64_t suid; // current super table id, -1 is normal table
|
||||
int32_t cVersion; // current version allocated to cache
|
||||
|
||||
char* path;
|
||||
|
||||
SIndexStat stat;
|
||||
pthread_mutex_t mtx;
|
||||
};
|
||||
|
@ -87,12 +89,23 @@ typedef struct SIndexTermQuery {
|
|||
EIndexQueryType qType;
|
||||
} SIndexTermQuery;
|
||||
|
||||
typedef struct Iterate {
|
||||
void* iter;
|
||||
typedef struct Iterate Iterate;
|
||||
|
||||
typedef struct IterateValue {
|
||||
int8_t type;
|
||||
char* colVal;
|
||||
SArray* val;
|
||||
} IterateValue;
|
||||
|
||||
typedef struct Iterate {
|
||||
void* iter;
|
||||
IterateValue val;
|
||||
bool (*next)(Iterate* iter);
|
||||
IterateValue* (*getValue)(Iterate* iter);
|
||||
} Iterate;
|
||||
|
||||
void iterateValueDestroy(IterateValue* iv, bool destroy);
|
||||
|
||||
extern void* indexQhandle;
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void*);
|
||||
|
|
|
@ -41,6 +41,7 @@ typedef struct IndexCache {
|
|||
|
||||
} IndexCache;
|
||||
|
||||
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
|
||||
typedef struct CacheTerm {
|
||||
// key
|
||||
int32_t nColVal;
|
||||
|
@ -57,6 +58,9 @@ IndexCache* indexCacheCreate(SIndex* idx, const char* colName, int8_t type);
|
|||
|
||||
void indexCacheDestroy(void* cache);
|
||||
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache);
|
||||
void indexCacheIteratorDestroy(Iterate* iiter);
|
||||
|
||||
int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid);
|
||||
|
||||
// int indexCacheGet(void *cache, uint64_t *rst);
|
||||
|
@ -66,6 +70,8 @@ void indexCacheRef(IndexCache* cache);
|
|||
void indexCacheUnRef(IndexCache* cache);
|
||||
|
||||
void indexCacheDebug(IndexCache* cache);
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -319,6 +319,8 @@ bool streamWithStateSeekMin(StreamWithState* sws, FstBoundWithData* min);
|
|||
StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallback callback);
|
||||
|
||||
FstStreamBuilder* fstStreamBuilderCreate(Fst* fst, AutomationCtx* aut);
|
||||
|
||||
void fstStreamBuilderDestroy(FstStreamBuilder* b);
|
||||
// set up bound range
|
||||
// refator, simple code by marco
|
||||
|
||||
|
|
|
@ -113,6 +113,8 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArr
|
|||
void tfileReaderRef(TFileReader* reader);
|
||||
void tfileReaderUnRef(TFileReader* reader);
|
||||
|
||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type);
|
||||
void tfileWriteClose(TFileWriter* tw);
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
|
||||
void tfileWriterDestroy(TFileWriter* tw);
|
||||
int tfileWriterPut(TFileWriter* tw, void* data);
|
||||
|
@ -123,6 +125,14 @@ IndexTFile* indexTFileCreate(const char* path);
|
|||
int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid);
|
||||
int indexTFileSearch(void* tfile, SIndexTermQuery* query, SArray* result);
|
||||
|
||||
Iterate* tfileIteratorCreate(TFileReader* reader);
|
||||
void tfileIteratorDestroy(Iterate* iterator);
|
||||
|
||||
TFileValue* tfileValueCreate(char* val);
|
||||
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val);
|
||||
void tfileValueDestroy(TFileValue* tf);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
||||
|
|
|
@ -75,9 +75,12 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
|||
sIdx->tindex = indexTFileCreate(path);
|
||||
sIdx->colObj = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
sIdx->cVersion = 1;
|
||||
sIdx->path = calloc(1, strlen(path) + 1);
|
||||
memcpy(sIdx->path, path, strlen(path));
|
||||
pthread_mutex_init(&sIdx->mtx, NULL);
|
||||
|
||||
*index = sIdx;
|
||||
|
||||
return 0;
|
||||
#endif
|
||||
|
||||
|
@ -361,14 +364,96 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
|
||||
if (sIdx == NULL) { return -1; }
|
||||
indexWarn("suid %" PRIu64 " merge cache into tindex", sIdx->suid);
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
|
||||
IndexCache* pCache = (IndexCache*)cache;
|
||||
TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->colName);
|
||||
|
||||
// handle flush
|
||||
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
|
||||
Iterate* tfileIter = tfileIteratorCreate(pReader);
|
||||
|
||||
SArray* result = taosArrayInit(1024, sizeof(void*));
|
||||
|
||||
bool cn = cacheIter->next(cacheIter);
|
||||
bool tn = tfileIter->next(tfileIter);
|
||||
while (cn == true && tn == true) {
|
||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
||||
|
||||
// dump value
|
||||
int comp = strcmp(cv->colVal, tv->colVal);
|
||||
if (comp == 0) {
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
|
||||
cn = cacheIter->next(cacheIter);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
continue;
|
||||
} else if (comp < 0) {
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
|
||||
// copy to final Result;
|
||||
cn = cacheIter->next(cacheIter);
|
||||
} else {
|
||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
||||
taosArrayPush(result, &tfv);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
// copy to final result
|
||||
tn = tfileIter->next(tfileIter);
|
||||
}
|
||||
}
|
||||
while (cn == true) {
|
||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, cv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
cn = cacheIter->next(cacheIter);
|
||||
}
|
||||
while (tn == true) {
|
||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
||||
taosArrayAddAll(tfv->tableId, tv->val);
|
||||
taosArrayPush(result, &tfv);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
}
|
||||
|
||||
int32_t version = CACHE_VERSION(pCache);
|
||||
uint8_t colType = pCache->type;
|
||||
|
||||
TFileWriter* tw = tfileWriterOpen(sIdx->path, sIdx->suid, version, pCache->colName, colType);
|
||||
if (tw == NULL) {
|
||||
indexError("faile to open file to write");
|
||||
} else {
|
||||
int ret = tfileWriterPut(tw, result);
|
||||
if (ret != 0) { indexError("faile to write into tindex "); }
|
||||
}
|
||||
// not free later, just put int table cache
|
||||
SSkipList* timm = (SSkipList*)pCache->imm;
|
||||
pCache->imm = NULL; // or throw int bg thread
|
||||
indexCacheDestroySkiplist(timm);
|
||||
|
||||
tfileWriteClose(tw);
|
||||
indexCacheIteratorDestroy(cacheIter);
|
||||
tfileIteratorDestroy(tfileIter);
|
||||
|
||||
tfileReaderUnRef(pReader);
|
||||
indexCacheUnRef(pCache);
|
||||
return 0;
|
||||
}
|
||||
void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||
if (destroy) {
|
||||
taosArrayDestroy(value->val);
|
||||
} else {
|
||||
taosArrayClear(value->val);
|
||||
}
|
||||
free(value->colVal);
|
||||
value->colVal = NULL;
|
||||
}
|
||||
|
|
|
@ -94,6 +94,16 @@ void indexCacheDebug(IndexCache* cache) {
|
|||
tSkipListDestroyIter(iter);
|
||||
}
|
||||
|
||||
void indexCacheDestroySkiplist(SSkipList* slt) {
|
||||
SSkipListIterator* iter = tSkipListCreateIter(slt);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
if (ct != NULL) {}
|
||||
}
|
||||
tSkipListDestroyIter(iter);
|
||||
}
|
||||
|
||||
void indexCacheDestroy(void* cache) {
|
||||
IndexCache* pCache = cache;
|
||||
if (pCache == NULL) { return; }
|
||||
|
@ -108,6 +118,48 @@ static void doMergeWork(SSchedMsg* msg) {
|
|||
SIndex* sidx = (SIndex*)pCache->index;
|
||||
indexFlushCacheTFile(sidx, pCache);
|
||||
}
|
||||
static bool indexCacheIteratorNext(Iterate* itera) {
|
||||
SSkipListIterator* iter = itera->iter;
|
||||
if (iter == NULL) { return false; }
|
||||
|
||||
IterateValue* iv = &itera->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
|
||||
bool next = tSkipListIterNext(iter);
|
||||
if (next) {
|
||||
SSkipListNode* node = tSkipListIterGet(iter);
|
||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||
|
||||
iv->type = ct->operaType;
|
||||
iv->colVal = ct->colVal;
|
||||
|
||||
taosArrayPush(iv->val, &ct->uid);
|
||||
}
|
||||
|
||||
return next;
|
||||
}
|
||||
|
||||
static IterateValue* indexCacheIteratorGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
|
||||
Iterate* iiter = calloc(1, sizeof(Iterate));
|
||||
if (iiter == NULL) { return NULL; }
|
||||
|
||||
iiter->val.val = taosArrayInit(1, sizeof(uint64_t));
|
||||
iiter->iter = cache->imm != NULL ? tSkipListCreateIter(cache->imm) : NULL;
|
||||
iiter->next = indexCacheIteratorNext;
|
||||
iiter->getValue = indexCacheIteratorGetValue;
|
||||
|
||||
return iiter;
|
||||
}
|
||||
void indexCacheIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
|
||||
tSkipListDestroyIter(iter->iter);
|
||||
iterateValueDestroy(&iter->val, true);
|
||||
free(iter);
|
||||
}
|
||||
|
||||
int indexCacheSchedToMerge(IndexCache* pCache) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
|
|
|
@ -23,6 +23,13 @@
|
|||
#include "taosdef.h"
|
||||
#include "tcompare.h"
|
||||
|
||||
typedef struct TFileFstIter {
|
||||
FstStreamBuilder* fb;
|
||||
StreamWithState* st;
|
||||
AutomationCtx* ctx;
|
||||
TFileReader* rdr;
|
||||
} TFileFstIter;
|
||||
|
||||
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
|
||||
|
||||
static int tfileStrCompare(const void* a, const void* b);
|
||||
|
@ -184,6 +191,23 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* resul
|
|||
return ret;
|
||||
}
|
||||
|
||||
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
|
||||
char filename[128] = {0};
|
||||
int32_t coldId = 1;
|
||||
tfileGenFileName(filename, suid, coldId, version);
|
||||
|
||||
char fullname[256] = {0};
|
||||
snprintf(fullname, sizeof(fullname), "%s/%s", path, filename);
|
||||
WriterCtx* wcx = writerCtxCreate(TFile, fullname, true, 1024 * 1024);
|
||||
|
||||
TFileHeader tfh = {0};
|
||||
tfh.suid = suid;
|
||||
tfh.version = version;
|
||||
memcpy(tfh.colName, colName, strlen(colName));
|
||||
tfh.colType = colType;
|
||||
|
||||
return tfileWriterCreate(wcx, &tfh);
|
||||
}
|
||||
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
||||
// char pathBuf[128] = {0};
|
||||
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
|
||||
|
@ -279,6 +303,11 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
|
|||
tw->fb = NULL;
|
||||
return 0;
|
||||
}
|
||||
void tfileWriteClose(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
writerCtxDestroy(tw->ctx);
|
||||
free(tw);
|
||||
}
|
||||
void tfileWriterDestroy(TFileWriter* tw) {
|
||||
if (tw == NULL) { return; }
|
||||
|
||||
|
@ -314,6 +343,71 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
static bool tfileIteratorNext(Iterate* iiter) {
|
||||
IterateValue* iv = &iiter->val;
|
||||
iterateValueDestroy(iv, false);
|
||||
// SArray* tblIds = iv->val;
|
||||
|
||||
char* colVal = NULL;
|
||||
uint64_t offset = 0;
|
||||
|
||||
TFileFstIter* tIter = iiter->iter;
|
||||
StreamWithStateResult* rt = streamWithStateNextWith(tIter->st, NULL);
|
||||
if (rt == NULL) { return false; }
|
||||
|
||||
int32_t sz = 0;
|
||||
char* ch = (char*)fstSliceData(&rt->data, &sz);
|
||||
colVal = calloc(1, sz + 1);
|
||||
memcpy(colVal, ch, sz);
|
||||
|
||||
offset = (uint64_t)(rt->out.out);
|
||||
|
||||
swsResultDestroy(rt);
|
||||
// set up iterate value
|
||||
if (tfileReaderLoadTableIds(tIter->rdr, offset, iv->val) != 0) { return false; }
|
||||
|
||||
iv->colVal = colVal;
|
||||
|
||||
// std::string key(ch, sz);
|
||||
}
|
||||
|
||||
static IterateValue* tifileIterateGetValue(Iterate* iter) {
|
||||
return &iter->val;
|
||||
}
|
||||
|
||||
static TFileFstIter* tfileFstIteratorCreate(TFileReader* reader) {
|
||||
TFileFstIter* tIter = calloc(1, sizeof(Iterate));
|
||||
if (tIter == NULL) { return NULL; }
|
||||
tIter->ctx = automCtxCreate(NULL, AUTOMATION_ALWAYS);
|
||||
tIter->fb = fstSearch(reader->fst, tIter->ctx);
|
||||
tIter->st = streamBuilderIntoStream(tIter->fb);
|
||||
tIter->rdr = reader;
|
||||
return tIter;
|
||||
}
|
||||
|
||||
Iterate* tfileIteratorCreate(TFileReader* reader) {
|
||||
Iterate* iter = calloc(1, sizeof(Iterate));
|
||||
|
||||
iter->iter = tfileFstIteratorCreate(reader);
|
||||
if (iter->iter == NULL) { return NULL; }
|
||||
|
||||
iter->next = tfileIteratorNext;
|
||||
iter->getValue = tifileIterateGetValue;
|
||||
return iter;
|
||||
}
|
||||
void tfileIteratorDestroy(Iterate* iter) {
|
||||
if (iter == NULL) { return; }
|
||||
IterateValue* iv = &iter->val;
|
||||
iterateValueDestroy(iv, true);
|
||||
|
||||
TFileFstIter* tIter = iter->iter;
|
||||
streamWithStateDestroy(tIter->st);
|
||||
fstStreamBuilderDestroy(tIter->fb);
|
||||
automCtxDestroy(tIter->ctx);
|
||||
|
||||
free(iter);
|
||||
}
|
||||
|
||||
TFileReader* tfileGetReaderByCol(IndexTFile* tf, char* colName) {
|
||||
if (tf == NULL) { return NULL; }
|
||||
TFileCacheKey key = {.suid = 0, .colType = TSDB_DATA_TYPE_BINARY, .colName = colName, .nColName = strlen(colName)};
|
||||
|
@ -334,6 +428,23 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
|||
|
||||
return fn(av->colVal, bv->colVal);
|
||||
}
|
||||
|
||||
TFileValue* tfileValueCreate(char* val) {
|
||||
TFileValue* tf = calloc(1, sizeof(TFileValue));
|
||||
if (tf == NULL) { return NULL; }
|
||||
|
||||
tf->tableId = taosArrayInit(32, sizeof(uint64_t));
|
||||
return tf;
|
||||
}
|
||||
int tfileValuePush(TFileValue* tf, uint64_t val) {
|
||||
if (tf == NULL) { return -1; }
|
||||
taosArrayPush(tf->tableId, &val);
|
||||
return 0;
|
||||
}
|
||||
void tfileValueDestroy(TFileValue* tf) {
|
||||
taosArrayDestroy(tf->tableId);
|
||||
free(tf);
|
||||
}
|
||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||
int sz = taosArrayGetSize(ids);
|
||||
SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);
|
||||
|
|
Loading…
Reference in New Issue