[tbase-1326]
This commit is contained in:
parent
c060be25b9
commit
e03f3364b8
|
@ -206,28 +206,6 @@ typedef struct {
|
||||||
char cont[];
|
char cont[];
|
||||||
} SVMsgHeader;
|
} SVMsgHeader;
|
||||||
|
|
||||||
/*
|
|
||||||
* The value of QInfo.signature is used to denote that a query is executing, it isn't safe to release QInfo yet.
|
|
||||||
* The release operations will be blocked in a busy-waiting until the query operation reach a safepoint.
|
|
||||||
* Then it will reset the signature in a atomic operation, followed by release operation.
|
|
||||||
* Only the QInfo.signature == QInfo, this structure can be released safely.
|
|
||||||
*/
|
|
||||||
#define TSDB_QINFO_QUERY_FLAG 0x1
|
|
||||||
//#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
|
|
||||||
#define TSDB_QINFO_RESET_SIG(x)
|
|
||||||
#define TSDB_QINFO_SET_QUERY_FLAG(x)
|
|
||||||
//#define TSDB_QINFO_SET_QUERY_FLAG(x) \
|
|
||||||
// atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
|
|
||||||
|
|
||||||
// live lock: wait for query reaching a safe-point, release all resources
|
|
||||||
// belongs to this query
|
|
||||||
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
|
|
||||||
// { \
|
|
||||||
// while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
|
|
||||||
// taosMsleep(1); \
|
|
||||||
// } \
|
|
||||||
// }
|
|
||||||
|
|
||||||
struct tSQLBinaryExpr;
|
struct tSQLBinaryExpr;
|
||||||
|
|
||||||
typedef struct SColumnInfoEx {
|
typedef struct SColumnInfoEx {
|
||||||
|
|
|
@ -1173,15 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
dTrace("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQInfo->refCount >= 1);
|
assert(pQInfo->refCount >= 1);
|
||||||
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
|
||||||
|
|
||||||
SQuery * pQuery = &pQInfo->query;
|
SQuery * pQuery = &pQInfo->query;
|
||||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
SMeterObj *pMeterObj = pQInfo->pObj;
|
||||||
|
@ -1215,10 +1213,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
|
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
|
||||||
pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||||
|
|
||||||
dTrace("QInfo:%p reset signature", pQInfo);
|
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -1238,10 +1233,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
|
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
|
||||||
pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
pQInfo->pointsInterpo, pQInfo->pointsReturned);
|
||||||
|
|
||||||
dTrace("QInfo:%p reset signature", pQInfo);
|
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -1250,12 +1242,12 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pQInfo->over = 1;
|
pQInfo->over = 1;
|
||||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned, reset signature", pQInfo,
|
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo,
|
||||||
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead);
|
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead);
|
||||||
|
|
||||||
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
|
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1284,16 +1276,14 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
/* check if query is killed or not */
|
/* check if query is killed or not */
|
||||||
if (isQueryKilled(pQuery)) {
|
if (isQueryKilled(pQuery)) {
|
||||||
dTrace("QInfo:%p query is killed, reset signature", pQInfo);
|
dTrace("QInfo:%p query is killed", pQInfo);
|
||||||
pQInfo->over = 1;
|
pQInfo->over = 1;
|
||||||
} else {
|
} else {
|
||||||
dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned, reset signature",
|
dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned",
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
|
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1305,14 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
dTrace("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQInfo->refCount >= 1);
|
assert(pQInfo->refCount >= 1);
|
||||||
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
|
||||||
|
|
||||||
SQuery *pQuery = &pQInfo->query;
|
SQuery *pQuery = &pQInfo->query;
|
||||||
pQuery->pointsRead = 0;
|
pQuery->pointsRead = 0;
|
||||||
|
@ -1333,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
pQInfo->useconds += (taosGetTimestampUs() - st);
|
pQInfo->useconds += (taosGetTimestampUs() - st);
|
||||||
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0;
|
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0;
|
||||||
|
|
||||||
dTrace("QInfo:%p reset signature", pQInfo);
|
|
||||||
taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
|
taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
|
||||||
pQInfo->query.interpoType);
|
pQInfo->query.interpoType);
|
||||||
|
|
||||||
|
@ -1341,12 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
if (pQuery->pointsRead == 0) {
|
if (pQuery->pointsRead == 0) {
|
||||||
pQInfo->over = 1;
|
pQInfo->over = 1;
|
||||||
dTrace("QInfo:%p over, %d meters queried, %d points are returned, reset signature", pQInfo, pSupporter->numOfMeters,
|
dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
|
||||||
pQInfo->pointsRead);
|
pQInfo->pointsRead);
|
||||||
vnodePrintQueryStatistics(pSupporter);
|
vnodePrintQueryStatistics(pSupporter);
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -422,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
|
||||||
if (!vnodeIsQInfoValid(param)) return;
|
if (!vnodeIsQInfoValid(param)) return;
|
||||||
|
|
||||||
pQInfo->killed = 1;
|
pQInfo->killed = 1;
|
||||||
TSDB_WAIT_TO_SAFE_DROP_QINFO(pQInfo);
|
|
||||||
|
|
||||||
SMeterObj *pObj = pQInfo->pObj;
|
SMeterObj *pObj = pQInfo->pObj;
|
||||||
dTrace("QInfo:%p start to free SQInfo", pQInfo);
|
dTrace("QInfo:%p start to free SQInfo", pQInfo);
|
||||||
|
|
||||||
|
@ -502,7 +500,6 @@ bool vnodeIsQInfoValid(void *param) {
|
||||||
* into local variable, then compare by using local variable
|
* into local variable, then compare by using local variable
|
||||||
*/
|
*/
|
||||||
uint64_t sig = pQInfo->signature;
|
uint64_t sig = pQInfo->signature;
|
||||||
// return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG);
|
|
||||||
return (sig == (uint64_t)pQInfo);
|
return (sig == (uint64_t)pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -536,13 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
|
||||||
pQInfo = (SQInfo *)pMsg->ahandle;
|
pQInfo = (SQInfo *)pMsg->ahandle;
|
||||||
|
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
dTrace("QInfo:%p it is already killed, abort", pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
|
||||||
pQuery = &(pQInfo->query);
|
pQuery = &(pQInfo->query);
|
||||||
|
|
||||||
SMeterObj *pObj = pQInfo->pObj;
|
SMeterObj *pObj = pQInfo->pObj;
|
||||||
|
@ -610,9 +605,6 @@ void vnodeQueryData(SSchedMsg *pMsg) {
|
||||||
tclose(pQInfo->query.lfd);
|
tclose(pQInfo->query.lfd);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* reset QInfo signature */
|
|
||||||
dTrace("QInfo:%p reset signature", pQInfo);
|
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
vnodeDecRefCount(pQInfo);
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
@ -697,9 +689,6 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
schedMsg.fp = vnodeQueryData;
|
schedMsg.fp = vnodeQueryData;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set in query flag
|
|
||||||
// pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The reference count, which is 2, is for both the current query thread and the future retrieve request,
|
* The reference count, which is 2, is for both the current query thread and the future retrieve request,
|
||||||
* which will always be issued by client to acquire data or free SQInfo struct.
|
* which will always be issued by client to acquire data or free SQInfo struct.
|
||||||
|
@ -818,7 +807,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
|
||||||
vnodeAddRefCount(pQInfo);
|
vnodeAddRefCount(pQInfo);
|
||||||
|
|
||||||
schedMsg.msg = NULL;
|
schedMsg.msg = NULL;
|
||||||
|
@ -900,18 +888,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
|
||||||
pQInfo->pointsRead);
|
pQInfo->pointsRead);
|
||||||
|
|
||||||
if (pQInfo->over == 0) {
|
if (pQInfo->over == 0) {
|
||||||
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
|
dTrace("QInfo:%p set query flag, sig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
|
||||||
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
|
|
||||||
// uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
|
|
||||||
* since in release function, the original value has been destroyed. However, this memory area may be reused
|
|
||||||
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
|
|
||||||
*/
|
|
||||||
// if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
|
|
||||||
if (pQInfo->killed == 1) {
|
if (pQInfo->killed == 1) {
|
||||||
// dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
|
|
||||||
dTrace("%p freed or killed, abort query", pQInfo);
|
dTrace("%p freed or killed, abort query", pQInfo);
|
||||||
} else {
|
} else {
|
||||||
vnodeAddRefCount(pQInfo);
|
vnodeAddRefCount(pQInfo);
|
||||||
|
|
Loading…
Reference in New Issue