commit
63ec213013
|
@ -76,16 +76,16 @@ typedef struct {
|
||||||
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
} SRpcInit;
|
} SRpcInit;
|
||||||
|
|
||||||
void *rpcOpen(SRpcInit *pRpc);
|
void *rpcOpen(const SRpcInit *pRpc);
|
||||||
void rpcClose(void *);
|
void rpcClose(void *);
|
||||||
void *rpcMallocCont(int contLen);
|
void *rpcMallocCont(int contLen);
|
||||||
void rpcFreeCont(void *pCont);
|
void rpcFreeCont(void *pCont);
|
||||||
void *rpcReallocCont(void *ptr, int contLen);
|
void *rpcReallocCont(void *ptr, int contLen);
|
||||||
void rpcSendRequest(void *thandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg);
|
void rpcSendRequest(void *thandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg);
|
||||||
void rpcSendResponse(SRpcMsg *pMsg);
|
void rpcSendResponse(const SRpcMsg *pMsg);
|
||||||
void rpcSendRedirectRsp(void *pConn, SRpcIpSet *pIpSet);
|
void rpcSendRedirectRsp(void *pConn, const SRpcIpSet *pIpSet);
|
||||||
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
|
||||||
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pOut, SRpcMsg *pRsp);
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pReq, SRpcMsg *pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,24 @@ typedef struct {
|
||||||
int role[TAOS_SYNC_MAX_REPLICA];
|
int role[TAOS_SYNC_MAX_REPLICA];
|
||||||
} SNodesRole;
|
} SNodesRole;
|
||||||
|
|
||||||
|
// if name is null, get the file from index or after, used by master
|
||||||
|
// if name is provided, get the named file at the specified index, used by unsynced node
|
||||||
|
// it returns the file magic number and size, if file not there, magic shall be 0.
|
||||||
|
typedef uint32_t (*FGetFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
|
||||||
|
|
||||||
|
// get the wal file from index or after
|
||||||
|
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
|
||||||
|
typedef int (*FGetWalInfo)(void *ahandle, char *name, uint32_t *index);
|
||||||
|
|
||||||
|
// when a forward pkt is received, call this to handle data
|
||||||
|
typedef int (*FWriteToCache)(void *ahandle, void *pHead, int type);
|
||||||
|
|
||||||
|
// when forward is confirmed by peer, master call this API to notify app
|
||||||
|
typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
|
||||||
|
|
||||||
|
// when role is changed, call this to notify app
|
||||||
|
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // vgroup ID
|
int32_t vgId; // vgroup ID
|
||||||
uint64_t version; // initial version
|
uint64_t version; // initial version
|
||||||
|
@ -62,31 +80,19 @@ typedef struct {
|
||||||
char path[128]; // path to the file
|
char path[128]; // path to the file
|
||||||
|
|
||||||
void *ahandle; // handle provided by APP
|
void *ahandle; // handle provided by APP
|
||||||
|
FGetFileInfo getFileInfo;
|
||||||
|
FGetWalInfo getWalInfo;
|
||||||
|
FWriteToCache writeToCache;
|
||||||
|
FConfirmForward confirmForward;
|
||||||
|
FNotifyRole notifyRole;
|
||||||
|
|
||||||
// if name is null, get the file from index or after, used by master
|
|
||||||
// if name is provided, get the named file at the specified index, used by unsynced node
|
|
||||||
// it returns the file magic number and size, if file not there, magic shall be 0.
|
|
||||||
uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
|
|
||||||
|
|
||||||
// get the wal file from index or after
|
|
||||||
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
|
|
||||||
int (*getWalInfo)(void *ahandle, char *name, uint32_t *index);
|
|
||||||
|
|
||||||
// when a forward pkt is received, call this to handle data
|
|
||||||
int (*writeToCache)(void *ahandle, void *pHead, int type);
|
|
||||||
|
|
||||||
// when forward is confirmed by peer, master call this API to notify app
|
|
||||||
void (*confirmForward)(void *ahandle, void *mhandle, int32_t code);
|
|
||||||
|
|
||||||
// when role is changed, call this to notify app
|
|
||||||
void (*notifyRole)(void *ahandle, int8_t role);
|
|
||||||
} SSyncInfo;
|
} SSyncInfo;
|
||||||
|
|
||||||
typedef void* tsync_h;
|
typedef void* tsync_h;
|
||||||
|
|
||||||
tsync_h syncStart(SSyncInfo *);
|
tsync_h syncStart(const SSyncInfo *);
|
||||||
void syncStop(tsync_h shandle);
|
void syncStop(tsync_h shandle);
|
||||||
int syncReconfig(tsync_h shandle, SSyncCfg *);
|
int syncReconfig(tsync_h shandle, const SSyncCfg *);
|
||||||
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle);
|
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle);
|
||||||
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
|
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
|
||||||
void syncRecover(tsync_h shandle); // recover from other nodes:
|
void syncRecover(tsync_h shandle); // recover from other nodes:
|
||||||
|
|
|
@ -38,15 +38,16 @@ typedef struct {
|
||||||
int8_t wals; // number of WAL files;
|
int8_t wals; // number of WAL files;
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef void* twal_h; // WAL HANDLE
|
typedef void* twalh; // WAL HANDLE
|
||||||
|
typedef int (*FWalWrite)(void *ahandle, void *pHead, int type);
|
||||||
|
|
||||||
twal_h walOpen(char *path, SWalCfg *pCfg);
|
twalh walOpen(const char *path, const SWalCfg *pCfg);
|
||||||
void walClose(twal_h);
|
void walClose(twalh);
|
||||||
int walRenew(twal_h);
|
int walRenew(twalh);
|
||||||
int walWrite(twal_h, SWalHead *);
|
int walWrite(twalh, SWalHead *);
|
||||||
void walFsync(twal_h);
|
void walFsync(twalh);
|
||||||
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type));
|
int walRestore(twalh, void *pVnode, FWalWrite writeFp);
|
||||||
int walGetWalFile(twal_h, char *name, uint32_t *index);
|
int walGetWalFile(twalh, char *name, uint32_t *index);
|
||||||
|
|
||||||
extern int wDebugFlag;
|
extern int wDebugFlag;
|
||||||
|
|
||||||
|
|
|
@ -202,7 +202,7 @@ static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen);
|
||||||
static void rpcLockConn(SRpcConn *pConn);
|
static void rpcLockConn(SRpcConn *pConn);
|
||||||
static void rpcUnlockConn(SRpcConn *pConn);
|
static void rpcUnlockConn(SRpcConn *pConn);
|
||||||
|
|
||||||
void *rpcOpen(SRpcInit *pInit) {
|
void *rpcOpen(const SRpcInit *pInit) {
|
||||||
SRpcInfo *pRpc;
|
SRpcInfo *pRpc;
|
||||||
|
|
||||||
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
|
tsRpcMaxRetry = tsRpcMaxTime * 1000 / tsRpcProgressTime;
|
||||||
|
@ -344,22 +344,22 @@ void *rpcReallocCont(void *ptr, int contLen) {
|
||||||
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
|
void rpcSendRequest(void *shandle, const SRpcIpSet *pIpSet, const SRpcMsg *pMsg) {
|
||||||
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
SRpcInfo *pRpc = (SRpcInfo *)shandle;
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
|
|
||||||
pMsg->contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||||
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
pContext->ahandle = pMsg->handle;
|
pContext->ahandle = pMsg->handle;
|
||||||
pContext->pRpc = (SRpcInfo *)shandle;
|
pContext->pRpc = (SRpcInfo *)shandle;
|
||||||
pContext->ipSet = *pIpSet;
|
pContext->ipSet = *pIpSet;
|
||||||
pContext->contLen = pMsg->contLen;
|
pContext->contLen = contLen;
|
||||||
pContext->pCont = pMsg->pCont;
|
pContext->pCont = pMsg->pCont;
|
||||||
pContext->msgType = pMsg->msgType;
|
pContext->msgType = pMsg->msgType;
|
||||||
pContext->oldInUse = pIpSet->inUse;
|
pContext->oldInUse = pIpSet->inUse;
|
||||||
|
|
||||||
pContext->connType = RPC_CONN_UDPC;
|
pContext->connType = RPC_CONN_UDPC;
|
||||||
if (pMsg->contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
|
if (contLen > tsRpcMaxUdpSize) pContext->connType = RPC_CONN_TCPC;
|
||||||
|
|
||||||
// connection type is application specific.
|
// connection type is application specific.
|
||||||
// for TDengine, all the query, show commands shall have TCP connection
|
// for TDengine, all the query, show commands shall have TCP connection
|
||||||
|
@ -374,11 +374,14 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendResponse(SRpcMsg *pMsg) {
|
void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
int msgLen = 0;
|
int msgLen = 0;
|
||||||
SRpcConn *pConn = (SRpcConn *)pMsg->handle;
|
SRpcConn *pConn = (SRpcConn *)pRsp->handle;
|
||||||
SRpcInfo *pRpc = pConn->pRpc;
|
SRpcInfo *pRpc = pConn->pRpc;
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = *pRsp;
|
||||||
|
SRpcMsg *pMsg = &rpcMsg;
|
||||||
|
|
||||||
if ( pMsg->pCont == NULL ) {
|
if ( pMsg->pCont == NULL ) {
|
||||||
pMsg->pCont = rpcMallocCont(0);
|
pMsg->pCont = rpcMallocCont(0);
|
||||||
pMsg->contLen = 0;
|
pMsg->contLen = 0;
|
||||||
|
@ -429,7 +432,7 @@ void rpcSendResponse(SRpcMsg *pMsg) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRedirectRsp(void *thandle, SRpcIpSet *pIpSet) {
|
void rpcSendRedirectRsp(void *thandle, const SRpcIpSet *pIpSet) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
|
||||||
rpcMsg.contLen = sizeof(SRpcIpSet);
|
rpcMsg.contLen = sizeof(SRpcIpSet);
|
||||||
|
@ -458,7 +461,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
SRpcReqContext *pContext;
|
SRpcReqContext *pContext;
|
||||||
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
pContext = (SRpcReqContext *) (pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role);
|
||||||
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
#ifndef _VPEER
|
#ifndef _VPEER
|
||||||
tsync_h syncStart(SSyncInfo *info) { return NULL; }
|
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
||||||
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; }
|
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle) { return 0; }
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -48,11 +48,11 @@ typedef struct {
|
||||||
int wDebugFlag = 135;
|
int wDebugFlag = 135;
|
||||||
|
|
||||||
static uint32_t walSignature = 0xFAFBFDFE;
|
static uint32_t walSignature = 0xFAFBFDFE;
|
||||||
static int walHandleExistingFiles(char *path);
|
static int walHandleExistingFiles(const char *path);
|
||||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int));
|
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp);
|
||||||
static int walRemoveWalFiles(char *path);
|
static int walRemoveWalFiles(const char *path);
|
||||||
|
|
||||||
void *walOpen(char *path, SWalCfg *pCfg) {
|
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||||
if (pWal == NULL) return NULL;
|
if (pWal == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -82,7 +82,7 @@ void *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
void walClose(void *handle) {
|
void walClose(void *handle) {
|
||||||
if (handle == NULL) return;
|
if (handle == NULL) return;
|
||||||
|
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
|
|
||||||
close(pWal->fd);
|
close(pWal->fd);
|
||||||
|
|
||||||
|
@ -101,8 +101,8 @@ void walClose(void *handle) {
|
||||||
free(pWal);
|
free(pWal);
|
||||||
}
|
}
|
||||||
|
|
||||||
int walRenew(twal_h handle) {
|
int walRenew(void *handle) {
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
pthread_mutex_lock(&pWal->mutex);
|
pthread_mutex_lock(&pWal->mutex);
|
||||||
|
@ -144,7 +144,7 @@ int walRenew(twal_h handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int walWrite(void *handle, SWalHead *pHead) {
|
int walWrite(void *handle, SWalHead *pHead) {
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
// no wal
|
// no wal
|
||||||
|
@ -164,14 +164,14 @@ int walWrite(void *handle, SWalHead *pHead) {
|
||||||
|
|
||||||
void walFsync(void *handle) {
|
void walFsync(void *handle) {
|
||||||
|
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
|
|
||||||
if (pWal->level == TAOS_WAL_FSYNC)
|
if (pWal->level == TAOS_WAL_FSYNC)
|
||||||
fsync(pWal->fd);
|
fsync(pWal->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
@ -223,7 +223,7 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
}
|
}
|
||||||
|
|
||||||
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = handle;
|
||||||
int code = 1;
|
int code = 1;
|
||||||
int32_t first = 0;
|
int32_t first = 0;
|
||||||
|
|
||||||
|
@ -247,7 +247,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) {
|
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
||||||
char *buffer = malloc(1024000); // size for one record
|
char *buffer = malloc(1024000); // size for one record
|
||||||
|
@ -293,7 +293,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, vo
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int walHandleExistingFiles(char *path) {
|
int walHandleExistingFiles(const char *path) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
char oname[TSDB_FILENAME_LEN * 3];
|
char oname[TSDB_FILENAME_LEN * 3];
|
||||||
char nname[TSDB_FILENAME_LEN * 3];
|
char nname[TSDB_FILENAME_LEN * 3];
|
||||||
|
@ -335,7 +335,7 @@ int walHandleExistingFiles(char *path) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRemoveWalFiles(char *path) {
|
static int walRemoveWalFiles(const char *path) {
|
||||||
int plen = strlen(walPrefix);
|
int plen = strlen(walPrefix);
|
||||||
char name[TSDB_FILENAME_LEN * 3];
|
char name[TSDB_FILENAME_LEN * 3];
|
||||||
int code = 0;
|
int code = 0;
|
||||||
|
|
Loading…
Reference in New Issue