From 16132ff943a83253862f14d6d95de62e90f62ce8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 5 Nov 2020 09:57:25 +0800 Subject: [PATCH] TD-1918 --- src/inc/vnode.h | 2 -- src/mnode/src/mnodeSdb.c | 14 +++++++------- src/vnode/src/vnodeMain.c | 15 --------------- src/vnode/src/vnodeWrite.c | 5 +++-- 4 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/inc/vnode.h b/src/inc/vnode.h index 670f4f599c..8b3b4b6ed0 100644 --- a/src/inc/vnode.h +++ b/src/inc/vnode.h @@ -63,11 +63,9 @@ int32_t vnodeClose(int32_t vgId); void* vnodeAcquire(int32_t vgId); // add refcount void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue -void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue void vnodeRelease(void *pVnode); // dec refCount void* vnodeGetWal(void *pVnode); - int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg); int32_t vnodeProcessWrite(void *param, int32_t qtype, SVWriteMsg *pWrite); int32_t vnodeCheckWrite(void *pVnode); diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 8fb0b33060..fc986521e6 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -98,8 +98,8 @@ static taos_qall tsSdbWriteQall; static taos_queue tsSdbWriteQueue; static SSdbWriteWorkerPool tsSdbPool; -static int sdbWrite(void *param, void *data, int type); -static int sdbWriteToQueue(void *param, void *data, int type); +static int32_t sdbWrite(void *param, void *data, int32_t type, void *pMsg); +static int32_t sdbWriteToQueue(void *param, void *data, int32_t type, void *pMsg); static void * sdbWorkerFp(void *param); static int32_t sdbInitWriteWorker(); static void sdbCleanupWriteWorker(); @@ -575,7 +575,7 @@ static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper) { return TSDB_CODE_SUCCESS; } -static int sdbWrite(void *param, void *data, int type) { +static int sdbWrite(void *param, void *data, int32_t type, void *pMsg) { SSdbOper *pOper = param; SWalHead *pHead = data; int32_t tableId = pHead->msgType / 10; @@ -1040,13 +1040,13 @@ void sdbFreeWritequeue() { tsSdbWriteQueue = NULL; } -int sdbWriteToQueue(void *param, void *data, int type) { +int32_t sdbWriteToQueue(void *param, void *data, int32_t qtype, void *pMsg) { SWalHead *pHead = data; - int size = sizeof(SWalHead) + pHead->len; + int32_t size = sizeof(SWalHead) + pHead->len; SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - taosWriteQitem(tsSdbWriteQueue, type, pWal); + taosWriteQitem(tsSdbWriteQueue, qtype, pWal); return 0; } @@ -1081,7 +1081,7 @@ static void *sdbWorkerFp(void *param) { pOper = NULL; } - int32_t code = sdbWrite(pOper, pHead, type); + int32_t code = sdbWrite(pOper, pHead, type, NULL); if (code > 0) code = 0; if (pOper) { pOper->retCode = code; diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 7cbbf0feb8..49b7e4e712 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -483,21 +483,6 @@ void *vnodeAcquireRqueue(int32_t vgId) { return pVnode->rqueue; } -void *vnodeAcquireWqueue(int32_t vgId) { - SVnodeObj *pVnode = vnodeAcquire(vgId); - if (pVnode == NULL) return NULL; - - int32_t code = vnodeCheckWrite(pVnode); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - vInfo("vgId:%d, can not provide write service, status is %s", vgId, vnodeStatus[pVnode->status]); - vnodeRelease(pVnode); - return NULL; - } - - return pVnode->wqueue; -} - void *vnodeGetWal(void *pVnode) { return ((SVnodeObj *)pVnode)->wal; } diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 4432f98db0..0d521b4d2e 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -208,8 +208,9 @@ int32_t vnodeWriteToQueue(void *vparam, void *wparam, int32_t qtype, void *pMsg) SVnodeObj *pVnode = vparam; SWalHead * pHead = wparam; - if (qtype == TAOS_QTYPE_RPC && vnodeCheckWrite(pVnode) != TSDB_CODE_SUCCESS) { - return TSDB_CODE_VND_INVALID_VGROUP_ID; + if (qtype == TAOS_QTYPE_RPC) { + int32_t code = vnodeCheckWrite(pVnode); + if (code != TSDB_CODE_SUCCESS) return code; } int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;