rename some functions

This commit is contained in:
Shengliang Guan 2020-11-05 08:09:51 +00:00
parent 6874de8ebe
commit 247f069a0a
10 changed files with 119 additions and 148 deletions

View File

@ -20,9 +20,11 @@
extern "C" { extern "C" {
#endif #endif
int32_t dnodeInitVnodeRead(); int32_t dnodeInitVRead();
void dnodeCleanupVnodeRead(); void dnodeCleanupVRead();
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg); void dnodeDispatchToVReadQueue(SRpcMsg *pMsg);
void * dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -61,7 +61,7 @@ static const SDnodeComponent tsDnodeComponents[] = {
{"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos}, {"mnodeinfos",dnodeInitMInfos, dnodeCleanupMInfos},
{"wal", walInit, walCleanUp}, {"wal", walInit, walCleanUp},
{"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!! {"check", dnodeInitCheck, dnodeCleanupCheck}, // NOTES: dnodeInitCheck must be behind the dnodeinitStorage component !!!
{"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, {"vread", dnodeInitVRead, dnodeCleanupVRead},
{"vwrite", dnodeInitVWrite, dnodeCleanupVWrite}, {"vwrite", dnodeInitVWrite, dnodeCleanupVWrite},
{"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead},
{"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite},

View File

@ -39,8 +39,8 @@ static int32_t tsDnodeSubmitReqNum = 0;
int32_t dnodeInitShell() { int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_SUBMIT] = dnodeDispatchToVWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_QUERY] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVnodeReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_FETCH] = dnodeDispatchToVReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = dnodeDispatchToVWriteQueue;
// the following message shall be treated as mnode write // the following message shall be treated as mnode write

View File

@ -17,77 +17,72 @@
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tutil.h"
#include "tqueue.h"
#include "twal.h"
#include "tglobal.h" #include "tglobal.h"
#include "dnodeInt.h" #include "tqueue.h"
#include "dnodeMgmt.h"
#include "dnodeVRead.h"
#include "vnode.h" #include "vnode.h"
#include "dnodeInt.h"
typedef struct { typedef struct {
pthread_t thread; // thread pthread_t thread; // thread
int32_t workerId; // worker ID int32_t workerId; // worker ID
} SReadWorker; } SVReadWorker;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t min; // min number of workers int32_t min; // min number of workers
int32_t num; // current number of workers int32_t num; // current number of workers
SReadWorker *readWorker; SVReadWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SReadWorkerPool; } SVReadWorkerPool;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeHandleIdleReadWorker(SReadWorker *);
// module global variable // module global variable
static SReadWorkerPool readPool; static SVReadWorkerPool tsVReadWP;
static taos_qset readQset; static taos_qset tsVReadQset;
int32_t dnodeInitVnodeRead() { int32_t dnodeInitVRead() {
readQset = taosOpenQset(); tsVReadQset = taosOpenQset();
readPool.min = tsNumOfCores; tsVReadWP.min = tsNumOfCores;
readPool.max = tsNumOfCores * tsNumOfThreadsPerCore; tsVReadWP.max = tsNumOfCores * tsNumOfThreadsPerCore;
if (readPool.max <= readPool.min * 2) readPool.max = 2 * readPool.min; if (tsVReadWP.max <= tsVReadWP.min * 2) tsVReadWP.max = 2 * tsVReadWP.min;
readPool.readWorker = (SReadWorker *)calloc(sizeof(SReadWorker), readPool.max); tsVReadWP.worker = (SVReadWorker *)calloc(sizeof(SVReadWorker), tsVReadWP.max);
pthread_mutex_init(&readPool.mutex, NULL); pthread_mutex_init(&tsVReadWP.mutex, NULL);
if (readPool.readWorker == NULL) return -1; if (tsVReadWP.worker == NULL) return -1;
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
pWorker->workerId = i; pWorker->workerId = i;
} }
dInfo("dnode read is initialized, min worker:%d max worker:%d", readPool.min, readPool.max); dInfo("dnode vread is initialized, min worker:%d max worker:%d", tsVReadWP.min, tsVReadWP.max);
return 0; return 0;
} }
void dnodeCleanupVnodeRead() { void dnodeCleanupVRead() {
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(readQset); taosQsetThreadResume(tsVReadQset);
} }
} }
for (int i = 0; i < readPool.max; ++i) { for (int i = 0; i < tsVReadWP.max; ++i) {
SReadWorker *pWorker = readPool.readWorker + i; SVReadWorker *pWorker = tsVReadWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
} }
} }
free(readPool.readWorker); free(tsVReadWP.worker);
taosCloseQset(readQset); taosCloseQset(tsVReadQset);
pthread_mutex_destroy(&readPool.mutex); pthread_mutex_destroy(&tsVReadWP.mutex);
dInfo("dnode read is closed"); dInfo("dnode vread is closed");
} }
void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char * pCont = (char *)pMsg->pCont; char * pCont = (char *)pMsg->pCont;
@ -106,7 +101,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
pRead->contLen = pHead->contLen; pRead->contLen = pHead->contLen;
@ -120,60 +115,52 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
.handle = pMsg->handle,
.pCont = NULL,
.contLen = 0,
.code = TSDB_CODE_VND_INVALID_VGROUP_ID,
.msgType = 0
};
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
} }
} }
void *dnodeAllocVReadQueue(void *pVnode) { void *dnodeAllocVReadQueue(void *pVnode) {
pthread_mutex_lock(&readPool.mutex); pthread_mutex_lock(&tsVReadWP.mutex);
taos_queue queue = taosOpenQueue(); taos_queue queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
return NULL; return NULL;
} }
taosAddIntoQset(readQset, queue, pVnode); taosAddIntoQset(tsVReadQset, queue, pVnode);
// spawn a thread to process queue // spawn a thread to process queue
if (readPool.num < readPool.max) { if (tsVReadWP.num < tsVReadWP.max) {
do { do {
SReadWorker *pWorker = readPool.readWorker + readPool.num; SVReadWorker *pWorker = tsVReadWP.worker + tsVReadWP.num;
pthread_attr_t thAttr; pthread_attr_t thAttr;
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessReadQueue, pWorker) != 0) {
dError("failed to create thread to process read queue, reason:%s", strerror(errno)); dError("failed to create thread to process vread vqueue since %s", strerror(errno));
} }
pthread_attr_destroy(&thAttr); pthread_attr_destroy(&thAttr);
readPool.num++; tsVReadWP.num++;
dDebug("read worker:%d is launched, total:%d", pWorker->workerId, readPool.num); dDebug("dnode vread worker:%d is launched, total:%d", pWorker->workerId, tsVReadWP.num);
} while (readPool.num < readPool.min); } while (tsVReadWP.num < tsVReadWP.min);
} }
pthread_mutex_unlock(&readPool.mutex); pthread_mutex_unlock(&tsVReadWP.mutex);
dDebug("pVnode:%p, read queue:%p is allocated", pVnode, queue); dDebug("pVnode:%p, dnode vread queue:%p is allocated", pVnode, queue);
return queue; return queue;
} }
void dnodeFreeVReadQueue(void *rqueue) { void dnodeFreeVReadQueue(void *rqueue) {
taosCloseQueue(rqueue); taosCloseQueue(rqueue);
// dynamically adjust the number of threads
} }
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
.pCont = pRead->rspRet.rsp, .pCont = pRead->rspRet.rsp,
@ -186,32 +173,32 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont); rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
SReadMsg *pReadMsg; SVReadMsg *pReadMsg;
int type; int32_t qtype;
void * pVnode; void * pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(readQset, &type, (void **)&pReadMsg, &pVnode) == 0) { if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) {
dDebug("qset:%p dnode read got no message from qset, exiting", readQset); dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
break; break;
} }
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle, dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg); taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pReadMsg); int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, code);
} else { } else {
if (code == TSDB_CODE_QRY_HAS_RSP) { if (code == TSDB_CODE_QRY_HAS_RSP) {
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code); dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client } else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5)); assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code); dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
@ -223,19 +210,3 @@ static void *dnodeProcessReadQueue(void *param) {
return NULL; return NULL;
} }
UNUSED_FUNC
static void dnodeHandleIdleReadWorker(SReadWorker *pWorker) {
int32_t num = taosGetQueueNumber(readQset);
if (num == 0 || (num <= readPool.min && readPool.num > readPool.min)) {
readPool.num--;
dDebug("read worker:%d is released, total:%d", pWorker->workerId, readPool.num);
pthread_exit(NULL);
} else {
usleep(30000);
sched_yield();
}
}

View File

@ -15,13 +15,12 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "tsdb.h"
#include "twal.h" #include "twal.h"
#include "tsync.h"
#include "vnode.h" #include "vnode.h"
#include "syncInt.h"
#include "dnodeInt.h" #include "dnodeInt.h"
typedef struct { typedef struct {
@ -29,22 +28,21 @@ typedef struct {
taos_qset qset; // queue set taos_qset qset; // queue set
int32_t workerId; // worker ID int32_t workerId; // worker ID
pthread_t thread; // thread pthread_t thread; // thread
} SWriteWorker; } SVWriteWorker;
typedef struct { typedef struct {
int32_t max; // max number of workers int32_t max; // max number of workers
int32_t nextId; // from 0 to max-1, cyclic int32_t nextId; // from 0 to max-1, cyclic
SWriteWorker *worker; SVWriteWorker * worker;
pthread_mutex_t mutex; pthread_mutex_t mutex;
} SWriteWorkerPool; } SVWriteWorkerPool;
static SWriteWorkerPool tsVWriteWP; static SVWriteWorkerPool tsVWriteWP;
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessVWriteQueue(void *param);
int32_t dnodeInitVWrite() { int32_t dnodeInitVWrite() {
tsVWriteWP.max = tsNumOfCores; tsVWriteWP.max = tsNumOfCores;
tsVWriteWP.worker = (SWriteWorker *)tcalloc(sizeof(SWriteWorker), tsVWriteWP.max); tsVWriteWP.worker = (SVWriteWorker *)tcalloc(sizeof(SVWriteWorker), tsVWriteWP.max);
if (tsVWriteWP.worker == NULL) return -1; if (tsVWriteWP.worker == NULL) return -1;
pthread_mutex_init(&tsVWriteWP.mutex, NULL); pthread_mutex_init(&tsVWriteWP.mutex, NULL);
@ -58,14 +56,14 @@ int32_t dnodeInitVWrite() {
void dnodeCleanupVWrite() { void dnodeCleanupVWrite() {
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
taosQsetThreadResume(pWorker->qset); taosQsetThreadResume(pWorker->qset);
} }
} }
for (int32_t i = 0; i < tsVWriteWP.max; ++i) { for (int32_t i = 0; i < tsVWriteWP.max; ++i) {
SWriteWorker *pWorker = tsVWriteWP.worker + i; SVWriteWorker *pWorker = tsVWriteWP.worker + i;
if (pWorker->thread) { if (pWorker->thread) {
pthread_join(pWorker->thread, NULL); pthread_join(pWorker->thread, NULL);
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
@ -100,7 +98,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
pHead->msgType = pRpcMsg->msgType; pHead->msgType = pRpcMsg->msgType;
pHead->version = 0; pHead->version = 0;
pHead->len = pMsg->contLen; pHead->len = pMsg->contLen;
code = vnodeWriteToQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg); code = vnodeWriteToWQueue(pVnode, pHead, TAOS_QTYPE_RPC, pRpcMsg);
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -114,7 +112,7 @@ void dnodeDispatchToVWriteQueue(SRpcMsg *pRpcMsg) {
void *dnodeAllocVWriteQueue(void *pVnode) { void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_mutex_lock(&tsVWriteWP.mutex); pthread_mutex_lock(&tsVWriteWP.mutex);
SWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId; SVWriteWorker *pWorker = tsVWriteWP.worker + tsVWriteWP.nextId;
void *queue = taosOpenQueue(); void *queue = taosOpenQueue();
if (queue == NULL) { if (queue == NULL) {
pthread_mutex_unlock(&tsVWriteWP.mutex); pthread_mutex_unlock(&tsVWriteWP.mutex);
@ -141,7 +139,7 @@ void *dnodeAllocVWriteQueue(void *pVnode) {
pthread_attr_init(&thAttr); pthread_attr_init(&thAttr);
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessVWriteQueue, pWorker) != 0) {
dError("failed to create thread to process vwrite queue since %s", strerror(errno)); dError("failed to create thread to process vwrite queue since %s", strerror(errno));
taosFreeQall(pWorker->qall); taosFreeQall(pWorker->qall);
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
@ -190,8 +188,8 @@ void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
} }
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessVWriteQueue(void *param) {
SWriteWorker *pWorker = param; SVWriteWorker *pWorker = param;
SVWriteMsg * pWrite; SVWriteMsg * pWrite;
void * pVnode; void * pVnode;
int32_t numOfMsgs; int32_t numOfMsgs;

View File

@ -55,9 +55,9 @@ void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t tid);
void *dnodeAllocVWriteQueue(void *pVnode); void *dnodeAllocVWriteQueue(void *pVnode);
void dnodeFreeVWriteQueue(void *wqueue); void dnodeFreeVWriteQueue(void *wqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
void *dnodeAllocVReadQueue(void *pVnode); void *dnodeAllocVReadQueue(void *pVnode);
void dnodeFreeVReadQueue(void *rqueue); void dnodeFreeVReadQueue(void *rqueue);
void dnodeSendRpcVWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
void dnodeFreeMnodePqueue(); void dnodeFreeMnodePqueue();

View File

@ -41,7 +41,7 @@ typedef struct {
void * pCont; void * pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
} SReadMsg; } SVReadMsg;
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -66,7 +66,7 @@ void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void vnodeRelease(void *pVnode); // dec refCount void vnodeRelease(void *pVnode); // dec refCount
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam); int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeCheckWrite(void *pVnode); int32_t vnodeCheckWrite(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes); int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
@ -77,7 +77,7 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); int32_t vnodeInitResources();
void vnodeCleanupResources(); void vnodeCleanupResources();
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg);
int32_t vnodeCheckRead(void *pVnode); int32_t vnodeCheckRead(void *pVnode);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -266,7 +266,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
strcpy(cqCfg.pass, tsInternalPass); strcpy(cqCfg.pass, tsInternalPass);
strcpy(cqCfg.db, pVnode->db); strcpy(cqCfg.db, pVnode->db);
cqCfg.vgId = vnode; cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue; cqCfg.cqWrite = vnodeWriteToWQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg); pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) { if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
@ -320,7 +320,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.ahandle = pVnode; syncInfo.ahandle = pVnode;
syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue; syncInfo.writeToCache = vnodeWriteToWQueue;
syncInfo.confirmForward = dnodeSendRpcVWriteRsp; syncInfo.confirmForward = dnodeSendRpcVWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFlowCtrl = vnodeCtrlFlow; syncInfo.notifyFlowCtrl = vnodeCtrlFlow;

View File

@ -29,9 +29,9 @@
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h" #include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) { void vnodeInitReadFp(void) {
@ -44,7 +44,7 @@ void vnodeInitReadFp(void) {
// still required, or there will be a deadlock, so we dont do any check here, but put the check codes before the // still required, or there will be a deadlock, so we dont do any check here, but put the check codes before the
// request enters the queue // request enters the queue
// //
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
SVnodeObj *pVnode = (SVnodeObj *)param; SVnodeObj *pVnode = (SVnodeObj *)param;
int msgType = pReadMsg->rpcMsg.msgType; int msgType = pReadMsg->rpcMsg.msgType;
@ -82,7 +82,7 @@ static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void
int32_t code = vnodeCheckRead(pVnode); int32_t code = vnodeCheckRead(pVnode);
if (code != TSDB_CODE_SUCCESS) return code; if (code != TSDB_CODE_SUCCESS) return code;
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle; pRead->pCont = qhandle;
pRead->contLen = 0; pRead->contLen = 0;
@ -146,7 +146,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;
@ -274,7 +274,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
return code; return code;
} }
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
void * pCont = pReadMsg->pCont; void * pCont = pReadMsg->pCont;
SRspRet *pRet = &pReadMsg->rspRet; SRspRet *pRet = &pReadMsg->rspRet;

View File

@ -205,7 +205,7 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
SWalHead * pHead = wparam; SWalHead * pHead = wparam;