diff --git a/source/libs/index/inc/index_fst_counting_writer.h b/source/libs/index/inc/index_fst_counting_writer.h index 86bc0d07d5..ea090389bb 100644 --- a/source/libs/index/inc/index_fst_counting_writer.h +++ b/source/libs/index/inc/index_fst_counting_writer.h @@ -45,7 +45,7 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len); static int writeCtxDoFlush(WriterCtx *ctx); -WriterCtx* writerCtxCreate(WriterType type); +WriterCtx* writerCtxCreate(WriterType type, bool readOnly); void writerCtxDestroy(WriterCtx *w); typedef uint32_t CheckSummer; @@ -57,14 +57,16 @@ typedef struct FstCountingWriter { CheckSummer summer; } FstCountingWriter; -int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen); +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len); + +int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len); int fstCountingWriterFlush(FstCountingWriter *write); uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write); -FstCountingWriter *fstCountingWriterCreate(void *wtr); +FstCountingWriter *fstCountingWriterCreate(void *wtr, bool readOnly); void fstCountingWriterDestroy(FstCountingWriter *w); diff --git a/source/libs/index/src/index_fst.c b/source/libs/index/src/index_fst.c index 76dbe24b2a..0cc95b738d 100644 --- a/source/libs/index/src/index_fst.c +++ b/source/libs/index/src/index_fst.c @@ -404,7 +404,7 @@ CompiledAddr fstStateTransAddr(FstState *s, FstNode *node) { assert(s->state == OneTransNext || s->state == OneTrans); FstSlice *slice = &node->data; if (s->state == OneTransNext) { - return (CompiledAddr)(node->end); + return (CompiledAddr)(node->end) - 1; } else { PackSizes sizes = node->sizes; uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(sizes); @@ -459,7 +459,7 @@ Output fstStateOutput(FstState *s, FstNode *node) { uint8_t tSizes = FST_GET_TRANSITION_PACK_SIZE(node->sizes); uint64_t i = node->start - - fstStateInputLen(s); + - fstStateInputLen(s) - 1 - tSizes - oSizes; @@ -620,7 +620,7 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForOneTransNext(&st, slice); //? s.end_addr(data); + n->end = fstStateEndAddrForOneTransNext(&st, &n->data); //? s.end_addr(data); n->isFinal = false; n->sizes = 0; n->nTrans = 1; @@ -632,23 +632,24 @@ FstNode *fstNodeCreate(int64_t version, CompiledAddr addr, FstSlice *slice) { n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForOneTrans(&st, slice, sz); // s.end_addr(data, sz); + n->end = fstStateEndAddrForOneTrans(&st, &data, sz); // s.end_addr(data, sz); n->isFinal = false; n->nTrans = 1; n->sizes = sz; n->finalOutput = 0; } else { - uint64_t sz = fstStateSizes(&st, slice); // s.sizes(data) - uint32_t nTrans = fstStateNtrans(&st, slice); // s.ntrans(data) - n->data = *slice; + FstSlice data = fstSliceCopy(slice, 0, addr); + uint64_t sz = fstStateSizes(&st, &data); // s.sizes(data) + uint32_t nTrans = fstStateNtrans(&st, &data); // s.ntrans(data) + n->data = data; n->version = version; n->state = st; n->start = addr; - n->end = fstStateEndAddrForAnyTrans(&st, version, slice, sz, nTrans); // s.end_addr(version, data, sz, ntrans); + n->end = fstStateEndAddrForAnyTrans(&st, version, &data, sz, nTrans); // s.end_addr(version, data, sz, ntrans); n->isFinal = fstStateIsFinalState(&st); // s.is_final_state(); n->nTrans = nTrans; n->sizes = sz; - n->finalOutput = fstStateFinalOutput(&st, version, slice, sz, nTrans); // s.final_output(version, data, sz, ntrans); + n->finalOutput = fstStateFinalOutput(&st, version, &data, sz, nTrans); // s.final_output(version, data, sz, ntrans); } return n; } @@ -771,7 +772,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) { if (NULL == b) { return b; } - b->wrt = fstCountingWriterCreate(w); + b->wrt = fstCountingWriterCreate(w, false); b->unfinished = fstUnFinishedNodesCreate(); b->registry = fstRegistryCreate(10000, 2) ; b->last = fstSliceCreate(NULL, 0); @@ -1043,8 +1044,9 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) { for (uint32_t i = 0; i < len; i++) { uint8_t inp = data[i]; Output res = 0; - bool null = fstNodeFindInput(root, inp, &res); - if (null) { return false; } + if (false == fstNodeFindInput(root, inp, &res)) { + return false; + } FstTransition trn; fstNodeGetTransitionAt(root, res, &trn); diff --git a/source/libs/index/src/index_fst_counting_writer.c b/source/libs/index/src/index_fst_counting_writer.c index 9c1a0b9f5d..9ec346cebc 100644 --- a/source/libs/index/src/index_fst_counting_writer.c +++ b/source/libs/index/src/index_fst_counting_writer.c @@ -31,17 +31,19 @@ static int writeCtxDoWrite(WriterCtx *ctx, uint8_t *buf, int len) { return len; } static int writeCtxDoRead(WriterCtx *ctx, uint8_t *buf, int len) { + int nRead = 0; if (ctx->type == TFile) { - tfRead(ctx->fd, buf, len); + nRead = tfRead(ctx->fd, buf, len); } else { memcpy(buf, ctx->mem + ctx->offset, len); } - ctx->offset += len; + ctx->offset += nRead; - return 1; + return nRead; } static int writeCtxDoFlush(WriterCtx *ctx) { if (ctx->type == TFile) { + //tfFsync(ctx->fd); //tfFlush(ctx->fd); } else { // do nothing @@ -49,14 +51,19 @@ static int writeCtxDoFlush(WriterCtx *ctx) { return 1; } -WriterCtx* writerCtxCreate(WriterType type) { +WriterCtx* writerCtxCreate(WriterType type, bool readOnly) { WriterCtx *ctx = calloc(1, sizeof(WriterCtx)); if (ctx == NULL) { return NULL; } ctx->type = type; if (ctx->type == TFile) { tfInit(); - ctx->fd = tfOpenCreateWriteAppend(tmpFile); + // ugly code, refactor later + if (readOnly == false) { + ctx->fd = tfOpenCreateWriteAppend(tmpFile); + } else { + ctx->fd = tfOpenReadWrite(tmpFile); + } if (ctx->fd < 0) { indexError("open file error %d", errno); } @@ -76,35 +83,44 @@ void writerCtxDestroy(WriterCtx *ctx) { if (ctx->type == TMemory) { free(ctx->mem); } else { - tfCleanup(); tfClose(ctx->fd); + tfCleanup(); } free(ctx); } -FstCountingWriter *fstCountingWriterCreate(void *wrt) { +FstCountingWriter *fstCountingWriterCreate(void *wrt, bool readOnly) { FstCountingWriter *cw = calloc(1, sizeof(FstCountingWriter)); if (cw == NULL) { return NULL; } - cw->wrt = (void *)(writerCtxCreate(TFile)); + cw->wrt = (void *)(writerCtxCreate(TFile, readOnly)); return cw; } void fstCountingWriterDestroy(FstCountingWriter *cw) { // free wrt object: close fd or free mem + fstCountingWriterFlush(cw); writerCtxDestroy((WriterCtx *)(cw->wrt)); free(cw); } -int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t bufLen) { +int fstCountingWriterWrite(FstCountingWriter *write, uint8_t *buf, uint32_t len) { if (write == NULL) { return 0; } // update checksum // write data to file/socket or mem WriterCtx *ctx = write->wrt; - int nWrite = ctx->write(ctx, buf, bufLen); - write->count += nWrite; - return bufLen; + int nWrite = ctx->write(ctx, buf, len); + assert(nWrite == len); + write->count += len; + return len; +} +int fstCountingWriterRead(FstCountingWriter *write, uint8_t *buf, uint32_t len) { + if (write == NULL) { return 0; } + WriterCtx *ctx = write->wrt; + int nRead = ctx->read(ctx, buf, len); + //assert(nRead == len); + return nRead; } uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter *write) { diff --git a/source/libs/index/src/index_fst_util.c b/source/libs/index/src/index_fst_util.c index ab74a76eb9..c933c6d23b 100644 --- a/source/libs/index/src/index_fst_util.c +++ b/source/libs/index/src/index_fst_util.c @@ -67,7 +67,7 @@ uint8_t packSize(uint64_t n) { } uint64_t unpackUint64(uint8_t *ch, uint8_t sz) { - uint64_t n; + uint64_t n = 0; for (uint8_t i = 0; i < sz; i++) { n = n | (ch[i] << (8 * i)); } diff --git a/source/libs/index/test/indexTests.cpp b/source/libs/index/test/indexTests.cpp index 5164209922..858861529d 100644 --- a/source/libs/index/test/indexTests.cpp +++ b/source/libs/index/test/indexTests.cpp @@ -63,6 +63,7 @@ //} int main(int argc, char** argv) { + // test write FstBuilder *b = fstBuilderCreate(NULL, 0); { std::string str("aaa"); @@ -87,9 +88,42 @@ int main(int argc, char** argv) { } } - //fstBuilderInsert(b, key1, val2); fstBuilderFinish(b); fstBuilderDestroy(b); + + + char buf[64 * 1024] = {0}; + + FstSlice s; + + FstCountingWriter *w = fstCountingWriterCreate(NULL, true); + int nRead = fstCountingWriterRead(w, (uint8_t *)buf, sizeof(buf)); + assert(nRead <= sizeof(buf)); + s = fstSliceCreate((uint8_t *)buf, nRead); + fstCountingWriterDestroy(w); + + + // test reader + + + Fst *fst = fstCreate(&s); + { + std::string str("aaa"); + uint64_t out; + + + FstSlice key = fstSliceCreate((uint8_t *)str.c_str(), str.size()); + bool ok = fstGet(fst, &key, &out); + if (ok == true) { + //indexInfo("Get key-value success, %s, %d", str.c_str(), out); + } else { + //indexError("Get key-value failed, %s", str.c_str()); + } + } + fstSliceDestroy(&s); + + + return 1; }