integrate WAL module
This commit is contained in:
parent
4f6cb328b2
commit
dc3c33a820
|
@ -42,7 +42,7 @@ void *dnodeAllocateWqueue(void *pVnode);
|
|||
void dnodeFreeWqueue(void *queue);
|
||||
void *dnodeAllocateRqueue(void *pVnode);
|
||||
void dnodeFreeRqueue(void *rqueue);
|
||||
void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code);
|
||||
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -66,11 +66,11 @@ typedef struct {
|
|||
// if name is null, get the file from index or after, used by master
|
||||
// if name is provided, get the named file at the specified index, used by unsynced node
|
||||
// it returns the file magic number and size, if file not there, magic shall be 0.
|
||||
uint32_t (*getFileInfo)(char *name, int *index, int *size);
|
||||
uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
|
||||
|
||||
// get the wal file from index or after
|
||||
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
|
||||
int (*getWalInfo)(char *name, int *index);
|
||||
int (*getWalInfo)(void *ahandle, char *name, uint32_t *index);
|
||||
|
||||
// when a forward pkt is received, call this to handle data
|
||||
int (*writeToCache)(void *ahandle, void *pHead, int type);
|
||||
|
@ -94,11 +94,13 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole *);
|
|||
|
||||
extern char *syncRole[];
|
||||
|
||||
//global configurable parameters
|
||||
extern int tsMaxSyncNum;
|
||||
extern int tsSyncTcpThreads;
|
||||
extern int tsMaxWatchFiles;
|
||||
extern short tsSyncPort;
|
||||
extern int tsMaxFwdInfo;
|
||||
extern int sDebugFlag;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -40,12 +40,12 @@ typedef struct {
|
|||
|
||||
typedef void* twal_h; // WAL HANDLE
|
||||
|
||||
twal_h walOpen(char *path, int max, int level);
|
||||
twal_h walOpen(char *path, SWalCfg *pCfg);
|
||||
void walClose(twal_h);
|
||||
int walRenew(twal_h);
|
||||
int walWrite(twal_h, SWalHead *);
|
||||
void walFsync(twal_h);
|
||||
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type));
|
||||
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type));
|
||||
int walGetWalFile(twal_h, char *name, uint32_t *index);
|
||||
|
||||
extern int wDebugFlag;
|
||||
|
|
|
@ -35,7 +35,7 @@ typedef struct {
|
|||
int32_t vgId; // global vnode group ID
|
||||
int32_t refCount; // reference count
|
||||
EVnStatus status;
|
||||
int role;
|
||||
int8_t role;
|
||||
int64_t version;
|
||||
void *wqueue;
|
||||
void *rqueue;
|
||||
|
@ -49,7 +49,7 @@ typedef struct {
|
|||
SWalCfg walCfg;
|
||||
} SVnodeObj;
|
||||
|
||||
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
|
||||
int vnodeWriteToQueue(void *param, void *pHead, int type);
|
||||
void vnodeInitWriteFp(void);
|
||||
void vnodeInitReadFp(void);
|
||||
|
||||
|
|
|
@ -29,13 +29,18 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
static void *tsDnodeVnodesHash;
|
||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
||||
static int vnodeWALCallback(void *arg);
|
||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
||||
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
||||
static void *tsDnodeVnodesHash;
|
||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||
static void vnodeBuildVloadMsg(char *pNode, void * param);
|
||||
static int vnodeWalCallback(void *arg);
|
||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
|
||||
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
|
||||
static int vnodeWalCallback(void *arg);
|
||||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
|
||||
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
||||
static void vnodeNotifyRole(void *ahandle, int8_t role);
|
||||
|
||||
// module global
|
||||
static int tsOpennedVnodes;
|
||||
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||
|
||||
|
@ -140,14 +145,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||
|
||||
sprintf(temp, "%s/wal", rootDir);
|
||||
pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog);
|
||||
pVnode->sync = NULL;
|
||||
pVnode->wal = walOpen(temp, &pVnode->walCfg);
|
||||
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = pVnode->vgId;
|
||||
syncInfo.vgId = pVnode->version;
|
||||
syncInfo.syncCfg = pVnode->syncCfg;
|
||||
sprintf(syncInfo.path, "%s/tsdb/", rootDir);
|
||||
syncInfo.ahandle = pVnode;
|
||||
syncInfo.getWalInfo = vnodeGetWalInfo;
|
||||
syncInfo.getFileInfo = vnodeGetFileInfo;
|
||||
syncInfo.writeToCache = vnodeWriteToQueue;
|
||||
syncInfo.confirmForward = dnodeSendRpcWriteRsp;
|
||||
syncInfo.notifyRole = vnodeNotifyRole;
|
||||
// pVnode->sync = syncStart(&syncInfo);;
|
||||
|
||||
pVnode->events = NULL;
|
||||
pVnode->cq = NULL;
|
||||
|
||||
STsdbAppH appH = {0};
|
||||
appH.appH = (void *)pVnode;
|
||||
appH.walCallBack = vnodeWALCallback;
|
||||
appH.walCallBack = vnodeWalCallback;
|
||||
|
||||
sprintf(temp, "%s/tsdb", rootDir);
|
||||
void *pTsdb = tsdbOpenRepo(temp, &appH);
|
||||
|
@ -281,11 +299,27 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
|||
}
|
||||
|
||||
// TODO: this is a simple implement
|
||||
static int vnodeWALCallback(void *arg) {
|
||||
static int vnodeWalCallback(void *arg) {
|
||||
SVnodeObj *pVnode = arg;
|
||||
return walRenew(pVnode->wal);
|
||||
}
|
||||
|
||||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
|
||||
// SVnodeObj *pVnode = ahandle;
|
||||
//tsdbGetFileInfo(pVnode->tsdb, name, index, size);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
|
||||
SVnodeObj *pVnode = ahandle;
|
||||
return walGetWalFile(pVnode->wal, name, index);
|
||||
}
|
||||
|
||||
static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||
SVnodeObj *pVnode = ahandle;
|
||||
pVnode->role = role;
|
||||
}
|
||||
|
||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
|
||||
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||
|
|
|
@ -248,8 +248,9 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
|||
return code;
|
||||
}
|
||||
|
||||
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) {
|
||||
int vnodeWriteToQueue(void *param, void *data, int type) {
|
||||
SVnodeObj *pVnode = param;
|
||||
SWalHead *pHead = data;
|
||||
|
||||
int size = sizeof(SWalHead) + pHead->len;
|
||||
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||
|
|
|
@ -49,18 +49,18 @@ int wDebugFlag = 135;
|
|||
|
||||
static uint32_t walSignature = 0xFAFBFDFE;
|
||||
static int walHandleExistingFiles(char *path);
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int));
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int));
|
||||
static int walRemoveWalFiles(char *path);
|
||||
|
||||
void *walOpen(char *path, int max, int level) {
|
||||
void *walOpen(char *path, SWalCfg *pCfg) {
|
||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||
if (pWal == NULL) return NULL;
|
||||
|
||||
pWal->fd = -1;
|
||||
pWal->max = max;
|
||||
pWal->max = pCfg->wals;
|
||||
pWal->id = 0;
|
||||
pWal->num = 0;
|
||||
pWal->level = level;
|
||||
pWal->level = pCfg->commitLog;
|
||||
strcpy(pWal->path, path);
|
||||
pthread_mutex_init(&pWal->mutex, NULL);
|
||||
|
||||
|
@ -169,7 +169,7 @@ void walFsync(void *handle) {
|
|||
fsync(pWal->fd);
|
||||
}
|
||||
|
||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
|
||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
||||
SWal *pWal = (SWal *)handle;
|
||||
int code = 0;
|
||||
struct dirent *ent;
|
||||
|
@ -246,7 +246,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
|
||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
||||
int code = 0;
|
||||
|
||||
char *buffer = malloc(1024000); // size for one record
|
||||
|
|
|
@ -21,8 +21,10 @@
|
|||
int64_t ver = 0;
|
||||
void *pWal = NULL;
|
||||
|
||||
int writeToQueue(void *pVnode, SWalHead *pHead, int type) {
|
||||
int writeToQueue(void *pVnode, void *data, int type) {
|
||||
// do nothing
|
||||
SWalHead *pHead = data;
|
||||
|
||||
if (pHead->version > ver)
|
||||
ver = pHead->version;
|
||||
|
||||
|
@ -74,7 +76,11 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
taosInitLog("wal.log", 100000, 10);
|
||||
|
||||
pWal = walOpen(path, max, level);
|
||||
SWalCfg walCfg;
|
||||
walCfg.commitLog = level;
|
||||
walCfg.wals = max;
|
||||
|
||||
pWal = walOpen(path, &walCfg);
|
||||
if (pWal == NULL) {
|
||||
printf("failed to open wal\n");
|
||||
exit(-1);
|
||||
|
|
Loading…
Reference in New Issue