Merge branch '3.0' into 3.0_query_integrate
This commit is contained in:
commit
da1523f1f5
|
@ -25,23 +25,25 @@ extern "C" {
|
|||
typedef struct SRpcMsg SRpcMsg;
|
||||
typedef struct SEpSet SEpSet;
|
||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
|
||||
|
||||
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||
typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype);
|
||||
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
||||
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
||||
|
||||
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
|
||||
|
||||
typedef struct {
|
||||
SMgmtWrapper* pWrapper;
|
||||
PutToQueueFp queueFps[QUEUE_MAX];
|
||||
GetQueueSizeFp qsizeFp;
|
||||
SendReqFp sendReqFp;
|
||||
SendMnodeReqFp sendMnodeReqFp;
|
||||
SendRspFp sendRspFp;
|
||||
} SMsgCb;
|
||||
|
||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype);
|
||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
||||
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
||||
|
|
|
@ -42,8 +42,14 @@ shall be used to set up the protection.
|
|||
typedef struct STaosQueue STaosQueue;
|
||||
typedef struct STaosQset STaosQset;
|
||||
typedef struct STaosQall STaosQall;
|
||||
typedef void (*FItem)(void *ahandle, void *pItem);
|
||||
typedef void (*FItems)(void *ahandle, STaosQall *qall, int32_t numOfItems);
|
||||
typedef struct {
|
||||
void *ahandle;
|
||||
int32_t workerId;
|
||||
int32_t threadNum;
|
||||
} SQueueInfo;
|
||||
|
||||
typedef void (*FItem)(SQueueInfo *pInfo, void *pItem);
|
||||
typedef void (*FItems)(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfItems);
|
||||
|
||||
STaosQueue *taosOpenQueue();
|
||||
void taosCloseQueue(STaosQueue *queue);
|
||||
|
|
|
@ -50,7 +50,8 @@ typedef struct SWWorker {
|
|||
} SWWorker;
|
||||
|
||||
typedef struct SWWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t max; // max number of workers
|
||||
int32_t num;
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char *name;
|
||||
SWWorker *workers;
|
||||
|
@ -73,31 +74,31 @@ typedef struct {
|
|||
int32_t maxNum;
|
||||
FItem fp;
|
||||
void *param;
|
||||
} SQWorkerAllCfg;
|
||||
} SSingleWorkerCfg;
|
||||
|
||||
typedef struct {
|
||||
const char *name;
|
||||
STaosQueue *queue;
|
||||
SQWorkerPool pool;
|
||||
} SQWorkerAll;
|
||||
} SSingleWorker;
|
||||
|
||||
typedef struct {
|
||||
const char *name;
|
||||
int32_t maxNum;
|
||||
FItems fp;
|
||||
void *param;
|
||||
} SWWorkerAllCfg;
|
||||
} SMultiWorkerCfg;
|
||||
|
||||
typedef struct {
|
||||
const char *name;
|
||||
STaosQueue *queue;
|
||||
SWWorkerPool pool;
|
||||
} SWWorkerAll;
|
||||
} SMultiWorker;
|
||||
|
||||
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg);
|
||||
void tQWorkerAllCleanup(SQWorkerAll *pWorker);
|
||||
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg);
|
||||
void tWWorkerAllCleanup(SWWorkerAll *pWorker);
|
||||
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg);
|
||||
void tSingleWorkerCleanup(SSingleWorker *pWorker);
|
||||
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg);
|
||||
void tMultiWorkerCleanup(SMultiWorker *pWorker);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -20,6 +20,10 @@ int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
|||
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
||||
}
|
||||
|
||||
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
|
||||
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
|
||||
}
|
||||
|
||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
|
||||
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ typedef struct SBnodeMgmt {
|
|||
SDnode *pDnode;
|
||||
SMgmtWrapper *pWrapper;
|
||||
const char *path;
|
||||
SDnodeWorker writeWorker;
|
||||
SMultiWorker writeWorker;
|
||||
} SBnodeMgmt;
|
||||
|
||||
// bmInt.c
|
||||
|
|
|
@ -33,7 +33,8 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
|
||||
static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SBnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
SMgmtWrapper *pWrapper = pMgmt->pWrapper;
|
||||
|
||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||
|
@ -63,14 +64,15 @@ static void bmProcessQueue(SBnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs
|
|||
}
|
||||
|
||||
int32_t bmProcessWriteMsg(SBnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnodeWorker *pWorker = &pMgmt->writeWorker;
|
||||
SMultiWorker *pWorker = &pMgmt->writeWorker;
|
||||
|
||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
|
||||
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, bmProcessQueue) != 0) {
|
||||
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt};
|
||||
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
|
||||
dError("failed to start bnode write worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -78,4 +80,4 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void bmStopWorker(SBnodeMgmt *pMgmt) { dndCleanupWorker(&pMgmt->writeWorker); }
|
||||
void bmStopWorker(SBnodeMgmt *pMgmt) { tMultiWorkerCleanup(&pMgmt->writeWorker); }
|
||||
|
|
|
@ -74,20 +74,6 @@ typedef int32_t (*CreateNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
|||
typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct {
|
||||
EWorkerType type;
|
||||
const char *name;
|
||||
int32_t minNum;
|
||||
int32_t maxNum;
|
||||
void *queueFp;
|
||||
void *param;
|
||||
STaosQueue *queue;
|
||||
union {
|
||||
SQWorkerPool pool;
|
||||
SWWorkerPool mpool;
|
||||
};
|
||||
} SDnodeWorker;
|
||||
|
||||
typedef struct SMsgHandle {
|
||||
int32_t vgId;
|
||||
NodeMsgFp vgIdMsgFp;
|
||||
|
@ -161,11 +147,6 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
|||
int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||
void dndSendRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
|
||||
|
||||
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
|
||||
int32_t maxNum, void *queueFp);
|
||||
void dndCleanupWorker(SDnodeWorker *pWorker);
|
||||
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg);
|
||||
|
||||
int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg);
|
||||
|
||||
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed);
|
||||
|
|
|
@ -1,96 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http:www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "dndInt.h"
|
||||
|
||||
int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum,
|
||||
int32_t maxNum, void *queueFp) {
|
||||
if (pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pWorker->type = type;
|
||||
pWorker->name = name;
|
||||
pWorker->minNum = minNum;
|
||||
pWorker->maxNum = maxNum;
|
||||
pWorker->queueFp = queueFp;
|
||||
pWorker->param = param;
|
||||
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
SQWorkerPool *pPool = &pWorker->pool;
|
||||
pPool->name = name;
|
||||
pPool->min = minNum;
|
||||
pPool->max = maxNum;
|
||||
if (tQWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pWorker->queue = tQWorkerAllocQueue(pPool, param, (FItem)queueFp);
|
||||
if (pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||
SWWorkerPool *pPool = &pWorker->mpool;
|
||||
pPool->name = name;
|
||||
pPool->max = maxNum;
|
||||
if (tWWorkerInit(pPool) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pWorker->queue = tWWorkerAllocQueue(pPool, param, (FItems)queueFp);
|
||||
if (pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void dndCleanupWorker(SDnodeWorker *pWorker) {
|
||||
if (pWorker->queue == NULL) return;
|
||||
|
||||
while (!taosQueueEmpty(pWorker->queue)) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
if (pWorker->type == DND_WORKER_SINGLE) {
|
||||
tQWorkerCleanup(&pWorker->pool);
|
||||
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||
} else if (pWorker->type == DND_WORKER_MULTI) {
|
||||
tWWorkerCleanup(&pWorker->mpool);
|
||||
tWWorkerFreeQueue(&pWorker->mpool, pWorker->queue);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
|
||||
int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pMsg) {
|
||||
if (pWorker == NULL || pWorker->queue == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteQitem(pWorker->queue, pMsg) != 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -29,10 +29,10 @@ typedef struct SDnodeMgmt {
|
|||
SEpSet mnodeEpSet;
|
||||
SHashObj *dnodeHash;
|
||||
SArray *dnodeEps;
|
||||
TdThread *threadId;
|
||||
TdThread *threadId;
|
||||
SRWLatch latch;
|
||||
SDnodeWorker mgmtWorker;
|
||||
SDnodeWorker statusWorker;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker statusWorker;
|
||||
const char *path;
|
||||
SDnode *pDnode;
|
||||
SMgmtWrapper *pWrapper;
|
||||
|
|
|
@ -52,10 +52,12 @@ static void *dmThreadRoutine(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnode *pDnode = pMgmt->pDnode;
|
||||
static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SDnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
SDnode *pDnode = pMgmt->pDnode;
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
int32_t code = -1;
|
||||
dTrace("msg:%p, will be processed in dnode queue", pMsg);
|
||||
|
||||
switch (pRpc->msgType) {
|
||||
|
@ -98,13 +100,17 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
|
||||
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessQueue) != 0) {
|
||||
SSingleWorkerCfg mgmtCfg = {
|
||||
.minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
|
||||
dError("failed to start dnode mgmt worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndInitWorker(pMgmt, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dmProcessQueue) != 0) {
|
||||
dError("failed to start dnode mgmt worker since %s", terrstr());
|
||||
SSingleWorkerCfg statusCfg = {
|
||||
.minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg) != 0) {
|
||||
dError("failed to start dnode status worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -123,8 +129,8 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void dmStopWorker(SDnodeMgmt *pMgmt) {
|
||||
dndCleanupWorker(&pMgmt->mgmtWorker);
|
||||
dndCleanupWorker(&pMgmt->statusWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->statusWorker);
|
||||
|
||||
if (pMgmt->threadId != NULL) {
|
||||
taosDestoryThread(pMgmt->threadId);
|
||||
|
@ -133,11 +139,11 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
int32_t dmProcessMgmtMsg(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
|
||||
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
|
||||
if (pMsg->rpcMsg.msgType == TDMT_MND_STATUS_RSP) {
|
||||
pWorker = &pMgmt->statusWorker;
|
||||
}
|
||||
|
||||
dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
|
|
@ -28,9 +28,9 @@ typedef struct SMnodeMgmt {
|
|||
SDnode *pDnode;
|
||||
SMgmtWrapper *pWrapper;
|
||||
const char *path;
|
||||
SQWorkerAll readWorker;
|
||||
SQWorkerAll writeWorker;
|
||||
SQWorkerAll syncWorker;
|
||||
SSingleWorker readWorker;
|
||||
SSingleWorker writeWorker;
|
||||
SSingleWorker syncWorker;
|
||||
SReplica replicas[TSDB_MAX_REPLICA];
|
||||
int8_t replica;
|
||||
int8_t selfIndex;
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "mmInt.h"
|
||||
|
||||
static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in mnode queue", pMsg);
|
||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||
int32_t code = -1;
|
||||
|
@ -42,7 +44,7 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) {
|
||||
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SNodeMsg *pMsg) {
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
@ -59,7 +61,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
|
||||
}
|
||||
|
||||
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
|
||||
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
return -1;
|
||||
|
@ -89,19 +91,19 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
}
|
||||
|
||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||
SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||
|
||||
if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) {
|
||||
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
|
||||
dError("failed to start mnode-read worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
|
||||
if (tSingleWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
|
||||
dError("failed to start mnode-write worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) {
|
||||
if (tSingleWorkerInit(&pMgmt->syncWorker, &cfg) != 0) {
|
||||
dError("failed to start mnode sync-worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -110,7 +112,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void mmStopWorker(SMnodeMgmt *pMgmt) {
|
||||
tQWorkerAllCleanup(&pMgmt->readWorker);
|
||||
tQWorkerAllCleanup(&pMgmt->writeWorker);
|
||||
tQWorkerAllCleanup(&pMgmt->syncWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->readWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->writeWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncWorker);
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ typedef struct SQnodeMgmt {
|
|||
SDnode *pDnode;
|
||||
SMgmtWrapper *pWrapper;
|
||||
const char *path;
|
||||
SDnodeWorker queryWorker;
|
||||
SDnodeWorker fetchWorker;
|
||||
SSingleWorker queryWorker;
|
||||
SSingleWorker fetchWorker;
|
||||
} SQnodeMgmt;
|
||||
|
||||
// qmInt.c
|
||||
|
@ -44,6 +44,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
|||
// qmWorker.c
|
||||
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
|
||||
|
||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
|
||||
void qmStopWorker(SQnodeMgmt *pMgmt);
|
||||
|
|
|
@ -23,6 +23,7 @@ static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
|||
msgCb.pWrapper = pMgmt->pWrapper;
|
||||
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
||||
msgCb.qsizeFp = qmGetQueueSize;
|
||||
msgCb.sendReqFp = dndSendReqToDnode;
|
||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||
msgCb.sendRspFp = dndSendRsp;
|
||||
|
|
|
@ -21,7 +21,9 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
|
|||
dndSendRsp(pWrapper, &rsp);
|
||||
}
|
||||
|
||||
static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SQnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
|
||||
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
|
@ -33,7 +35,9 @@ static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SQnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
|
||||
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
|
@ -45,16 +49,16 @@ static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
|
||||
static int32_t qmPutMsgToWorker(SSingleWorker *pWorker, SNodeMsg *pMsg) {
|
||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
|
||||
|
||||
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
|
||||
|
||||
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
|
||||
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
|
||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||
if (pMsg == NULL) {
|
||||
return -1;
|
||||
|
@ -63,7 +67,7 @@ static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
|
|||
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
||||
pMsg->rpcMsg = *pRpc;
|
||||
|
||||
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
|
||||
int32_t code = taosWriteQitem(pWorker->queue, pMsg);
|
||||
if (code != 0) {
|
||||
dTrace("msg:%p, is freed", pMsg);
|
||||
taosFreeQitem(pMsg);
|
||||
|
@ -83,21 +87,48 @@ int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
|
||||
}
|
||||
|
||||
int32_t qmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||
int32_t size = -1;
|
||||
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
size = taosQueueSize(pMgmt->queryWorker.queue);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
size = taosQueueSize(pMgmt->fetchWorker.queue);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||
int32_t maxFetchThreads = 4;
|
||||
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
||||
int32_t maxQueryThreads = minQueryThreads;
|
||||
|
||||
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
|
||||
qmProcessQueryQueue) != 0) {
|
||||
dError("failed to start qnode query worker since %s", terrstr());
|
||||
SSingleWorkerCfg queryCfg = {.minNum = minQueryThreads,
|
||||
.maxNum = maxQueryThreads,
|
||||
.name = "qnode-query",
|
||||
.fp = (FItem)qmProcessQueryQueue,
|
||||
.param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->queryWorker, &queryCfg) != 0) {
|
||||
dError("failed to start qnode-query worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
|
||||
qmProcessFetchQueue) != 0) {
|
||||
dError("failed to start qnode fetch worker since %s", terrstr());
|
||||
SSingleWorkerCfg fetchCfg = {.minNum = minFetchThreads,
|
||||
.maxNum = maxFetchThreads,
|
||||
.name = "qnode-fetch",
|
||||
.fp = (FItem)qmProcessFetchQueue,
|
||||
.param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
|
||||
dError("failed to start qnode-fetch worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -105,6 +136,6 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void qmStopWorker(SQnodeMgmt *pMgmt) {
|
||||
dndCleanupWorker(&pMgmt->queryWorker);
|
||||
dndCleanupWorker(&pMgmt->fetchWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->queryWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->fetchWorker);
|
||||
}
|
||||
|
|
|
@ -30,8 +30,8 @@ typedef struct SSnodeMgmt {
|
|||
const char *path;
|
||||
SRWLatch latch;
|
||||
int8_t uniqueWorkerInUse;
|
||||
SArray *uniqueWorkers; // SArray<SDnodeWorker*>
|
||||
SDnodeWorker sharedWorker;
|
||||
SArray *uniqueWorkers; // SArray<SMultiWorker*>
|
||||
SSingleWorker sharedWorker;
|
||||
} SSnodeMgmt;
|
||||
|
||||
// smInt.c
|
||||
|
|
|
@ -16,7 +16,9 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "smInt.h"
|
||||
|
||||
static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t numOfMsgs) {
|
||||
static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||
SNodeMsg *pMsg = NULL;
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
@ -30,7 +32,9 @@ static void smProcessUniqueQueue(SSnodeMgmt *pMgmt, STaosQall *qall, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SSnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in snode shared queue", pMsg);
|
||||
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
|
||||
|
||||
|
@ -40,20 +44,23 @@ static void smProcessSharedQueue(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
||||
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
|
||||
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(SMultiWorker *));
|
||||
if (pMgmt->uniqueWorkers == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
|
||||
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
|
||||
SMultiWorker *pUniqueWorker = malloc(sizeof(SMultiWorker));
|
||||
if (pUniqueWorker == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (dndInitWorker(pMgmt, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, smProcessSharedQueue) != 0) {
|
||||
dError("failed to start snode unique worker since %s", terrstr());
|
||||
|
||||
SMultiWorkerCfg cfg = {.maxNum = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
|
||||
|
||||
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
|
||||
dError("failed to start snode-unique worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
if (taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker) == NULL) {
|
||||
|
@ -62,9 +69,14 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
}
|
||||
}
|
||||
|
||||
if (dndInitWorker(pMgmt, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
|
||||
SND_SHARED_THREAD_NUM, smProcessSharedQueue)) {
|
||||
dError("failed to start snode shared worker since %s", terrstr());
|
||||
SSingleWorkerCfg cfg = {.minNum = SND_SHARED_THREAD_NUM,
|
||||
.maxNum = SND_SHARED_THREAD_NUM,
|
||||
.name = "snode-shared",
|
||||
.fp = (FItem)smProcessSharedQueue,
|
||||
.param = pMgmt};
|
||||
|
||||
if (tSingleWorkerInit(&pMgmt->sharedWorker, &cfg)) {
|
||||
dError("failed to start snode shared-worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -73,11 +85,11 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
|
|||
|
||||
void smStopWorker(SSnodeMgmt *pMgmt) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
|
||||
SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i);
|
||||
dndCleanupWorker(worker);
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
|
||||
tMultiWorkerCleanup(pWorker);
|
||||
}
|
||||
taosArrayDestroy(pMgmt->uniqueWorkers);
|
||||
dndCleanupWorker(&pMgmt->sharedWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->sharedWorker);
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) {
|
||||
|
@ -93,33 +105,33 @@ static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, 0);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t smProcessUniqueMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
|
||||
SDnodeWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, index);
|
||||
if (pWorker == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t smProcessSharedMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnodeWorker *pWorker = &pMgmt->sharedWorker;
|
||||
SSingleWorker *pWorker = &pMgmt->sharedWorker;
|
||||
|
||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
int32_t smProcessExecMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
|
|
|
@ -36,7 +36,7 @@ typedef struct SVnodesMgmt {
|
|||
const char *path;
|
||||
SDnode *pDnode;
|
||||
SMgmtWrapper *pWrapper;
|
||||
SDnodeWorker mgmtWorker;
|
||||
SSingleWorker mgmtWorker;
|
||||
} SVnodesMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
@ -104,6 +104,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode);
|
|||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype);
|
||||
|
||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg);
|
||||
|
|
|
@ -133,6 +133,7 @@ static void *vmOpenVnodeFunc(void *param) {
|
|||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
msgCb.qsizeFp = vmGetQueueSize;
|
||||
msgCb.sendReqFp = dndSendReqToDnode;
|
||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||
msgCb.sendRspFp = dndSendRsp;
|
||||
|
|
|
@ -87,6 +87,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||
msgCb.qsizeFp = vmGetQueueSize;
|
||||
msgCb.sendReqFp = dndSendReqToDnode;
|
||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||
msgCb.sendRspFp = dndSendRsp;
|
||||
|
|
|
@ -21,7 +21,9 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
|
|||
dndSendRsp(pWrapper, &rsp);
|
||||
}
|
||||
|
||||
static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SVnodesMgmt *pMgmt = pInfo->ahandle;
|
||||
|
||||
int32_t code = -1;
|
||||
tmsg_t msgType = pMsg->rpcMsg.msgType;
|
||||
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
|
||||
|
@ -57,7 +59,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
||||
static void vmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in vnode-query queue", pMsg);
|
||||
int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
|
@ -68,7 +72,9 @@ static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
|
||||
dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg);
|
||||
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
|
||||
if (code != 0) {
|
||||
|
@ -79,7 +85,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
|
||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||
if (pArray == NULL) {
|
||||
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
|
||||
|
@ -126,8 +134,9 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
|
|||
taosArrayDestroy(pArray);
|
||||
}
|
||||
|
||||
static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SNodeMsg *pMsg = NULL;
|
||||
static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SNodeMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
@ -138,8 +147,9 @@ static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SNodeMsg *pMsg = NULL;
|
||||
static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SNodeMsg *pMsg = NULL;
|
||||
|
||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(qall, (void **)&pMsg);
|
||||
|
@ -190,26 +200,18 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
|
||||
}
|
||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE); }
|
||||
|
||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
|
||||
}
|
||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE); }
|
||||
|
||||
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
|
||||
}
|
||||
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE); }
|
||||
|
||||
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
|
||||
}
|
||||
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); }
|
||||
|
||||
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
|
||||
SSingleWorker *pWorker = &pMgmt->mgmtWorker;
|
||||
dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name);
|
||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
|
||||
|
@ -258,6 +260,34 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|||
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
|
||||
}
|
||||
|
||||
int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
|
||||
int32_t size = -1;
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId);
|
||||
if (pVnode != NULL) {
|
||||
switch (qtype) {
|
||||
case QUERY_QUEUE:
|
||||
size = taosQueueSize(pVnode->pQueryQ);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
size = taosQueueSize(pVnode->pFetchQ);
|
||||
break;
|
||||
case WRITE_QUEUE:
|
||||
size = taosQueueSize(pVnode->pWriteQ);
|
||||
break;
|
||||
case SYNC_QUEUE:
|
||||
size = taosQueueSize(pVnode->pSyncQ);
|
||||
break;
|
||||
case APPLY_QUEUE:
|
||||
size = taosQueueSize(pVnode->pApplyQ);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
vmReleaseVnode(pWrapper->pMgmt, pVnode);
|
||||
return size;
|
||||
}
|
||||
|
||||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
|
||||
|
@ -319,7 +349,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
|||
pWPool->max = maxSyncThreads;
|
||||
if (tWWorkerInit(pWPool) != 0) return -1;
|
||||
|
||||
if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) {
|
||||
SSingleWorkerCfg cfg = {
|
||||
.minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
|
||||
if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) {
|
||||
dError("failed to start vnode-mgmt worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
@ -329,7 +361,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
|||
}
|
||||
|
||||
void vmStopWorker(SVnodesMgmt *pMgmt) {
|
||||
dndCleanupWorker(&pMgmt->mgmtWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->mgmtWorker);
|
||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tWWorkerCleanup(&pMgmt->writePool);
|
||||
|
|
|
@ -86,7 +86,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
|
|||
}
|
||||
|
||||
if (fp != NULL) {
|
||||
(*fp)(ahandle, msg);
|
||||
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
|
||||
(*fp)(&info, msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,7 +211,8 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
|
|||
}
|
||||
|
||||
if (fp != NULL) {
|
||||
(*fp)(ahandle, worker->qall, numOfMsgs);
|
||||
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
|
||||
(*fp)(&info, worker->qall, numOfMsgs);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -264,6 +266,8 @@ STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp) {
|
|||
}
|
||||
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
pool->num++;
|
||||
if (pool->num > pool->max) pool->num = pool->max;
|
||||
} else {
|
||||
taosAddIntoQset(worker->qset, queue, ahandle);
|
||||
pool->nextId = (pool->nextId + 1) % pool->max;
|
||||
|
@ -280,7 +284,7 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
|
|||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||
}
|
||||
|
||||
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
|
||||
int32_t tSingleWorkerInit(SSingleWorker *pWorker, const SSingleWorkerCfg *pCfg) {
|
||||
SQWorkerPool *pPool = &pWorker->pool;
|
||||
pPool->name = pCfg->name;
|
||||
pPool->min = pCfg->minNum;
|
||||
|
@ -298,7 +302,7 @@ int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
|
||||
void tSingleWorkerCleanup(SSingleWorker *pWorker) {
|
||||
if (pWorker->queue == NULL) return;
|
||||
|
||||
while (!taosQueueEmpty(pWorker->queue)) {
|
||||
|
@ -309,7 +313,7 @@ void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
|
|||
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
|
||||
}
|
||||
|
||||
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
|
||||
int32_t tMultiWorkerInit(SMultiWorker *pWorker, const SMultiWorkerCfg *pCfg) {
|
||||
SWWorkerPool *pPool = &pWorker->pool;
|
||||
pPool->name = pCfg->name;
|
||||
pPool->max = pCfg->maxNum;
|
||||
|
@ -326,7 +330,7 @@ int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
|
||||
void tMultiWorkerCleanup(SMultiWorker *pWorker) {
|
||||
if (pWorker->queue == NULL) return;
|
||||
|
||||
while (!taosQueueEmpty(pWorker->queue)) {
|
||||
|
|
Loading…
Reference in New Issue