profile performance
This commit is contained in:
parent
cdaf97cb0e
commit
f9d83c044d
|
@ -49,9 +49,7 @@ int metaOpenIdx(SMeta *pMeta) {
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
SIndexOpts opts;
|
SIndexOpts opts;
|
||||||
if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) {
|
if (indexOpen(&opts, pMeta->path, &pMeta->pIdx->pIdx) != 0) { return -1; }
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -67,16 +65,14 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
|
||||||
|
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
SIndexOpts opts;
|
SIndexOpts opts;
|
||||||
if (indexClose(pMeta->pIdx->pIdx) != 0) {
|
if (indexClose(pMeta->pIdx->pIdx) != 0) { return -1; }
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
|
int metaSaveTableToIdx(SMeta *pMeta, const STbCfg *pTbCfg) {
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
if (pTbCfgs - type == META_CHILD_TABLE) {
|
if (pTbCfgs->type == META_CHILD_TABLE) {
|
||||||
char buf[8] = {0};
|
char buf[8] = {0};
|
||||||
int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
|
int16_t colId = (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId;
|
||||||
sprintf(buf, "%d", colId); // colname
|
sprintf(buf, "%d", colId); // colname
|
||||||
|
|
|
@ -59,6 +59,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
|
||||||
|
|
||||||
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
|
||||||
|
|
||||||
|
// merge cache and tfile by opera type
|
||||||
|
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv);
|
||||||
|
static void indexMergeSameKey(SArray* result, TFileValue* tv);
|
||||||
|
|
||||||
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
|
||||||
pthread_once(&isInit, indexInit);
|
pthread_once(&isInit, indexInit);
|
||||||
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
SIndex* sIdx = calloc(1, sizeof(SIndex));
|
||||||
|
@ -385,6 +389,27 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
|
||||||
taosArrayPush(result, &tv);
|
taosArrayPush(result, &tv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) {
|
||||||
|
// opt
|
||||||
|
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
|
||||||
|
// design merge-algorithm later, too complicated to handle all kind of situation
|
||||||
|
TFileValue* tfv = tfileValueCreate(colVal);
|
||||||
|
if (cv != NULL) {
|
||||||
|
if (cv->type == ADD_VALUE) {
|
||||||
|
taosArrayAddAll(tfv->tableId, cv->val);
|
||||||
|
} else if (cv->type == DEL_VALUE) {
|
||||||
|
} else if (cv->type == UPDATE_VALUE) {
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (tv != NULL) {
|
||||||
|
// opt later
|
||||||
|
taosArrayAddAll(tfv->tableId, tv->val);
|
||||||
|
}
|
||||||
|
|
||||||
|
indexMergeSameKey(result, tfv);
|
||||||
|
}
|
||||||
static void indexDestroyTempResult(SArray* result) {
|
static void indexDestroyTempResult(SArray* result) {
|
||||||
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
int32_t sz = result ? taosArrayGetSize(result) : 0;
|
||||||
for (size_t i = 0; i < sz; i++) {
|
for (size_t i = 0; i < sz; i++) {
|
||||||
|
@ -411,51 +436,30 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
|
||||||
|
|
||||||
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
|
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
|
||||||
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
|
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
|
||||||
while (cn == true && tn == true) {
|
while (cn == true || tn == true) {
|
||||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
|
||||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
|
||||||
|
|
||||||
// dump value
|
int comp = 0;
|
||||||
int comp = strcmp(cv->colVal, tv->colVal);
|
if (cn == true && tn == true) {
|
||||||
|
comp = strcmp(cv->colVal, tv->colVal);
|
||||||
|
} else if (cn == true) {
|
||||||
|
comp = -1;
|
||||||
|
} else {
|
||||||
|
comp = 1;
|
||||||
|
}
|
||||||
if (comp == 0) {
|
if (comp == 0) {
|
||||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
indexMergeCacheAndTFile(result, cv, tv);
|
||||||
taosArrayAddAll(tfv->tableId, cv->val);
|
|
||||||
taosArrayAddAll(tfv->tableId, tv->val);
|
|
||||||
indexMergeSameKey(result, tfv);
|
|
||||||
|
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
tn = tfileIter->next(tfileIter);
|
tn = tfileIter->next(tfileIter);
|
||||||
continue;
|
|
||||||
} else if (comp < 0) {
|
} else if (comp < 0) {
|
||||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
indexMergeCacheAndTFile(result, cv, NULL);
|
||||||
taosArrayAddAll(tfv->tableId, cv->val);
|
|
||||||
|
|
||||||
indexMergeSameKey(result, tfv);
|
|
||||||
// copy to final Result;
|
|
||||||
cn = cacheIter->next(cacheIter);
|
cn = cacheIter->next(cacheIter);
|
||||||
} else {
|
} else {
|
||||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
indexMergeCacheAndTFile(result, NULL, tv);
|
||||||
taosArrayAddAll(tfv->tableId, tv->val);
|
|
||||||
|
|
||||||
indexMergeSameKey(result, tfv);
|
|
||||||
// copy to final result
|
|
||||||
tn = tfileIter->next(tfileIter);
|
tn = tfileIter->next(tfileIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (cn == true) {
|
|
||||||
IterateValue* cv = cacheIter->getValue(cacheIter);
|
|
||||||
TFileValue* tfv = tfileValueCreate(cv->colVal);
|
|
||||||
taosArrayAddAll(tfv->tableId, cv->val);
|
|
||||||
indexMergeSameKey(result, tfv);
|
|
||||||
cn = cacheIter->next(cacheIter);
|
|
||||||
}
|
|
||||||
while (tn == true) {
|
|
||||||
IterateValue* tv = tfileIter->getValue(tfileIter);
|
|
||||||
TFileValue* tfv = tfileValueCreate(tv->colVal);
|
|
||||||
taosArrayAddAll(tfv->tableId, tv->val);
|
|
||||||
indexMergeSameKey(result, tfv);
|
|
||||||
tn = tfileIter->next(tfileIter);
|
|
||||||
}
|
|
||||||
int ret = indexGenTFile(sIdx, pCache, result);
|
int ret = indexGenTFile(sIdx, pCache, result);
|
||||||
indexDestroyTempResult(result);
|
indexDestroyTempResult(result);
|
||||||
|
|
||||||
|
@ -503,7 +507,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
tfileWriterClose(tw);
|
tfileWriterClose(tw);
|
||||||
|
|
||||||
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
TFileReader* reader = tfileReaderOpen(sIdx->path, cache->suid, version, cache->colName);
|
||||||
if (reader == NULL) { goto END; }
|
if (reader == NULL) { return -1; }
|
||||||
|
|
||||||
TFileHeader* header = &reader->header;
|
TFileHeader* header = &reader->header;
|
||||||
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
ICacheKey key = {.suid = cache->suid, .colName = header->colName, .nColName = strlen(header->colName)};
|
||||||
|
|
|
@ -217,9 +217,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
|
||||||
// set value
|
// set value
|
||||||
ct->uid = uid;
|
ct->uid = uid;
|
||||||
ct->operaType = term->operType;
|
ct->operaType = term->operType;
|
||||||
|
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
|
int64_t estimate = sizeof(ct) + strlen(ct->colVal);
|
||||||
|
|
||||||
pthread_mutex_lock(&pCache->mtx);
|
pthread_mutex_lock(&pCache->mtx);
|
||||||
pCache->occupiedMem += estimate;
|
pCache->occupiedMem += estimate;
|
||||||
indexCacheMakeRoomForWrite(pCache);
|
indexCacheMakeRoomForWrite(pCache);
|
||||||
|
@ -331,7 +331,6 @@ static char* indexCacheTermGet(const void* pData) {
|
||||||
static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
static int32_t indexCacheTermCompare(const void* l, const void* r) {
|
||||||
CacheTerm* lt = (CacheTerm*)l;
|
CacheTerm* lt = (CacheTerm*)l;
|
||||||
CacheTerm* rt = (CacheTerm*)r;
|
CacheTerm* rt = (CacheTerm*)r;
|
||||||
|
|
||||||
// compare colVal
|
// compare colVal
|
||||||
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
int32_t cmp = strcmp(lt->colVal, rt->colVal);
|
||||||
if (cmp == 0) { return rt->version - lt->version; }
|
if (cmp == 0) { return rt->version - lt->version; }
|
||||||
|
@ -359,17 +358,32 @@ static bool indexCacheIteratorNext(Iterate* itera) {
|
||||||
IterateValue* iv = &itera->val;
|
IterateValue* iv = &itera->val;
|
||||||
iterateValueDestroy(iv, false);
|
iterateValueDestroy(iv, false);
|
||||||
|
|
||||||
|
// IterateValue* iv = &itera->val;
|
||||||
|
// IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
|
||||||
|
|
||||||
bool next = tSkipListIterNext(iter);
|
bool next = tSkipListIterNext(iter);
|
||||||
if (next) {
|
if (next) {
|
||||||
SSkipListNode* node = tSkipListIterGet(iter);
|
SSkipListNode* node = tSkipListIterGet(iter);
|
||||||
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
|
||||||
|
|
||||||
|
// equal func
|
||||||
|
// if (iv->colVal != NULL && ct->colVal != NULL) {
|
||||||
|
// if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) }
|
||||||
|
//} else {
|
||||||
|
// tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1);
|
||||||
|
// tIterval.colVal = tstrdup(ct->colVal);
|
||||||
|
//}
|
||||||
iv->type = ct->operaType;
|
iv->type = ct->operaType;
|
||||||
iv->colVal = calloc(1, strlen(ct->colVal) + 1);
|
iv->colVal = tstrdup(ct->colVal);
|
||||||
memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
|
// iv->colVal = calloc(1, strlen(ct->colVal) + 1);
|
||||||
|
// memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
|
||||||
|
|
||||||
taosArrayPush(iv->val, &ct->uid);
|
taosArrayPush(iv->val, &ct->uid);
|
||||||
}
|
}
|
||||||
|
// IterateValue* iv = &itera->val;
|
||||||
|
// iterateValueDestroy(iv, true);
|
||||||
|
//*iv = tIterVal;
|
||||||
|
|
||||||
return next;
|
return next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -936,6 +936,7 @@ Fst* fstCreate(FstSlice* slice) {
|
||||||
len -= sizeof(checkSum);
|
len -= sizeof(checkSum);
|
||||||
taosDecodeFixedU32(buf + len, &checkSum);
|
taosDecodeFixedU32(buf + len, &checkSum);
|
||||||
if (taosCheckChecksum(buf, len, checkSum)) {
|
if (taosCheckChecksum(buf, len, checkSum)) {
|
||||||
|
indexError("index file is corrupted");
|
||||||
// verify fst
|
// verify fst
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,9 +60,10 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
||||||
return nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
static int writeCtxGetSize(WriterCtx* ctx) {
|
static int writeCtxGetSize(WriterCtx* ctx) {
|
||||||
if (ctx->type == TFile && ctx->file.readOnly) {
|
if (ctx->type == TFile) {
|
||||||
// refactor later
|
struct stat fstat;
|
||||||
return ctx->file.size;
|
stat(ctx->file.buf, &fstat);
|
||||||
|
return fstat.st_size;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -88,7 +89,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
if (readOnly == false) {
|
if (readOnly == false) {
|
||||||
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
||||||
|
tfFtruncate(ctx->file.fd, 0);
|
||||||
struct stat fstat;
|
struct stat fstat;
|
||||||
stat(path, &fstat);
|
stat(path, &fstat);
|
||||||
ctx->file.size = fstat.st_size;
|
ctx->file.size = fstat.st_size;
|
||||||
|
@ -138,6 +139,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
||||||
munmap(ctx->file.ptr, ctx->file.size);
|
munmap(ctx->file.ptr, ctx->file.size);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
if (ctx->file.readOnly == false) {
|
||||||
|
struct stat fstat;
|
||||||
|
stat(ctx->file.buf, &fstat);
|
||||||
|
// indexError("write file size: %d", (int)(fstat.st_size));
|
||||||
|
}
|
||||||
if (remove) { unlink(ctx->file.buf); }
|
if (remove) { unlink(ctx->file.buf); }
|
||||||
}
|
}
|
||||||
free(ctx);
|
free(ctx);
|
||||||
|
|
|
@ -147,21 +147,22 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
reader->ctx = ctx;
|
reader->ctx = ctx;
|
||||||
|
|
||||||
if (0 != tfileReaderVerify(reader)) {
|
if (0 != tfileReaderVerify(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
|
||||||
indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
indexError("invalid tfile, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
||||||
|
tfileReaderDestroy(reader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
if (0 != tfileReaderLoadHeader(reader)) {
|
if (0 != tfileReaderLoadHeader(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
|
||||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||||
reader->header.colName);
|
reader->header.colName);
|
||||||
|
tfileReaderDestroy(reader);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 != tfileReaderLoadFst(reader)) {
|
if (0 != tfileReaderLoadFst(reader)) {
|
||||||
|
indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid,
|
||||||
|
reader->header.colName, errno);
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s", reader->header.suid, reader->header.colName);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,6 +304,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
|
||||||
} else {
|
} else {
|
||||||
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
|
||||||
// (int)taosArrayGetSize(v->tableId));
|
// (int)taosArrayGetSize(v->tableId));
|
||||||
|
|
||||||
|
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fstBuilderFinish(tw->fb);
|
fstBuilderFinish(tw->fb);
|
||||||
|
@ -485,7 +488,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
|
||||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
||||||
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
||||||
tw->header.fstOffset = fstOffset;
|
tw->header.fstOffset = fstOffset;
|
||||||
|
|
||||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
||||||
|
indexInfo("tfile write fst offset: %d", tw->ctx->size(tw->ctx));
|
||||||
tw->offset += sizeof(fstOffset);
|
tw->offset += sizeof(fstOffset);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -495,8 +500,11 @@ static int tfileWriteHeader(TFileWriter* writer) {
|
||||||
TFileHeader* header = &writer->header;
|
TFileHeader* header = &writer->header;
|
||||||
memcpy(buf, (char*)header, sizeof(buf));
|
memcpy(buf, (char*)header, sizeof(buf));
|
||||||
|
|
||||||
|
indexInfo("tfile pre write header size: %d", writer->ctx->size(writer->ctx));
|
||||||
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
||||||
if (sizeof(buf) != nwrite) { return -1; }
|
if (sizeof(buf) != nwrite) { return -1; }
|
||||||
|
|
||||||
|
indexInfo("tfile after write header size: %d", writer->ctx->size(writer->ctx));
|
||||||
writer->offset = nwrite;
|
writer->offset = nwrite;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -521,6 +529,8 @@ static int tfileWriteFooter(TFileWriter* write) {
|
||||||
void* pBuf = (void*)buf;
|
void* pBuf = (void*)buf;
|
||||||
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
|
taosEncodeFixedU64((void**)(void*)&pBuf, tfileMagicNumber);
|
||||||
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
|
int nwrite = write->ctx->write(write->ctx, buf, strlen(buf));
|
||||||
|
|
||||||
|
indexInfo("tfile write footer size: %d", write->ctx->size(write->ctx));
|
||||||
assert(nwrite == sizeof(tfileMagicNumber));
|
assert(nwrite == sizeof(tfileMagicNumber));
|
||||||
return nwrite;
|
return nwrite;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue