From cc2dfbf5ef8bb7596f0a53ba067dc0a4ac37fac9 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sun, 5 Apr 2020 09:08:32 +0800 Subject: [PATCH] add code for write response --- src/dnode/inc/vnode.h | 5 +++++ src/dnode/src/dnodeRead.c | 2 +- src/dnode/src/dnodeWrite.c | 5 +++-- src/dnode/src/vnodeWrite.c | 34 +++++++++++++++++++++++----------- 4 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/dnode/inc/vnode.h b/src/dnode/inc/vnode.h index 1cfecca456..d5edea6564 100644 --- a/src/dnode/inc/vnode.h +++ b/src/dnode/inc/vnode.h @@ -20,6 +20,11 @@ extern "C" { #endif +typedef struct { + int len; + void *rsp; +} SRspRet; + int32_t vnodeInitWrite(); int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeDrop(int32_t vgId); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 530fc1a776..4e37b5733e 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -289,7 +289,7 @@ static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); } else { // no further execution invoked, release the ref to vnode dnodeProcessReadResult(pVnode, pMsg); - dnodeReleaseVnode(pVnode); + vnodeRelease(pVnode); } } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 66651fdf5f..6f46587f2c 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -34,6 +34,7 @@ typedef struct { } SWriteWorker; typedef struct { + SRspRet rspRet; void *pCont; int32_t contLen; SRpcMsg rpcMsg; @@ -148,8 +149,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code) { SRpcMsg rpcRsp = { .handle = pWrite->rpcMsg.handle, - .pCont = NULL, - .contLen = 0, + .pCont = pWrite->rspRet.rsp, + .contLen = pWrite->rspRet.len, .code = code, }; diff --git a/src/dnode/src/vnodeWrite.c b/src/dnode/src/vnodeWrite.c index 4133b666d6..2b05aab315 100644 --- a/src/dnode/src/vnodeWrite.c +++ b/src/dnode/src/vnodeWrite.c @@ -29,11 +29,11 @@ #include "vnodeInt.h" static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *, void*); -static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, void *); -static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, void *); -static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, void *); -static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, void *); -static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, void *); +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pMsg, SRspRet *); int32_t vnodeInitWrite() { vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; @@ -49,6 +49,9 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { int32_t code = 0; SVnodeObj *pVnode = (SVnodeObj *)param; + if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) + return TSDB_CODE_MSG_NOT_PROCESSED; + if (pVnode->status == VN_STATUS_DELETING || pVnode->status == VN_STATUS_CLOSING) return TSDB_CODE_NOT_ACTIVE_VNODE; @@ -73,7 +76,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { // write into WAL code = walWrite(pVnode->wal, pHead); if ( code < 0) return code; - + code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item); if (code < 0) return code; @@ -86,7 +89,7 @@ int32_t vnodeProcessWrite(void *param, int qtype, SWalHead *pHead, void *item) { return code; } -static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) { +static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { int32_t code = 0; // save insert result into item @@ -94,10 +97,19 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, void *item) dTrace("pVnode:%p vgId:%d, submit msg is processed", pVnode, pVnode->vgId); code = tsdbInsertData(pVnode->tsdb, pCont); + pRet->len = sizeof(SShellSubmitRspMsg); + pRet->rsp = rpcMallocCont(pRet->len); + SShellSubmitRspMsg *pRsp = pRet->rsp; + + pRsp->code = 0; + pRsp->numOfRows = htonl(1); + pRsp->affectedRows = htonl(1); + pRsp->numOfFailedBlocks = 0; + return code; } -static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { +static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDCreateTableMsg *pTable = pCont; int32_t code = 0; @@ -153,7 +165,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, void * return code; } -static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { +static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDDropTableMsg *pTable = pCont; int32_t code = 0; @@ -168,7 +180,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, void *it return code; } -static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *item) { +static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDCreateTableMsg *pTable = pCont; int32_t code = 0; @@ -223,7 +235,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, void *i return code; } -static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, void *item) { +static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { SMDDropSTableMsg *pTable = pCont; int32_t code = 0;