Merge pull request #9615 from taosdata/feature/index_complete

refactor code
This commit is contained in:
Shengliang Guan 2022-01-05 19:36:16 +08:00 committed by GitHub
commit d10a47330b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 62 deletions

View File

@ -452,10 +452,6 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) {
while (tn == true) { while (tn == true) {
IterateValue* tv = tfileIter->getValue(tfileIter); IterateValue* tv = tfileIter->getValue(tfileIter);
TFileValue* tfv = tfileValueCreate(tv->colVal); TFileValue* tfv = tfileValueCreate(tv->colVal);
if (tv->val == NULL) {
// HO
printf("NO....");
}
taosArrayAddAll(tfv->tableId, tv->val); taosArrayAddAll(tfv->tableId, tv->val);
indexMergeSameKey(result, tfv); indexMergeSameKey(result, tfv);
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
@ -502,8 +498,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
if (reader == NULL) { goto END; } if (reader == NULL) { goto END; }
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = { ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName), .colType = header->colType};
pthread_mutex_lock(&sIdx->mtx); pthread_mutex_lock(&sIdx->mtx);
IndexTFile* ifile = (IndexTFile*)sIdx->tindex; IndexTFile* ifile = (IndexTFile*)sIdx->tindex;

View File

@ -21,10 +21,6 @@
#define MAX_INDEX_KEY_LEN 256 // test only, change later #define MAX_INDEX_KEY_LEN 256 // test only, change later
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 10 * 10000
// ref index_cache.h:22
//#define CACHE_KEY_LEN(p) \
// (sizeof(int32_t) + sizeof(uint16_t) + sizeof(p->colType) + sizeof(p->nColVal) + p->nColVal + sizeof(uint64_t) +
// sizeof(p->operType))
static void indexMemRef(MemTable* tbl); static void indexMemRef(MemTable* tbl);
static void indexMemUnRef(MemTable* tbl); static void indexMemUnRef(MemTable* tbl);
@ -275,14 +271,12 @@ int indexCacheSearch(void* cache, SIndexTermQuery* query, SArray* result, STermV
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
EIndexQueryType qtype = query->qType; EIndexQueryType qtype = query->qType;
CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)}; CacheTerm ct = {.colVal = term->colVal, .version = atomic_load_32(&pCache->version)};
// indexCacheDebug(pCache);
int ret = indexQueryMem(mem, &ct, qtype, result, s); int ret = indexQueryMem(mem, &ct, qtype, result, s);
if (ret == 0 && *s != kTypeDeletion) { if (ret == 0 && *s != kTypeDeletion) {
// continue search in imm // continue search in imm
ret = indexQueryMem(imm, &ct, qtype, result, s); ret = indexQueryMem(imm, &ct, qtype, result, s);
} }
// cacheTermDestroy(ct);
indexMemUnRef(mem); indexMemUnRef(mem);
indexMemUnRef(imm); indexMemUnRef(imm);
@ -354,9 +348,6 @@ static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListIterator* iter = itera->iter; SSkipListIterator* iter = itera->iter;
if (iter == NULL) { return false; } if (iter == NULL) { return false; }
IterateValue* iv = &itera->val; IterateValue* iv = &itera->val;
if (iv->colVal != NULL && iv->val != NULL) {
// indexError("value in cache: colVal: %s, size: %d", iv->colVal, (int)taosArrayGetSize(iv->val));
}
iterateValueDestroy(iv, false); iterateValueDestroy(iv, false);
bool next = tSkipListIterNext(iter); bool next = tSkipListIterNext(iter);

View File

@ -61,9 +61,6 @@ TFileCache* tfileCacheCreate(const char* path) {
tcache->capacity = 64; tcache->capacity = 64;
SArray* files = tfileGetFileList(path); SArray* files = tfileGetFileList(path);
uint64_t suid;
int32_t colId, version;
for (size_t i = 0; i < taosArrayGetSize(files); i++) { for (size_t i = 0; i < taosArrayGetSize(files); i++) {
char* file = taosArrayGetP(files, i); char* file = taosArrayGetP(files, i);
@ -76,10 +73,9 @@ TFileCache* tfileCacheCreate(const char* path) {
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
if (reader == NULL) { goto End; } if (reader == NULL) { goto End; }
TFileHeader* header = &reader->header; TFileHeader* header = &reader->header;
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
char buf[128] = {0}; char buf[128] = {0};
ICacheKey key = {.suid = header->suid, .colName = header->colName, .nColName = strlen(header->colName)};
int32_t sz = indexSerialCacheKey(&key, buf); int32_t sz = indexSerialCacheKey(&key, buf);
assert(sz < sizeof(buf)); assert(sz < sizeof(buf));
taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*)); taosHashPut(tcache->tableCache, buf, sz, &reader, sizeof(void*));
@ -212,24 +208,13 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c
tfileGenFileFullName(fullname, path, suid, colName, version); tfileGenFileFullName(fullname, path, suid, colName, version);
WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024);
// indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size); indexInfo("open read file name:%s, size: %d", wc->file.buf, wc->file.size);
if (wc == NULL) { return NULL; } if (wc == NULL) { return NULL; }
TFileReader* reader = tfileReaderCreate(wc); TFileReader* reader = tfileReaderCreate(wc);
return reader; return reader;
} }
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
// char pathBuf[128] = {0};
// sprintf(pathBuf, "%s/% " PRIu64 "-%d-%d.tindex", path, suid, colId, version);
// TFileHeader header = {.suid = suid, .version = version, .colName = {0}, colType = colType};
// memcpy(header.colName, );
// char buf[TFILE_HADER_PRE_SIZE];
// int len = TFILE_HADER_PRE_SIZE;
// if (len != ctx->write(ctx, buf, len)) {
// indexError("index: %" PRIu64 " failed to write header info", header->suid);
// return NULL;
//}
TFileWriter* tw = calloc(1, sizeof(TFileWriter)); TFileWriter* tw = calloc(1, sizeof(TFileWriter));
if (tw == NULL) { if (tw == NULL) {
indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid); indexError("index: %" PRIu64 " failed to alloc TFilerWriter", header->suid);
@ -278,34 +263,14 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
// check buf has enough space or not // check buf has enough space or not
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
// if (offset + ttsz >= bufLimit) {
// // batch write
// indexInfo("offset: %d, ttsz: %d", offset, ttsz);
// // std::cout << "offset: " << offset << std::endl;
// // std::cout << "ttsz:" << ttsz < < < std::endl;
// tw->ctx->write(tw->ctx, buf, offset);
// offset = 0;
// memset(buf, 0, bufLimit);
// p = buf;
//}
// if (ttsz >= bufLimit) {
//}
char* buf = calloc(1, ttsz * sizeof(char)); char* buf = calloc(1, ttsz * sizeof(char));
char* p = buf; char* p = buf;
tfileSerialTableIdsToBuf(p, v->tableId); tfileSerialTableIdsToBuf(p, v->tableId);
tw->ctx->write(tw->ctx, buf, ttsz); tw->ctx->write(tw->ctx, buf, ttsz);
// offset += ttsz;
// p = buf + offset;
// set up value offset
v->offset = tw->offset; v->offset = tw->offset;
tw->offset += ttsz; tw->offset += ttsz;
free(buf); free(buf);
} }
// if (offset != 0) {
// write reversed data in buf to tindex
// tw->ctx->write(tw->ctx, buf, offset);
//}
// tfree(buf);
tw->fb = fstBuilderCreate(tw->ctx, 0); tw->fb = fstBuilderCreate(tw->ctx, 0);
if (tw->fb == NULL) { if (tw->fb == NULL) {
@ -643,15 +608,9 @@ static void tfileDestroyFileName(void* elem) {
free(p); free(p);
} }
static int tfileCompare(const void* a, const void* b) { static int tfileCompare(const void* a, const void* b) {
const char* aName = *(char**)a; const char* as = *(char**)a;
const char* bName = *(char**)b; const char* bs = *(char**)b;
return strcmp(as, bs);
size_t aLen = strlen(aName);
size_t bLen = strlen(bName);
int ret = strncmp(aName, bName, aLen > bLen ? aLen : bLen);
if (ret == 0) { return ret; }
return ret < 0 ? -1 : 1;
} }
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) { static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) {