Merge pull request #9278 from taosdata/feature/index_cache
refactor tindex write
This commit is contained in:
commit
317871c752
|
@ -27,18 +27,22 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
// tfile header content
|
||||
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
|
||||
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
|
||||
// |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->|
|
||||
// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->|
|
||||
|
||||
#pragma pack(push, 1)
|
||||
typedef struct TFileHeader {
|
||||
uint64_t suid;
|
||||
int32_t version;
|
||||
char colName[128]; //
|
||||
char colName[TSDB_COL_NAME_LEN]; //
|
||||
uint8_t colType;
|
||||
int32_t fstOffset;
|
||||
} TFileHeader;
|
||||
#pragma pack(pop)
|
||||
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader) + sizeof(uint32_t))
|
||||
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
|
||||
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
|
||||
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
|
||||
//#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
|
||||
|
||||
typedef struct TFileCacheKey {
|
||||
uint64_t suid;
|
||||
|
|
|
@ -31,124 +31,21 @@ typedef struct TFileValue {
|
|||
int32_t offset;
|
||||
} TFileValue;
|
||||
|
||||
static int tfileValueCompare(const void* a, const void* b, const void* param);
|
||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
|
||||
|
||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
|
||||
static int tfileWriteHeader(TFileWriter* writer);
|
||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval);
|
||||
|
||||
static int tfileReadLoadHeader(TFileReader* reader);
|
||||
|
||||
static int tfileGetFileList(const char* path, SArray* result);
|
||||
static void tfileDestroyFileName(void* elem);
|
||||
static int tfileCompare(const void* a, const void* b);
|
||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
|
||||
// static tfileGetCompareFunc(uint8_t byte) {}
|
||||
static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
||||
__compar_fn_t fn = *(__compar_fn_t*)param;
|
||||
|
||||
TFileValue* av = (TFileValue*)a;
|
||||
TFileValue* bv = (TFileValue*)b;
|
||||
|
||||
return fn(av->colVal, bv->colVal);
|
||||
}
|
||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
|
||||
int tbSz = taosArrayGetSize(tableIds);
|
||||
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
|
||||
for (size_t i = 0; i < tbSz; i++) {
|
||||
uint64_t* v = taosArrayGet(tableIds, i);
|
||||
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) {
|
||||
char buf[TFILE_HEADER_SIZE] = {0};
|
||||
char* p = buf;
|
||||
|
||||
TFileHeader* header = &writer->header;
|
||||
SERIALIZE_MEM_TO_BUF(p, header, suid);
|
||||
SERIALIZE_MEM_TO_BUF(p, header, version);
|
||||
SERIALIZE_VAR_TO_BUF(p, strlen(header->colName), int32_t);
|
||||
|
||||
SERIALIZE_STR_MEM_TO_BUF(p, header, colName, strlen(header->colName));
|
||||
SERIALIZE_MEM_TO_BUF(p, header, colType);
|
||||
int offset = p - buf;
|
||||
int nwrite = writer->ctx->write(writer->ctx, buf, offset);
|
||||
if (offset != nwrite) { return -1; }
|
||||
writer->offset = offset;
|
||||
return 0;
|
||||
}
|
||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
||||
TFileHeader* header = &write->header;
|
||||
uint8_t colType = header->colType;
|
||||
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
||||
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
|
||||
if (fstBuilderInsert(write->fb, key, tval->offset)) {
|
||||
fstSliceDestroy(&key);
|
||||
return 0;
|
||||
}
|
||||
fstSliceDestroy(&key);
|
||||
return -1;
|
||||
} else {
|
||||
// handle other type later
|
||||
}
|
||||
}
|
||||
static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) {
|
||||
// TODO simple tfile header later
|
||||
char buf[TFILE_HADER_PRE_SIZE];
|
||||
char* p = buf;
|
||||
|
||||
int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE);
|
||||
assert(nread == TFILE_HADER_PRE_SIZE);
|
||||
|
||||
TFileHeader* header = &reader->header;
|
||||
memcpy(&header->suid, p, sizeof(header->suid));
|
||||
p += sizeof(header->suid);
|
||||
|
||||
memcpy(&header->version, p, sizeof(header->version));
|
||||
p += sizeof(header->version);
|
||||
|
||||
int32_t colLen = 0;
|
||||
memcpy(&colLen, p, sizeof(colLen));
|
||||
assert(colLen < sizeof(header->colName));
|
||||
nread = reader->ctx->read(reader->ctx, header->colName, colLen);
|
||||
assert(nread == colLen);
|
||||
|
||||
nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tfileGetFileList(const char* path, SArray* result) {
|
||||
DIR* dir = opendir(path);
|
||||
if (NULL == dir) { return -1; }
|
||||
|
||||
struct dirent* entry;
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
size_t len = strlen(entry->d_name);
|
||||
char* buf = calloc(1, len + 1);
|
||||
memcpy(buf, entry->d_name, len);
|
||||
taosArrayPush(result, &buf);
|
||||
}
|
||||
closedir(dir);
|
||||
return 0;
|
||||
}
|
||||
static void tfileDestroyFileName(void* elem) {
|
||||
char* p = *(char**)elem;
|
||||
free(p);
|
||||
}
|
||||
static int tfileCompare(const void* a, const void* b) {
|
||||
const char* aName = *(char**)a;
|
||||
const char* bName = *(char**)b;
|
||||
|
||||
size_t aLen = strlen(aName);
|
||||
size_t bLen = strlen(bName);
|
||||
|
||||
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||
}
|
||||
// tfile name suid-colId-version.tindex
|
||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
|
||||
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
||||
// read suid & colid & version success
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, version);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||
}
|
||||
|
||||
TFileCache* tfileCacheCreate(const char* path) {
|
||||
TFileCache* tcache = calloc(1, sizeof(TFileCache));
|
||||
|
@ -285,7 +182,7 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
|
|||
return tw;
|
||||
}
|
||||
|
||||
int TFileWriterPut(TFileWriter* tw, void* data) {
|
||||
int tfileWriterPut(TFileWriter* tw, void* data) {
|
||||
// sort by coltype and write to tindex
|
||||
__compar_fn_t fn = getComparFunc(tw->header.colType, 0);
|
||||
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
|
||||
|
@ -294,6 +191,21 @@ int TFileWriterPut(TFileWriter* tw, void* data) {
|
|||
char* buf = calloc(1, sizeof(bufLimit));
|
||||
char* p = buf;
|
||||
int32_t sz = taosArrayGetSize((SArray*)data);
|
||||
int32_t fstOffset = tw->offset;
|
||||
|
||||
// ugly code, refactor later
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||
|
||||
int32_t tbsz = taosArrayGetSize(v->tableId);
|
||||
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
|
||||
fstOffset += ttsz;
|
||||
}
|
||||
// check result or not
|
||||
tfileWriteFstOffset(tw, fstOffset);
|
||||
// tw->ctx->header.fstOffset = fstOffset;
|
||||
// tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset));
|
||||
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
TFileValue* v = taosArrayGetP((SArray*)data, i);
|
||||
|
||||
|
@ -311,7 +223,7 @@ int TFileWriterPut(TFileWriter* tw, void* data) {
|
|||
tfileSerialTableIdsToBuf(p, v->tableId);
|
||||
offset += ttsz;
|
||||
p = buf + offset;
|
||||
// set up value offset and
|
||||
// set up value offset
|
||||
v->offset = tw->offset;
|
||||
tw->offset += ttsz;
|
||||
}
|
||||
|
@ -366,3 +278,109 @@ int indexTFilePut(void* tfile, SIndexTerm* term, uint64_t uid) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tfileValueCompare(const void* a, const void* b, const void* param) {
|
||||
__compar_fn_t fn = *(__compar_fn_t*)param;
|
||||
|
||||
TFileValue* av = (TFileValue*)a;
|
||||
TFileValue* bv = (TFileValue*)b;
|
||||
|
||||
return fn(av->colVal, bv->colVal);
|
||||
}
|
||||
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
|
||||
int tbSz = taosArrayGetSize(tableIds);
|
||||
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
|
||||
for (size_t i = 0; i < tbSz; i++) {
|
||||
uint64_t* v = taosArrayGet(tableIds, i);
|
||||
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
|
||||
}
|
||||
}
|
||||
|
||||
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
|
||||
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
|
||||
tw->header.fstOffset = fstOffset;
|
||||
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
|
||||
return 0;
|
||||
}
|
||||
static int tfileWriteHeader(TFileWriter* writer) {
|
||||
char buf[TFILE_HEADER_NO_FST] = {0};
|
||||
char* p = buf;
|
||||
|
||||
TFileHeader* header = &writer->header;
|
||||
memcpy(buf, (char*)header, sizeof(buf));
|
||||
|
||||
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
|
||||
if (sizeof(buf) != nwrite) { return -1; }
|
||||
writer->offset = nwrite;
|
||||
return 0;
|
||||
}
|
||||
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
|
||||
TFileHeader* header = &write->header;
|
||||
uint8_t colType = header->colType;
|
||||
if (colType == TSDB_DATA_TYPE_BINARY || colType == TSDB_DATA_TYPE_NCHAR) {
|
||||
FstSlice key = fstSliceCreate((uint8_t*)(tval->colVal), (size_t)strlen(tval->colVal));
|
||||
if (fstBuilderInsert(write->fb, key, tval->offset)) {
|
||||
fstSliceDestroy(&key);
|
||||
return 0;
|
||||
}
|
||||
fstSliceDestroy(&key);
|
||||
return -1;
|
||||
} else {
|
||||
// handle other type later
|
||||
}
|
||||
}
|
||||
static int tfileReadLoadHeader(TFileReader* reader) {
|
||||
// TODO simple tfile header later
|
||||
char buf[TFILE_HEADER_SIZE] = {0};
|
||||
char* p = buf;
|
||||
|
||||
int64_t nread = reader->ctx->read(reader->ctx, buf, sizeof(buf));
|
||||
assert(nread == sizeof(buf));
|
||||
memcpy(&reader->header, buf, sizeof(buf));
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tfileGetFileList(const char* path, SArray* result) {
|
||||
DIR* dir = opendir(path);
|
||||
if (NULL == dir) { return -1; }
|
||||
|
||||
struct dirent* entry;
|
||||
while ((entry = readdir(dir)) != NULL) {
|
||||
size_t len = strlen(entry->d_name);
|
||||
char* buf = calloc(1, len + 1);
|
||||
memcpy(buf, entry->d_name, len);
|
||||
taosArrayPush(result, &buf);
|
||||
}
|
||||
closedir(dir);
|
||||
return 0;
|
||||
}
|
||||
static void tfileDestroyFileName(void* elem) {
|
||||
char* p = *(char**)elem;
|
||||
free(p);
|
||||
}
|
||||
static int tfileCompare(const void* a, const void* b) {
|
||||
const char* aName = *(char**)a;
|
||||
const char* bName = *(char**)b;
|
||||
|
||||
size_t aLen = strlen(aName);
|
||||
size_t bLen = strlen(bName);
|
||||
|
||||
return strncmp(aName, bName, aLen > bLen ? aLen : bLen);
|
||||
}
|
||||
// tfile name suid-colId-version.tindex
|
||||
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version) {
|
||||
if (3 == sscanf(filename, "%" PRIu64 "-%d-%d.tindex", suid, colId, version)) {
|
||||
// read suid & colid & version success
|
||||
return 0;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf) {
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, suid);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, colType);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_MEM_TO_BUF(buf, key, version);
|
||||
SERIALIZE_VAR_TO_BUF(buf, '_', char);
|
||||
SERIALIZE_STR_MEM_TO_BUF(buf, key, colName, key->nColName);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue