[td-225] fix invalid read for qhandle mgmt
This commit is contained in:
parent
a26c86e826
commit
6f6af85dd5
|
@ -87,8 +87,8 @@ int32_t qKillQuery(qinfo_t qinfo);
|
|||
void* qOpenQueryMgmt(int32_t vgId);
|
||||
void qSetQueryMgmtClosed(void* pExecutor);
|
||||
void qCleanupQueryMgmt(void* pExecutor);
|
||||
void** qRegisterQInfo(void* pMgmt, void* qInfo);
|
||||
void** qAcquireQInfo(void* pMgmt, void** key);
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo);
|
||||
void** qAcquireQInfo(void* pMgmt, uint64_t key);
|
||||
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool needFree);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -473,7 +473,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
uint64_t qhandle;
|
||||
uint64_t qhandle; // query handle
|
||||
} SQueryTableRsp;
|
||||
|
||||
typedef struct {
|
||||
|
@ -486,7 +486,7 @@ typedef struct SRetrieveTableRsp {
|
|||
int32_t numOfRows;
|
||||
int8_t completed; // all results are returned to client
|
||||
int16_t precision;
|
||||
int64_t offset; // updated offset value for multi-vnode projection query
|
||||
int64_t offset; // updated offset value for multi-vnode projection query
|
||||
int64_t useconds;
|
||||
char data[];
|
||||
} SRetrieveTableRsp;
|
||||
|
|
|
@ -6517,11 +6517,13 @@ void qCleanupQueryMgmt(void* pQMgmt) {
|
|||
qDebug("vgId:%d querymgmt cleanup completed", vgId);
|
||||
}
|
||||
|
||||
void** qRegisterQInfo(void* pMgmt, void* qInfo) {
|
||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||
if (pMgmt == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2;
|
||||
|
||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||
if (pQueryMgmt->qinfoPool == NULL) {
|
||||
return NULL;
|
||||
|
@ -6533,21 +6535,23 @@ void** qRegisterQInfo(void* pMgmt, void* qInfo) {
|
|||
|
||||
return NULL;
|
||||
} else {
|
||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, qInfo, POINTER_BYTES, &qInfo, POINTER_BYTES, tsShellActivityTimer*2);
|
||||
uint64_t handleVal = (uint64_t) qInfo;
|
||||
|
||||
void** handle = taosCachePut(pQueryMgmt->qinfoPool, &handleVal, sizeof(int64_t), &qInfo, POINTER_BYTES, DEFAULT_QHANDLE_LIFE_SPAN);
|
||||
pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||
|
||||
return handle;
|
||||
}
|
||||
}
|
||||
|
||||
void** qAcquireQInfo(void* pMgmt, void** key) {
|
||||
void** qAcquireQInfo(void* pMgmt, uint64_t key) {
|
||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||
|
||||
if (pQueryMgmt->qinfoPool == NULL || pQueryMgmt->closed) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, key, POINTER_BYTES);
|
||||
void** handle = taosCacheAcquireByKey(pQueryMgmt->qinfoPool, &key, sizeof(uint64_t));
|
||||
if (handle == NULL || *handle == NULL) {
|
||||
return NULL;
|
||||
} else {
|
||||
|
|
|
@ -61,7 +61,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||
void * pCont = pReadMsg->pCont;
|
||||
void *pCont = pReadMsg->pCont;
|
||||
int32_t contLen = pReadMsg->contLen;
|
||||
SRspRet *pRet = &pReadMsg->rspRet;
|
||||
|
||||
|
@ -74,19 +74,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
killQueryMsg->free = htons(killQueryMsg->free);
|
||||
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
|
||||
|
||||
void* handle = NULL;
|
||||
if ((void**) killQueryMsg->qhandle != NULL) {
|
||||
handle = *(void**) killQueryMsg->qhandle;
|
||||
}
|
||||
|
||||
vWarn("QInfo:%p connection %p broken, kill query", handle, pReadMsg->rpcMsg.handle);
|
||||
vWarn("QInfo:%p connection %p broken, kill query", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
||||
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
|
||||
|
||||
void** qhandle = qAcquireQInfo(pVnode->qMgmt, (void**) killQueryMsg->qhandle);
|
||||
void** qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) killQueryMsg->qhandle);
|
||||
if (qhandle == NULL || *qhandle == NULL) {
|
||||
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void*) killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
||||
} else {
|
||||
assert(qhandle == (void**) killQueryMsg->qhandle);
|
||||
assert(*qhandle == (void*) killQueryMsg->qhandle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &qhandle, true);
|
||||
}
|
||||
|
||||
|
@ -110,7 +105,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
|
||||
// current connect is broken
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
handle = qRegisterQInfo(pVnode->qMgmt, pQInfo);
|
||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
||||
if (handle == NULL) { // failed to register qhandle
|
||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
|
||||
|
@ -118,11 +113,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
qKillQuery(pQInfo);
|
||||
} else {
|
||||
assert(*handle == pQInfo);
|
||||
pRsp->qhandle = htobe64((uint64_t) (handle));
|
||||
pRsp->qhandle = htobe64((uint64_t) pQInfo);
|
||||
}
|
||||
|
||||
pQInfo = NULL;
|
||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
if (handle != NULL && vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
|
||||
|
@ -136,18 +131,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
assert(pQInfo == NULL);
|
||||
}
|
||||
if (handle != NULL) {
|
||||
dnodePutItemIntoReadQueue(pVnode, handle);
|
||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||
}
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed", vgId, pQInfo);
|
||||
} else {
|
||||
assert(pCont != NULL);
|
||||
handle = qAcquireQInfo(pVnode->qMgmt, (void**) pCont);
|
||||
handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
|
||||
if (handle == NULL) {
|
||||
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", *(void**) pCont, pReadMsg->rpcMsg.handle);
|
||||
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
} else {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, *(void**) pCont);
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, (void*) pCont);
|
||||
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||
qTableQuery(*handle); // do execute query
|
||||
}
|
||||
|
@ -169,10 +164,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
memset(pRet, 0, sizeof(SRspRet));
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void** handle = qAcquireQInfo(pVnode->qMgmt, (void**) pRetrieve->qhandle);
|
||||
if (handle == NULL || handle != (void**) pRetrieve->qhandle) {
|
||||
void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
|
||||
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, *(void**) pRetrieve->qhandle);
|
||||
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
|
||||
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
pRet->len = sizeof(SRetrieveTableRsp);
|
||||
|
@ -180,8 +175,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||
SRetrieveTableRsp* pRsp = pRet->rsp;
|
||||
pRsp->numOfRows = 0;
|
||||
pRsp->completed = true;
|
||||
pRsp->useconds = 0;
|
||||
pRsp->completed = true;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -211,8 +206,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
} else { // if failed to dump result, free qhandle immediately
|
||||
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
|
||||
if (qHasMoreResultsToRetrieve(*handle)) {
|
||||
dnodePutItemIntoReadQueue(pVnode, handle);
|
||||
pRet->qhandle = handle;
|
||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
||||
pRet->qhandle = *handle;
|
||||
freeHandle = false;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue