[td-225]fix bugs in qmgmt management.
This commit is contained in:
parent
385f45a1eb
commit
44b91cac2a
|
@ -20,7 +20,6 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef void* qinfo_t;
|
typedef void* qinfo_t;
|
||||||
typedef void (*_qinfo_free_fn_t)(void*);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* create the qinfo object according to QueryTableMsg
|
* create the qinfo object according to QueryTableMsg
|
||||||
|
@ -29,13 +28,8 @@ typedef void (*_qinfo_free_fn_t)(void*);
|
||||||
* @param qinfo
|
* @param qinfo
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, _qinfo_free_fn_t fn, qinfo_t* qinfo);
|
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, void* param, qinfo_t* qinfo);
|
||||||
|
|
||||||
/**
|
|
||||||
* Destroy QInfo object
|
|
||||||
* @param qinfo qhandle
|
|
||||||
*/
|
|
||||||
void qDestroyQueryInfo(qinfo_t qinfo);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the main query execution function, including query on both table and multitables,
|
* the main query execution function, including query on both table and multitables,
|
||||||
|
@ -84,8 +78,14 @@ bool qHasMoreResultsToRetrieve(qinfo_t qinfo);
|
||||||
*/
|
*/
|
||||||
int32_t qKillQuery(qinfo_t qinfo);
|
int32_t qKillQuery(qinfo_t qinfo);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destroy query info structure
|
||||||
|
* @param qHandle
|
||||||
|
*/
|
||||||
|
void qDestroyQueryInfo(qinfo_t qHandle);
|
||||||
|
|
||||||
void* qOpenQueryMgmt(int32_t vgId);
|
void* qOpenQueryMgmt(int32_t vgId);
|
||||||
void qSetQueryMgmtClosed(void* pExecutor);
|
void qQueryMgmtNotifyClosed(void* pExecutor);
|
||||||
void qCleanupQueryMgmt(void* pExecutor);
|
void qCleanupQueryMgmt(void* pExecutor);
|
||||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
|
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
|
||||||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
||||||
|
|
|
@ -33,17 +33,20 @@ typedef struct SCacheStatis {
|
||||||
int64_t refreshCount;
|
int64_t refreshCount;
|
||||||
} SCacheStatis;
|
} SCacheStatis;
|
||||||
|
|
||||||
|
struct STrashElem;
|
||||||
|
|
||||||
typedef struct SCacheDataNode {
|
typedef struct SCacheDataNode {
|
||||||
uint64_t addedTime; // the added time when this element is added or updated into cache
|
uint64_t addedTime; // the added time when this element is added or updated into cache
|
||||||
uint64_t lifespan; // expiredTime expiredTime when this element should be remove from cache
|
uint64_t lifespan; // life duration when this element should be remove from cache
|
||||||
uint64_t signature;
|
uint64_t expireTime; // expire time
|
||||||
uint32_t size; // allocated size for current SCacheDataNode
|
uint64_t signature;
|
||||||
|
struct STrashElem *pTNodeHeader; // point to trash node head
|
||||||
|
uint16_t keySize: 15; // max key size: 32kb
|
||||||
|
bool inTrashCan: 1;// denote if it is in trash or not
|
||||||
|
uint32_t size; // allocated size for current SCacheDataNode
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
uint16_t keySize: 15; // max key size: 32kb
|
char *key;
|
||||||
bool inTrashCan: 1;// denote if it is in trash or not
|
char data[];
|
||||||
int32_t extendFactor; // number of life span extend
|
|
||||||
char *key;
|
|
||||||
char data[];
|
|
||||||
} SCacheDataNode;
|
} SCacheDataNode;
|
||||||
|
|
||||||
typedef struct STrashElem {
|
typedef struct STrashElem {
|
||||||
|
|
|
@ -116,11 +116,13 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = pNode->size;
|
|
||||||
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
||||||
|
|
||||||
|
pCacheObj->totalSize -= pNode->size;
|
||||||
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
|
uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes",
|
||||||
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, size);
|
pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize,
|
||||||
|
pNode->size);
|
||||||
|
|
||||||
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
|
if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data);
|
||||||
free(pNode);
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
@ -285,7 +287,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
||||||
|
|
||||||
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
|
uDebug("cache:%s, key:%p, %p added into cache, added:%" PRIu64 ", expire:%" PRIu64 ", totalNum:%d totalSize:%" PRId64
|
||||||
"bytes size:%" PRId64 "bytes",
|
"bytes size:%" PRId64 "bytes",
|
||||||
pCacheObj->name, key, pNode->data, pNode->addedTime, (pNode->lifespan * pNode->extendFactor + pNode->addedTime),
|
pCacheObj->name, key, pNode->data, pNode->addedTime, pNode->expireTime,
|
||||||
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize);
|
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, dataSize);
|
||||||
} else {
|
} else {
|
||||||
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
|
uError("cache:%s, key:%p, failed to added into cache, out of memory", pCacheObj->name, key);
|
||||||
|
@ -312,16 +314,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
||||||
int32_t ref = 0;
|
int32_t ref = 0;
|
||||||
if (ptNode != NULL) {
|
if (ptNode != NULL) {
|
||||||
ref = T_REF_INC(*ptNode);
|
ref = T_REF_INC(*ptNode);
|
||||||
|
|
||||||
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
|
|
||||||
if (pCacheObj->extendLifespan) {
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if ((now - (*ptNode)->addedTime) < (*ptNode)->lifespan * (*ptNode)->extendFactor) {
|
|
||||||
(*ptNode)->extendFactor += 1;
|
|
||||||
uDebug("key:%p extend life time to %"PRId64, key, (*ptNode)->lifespan * (*ptNode)->extendFactor + (*ptNode)->addedTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
__cache_unlock(pCacheObj);
|
__cache_unlock(pCacheObj);
|
||||||
|
|
||||||
|
@ -347,8 +339,7 @@ void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t ke
|
||||||
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
|
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen);
|
||||||
if (ptNode != NULL) {
|
if (ptNode != NULL) {
|
||||||
T_REF_INC(*ptNode);
|
T_REF_INC(*ptNode);
|
||||||
(*ptNode)->extendFactor += 1;
|
(*ptNode)->expireTime = taosGetTimestampMs() + (*ptNode)->lifespan;
|
||||||
// (*ptNode)->lifespan = expireTime;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
__cache_unlock(pCacheObj);
|
||||||
|
@ -380,17 +371,6 @@ void *taosCacheAcquireByData(SCacheObj *pCacheObj, void *data) {
|
||||||
int32_t ref = T_REF_INC(ptNode);
|
int32_t ref = T_REF_INC(ptNode);
|
||||||
uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
|
uDebug("cache:%s, data: %p acquired by data in cache, refcnt:%d", pCacheObj->name, ptNode->data, ref);
|
||||||
|
|
||||||
// if the remained life span is less then the (*ptNode)->lifeSpan, add up one lifespan
|
|
||||||
if (pCacheObj->extendLifespan) {
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
|
|
||||||
if ((now - ptNode->addedTime) < ptNode->lifespan * ptNode->extendFactor) {
|
|
||||||
ptNode->extendFactor += 1;
|
|
||||||
uDebug("cache:%s, %p extend life time to %" PRId64, pCacheObj->name, ptNode->data,
|
|
||||||
ptNode->lifespan * ptNode->extendFactor + ptNode->addedTime);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
|
||||||
assert(ref >= 2);
|
assert(ref >= 2);
|
||||||
return data;
|
return data;
|
||||||
|
@ -431,22 +411,58 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
}
|
}
|
||||||
|
|
||||||
*data = NULL;
|
*data = NULL;
|
||||||
int16_t ref = T_REF_DEC(pNode);
|
|
||||||
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref);
|
|
||||||
|
|
||||||
if (_remove && (!pNode->inTrashCan)) {
|
// note: extend lifespan before dec ref count
|
||||||
__cache_wr_lock(pCacheObj);
|
if (pCacheObj->extendLifespan) {
|
||||||
|
atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
||||||
|
uDebug("cache:%s data:%p extend life time to %"PRId64 " before release", pCacheObj->name, pNode->data, pNode->expireTime);
|
||||||
|
}
|
||||||
|
|
||||||
if (T_REF_VAL_GET(pNode) == 0) {
|
bool inTrashCan = pNode->inTrashCan;
|
||||||
// remove directly, if not referenced by other users
|
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, T_REF_VAL_GET(pNode) - 1);
|
||||||
taosCacheReleaseNode(pCacheObj, pNode);
|
|
||||||
} else {
|
// NOTE: once refcount is decrease, pNode may be free by other thread immediately.
|
||||||
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
|
int32_t ref = T_REF_DEC(pNode);
|
||||||
// So we need to lock it in the first place.
|
|
||||||
taosCacheMoveToTrash(pCacheObj, pNode);
|
if (inTrashCan) {
|
||||||
|
// Remove it if the ref count is 0.
|
||||||
|
// The ref count does not need to load and check again after lock acquired, since ref count can not be increased when
|
||||||
|
// the node is in trashcan.
|
||||||
|
if (ref == 0) {
|
||||||
|
__cache_wr_lock(pCacheObj);
|
||||||
|
assert(pNode->pTNodeHeader->pData == pNode);
|
||||||
|
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
__cache_unlock(pCacheObj);
|
} else {
|
||||||
|
assert(pNode->pTNodeHeader == NULL);
|
||||||
|
|
||||||
|
if (_remove) { // not in trash can, but need to remove it
|
||||||
|
__cache_wr_lock(pCacheObj);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If not referenced by other users. Otherwise move this node to trashcan wait for all users
|
||||||
|
* releasing this resources.
|
||||||
|
*
|
||||||
|
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
|
||||||
|
* that tries to do the same thing.
|
||||||
|
*/
|
||||||
|
if (ref == 0) {
|
||||||
|
if (T_REF_VAL_GET(pNode) == 0) {
|
||||||
|
taosCacheReleaseNode(pCacheObj, pNode);
|
||||||
|
} else {
|
||||||
|
taosCacheMoveToTrash(pCacheObj, pNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
__cache_unlock(pCacheObj);
|
||||||
|
// } else { // extend its life time
|
||||||
|
// if (pCacheObj->extendLifespan) {
|
||||||
|
// atomic_store_64(&pNode->expireTime, pNode->lifespan + taosGetTimestampMs());
|
||||||
|
// uDebug("cache:%s data:%p extend life time to %"PRId64 " after release", pCacheObj->name, pNode->data, pNode->expireTime);
|
||||||
|
// }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,7 +502,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj) {
|
||||||
|
|
||||||
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *pData, size_t size,
|
||||||
uint64_t duration) {
|
uint64_t duration) {
|
||||||
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen + 1;
|
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
|
||||||
|
|
||||||
SCacheDataNode *pNewNode = calloc(1, totalSize);
|
SCacheDataNode *pNewNode = calloc(1, totalSize);
|
||||||
if (pNewNode == NULL) {
|
if (pNewNode == NULL) {
|
||||||
|
@ -503,7 +519,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
|
||||||
|
|
||||||
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
|
||||||
pNewNode->lifespan = duration;
|
pNewNode->lifespan = duration;
|
||||||
pNewNode->extendFactor = 1;
|
pNewNode->expireTime = pNewNode->addedTime + pNewNode->lifespan;
|
||||||
pNewNode->signature = (uint64_t)pNewNode;
|
pNewNode->signature = (uint64_t)pNewNode;
|
||||||
pNewNode->size = (uint32_t)totalSize;
|
pNewNode->size = (uint32_t)totalSize;
|
||||||
|
|
||||||
|
@ -512,6 +528,7 @@ SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const char *
|
||||||
|
|
||||||
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||||
if (pNode->inTrashCan) { /* node is already in trash */
|
if (pNode->inTrashCan) { /* node is already in trash */
|
||||||
|
assert(pNode->pTNodeHeader != NULL && pNode->pTNodeHeader->pData == pNode);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -527,6 +544,7 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) {
|
||||||
pCacheObj->pTrash = pElem;
|
pCacheObj->pTrash = pElem;
|
||||||
|
|
||||||
pNode->inTrashCan = true;
|
pNode->inTrashCan = true;
|
||||||
|
pNode->pTNodeHeader = pElem;
|
||||||
pCacheObj->numOfElemsInTrash++;
|
pCacheObj->numOfElemsInTrash++;
|
||||||
|
|
||||||
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
|
uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash);
|
||||||
|
@ -629,7 +647,7 @@ static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t
|
||||||
__cache_wr_lock(pCacheObj);
|
__cache_wr_lock(pCacheObj);
|
||||||
while (taosHashIterNext(pIter)) {
|
while (taosHashIterNext(pIter)) {
|
||||||
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
|
||||||
if ((pNode->addedTime + pNode->lifespan * pNode->extendFactor) <= time && T_REF_VAL_GET(pNode) <= 0) {
|
if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) {
|
||||||
taosCacheReleaseNode(pCacheObj, pNode);
|
taosCacheReleaseNode(pCacheObj, pNode);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -508,7 +508,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
|
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
|
||||||
|
|
||||||
// release local resources only after cutting off outside connections
|
// release local resources only after cutting off outside connections
|
||||||
qSetQueryMgmtClosed(pVnode->qMgmt);
|
qQueryMgmtNotifyClosed(pVnode->qMgmt);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
||||||
} else {
|
} else {
|
||||||
assert(*qhandle == (void*) killQueryMsg->qhandle);
|
assert(*qhandle == (void*) killQueryMsg->qhandle);
|
||||||
|
qKillQuery(*qhandle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,7 +94,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
if (contLen != 0) {
|
if (contLen != 0) {
|
||||||
qinfo_t pQInfo = NULL;
|
qinfo_t pQInfo = NULL;
|
||||||
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, NULL, &pQInfo);
|
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, pVnode, &pQInfo);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
@ -108,9 +109,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
||||||
if (handle == NULL) { // failed to register qhandle
|
if (handle == NULL) { // failed to register qhandle
|
||||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||||
qKillQuery(pQInfo);
|
|
||||||
qKillQuery(pQInfo);
|
|
||||||
} else {
|
} else {
|
||||||
assert(*handle == pQInfo);
|
assert(*handle == pQInfo);
|
||||||
pRsp->qhandle = htobe64((uint64_t) pQInfo);
|
pRsp->qhandle = htobe64((uint64_t) pQInfo);
|
||||||
|
@ -120,10 +119,6 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
||||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
|
||||||
// NOTE: there two refcount, needs to kill twice
|
|
||||||
// query has not been put into qhandle pool, kill it directly.
|
|
||||||
qKillQuery(*handle);
|
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||||
return pRsp->code;
|
return pRsp->code;
|
||||||
}
|
}
|
||||||
|
@ -134,6 +129,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
dnodePutItemIntoReadQueue(pVnode, *handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
|
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
assert(pCont != NULL);
|
assert(pCont != NULL);
|
||||||
|
@ -183,6 +179,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
if (pRetrieve->free == 1) {
|
if (pRetrieve->free == 1) {
|
||||||
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||||
|
qKillQuery(*handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||||
|
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
|
@ -209,6 +206,9 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
dnodePutItemIntoReadQueue(pVnode, *handle);
|
||||||
pRet->qhandle = *handle;
|
pRet->qhandle = *handle;
|
||||||
freeHandle = false;
|
freeHandle = false;
|
||||||
|
} else {
|
||||||
|
qKillQuery(*handle);
|
||||||
|
freeHandle = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue