Merge pull request #10875 from taosdata/feature/shm

refact dnode queue
This commit is contained in:
Shengliang Guan 2022-03-21 19:37:41 +08:00 committed by GitHub
commit 8738ef6980
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 223 additions and 236 deletions

View File

@ -2330,14 +2330,6 @@ typedef struct {
#pragma pack(pop) #pragma pack(pop)
struct SRpcMsg;
struct SEpSet;
struct SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

54
include/common/tmsgcb.h Normal file
View File

@ -0,0 +1,54 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_MSG_CB_H_
#define _TD_COMMON_MSG_CB_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
struct SRpcMsg;
struct SEpSet;
struct SMgmtWrapper;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType;
typedef struct {
struct SMgmtWrapper* pWrapper;
PutToQueueFp queueFps[QUEUE_MAX];
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_MSG_CB_H_*/

View File

@ -16,24 +16,20 @@
#ifndef _TD_BNODE_H_ #ifndef _TD_BNODE_H_
#define _TD_BNODE_H_ #define _TD_BNODE_H_
#include "tmsgcb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SBnode SBnode;
typedef struct SBnode SBnode;
typedef struct { typedef struct {
} SBnodeLoad; } SBnodeLoad;
typedef struct { typedef struct {
int32_t dnodeId; SMsgCb msgCb;
int64_t clusterId;
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SBnodeOpt; } SBnodeOpt;
/* ------------------------ SBnode ------------------------ */ /* ------------------------ SBnode ------------------------ */

View File

@ -17,27 +17,22 @@
#define _TD_MND_H_ #define _TD_MND_H_
#include "monitor.h" #include "monitor.h"
#include "tmsgcb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SMnode SMnode;
typedef struct SMnode SMnode;
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
SMgmtWrapper *pWrapper; SMsgCb msgCb;
PutToQueueFp putToWriteQFp;
PutToQueueFp putToReadQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SMnodeOpt; } SMnodeOpt;
/* ------------------------ SMnode ------------------------ */ /* ------------------------ SMnode ------------------------ */

View File

@ -16,13 +16,14 @@
#ifndef _TD_QNODE_H_ #ifndef _TD_QNODE_H_
#define _TD_QNODE_H_ #define _TD_QNODE_H_
#include "tmsgcb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SQnode SQnode;
typedef struct SQnode SQnode;
typedef struct { typedef struct {
int64_t numOfStartTask; int64_t numOfStartTask;
@ -36,12 +37,7 @@ typedef struct {
} SQnodeLoad; } SQnodeLoad;
typedef struct { typedef struct {
int32_t dnodeId; SMsgCb msgCb;
int64_t clusterId;
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SQnodeOpt; } SQnodeOpt;
/* ------------------------ SQnode ------------------------ */ /* ------------------------ SQnode ------------------------ */

View File

@ -16,7 +16,7 @@
#ifndef _TD_SNODE_H_ #ifndef _TD_SNODE_H_
#define _TD_SNODE_H_ #define _TD_SNODE_H_
#include "tcommon.h" #include "tmsgcb.h"
#include "tmsg.h" #include "tmsg.h"
#include "trpc.h" #include "trpc.h"
@ -25,20 +25,14 @@ extern "C" {
#endif #endif
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SSnode SSnode;
typedef struct SSnode SSnode;
typedef struct { typedef struct {
int32_t reserved; int32_t reserved;
} SSnodeLoad; } SSnodeLoad;
typedef struct { typedef struct {
int32_t dnodeId; SMsgCb msgCb;
int64_t clusterId;
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SSnodeOpt; } SSnodeOpt;
/* ------------------------ SSnode ------------------------ */ /* ------------------------ SSnode ------------------------ */

View File

@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "tmsgcb.h"
#include "trpc.h" #include "trpc.h"
@ -48,11 +49,7 @@ typedef struct {
uint64_t numOfErrors; uint64_t numOfErrors;
} SQWorkerStat; } SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
typedef int32_t (*sendReqFp)(void *, struct SEpSet *, struct SRpcMsg *);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
putReqToQueryQFp fp1, sendReqFp fp2);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);

View File

@ -0,0 +1,31 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tmsgcb.h"
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
}
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
}
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
}
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }

View File

@ -30,7 +30,7 @@ extern "C" {
#endif #endif
typedef struct SBnode { typedef struct SBnode {
SBnodeOpt opt; SMsgCb msgCb;
} SBnode; } SBnode;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -17,6 +17,7 @@
SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) { SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) {
SBnode *pBnode = calloc(1, sizeof(SBnode)); SBnode *pBnode = calloc(1, sizeof(SBnode));
pBnode->msgCb = pOption->msgCb;
return pBnode; return pBnode;
} }

View File

@ -19,13 +19,12 @@
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SMsgCb msgCb = {0};
pOption->pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->msgCb = msgCb;
pOption->clusterId = pDnode->clusterId;
} }
static int32_t bmOpenImp(SBnodeMgmt *pMgmt) { static int32_t bmOpenImp(SBnodeMgmt *pMgmt) {

View File

@ -39,14 +39,17 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->putToWriteQFp = mmPutMsgToWriteQueue;
pOption->putToReadQFp = mmPutMsgToReadQueue;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId; pOption->clusterId = pDnode->clusterId;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
pOption->msgCb = msgCb;
} }
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {

View File

@ -19,13 +19,12 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SMsgCb msgCb = {0};
pOption->pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->msgCb = msgCb;
pOption->clusterId = pDnode->clusterId;
} }
static int32_t qmOpenImp(SQnodeMgmt *pMgmt) { static int32_t qmOpenImp(SQnodeMgmt *pMgmt) {

View File

@ -19,13 +19,12 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode; SMsgCb msgCb = {0};
pOption->pWrapper = pMgmt->pWrapper; msgCb.pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode; msgCb.sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode; msgCb.sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp; msgCb.sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId; pOption->msgCb = msgCb;
pOption->clusterId = pDnode->clusterId;
} }
static int32_t smOpenImp(SSnodeMgmt *pMgmt) { static int32_t smOpenImp(SSnodeMgmt *pMgmt) {

View File

@ -128,7 +128,13 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes); pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc); dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pWrapper = pMgmt->pWrapper, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
@ -262,7 +268,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt)); SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
int32_t code = -1; int32_t code = -1;
SVnodeOpt vnodeOpt = {0};
dInfo("vnodes-mgmt start to init"); dInfo("vnodes-mgmt start to init");
if (pMgmt == NULL) goto _OVER; if (pMgmt == NULL) goto _OVER;
@ -294,13 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
goto _OVER; goto _OVER;
} }
vnodeOpt.nthreads = tsNumOfCommitThreads; if (vnodeInit() != 0) {
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue;
vnodeOpt.sendReqFp = dndSendReqToDnode;
vnodeOpt.sendMnodeReqFp = dndSendReqToMnode;
vnodeOpt.sendRspFp = dndSendRsp;
if (vnodeInit(&vnodeOpt) != 0) {
dError("failed to init vnode since %s", terrstr()); dError("failed to init vnode since %s", terrstr());
goto _OVER; goto _OVER;
} }

View File

@ -82,7 +82,14 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1; return -1;
} }
vnodeCfg.pWrapper = pMgmt->pWrapper; SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
vnodeCfg.msgCb = msgCb;
vnodeCfg.pTfs = pMgmt->pTfs; vnodeCfg.pTfs = pMgmt->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid; vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);

View File

@ -119,19 +119,12 @@ typedef struct SMnode {
SHashObj *infosMeta; SHashObj *infosMeta;
SGrantInfo grant; SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX]; MndMsgFp msgFp[TDMT_MAX];
SendReqFp sendReqFp; SMsgCb msgCb;
SendMnodeReqFp sendMnodeReqFp;
PutToQueueFp putToWriteQFp;
PutToQueueFp putToReadQFp;
} SMnode; } SMnode;
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len); uint64_t mndGenerateUid(char *name, int32_t len);
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -623,7 +623,7 @@ static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) {
.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .ahandle = pReq->rpcMsg.ahandle}; .msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .ahandle = pReq->rpcMsg.ahandle};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config); mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config);
mndSendReqToDnode(pMnode, &epSet, &rpcMsg); tmsgSendReq(&pMnode->msgCb, &epSet, &rpcMsg);
return 0; return 0;
} }

View File

@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
.pCont = pRebMsg, .pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg), .contLen = sizeof(SMqDoRebalanceMsg),
}; };
(*pMnode->putToWriteQFp)(pMnode->pWrapper, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);

View File

@ -888,7 +888,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
} }
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) { if (tmsgSendReq(&pMnode->msgCb, &pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent", pTrans->id, action); mDebug("trans:%d, action:%d is sent", pTrans->id, action);
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;

View File

@ -43,24 +43,6 @@
#define TRNAS_TIMER_MS 6000 #define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000 #define TELEM_TIMER_MS 86400000
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode == NULL || pMnode->sendReqFp == NULL) {
terrno = TSDB_CODE_MND_NOT_READY;
return -1;
}
return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg);
}
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode == NULL || pMnode->sendReqFp == NULL) {
terrno = TSDB_CODE_MND_NOT_READY;
return -1;
}
return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg);
}
static void *mndBuildTimerMsg(int32_t *pContLen) { static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0}; SMTimerReq timerReq = {0};
@ -80,7 +62,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer); taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
@ -96,7 +78,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
}; };
pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
} }
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer); taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
@ -108,7 +90,7 @@ static void mndPullupTelem(void *param, void *tmrId) {
int32_t contLen = 0; int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
} }
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer); taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
@ -286,14 +268,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->replica = pOption->replica; pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pWrapper = pOption->pWrapper; pMnode->msgCb = pOption->msgCb;
pMnode->putToWriteQFp = pOption->putToWriteQFp;
pMnode->putToReadQFp = pOption->putToReadQFp;
pMnode->sendReqFp = pOption->sendReqFp;
pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp;
if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL || if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS; terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1; return -1;
} }

View File

@ -32,8 +32,8 @@ typedef struct SQWorkerMgmt SQHandle;
typedef struct SQnode { typedef struct SQnode {
int32_t qndId; int32_t qndId;
SQnodeOpt opt; SMsgCb msgCb;
SQHandle* pQuery; SQHandle* pQuery;
} SQnode; } SQnode;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -13,14 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executor.h"
#include "qndInt.h" #include "qndInt.h"
#include "query.h" #include "query.h"
#include "qworker.h" #include "qworker.h"
#include "executor.h"
int32_t qnodePutReqToVQueryQ(SQnode* pQnode, struct SRpcMsg* pReq) {}
void qnodeSendReqToDnode(SQnode* pQnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {}
SQnode *qndOpen(const SQnodeOpt *pOption) { SQnode *qndOpen(const SQnodeOpt *pOption) {
SQnode *pQnode = calloc(1, sizeof(SQnode)); SQnode *pQnode = calloc(1, sizeof(SQnode));
@ -29,12 +25,12 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL; return NULL;
} }
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode, if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
(putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqFp)qnodeSendReqToDnode)) {
tfree(pQnode); tfree(pQnode);
return NULL; return NULL;
} }
pQnode->msgCb = pOption->msgCb;
return pQnode; return pQnode;
} }

View File

@ -44,7 +44,7 @@ typedef struct {
typedef struct SSnode { typedef struct SSnode {
SStreamMeta* pMeta; SStreamMeta* pMeta;
SSnodeOpt cfg; SMsgCb msgCb;
} SSnode; } SSnode;
SStreamMeta* sndMetaNew(); SStreamMeta* sndMetaNew();

View File

@ -22,7 +22,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
if (pSnode == NULL) { if (pSnode == NULL) {
return NULL; return NULL;
} }
memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt)); pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = sndMetaNew(); pSnode->pMeta = sndMetaNew();
if (pSnode->pMeta == NULL) { if (pSnode->pMeta == NULL) {
free(pSnode); free(pSnode);

View File

@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "trpc.h" #include "trpc.h"
#include "tmsgcb.h"
#include "meta.h" #include "meta.h"
#include "tarray.h" #include "tarray.h"
@ -40,7 +41,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
uint64_t dbId; uint64_t dbId;
void *pWrapper;
STfs *pTfs; STfs *pTfs;
uint64_t wsize; uint64_t wsize;
uint64_t ssize; uint64_t ssize;
@ -54,20 +54,12 @@ typedef struct {
SMetaCfg metaCfg; SMetaCfg metaCfg;
STqCfg tqCfg; STqCfg tqCfg;
SWalCfg walCfg; SWalCfg walCfg;
SMsgCb msgCb;
uint32_t hashBegin; uint32_t hashBegin;
uint32_t hashEnd; uint32_t hashEnd;
int8_t hashMethod; int8_t hashMethod;
} SVnodeCfg; } SVnodeCfg;
typedef struct {
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeOpt;
typedef struct { typedef struct {
int64_t ver; int64_t ver;
int64_t tbUid; int64_t tbUid;
@ -87,10 +79,9 @@ typedef struct {
/** /**
* @brief Initialize the vnode module * @brief Initialize the vnode module
* *
* @param pOption Option of the vnode mnodule
* @return int 0 for success and -1 for failure * @return int 0 for success and -1 for failure
*/ */
int vnodeInit(const SVnodeOpt *pOption); int vnodeInit();
/** /**
* @brief Cleanup the vnode module * @brief Cleanup the vnode module

View File

@ -52,12 +52,6 @@ typedef struct SVnodeMgr {
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadCond hasTask; TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue; TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
@ -81,7 +75,7 @@ struct SVnode {
SWal* pWal; SWal* pWal;
tsem_t canCommit; tsem_t canCommit;
SQHandle* pQuery; SQHandle* pQuery;
void* pWrapper; SMsgCb msgCb;
STfs* pTfs; STfs* pTfs;
}; };

View File

@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnodeCfg cfg = defaultVnodeOptions; SVnodeCfg cfg = defaultVnodeOptions;
if (pVnodeCfg != NULL) { if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId; cfg.vgId = pVnodeCfg->vgId;
cfg.pWrapper = pVnodeCfg->pWrapper; cfg.msgCb = pVnodeCfg->msgCb;
cfg.pTfs = pVnodeCfg->pTfs; cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId; cfg.dbId = pVnodeCfg->dbId;
cfg.hashBegin = pVnodeCfg->hashBegin; cfg.hashBegin = pVnodeCfg->hashBegin;
@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
} }
pVnode->vgId = pVnodeCfg->vgId; pVnode->vgId = pVnodeCfg->vgId;
pVnode->pWrapper = pVnodeCfg->pWrapper; pVnode->msgCb = pVnodeCfg->msgCb;
pVnode->pTfs = pVnodeCfg->pTfs; pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path); pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);

View File

@ -14,43 +14,33 @@
*/ */
#include "vnd.h" #include "vnd.h"
#include "tglobal.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg); static void* loop(void* arg);
int vnodeInit(const SVnodeOpt *pOption) { int vnodeInit() {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
return 0; return 0;
} }
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
vnodeMgr.putToFetchQFp = pOption->putToFetchQFp;
vnodeMgr.sendReqFp = pOption->sendReqFp;
vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp;
vnodeMgr.sendRspFp = pOption->sendRspFp;
// Start commit handers // Start commit handers
if (pOption->nthreads > 0) { vnodeMgr.nthreads = tsNumOfCommitThreads;
vnodeMgr.nthreads = pOption->nthreads; vnodeMgr.threads = calloc(vnodeMgr.nthreads, sizeof(TdThread));
vnodeMgr.threads = (TdThread*)calloc(pOption->nthreads, sizeof(TdThread)); if (vnodeMgr.threads == NULL) {
if (vnodeMgr.threads == NULL) { return -1;
return -1; }
}
taosThreadMutexInit(&(vnodeMgr.mutex), NULL); taosThreadMutexInit(&(vnodeMgr.mutex), NULL);
taosThreadCondInit(&(vnodeMgr.hasTask), NULL); taosThreadCondInit(&(vnodeMgr.hasTask), NULL);
TD_DLIST_INIT(&(vnodeMgr.queue)); TD_DLIST_INIT(&(vnodeMgr.queue));
for (uint16_t i = 0; i < pOption->nthreads; i++) { for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) {
taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL); taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); // pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
} else {
// TODO: if no commit thread is set, then another mechanism should be
// given. Otherwise, it is a false.
ASSERT(0);
} }
if (walInit() < 0) { if (walInit() < 0) {
@ -92,26 +82,6 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0; return 0;
} }
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
}
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq);
}
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) {
(*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
setThreadName("vnode-commit"); setThreadName("vnode-commit");

View File

@ -20,8 +20,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
(putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq);
} }
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }

View File

@ -20,6 +20,7 @@
extern "C" { extern "C" {
#endif #endif
#include "qworker.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "ttimer.h" #include "ttimer.h"
@ -139,18 +140,16 @@ typedef struct SQWSchStatus {
// Qnode/Vnode level task management // Qnode/Vnode level task management
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
SQWorkerCfg cfg; SQWorkerCfg cfg;
int8_t nodeType; int8_t nodeType;
int32_t nodeId; int32_t nodeId;
void *timer; void *timer;
tmr_h hbTimer; tmr_h hbTimer;
SRWLatch schLock; SRWLatch schLock;
//SRWLatch ctxLock; // SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
void *nodeObj; SMsgCb msgCb;
putReqToQueryQFp putToQueueFp;
sendReqFp sendReqFp;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId

View File

@ -1442,9 +1442,8 @@ _return:
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer); taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
putReqToQueryQFp fp1, sendReqFp fp2) { if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
@ -1500,9 +1499,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType; mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId; mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj; mgmt->msgCb = *pMsgCb;
mgmt->putToQueueFp = fp1;
mgmt->sendReqFp = fp2;
*qWorkerMgmt = mgmt; *qWorkerMgmt = mgmt;

View File

@ -245,15 +245,15 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
req->taskId = tId; req->taskId = tId;
SRpcMsg pNewMsg = { SRpcMsg pNewMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
.ahandle = pMsg->ahandle, .ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE, .msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req, .pCont = req,
.contLen = sizeof(SQueryContinueReq), .contLen = sizeof(SQueryContinueReq),
.code = 0, .code = 0,
}; };
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
rpcFreeCont(req); rpcFreeCont(req);

View File

@ -1080,7 +1080,10 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
@ -1161,7 +1164,10 @@ TEST(rcTest, longExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000; qwtTestMaxExecTaskUsec = 1000000;
@ -1244,7 +1250,10 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL); SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;