TD-2289
This commit is contained in:
parent
f5e62a9e34
commit
90c133a12c
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_VNODE_SYNC_H
|
||||||
|
#define TDENGINE_VNODE_SYNC_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver);
|
||||||
|
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
|
||||||
|
void vnodeNotifyRole(int32_t vgId, int8_t role);
|
||||||
|
void vnodeCtrlFlow(int32_t vgId, int32_t level);
|
||||||
|
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
|
||||||
|
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
||||||
|
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
||||||
|
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -24,6 +24,7 @@
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "vnodeCfg.h"
|
#include "vnodeCfg.h"
|
||||||
#include "vnodeStatus.h"
|
#include "vnodeStatus.h"
|
||||||
|
#include "vnodeSync.h"
|
||||||
#include "vnodeVersion.h"
|
#include "vnodeVersion.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
|
@ -34,14 +35,6 @@
|
||||||
static SHashObj*tsVnodesHash;
|
static SHashObj*tsVnodesHash;
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
|
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
|
||||||
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fversion);
|
|
||||||
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
|
|
||||||
static void vnodeNotifyRole(int32_t vgId, int8_t role);
|
|
||||||
static void vnodeCtrlFlow(int32_t vgId, int32_t level);
|
|
||||||
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion);
|
|
||||||
static void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
|
|
||||||
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
|
|
||||||
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
|
|
||||||
|
|
||||||
int32_t vnodeInitResources() {
|
int32_t vnodeInitResources() {
|
||||||
int32_t code = syncInit();
|
int32_t code = syncInit();
|
||||||
|
@ -609,70 +602,8 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size,
|
|
||||||
uint64_t *fversion) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while get file info", vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
*fversion = pVnode->fversion;
|
int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
|
||||||
uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while get wal info", vgId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
|
|
||||||
pVnode->role = role;
|
|
||||||
dnodeSendStatusMsgToMnode();
|
|
||||||
|
|
||||||
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
|
|
||||||
cqStart(pVnode->cq);
|
|
||||||
} else {
|
|
||||||
cqStop(pVnode->cq);
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pVnode->flowctrlLevel != level) {
|
|
||||||
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
|
||||||
pVnode->flowctrlLevel = level;
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
|
|
||||||
char rootDir[128] = "\0";
|
char rootDir[128] = "\0";
|
||||||
sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId);
|
sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId);
|
||||||
|
|
||||||
|
@ -705,65 +636,3 @@ static int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while notify file synced", vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->fversion = fversion;
|
|
||||||
pVnode->version = fversion;
|
|
||||||
vnodeSaveVersion(pVnode);
|
|
||||||
|
|
||||||
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
|
|
||||||
int32_t code = vnodeResetTsdb(pVnode);
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
|
|
||||||
void *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while confirm forward", vgId);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while write to cache", vgId);
|
|
||||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
vError("vgId:%d, vnode not found while write to cache", vgId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = 0;
|
|
||||||
if (pVnode->isCommiting) {
|
|
||||||
vDebug("vgId:%d, vnode is commiting while get version", vgId);
|
|
||||||
code = -1;
|
|
||||||
} else {
|
|
||||||
*fver = pVnode->fversion;
|
|
||||||
*wver = pVnode->version;
|
|
||||||
}
|
|
||||||
|
|
||||||
vnodeRelease(pVnode);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,156 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tutil.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
#include "vnodeCfg.h"
|
||||||
|
#include "vnodeStatus.h"
|
||||||
|
#include "vnodeVersion.h"
|
||||||
|
#include "query.h"
|
||||||
|
#include "dnode.h"
|
||||||
|
#include "dnodeVWrite.h"
|
||||||
|
#include "dnodeVRead.h"
|
||||||
|
#include "tfs.h"
|
||||||
|
|
||||||
|
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while get file info", vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
*fver = pVnode->fversion;
|
||||||
|
uint32_t ret = tsdbGetFileInfo(pVnode->tsdb, name, index, eindex, size);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while get wal info", vgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = walGetWalFile(pVnode->wal, fileName, fileId);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeNotifyRole(int32_t vgId, int8_t role) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vTrace("vgId:%d, vnode not found while notify role", vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
vInfo("vgId:%d, sync role changed from %s to %s", pVnode->vgId, syncRole[pVnode->role], syncRole[role]);
|
||||||
|
pVnode->role = role;
|
||||||
|
dnodeSendStatusMsgToMnode();
|
||||||
|
|
||||||
|
if (pVnode->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
|
cqStart(pVnode->cq);
|
||||||
|
} else {
|
||||||
|
cqStop(pVnode->cq);
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeCtrlFlow(int32_t vgId, int32_t level) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vTrace("vgId:%d, vnode not found while flow ctrl", vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pVnode->flowctrlLevel != level) {
|
||||||
|
vDebug("vgId:%d, set flowctrl level from %d to %d", pVnode->vgId, pVnode->flowctrlLevel, level);
|
||||||
|
pVnode->flowctrlLevel = level;
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while notify file synced", vgId);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnode->fversion = fversion;
|
||||||
|
pVnode->version = fversion;
|
||||||
|
vnodeSaveVersion(pVnode);
|
||||||
|
|
||||||
|
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
|
||||||
|
int32_t code = vnodeResetTsdb(pVnode);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code) {
|
||||||
|
void *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while confirm forward", vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
dnodeSendRpcVWriteRsp(pVnode, wparam, code);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while write to cache", vgId);
|
||||||
|
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = vnodeWriteToWQueue(pVnode, wparam, qtype, rparam);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
|
||||||
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
vError("vgId:%d, vnode not found while write to cache", vgId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pVnode->isCommiting) {
|
||||||
|
vDebug("vgId:%d, vnode is commiting while get version", vgId);
|
||||||
|
code = -1;
|
||||||
|
} else {
|
||||||
|
*fver = pVnode->fversion;
|
||||||
|
*wver = pVnode->version;
|
||||||
|
}
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return code;
|
||||||
|
}
|
Loading…
Reference in New Issue