refactor: mnode worker

This commit is contained in:
Shengliang Guan 2022-06-14 11:10:36 +08:00
parent a3f1273640
commit d93f4c4a83
3 changed files with 35 additions and 36 deletions

View File

@ -43,10 +43,6 @@ typedef struct SMnodeMgmt {
int32_t mmReadFile(SMnodeMgmt *pMgmt, SReplica *pReplica, bool *pDeployed);
int32_t mmWriteFile(SMnodeMgmt *pMgmt, const SReplica *pReplica, bool deployed);
// mmInt.c
int32_t mmAcquire(SMnodeMgmt *pMgmt);
void mmRelease(SMnodeMgmt *pMgmt);
// mmHandle.c
SArray *mmGetMsgHandles();
int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);

View File

@ -164,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() {
return mgmtFunc;
}
int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}

View File

@ -16,6 +16,25 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static inline int32_t mmAcquire(SMnodeMgmt *pMgmt) {
int32_t code = 0;
taosThreadRwlockRdlock(&pMgmt->lock);
if (pMgmt->stopped) {
code = -1;
} else {
atomic_add_fetch_32(&pMgmt->refCount, 1);
}
taosThreadRwlockUnlock(&pMgmt->lock);
return code;
}
static inline void mmRelease(SMnodeMgmt *pMgmt) {
taosThreadRwlockRdlock(&pMgmt->lock);
atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosThreadRwlockUnlock(&pMgmt->lock);
}
static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
SRpcMsg rsp = {
.code = code,
@ -107,27 +126,30 @@ inline int32_t mmPutMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
inline int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
dTrace("msg:%p, is created", pMsg);
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
SSingleWorker *pWorker = NULL;
switch (qtype) {
case WRITE_QUEUE:
return mmPutMsgToWorker(pMgmt, &pMgmt->writeWorker, pMsg);
pWorker = &pMgmt->writeWorker;
break;
case QUERY_QUEUE:
return mmPutMsgToWorker(pMgmt, &pMgmt->queryWorker, pMsg);
pWorker = &pMgmt->queryWorker;
break;
case READ_QUEUE:
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
pWorker = &pMgmt->readWorker;
break;
case SYNC_QUEUE:
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
pWorker = &pMgmt->syncWorker;
break;
default:
terrno = TSDB_CODE_INVALID_PARA;
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pMsg->pCont);
return -1;
}
if (pWorker == NULL) return -1;
SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) return -1;
dTrace("msg:%p, is created and will put int %s queue", pMsg, pWorker->name);
return mmPutMsgToWorker(pMgmt, pWorker, pMsg);
}
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {