refactor: adjust vnode queue

This commit is contained in:
Shengliang Guan 2022-06-02 18:26:29 +08:00
parent a682999c1d
commit a9b32dc327
12 changed files with 92 additions and 158 deletions

View File

@ -38,7 +38,7 @@ typedef enum {
QUEUE_MAX, QUEUE_MAX,
} EQueueType; } EQueueType;
typedef int32_t (*PutToQueueFp)(void* pMgmt, SRpcMsg* pMsg); typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
typedef void (*SendRspFp)(SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg);
@ -50,7 +50,7 @@ typedef void (*ReportStartup)(const char* name, const char* desc);
typedef struct { typedef struct {
void* mgmt; void* mgmt;
void* clientRpc; void* clientRpc;
PutToQueueFp queueFps[QUEUE_MAX]; PutToQueueFp putToQueueFp;
GetQueueSizeFp qsizeFp; GetQueueSizeFp qsizeFp;
SendReqFp sendReqFp; SendReqFp sendReqFp;
SendRspFp sendRspFp; SendRspFp sendRspFp;

View File

@ -22,41 +22,21 @@ static SMsgCb defaultMsgCb;
void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; } void tmsgSetDefault(const SMsgCb* msgcb) { defaultMsgCb = *msgcb; }
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
PutToQueueFp fp = msgcb->queueFps[qtype]; return (*msgcb->putToQueueFp)(msgcb->mgmt, qtype, pMsg);
return (*fp)(msgcb->mgmt, pMsg);
} }
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) { int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
GetQueueSizeFp fp = msgcb->qsizeFp; return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype);
return (*fp)(msgcb->mgmt, vgId, qtype);
} }
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) { return (*defaultMsgCb.sendReqFp)(epSet, pMsg); }
SendReqFp fp = defaultMsgCb.sendReqFp;
return (*fp)(epSet, pMsg);
}
void tmsgSendRsp(SRpcMsg* pMsg) { void tmsgSendRsp(SRpcMsg* pMsg) { return (*defaultMsgCb.sendRspFp)(pMsg); }
SendRspFp fp = defaultMsgCb.sendRspFp;
return (*fp)(pMsg);
}
void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { void tmsgSendRedirectRsp(SRpcMsg* pMsg, const SEpSet* pNewEpSet) { (*defaultMsgCb.sendRedirectRspFp)(pMsg, pNewEpSet); }
SendRedirectRspFp fp = defaultMsgCb.sendRedirectRspFp;
(*fp)(pMsg, pNewEpSet);
}
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg) { (*defaultMsgCb.registerBrokenLinkArgFp)(pMsg); }
RegisterBrokenLinkArgFp fp = defaultMsgCb.registerBrokenLinkArgFp;
(*fp)(pMsg);
}
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.releaseHandleFp)(pHandle, type); }
ReleaseHandleFp fp = defaultMsgCb.releaseHandleFp;
(*fp)(pHandle, type);
}
void tmsgReportStartup(const char* name, const char* desc) { void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }
ReportStartup fp = defaultMsgCb.reportStartupFp;
(*fp)(name, desc);
}

View File

@ -65,10 +65,7 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -105,10 +105,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.putToQueueFp = (PutToQueueFp)mmPutRpcMsgToQueue;
pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL); taosThreadRwlockInit(&pMgmt->lock, NULL);

View File

@ -96,40 +96,38 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); return mmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
} }
static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { int32_t mmPutRpcMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
switch (qtype) {
case WRITE_QUEUE:
dTrace("msg:%p, is created and will put into vnode-write queue", pMsg);
taosWriteQitem(pMgmt->writeWorker.queue, pMsg);
return 0;
case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into vnode-query queue", pMsg);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0; return 0;
}
int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { case READ_QUEUE:
return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); dTrace("msg:%p, is created and will put into vnode-read queue", pMsg);
} taosWriteQitem(pMgmt->readWorker.queue, pMsg);
return 0;
int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { case SYNC_QUEUE:
return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg);
}
int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg);
}
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = -1;
if (mmAcquire(pMgmt) == 0) { if (mmAcquire(pMgmt) == 0) {
code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); dTrace("msg:%p, is created and will put into vnode-sync queue", pMsg);
taosWriteQitem(pMgmt->syncWorker.queue, pMsg);
mmRelease(pMgmt); mmRelease(pMgmt);
return 0;
} else {
return -1;
} }
default:
if (code != 0) { terrno = TSDB_CODE_INVALID_PARA;
rpcFreeCont(pMsg->pCont); return -1;
pMsg->pCont = NULL;
} }
return code;
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {

View File

@ -42,8 +42,7 @@ int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
// qmWorker.c // qmWorker.c
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg);
int32_t qmPutRpcMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t qmStartWorker(SQnodeMgmt *pMgmt); int32_t qmStartWorker(SQnodeMgmt *pMgmt);

View File

@ -43,8 +43,7 @@ static int32_t qmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qmPutRpcMsgToQueryQueue; pMgmt->msgCb.putToQueueFp = (PutToQueueFp)qmPutRpcMsgToQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)qmPutRpcMsgToFetchQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)qmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;

View File

@ -68,22 +68,24 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg); return qmPutNodeMsgToWorker(&pMgmt->monitorWorker, pMsg);
} }
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
dTrace("msg:%p, create and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
memcpy(pMsg, pRpc, sizeof(SRpcMsg)); memcpy(pMsg, pRpc, sizeof(SRpcMsg));
taosWriteQitem(pWorker->queue, pMsg);
switch (qtype) {
case QUERY_QUEUE:
dTrace("msg:%p, is created and will put into qnode-query queue", pMsg);
taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return 0; return 0;
case READ_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg);
taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);
return 0;
default:
terrno = TSDB_CODE_INVALID_PARA;
return -1;
} }
int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->queryWorker, pRpc);
}
int32_t qmPutRpcMsgToFetchQueue(SQnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return qmPutRpcMsgToWorker(pMgmt, &pMgmt->fetchWorker, pRpc);
} }
int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {

View File

@ -100,13 +100,8 @@ void vmStopWorker(SVnodeMgmt *pMgmt);
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode);
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype); int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype);
int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);
int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);

View File

@ -156,7 +156,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt->state.totalVnodes = numOfVnodes; pMgmt->state.totalVnodes = numOfVnodes;
int32_t threadNum = tsNumOfCores; int32_t threadNum = 1;
int32_t vnodesPerThread = numOfVnodes / threadNum + 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1;
SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread));
@ -249,12 +249,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->path = pInput->path; pMgmt->path = pInput->path;
pMgmt->name = pInput->name; pMgmt->name = pInput->name;
pMgmt->msgCb = pInput->msgCb; pMgmt->msgCb = pInput->msgCb;
pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToWriteQueue; pMgmt->msgCb.putToQueueFp = (PutToQueueFp)vmPutRpcMsgToQueue;
pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)vmPutRpcMsgToSyncQueue;
pMgmt->msgCb.queueFps[APPLY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToApplyQueue;
pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)vmPutRpcMsgToQueryQueue;
pMgmt->msgCb.queueFps[FETCH_QUEUE] = (PutToQueueFp)vmPutRpcMsgToFetchQueue;
pMgmt->msgCb.queueFps[MERGE_QUEUE] = (PutToQueueFp)vmPutRpcMsgToMergeQueue;
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt; pMgmt->msgCb.mgmt = pMgmt;
taosThreadRwlockInit(&pMgmt->lock, NULL); taosThreadRwlockInit(&pMgmt->lock, NULL);

View File

@ -304,14 +304,9 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return 0; return 0;
} }
static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType qtype) { int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) { if (pMsg == NULL) return -1;
rpcFreeCont(pRpc->pCont);
pRpc->pCont = NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
@ -322,28 +317,6 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
return vmPutMsgToQueue(pMgmt, pMsg, qtype); return vmPutMsgToQueue(pMgmt, pMsg, qtype);
} }
int32_t vmPutRpcMsgToWriteQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, WRITE_QUEUE);
}
int32_t vmPutRpcMsgToSyncQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pMgmt, pRpc, SYNC_QUEUE); }
int32_t vmPutRpcMsgToApplyQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, APPLY_QUEUE);
}
int32_t vmPutRpcMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, QUERY_QUEUE);
}
int32_t vmPutRpcMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, FETCH_QUEUE);
}
int32_t vmPutRpcMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc) {
return vmPutRpcMsgToQueue(pMgmt, pRpc, MERGE_QUEUE);
}
int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
int32_t size = -1; int32_t size = -1;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId);

View File

@ -172,8 +172,7 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
return 0; return 0;
} }
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestQueryQueueLock); taosWLockLatch(&qwtTestQueryQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
@ -704,7 +703,7 @@ void *qwtclientThread(void *param) {
qwtTestCaseFinished = false; qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc); qwtBuildQueryReqMsg(&queryRpc);
qwtPutReqToQueue((void *)0x1, &queryRpc); qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
while (!qwtTestCaseFinished) { while (!qwtTestCaseFinished) {
taosUsleep(1); taosUsleep(1);
@ -874,7 +873,7 @@ TEST(seqTest, normalCase) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -910,7 +909,7 @@ TEST(seqTest, cancelFirst) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -947,7 +946,7 @@ TEST(seqTest, randCase) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -1018,7 +1017,7 @@ TEST(seqTest, multithreadRand) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -1081,7 +1080,7 @@ TEST(rcTest, shortExecshortDelay) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -1165,7 +1164,7 @@ TEST(rcTest, longExecshortDelay) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -1251,7 +1250,7 @@ TEST(rcTest, shortExeclongDelay) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
@ -1335,7 +1334,7 @@ TEST(rcTest, dropTest) {
SMsgCb msgCb = {0}; SMsgCb msgCb = {0};
msgCb.mgmt = (void *)mockPointer; msgCb.mgmt = (void *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue; msgCb.putToQueueFp = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);