Merge branch '3.0' into feature/dnode3

This commit is contained in:
Shengliang Guan 2021-12-01 15:02:05 +08:00
commit f8fa856507
11 changed files with 523 additions and 128 deletions

View File

@ -18,6 +18,7 @@
#include "os.h"
#include "tdef.h"
#include "tlog.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
#endif
@ -39,12 +40,14 @@ typedef enum {
typedef struct {
int8_t sver;
int8_t reserved[3];
uint8_t msgType;
int8_t reserved[2];
int32_t len;
int64_t version;
uint32_t signature;
uint32_t cksum;
char cont[];
uint32_t cksumHead;
uint32_t cksumBody;
//char cont[];
} SWalHead;
typedef struct {
@ -54,16 +57,20 @@ typedef struct {
} SWalCfg;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN+1)
#define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFEUL))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_READ_ONLY 1
#define WAL_CUR_FILE_READ_ONLY 2
@ -94,6 +101,10 @@ typedef struct SWal {
pthread_mutex_t mutex;
//path
char path[WAL_PATH_LEN];
//file set
SArray* fileSet;
//reusable write head
SWalHead head;
} SWal; // WAL HANDLE
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
@ -104,21 +115,20 @@ void walCleanUp();
// handle open and ctl
SWal *walOpen(const char *path, SWalCfg *pCfg);
void walStop(SWal *pWal);
int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *);
// write
//int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen);
//int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
// apis for lifecycle management
void walFsync(SWal *, bool force);
int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t walRollback(SWal *, int64_t ver);
// notify that previous log can be pruned safely
// notify that previous logs can be pruned safely
int32_t walTakeSnapshot(SWal *, int64_t ver);
//int32_t walDataCorrupted(SWal*);
@ -131,11 +141,6 @@ int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
#ifdef __cplusplus
}
#endif

View File

@ -21,6 +21,7 @@
#include "index_fst_util.h"
#include "index_fst_registry.h"
#include "index_fst_counting_writer.h"
#include "index_fst_automation.h"
typedef struct FstNode FstNode;
@ -34,11 +35,28 @@ typedef struct FstRange {
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal} State;
typedef enum { Included, Excluded, Unbounded} FstBound;
typedef enum {Ordered, OutOfOrdered, DuplicateKey} OrderType;
typedef enum { Included, Excluded, Unbounded} FstBound;
typedef struct FstBoundWithData {
FstSlice data;
FstBound type;
} FstBoundWithData;
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data);
bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice);
bool fstBoundWithDataIsEmpty(FstBoundWithData *bound);
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound);
typedef struct FstOutput {
bool null;
Output out;
} FstOutput;
/*
*
@ -251,8 +269,51 @@ void fstDestroy(Fst *fst);
bool fstGet(Fst *fst, FstSlice *b, Output *out);
FstNode* fstGetNode(Fst *fst, CompiledAddr);
FstNode* fstGetRoot(Fst *fst);
FstType fstGetType(Fst *fst);
CompiledAddr fstGetRootAddr(Fst *fst);
Output fstEmptyFinalOutput(Fst *fst, bool *null);
bool fstVerify(Fst *fst);
//refactor this function
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
typedef struct StreamState {
FstNode *node;
uint64_t trans;
FstOutput out;
void *autState;
} StreamState;
void streamStateDestroy(void *s);
typedef struct StreamWithState {
Fst *fst;
Automation *aut;
SArray *inp;
FstOutput emptyOutput;
SArray *stack; // <StreamState>
FstBoundWithData *endAt;
} StreamWithState ;
typedef struct StreamWithStateResult {
FstSlice data;
FstOutput out;
void *state;
} StreamWithStateResult;
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state);
typedef void* (*StreamCallback)(void *);
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) ;
void streamWithStateDestroy(StreamWithState *sws);
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min);
StreamWithStateResult* streamWithStateNextWith(StreamWithState *sws, StreamCallback callback);
#endif

View File

@ -15,7 +15,7 @@
#ifndef __INDEX_FST_AUTAOMATION_H__
#define __INDEX_FST_AUTAOMATION_H__
struct AutomationCtx;
typedef struct AutomationCtx AutomationCtx;
typedef struct StartWith {
AutomationCtx *autoSelf;
@ -23,20 +23,25 @@ typedef struct StartWith {
typedef struct Complement {
AutomationCtx *autoSelf;
} Complement;
// automation
typedef struct AutomationCtx {
// automation interface
void *data;
} AutomationCtx;
// automation interface
void (*start)(AutomationCtx *ctx);
bool (*isMatch)(AutomationCtx *ctx);
bool (*canMatch)(AutomationCtx *ctx, void *data);
bool (*willAlwaysMatch)(AutomationCtx *ctx, void *state);
void* (*accpet)(AutomationCtx *ctx, void *state, uint8_t byte);
void* (*accpetEof)(AutomationCtx *ctx, *state);
typedef struct Automation {
void* (*start)() ;
bool (*isMatch)(void *);
bool (*canMatch)(void *data);
bool (*willAlwaysMatch)(void *state);
void* (*accept)(void *state, uint8_t byte);
void* (*acceptEof)(void *state);
void *data;
} Automation;
#endif

View File

@ -41,7 +41,7 @@ FstBuilderNode *fstBuilderNodeClone(FstBuilderNode *src);
void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src);
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr);
void fstBuilderNodeDestroy(FstBuilderNode *node);

View File

@ -733,6 +733,11 @@ bool fstNodeCompile(FstNode *node, void *w, CompiledAddr lastAddr, CompiledAddr
return true;
}
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
return fstNodeCompile(NULL, wrt, lastAddr, startAddr, b);
}
FstBuilder *fstBuilderCreate(void *w, FstType ty) {
FstBuilder *b = malloc(sizeof(FstBuilder));
@ -985,16 +990,39 @@ void fstDestroy(Fst *fst) {
}
bool fstGet(Fst *fst, FstSlice *b, Output *out) {
FstNode *root = fstGetRoot(fst);
Output tOut = 0;
for (uint32_t i = 0; i < b->dLen; i++) {
uint8_t inp = b->data[i];
Output res = 0;
bool null = fstNodeFindInput(root, inp, &res);
if (null) { return false; }
FstTransition trn;
fstNodeGetTransitionAt(root, res, &trn);
tOut += trn.out;
root = fstGetNode(fst, trn.addr);
}
if (!FST_NODE_IS_FINAL(root)) {
return false;
} else {
tOut = tOut + FST_NODE_FINAL_OUTPUT(root);
}
*out = tOut;
return false;
}
FstNode* fstGetNode(Fst *fst, CompiledAddr addr) {
FstNode *fstGetRoot(Fst *fst) {
if (fst->root != NULL) {
return fst->root;
}
fst->root = fstNodeCreate(fst->meta->version, addr, fst->data);
CompiledAddr rAddr = fstGetRootAddr(fst);
fst->root = fstGetNode(fst, rAddr);
return fst->root;
}
FstNode* fstGetNode(Fst *fst, CompiledAddr addr) {
return fstNodeCreate(fst->meta->version, addr, fst->data);
}
FstType fstGetType(Fst *fst) {
@ -1016,15 +1044,254 @@ Output fstEmptyFinalOutput(Fst *fst, bool *null) {
return res;
}
bool fstVerify(Fst *fst) {
uint32_t checkSum = fst->meta->checkSum;
FstSlice *data = fst->data;
TSCKSUM initSum = 0;
if (taosCheckChecksumWhole(data->data, data->dLen)) {
if (!taosCheckChecksumWhole(data->data, data->dLen)) {
return false;
}
return true;
}
// data bound function
FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) {
FstBoundWithData *b = calloc(1, sizeof(FstBoundWithData));
if (b == NULL) { return NULL; }
b->type = type;
b->data = fstSliceCopy(data, data->start, data->end);
return b;
}
bool fstBoundWithDataExceededBy(FstBoundWithData *bound, FstSlice *slice) {
int comp = fstSliceCompare(slice, &bound->data);
if (bound->type == Included) {
return comp > 0 ? true : false;
} else if (bound->type == Excluded) {
return comp >= 0 ? true : false;
} else {
return true;
}
}
bool fstBoundWithDataIsEmpty(FstBoundWithData *bound) {
if (bound->type == Unbounded) {
return true;
} else {
return fstSliceEmpty(&bound->data);
}
}
bool fstBoundWithDataIsIncluded(FstBoundWithData *bound) {
return bound->type == Included ? true : false;
}
void fstBoundDestroy(FstBoundWithData *bound) {
free(bound);
}
StreamWithState *streamWithStateCreate(Fst *fst, Automation *automation, FstBoundWithData *min, FstBoundWithData *max) {
StreamWithState *sws = calloc(1, sizeof(StreamWithState));
if (sws == NULL) { return NULL; }
sws->fst = fst;
sws->aut = automation;
sws->inp = (SArray *)taosArrayInit(256, sizeof(uint8_t));
sws->emptyOutput.null = false;
sws->emptyOutput.out = 0;
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
sws->endAt = max;
streamWithStateSeekMin(sws, min);
return sws;
}
void streamWithStateDestroy(StreamWithState *sws) {
if (sws == NULL) { return; }
taosArrayDestroy(sws->inp);
taosArrayDestroyEx(sws->stack, streamStateDestroy);
free(sws);
}
bool streamWithStateSeekMin(StreamWithState *sws, FstBoundWithData *min) {
if (fstBoundWithDataIsEmpty(min)) {
if (fstBoundWithDataIsIncluded(min)) {
sws->emptyOutput.out = fstEmptyFinalOutput(sws->fst, &(sws->emptyOutput.null));
}
StreamState s = {.node = fstGetRoot(sws->fst),
.trans = 0,
.out = {.null = false, .out = 0},
.autState = sws->aut->start()}; // auto.start callback
taosArrayPush(sws->stack, &s);
return true;
}
FstSlice *key = NULL;
bool inclusize = false;;
if (min->type == Included) {
key = &min->data;
inclusize = true;
} else if (min->type == Excluded) {
key = &min->data;
} else {
return false;
}
FstNode *node = fstGetRoot(sws->fst);
Output out = 0;
void* autState = sws->aut->start();
for (uint32_t i = 0; i < key->dLen; i++) {
uint8_t b = key->data[i];
uint64_t res = 0;
bool null = fstNodeFindInput(node, b, &res);
if (null == false) {
FstTransition trn;
fstNodeGetTransitionAt(node, res, &trn);
void *preState = autState;
autState = sws->aut->accept(preState, b);
taosArrayPush(sws->inp, &b);
StreamState s = {.node = node,
.trans = res + 1,
.out = {.null = false, .out = out},
.autState = preState};
taosArrayPush(sws->stack, &s);
out += trn.out;
node = fstGetNode(sws->fst, trn.addr);
} else {
// This is a little tricky. We're in this case if the
// given bound is not a prefix of any key in the FST.
// Since this is a minimum bound, we need to find the
// first transition in this node that proceeds the current
// input byte.
FstTransitions *trans = fstNodeTransitions(node);
uint64_t i = 0;
for (i = trans->range.start; i < trans->range.end; i++) {
FstTransition trn;
if (fstNodeGetTransitionAt(node, i, &trn) && trn.inp > b) {
break;
}
}
StreamState s = {.node = node,
.trans = i,
.out = {.null = false, .out = out},
.autState = autState};
taosArrayPush(sws->stack, &s);
return true;
}
}
uint32_t sz = taosArrayGetSize(sws->stack);
if (sz != 0) {
StreamState *s = taosArrayGet(sws->stack, sz - 1);
if (inclusize) {
s->trans -= 1;
taosArrayPop(sws->inp);
} else {
FstNode *n = s->node;
uint64_t trans = s->trans;
FstTransition trn;
fstNodeGetTransitionAt(n, trans - 1, &trn);
StreamState s = {.node = fstGetNode(sws->fst, trn.addr),
.trans= 0,
.out = {.null = false, .out = out},
.autState = autState};
taosArrayPush(sws->stack, &s);
return true;
}
return false;
}
}
StreamWithStateResult *streamWithStateNextWith(StreamWithState *sws, StreamCallback callback) {
FstOutput output = sws->emptyOutput;
if (output.null == false) {
FstSlice emptySlice = fstSliceCreate(NULL, 0);
if (fstBoundWithDataExceededBy(sws->endAt, &emptySlice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy);
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
return NULL;
}
void* start = sws->aut->start();
if (sws->aut->isMatch(start)) {
FstSlice s = fstSliceCreate(NULL, 0);
return swsResultCreate(&s, output, callback(start));
}
}
while (taosArrayGetSize(sws->stack) > 0) {
StreamState *p = (StreamState *)taosArrayPop(sws->stack);
if (p->trans >= FST_NODE_LEN(p->node) || !sws->aut->canMatch(p->autState)) {
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
taosArrayPop(sws->inp);
}
streamStateDestroy(p);
continue;
}
FstTransition trn;
fstNodeGetTransitionAt(p->node, p->trans, &trn);
Output out = p->out.out + trn.out;
void* nextState = sws->aut->accept(p->autState, trn.inp);
void* tState = callback(nextState);
bool isMatch = sws->aut->isMatch(nextState);
FstNode *nextNode = fstGetNode(sws->fst, trn.addr);
taosArrayPush(sws->inp, &(trn.inp));
if (FST_NODE_IS_FINAL(nextNode)) {
void *eofState = sws->aut->acceptEof(nextState);
if (eofState != NULL) {
isMatch = sws->aut->isMatch(eofState);
}
}
StreamState s1 = { .node = p->node, .trans = p->trans + 1, .out = p->out, .autState = p->autState};
taosArrayPush(sws->stack, &s1);
StreamState s2 = {.node = nextNode, .trans = 0, .out = {.null = false, .out = out}, .autState = nextState};
taosArrayPush(sws->stack, &s2);
uint8_t *buf = (uint8_t *)malloc(taosArrayGetSize(sws->inp) * sizeof(uint8_t));
for (uint32_t i = 0; i < taosArrayGetSize(sws->inp); i++) {
uint8_t *t = (uint8_t *)taosArrayGet(sws->inp, i);
buf[i] = *t;
}
FstSlice slice = fstSliceCreate(buf, taosArrayGetSize(sws->inp));
if (fstBoundWithDataExceededBy(sws->endAt, &slice)) {
taosArrayDestroyEx(sws->stack, streamStateDestroy);
sws->stack = (SArray *)taosArrayInit(256, sizeof(StreamState));
return NULL;
}
if (FST_NODE_IS_FINAL(nextNode) && isMatch) {
FstOutput fOutput = {.null = false, out = out + FST_NODE_FINAL_OUTPUT(nextNode)};
return swsResultCreate(&slice, fOutput , tState);
}
}
return NULL;
}
StreamWithStateResult *swsResultCreate(FstSlice *data, FstOutput fOut, void *state) {
StreamWithStateResult *result = calloc(1, sizeof(StreamWithStateResult));
if (result == NULL) { return NULL; }
FstSlice slice = fstSliceCopy(data, 0, data->dLen - 1);
result->data = slice;
result->out = fOut;
result->state = state;
return result;
}
void streamStateDestroy(void *s) {
if (NULL == s) { return; }
StreamState *ss = (StreamState *)s;
fstNodeDestroy(ss->node);
//free(s->autoState);
}

View File

@ -59,26 +59,27 @@ void fstBuilderNodeCloneFrom(FstBuilderNode *dst, FstBuilderNode *src) {
src->trans = NULL;
}
bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
size_t sz = taosArrayGetSize(b->trans);
assert(sz < 256);
if (FST_BUILDER_NODE_IS_FINAL(b)
&& FST_BUILDER_NODE_TRANS_ISEMPTY(b)
&& FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) {
return true;
} else if (sz != 1 || b->isFinal) {
// AnyTrans->Compile(w, addr, node);
} else {
FstTransition *tran = taosArrayGet(b->trans, 0);
if (tran->addr == lastAddr && tran->out == 0) {
//OneTransNext::compile(w, lastAddr, tran->inp);
return true;
} else {
//OneTrans::Compile(w, lastAddr, *tran);
return true;
}
}
return true;
}
//bool fstBuilderNodeCompileTo(FstBuilderNode *b, FstCountingWriter *wrt, CompiledAddr lastAddr, CompiledAddr startAddr) {
//size_t sz = taosArrayGetSize(b->trans);
//assert(sz < 256);
//if (FST_BUILDER_NODE_IS_FINAL(b)
// && FST_BUILDER_NODE_TRANS_ISEMPTY(b)
// && FST_BUILDER_NODE_FINALOUTPUT_ISZERO(b)) {
// return true;
//} else if (sz != 1 || b->isFinal) {
// // AnyTrans->Compile(w, addr, node);
//} else {
// FstTransition *tran = taosArrayGet(b->trans, 0);
// if (tran->addr == lastAddr && tran->out == 0) {
// //OneTransNext::compile(w, lastAddr, tran->inp);
// return true;
// } else {
// //OneTrans::Compile(w, lastAddr, *tran);
// return true;
// }
//}
//return true;
//}

View File

@ -95,6 +95,7 @@ FstSlice fstSliceCreate(uint8_t *data, uint64_t dLen) {
FstSlice slice = {.data = data, .dLen = dLen, .start = 0, .end = dLen - 1};
return slice;
}
// just shallow copy
FstSlice fstSliceCopy(FstSlice *slice, int32_t start, int32_t end) {
FstSlice t;
if (start >= slice->dLen || end >= slice->dLen || start > end) {

View File

@ -20,16 +20,42 @@
#include "tfile.h"
#include "walInt.h"
int walSetCurVerImpl(SWal *pWal, int64_t ver) {
int walSeekVerImpl(SWal *pWal, int64_t ver) {
//close old file
//iterate all files
//open right file
int code = 0;
code = tfClose(pWal->curLogTfd);
if(code != 0) {
//TODO
}
code = tfClose(pWal->curIdxTfd);
if(code != 0) {
//TODO
}
//bsearch in fileSet
int fName = 0;//TODO
//open the right file
char fNameStr[WAL_FILE_LEN];
sprintf(fNameStr, "%d."WAL_INDEX_SUFFIX, fName);
bool closed = 1; //TODO:read only
int64_t idxTfd = tfOpenReadWrite(fNameStr);
sprintf(fNameStr, "%d."WAL_LOG_SUFFIX, fName);
int64_t logTfd = tfOpenReadWrite(fNameStr);
//seek position
int64_t offset = (ver - fName) * WAL_IDX_ENTRY_SIZE;
tfLseek(idxTfd, offset, SEEK_SET);
//set cur version, cur file version and cur status
return 0;
pWal->curFileFirstVersion = fName;
pWal->curFileLastVersion = 1;//TODO
pWal->curLogTfd = logTfd;
pWal->curIdxTfd = idxTfd;
pWal->curVersion = ver;
pWal->curOffset = offset;
pWal->curStatus = 0;//TODO
return code;
}
int walSetCurVer(SWal *pWal, int64_t ver) {
if(ver > pWal->lastVersion + 1) {
int walSeekVer(SWal *pWal, int64_t ver) {
if(ver > pWal->lastVersion) {
//TODO: some records are skipped
return -1;
}
@ -40,13 +66,19 @@ int walSetCurVer(SWal *pWal, int64_t ver) {
if(ver < pWal->snapshotVersion) {
//TODO: seek snapshotted log
}
if(ver >= pWal->curFileFirstVersion
&& ((pWal->curFileLastVersion == -1 && ver <= pWal->lastVersion) || (ver <= pWal->curFileLastVersion))) {
}
if(ver < pWal->curFileFirstVersion || (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) {
int index = 0;
index = 1;
//back up to avoid inconsistency
int64_t curVersion = pWal->curVersion;
int64_t curOffset = pWal->curOffset;
int64_t curFileFirstVersion = pWal->curFileFirstVersion;
int64_t curFileLastVersion = pWal->curFileLastVersion;
if(walSetCurVerImpl(pWal, ver) < 0) {
if(walSeekVerImpl(pWal, ver) < 0) {
//TODO: errno
pWal->curVersion = curVersion;
pWal->curOffset = curOffset;
@ -67,7 +99,7 @@ int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
if(pWal->curVersion != ver) {
if(walSetCurVer(pWal, ver) != 0) {
if(walSeekVer(pWal, ver) != 0) {
//TODO: some records are skipped
return -1;
}

View File

@ -18,8 +18,17 @@
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "compare.h"
#include "walInt.h"
//internal
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId);
int32_t walGetOldFile(SWal *pWal, int64_t curFileId, int32_t minDiff, int64_t *oldFileId);
int32_t walGetNewFile(SWal *pWal, int64_t *newFileId);
static pthread_mutex_t walInitLock = PTHREAD_MUTEX_INITIALIZER;
static int8_t walInited = 0;
typedef struct {
int32_t refSetId;
int32_t seq;
@ -35,11 +44,21 @@ static int32_t walInitObj(SWal *pWal);
static void walFreeObj(void *pWal);
int32_t walInit() {
//TODO: change to atomic
pthread_mutex_lock(&walInitLock);
if(walInited) {
pthread_mutex_unlock(&walInitLock);
return 0;
} else {
walInited = 1;
pthread_mutex_unlock(&walInitLock);
}
int32_t code = 0;
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
code = pthread_mutex_init(&tsWal.mutex, NULL);
if (code) {
if (code != 0) {
wError("failed to init wal mutex since %s", tstrerror(code));
return code;
}
@ -61,6 +80,27 @@ void walCleanUp() {
wInfo("wal module is cleaned up");
}
static int walLoadFileset(SWal *pWal) {
DIR *dir = opendir(pWal->path);
if (dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno));
return -1;
}
struct dirent* ent;
while ((ent = readdir(dir)) != NULL) {
char *name = ent->d_name;
name[WAL_NOSUFFIX_LEN] = 0;
//validate file name by regex matching
if(1 /* regex match */) {
int64_t fnameInt64 = atoll(name);
taosArrayPush(pWal->fileSet, &fnameInt64);
}
}
taosArraySort(pWal->fileSet, compareInt64Val);
return 0;
}
SWal *walOpen(const char *path, SWalCfg *pCfg) {
SWal *pWal = malloc(sizeof(SWal));
if (pWal == NULL) {
@ -70,9 +110,13 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->vgId = pCfg->vgId;
pWal->curLogTfd = -1;
/*pWal->curFileId = -1;*/
pWal->curIdxTfd = -1;
pWal->level = pCfg->walLevel;
pWal->fsyncPeriod = pCfg->fsyncPeriod;
memset(&pWal->head, 0, sizeof(SWalHead));
pWal->head.sver = 0;
tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL);
@ -129,6 +173,11 @@ static int32_t walInitObj(SWal *pWal) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
pWal->fileSet = taosArrayInit(0, sizeof(int64_t));
if(pWal->fileSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
wDebug("vgId:%d, object is initialized", pWal->vgId);
return 0;

View File

@ -14,18 +14,13 @@
*/
#include "wal.h"
#include "tchecksum.h"
int32_t walCommit(SWal *pWal, int64_t ver) {
return 0;
static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) {
return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) &&
taosCheckChecksum(body, bodyLen, pHead->cksumBody);
}
int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
return 0;
@ -36,13 +31,16 @@ int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t r
}
int64_t walGetFirstVer(SWal *pWal) {
return 0;
if (pWal == NULL) return 0;
return pWal->firstVersion;
}
int64_t walGetSnapshotVer(SWal *pWal) {
return 0;
int64_t walGetSnaphostVer(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->snapshotVersion;
}
int64_t walGetLastVer(SWal *pWal) {
return 0;
if (pWal == NULL) return 0;
return pWal->lastVersion;
}

View File

@ -21,6 +21,18 @@
#include "tfile.h"
#include "walInt.h"
int32_t walCommit(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walRollback(SWal *pWal, int64_t ver) {
return 0;
}
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
return 0;
}
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
@ -112,60 +124,40 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 2;
pHead->cksum = taosCalcChecksum(0, (uint8_t *)pHead, sizeof(SWalHead) + pHead->len);
}
static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver >= 1) {
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
}
return 0;
}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) {
if (pWal == NULL) return -1;
SWalHead *pHead = malloc(sizeof(SWalHead) + bodyLen);
if(pHead == NULL) {
return -1;
}
pHead->version = index;
int32_t code = 0;
// no wal
if (!tfValid(pWal->curLogTfd)) return 0;
if (pWal->level == TAOS_WAL_NOLOG) return 0;
if (pHead->version <= pWal->curVersion) return 0;
if (index > pWal->lastVersion + 1) return -1;
pHead->signature = WAL_SIGNATURE;
pHead->len = bodyLen;
memcpy(pHead->cont, body, bodyLen);
pWal->head.version = index;
int32_t code = 0;
walUpdateChecksum(pHead);
pWal->head.signature = WAL_SIGNATURE;
pWal->head.len = bodyLen;
pWal->head.msgType = msgType;
int32_t contLen = pHead->len + sizeof(SWalHead);
pWal->head.cksumHead = taosCalcChecksum(0, (const uint8_t*)&pWal->head, sizeof(SWalHead)- sizeof(uint32_t)*2);
pWal->head.cksumBody = taosCalcChecksum(0, (const uint8_t*)&body, bodyLen);
pthread_mutex_lock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, pHead, contLen) != contLen) {
if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
} else {
/*wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId,*/
/*pWal->curFileId, pWal->logTfd, pHead->version, pWal->curVersion, pHead->len);*/
pWal->curVersion = pHead->version;
}
pthread_mutex_unlock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno));
}
//TODO:write idx
ASSERT(contLen == pHead->len + sizeof(SWalHead));
pthread_mutex_unlock(&pWal->mutex);
return code;
}
@ -254,6 +246,7 @@ static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) {
tfFsync(tfd);
}
#if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
@ -387,21 +380,4 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
wDebug("vgId:%d, file:%s, it is closed after restore", pWal->vgId, name);
return code;
}
uint64_t walGetVersion(SWal *pWal) {
if (pWal == NULL) return 0;
return pWal->curVersion;
}
// Wal version in slave (dnode1) must be reset.
// Because after the data file is recovered from peer (dnode2), the new file version in dnode1 may become smaller than origin.
// Some new wal record cannot be written to the wal file in dnode1 for wal version not reset, then fversion and the record in wal file may inconsistent,
// At this time, if dnode2 down, dnode1 switched to master. After dnode2 start and restore data from dnode1, data loss will occur
void walResetVersion(SWal *pWal, uint64_t newVer) {
if (pWal == NULL) return;
wInfo("vgId:%d, version reset from %" PRIu64 " to %" PRIu64, pWal->vgId, pWal->curVersion, newVer);
pWal->curVersion = newVer;
}
#endif