TD-2324
This commit is contained in:
parent
18c4f5b7ac
commit
8fb299ce4d
|
@ -265,14 +265,12 @@ static int32_t dnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
|
|||
|
||||
static void *dnodeOpenVnode(void *param) {
|
||||
SOpenVnodeThread *pThread = param;
|
||||
char vnodeDir[TSDB_FILENAME_LEN * 3];
|
||||
|
||||
|
||||
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
|
||||
|
||||
for (int32_t v = 0; v < pThread->vnodeNum; ++v) {
|
||||
int32_t vgId = pThread->vnodeList[v];
|
||||
snprintf(vnodeDir, TSDB_FILENAME_LEN * 3, "%s/vnode%d", tsVnodeDir, vgId);
|
||||
if (vnodeOpen(vgId, vnodeDir) < 0) {
|
||||
if (vnodeOpen(vgId) < 0) {
|
||||
dError("vgId:%d, failed to open vnode by thread:%d", vgId, pThread->threadIndex);
|
||||
pThread->failed++;
|
||||
} else {
|
||||
|
|
|
@ -57,7 +57,7 @@ extern char *vnodeStatus[];
|
|||
// vnodeMain
|
||||
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||
int32_t vnodeOpen(int32_t vgId);
|
||||
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
|
|
|
@ -358,8 +358,8 @@ int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phand
|
|||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
*pitem = pNode->item;
|
||||
*type = pNode->type;
|
||||
*phandle = queue->ahandle;
|
||||
if (type) *type = pNode->type;
|
||||
if (phandle) *phandle = queue->ahandle;
|
||||
queue->head = pNode->next;
|
||||
if (queue->head == NULL)
|
||||
queue->tail = NULL;
|
||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
|||
|
||||
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeDrop(int32_t vgId);
|
||||
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||
int32_t vnodeOpen(int32_t vgId);
|
||||
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
||||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_VNODE_WORKER_H
|
||||
#define TDENGINE_VNODE_WORKER_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "vnodeInt.h"
|
||||
|
||||
int32_t vnodeInitMWorker();
|
||||
void vnodeCleanupMWorker();
|
||||
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle);
|
||||
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif
|
|
@ -84,7 +84,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
|
|||
|
||||
vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel,
|
||||
pVnodeCfg->cfg.fsyncPeriod);
|
||||
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
|
||||
code = vnodeOpen(pVnodeCfg->cfg.vgId);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -158,18 +158,20 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||
char temp[TSDB_FILENAME_LEN];
|
||||
int32_t vnodeOpen(int32_t vgId) {
|
||||
char temp[TSDB_FILENAME_LEN * 3];
|
||||
char rootDir[TSDB_FILENAME_LEN * 2];
|
||||
snprintf(rootDir, TSDB_FILENAME_LEN * 2, "%s/vnode%d", tsVnodeDir, vgId);
|
||||
|
||||
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
|
||||
if (pVnode == NULL) {
|
||||
vError("vgId:%d, failed to open vnode since no enough memory", vnode);
|
||||
vError("vgId:%d, failed to open vnode since no enough memory", vgId);
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
|
||||
pVnode->vgId = vnode;
|
||||
pVnode->vgId = vgId;
|
||||
pVnode->fversion = 0;
|
||||
pVnode->version = 0;
|
||||
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
||||
|
@ -206,7 +208,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
sprintf(cqCfg.user, "_root");
|
||||
strcpy(cqCfg.pass, tsInternalPass);
|
||||
strcpy(cqCfg.db, pVnode->db);
|
||||
cqCfg.vgId = vnode;
|
||||
cqCfg.vgId = vgId;
|
||||
cqCfg.cqWrite = vnodeWriteToCache;
|
||||
pVnode->cq = cqOpen(pVnode, &cqCfg);
|
||||
if (pVnode->cq == NULL) {
|
||||
|
@ -220,7 +222,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
appH.cqH = pVnode->cq;
|
||||
appH.cqCreateFunc = cqCreate;
|
||||
appH.cqDropFunc = cqDrop;
|
||||
sprintf(temp, "vnode/vnode%d/tsdb", vnode);
|
||||
sprintf(temp, "vnode/vnode%d/tsdb", vgId);
|
||||
|
||||
terrno = 0;
|
||||
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||
|
@ -280,7 +282,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
|||
syncInfo.vgId = pVnode->vgId;
|
||||
syncInfo.version = pVnode->version;
|
||||
syncInfo.syncCfg = pVnode->syncCfg;
|
||||
sprintf(syncInfo.path, "%s", rootDir);
|
||||
tstrncpy(syncInfo.path, rootDir, TSDB_FILENAME_LEN);
|
||||
syncInfo.getWalInfo = vnodeGetWalInfo;
|
||||
syncInfo.getFileInfo = vnodeGetFileInfo;
|
||||
syncInfo.writeToCache = vnodeWriteToCache;
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "taoserror.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tutil.h"
|
||||
#include "tqueue.h"
|
||||
#include "tglobal.h"
|
||||
#include "vnodeWorker.h"
|
||||
|
||||
typedef enum {
|
||||
VNODE_WORKER_ACTION_CREATE,
|
||||
VNODE_WORKER_ACTION_DELETE,
|
||||
VNODE_WORKER_ACTION_ALTER
|
||||
} EVMWorkerAction;
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t code;
|
||||
void * rpcHandle;
|
||||
SVnodeObj *pVnode;
|
||||
EVMWorkerAction action;
|
||||
} SVMWorkerMsg;
|
||||
|
||||
typedef struct {
|
||||
pthread_t thread;
|
||||
int32_t workerId;
|
||||
} SVMWorker;
|
||||
|
||||
typedef struct {
|
||||
int32_t curNum;
|
||||
int32_t maxNum;
|
||||
SVMWorker *worker;
|
||||
} SVMWorkerPool;
|
||||
|
||||
static SVMWorkerPool tsVMWorkerPool;
|
||||
static taos_qset tsVMWorkerQset;
|
||||
static taos_queue tsVMWorkerQueue;
|
||||
|
||||
static void *vnodeMWorkerFunc(void *param);
|
||||
|
||||
static int32_t vnodeStartMWorker() {
|
||||
tsVMWorkerQueue = taosOpenQueue();
|
||||
if (tsVMWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||
|
||||
taosAddIntoQset(tsVMWorkerQset, tsVMWorkerQueue, NULL);
|
||||
|
||||
for (int32_t i = tsVMWorkerPool.curNum; i < tsVMWorkerPool.maxNum; ++i) {
|
||||
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
|
||||
pWorker->workerId = i;
|
||||
|
||||
pthread_attr_t thAttr;
|
||||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&pWorker->thread, &thAttr, vnodeMWorkerFunc, pWorker) != 0) {
|
||||
vError("failed to create thread to process vmworker queue, reason:%s", strerror(errno));
|
||||
}
|
||||
|
||||
pthread_attr_destroy(&thAttr);
|
||||
|
||||
tsVMWorkerPool.curNum = i + 1;
|
||||
vDebug("vmworker:%d is launched, total:%d", pWorker->workerId, tsVMWorkerPool.maxNum);
|
||||
}
|
||||
|
||||
vDebug("vmworker queue:%p is allocated", tsVMWorkerQueue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeInitMWorker() {
|
||||
tsVMWorkerQset = taosOpenQset();
|
||||
|
||||
tsVMWorkerPool.maxNum = 1;
|
||||
tsVMWorkerPool.curNum = 0;
|
||||
tsVMWorkerPool.worker = calloc(sizeof(SVMWorker), tsVMWorkerPool.maxNum);
|
||||
|
||||
if (tsVMWorkerPool.worker == NULL) return -1;
|
||||
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
|
||||
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
|
||||
pWorker->workerId = i;
|
||||
vDebug("vmworker:%d is created", i);
|
||||
}
|
||||
|
||||
vDebug("vmworker is initialized, num:%d qset:%p", tsVMWorkerPool.maxNum, tsVMWorkerQset);
|
||||
|
||||
return vnodeStartMWorker();
|
||||
}
|
||||
|
||||
static void vnodeStopMWorker() {
|
||||
vDebug("vmworker queue:%p is freed", tsVMWorkerQueue);
|
||||
taosCloseQueue(tsVMWorkerQueue);
|
||||
tsVMWorkerQueue = NULL;
|
||||
}
|
||||
|
||||
void vnodeCleanupMWorker() {
|
||||
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
|
||||
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
|
||||
if (pWorker->thread) {
|
||||
taosQsetThreadResume(tsVMWorkerQset);
|
||||
}
|
||||
vDebug("vmworker:%d is closed", i);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tsVMWorkerPool.maxNum; ++i) {
|
||||
SVMWorker *pWorker = tsVMWorkerPool.worker + i;
|
||||
vDebug("vmworker:%d start to join", i);
|
||||
if (pWorker->thread) {
|
||||
pthread_join(pWorker->thread, NULL);
|
||||
}
|
||||
vDebug("vmworker:%d join success", i);
|
||||
}
|
||||
|
||||
vDebug("vmworker is closed, qset:%p", tsVMWorkerQset);
|
||||
|
||||
taosCloseQset(tsVMWorkerQset);
|
||||
tsVMWorkerQset = NULL;
|
||||
tfree(tsVMWorkerPool.worker);
|
||||
|
||||
vnodeStopMWorker();
|
||||
}
|
||||
|
||||
static int32_t vnodeWriteIntoMWorker(int32_t vgId, EVMWorkerAction action,void *rpcHandle) {
|
||||
SVMWorkerMsg *pMsg = taosAllocateQitem(sizeof(SVMWorkerMsg));
|
||||
if (pMsg == NULL) return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->pVnode = pVnode;
|
||||
pMsg->rpcHandle = rpcHandle;
|
||||
pMsg->action = action;
|
||||
return taosWriteQitem(tsVMWorkerQueue, TAOS_QTYPE_RPC, pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeOpenInMWorker(int32_t vgId, void *rpcHandle) {
|
||||
vTrace("vgId:%d, will open in vmworker", vgId);
|
||||
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_CREATE, rpcHandle);
|
||||
}
|
||||
|
||||
int32_t vnodeCleanupInMWorker(int32_t vgId, void *rpcHandle) {
|
||||
vTrace("vgId:%d, will cleanup in vmworker", vgId);
|
||||
return vnodeWriteIntoMWorker(vgId, VNODE_WORKER_ACTION_DELETE, rpcHandle);
|
||||
}
|
||||
|
||||
static void vnodeFreeMWorkerMsg(SVMWorkerMsg *pMsg) {
|
||||
vTrace("vgId:%d, disposed in vmworker", pMsg->vgId);
|
||||
vnodeRelease(pMsg->pVnode);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vnodeSendVMWorkerRpcRsp(SVMWorkerMsg *pMsg) {
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->rpcHandle,
|
||||
.code = pMsg->code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
vnodeFreeMWorkerMsg(pMsg);
|
||||
}
|
||||
|
||||
static void vnodeProcessMWorkerMsg(SVMWorkerMsg *pMsg) {
|
||||
pMsg->code = 0;
|
||||
|
||||
switch (pMsg->action) {
|
||||
case VNODE_WORKER_ACTION_CREATE:
|
||||
pMsg->code = vnodeOpen(pMsg->vgId);
|
||||
break;
|
||||
case VNODE_WORKER_ACTION_DELETE:
|
||||
pMsg->code = vnodeDrop(pMsg->vgId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static void *vnodeMWorkerFunc(void *param) {
|
||||
while (1) {
|
||||
SVMWorkerMsg *pMsg = NULL;
|
||||
if (taosReadQitemFromQset(tsVMWorkerQset, NULL, (void **)&pMsg, NULL) == 0) {
|
||||
vDebug("qset:%p, vmworker got no message from qset, exiting", tsVMWorkerQset);
|
||||
break;
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, action:%d will be processed in vmworker queue", pMsg->vgId, pMsg->action);
|
||||
vnodeProcessMWorkerMsg(pMsg);
|
||||
vnodeSendVMWorkerRpcRsp(pMsg);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
Loading…
Reference in New Issue