Merge pull request #1520 from taosdata/feature/wal
integrate WAL module
This commit is contained in:
commit
fddd256fee
|
@ -13,7 +13,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
AUX_SOURCE_DIRECTORY(src SRC)
|
||||||
|
|
||||||
ADD_EXECUTABLE(taosd ${SRC})
|
ADD_EXECUTABLE(taosd ${SRC})
|
||||||
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb)
|
TARGET_LINK_LIBRARIES(taosd mnode taos_static monitor http tsdb twal)
|
||||||
|
|
||||||
IF (TD_ACCOUNT)
|
IF (TD_ACCOUNT)
|
||||||
TARGET_LINK_LIBRARIES(taosd account)
|
TARGET_LINK_LIBRARIES(taosd account)
|
||||||
|
|
|
@ -23,8 +23,8 @@ extern "C" {
|
||||||
int32_t dnodeInitRead();
|
int32_t dnodeInitRead();
|
||||||
void dnodeCleanupRead();
|
void dnodeCleanupRead();
|
||||||
void dnodeRead(SRpcMsg *pMsg);
|
void dnodeRead(SRpcMsg *pMsg);
|
||||||
void * dnodeAllocateReadWorker();
|
void * dnodeAllocateRqueue();
|
||||||
void dnodeFreeReadWorker(void *rqueue);
|
void dnodeFreeRqueue(void *rqueue);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,8 +23,9 @@ extern "C" {
|
||||||
int32_t dnodeInitWrite();
|
int32_t dnodeInitWrite();
|
||||||
void dnodeCleanupWrite();
|
void dnodeCleanupWrite();
|
||||||
void dnodeWrite(SRpcMsg *pMsg);
|
void dnodeWrite(SRpcMsg *pMsg);
|
||||||
void * dnodeAllocateWriteWorker();
|
void * dnodeAllocateWqueue();
|
||||||
void dnodeFreeWriteWorker(void *worker);
|
void dnodeFreeWqueue(void *worker);
|
||||||
|
void dnodeSendWriteResponse(void *pVnode, void *param, int32_t code);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_VNODE_H
|
||||||
|
#define TDENGINE_VNODE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int len;
|
||||||
|
void *rsp;
|
||||||
|
} SRspRet;
|
||||||
|
|
||||||
|
int32_t vnodeInitWrite();
|
||||||
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
|
int32_t vnodeDrop(int32_t vgId);
|
||||||
|
int32_t vnodeOpen(int32_t vnode, char *rootDir);
|
||||||
|
int32_t vnodeClose(void *pVnode);
|
||||||
|
|
||||||
|
void vnodeRelease(void *pVnode);
|
||||||
|
|
||||||
|
void* vnodeGetVnode(int32_t vgId);
|
||||||
|
void* vnodeGetRqueue(void *);
|
||||||
|
void* vnodeGetWqueue(int32_t vgId);
|
||||||
|
void* vnodeGetWal(void *pVnode);
|
||||||
|
void* vnodeGetTsdb(void *pVnode);
|
||||||
|
|
||||||
|
int32_t vnodeProcessWrite(void *pVnode, int qtype, SWalHead *pHead, void *item);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,52 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_VNODE_INT_H
|
||||||
|
#define TDENGINE_VNODE_INT_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef enum _VN_STATUS {
|
||||||
|
VN_STATUS_INIT,
|
||||||
|
VN_STATUS_CREATING,
|
||||||
|
VN_STATUS_READY,
|
||||||
|
VN_STATUS_CLOSING,
|
||||||
|
VN_STATUS_DELETING,
|
||||||
|
} EVnStatus;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t vgId; // global vnode group ID
|
||||||
|
int32_t refCount; // reference count
|
||||||
|
EVnStatus status;
|
||||||
|
int role;
|
||||||
|
int64_t version;
|
||||||
|
void * wqueue;
|
||||||
|
void * rqueue;
|
||||||
|
void * wal;
|
||||||
|
void * tsdb;
|
||||||
|
void * sync;
|
||||||
|
void * events;
|
||||||
|
void * cq; // continuous query
|
||||||
|
} SVnodeObj;
|
||||||
|
|
||||||
|
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -24,12 +24,12 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
#include "twal.h"
|
||||||
#include "dnodeMClient.h"
|
#include "dnodeMClient.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
#include "dnodeRead.h"
|
#include "dnodeRead.h"
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
|
#include "vnode.h"
|
||||||
typedef enum { CLOSE_TSDB, DROP_TSDB } ECloseTsdbFlag;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // global vnode group ID
|
int32_t vgId; // global vnode group ID
|
||||||
|
@ -47,21 +47,16 @@ typedef struct {
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes();
|
static int32_t dnodeOpenVnodes();
|
||||||
static void dnodeCleanupVnodes();
|
static void dnodeCleanupVnodes();
|
||||||
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir);
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
||||||
static void dnodeCleanupVnode(SVnodeObj *pVnode);
|
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
||||||
static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag dropFlag);
|
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
||||||
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *cfg);
|
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
|
||||||
static void dnodeDropVnode(SVnodeObj *pVnode);
|
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
|
||||||
static void dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
|
static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
||||||
static void dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
|
|
||||||
static void dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
|
|
||||||
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
|
|
||||||
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
|
|
||||||
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
|
|
||||||
static void dnodeSendStatusMsg(void *handle, void *tmrId);
|
static void dnodeSendStatusMsg(void *handle, void *tmrId);
|
||||||
static void dnodeReadDnodeId();
|
static void dnodeReadDnodeId();
|
||||||
|
|
||||||
static void *tsDnodeVnodesHash = NULL;
|
void *tsDnodeVnodesHash = NULL;
|
||||||
static void *tsDnodeTmr = NULL;
|
static void *tsDnodeTmr = NULL;
|
||||||
static void *tsStatusTimer = NULL;
|
static void *tsStatusTimer = NULL;
|
||||||
static uint32_t tsRebootTime;
|
static uint32_t tsRebootTime;
|
||||||
|
@ -119,43 +114,21 @@ void dnodeCleanupMgmt() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeMgmt(SRpcMsg *pMsg) {
|
void dnodeMgmt(SRpcMsg *pMsg) {
|
||||||
terrno = 0;
|
SRpcMsg rsp;
|
||||||
|
|
||||||
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
if (dnodeProcessMgmtMsgFp[pMsg->msgType]) {
|
||||||
(*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
rsp.code = (*dnodeProcessMgmtMsgFp[pMsg->msgType])(pMsg);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rsp;
|
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
rsp.handle = pMsg->handle;
|
|
||||||
rsp.code = TSDB_CODE_MSG_NOT_PROCESSED;
|
|
||||||
rsp.pCont = NULL;
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rsp.handle = pMsg->handle;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rpcSendResponse(&rsp);
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *dnodeGetVnode(int32_t vgId) {
|
|
||||||
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
terrno = TSDB_CODE_INVALID_VGROUP_ID;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pVnode->status != TSDB_VN_STATUS_MASTER && pVnode->status == TSDB_VN_STATUS_SLAVE) {
|
|
||||||
terrno = TSDB_CODE_INVALID_VNODE_STATUS;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
|
|
||||||
|
|
||||||
return pVnode;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t dnodeGetVnodeStatus(void *pVnode) {
|
|
||||||
return ((SVnodeObj *)pVnode)->status;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *dnodeGetVnodeWworker(void *pVnode) {
|
void *dnodeGetVnodeWworker(void *pVnode) {
|
||||||
return ((SVnodeObj *)pVnode)->wworker;
|
return ((SVnodeObj *)pVnode)->wworker;
|
||||||
}
|
}
|
||||||
|
@ -164,37 +137,6 @@ void *dnodeGetVnodeRworker(void *pVnode) {
|
||||||
return ((SVnodeObj *)pVnode)->rworker;
|
return ((SVnodeObj *)pVnode)->rworker;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *dnodeGetVnodeWal(void *pVnode) {
|
|
||||||
return ((SVnodeObj *)pVnode)->wal;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *dnodeGetVnodeTsdb(void *pVnode) {
|
|
||||||
return ((SVnodeObj *)pVnode)->tsdb;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeReleaseVnode(void *pVnodeRaw) {
|
|
||||||
SVnodeObj *pVnode = pVnodeRaw;
|
|
||||||
|
|
||||||
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
|
||||||
if (pVnode->status == TSDB_VN_STATUS_DELETING) {
|
|
||||||
if (refCount <= 0) {
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, drop vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
|
|
||||||
dnodeDoCleanupVnode(pVnode, DROP_TSDB);
|
|
||||||
} else {
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, vnode will be dropped until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
|
|
||||||
}
|
|
||||||
} else if (pVnode->status == TSDB_VN_STATUS_CLOSING) {
|
|
||||||
if (refCount <= 0) {
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, cleanup vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
|
|
||||||
dnodeDoCleanupVnode(pVnode, CLOSE_TSDB);
|
|
||||||
} else {
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, vnode will cleanup until refCount:%d is 0", pVnode, pVnode->vgId, refCount);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes() {
|
static int32_t dnodeOpenVnodes() {
|
||||||
DIR *dir = opendir(tsVnodeDir);
|
DIR *dir = opendir(tsVnodeDir);
|
||||||
if (dir == NULL) {
|
if (dir == NULL) {
|
||||||
|
@ -212,7 +154,7 @@ static int32_t dnodeOpenVnodes() {
|
||||||
|
|
||||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
|
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/%s", tsVnodeDir, de->d_name);
|
||||||
int32_t code = dnodeOpenVnode(vnode, vnodeDir);
|
int32_t code = vnodeOpen(vnode, vnodeDir);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
numOfVnodes++;
|
numOfVnodes++;
|
||||||
}
|
}
|
||||||
|
@ -227,208 +169,39 @@ static int32_t dnodeOpenVnodes() {
|
||||||
typedef void (*CleanupFp)(char *);
|
typedef void (*CleanupFp)(char *);
|
||||||
static void dnodeCleanupVnodes() {
|
static void dnodeCleanupVnodes() {
|
||||||
int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
|
int32_t num = taosGetIntHashSize(tsDnodeVnodesHash);
|
||||||
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)dnodeCleanupVnode);
|
taosCleanUpIntHashWithFp(tsDnodeVnodesHash, (CleanupFp)vnodeClose);
|
||||||
dPrint("dnode mgmt is closed, vnodes:%d", num);
|
dPrint("dnode mgmt is closed, vnodes:%d", num);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SVnodeObj vnodeObj = {0};
|
|
||||||
vnodeObj.vgId = vnode;
|
|
||||||
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
|
|
||||||
vnodeObj.refCount = 1;
|
|
||||||
vnodeObj.version = 0;
|
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
|
|
||||||
|
|
||||||
char tsdbDir[TSDB_FILENAME_LEN];
|
|
||||||
sprintf(tsdbDir, "%s/tsdb", rootDir);
|
|
||||||
void *pTsdb = tsdbOpenRepo(tsdbDir);
|
|
||||||
if (pTsdb == NULL) {
|
|
||||||
dError("pVnode:%p, vgroup:%d, failed to open tsdb in %s, reason:%s", pVnode, pVnode->vgId, tsdbDir, tstrerror(terrno));
|
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
pVnode->wal = NULL;
|
|
||||||
pVnode->tsdb = pTsdb;
|
|
||||||
pVnode->replica = NULL;
|
|
||||||
pVnode->events = NULL;
|
|
||||||
pVnode->cq = NULL;
|
|
||||||
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
|
|
||||||
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
|
|
||||||
|
|
||||||
//TODO: jude status while replca is not null
|
|
||||||
if (pVnode->replica == NULL) {
|
|
||||||
pVnode->status = TSDB_VN_STATUS_MASTER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag closeFlag) {
|
|
||||||
dTrace("pVnode:%p, vgroup:%d, cleanup vnode", pVnode, pVnode->vgId);
|
|
||||||
|
|
||||||
// remove replica
|
|
||||||
|
|
||||||
// remove read queue
|
|
||||||
dnodeFreeReadWorker(pVnode->rworker);
|
|
||||||
pVnode->rworker = NULL;
|
|
||||||
|
|
||||||
// remove write queue
|
|
||||||
dnodeFreeWriteWorker(pVnode->wworker);
|
|
||||||
pVnode->wworker = NULL;
|
|
||||||
|
|
||||||
// remove wal
|
|
||||||
|
|
||||||
// remove tsdb
|
|
||||||
if (pVnode->tsdb) {
|
|
||||||
if (closeFlag == DROP_TSDB) {
|
|
||||||
tsdbDropRepo(pVnode->tsdb);
|
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
|
||||||
} else if (closeFlag == CLOSE_TSDB) {
|
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
|
||||||
}
|
|
||||||
pVnode->tsdb = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeCleanupVnode(SVnodeObj *pVnode) {
|
|
||||||
pVnode->status = TSDB_VN_STATUS_CLOSING;
|
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
|
|
||||||
STsdbCfg tsdbCfg = {0};
|
|
||||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
|
||||||
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
|
|
||||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
|
|
||||||
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
|
||||||
tsdbCfg.minRowsPerFileBlock = -1;
|
|
||||||
tsdbCfg.maxRowsPerFileBlock = -1;
|
|
||||||
tsdbCfg.keep = -1;
|
|
||||||
tsdbCfg.maxCacheSize = -1;
|
|
||||||
|
|
||||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
|
||||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
|
||||||
if (mkdir(rootDir, 0755) != 0) {
|
|
||||||
if (errno == EACCES) {
|
|
||||||
return TSDB_CODE_NO_DISK_PERMISSIONS;
|
|
||||||
} else if (errno == ENOSPC) {
|
|
||||||
return TSDB_CODE_SERV_NO_DISKSPACE;
|
|
||||||
} else if (errno == EEXIST) {
|
|
||||||
} else {
|
|
||||||
return TSDB_CODE_VG_INIT_FAILED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sprintf(rootDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
|
||||||
if (mkdir(rootDir, 0755) != 0) {
|
|
||||||
if (errno == EACCES) {
|
|
||||||
return TSDB_CODE_NO_DISK_PERMISSIONS;
|
|
||||||
} else if (errno == ENOSPC) {
|
|
||||||
return TSDB_CODE_SERV_NO_DISKSPACE;
|
|
||||||
} else if (errno == EEXIST) {
|
|
||||||
} else {
|
|
||||||
return TSDB_CODE_VG_INIT_FAILED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
|
|
||||||
if (pTsdb == NULL) {
|
|
||||||
dError("vgroup:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
SVnodeObj vnodeObj = {0};
|
|
||||||
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
|
|
||||||
vnodeObj.status = TSDB_VN_STATUS_CREATING;
|
|
||||||
vnodeObj.refCount = 1;
|
|
||||||
vnodeObj.version = 0;
|
|
||||||
vnodeObj.wal = NULL;
|
|
||||||
vnodeObj.tsdb = pTsdb;
|
|
||||||
vnodeObj.replica = NULL;
|
|
||||||
vnodeObj.events = NULL;
|
|
||||||
vnodeObj.cq = NULL;
|
|
||||||
|
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
|
|
||||||
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
|
|
||||||
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
|
|
||||||
if (pVnode->replica == NULL) {
|
|
||||||
pVnode->status = TSDB_VN_STATUS_MASTER;
|
|
||||||
}
|
|
||||||
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, vnode:%d is created", pVnode, pVnode->vgId, pVnode->vgId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeDropVnode(SVnodeObj *pVnode) {
|
|
||||||
pVnode->status = TSDB_VN_STATUS_DELETING;
|
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
|
||||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||||
|
|
||||||
dTrace("vgroup:%d, start to create vnode in dnode", pCreate->cfg.vgId);
|
return vnodeCreate(pCreate);
|
||||||
|
|
||||||
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
|
|
||||||
if (pVnodeObj != NULL) {
|
|
||||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, vnode is already exist", pVnodeObj, pCreate->cfg.vgId);
|
|
||||||
} else {
|
|
||||||
rpcRsp.code = dnodeCreateVnode(pCreate);
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
|
SMDDropVnodeMsg *pDrop = rpcMsg->pCont;
|
||||||
pDrop->vgId = htonl(pDrop->vgId);
|
pDrop->vgId = htonl(pDrop->vgId);
|
||||||
|
|
||||||
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pDrop->vgId);
|
return vnodeDrop(pDrop->vgId);
|
||||||
if (pVnodeObj != NULL) {
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, start to drop vnode in dnode", pVnodeObj, pDrop->vgId);
|
|
||||||
dnodeDropVnode(pVnodeObj);
|
|
||||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
|
||||||
} else {
|
|
||||||
dTrace("vgroup:%d, failed drop vnode in dnode, vgroup not exist", pDrop->vgId);
|
|
||||||
rpcRsp.code = TSDB_CODE_INVALID_VGROUP_ID;
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SRpcMsg rpcRsp = {.handle = rpcMsg->handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
|
||||||
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
|
||||||
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
|
||||||
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
|
||||||
|
|
||||||
dTrace("vgroup:%d, start to alter vnode in dnode", pCreate->cfg.vgId);
|
return 0;
|
||||||
|
|
||||||
SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
|
|
||||||
if (pVnodeObj != NULL) {
|
|
||||||
dPrint("pVnode:%p, vgroup:%d, start to alter vnode in dnode", pVnodeObj, pCreate->cfg.vgId);
|
|
||||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
|
||||||
} else {
|
|
||||||
dTrace("vgroup:%d, alter vnode msg received, start to create vnode", pCreate->cfg.vgId);
|
|
||||||
rpcRsp.code = dnodeCreateVnode(pCreate);;
|
|
||||||
}
|
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
|
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
|
||||||
// SMDAlterStreamMsg *pStream = pCont;
|
// SMDAlterStreamMsg *pStream = pCont;
|
||||||
// pStream->uid = htobe64(pStream->uid);
|
// pStream->uid = htobe64(pStream->uid);
|
||||||
// pStream->stime = htobe64(pStream->stime);
|
// pStream->stime = htobe64(pStream->stime);
|
||||||
|
@ -437,14 +210,13 @@ static void dnodeProcessAlterStreamMsg(SRpcMsg *pMsg) {
|
||||||
// pStream->status = htonl(pStream->status);
|
// pStream->status = htonl(pStream->status);
|
||||||
//
|
//
|
||||||
// int32_t code = dnodeCreateStream(pStream);
|
// int32_t code = dnodeCreateStream(pStream);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
||||||
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
|
SMDCfgDnodeMsg *pCfg = (SMDCfgDnodeMsg *)pMsg->pCont;
|
||||||
int32_t code = tsCfgDynamicOptions(pCfg->config);
|
return tsCfgDynamicOptions(pCfg->config);
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0, .code = code, .msgType = 0};
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeBuildVloadMsg(char *pNode, void * param) {
|
static void dnodeBuildVloadMsg(char *pNode, void * param) {
|
||||||
|
|
|
@ -22,9 +22,11 @@
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
|
#include "twal.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
#include "dnodeRead.h"
|
#include "dnodeRead.h"
|
||||||
#include "queryExecutor.h"
|
#include "queryExecutor.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
@ -76,6 +78,7 @@ void dnodeRead(SRpcMsg *pMsg) {
|
||||||
int32_t leftLen = pMsg->contLen;
|
int32_t leftLen = pMsg->contLen;
|
||||||
char *pCont = (char *) pMsg->pCont;
|
char *pCont = (char *) pMsg->pCont;
|
||||||
SRpcContext *pRpcContext = NULL;
|
SRpcContext *pRpcContext = NULL;
|
||||||
|
void *pVnode;
|
||||||
|
|
||||||
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
|
dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle);
|
||||||
|
|
||||||
|
@ -88,7 +91,7 @@ void dnodeRead(SRpcMsg *pMsg) {
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
|
|
||||||
void *pVnode = dnodeGetVnode(pHead->vgId);
|
pVnode = vnodeGetVnode(pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
leftLen -= pHead->contLen;
|
leftLen -= pHead->contLen;
|
||||||
pCont -= pHead->contLen;
|
pCont -= pHead->contLen;
|
||||||
|
@ -96,13 +99,13 @@ void dnodeRead(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// put message into queue
|
// put message into queue
|
||||||
|
taos_queue queue = vnodeGetRqueue(pVnode);
|
||||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||||
pRead->rpcMsg = *pMsg;
|
pRead->rpcMsg = *pMsg;
|
||||||
pRead->pCont = pCont;
|
pRead->pCont = pCont;
|
||||||
pRead->contLen = pHead->contLen;
|
pRead->contLen = pHead->contLen;
|
||||||
pRead->pRpcContext = pRpcContext;
|
pRead->pRpcContext = pRpcContext;
|
||||||
|
|
||||||
taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||||
|
|
||||||
// next vnode
|
// next vnode
|
||||||
|
@ -123,7 +126,7 @@ void dnodeRead(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *dnodeAllocateReadWorker(void *pVnode) {
|
void *dnodeAllocateRqueue(void *pVnode) {
|
||||||
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
|
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
|
||||||
if (queue == NULL) return NULL;
|
if (queue == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -144,7 +147,7 @@ void *dnodeAllocateReadWorker(void *pVnode) {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeFreeReadWorker(void *rqueue) {
|
void dnodeFreeRqueue(void *rqueue) {
|
||||||
taosCloseQueue(rqueue);
|
taosCloseQueue(rqueue);
|
||||||
|
|
||||||
// dynamically adjust the number of threads
|
// dynamically adjust the number of threads
|
||||||
|
@ -229,7 +232,7 @@ static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMs
|
||||||
pRead->pRpcContext = pMsg->pRpcContext;
|
pRead->pRpcContext = pMsg->pRpcContext;
|
||||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||||
|
|
||||||
taos_queue queue = dnodeGetVnodeRworker(pVnode);
|
taos_queue queue = vnodeGetRqueue(pVnode);
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -238,7 +241,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
|
||||||
|
|
||||||
SQInfo* pQInfo = NULL;
|
SQInfo* pQInfo = NULL;
|
||||||
if (pMsg->contLen != 0) {
|
if (pMsg->contLen != 0) {
|
||||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
void* tsdb = vnodeGetTsdb(pVnode);
|
||||||
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
|
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, &pQInfo);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
|
@ -255,7 +258,7 @@ static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
|
dTrace("dnode query msg disposed, thandle:%p", pMsg->rpcMsg.handle);
|
||||||
dnodeReleaseVnode(pVnode);
|
vnodeRelease(pVnode);
|
||||||
} else {
|
} else {
|
||||||
pQInfo = pMsg->pCont;
|
pQInfo = pMsg->pCont;
|
||||||
}
|
}
|
||||||
|
@ -286,7 +289,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
||||||
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
|
||||||
} else { // no further execution invoked, release the ref to vnode
|
} else { // no further execution invoked, release the ref to vnode
|
||||||
dnodeProcessReadResult(pVnode, pMsg);
|
dnodeProcessReadResult(pVnode, pMsg);
|
||||||
dnodeReleaseVnode(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -300,5 +303,5 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
|
dTrace("dnode retrieve msg disposed, thandle:%p", pMsg->rpcMsg.handle);
|
||||||
dnodeReleaseVnode(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,22 +21,11 @@
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "twal.h"
|
||||||
#include "dataformat.h"
|
#include "dataformat.h"
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
|
#include "vnode.h"
|
||||||
typedef struct {
|
|
||||||
int32_t code;
|
|
||||||
int32_t count; // number of vnodes returned result
|
|
||||||
int32_t numOfVnodes; // number of vnodes involved
|
|
||||||
} SRpcContext;
|
|
||||||
|
|
||||||
typedef struct _write {
|
|
||||||
void *pCont;
|
|
||||||
int32_t contLen;
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
SRpcContext *pRpcContext; // RPC message context
|
|
||||||
} SWriteMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
taos_qset qset; // queue set
|
taos_qset qset; // queue set
|
||||||
|
@ -44,30 +33,27 @@ typedef struct {
|
||||||
int32_t workerId; // worker ID
|
int32_t workerId; // worker ID
|
||||||
} SWriteWorker;
|
} SWriteWorker;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SRspRet rspRet;
|
||||||
|
void *pCont;
|
||||||
|
int32_t contLen;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
} SWriteMsg;
|
||||||
|
|
||||||
typedef struct _thread_obj {
|
typedef struct _thread_obj {
|
||||||
int32_t max; // max number of workers
|
int32_t max; // max number of workers
|
||||||
int32_t nextId; // from 0 to max-1, cyclic
|
int32_t nextId; // from 0 to max-1, cyclic
|
||||||
SWriteWorker *writeWorker;
|
SWriteWorker *writeWorker;
|
||||||
} SWriteWorkerPool;
|
} SWriteWorkerPool;
|
||||||
|
|
||||||
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
|
|
||||||
static void *dnodeProcessWriteQueue(void *param);
|
static void *dnodeProcessWriteQueue(void *param);
|
||||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
|
||||||
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
|
|
||||||
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
|
|
||||||
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
|
|
||||||
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
|
|
||||||
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg);
|
|
||||||
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg);
|
|
||||||
|
|
||||||
SWriteWorkerPool wWorkerPool;
|
SWriteWorkerPool wWorkerPool;
|
||||||
|
|
||||||
int32_t dnodeInitWrite() {
|
int32_t dnodeInitWrite() {
|
||||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeProcessSubmitMsg;
|
|
||||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = dnodeProcessCreateTableMsg;
|
vnodeInitWrite();
|
||||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = dnodeProcessDropTableMsg;
|
|
||||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = dnodeProcessAlterTableMsg;
|
|
||||||
dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = dnodeProcessDropStableMsg;
|
|
||||||
|
|
||||||
wWorkerPool.max = tsNumOfCores;
|
wWorkerPool.max = tsNumOfCores;
|
||||||
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
wWorkerPool.writeWorker = (SWriteWorker *)calloc(sizeof(SWriteWorker), wWorkerPool.max);
|
||||||
|
@ -87,51 +73,28 @@ void dnodeCleanupWrite() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeWrite(SRpcMsg *pMsg) {
|
void dnodeWrite(SRpcMsg *pMsg) {
|
||||||
int32_t queuedMsgNum = 0;
|
|
||||||
int32_t leftLen = pMsg->contLen;
|
|
||||||
char *pCont = (char *) pMsg->pCont;
|
char *pCont = (char *) pMsg->pCont;
|
||||||
SRpcContext *pRpcContext = NULL;
|
|
||||||
|
|
||||||
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
if (pMsg->msgType == TSDB_MSG_TYPE_SUBMIT || pMsg->msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
|
||||||
SMsgDesc *pDesc = (SMsgDesc *)pCont;
|
SMsgDesc *pDesc = (SMsgDesc *)pCont;
|
||||||
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
|
pDesc->numOfVnodes = htonl(pDesc->numOfVnodes);
|
||||||
pCont += sizeof(SMsgDesc);
|
pCont += sizeof(SMsgDesc);
|
||||||
leftLen -= sizeof(SMsgDesc);
|
|
||||||
if (pDesc->numOfVnodes > 1) {
|
|
||||||
pRpcContext = calloc(sizeof(SRpcContext), 1);
|
|
||||||
pRpcContext->numOfVnodes = pDesc->numOfVnodes;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
while (leftLen > 0) {
|
SMsgHead *pHead = (SMsgHead *) pCont;
|
||||||
SMsgHead *pHead = (SMsgHead *) pCont;
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
|
||||||
|
|
||||||
void *pVnode = dnodeGetVnode(pHead->vgId);
|
|
||||||
if (pVnode == NULL) {
|
|
||||||
leftLen -= pHead->contLen;
|
|
||||||
pCont -= pHead->contLen;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
taos_queue queue = vnodeGetWqueue(pHead->vgId);
|
||||||
|
if (queue) {
|
||||||
// put message into queue
|
// put message into queue
|
||||||
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
||||||
pWrite->rpcMsg = *pMsg;
|
pWrite->rpcMsg = *pMsg;
|
||||||
pWrite->pCont = pCont;
|
pWrite->pCont = pCont;
|
||||||
pWrite->contLen = pHead->contLen;
|
pWrite->contLen = pHead->contLen;
|
||||||
pWrite->pRpcContext = pRpcContext;
|
|
||||||
|
|
||||||
taos_queue queue = dnodeGetVnodeWworker(pVnode);
|
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
|
taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
|
||||||
|
} else {
|
||||||
// next vnode
|
|
||||||
leftLen -= pHead->contLen;
|
|
||||||
pCont -= pHead->contLen;
|
|
||||||
queuedMsgNum++;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queuedMsgNum == 0) {
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pMsg->handle,
|
.handle = pMsg->handle,
|
||||||
.pCont = NULL,
|
.pCont = NULL,
|
||||||
|
@ -143,7 +106,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *dnodeAllocateWriteWorker(void *pVnode) {
|
void *dnodeAllocateWqueue(void *pVnode) {
|
||||||
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
|
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
|
||||||
taos_queue *queue = taosOpenQueue();
|
taos_queue *queue = taosOpenQueue();
|
||||||
if (queue == NULL) return NULL;
|
if (queue == NULL) return NULL;
|
||||||
|
@ -168,22 +131,44 @@ void *dnodeAllocateWriteWorker(void *pVnode) {
|
||||||
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dTrace("queue:%p is allocated for pVnode:%p", queue, pVnode);
|
||||||
|
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodeFreeWriteWorker(void *wqueue) {
|
void dnodeFreeWqueue(void *wqueue) {
|
||||||
taosCloseQueue(wqueue);
|
taosCloseQueue(wqueue);
|
||||||
|
|
||||||
// dynamically adjust the number of threads
|
// dynamically adjust the number of threads
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) {
|
||||||
|
SWriteMsg *pWrite = (SWriteMsg *)param;
|
||||||
|
|
||||||
|
if (code > 0) return;
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {
|
||||||
|
.handle = pWrite->rpcMsg.handle,
|
||||||
|
.pCont = pWrite->rspRet.rsp,
|
||||||
|
.contLen = pWrite->rspRet.len,
|
||||||
|
.code = code,
|
||||||
|
};
|
||||||
|
|
||||||
|
rpcSendResponse(&rpcRsp);
|
||||||
|
rpcFreeCont(pWrite->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pWrite);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
}
|
||||||
|
|
||||||
static void *dnodeProcessWriteQueue(void *param) {
|
static void *dnodeProcessWriteQueue(void *param) {
|
||||||
SWriteWorker *pWorker = (SWriteWorker *)param;
|
SWriteWorker *pWorker = (SWriteWorker *)param;
|
||||||
taos_qall qall;
|
taos_qall qall;
|
||||||
SWriteMsg *pWriteMsg;
|
SWriteMsg *pWrite;
|
||||||
|
SWalHead *pHead;
|
||||||
int32_t numOfMsgs;
|
int32_t numOfMsgs;
|
||||||
int type;
|
int type;
|
||||||
void *pVnode;
|
void *pVnode, *item;
|
||||||
|
|
||||||
qall = taosAllocateQall();
|
qall = taosAllocateQall();
|
||||||
|
|
||||||
|
@ -195,29 +180,35 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
// retrieve all items, and write them into WAL
|
pWrite = NULL;
|
||||||
taosGetQitem(qall, &type, (void **)&pWriteMsg);
|
taosGetQitem(qall, &type, &item);
|
||||||
|
if (type == TAOS_QTYPE_RPC) {
|
||||||
|
pWrite = (SWriteMsg *)item;
|
||||||
|
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
|
||||||
|
pHead->msgType = pWrite->rpcMsg.msgType;
|
||||||
|
pHead->version = 0;
|
||||||
|
pHead->len = pWrite->contLen;
|
||||||
|
} else {
|
||||||
|
pHead = (SWalHead *)item;
|
||||||
|
}
|
||||||
|
|
||||||
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
|
int32_t code = vnodeProcessWrite(pVnode, type, pHead, item);
|
||||||
|
if (pWrite) pWrite->rpcMsg.code = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// flush WAL file
|
walFsync(vnodeGetWal(pVnode));
|
||||||
// walFsync(pVnode->whandle);
|
|
||||||
|
|
||||||
// browse all items, and process them one by one
|
// browse all items, and process them one by one
|
||||||
taosResetQitems(qall);
|
taosResetQitems(qall);
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(qall, &type, (void **)&pWriteMsg);
|
taosGetQitem(qall, &type, &item);
|
||||||
|
if (type == TAOS_QTYPE_RPC) {
|
||||||
terrno = 0;
|
pWrite = (SWriteMsg *)item;
|
||||||
if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
|
dnodeSendRpcWriteRsp(pVnode, item, pWrite->rpcMsg.code);
|
||||||
(*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
|
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
taosFreeQitem(item);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
dnodeProcessWriteResult(pVnode, pWriteMsg);
|
|
||||||
taosFreeQitem(pWriteMsg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,36 +217,6 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
|
|
||||||
SRpcContext *pRpcContext = pWrite->pRpcContext;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
|
|
||||||
if (pRpcContext) {
|
|
||||||
if (terrno) {
|
|
||||||
if (pRpcContext->code == 0) pRpcContext->code = terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t count = atomic_add_fetch_32(&pRpcContext->count, 1);
|
|
||||||
if (count < pRpcContext->numOfVnodes) {
|
|
||||||
// not over yet, multiple vnodes
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// over, result can be merged now
|
|
||||||
code = pRpcContext->code;
|
|
||||||
} else {
|
|
||||||
code = terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
SRpcMsg rsp;
|
|
||||||
rsp.handle = pWrite->rpcMsg.handle;
|
|
||||||
rsp.code = code;
|
|
||||||
rsp.pCont = NULL;
|
|
||||||
rpcSendResponse(&rsp);
|
|
||||||
rpcFreeCont(pWrite->rpcMsg.pCont); // free the received message
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
int32_t num = taosGetQueueNumber(pWorker->qset);
|
int32_t num = taosGetQueueNumber(pWorker->qset);
|
||||||
|
@ -270,174 +231,3 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
|
|
||||||
dTrace("pVnode:%p, submit msg is disposed", pVnode);
|
|
||||||
|
|
||||||
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
|
|
||||||
pRsp->code = 0;
|
|
||||||
pRsp->numOfRows = htonl(1);
|
|
||||||
pRsp->affectedRows = htonl(1);
|
|
||||||
pRsp->numOfFailedBlocks = 0;
|
|
||||||
|
|
||||||
void* tsdb = dnodeGetVnodeTsdb(pVnode);
|
|
||||||
assert(tsdb != NULL);
|
|
||||||
|
|
||||||
tsdbInsertData(tsdb, pMsg->pCont);
|
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
|
||||||
.handle = pMsg->rpcMsg.handle,
|
|
||||||
.pCont = pRsp,
|
|
||||||
.contLen = sizeof(SShellSubmitRspMsg),
|
|
||||||
.code = 0,
|
|
||||||
.msgType = 0
|
|
||||||
};
|
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
|
||||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, start to create in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
|
||||||
pTable->sid = htonl(pTable->sid);
|
|
||||||
pTable->sversion = htonl(pTable->sversion);
|
|
||||||
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
|
||||||
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
|
||||||
pTable->superTableUid = htobe64(pTable->superTableUid);
|
|
||||||
pTable->createdTime = htobe64(pTable->createdTime);
|
|
||||||
SSchema *pSchema = (SSchema *) pTable->data;
|
|
||||||
|
|
||||||
int totalCols = pTable->numOfColumns + pTable->numOfTags;
|
|
||||||
for (int i = 0; i < totalCols; i++) {
|
|
||||||
pSchema[i].colId = htons(pSchema[i].colId);
|
|
||||||
pSchema[i].bytes = htons(pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCfg tCfg;
|
|
||||||
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
|
|
||||||
|
|
||||||
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
|
|
||||||
for (int i = 0; i < pTable->numOfColumns; i++) {
|
|
||||||
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
|
||||||
|
|
||||||
if (pTable->numOfTags != 0) {
|
|
||||||
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
|
|
||||||
for (int i = pTable->numOfColumns; i < totalCols; i++) {
|
|
||||||
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
|
||||||
|
|
||||||
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
|
||||||
int accumBytes = 0;
|
|
||||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
|
||||||
|
|
||||||
for (int i = 0; i < pTable->numOfTags; i++) {
|
|
||||||
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
|
||||||
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
|
|
||||||
}
|
|
||||||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
|
||||||
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, create table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
|
||||||
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, start to drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
|
||||||
STableId tableId = {
|
|
||||||
.uid = htobe64(pTable->uid),
|
|
||||||
.tid = htonl(pTable->sid)
|
|
||||||
};
|
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
|
||||||
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, drop table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
|
||||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, start to alter in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
|
||||||
pTable->sid = htonl(pTable->sid);
|
|
||||||
pTable->sversion = htonl(pTable->sversion);
|
|
||||||
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
|
||||||
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
|
||||||
pTable->superTableUid = htobe64(pTable->superTableUid);
|
|
||||||
pTable->createdTime = htobe64(pTable->createdTime);
|
|
||||||
SSchema *pSchema = (SSchema *) pTable->data;
|
|
||||||
|
|
||||||
int totalCols = pTable->numOfColumns + pTable->numOfTags;
|
|
||||||
for (int i = 0; i < totalCols; i++) {
|
|
||||||
pSchema[i].colId = htons(pSchema[i].colId);
|
|
||||||
pSchema[i].bytes = htons(pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCfg tCfg;
|
|
||||||
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
|
|
||||||
|
|
||||||
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
|
|
||||||
for (int i = 0; i < pTable->numOfColumns; i++) {
|
|
||||||
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
|
||||||
|
|
||||||
if (pTable->numOfTags != 0) {
|
|
||||||
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
|
|
||||||
for (int i = pTable->numOfColumns; i < totalCols; i++) {
|
|
||||||
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
|
||||||
}
|
|
||||||
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
|
|
||||||
|
|
||||||
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
|
||||||
int accumBytes = 0;
|
|
||||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
|
||||||
|
|
||||||
for (int i = 0; i < pTable->numOfTags; i++) {
|
|
||||||
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
|
||||||
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
|
|
||||||
}
|
|
||||||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
|
||||||
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, table:%s, alter table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
|
|
||||||
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, stable:%s, start to it drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
|
||||||
pTable->uid = htobe64(pTable->uid);
|
|
||||||
|
|
||||||
// TODO: drop stable in vvnode
|
|
||||||
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
|
||||||
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
|
||||||
|
|
||||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
dTrace("pVnode:%p, stable:%s, drop stable result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
|
||||||
rpcSendResponse(&rpcRsp);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,207 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "ihash.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tstatus.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
#include "twal.h"
|
||||||
|
#include "dnodeMClient.h"
|
||||||
|
#include "dnodeMgmt.h"
|
||||||
|
#include "dnodeRead.h"
|
||||||
|
#include "dnodeWrite.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
|
extern void *tsDnodeVnodesHash;
|
||||||
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
|
|
||||||
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
|
||||||
|
|
||||||
|
if (pTemp != NULL) {
|
||||||
|
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
STsdbCfg tsdbCfg = {0};
|
||||||
|
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||||
|
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
|
||||||
|
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
|
||||||
|
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||||
|
tsdbCfg.minRowsPerFileBlock = -1;
|
||||||
|
tsdbCfg.maxRowsPerFileBlock = -1;
|
||||||
|
tsdbCfg.keep = -1;
|
||||||
|
tsdbCfg.maxCacheSize = -1;
|
||||||
|
|
||||||
|
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
|
if (mkdir(rootDir, 0755) != 0) {
|
||||||
|
if (errno == EACCES) {
|
||||||
|
return TSDB_CODE_NO_DISK_PERMISSIONS;
|
||||||
|
} else if (errno == ENOSPC) {
|
||||||
|
return TSDB_CODE_SERV_NO_DISKSPACE;
|
||||||
|
} else if (errno == EEXIST) {
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_VG_INIT_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||||
|
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
|
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
|
||||||
|
if (code <0) {
|
||||||
|
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
dPrint("vgId:%d, vnode is created", pVnodeCfg->cfg.vgId);
|
||||||
|
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeDrop(int32_t vgId) {
|
||||||
|
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
dTrace("vgId:%d, failed to drop, vgId not exist", vgId);
|
||||||
|
return TSDB_CODE_INVALID_VGROUP_ID;
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
|
||||||
|
pVnode->status = VN_STATUS_DELETING;
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
|
char temp[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
SVnodeObj vnodeObj = {0};
|
||||||
|
vnodeObj.vgId = vnode;
|
||||||
|
vnodeObj.status = VN_STATUS_INIT;
|
||||||
|
vnodeObj.refCount = 1;
|
||||||
|
vnodeObj.version = 0;
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
|
||||||
|
|
||||||
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
|
void *pTsdb = tsdbOpenRepo(temp);
|
||||||
|
if (pTsdb == NULL) {
|
||||||
|
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
|
||||||
|
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
|
||||||
|
|
||||||
|
sprintf(temp, "%s/wal", rootDir);
|
||||||
|
pVnode->wal = walOpen(temp, 3, tsCommitLog);
|
||||||
|
pVnode->tsdb = pTsdb;
|
||||||
|
pVnode->sync = NULL;
|
||||||
|
pVnode->events = NULL;
|
||||||
|
pVnode->cq = NULL;
|
||||||
|
|
||||||
|
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
|
||||||
|
|
||||||
|
pVnode->status = VN_STATUS_READY;
|
||||||
|
dTrace("pVnode:%p vgId:%d, vnode is opened in %s", pVnode, pVnode->vgId, rootDir);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeClose(void *param) {
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, vnode will be closed", pVnode, pVnode->vgId);
|
||||||
|
pVnode->status = VN_STATUS_CLOSING;
|
||||||
|
vnodeCleanUp(pVnode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeRelease(void *pVnodeRaw) {
|
||||||
|
SVnodeObj *pVnode = pVnodeRaw;
|
||||||
|
|
||||||
|
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
|
||||||
|
|
||||||
|
if (refCount > 0) return;
|
||||||
|
|
||||||
|
// remove read queue
|
||||||
|
dnodeFreeRqueue(pVnode->rqueue);
|
||||||
|
pVnode->rqueue = NULL;
|
||||||
|
|
||||||
|
// remove write queue
|
||||||
|
dnodeFreeWqueue(pVnode->wqueue);
|
||||||
|
pVnode->wqueue = NULL;
|
||||||
|
|
||||||
|
if (pVnode->status == VN_STATUS_DELETING) {
|
||||||
|
// remove the whole directory
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, vnode is released", pVnode, pVnode->vgId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeGetVnode(int32_t vgId) {
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, vgId);
|
||||||
|
if (pVnode == NULL) {
|
||||||
|
terrno = TSDB_CODE_INVALID_VGROUP_ID;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
dTrace("pVnode:%p vgId:%d, get vnode, refCount:%d", pVnode, pVnode->vgId, pVnode->refCount);
|
||||||
|
|
||||||
|
return pVnode;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeGetRqueue(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->rqueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeGetWqueue(int32_t vgId) {
|
||||||
|
SVnodeObj *pVnode = vnodeGetVnode(vgId);
|
||||||
|
if (pVnode == NULL) return NULL;
|
||||||
|
return pVnode->wqueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeGetWal(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->wal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void *vnodeGetTsdb(void *pVnode) {
|
||||||
|
return ((SVnodeObj *)pVnode)->tsdb;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
|
|
||||||
|
//syncStop(pVnode->sync);
|
||||||
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
|
walClose(pVnode->wal);
|
||||||
|
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
}
|
|
@ -0,0 +1,266 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
#include "twal.h"
|
||||||
|
#include "dataformat.h"
|
||||||
|
#include "dnodeWrite.h"
|
||||||
|
#include "dnodeMgmt.h"
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*);
|
||||||
|
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *);
|
||||||
|
|
||||||
|
int32_t vnodeInitWrite() {
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
|
||||||
|
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
|
||||||
|
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
|
||||||
|
return TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
|
||||||
|
if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING)
|
||||||
|
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||||
|
|
||||||
|
if (pHead->version == 0) { // from client
|
||||||
|
if (pVnode->status != VN_STATUS_READY)
|
||||||
|
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||||
|
|
||||||
|
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
|
||||||
|
|
||||||
|
// assign version
|
||||||
|
pVnode->version++;
|
||||||
|
pHead->version = pVnode->version;
|
||||||
|
} else {
|
||||||
|
// for data from WAL or forward, version may be smaller
|
||||||
|
if (pHead->version <= pVnode->version) return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// more status and role checking here
|
||||||
|
|
||||||
|
pVnode->version = pHead->version;
|
||||||
|
|
||||||
|
// write into WAL
|
||||||
|
code = walWrite(pVnode->wal, pHead);
|
||||||
|
if ( code < 0) return code;
|
||||||
|
|
||||||
|
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
||||||
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
/* forward
|
||||||
|
if (pVnode->replica > 1 && pVnode->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
|
code = syncForwardToPeer(pVnode->sync, pHead, item);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// save insert result into item
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId);
|
||||||
|
code = tsdbInsertData(pVnode->tsdb, pCont);
|
||||||
|
|
||||||
|
pRet->len = sizeof(SShellSubmitRspMsg);
|
||||||
|
pRet->rsp = rpcMallocCont(pRet->len);
|
||||||
|
SShellSubmitRspMsg *pRsp = pRet->rsp;
|
||||||
|
|
||||||
|
pRsp->code = 0;
|
||||||
|
pRsp->numOfRows = htonl(1);
|
||||||
|
pRsp->affectedRows = htonl(1);
|
||||||
|
pRsp->numOfFailedBlocks = 0;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
SMDCreateTableMsg *pTable = pCont;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId);
|
||||||
|
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
|
pTable->numOfTags = htons(pTable->numOfTags);
|
||||||
|
pTable->sid = htonl(pTable->sid);
|
||||||
|
pTable->sversion = htonl(pTable->sversion);
|
||||||
|
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
||||||
|
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
||||||
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
pTable->superTableUid = htobe64(pTable->superTableUid);
|
||||||
|
pTable->createdTime = htobe64(pTable->createdTime);
|
||||||
|
SSchema *pSchema = (SSchema *) pTable->data;
|
||||||
|
|
||||||
|
int totalCols = pTable->numOfColumns + pTable->numOfTags;
|
||||||
|
for (int i = 0; i < totalCols; i++) {
|
||||||
|
pSchema[i].colId = htons(pSchema[i].colId);
|
||||||
|
pSchema[i].bytes = htons(pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
STableCfg tCfg;
|
||||||
|
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
|
||||||
|
|
||||||
|
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
|
||||||
|
for (int i = 0; i < pTable->numOfColumns; i++) {
|
||||||
|
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
||||||
|
|
||||||
|
if (pTable->numOfTags != 0) {
|
||||||
|
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
|
||||||
|
for (int i = pTable->numOfColumns; i < totalCols; i++) {
|
||||||
|
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
||||||
|
|
||||||
|
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
||||||
|
int accumBytes = 0;
|
||||||
|
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||||
|
|
||||||
|
for (int i = 0; i < pTable->numOfTags; i++) {
|
||||||
|
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
||||||
|
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
|
||||||
|
}
|
||||||
|
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *pTsdb = vnodeGetTsdb(pVnode);
|
||||||
|
code = tsdbCreateTable(pTsdb, &tCfg);
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, table:%s is created, result:%x", pVnode, pVnode->vgId, pTable->tableId, code);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
SMDDropTableMsg *pTable = pCont;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, table:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
|
||||||
|
STableId tableId = {
|
||||||
|
.uid = htobe64(pTable->uid),
|
||||||
|
.tid = htonl(pTable->sid)
|
||||||
|
};
|
||||||
|
|
||||||
|
code = tsdbDropTable(pVnode->tsdb, tableId);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
SMDCreateTableMsg *pTable = pCont;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, table:%s, start to alter", pVnode, pVnode->vgId, pTable->tableId);
|
||||||
|
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
|
pTable->numOfTags = htons(pTable->numOfTags);
|
||||||
|
pTable->sid = htonl(pTable->sid);
|
||||||
|
pTable->sversion = htonl(pTable->sversion);
|
||||||
|
pTable->tagDataLen = htonl(pTable->tagDataLen);
|
||||||
|
pTable->sqlDataLen = htonl(pTable->sqlDataLen);
|
||||||
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
pTable->superTableUid = htobe64(pTable->superTableUid);
|
||||||
|
pTable->createdTime = htobe64(pTable->createdTime);
|
||||||
|
SSchema *pSchema = (SSchema *) pTable->data;
|
||||||
|
|
||||||
|
int totalCols = pTable->numOfColumns + pTable->numOfTags;
|
||||||
|
for (int i = 0; i < totalCols; i++) {
|
||||||
|
pSchema[i].colId = htons(pSchema[i].colId);
|
||||||
|
pSchema[i].bytes = htons(pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
STableCfg tCfg;
|
||||||
|
tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid);
|
||||||
|
|
||||||
|
STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns);
|
||||||
|
for (int i = 0; i < pTable->numOfColumns; i++) {
|
||||||
|
tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
||||||
|
|
||||||
|
if (pTable->numOfTags != 0) {
|
||||||
|
STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags);
|
||||||
|
for (int i = pTable->numOfColumns; i < totalCols; i++) {
|
||||||
|
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
tsdbTableSetSchema(&tCfg, pDestTagSchema, false);
|
||||||
|
|
||||||
|
char *pTagData = pTable->data + totalCols * sizeof(SSchema);
|
||||||
|
int accumBytes = 0;
|
||||||
|
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||||
|
|
||||||
|
for (int i = 0; i < pTable->numOfTags; i++) {
|
||||||
|
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
||||||
|
accumBytes += pSchema[i + pTable->numOfColumns].bytes;
|
||||||
|
}
|
||||||
|
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbAlterTable(pVnode->tsdb, &tCfg);
|
||||||
|
dTrace("pVnode:%p vgId:%d, table:%s, alter table result:%d", pVnode, pVnode->vgId, pTable->tableId, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
SMDDropSTableMsg *pTable = pCont;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId);
|
||||||
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
|
// TODO: drop stable in vvnode
|
||||||
|
//void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode);
|
||||||
|
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
||||||
|
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
dTrace("pVnode:%p vgId:%d, stable:%s, drop stable result:%x", pVnode, pTable->tableId, code);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) {
|
||||||
|
SVnodeObj *pVnode = param;
|
||||||
|
|
||||||
|
int size = sizeof(SWalHead) + pHead->len;
|
||||||
|
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
|
||||||
|
memcpy(pWal, pHead, size);
|
||||||
|
|
||||||
|
taosWriteQitem(pVnode->wqueue, type, pHead);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -40,7 +40,7 @@ void walClose(twal_h);
|
||||||
int walRenew(twal_h);
|
int walRenew(twal_h);
|
||||||
int walWrite(twal_h, SWalHead *);
|
int walWrite(twal_h, SWalHead *);
|
||||||
void walFsync(twal_h);
|
void walFsync(twal_h);
|
||||||
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, void *pWalHead));
|
int walRestore(twal_h, void *pVnode, int (*writeFp)(void *ahandle, SWalHead *pHead, int type));
|
||||||
int walGetWalFile(twal_h, char *name, uint32_t *index);
|
int walGetWalFile(twal_h, char *name, uint32_t *index);
|
||||||
|
|
||||||
extern int wDebugFlag;
|
extern int wDebugFlag;
|
|
@ -53,7 +53,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg);
|
||||||
// --------- TSDB REPOSITORY DEFINITION
|
// --------- TSDB REPOSITORY DEFINITION
|
||||||
typedef void tsdb_repo_t; // use void to hide implementation details from outside
|
typedef void tsdb_repo_t; // use void to hide implementation details from outside
|
||||||
|
|
||||||
tsdb_repo_t * tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
|
int tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter);
|
||||||
int32_t tsdbDropRepo(tsdb_repo_t *repo);
|
int32_t tsdbDropRepo(tsdb_repo_t *repo);
|
||||||
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
|
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
|
||||||
int32_t tsdbCloseRepo(tsdb_repo_t *repo);
|
int32_t tsdbCloseRepo(tsdb_repo_t *repo);
|
||||||
|
|
|
@ -108,67 +108,40 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
|
||||||
*
|
*
|
||||||
* @return a TSDB repository handle on success, NULL for failure
|
* @return a TSDB repository handle on success, NULL for failure
|
||||||
*/
|
*/
|
||||||
tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {
|
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {
|
||||||
if (rootDir == NULL) return NULL;
|
|
||||||
|
|
||||||
if (access(rootDir, F_OK | R_OK | W_OK) == -1) return NULL;
|
if (mkdir(rootDir, 0755) != 0) {
|
||||||
|
if (errno == EACCES) {
|
||||||
|
return TSDB_CODE_NO_DISK_PERMISSIONS;
|
||||||
|
} else if (errno == ENOSPC) {
|
||||||
|
return TSDB_CODE_SERV_NO_DISKSPACE;
|
||||||
|
} else if (errno == EEXIST) {
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_VG_INIT_FAILED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (access(rootDir, F_OK | R_OK | W_OK) == -1) return -1;
|
||||||
|
|
||||||
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) {
|
if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) {
|
||||||
return NULL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo));
|
STsdbRepo *pRepo = (STsdbRepo *)malloc(sizeof(STsdbRepo));
|
||||||
if (pRepo == NULL) {
|
if (pRepo == NULL) {
|
||||||
return NULL;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRepo->rootDir = strdup(rootDir);
|
pRepo->rootDir = strdup(rootDir);
|
||||||
pRepo->config = *pCfg;
|
pRepo->config = *pCfg;
|
||||||
pRepo->limiter = limiter;
|
pRepo->limiter = limiter;
|
||||||
pthread_mutex_init(&pRepo->mutex, NULL);
|
|
||||||
|
|
||||||
// Create the environment files and directories
|
// Create the environment files and directories
|
||||||
if (tsdbSetRepoEnv(pRepo) < 0) {
|
int32_t code = tsdbSetRepoEnv(pRepo);
|
||||||
free(pRepo->rootDir);
|
|
||||||
free(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize meta
|
free(pRepo->rootDir);
|
||||||
STsdbMeta *pMeta = tsdbInitMeta(rootDir, pCfg->maxTables);
|
free(pRepo);
|
||||||
if (pMeta == NULL) {
|
return code;
|
||||||
free(pRepo->rootDir);
|
|
||||||
free(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pRepo->tsdbMeta = pMeta;
|
|
||||||
|
|
||||||
// Initialize cache
|
|
||||||
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo);
|
|
||||||
if (pCache == NULL) {
|
|
||||||
free(pRepo->rootDir);
|
|
||||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
|
||||||
free(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pRepo->tsdbCache = pCache;
|
|
||||||
|
|
||||||
// Initialize file handle
|
|
||||||
char dataDir[128] = "\0";
|
|
||||||
tsdbGetDataDirName(pRepo, dataDir);
|
|
||||||
pRepo->tsdbFileH =
|
|
||||||
tsdbInitFileH(dataDir, pCfg->maxTables);
|
|
||||||
if (pRepo->tsdbFileH == NULL) {
|
|
||||||
free(pRepo->rootDir);
|
|
||||||
tsdbFreeCache(pRepo->tsdbCache);
|
|
||||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
|
||||||
free(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRepo->state = TSDB_REPO_STATE_ACTIVE;
|
|
||||||
|
|
||||||
return (tsdb_repo_t *)pRepo;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "tchecksum.h"
|
#include "tchecksum.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
|
||||||
#define walPrefix "wal"
|
#define walPrefix "wal"
|
||||||
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
|
#define wError(...) if (wDebugFlag & DEBUG_ERROR) {tprintf("ERROR WAL ", wDebugFlag, __VA_ARGS__);}
|
||||||
|
@ -48,7 +49,7 @@ int wDebugFlag = 135;
|
||||||
|
|
||||||
static uint32_t walSignature = 0xFAFBFDFE;
|
static uint32_t walSignature = 0xFAFBFDFE;
|
||||||
static int walHandleExistingFiles(char *path);
|
static int walHandleExistingFiles(char *path);
|
||||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *));
|
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int));
|
||||||
static int walRemoveWalFiles(char *path);
|
static int walRemoveWalFiles(char *path);
|
||||||
|
|
||||||
void *walOpen(char *path, int max, int level) {
|
void *walOpen(char *path, int max, int level) {
|
||||||
|
@ -168,7 +169,7 @@ void walFsync(void *handle) {
|
||||||
fsync(pWal->fd);
|
fsync(pWal->fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *)) {
|
int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
|
||||||
SWal *pWal = (SWal *)handle;
|
SWal *pWal = (SWal *)handle;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
struct dirent *ent;
|
struct dirent *ent;
|
||||||
|
@ -245,43 +246,45 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, void *)) {
|
static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SWalHead *, int)) {
|
||||||
SWalHead walHead;
|
int code = 0;
|
||||||
int code = 0;
|
|
||||||
|
char *buffer = malloc(1024000); // size for one record
|
||||||
|
if (buffer == NULL) return -1;
|
||||||
|
|
||||||
|
SWalHead *pHead = (SWalHead *)buffer;
|
||||||
|
|
||||||
int fd = open(name, O_RDONLY);
|
int fd = open(name, O_RDONLY);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
wError("wal:%s, failed to open for restore(%s)", name, strerror(errno));
|
||||||
|
free(buffer);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
wTrace("wal:%s, start to restore", name);
|
wTrace("wal:%s, start to restore", name);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int ret = read(fd, &walHead, sizeof(walHead));
|
int ret = read(fd, pHead, sizeof(SWalHead));
|
||||||
if ( ret == 0) { code = 0; break;}
|
if ( ret == 0) { code = 0; break;}
|
||||||
|
|
||||||
if (ret != sizeof(walHead)) {
|
if (ret != sizeof(SWalHead)) {
|
||||||
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
|
wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosCheckChecksumWhole((uint8_t *)&walHead, sizeof(walHead))) {
|
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
|
||||||
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
|
wWarn("wal:%s, cksum is messed up, skip the rest of file", name);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *buffer = malloc(sizeof(SWalHead) + walHead.len);
|
ret = read(fd, pHead->cont, pHead->len);
|
||||||
memcpy(buffer, &walHead, sizeof(walHead));
|
if ( ret != pHead->len) {
|
||||||
|
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret);
|
||||||
ret = read(fd, buffer+sizeof(walHead), walHead.len);
|
|
||||||
if ( ret != walHead.len) {
|
|
||||||
wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, walHead.len, ret);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// write into queue
|
// write into queue
|
||||||
(*writeFp)(pVnode, buffer);
|
(*writeFp)(pVnode, buffer, TAOS_QTYPE_WAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -21,16 +21,14 @@
|
||||||
int64_t ver = 0;
|
int64_t ver = 0;
|
||||||
void *pWal = NULL;
|
void *pWal = NULL;
|
||||||
|
|
||||||
int writeToQueue(void *pVnode, void *data) {
|
int writeToQueue(void *pVnode, SWalHead *pHead, int type) {
|
||||||
SWalHead *pHead = (SWalHead *)data;
|
|
||||||
|
|
||||||
// do nothing
|
// do nothing
|
||||||
if (pHead->version > ver)
|
if (pHead->version > ver)
|
||||||
ver = pHead->version;
|
ver = pHead->version;
|
||||||
|
|
||||||
walWrite(pWal, pHead);
|
walWrite(pWal, pHead);
|
||||||
|
|
||||||
free(data);
|
free(pHead);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue