[td-225] fix bug in hash and queryhandle management.
This commit is contained in:
parent
4222cfcda6
commit
0ce566550d
|
@ -76,6 +76,9 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
|
|||
*/
|
||||
int32_t qKillQuery(qinfo_t qinfo);
|
||||
|
||||
int32_t qQueryCompleted(qinfo_t qinfo);
|
||||
|
||||
|
||||
/**
|
||||
* destroy query info structure
|
||||
* @param qHandle
|
||||
|
|
|
@ -6432,34 +6432,6 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
|
|||
return code;
|
||||
}
|
||||
|
||||
bool qHasMoreResultsToRetrieve(qinfo_t qinfo) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
if (!isValidQInfo(pQInfo) || pQInfo->code != TSDB_CODE_SUCCESS) {
|
||||
qDebug("QInfo:%p invalid qhandle or error occurs, abort query, code:%x", pQInfo, pQInfo->code);
|
||||
return false;
|
||||
}
|
||||
|
||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
|
||||
bool ret = false;
|
||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||
ret = false;
|
||||
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) {
|
||||
ret = true;
|
||||
} else if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||
ret = true;
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
if (ret) {
|
||||
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *contLen, bool* continueExec) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
|
@ -6487,11 +6459,11 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
|
|||
|
||||
int32_t code = pQInfo->code;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
(*pRsp)->offset = htobe64(pQuery->limit.offset);
|
||||
(*pRsp)->offset = htobe64(pQuery->limit.offset);
|
||||
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
||||
} else {
|
||||
(*pRsp)->useconds = 0;
|
||||
(*pRsp)->offset = 0;
|
||||
(*pRsp)->offset = 0;
|
||||
(*pRsp)->useconds = htobe64(pRuntimeEnv->summary.elapsedTime);
|
||||
}
|
||||
|
||||
(*pRsp)->precision = htons(pQuery->precision);
|
||||
|
@ -6503,22 +6475,30 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
|
|||
}
|
||||
|
||||
pQInfo->rspContext = NULL;
|
||||
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
||||
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
||||
|
||||
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
|
||||
(*pRsp)->completed = 1; // notify no more result to client
|
||||
}
|
||||
|
||||
if (qHasMoreResultsToRetrieve(pQInfo)) {
|
||||
*continueExec = true;
|
||||
} else { // failed to dump result, free qhandle immediately
|
||||
*continueExec = false;
|
||||
qKillQuery(pQInfo);
|
||||
(*pRsp)->completed = 1; // notify no more result to client
|
||||
} else {
|
||||
*continueExec = true;
|
||||
qDebug("QInfo:%p has more results waits for client retrieve", pQInfo);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t qQueryCompleted(qinfo_t qinfo) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
|
||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
return IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER);
|
||||
}
|
||||
|
||||
int32_t qKillQuery(qinfo_t qinfo) {
|
||||
SQInfo *pQInfo = (SQInfo *)qinfo;
|
||||
|
||||
|
|
|
@ -353,16 +353,9 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
|
|||
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
|
||||
SHashEntry *pe = pHashObj->hashList[slot];
|
||||
|
||||
if (pe->num == 0) {
|
||||
assert(pe->next == NULL);
|
||||
} else {
|
||||
assert(pe->next != NULL);
|
||||
}
|
||||
|
||||
// no data, return directly
|
||||
if (pe->num == 0) {
|
||||
assert(pe->next == NULL);
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
return -1;
|
||||
}
|
||||
|
@ -371,6 +364,21 @@ int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLe
|
|||
taosWLockLatch(&pe->latch);
|
||||
}
|
||||
|
||||
if (pe->num == 0) {
|
||||
assert(pe->next == NULL);
|
||||
} else {
|
||||
assert(pe->next != NULL);
|
||||
}
|
||||
|
||||
// double check after locked
|
||||
if (pe->num == 0) {
|
||||
assert(pe->next == NULL);
|
||||
taosWUnLockLatch(&pe->latch);
|
||||
|
||||
__rd_unlock(&pHashObj->lock, pHashObj->type);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SHashNode *pNode = pe->next;
|
||||
SHashNode *pRes = NULL;
|
||||
|
||||
|
|
|
@ -438,8 +438,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
char* key = pNode->key;
|
||||
char* d = pNode->data;
|
||||
|
||||
int32_t ref = T_REF_DEC(pNode);
|
||||
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref);
|
||||
int32_t ref = T_REF_VAL_GET(pNode);
|
||||
uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref - 1);
|
||||
|
||||
/*
|
||||
* If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users
|
||||
|
@ -449,6 +449,8 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
* that tries to do the same thing.
|
||||
*/
|
||||
if (inTrashCan) {
|
||||
ref = T_REF_DEC(pNode);
|
||||
|
||||
if (ref == 0) {
|
||||
assert(pNode->pTNodeHeader->pData == pNode);
|
||||
|
||||
|
@ -459,7 +461,10 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
|||
doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader);
|
||||
}
|
||||
} else {
|
||||
// NOTE: remove it from hash in the first place, otherwise, the pNode may have been released by other thread
|
||||
// when reaches here.
|
||||
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
|
||||
ref = T_REF_DEC(pNode);
|
||||
|
||||
// successfully remove from hash table, if failed, this node must have been move to trash already, do nothing.
|
||||
// note that the remove operation can be executed only once.
|
||||
|
|
|
@ -66,7 +66,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
|||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||
}
|
||||
|
||||
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
|
||||
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
|
||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
pRead->pCont = qhandle;
|
||||
|
@ -75,22 +75,22 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
|
|||
|
||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
|
||||
vDebug("QInfo:%p add to query task queue for exec, msg:%p", qhandle, pRead);
|
||||
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
|
||||
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
||||
}
|
||||
|
||||
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
|
||||
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) {
|
||||
bool continueExec = false;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
|
||||
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
|
||||
if (continueExec) {
|
||||
*freeHandle = false;
|
||||
vnodePutItemIntoReadQueue(pVnode, handle);
|
||||
pRet->qhandle = handle;
|
||||
pRet->qhandle = *handle;
|
||||
} else {
|
||||
*freeHandle = true;
|
||||
vDebug("QInfo:%p exec completed, free handle:%d", handle, *freeHandle);
|
||||
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
|
||||
}
|
||||
} else {
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
|
@ -181,50 +181,45 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
|
||||
if (handle != NULL) {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
||||
vnodePutItemIntoReadQueue(pVnode, *handle);
|
||||
vnodePutItemIntoReadQueue(pVnode, handle);
|
||||
}
|
||||
} else {
|
||||
assert(pCont != NULL);
|
||||
void** qhandle = (void**) pCont;
|
||||
// *handle = /*(void*) */pCont;
|
||||
|
||||
handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
|
||||
if (handle == NULL) {
|
||||
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
|
||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
} else {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void*) pCont);
|
||||
// handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
|
||||
// if (handle == NULL) {
|
||||
// vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
|
||||
// code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
// } else {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
|
||||
|
||||
bool freehandle = false;
|
||||
bool buildRes = qTableQuery(*handle); // do execute query
|
||||
bool buildRes = qTableQuery(*qhandle); // do execute query
|
||||
|
||||
// build query rsp, the retrieve request has reached here already
|
||||
if (buildRes) {
|
||||
// update the connection info according to the retrieve connection
|
||||
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
|
||||
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(pReadMsg->rpcMsg.handle != NULL);
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle,
|
||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
||||
pReadMsg->rpcMsg.handle);
|
||||
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
|
||||
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
|
||||
|
||||
// todo test the error code case
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_QRY_HAS_RSP;
|
||||
}
|
||||
} else {
|
||||
freehandle = qQueryCompleted(*qhandle);
|
||||
}
|
||||
|
||||
// If retrieval request has not arrived, release the qhandle and decrease the reference count to allow
|
||||
// the queryMgmt to free it when expired
|
||||
void** dup = handle;
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||
|
||||
// NOTE:
|
||||
// if the qhandle is put into query vread queue and wait to be executed by worker in read queue,
|
||||
// the reference count of qhandle can not be decreased. Otherwise, qhandle may be released before or in the
|
||||
// procedure of query execution
|
||||
if (freehandle) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&dup, freehandle);
|
||||
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
||||
if (freehandle || (!buildRes)) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -269,7 +264,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
//TODO handle malloc failure
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
|
||||
} else { // result is not ready, return immediately
|
||||
if (!buildRes) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||
|
@ -277,12 +272,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
void** dup = handle;
|
||||
code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
|
||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||
|
||||
// not added into task queue, free it immediate
|
||||
// not added into task queue, the query must be completed already, free qhandle immediate
|
||||
if (freeHandle) {
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &dup, freeHandle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void**) &dup, true);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue