From 4bab45c039be102094218eaccb5293d125ef06f7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 2 Dec 2021 15:48:30 +0800 Subject: [PATCH] refactor builder struct --- source/libs/index/inc/index_fst.h | 6 +- .../index/inc/index_fst_counting_writer.h | 30 +++++++- .../index/src/index_fst_counting_writer.c | 76 +++++++++++++++++-- 3 files changed, 104 insertions(+), 8 deletions(-) diff --git a/source/libs/index/inc/index_fst.h b/source/libs/index/inc/index_fst.h index cd5073187c..96838d5843 100644 --- a/source/libs/index/inc/index_fst.h +++ b/source/libs/index/inc/index_fst.h @@ -86,14 +86,16 @@ uint64_t fstUnFinishedNodesFindCommPrefixAndSetOutput(FstUnFinishedNodes *node, typedef struct FstBuilder { FstCountingWriter *wrt; // The FST raw data is written directly to `wtr`. FstUnFinishedNodes *unfinished; // The stack of unfinished nodes - FstRegistry* registry; // A map of finished nodes. - FstSlice last; // The last word added + FstRegistry* registry; // A map of finished nodes. + FstSlice last; // The last word added CompiledAddr lastAddr; // The address of the last compiled node uint64_t len; // num of keys added } FstBuilder; FstBuilder *fstBuilderCreate(void *w, FstType ty); + + void fstBuilderDestroy(FstBuilder *b); void fstBuilderInsertOutput(FstBuilder *b, FstSlice bs, Output in); OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup); diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 4650804034..42d77cacb0 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -16,6 +16,34 @@ #ifndef __INDEX_FST_COUNTING_WRITER_H__ #define __INDEX_FST_COUNTING_WRITER_H__ +#include "tfile.h" + + +#define DefaultMem 1024*1024 + +static char tmpFile[] = "/tmp/index"; +typedef enum WriterType {TMemory, TFile} WriterType; + +typedef struct WriterCtx { + int (*write)(struct WriterCtx *ctx, uint8_t *buf, int len); + int (*read)(struct WriterCtx *ctx, uint8_t *buf, int len); + int (*flush)(struct WriterCtx *ctx); + WriterType type; + union { + int fd; + void *mem; + }; + int32_t offset; + int32_t limit; +} WriterCtx; + +static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); +static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); +static int writeCtxDoFlush(WriterCtx *ctx); + +WriterCtx* writerCtxCreate(WriterType type); +void writerCtxDestroy(WriterCtx *w); + typedef uint32_t CheckSummer; @@ -25,7 +53,7 @@ typedef struct FstCountingWriter { CheckSummer summer; } FstCountingWriter; -uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); int fstCountingWriterFlush(FstCountingWriter *write); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index a0a2c380f1..f04f6eddad 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -16,24 +16,88 @@ #include "index_fst_util.h" #include "index_fst_counting_writer.h" +static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { + if (ctx->offset + len > ctx->limit) { + return -1; + } + + if (ctx->type == TFile) { + assert(len != tfWrite(ctx->fd, buf, len)); + } else { + memcpy(ctx->mem + ctx->offset, buf, len); + } + ctx->offset += len; + return len; +} +static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { + if (ctx->type == TFile) { + tfRead(ctx->fd, buf, len); + } else { + memcpy(buf, ctx->mem + ctx->offset, len); + } + ctx->offset += len; + + return 1; +} +static int writeCtxDoFlush(WriterCtx *ctx) { + if (ctx->type == TFile) { + //tfFlush(ctx->fd); + } else { + // do nothing + } + return 1; +} + +WriterCtx* writerCtxCreate(WriterType type) { + WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); + if (ctx == NULL) { return NULL; } + + ctx->type == type; + if (ctx->type == TFile) { + ctx->fd = tfOpenCreateWriteAppend(tmpFile); + } else if (ctx->type == TMemory) { + ctx->mem = calloc(1, DefaultMem * sizeof(uint8_t)); + } + ctx->write = writeCtxDoWrite; + ctx->read = writeCtxDoRead; + ctx->flush = writeCtxDoFlush; + + ctx->offset = 0; + ctx->limit = DefaultMem; + + return ctx; +} +void writerCtxDestroy(WriterCtx *ctx) { + if (ctx->type == TMemory) { + free(ctx->mem); + } else { + tfClose(ctx->fd); + } + free(ctx); +} + + FstCountingWriter *fstCountingWriterCreate(void *wrt) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - - cw->wrt = wrt; + + cw->wrt = (void *)(writerCtxCreate(TMemory)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { // free wrt object: close fd or free mem + writerCtxDestroy((WriterCtx *)(cw->wrt)); free(cw); } -uint64_t fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { if (write == NULL) { return 0; } // update checksum // write data to file/socket or mem - - write->count += bufLen; + WriterCtx *ctx = write->wrt; + + int nWrite = ctx->write(ctx, buf, bufLen); + write->count += nWrite; return bufLen; } @@ -41,6 +105,8 @@ uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { return 0; } int fstCountingWriterFlush(FstCountingWriter *write) { + WriterCtx *ctx = write->wrt; + ctx->flush(ctx); //write->wtr->flush return 1; }