TD-2165 add vnode cancel worker to process query msg
This commit is contained in:
parent
f61f005eeb
commit
8b8c79487a
|
@ -0,0 +1,33 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TDENGINE_VNODE_CANCEL_H
|
||||||
|
#define TDENGINE_VNODE_CANCEL_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
#include "vnode.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
|
int32_t vnodeInitCWorker();
|
||||||
|
void vnodeCleanupCWorker();
|
||||||
|
int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif
|
|
@ -0,0 +1,166 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "os.h"
|
||||||
|
#include "taoserror.h"
|
||||||
|
#include "taosmsg.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
#include "dnode.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
#include "vnodeCancel.h"
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pthread_t thread;
|
||||||
|
int32_t workerId;
|
||||||
|
} SVCWorker;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t curNum;
|
||||||
|
int32_t maxNum;
|
||||||
|
SVCWorker *worker;
|
||||||
|
} SVCWorkerPool;
|
||||||
|
|
||||||
|
static SVCWorkerPool tsVCWorkerPool;
|
||||||
|
static taos_qset tsVCWorkerQset;
|
||||||
|
static taos_queue tsVCWorkerQueue;
|
||||||
|
|
||||||
|
static void *vnodeCWorkerFunc(void *param);
|
||||||
|
|
||||||
|
static int32_t vnodeStartCWorker() {
|
||||||
|
tsVCWorkerQueue = taosOpenQueue();
|
||||||
|
if (tsVCWorkerQueue == NULL) return TSDB_CODE_DND_OUT_OF_MEMORY;
|
||||||
|
|
||||||
|
taosAddIntoQset(tsVCWorkerQset, tsVCWorkerQueue, NULL);
|
||||||
|
|
||||||
|
for (int32_t i = tsVCWorkerPool.curNum; i < tsVCWorkerPool.maxNum; ++i) {
|
||||||
|
SVCWorker *pWorker = tsVCWorkerPool.worker + i;
|
||||||
|
pWorker->workerId = i;
|
||||||
|
|
||||||
|
pthread_attr_t thAttr;
|
||||||
|
pthread_attr_init(&thAttr);
|
||||||
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
|
if (pthread_create(&pWorker->thread, &thAttr, vnodeCWorkerFunc, pWorker) != 0) {
|
||||||
|
vError("failed to create thread to process vcworker queue, reason:%s", strerror(errno));
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_attr_destroy(&thAttr);
|
||||||
|
|
||||||
|
tsVCWorkerPool.curNum = i + 1;
|
||||||
|
vDebug("vcworker:%d is launched, total:%d", pWorker->workerId, tsVCWorkerPool.maxNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
vDebug("vcworker queue:%p is allocated", tsVCWorkerQueue);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeInitCWorker() {
|
||||||
|
tsVCWorkerQset = taosOpenQset();
|
||||||
|
|
||||||
|
tsVCWorkerPool.maxNum = 1;
|
||||||
|
tsVCWorkerPool.curNum = 0;
|
||||||
|
tsVCWorkerPool.worker = calloc(sizeof(SVCWorker), tsVCWorkerPool.maxNum);
|
||||||
|
|
||||||
|
if (tsVCWorkerPool.worker == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) {
|
||||||
|
SVCWorker *pWorker = tsVCWorkerPool.worker + i;
|
||||||
|
pWorker->workerId = i;
|
||||||
|
vDebug("vcworker:%d is created", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
vDebug("vcworker is initialized, num:%d qset:%p", tsVCWorkerPool.maxNum, tsVCWorkerQset);
|
||||||
|
|
||||||
|
return vnodeStartCWorker();
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vnodeStopCWorker() {
|
||||||
|
vDebug("vcworker queue:%p is freed", tsVCWorkerQueue);
|
||||||
|
taosCloseQueue(tsVCWorkerQueue);
|
||||||
|
tsVCWorkerQueue = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeCleanupCWorker() {
|
||||||
|
for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) {
|
||||||
|
SVCWorker *pWorker = tsVCWorkerPool.worker + i;
|
||||||
|
if (pWorker->thread) {
|
||||||
|
taosQsetThreadResume(tsVCWorkerQset);
|
||||||
|
}
|
||||||
|
vDebug("vcworker:%d is closed", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < tsVCWorkerPool.maxNum; ++i) {
|
||||||
|
SVCWorker *pWorker = tsVCWorkerPool.worker + i;
|
||||||
|
vDebug("vcworker:%d start to join", i);
|
||||||
|
if (pWorker->thread) {
|
||||||
|
pthread_join(pWorker->thread, NULL);
|
||||||
|
}
|
||||||
|
vDebug("vcworker:%d join success", i);
|
||||||
|
}
|
||||||
|
|
||||||
|
vDebug("vcworker is closed, qset:%p", tsVCWorkerQset);
|
||||||
|
|
||||||
|
taosCloseQset(tsVCWorkerQset);
|
||||||
|
tsVCWorkerQset = NULL;
|
||||||
|
tfree(tsVCWorkerPool.worker);
|
||||||
|
|
||||||
|
vnodeStopCWorker();
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead) {
|
||||||
|
vTrace("msg:%p, write into vcqueue", pRead);
|
||||||
|
return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vnodeFreeFromCQueue(SVReadMsg *pRead) {
|
||||||
|
vTrace("msg:%p, free from vcqueue", pRead);
|
||||||
|
taosFreeQitem(pRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) {
|
||||||
|
SRpcMsg rpcRsp = {
|
||||||
|
.handle = pRead->rpcHandle,
|
||||||
|
.pCont = pRead->rspRet.rsp,
|
||||||
|
.contLen = pRead->rspRet.len,
|
||||||
|
.code = code,
|
||||||
|
};
|
||||||
|
|
||||||
|
rpcSendResponse(&rpcRsp);
|
||||||
|
vnodeFreeFromCQueue(pRead);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *vnodeCWorkerFunc(void *param) {
|
||||||
|
int32_t qtype;
|
||||||
|
SVReadMsg *pRead;
|
||||||
|
SVnodeObj *pVnode;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (taosReadQitemFromQset(tsVCWorkerQset, &qtype, (void **)&pRead, (void **)&pVnode) == 0) {
|
||||||
|
vDebug("qset:%p, vcworker got no message from qset, exiting", tsVCWorkerQset);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
vTrace("msg:%p will be processed in vcworker queue", pRead);
|
||||||
|
|
||||||
|
assert(qtype == TAOS_QTYPE_RPC);
|
||||||
|
assert(pVnode == NULL);
|
||||||
|
|
||||||
|
int32_t code = vnodeProcessRead(NULL, pRead);
|
||||||
|
vnodeSendVCancelRpcRsp(pRead, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
|
@ -27,6 +27,7 @@
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "vnodeCfg.h"
|
#include "vnodeCfg.h"
|
||||||
#include "vnodeVersion.h"
|
#include "vnodeVersion.h"
|
||||||
|
#include "vnodeCancel.h"
|
||||||
|
|
||||||
static SHashObj*tsVnodesHash;
|
static SHashObj*tsVnodesHash;
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode);
|
static void vnodeCleanUp(SVnodeObj *pVnode);
|
||||||
|
@ -63,6 +64,7 @@ int32_t vnodeInitResources() {
|
||||||
|
|
||||||
vnodeInitWriteFp();
|
vnodeInitWriteFp();
|
||||||
vnodeInitReadFp();
|
vnodeInitReadFp();
|
||||||
|
vnodeInitCWorker();
|
||||||
|
|
||||||
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
if (tsVnodesHash == NULL) {
|
if (tsVnodesHash == NULL) {
|
||||||
|
@ -79,6 +81,7 @@ int32_t vnodeInitResources() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeCleanupResources() {
|
void vnodeCleanupResources() {
|
||||||
|
vnodeCleanupCWorker();
|
||||||
tsdbDestroyCommitQueue();
|
tsdbDestroyCommitQueue();
|
||||||
|
|
||||||
if (tsVnodesHash != NULL) {
|
if (tsVnodesHash != NULL) {
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
#include "tqueue.h"
|
#include "tqueue.h"
|
||||||
|
#include "vnodeCancel.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
|
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||||
|
@ -116,12 +117,14 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt
|
||||||
|
|
||||||
pRead->qtype = qtype;
|
pRead->qtype = qtype;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) {
|
||||||
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
|
return vnodeWriteIntoCQueue(pRead);
|
||||||
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
|
} else {
|
||||||
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
taosWriteQitem(pVnode->rqueue, qtype, pRead);
|
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
vTrace("vgId:%d, write into vrqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg);
|
||||||
|
return taosWriteQitem(pVnode->rqueue, qtype, pRead);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
||||||
|
|
Loading…
Reference in New Issue