commit
48614d2cb0
|
@ -22,30 +22,29 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
struct SRpcMsg;
|
typedef struct SRpcMsg SRpcMsg;
|
||||||
struct SEpSet;
|
typedef struct SEpSet SEpSet;
|
||||||
struct SMgmtWrapper;
|
|
||||||
typedef struct SMgmtWrapper SMgmtWrapper;
|
typedef struct SMgmtWrapper SMgmtWrapper;
|
||||||
|
|
||||||
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
|
typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
|
typedef int32_t (*SendReqFp)(SMgmtWrapper* pWrapper, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
|
typedef int32_t (*SendMnodeReqFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq);
|
||||||
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
|
typedef void (*SendRspFp)(SMgmtWrapper* pWrapper, SRpcMsg* pRsp);
|
||||||
|
|
||||||
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType;
|
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
struct SMgmtWrapper* pWrapper;
|
SMgmtWrapper* pWrapper;
|
||||||
PutToQueueFp queueFps[QUEUE_MAX];
|
PutToQueueFp queueFps[QUEUE_MAX];
|
||||||
SendReqFp sendReqFp;
|
SendReqFp sendReqFp;
|
||||||
SendMnodeReqFp sendMnodeReqFp;
|
SendMnodeReqFp sendMnodeReqFp;
|
||||||
SendRspFp sendRspFp;
|
SendRspFp sendRspFp;
|
||||||
} SMsgCb;
|
} SMsgCb;
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq);
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq);
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq);
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq);
|
||||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq);
|
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq);
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp);
|
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,10 +70,9 @@ int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad);
|
||||||
*
|
*
|
||||||
* @param pQnode The qnode object.
|
* @param pQnode The qnode object.
|
||||||
* @param pMsg The request message
|
* @param pMsg The request message
|
||||||
* @param pRsp The response message
|
|
||||||
* @return int32_t 0 for success, -1 for failure
|
|
||||||
*/
|
*/
|
||||||
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg);
|
||||||
|
int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
|
||||||
enum {
|
typedef enum {
|
||||||
JOB_TASK_STATUS_NULL = 0,
|
JOB_TASK_STATUS_NULL = 0,
|
||||||
JOB_TASK_STATUS_NOT_START = 1,
|
JOB_TASK_STATUS_NOT_START = 1,
|
||||||
JOB_TASK_STATUS_EXECUTING,
|
JOB_TASK_STATUS_EXECUTING,
|
||||||
|
@ -35,12 +35,12 @@ enum {
|
||||||
JOB_TASK_STATUS_CANCELLING,
|
JOB_TASK_STATUS_CANCELLING,
|
||||||
JOB_TASK_STATUS_CANCELLED,
|
JOB_TASK_STATUS_CANCELLED,
|
||||||
JOB_TASK_STATUS_DROPPING,
|
JOB_TASK_STATUS_DROPPING,
|
||||||
};
|
} EJobTaskType;
|
||||||
|
|
||||||
enum {
|
typedef enum {
|
||||||
TASK_TYPE_PERSISTENT = 1,
|
TASK_TYPE_PERSISTENT = 1,
|
||||||
TASK_TYPE_TEMP,
|
TASK_TYPE_TEMP,
|
||||||
};
|
} ETaskType;
|
||||||
|
|
||||||
typedef struct STableComInfo {
|
typedef struct STableComInfo {
|
||||||
uint8_t numOfTags; // the number of tags in schema
|
uint8_t numOfTags; // the number of tags in schema
|
||||||
|
|
|
@ -16,16 +16,16 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) {
|
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) {
|
int32_t tmsgSendReq(const SMsgCb* pMsgCb, SEpSet* epSet, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) {
|
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) {
|
||||||
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
|
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
|
void tmsgSendRsp(const SMsgCb* pMsgCb, SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
|
||||||
|
|
|
@ -42,6 +42,52 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
|
||||||
|
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||||
|
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
|
||||||
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
||||||
|
pMsg->rpcMsg = *pRpc;
|
||||||
|
|
||||||
|
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
|
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->writeWorker, pRpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
|
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||||
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
|
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) {
|
||||||
dError("failed to start mnode read worker since %s", terrstr());
|
dError("failed to start mnode read worker since %s", terrstr());
|
||||||
|
@ -66,49 +112,3 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
|
||||||
dndCleanupWorker(&pMgmt->writeWorker);
|
dndCleanupWorker(&pMgmt->writeWorker);
|
||||||
dndCleanupWorker(&pMgmt->syncWorker);
|
dndCleanupWorker(&pMgmt->syncWorker);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) {
|
|
||||||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
|
||||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmProcessSyncMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|
||||||
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
|
|
||||||
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
|
||||||
if (pMsg == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
|
||||||
pMsg->rpcMsg = *pRpc;
|
|
||||||
|
|
||||||
int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg);
|
|
||||||
if (code != 0) {
|
|
||||||
dTrace("msg:%p, is freed", pMsg);
|
|
||||||
taosFreeQitem(pMsg);
|
|
||||||
rpcFreeCont(pRpc->pCont);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|
||||||
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
|
|
||||||
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->writeWorker, pRpc);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
|
||||||
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
|
|
||||||
return mmPutRpcMsgToWorker(pMgmt, &pMgmt->readWorker, pRpc);
|
|
||||||
}
|
|
||||||
|
|
|
@ -42,6 +42,9 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||||
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
||||||
|
|
||||||
// qmWorker.c
|
// qmWorker.c
|
||||||
|
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
|
int32_t qmStartWorker(SQnodeMgmt *pMgmt);
|
||||||
void qmStopWorker(SQnodeMgmt *pMgmt);
|
void qmStopWorker(SQnodeMgmt *pMgmt);
|
||||||
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg);
|
||||||
|
|
|
@ -21,6 +21,8 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndRea
|
||||||
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
|
||||||
SMsgCb msgCb = {0};
|
SMsgCb msgCb = {0};
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
|
msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue;
|
||||||
|
msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -16,50 +16,87 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "qmInt.h"
|
#include "qmInt.h"
|
||||||
|
|
||||||
static void qmProcessQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
|
||||||
dTrace("msg:%p, will be processed in qnode queue", pMsg);
|
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
|
||||||
SRpcMsg *pRsp = NULL;
|
dndSendRsp(pWrapper, &rsp);
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
}
|
||||||
int32_t code = qndProcessMsg(pMgmt->pQnode, pRpc, &pRsp);
|
|
||||||
|
|
||||||
if (pRpc->msgType & 1u) {
|
static void qmProcessQueryQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
if (pRsp != NULL) {
|
dTrace("msg:%p, will be processed in qnode-query queue", pMsg);
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, &pMsg->rpcMsg);
|
||||||
dndSendRsp(pMgmt->pWrapper, pRsp);
|
if (code != 0) {
|
||||||
free(pRsp);
|
qmSendRsp(pMgmt->pWrapper, pMsg, code);
|
||||||
} else {
|
|
||||||
if (code != 0) code = terrno;
|
|
||||||
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
|
|
||||||
dndSendRsp(pMgmt->pWrapper, &rpcRsp);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
rpcFreeCont(pRpc->pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
static void qmProcessFetchQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SDnodeWorker *pWorker = &pMgmt->queryWorker;
|
dTrace("msg:%p, will be processed in qnode-fetch queue", pMsg);
|
||||||
|
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, &pMsg->rpcMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
qmSendRsp(pMgmt->pWrapper, pMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
|
||||||
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t qmPutMsgToWorker(SDnodeWorker *pWorker, SNodeMsg *pMsg) {
|
||||||
|
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->queryWorker, pMsg); }
|
||||||
SDnodeWorker *pWorker = &pMgmt->fetchWorker;
|
|
||||||
|
|
||||||
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
|
int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { return qmPutMsgToWorker(&pMgmt->fetchWorker, pMsg); }
|
||||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
|
||||||
|
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) {
|
||||||
|
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
|
||||||
|
pMsg->rpcMsg = *pRpc;
|
||||||
|
|
||||||
|
int32_t code = dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
dTrace("msg:%p, is freed", pMsg);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
rpcFreeCont(pRpc->pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
|
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
|
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
|
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
|
||||||
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, qmProcessQueue) != 0) {
|
int32_t maxFetchThreads = 4;
|
||||||
|
int32_t minFetchThreads = TMIN(maxFetchThreads, tsNumOfCores);
|
||||||
|
int32_t minQueryThreads = TMAX((int32_t)(tsNumOfCores * tsRatioOfQueryCores), 1);
|
||||||
|
int32_t maxQueryThreads = minQueryThreads;
|
||||||
|
|
||||||
|
if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", minQueryThreads, maxQueryThreads,
|
||||||
|
qmProcessQueryQueue) != 0) {
|
||||||
dError("failed to start qnode query worker since %s", terrstr());
|
dError("failed to start qnode query worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, qmProcessQueue) != 0) {
|
if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", minFetchThreads, maxFetchThreads,
|
||||||
|
qmProcessFetchQueue) != 0) {
|
||||||
dError("failed to start qnode fetch worker since %s", terrstr());
|
dError("failed to start qnode fetch worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,8 +24,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType;
|
|
||||||
|
|
||||||
typedef struct SVnodesMgmt {
|
typedef struct SVnodesMgmt {
|
||||||
SHashObj *hash;
|
SHashObj *hash;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
|
|
|
@ -131,6 +131,8 @@ static void *vmOpenVnodeFunc(void *param) {
|
||||||
SMsgCb msgCb = {0};
|
SMsgCb msgCb = {0};
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||||
|
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||||
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -85,6 +85,8 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
SMsgCb msgCb = {0};
|
SMsgCb msgCb = {0};
|
||||||
msgCb.pWrapper = pMgmt->pWrapper;
|
msgCb.pWrapper = pMgmt->pWrapper;
|
||||||
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
|
||||||
|
msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;
|
||||||
|
msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue;
|
||||||
msgCb.sendReqFp = dndSendReqToDnode;
|
msgCb.sendReqFp = dndSendReqToDnode;
|
||||||
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
msgCb.sendMnodeReqFp = dndSendReqToMnode;
|
||||||
msgCb.sendRspFp = dndSendRsp;
|
msgCb.sendRspFp = dndSendRsp;
|
||||||
|
|
|
@ -153,7 +153,7 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) {
|
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
|
@ -168,20 +168,22 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case VND_QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
|
dTrace("msg:%p, will be written into vnode-query queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case VND_FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
|
dTrace("msg:%p, will be written into vnode-fetch queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case VND_WRITE_QUEUE:
|
case WRITE_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
|
dTrace("msg:%p, will be written into vnode-write queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pWriteQ, pMsg);
|
code = taosWriteQitem(pVnode->pWriteQ, pMsg);
|
||||||
case VND_SYNC_QUEUE:
|
break;
|
||||||
|
case SYNC_QUEUE:
|
||||||
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
|
dTrace("msg:%p, will be written into vnode-sync queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pSyncQ, pMsg);
|
code = taosWriteQitem(pVnode->pSyncQ, pMsg);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
break;
|
break;
|
||||||
|
@ -192,19 +194,19 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueue
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, SYNC_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, WRITE_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, QUERY_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE);
|
return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
|
@ -213,7 +215,7 @@ int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
||||||
return dndWriteMsgToWorker(pWorker, pMsg);
|
return dndWriteMsgToWorker(pWorker, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) {
|
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
|
||||||
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SMsgHead *pHead = pRpc->pCont;
|
SMsgHead *pHead = pRpc->pCont;
|
||||||
|
@ -226,20 +228,18 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue
|
||||||
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
|
||||||
pMsg->rpcMsg = *pRpc;
|
pMsg->rpcMsg = *pRpc;
|
||||||
switch (qtype) {
|
switch (qtype) {
|
||||||
case VND_QUERY_QUEUE:
|
case QUERY_QUEUE:
|
||||||
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
|
dTrace("msg:%p, will be put into vnode-query queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
code = taosWriteQitem(pVnode->pQueryQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case VND_FETCH_QUEUE:
|
case FETCH_QUEUE:
|
||||||
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
|
dTrace("msg:%p, will be put into vnode-fetch queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case VND_APPLY_QUEUE:
|
case APPLY_QUEUE:
|
||||||
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
|
dTrace("msg:%p, will be put into vnode-apply queue", pMsg);
|
||||||
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
code = taosWriteQitem(pVnode->pApplyQ, pMsg);
|
||||||
break;
|
break;
|
||||||
case VND_WRITE_QUEUE:
|
|
||||||
case VND_SYNC_QUEUE:
|
|
||||||
default:
|
default:
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
break;
|
break;
|
||||||
|
@ -250,15 +250,15 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQue
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE);
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, QUERY_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE);
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, FETCH_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE);
|
return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
#include "tmsgcb.h"
|
||||||
|
|
||||||
#include "qnode.h"
|
#include "qnode.h"
|
||||||
|
|
||||||
|
|
|
@ -34,25 +34,20 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
|
||||||
return pQnode;
|
return pQnode;
|
||||||
}
|
}
|
||||||
|
|
||||||
void qndClose(SQnode *pQnode) {
|
void qndClose(SQnode *pQnode) {
|
||||||
qWorkerDestroy((void **)&pQnode->pQuery);
|
qWorkerDestroy((void **)&pQnode->pQuery);
|
||||||
|
|
||||||
free(pQnode);
|
free(pQnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
|
int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad) { return 0; }
|
||||||
|
|
||||||
int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int32_t qndProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
*pRsp = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
|
||||||
qTrace("message in query queue is processing");
|
qTrace("message in query queue is processing");
|
||||||
SReadHandle handle = {0};
|
SReadHandle handle = {0};
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_QUERY:{
|
case TDMT_VND_QUERY: {
|
||||||
return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
|
return qWorkerProcessQueryMsg(&handle, pQnode->pQuery, pMsg);
|
||||||
}
|
}
|
||||||
case TDMT_VND_QUERY_CONTINUE:
|
case TDMT_VND_QUERY_CONTINUE:
|
||||||
|
@ -63,7 +58,7 @@ int qnodeProcessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
qTrace("message in fetch queue is processing");
|
qTrace("message in fetch queue is processing");
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_VND_FETCH:
|
case TDMT_VND_FETCH:
|
||||||
|
@ -81,17 +76,13 @@ int qnodeProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
case TDMT_VND_SHOW_TABLES:
|
case TDMT_VND_SHOW_TABLES:
|
||||||
return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
|
return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
|
||||||
case TDMT_VND_SHOW_TABLES_FETCH:
|
case TDMT_VND_SHOW_TABLES_FETCH:
|
||||||
//return vnodeGetTableList(pQnode, pMsg);
|
// return vnodeGetTableList(pQnode, pMsg);
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
//return vnodeGetTableMeta(pQnode, pMsg);
|
// return vnodeGetTableMeta(pQnode, pMsg);
|
||||||
case TDMT_VND_CONSUME:
|
case TDMT_VND_CONSUME:
|
||||||
//return tqProcessConsumeReq(pQnode->pTq, pMsg);
|
// return tqProcessConsumeReq(pQnode->pTq, pMsg);
|
||||||
default:
|
default:
|
||||||
qError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
qError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
return TSDB_CODE_VND_APP_ERROR;
|
return TSDB_CODE_VND_APP_ERROR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue