handle read/write crash concurrently
This commit is contained in:
parent
e309048513
commit
2eb0053e5a
|
@ -38,6 +38,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname);
|
||||||
int64_t tfClose(int64_t tfd);
|
int64_t tfClose(int64_t tfd);
|
||||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
|
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
|
||||||
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
|
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
|
||||||
|
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int64_t offset);
|
||||||
int32_t tfFsync(int64_t tfd);
|
int32_t tfFsync(int64_t tfd);
|
||||||
bool tfValid(int64_t tfd);
|
bool tfValid(int64_t tfd);
|
||||||
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
|
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
|
||||||
|
|
|
@ -459,7 +459,7 @@ void iterateValueDestroy(IterateValue* value, bool destroy) {
|
||||||
} else {
|
} else {
|
||||||
if (value->val != NULL) { taosArrayClear(value->val); }
|
if (value->val != NULL) { taosArrayClear(value->val); }
|
||||||
}
|
}
|
||||||
free(value->colVal);
|
// free(value->colVal);
|
||||||
value->colVal = NULL;
|
value->colVal = NULL;
|
||||||
}
|
}
|
||||||
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
|
||||||
|
|
|
@ -1033,15 +1033,17 @@ FstStreamWithStateBuilder* fstSearchWithState(Fst* fst, AutomationCtx* ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetRoot(Fst* fst) {
|
FstNode* fstGetRoot(Fst* fst) {
|
||||||
// pthread_mutex_lock(&fst->mtx);
|
|
||||||
if (fst->root != NULL) {
|
|
||||||
// pthread_mutex_unlock(&fst->mtx);
|
|
||||||
return fst->root;
|
|
||||||
}
|
|
||||||
CompiledAddr rAddr = fstGetRootAddr(fst);
|
CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||||
fst->root = fstGetNode(fst, rAddr);
|
return fstGetNode(fst, rAddr);
|
||||||
// pthread_mutex_unlock(&fst->mtx);
|
// pthread_mutex_lock(&fst->mtx);
|
||||||
return fst->root;
|
// if (fst->root != NULL) {
|
||||||
|
// // pthread_mutex_unlock(&fst->mtx);
|
||||||
|
// return fst->root;
|
||||||
|
//}
|
||||||
|
// CompiledAddr rAddr = fstGetRootAddr(fst);
|
||||||
|
// fst->root = fstGetNode(fst, rAddr);
|
||||||
|
//// pthread_mutex_unlock(&fst->mtx);
|
||||||
|
// return fst->root;
|
||||||
}
|
}
|
||||||
|
|
||||||
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
FstNode* fstGetNode(Fst* fst, CompiledAddr addr) {
|
||||||
|
|
|
@ -42,8 +42,8 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
|
||||||
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
|
||||||
int nRead = 0;
|
int nRead = 0;
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
tfLseek(ctx->file.fd, offset, 0);
|
// tfLseek(ctx->file.fd, offset, 0);
|
||||||
nRead = tfRead(ctx->file.fd, buf, len);
|
nRead = tfPread(ctx->file.fd, buf, len, offset);
|
||||||
} else {
|
} else {
|
||||||
// refactor later
|
// refactor later
|
||||||
assert(0);
|
assert(0);
|
||||||
|
@ -52,6 +52,7 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
|
||||||
}
|
}
|
||||||
static int writeCtxDoFlush(WriterCtx* ctx) {
|
static int writeCtxDoFlush(WriterCtx* ctx) {
|
||||||
if (ctx->type == TFile) {
|
if (ctx->type == TFile) {
|
||||||
|
// taosFsyncFile(ctx->file.fd);
|
||||||
tfFsync(ctx->file.fd);
|
tfFsync(ctx->file.fd);
|
||||||
// tfFlush(ctx->file.fd);
|
// tfFlush(ctx->file.fd);
|
||||||
} else {
|
} else {
|
||||||
|
@ -69,13 +70,15 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
||||||
// ugly code, refactor later
|
// ugly code, refactor later
|
||||||
ctx->file.readOnly = readOnly;
|
ctx->file.readOnly = readOnly;
|
||||||
if (readOnly == false) {
|
if (readOnly == false) {
|
||||||
|
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
ctx->file.fd = tfOpenCreateWriteAppend(path);
|
||||||
} else {
|
} else {
|
||||||
ctx->file.fd = tfOpenReadWrite(path);
|
// ctx->file.fd = open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||||
|
ctx->file.fd = tfOpenRead(path);
|
||||||
}
|
}
|
||||||
memcpy(ctx->file.buf, path, strlen(path));
|
memcpy(ctx->file.buf, path, strlen(path));
|
||||||
if (ctx->file.fd < 0) {
|
if (ctx->file.fd < 0) {
|
||||||
indexError("open file error %d", errno);
|
indexError("failed to open file, error %d", errno);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
} else if (ctx->type == TMemory) {
|
} else if (ctx->type == TMemory) {
|
||||||
|
@ -101,10 +104,7 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
|
||||||
free(ctx->mem.buf);
|
free(ctx->mem.buf);
|
||||||
} else {
|
} else {
|
||||||
tfClose(ctx->file.fd);
|
tfClose(ctx->file.fd);
|
||||||
if (remove) {
|
if (remove) { unlink(ctx->file.buf); }
|
||||||
indexError("rm file %s", ctx->file.buf);
|
|
||||||
unlink(ctx->file.buf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
free(ctx);
|
free(ctx);
|
||||||
}
|
}
|
||||||
|
@ -144,6 +144,7 @@ int fstCountingWriterRead(FstCountingWriter* write, uint8_t* buf, uint32_t len)
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
uint32_t fstCountingWriterMaskedCheckSum(FstCountingWriter* write) { return 0; }
|
||||||
|
|
||||||
int fstCountingWriterFlush(FstCountingWriter* write) {
|
int fstCountingWriterFlush(FstCountingWriter* write) {
|
||||||
WriterCtx* ctx = write->wrt;
|
WriterCtx* ctx = write->wrt;
|
||||||
ctx->flush(ctx);
|
ctx->flush(ctx);
|
||||||
|
|
|
@ -149,7 +149,6 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
|
||||||
|
|
||||||
// T_REF_INC(reader);
|
// T_REF_INC(reader);
|
||||||
reader->ctx = ctx;
|
reader->ctx = ctx;
|
||||||
|
|
||||||
if (0 != tfileReaderLoadHeader(reader)) {
|
if (0 != tfileReaderLoadHeader(reader)) {
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
indexError("failed to load index header, suid: %" PRIu64 ", colName: %s", reader->header.suid,
|
||||||
|
@ -542,8 +541,14 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||||
char buf[TFILE_HEADER_SIZE] = {0};
|
char buf[TFILE_HEADER_SIZE] = {0};
|
||||||
|
|
||||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||||
assert(nread == sizeof(buf));
|
if (nread == -1) {
|
||||||
|
//
|
||||||
|
indexError("actual Read: %d, to read: %d, errno: %d, filefd: %d, filename: %s", (int)(nread), (int)sizeof(buf),
|
||||||
|
errno, reader->ctx->file.fd, reader->ctx->file.buf);
|
||||||
|
}
|
||||||
|
// assert(nread == sizeof(buf));
|
||||||
memcpy(&reader->header, buf, sizeof(buf));
|
memcpy(&reader->header, buf, sizeof(buf));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
static int tfileReaderLoadFst(TFileReader* reader) {
|
static int tfileReaderLoadFst(TFileReader* reader) {
|
||||||
|
@ -576,7 +581,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
|
||||||
char* buf = calloc(1, total);
|
char* buf = calloc(1, total);
|
||||||
if (buf == NULL) { return -1; }
|
if (buf == NULL) { return -1; }
|
||||||
|
|
||||||
nread = ctx->read(ctx, buf, total);
|
nread = ctx->readFrom(ctx, buf, total, offset + sizeof(nid));
|
||||||
assert(total == nread);
|
assert(total == nread);
|
||||||
|
|
||||||
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
for (int32_t i = 0; i < nid; i++) { taosArrayPush(result, (uint64_t*)buf + i); }
|
||||||
|
|
|
@ -189,7 +189,7 @@ void validateTFile(char* arg) {
|
||||||
|
|
||||||
std::thread threads[NUM_OF_THREAD];
|
std::thread threads[NUM_OF_THREAD];
|
||||||
// std::vector<std::thread> threads;
|
// std::vector<std::thread> threads;
|
||||||
TFileReader* reader = tfileReaderOpen(arg, 0, 8417, "tag1");
|
TFileReader* reader = tfileReaderOpen(arg, 0, 295868, "tag1");
|
||||||
|
|
||||||
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
for (int i = 0; i < NUM_OF_THREAD; i++) {
|
||||||
threads[i] = std::thread(fst_get, reader->fst);
|
threads[i] = std::thread(fst_get, reader->fst);
|
||||||
|
|
|
@ -16,17 +16,15 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "ulog.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "ulog.h"
|
||||||
|
|
||||||
static int32_t tsFileRsetId = -1;
|
static int32_t tsFileRsetId = -1;
|
||||||
|
|
||||||
static int8_t tfInited = 0;
|
static int8_t tfInited = 0;
|
||||||
|
|
||||||
static void tfCloseFile(void *p) {
|
static void tfCloseFile(void *p) { taosCloseFile((int32_t)(uintptr_t)p); }
|
||||||
taosCloseFile((int32_t)(uintptr_t)p);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tfInit() {
|
int32_t tfInit() {
|
||||||
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
|
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
|
||||||
|
@ -79,9 +77,7 @@ int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode
|
||||||
return tfOpenImp(fd);
|
return tfOpenImp(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tfClose(int64_t tfd) {
|
int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); }
|
||||||
return taosRemoveRef(tsFileRsetId, tfd);
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
|
int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
|
||||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
@ -109,6 +105,19 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tfPread(int64_t tfd, void *buf, int64_t count, int32_t offset) {
|
||||||
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
|
if (p == NULL) return -1;
|
||||||
|
|
||||||
|
int32_t fd = (int32_t)(uintptr_t)p;
|
||||||
|
|
||||||
|
int64_t ret = pread(fd, buf, count, offset);
|
||||||
|
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
|
||||||
|
taosReleaseRef(tsFileRsetId, tfd);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tfFsync(int64_t tfd) {
|
int32_t tfFsync(int64_t tfd) {
|
||||||
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
void *p = taosAcquireRef(tsFileRsetId, tfd);
|
||||||
if (p == NULL) return -1;
|
if (p == NULL) return -1;
|
||||||
|
|
Loading…
Reference in New Issue