diff --git a/src/inc/dnode.h b/src/inc/dnode.h index fa5e3a3b3d..db39906c68 100644 --- a/src/inc/dnode.h +++ b/src/inc/dnode.h @@ -42,7 +42,7 @@ void *dnodeAllocateWqueue(void *pVnode); void dnodeFreeWqueue(void *queue); void *dnodeAllocateRqueue(void *pVnode); void dnodeFreeRqueue(void *rqueue); -void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); +void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code); #ifdef __cplusplus } diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 555e0503da..ea20ceec79 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -66,11 +66,11 @@ typedef struct { // if name is null, get the file from index or after, used by master // if name is provided, get the named file at the specified index, used by unsynced node // it returns the file magic number and size, if file not there, magic shall be 0. - uint32_t (*getFileInfo)(char *name, int *index, int *size); + uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size); // get the wal file from index or after // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file - int (*getWalInfo)(char *name, int *index); + int (*getWalInfo)(void *ahandle, char *name, uint32_t *index); // when a forward pkt is received, call this to handle data int (*writeToCache)(void *ahandle, void *pHead, int type); @@ -94,11 +94,13 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole *); extern char *syncRole[]; +//global configurable parameters extern int tsMaxSyncNum; extern int tsSyncTcpThreads; extern int tsMaxWatchFiles; extern short tsSyncPort; extern int tsMaxFwdInfo; +extern int sDebugFlag; #ifdef __cplusplus } diff --git a/src/inc/twal.h b/src/inc/twal.h index 3648f5ae29..53d4f835b0 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -40,12 +40,12 @@ typedef struct { typedef void* twal_h; // WAL HANDLE -twal_h walOpen(char *path, int max, int level); +twal_h walOpen(char *path, SWalCfg *pCfg); void walClose(twal_h); int walRenew(twal_h); int walWrite(twal_h, SWalHead *); void walFsync(twal_h); -int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type)); +int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type)); int walGetWalFile(twal_h, char *name, uint32_t *index); extern int wDebugFlag; diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 4d078869c4..75d3117eac 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -35,7 +35,7 @@ typedef struct { int32_t vgId; // global vnode group ID int32_t refCount; // reference count EVnStatus status; - int role; + int8_t role; int64_t version; void *wqueue; void *rqueue; @@ -49,7 +49,7 @@ typedef struct { SWalCfg walCfg; } SVnodeObj; -int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); +int vnodeWriteToQueue(void *param, void *pHead, int type); void vnodeInitWriteFp(void); void vnodeInitReadFp(void); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 1211828a47..a3c60c4387 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -29,13 +29,18 @@ #include "vnode.h" #include "vnodeInt.h" -static void *tsDnodeVnodesHash; -static void vnodeCleanUp(SVnodeObj *pVnode); -static void vnodeBuildVloadMsg(char *pNode, void * param); -static int vnodeWALCallback(void *arg); -static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); -static int32_t vnodeReadCfg(SVnodeObj *pVnode); +static void *tsDnodeVnodesHash; +static void vnodeCleanUp(SVnodeObj *pVnode); +static void vnodeBuildVloadMsg(char *pNode, void * param); +static int vnodeWalCallback(void *arg); +static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); +static int32_t vnodeReadCfg(SVnodeObj *pVnode); +static int vnodeWalCallback(void *arg); +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); +static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); +static void vnodeNotifyRole(void *ahandle, int8_t role); +// module global static int tsOpennedVnodes; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; @@ -140,14 +145,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog); - pVnode->sync = NULL; + pVnode->wal = walOpen(temp, &pVnode->walCfg); + + SSyncInfo syncInfo; + syncInfo.vgId = pVnode->vgId; + syncInfo.vgId = pVnode->version; + syncInfo.syncCfg = pVnode->syncCfg; + sprintf(syncInfo.path, "%s/tsdb/", rootDir); + syncInfo.ahandle = pVnode; + syncInfo.getWalInfo = vnodeGetWalInfo; + syncInfo.getFileInfo = vnodeGetFileInfo; + syncInfo.writeToCache = vnodeWriteToQueue; + syncInfo.confirmForward = dnodeSendRpcWriteRsp; + syncInfo.notifyRole = vnodeNotifyRole; + // pVnode->sync = syncStart(&syncInfo);; + pVnode->events = NULL; pVnode->cq = NULL; STsdbAppH appH = {0}; appH.appH = (void *)pVnode; - appH.walCallBack = vnodeWALCallback; + appH.walCallBack = vnodeWalCallback; sprintf(temp, "%s/tsdb", rootDir); void *pTsdb = tsdbOpenRepo(temp, &appH); @@ -281,11 +299,27 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { } // TODO: this is a simple implement -static int vnodeWALCallback(void *arg) { +static int vnodeWalCallback(void *arg) { SVnodeObj *pVnode = arg; return walRenew(pVnode->wal); } +static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) { + // SVnodeObj *pVnode = ahandle; + //tsdbGetFileInfo(pVnode->tsdb, name, index, size); + return 0; +} + +static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) { + SVnodeObj *pVnode = ahandle; + return walGetWalFile(pVnode->wal, name, index); +} + +static void vnodeNotifyRole(void *ahandle, int8_t role) { + SVnodeObj *pVnode = ahandle; + pVnode->role = role; +} + static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId); diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index c6699bd62c..fc73850f40 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -248,8 +248,9 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet return code; } -int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { +int vnodeWriteToQueue(void *param, void *data, int type) { SVnodeObj *pVnode = param; + SWalHead *pHead = data; int size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index 99de5cceda..7c5602680f 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -49,18 +49,18 @@ int wDebugFlag = 135; static uint32_t walSignature = 0xFAFBFDFE; static int walHandleExistingFiles(char *path); -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)); +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)); static int walRemoveWalFiles(char *path); -void *walOpen(char *path, int max, int level) { +void *walOpen(char *path, SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); if (pWal == NULL) return NULL; pWal->fd = -1; - pWal->max = max; + pWal->max = pCfg->wals; pWal->id = 0; pWal->num = 0; - pWal->level = level; + pWal->level = pCfg->commitLog; strcpy(pWal->path, path); pthread_mutex_init(&pWal->mutex, NULL); @@ -169,7 +169,7 @@ void walFsync(void *handle) { fsync(pWal->fd); } -int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { +int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) { SWal *pWal = (SWal *)handle; int code = 0; struct dirent *ent; @@ -246,7 +246,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) { return code; } -static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { +static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) { int code = 0; char *buffer = malloc(1024000); // size for one record diff --git a/src/vnode/wal/test/waltest.c b/src/vnode/wal/test/waltest.c index 37d1d8e84c..8e10bc11e5 100644 --- a/src/vnode/wal/test/waltest.c +++ b/src/vnode/wal/test/waltest.c @@ -21,8 +21,10 @@ int64_t ver = 0; void *pWal = NULL; -int writeToQueue(void *pVnode, SWalHead *pHead, int type) { +int writeToQueue(void *pVnode, void *data, int type) { // do nothing + SWalHead *pHead = data; + if (pHead->version > ver) ver = pHead->version; @@ -74,7 +76,11 @@ int main(int argc, char *argv[]) { taosInitLog("wal.log", 100000, 10); - pWal = walOpen(path, max, level); + SWalCfg walCfg; + walCfg.commitLog = level; + walCfg.wals = max; + + pWal = walOpen(path, &walCfg); if (pWal == NULL) { printf("failed to open wal\n"); exit(-1);