update tindex write

This commit is contained in:
yihaoDeng 2021-12-22 11:48:51 +08:00
parent ecc435803f
commit 844c5ac4bb
1 changed files with 60 additions and 2 deletions

View File

@ -21,7 +21,33 @@
#include "index_fst_counting_writer.h" #include "index_fst_counting_writer.h"
#include "index_util.h" #include "index_util.h"
#include "taosdef.h" #include "taosdef.h"
#include "tcompare.h"
#define TF_TABLE_TATOAL_SIZE(sz) (sizeof(sz) + sz * sizeof(uint64_t))
typedef struct TFileValue {
char* colVal; // null terminated
SArray* tableId;
int32_t offset;
} TFileValue;
// static tfileGetCompareFunc(uint8_t byte) {}
static int tfileValueCompare(const void* a, const void* b, const void* param) {
__compar_fn_t fn = *(__compar_fn_t*)param;
TFileValue* av = (TFileValue*)a;
TFileValue* bv = (TFileValue*)b;
return fn(av->colVal, bv->colVal);
}
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
int tbSz = taosArrayGetSize(tableIds);
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t);
for (size_t i = 0; i < tbSz; i++) {
uint64_t* v = taosArrayGet(tableIds, i);
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
}
}
static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) { static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_SIZE] = {0}; char buf[TFILE_HEADER_SIZE] = {0};
char* p = buf; char* p = buf;
@ -245,9 +271,41 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
} }
int TFileWriterPut(TFileWriter* tw, void* data) { int TFileWriterPut(TFileWriter* tw, void* data) {
int32_t sz = taosArrayGetSize((SArray*)data); // sort by coltype and write to tindex
__compar_fn_t fn = getComparFunc(tw->header.colType, 0);
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
// sort by and write to int32_t bufLimit = 4096, offset = 0;
char* buf = calloc(1, sizeof(bufLimit));
char* p = buf;
int32_t sz = taosArrayGetSize((SArray*)data);
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
int32_t tbsz = taosArrayGetSize(v->tableId);
// check buf has enough space or not
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
if (offset + ttsz > bufLimit) {
// batch write
tw->ctx->write(tw->ctx, buf, offset);
offset = 0;
memset(buf, 0, bufLimit);
p = buf;
}
tfileSerialTableIdsToBuf(p, v->tableId);
offset += ttsz;
p = buf + offset;
// set up value offset and
v->offset = tw->offset;
tw->offset += ttsz;
}
if (offset != 0) {
// write reversed data in buf to tindex
tw->ctx->write(tw->ctx, buf, offset);
}
tfree(buf);
return 0; return 0;
} }
void tfileWriterDestroy(TFileWriter* tw) { void tfileWriterDestroy(TFileWriter* tw) {