Merge branch 'feature/sync' into feature/balance

# Conflicts:
#	src/vnode/main/src/vnodeMain.c
This commit is contained in:
slguan 2020-04-09 16:19:12 +08:00
commit dff00db7fc
12 changed files with 82 additions and 39 deletions

View File

@ -28,7 +28,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ENDIF () ENDIF ()
IF (TD_VPEER) IF (TD_VPEER)
TARGET_LINK_LIBRARIES(taosd balance) TARGET_LINK_LIBRARIES(taosd balance sync)
ENDIF () ENDIF ()
SET(PREPARE_ENV_CMD "prepare_env_cmd") SET(PREPARE_ENV_CMD "prepare_env_cmd")

View File

@ -131,7 +131,7 @@ static int32_t dnodeOpenVnodes() {
char vnodeDir[TSDB_FILENAME_LEN * 3]; char vnodeDir[TSDB_FILENAME_LEN * 3];
int32_t failed = 0; int32_t failed = 0;
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
@ -146,7 +146,7 @@ static int32_t dnodeOpenVnodes() {
} }
static void dnodeCloseVnodes() { static void dnodeCloseVnodes() {
int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * 10000); int32_t *vnodeList = (int32_t *)malloc(sizeof(int32_t) * TSDB_MAX_VNODES);
int32_t numOfVnodes = dnodeGetVnodeList(vnodeList); int32_t numOfVnodes = dnodeGetVnodeList(vnodeList);
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {

View File

@ -42,7 +42,7 @@ void *dnodeAllocateWqueue(void *pVnode);
void dnodeFreeWqueue(void *queue); void dnodeFreeWqueue(void *queue);
void *dnodeAllocateRqueue(void *pVnode); void *dnodeAllocateRqueue(void *pVnode);
void dnodeFreeRqueue(void *rqueue); void dnodeFreeRqueue(void *rqueue);
void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code); void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -66,11 +66,11 @@ typedef struct {
// if name is null, get the file from index or after, used by master // if name is null, get the file from index or after, used by master
// if name is provided, get the named file at the specified index, used by unsynced node // if name is provided, get the named file at the specified index, used by unsynced node
// it returns the file magic number and size, if file not there, magic shall be 0. // it returns the file magic number and size, if file not there, magic shall be 0.
uint32_t (*getFileInfo)(char *name, int *index, int *size); uint32_t (*getFileInfo)(void *ahandle, char *name, uint32_t *index, int32_t *size);
// get the wal file from index or after // get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file // return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
int (*getWalInfo)(char *name, int *index); int (*getWalInfo)(void *ahandle, char *name, uint32_t *index);
// when a forward pkt is received, call this to handle data // when a forward pkt is received, call this to handle data
int (*writeToCache)(void *ahandle, void *pHead, int type); int (*writeToCache)(void *ahandle, void *pHead, int type);
@ -94,11 +94,13 @@ int syncGetNodesRole(tsync_h shandle, SNodesRole *);
extern char *syncRole[]; extern char *syncRole[];
//global configurable parameters
extern int tsMaxSyncNum; extern int tsMaxSyncNum;
extern int tsSyncTcpThreads; extern int tsSyncTcpThreads;
extern int tsMaxWatchFiles; extern int tsMaxWatchFiles;
extern short tsSyncPort; extern short tsSyncPort;
extern int tsMaxFwdInfo; extern int tsMaxFwdInfo;
extern int sDebugFlag;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -40,12 +40,12 @@ typedef struct {
typedef void* twal_h; // WAL HANDLE typedef void* twal_h; // WAL HANDLE
twal_h walOpen(char *path, int max, int level); twal_h walOpen(char *path, SWalCfg *pCfg);
void walClose(twal_h); void walClose(twal_h);
int walRenew(twal_h); int walRenew(twal_h);
int walWrite(twal_h, SWalHead *); int walWrite(twal_h, SWalHead *);
void walFsync(twal_h); void walFsync(twal_h);
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type)); int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pHead, int type));
int walGetWalFile(twal_h, char *name, uint32_t *index); int walGetWalFile(twal_h, char *name, uint32_t *index);
extern int wDebugFlag; extern int wDebugFlag;

View File

@ -935,6 +935,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet)); memcpy(&pContext->ipSet, pHead->content, sizeof(pContext->ipSet));
tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps); tTrace("%s %p, redirect is received, numOfIps:%d", pRpc->label, pConn, pContext->ipSet.numOfIps);
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} else if (pHead->code == TSDB_CODE_NOT_READY) {
pConn->pContext->code = pHead->code;
rpcProcessConnError(pConn->pContext, NULL);
} else { } else {
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
} }
@ -1079,7 +1082,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16)) if (pHead->msgType < TSDB_MSG_TYPE_CM_HEARTBEAT || (rpcDebugFlag & 16))
tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d", tTrace( "%s %p, %s is sent to %s:%hu, code:0x%x len:%d sig:0x%08x:0x%08x:%d",
pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort, pRpc->label, pConn, taosMsg[pHead->msgType], pConn->peerIpstr, pConn->peerPort,
pHead->code, msgLen, pHead->sourceId, pHead->destId, pHead->tranId); htonl(pHead->code), msgLen, pHead->sourceId, pHead->destId, pHead->tranId);
} }
writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle); writtenLen = (*taosSendData[pConn->connType])(pConn->peerIp, pConn->peerPort, pHead, msgLen, pConn->chandle);

View File

@ -110,6 +110,8 @@ char *taosMsg[] = {
"", "",
"", "",
"", "",
"",
"",
"", //90 "", //90
"config-table", "config-table",

View File

@ -27,7 +27,7 @@ typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int status; int status;
int role; int8_t role;
int64_t version; int64_t version;
void *wqueue; void *wqueue;
void *rqueue; void *rqueue;
@ -41,7 +41,7 @@ typedef struct {
SWalCfg walCfg; SWalCfg walCfg;
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); int vnodeWriteToQueue(void *param, void *pHead, int type);
void vnodeInitWriteFp(void); void vnodeInitWriteFp(void);
void vnodeInitReadFp(void); void vnodeInitReadFp(void);

View File

@ -28,14 +28,17 @@
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
static void *tsDnodeVnodesHash; static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode); static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param); static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg); static int vnodeWalCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int vnodeWalCallback(void *arg);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static int32_t tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() { static void vnodeInit() {
@ -138,14 +141,27 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog); pVnode->wal = walOpen(temp, &pVnode->walCfg);
pVnode->sync = NULL;
SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId;
syncInfo.version = pVnode->version;
syncInfo.syncCfg = pVnode->syncCfg;
sprintf(syncInfo.path, "%s/tsdb/", rootDir);
syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole;
pVnode->sync = syncStart(&syncInfo);;
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.walCallBack = vnodeWALCallback; appH.walCallBack = vnodeWalCallback;
sprintf(temp, "%s/tsdb", rootDir); sprintf(temp, "%s/tsdb", rootDir);
void *pTsdb = tsdbOpenRepo(temp, &appH); void *pTsdb = tsdbOpenRepo(temp, &appH);
@ -162,7 +178,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir); dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
tsOpennedVnodes++; atomic_add_fetch_32(&tsOpennedVnodes, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -203,8 +219,8 @@ void vnodeRelease(void *pVnodeRaw) {
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId); dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
free(pVnode); free(pVnode);
tsOpennedVnodes--; int32_t count = atomic_sub_fetch_32(&tsOpennedVnodes, 1);
if (tsOpennedVnodes <= 0) { if (count <= 0) {
taosCleanUpIntHash(tsDnodeVnodesHash); taosCleanUpIntHash(tsDnodeVnodesHash);
vnodeModuleInit = PTHREAD_ONCE_INIT; vnodeModuleInit = PTHREAD_ONCE_INIT;
tsDnodeVnodesHash = NULL; tsDnodeVnodesHash = NULL;
@ -280,11 +296,27 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
} }
// TODO: this is a simple implement // TODO: this is a simple implement
static int vnodeWALCallback(void *arg) { static int vnodeWalCallback(void *arg) {
SVnodeObj *pVnode = arg; SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal); return walRenew(pVnode->wal);
} }
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size) {
// SVnodeObj *pVnode = ahandle;
//tsdbGetFileInfo(pVnode->tsdb, name, index, size);
return 0;
}
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index) {
SVnodeObj *pVnode = ahandle;
return walGetWalFile(pVnode->wal, name, index);
}
static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle;
pVnode->role = role;
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
@ -335,7 +367,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.arbitratorIp = arbitratorIp; pVnode->syncCfg.arbitratorIp = 0;
int32_t quorum = -1; int32_t quorum = -1;
num = fscanf(fp, "%s %d", option[0], &quorum); num = fscanf(fp, "%s %d", option[0], &quorum);

View File

@ -78,11 +78,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
if (code < 0) return code; if (code < 0) return code;
/* forward if (pVnode->syncCfg.replica > 1)
if (pVnode->replica > 1 && pVnode->role == TAOS_SYNC_ROLE_MASTER) {
code = syncForwardToPeer(pVnode->sync, pHead, item); code = syncForwardToPeer(pVnode->sync, pHead, item);
}
*/
return code; return code;
} }
@ -252,8 +249,9 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
return code; return code;
} }
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { int vnodeWriteToQueue(void *param, void *data, int type) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
SWalHead *pHead = data;
int size = sizeof(SWalHead) + pHead->len; int size = sizeof(SWalHead) + pHead->len;
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);

View File

@ -49,18 +49,18 @@ int wDebugFlag = 135;
static uint32_t walSignature = 0xFAFBFDFE; static uint32_t walSignature = 0xFAFBFDFE;
static int walHandleExistingFiles(char *path); static int walHandleExistingFiles(char *path);
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)); static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int));
static int walRemoveWalFiles(char *path); static int walRemoveWalFiles(char *path);
void *walOpen(char *path, int max, int level) { void *walOpen(char *path, SWalCfg *pCfg) {
SWal *pWal = calloc(sizeof(SWal), 1); SWal *pWal = calloc(sizeof(SWal), 1);
if (pWal == NULL) return NULL; if (pWal == NULL) return NULL;
pWal->fd = -1; pWal->fd = -1;
pWal->max = max; pWal->max = pCfg->wals;
pWal->id = 0; pWal->id = 0;
pWal->num = 0; pWal->num = 0;
pWal->level = level; pWal->level = pCfg->commitLog;
strcpy(pWal->path, path); strcpy(pWal->path, path);
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
@ -170,7 +170,7 @@ void walFsync(void *handle) {
fsync(pWal->fd); fsync(pWal->fd);
} }
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int)) {
SWal *pWal = (SWal *)handle; SWal *pWal = (SWal *)handle;
int code = 0; int code = 0;
struct dirent *ent; struct dirent *ent;
@ -247,7 +247,7 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
return code; return code;
} }
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) { static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *, int)) {
int code = 0; int code = 0;
char *buffer = malloc(1024000); // size for one record char *buffer = malloc(1024000); // size for one record

View File

@ -21,8 +21,10 @@
int64_t ver = 0; int64_t ver = 0;
void *pWal = NULL; void *pWal = NULL;
int writeToQueue(void *pVnode, SWalHead *pHead, int type) { int writeToQueue(void *pVnode, void *data, int type) {
// do nothing // do nothing
SWalHead *pHead = data;
if (pHead->version > ver) if (pHead->version > ver)
ver = pHead->version; ver = pHead->version;
@ -74,7 +76,11 @@ int main(int argc, char *argv[]) {
taosInitLog("wal.log", 100000, 10); taosInitLog("wal.log", 100000, 10);
pWal = walOpen(path, max, level); SWalCfg walCfg;
walCfg.commitLog = level;
walCfg.wals = max;
pWal = walOpen(path, &walCfg);
if (pWal == NULL) { if (pWal == NULL) {
printf("failed to open wal\n"); printf("failed to open wal\n");
exit(-1); exit(-1);