[td-225]tracking the total number of qhandle in dnode.
This commit is contained in:
parent
24e24ba163
commit
4fd1d1f17a
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
#include "vnode.h"
|
||||
|
||||
extern int32_t vDebugFlag;
|
||||
extern int32_t vNumOfExistedQHandle; // current initialized and existed query handle in current dnode
|
||||
|
||||
#define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }}
|
||||
#define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }}
|
||||
|
|
|
@ -21,6 +21,8 @@
|
|||
#include "query.h"
|
||||
#include "vnodeStatus.h"
|
||||
|
||||
int32_t vNumOfExistedQHandle; // current initialized and existed query handle in current dnode
|
||||
|
||||
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
|
||||
|
@ -247,7 +249,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
if (handle == NULL) { // failed to register qhandle
|
||||
pRsp->code = terrno;
|
||||
terrno = 0;
|
||||
vError("vgId:%d, QInfo:0x%"PRIx64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo,
|
||||
|
||||
vError("vgId:%d, QInfo:0x%"PRIx64 "-%p register qhandle failed, return to app, code:%s,", pVnode->vgId, qId, (void *)pQInfo,
|
||||
tstrerror(pRsp->code));
|
||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||
return pRsp->code;
|
||||
|
@ -260,10 +263,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:0x%"PRIx64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle,
|
||||
pRead->rpcHandle);
|
||||
|
||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
return pRsp->code;
|
||||
}
|
||||
|
||||
} else {
|
||||
assert(pQInfo == NULL);
|
||||
}
|
||||
|
@ -277,6 +282,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
return pRsp->code;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t remain = atomic_add_fetch_32(&vNumOfExistedQHandle, 1);
|
||||
vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain);
|
||||
} else {
|
||||
assert(pCont != NULL);
|
||||
void **qhandle = (void **)pRead->qhandle;
|
||||
|
@ -318,8 +326,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
||||
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
|
||||
if (freehandle || (!buildRes)) {
|
||||
if (freehandle) {
|
||||
int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1);
|
||||
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain);
|
||||
}
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,7 +371,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
// kill current query and free corresponding resources.
|
||||
if (pRetrieve->free == 1) {
|
||||
vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle);
|
||||
int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1);
|
||||
vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d", pVnode->vgId, pRetrieve->qId,
|
||||
*handle, remain);
|
||||
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
||||
|
@ -368,7 +385,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
|
||||
// register the qhandle to connect to quit query immediate if connection is broken
|
||||
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
|
||||
int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1);
|
||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d", pVnode->vgId, pRetrieve->qhandle,
|
||||
*handle, pRead->rpcHandle, remain);
|
||||
|
||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
@ -390,7 +410,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
if (!tsRetrieveBlockingModel) {
|
||||
if (!buildRes) {
|
||||
assert(pRead->rpcHandle != NULL);
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
|
||||
return TSDB_CODE_QRY_NOT_READY;
|
||||
}
|
||||
|
@ -403,6 +422,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
|||
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
||||
// Here free qhandle immediately
|
||||
if (freeHandle) {
|
||||
int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1);
|
||||
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue