add update on index
This commit is contained in:
parent
e00ae4d86f
commit
c368e59689
|
@ -16,6 +16,7 @@
|
|||
#include "index.h"
|
||||
#include "indexInt.h"
|
||||
#include "index_cache.h"
|
||||
#include "index_comm.h"
|
||||
#include "index_tfile.h"
|
||||
#include "index_util.h"
|
||||
#include "tdef.h"
|
||||
|
@ -30,8 +31,6 @@
|
|||
|
||||
void* indexQhandle = NULL;
|
||||
|
||||
static char JSON_COLUMN[] = "JSON";
|
||||
|
||||
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
|
||||
{ \
|
||||
bool f = false; \
|
||||
|
@ -64,13 +63,11 @@ typedef struct SIdxColInfo {
|
|||
int cVersion;
|
||||
} SIdxColInfo;
|
||||
|
||||
typedef struct SIdxMergeHelper {
|
||||
char* colVal;
|
||||
typedef struct SIdxTempResult {
|
||||
SArray* total;
|
||||
SArray* added;
|
||||
SArray* deled;
|
||||
bool reset;
|
||||
} SIdxMergeHelper;
|
||||
} SIdxTempResult;
|
||||
|
||||
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
|
||||
// static void indexInit();
|
||||
|
@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
|
|||
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
||||
|
||||
// merge cache and tfile by opera type
|
||||
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper);
|
||||
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper);
|
||||
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper);
|
||||
|
||||
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
|
||||
// int32_t indexSerialKey(ICacheKey* key, char* buf);
|
||||
|
@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) {
|
|||
|
||||
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
|
||||
// refactor, merge interResults into fResults by oType
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(interResults); i--) {
|
||||
SArray* t = taosArrayGetP(interResults, i);
|
||||
taosArraySort(t, uidCompare);
|
||||
|
@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
|
|||
return 0;
|
||||
}
|
||||
|
||||
SIdxMergeHelper* sIdxMergeHelperCreate() {
|
||||
SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper));
|
||||
hp->total = taosArrayInit(4, sizeof(uint64_t));
|
||||
hp->added = taosArrayInit(4, sizeof(uint64_t));
|
||||
hp->deled = taosArrayInit(4, sizeof(uint64_t));
|
||||
hp->reset = false;
|
||||
return hp;
|
||||
SIdxTempResult* sIdxTempResultCreate() {
|
||||
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
|
||||
tr->total = taosArrayInit(4, sizeof(uint64_t));
|
||||
tr->added = taosArrayInit(4, sizeof(uint64_t));
|
||||
tr->deled = taosArrayInit(4, sizeof(uint64_t));
|
||||
return tr;
|
||||
}
|
||||
void sIdxMergeHelperClear(SIdxMergeHelper* hp) {
|
||||
if (hp == NULL) {
|
||||
void sIdxTempResultClear(SIdxTempResult* tr) {
|
||||
if (tr == NULL) {
|
||||
return;
|
||||
}
|
||||
hp->reset = false;
|
||||
taosArrayClear(hp->total);
|
||||
taosArrayClear(hp->added);
|
||||
taosArrayClear(hp->deled);
|
||||
taosArrayClear(tr->total);
|
||||
taosArrayClear(tr->added);
|
||||
taosArrayClear(tr->deled);
|
||||
}
|
||||
void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) {
|
||||
if (hp == NULL) {
|
||||
void sIdxTempResultDestroy(SIdxTempResult* tr) {
|
||||
if (tr == NULL) {
|
||||
return;
|
||||
}
|
||||
taosArrayDestroy(hp->total);
|
||||
taosArrayDestroy(hp->added);
|
||||
taosArrayDestroy(hp->deled);
|
||||
taosArrayDestroy(tr->total);
|
||||
taosArrayDestroy(tr->added);
|
||||
taosArrayDestroy(tr->deled);
|
||||
}
|
||||
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) {
|
||||
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
||||
if (sz > 0) {
|
||||
// TODO(yihao): remove duplicate tableid
|
||||
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
||||
// indexError("merge colVal: %s", lv->colVal);
|
||||
if (strcmp(lv->colVal, tv->colVal) == 0) {
|
||||
taosArrayAddAll(lv->tableId, tv->tableId);
|
||||
tfileValueDestroy(tv);
|
||||
} else {
|
||||
taosArrayPush(result, &tv);
|
||||
}
|
||||
} else {
|
||||
taosArrayPush(result, &tv);
|
||||
}
|
||||
}
|
||||
static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) {
|
||||
taosArraySort(mh->total, uidCompare);
|
||||
taosArraySort(mh->added, uidCompare);
|
||||
taosArraySort(mh->deled, uidCompare);
|
||||
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
|
||||
taosArraySort(tr->total, uidCompare);
|
||||
taosArraySort(tr->added, uidCompare);
|
||||
taosArraySort(tr->deled, uidCompare);
|
||||
|
||||
SArray* arrs = taosArrayInit(2, sizeof(void*));
|
||||
taosArrayPush(arrs, &mh->total);
|
||||
taosArrayPush(arrs, &mh->added);
|
||||
taosArrayPush(arrs, &tr->total);
|
||||
taosArrayPush(arrs, &tr->added);
|
||||
|
||||
iUnion(arrs, result);
|
||||
taosArrayDestroy(arrs);
|
||||
|
||||
iExcept(result, mh->deled);
|
||||
iExcept(result, tr->deled);
|
||||
}
|
||||
static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) {
|
||||
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
|
||||
int32_t sz = taosArrayGetSize(result);
|
||||
if (sz > 0) {
|
||||
TFileValue* lv = taosArrayGetP(result, sz - 1);
|
||||
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
|
||||
sIdxMergeResult(lv->tableId, help);
|
||||
sIdxMergeHelperClear(help);
|
||||
sIdxTempResultMergeTo(lv->tableId, tr);
|
||||
sIdxTempResultClear(tr);
|
||||
|
||||
taosArrayPush(result, &tfv);
|
||||
} else if (tfv == NULL) {
|
||||
sIdxMergeResult(lv->tableId, help);
|
||||
// handle last iterator
|
||||
sIdxTempResultMergeTo(lv->tableId, tr);
|
||||
} else {
|
||||
// temp result saved in help
|
||||
tfileValueDestroy(tfv);
|
||||
}
|
||||
} else {
|
||||
taosArrayPush(result, &tfv);
|
||||
}
|
||||
}
|
||||
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) {
|
||||
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) {
|
||||
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
|
||||
TFileValue* tfv = tfileValueCreate(colVal);
|
||||
|
||||
indexMayMergeToFinalResult(result, tfv, mh);
|
||||
indexMayMergeTempToFinalResult(result, tfv, tr);
|
||||
|
||||
if (cv != NULL) {
|
||||
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
|
||||
if (cv->type == ADD_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id)
|
||||
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
|
||||
} else if (cv->type == DEL_VALUE) {
|
||||
INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id)
|
||||
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id)
|
||||
}
|
||||
}
|
||||
if (tv != NULL) {
|
||||
taosArrayAddAll(mh->total, tv->val);
|
||||
taosArrayAddAll(tr->total, tv->val);
|
||||
}
|
||||
}
|
||||
static void indexDestroyTempResult(SArray* result) {
|
||||
static void indexDestroyFinalResult(SArray* result) {
|
||||
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
TFileValue* tv = taosArrayGetP(result, i);
|
||||
|
@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
|
||||
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
|
||||
|
||||
SIdxMergeHelper* help = sIdxMergeHelperCreate();
|
||||
SIdxTempResult* tr = sIdxTempResultCreate();
|
||||
while (cn == true || tn == true) {
|
||||
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
|
||||
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
|
||||
|
@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
comp = 1;
|
||||
}
|
||||
if (comp == 0) {
|
||||
indexMergeCacheAndTFile(result, cv, tv, help);
|
||||
indexMergeCacheAndTFile(result, cv, tv, tr);
|
||||
cn = cacheIter->next(cacheIter);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
} else if (comp < 0) {
|
||||
indexMergeCacheAndTFile(result, cv, NULL, help);
|
||||
indexMergeCacheAndTFile(result, cv, NULL, tr);
|
||||
cn = cacheIter->next(cacheIter);
|
||||
} else {
|
||||
indexMergeCacheAndTFile(result, NULL, tv, help);
|
||||
indexMergeCacheAndTFile(result, NULL, tv, tr);
|
||||
tn = tfileIter->next(tfileIter);
|
||||
}
|
||||
}
|
||||
indexMayMergeToFinalResult(result, NULL, help);
|
||||
indexMayMergeTempToFinalResult(result, NULL, tr);
|
||||
sIdxTempResultDestroy(tr);
|
||||
|
||||
int ret = indexGenTFile(sIdx, pCache, result);
|
||||
indexDestroyTempResult(result);
|
||||
indexDestroyFinalResult(result);
|
||||
|
||||
indexCacheDestroyImm(pCache);
|
||||
|
||||
|
@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
|||
tfileReaderUnRef(pReader);
|
||||
indexCacheUnRef(pCache);
|
||||
|
||||
sIdxMergeHelperDestroy(help);
|
||||
|
||||
int64_t cost = taosGetTimestampUs() - st;
|
||||
if (ret != 0) {
|
||||
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
|
||||
|
|
|
@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) {
|
|||
if (tcache == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// free table cache
|
||||
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
|
||||
while (reader) {
|
||||
|
|
|
@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) {
|
|||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
{
|
||||
std::string colName("voltagefdadfa");
|
||||
std::string colVal("abxxxxxxxxxxxx");
|
||||
for (uint i = 0; i < 1000; i++) {
|
||||
colVal[i % colVal.size()] = '0' + i % 128;
|
||||
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
|
||||
colVal.c_str(), colVal.size());
|
||||
|
||||
SIndexMultiTerm* terms = indexMultiTermCreate();
|
||||
indexMultiTermAdd(terms, term);
|
||||
for (size_t i = 0; i < 1000; i++) {
|
||||
tIndexJsonPut(index, terms, i);
|
||||
}
|
||||
indexMultiTermDestroy(terms);
|
||||
}
|
||||
}
|
||||
{
|
||||
std::string colName("voltagefdadfa");
|
||||
std::string colVal("abxxxxxxxxxxxx");
|
||||
|
|
Loading…
Reference in New Issue