From 844c5ac4bb89b5f3ac17c0f6cf65768781b49396 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Dec 2021 11:48:51 +0800 Subject: [PATCH] update tindex write --- source/libs/index/src/index_tfile.c | 62 ++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index ad249868d5..79d4432180 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -21,7 +21,33 @@ #include "index_fst_counting_writer.h" #include "index_util.h" #include "taosdef.h" +#include "tcompare.h" +#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t)) + +typedef struct TFileValue { + char* colVal; // null terminated + SArray* tableId; + int32_t offset; +} TFileValue; + +// static tfileGetCompareFunc(uint8_t byte) {} +static int tfileValueCompare(const void* a, const void* b, const void* param) { + __compar_fn_t fn = *(__compar_fn_t*)param; + + TFileValue* av = (TFileValue*)a; + TFileValue* bv = (TFileValue*)b; + + return fn(av->colVal, bv->colVal); +} +static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { + int tbSz = taosArrayGetSize(tableIds); + SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t); + for (size_t i = 0; i < tbSz; i++) { + uint64_t* v = taosArrayGet(tableIds, i); + SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); + } +} static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) { char buf[TFILE_HEADER_SIZE] = {0}; char* p = buf; @@ -245,9 +271,41 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) { } int TFileWriterPut(TFileWriter* tw, void* data) { - int32_t sz = taosArrayGetSize((SArray*)data); + // sort by coltype and write to tindex + __compar_fn_t fn = getComparFunc(tw->header.colType, 0); + taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn); - // sort by and write to + int32_t bufLimit = 4096, offset = 0; + char* buf = calloc(1, sizeof(bufLimit)); + char* p = buf; + int32_t sz = taosArrayGetSize((SArray*)data); + for (size_t i = 0; i < sz; i++) { + TFileValue* v = taosArrayGetP((SArray*)data, i); + + int32_t tbsz = taosArrayGetSize(v->tableId); + // check buf has enough space or not + int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); + if (offset + ttsz > bufLimit) { + // batch write + tw->ctx->write(tw->ctx, buf, offset); + offset = 0; + memset(buf, 0, bufLimit); + p = buf; + } + + tfileSerialTableIdsToBuf(p, v->tableId); + offset += ttsz; + p = buf + offset; + // set up value offset and + v->offset = tw->offset; + tw->offset += ttsz; + } + if (offset != 0) { + // write reversed data in buf to tindex + tw->ctx->write(tw->ctx, buf, offset); + } + + tfree(buf); return 0; } void tfileWriterDestroy(TFileWriter* tw) {