Merge branch 'hotfix/killquery' of https://github.com/taosdata/TDengine into hotfix/killquery
This commit is contained in:
commit
bc1ba1281c
|
@ -26,13 +26,6 @@
|
||||||
#include "dnodeVRead.h"
|
#include "dnodeVRead.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SRspRet rspRet;
|
|
||||||
void *pCont;
|
|
||||||
int32_t contLen;
|
|
||||||
SRpcMsg rpcMsg;
|
|
||||||
} SReadMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_t thread; // thread
|
pthread_t thread; // thread
|
||||||
int32_t workerId; // worker ID
|
int32_t workerId; // worker ID
|
||||||
|
@ -218,7 +211,7 @@ static void *dnodeProcessReadQueue(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
|
dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
|
||||||
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
|
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
|
||||||
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
||||||
taosFreeQitem(pReadMsg);
|
taosFreeQitem(pReadMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,13 @@ typedef struct {
|
||||||
void *qhandle; //used by query and retrieve msg
|
void *qhandle; //used by query and retrieve msg
|
||||||
} SRspRet;
|
} SRspRet;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SRspRet rspRet;
|
||||||
|
void *pCont;
|
||||||
|
int32_t contLen;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
} SReadMsg;
|
||||||
|
|
||||||
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
int32_t vnodeDrop(int32_t vgId);
|
int32_t vnodeDrop(int32_t vgId);
|
||||||
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||||
|
@ -52,7 +59,7 @@ void* vnodeGetWal(void *pVnode);
|
||||||
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
||||||
void vnodeBuildStatusMsg(void * param);
|
void vnodeBuildStatusMsg(void * param);
|
||||||
|
|
||||||
int32_t vnodeProcessRead(void *pVnode, int msgType, void *pCont, int32_t contLen, SRspRet *ret);
|
int32_t vnodeProcessRead(void *pVnode, SReadMsg *pReadMsg);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,17 +27,18 @@
|
||||||
#include "vnodeLog.h"
|
#include "vnodeLog.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, int32_t contLen, SRspRet *pRet);
|
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
||||||
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet);
|
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
||||||
|
|
||||||
void vnodeInitReadFp(void) {
|
void vnodeInitReadFp(void) {
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
|
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
|
||||||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
|
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
int msgType = pReadMsg->rpcMsg.msgType;
|
||||||
|
|
||||||
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
||||||
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
|
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
|
||||||
|
@ -55,10 +56,14 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen,
|
||||||
return TSDB_CODE_RPC_NOT_READY;
|
return TSDB_CODE_RPC_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
|
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
void * pCont = pReadMsg->pCont;
|
||||||
|
int32_t contLen = pReadMsg->contLen;
|
||||||
|
SRspRet *pRet = &pReadMsg->rspRet;
|
||||||
|
|
||||||
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
|
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pCont;
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
|
@ -91,7 +96,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t contLen, SRspRet *pRet) {
|
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
void * pCont = pReadMsg->pCont;
|
||||||
|
SRspRet *pRet = &pReadMsg->rspRet;
|
||||||
|
|
||||||
SRetrieveTableMsg *pRetrieve = pCont;
|
SRetrieveTableMsg *pRetrieve = pCont;
|
||||||
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
Loading…
Reference in New Issue