[tbase-1326]
This commit is contained in:
parent
7a74561f6f
commit
1570fadf99
|
@ -213,18 +213,20 @@ typedef struct {
|
||||||
* Only the QInfo.signature == QInfo, this structure can be released safely.
|
* Only the QInfo.signature == QInfo, this structure can be released safely.
|
||||||
*/
|
*/
|
||||||
#define TSDB_QINFO_QUERY_FLAG 0x1
|
#define TSDB_QINFO_QUERY_FLAG 0x1
|
||||||
#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
|
//#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
|
||||||
#define TSDB_QINFO_SET_QUERY_FLAG(x) \
|
#define TSDB_QINFO_RESET_SIG(x)
|
||||||
atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
|
#define TSDB_QINFO_SET_QUERY_FLAG(x)
|
||||||
|
//#define TSDB_QINFO_SET_QUERY_FLAG(x) \
|
||||||
|
// atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
|
||||||
|
|
||||||
// live lock: wait for query reaching a safe-point, release all resources
|
// live lock: wait for query reaching a safe-point, release all resources
|
||||||
// belongs to this query
|
// belongs to this query
|
||||||
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
|
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
|
||||||
{ \
|
// { \
|
||||||
while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
|
// while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
|
||||||
taosMsleep(1); \
|
// taosMsleep(1); \
|
||||||
} \
|
// } \
|
||||||
}
|
// }
|
||||||
|
|
||||||
struct tSQLBinaryExpr;
|
struct tSQLBinaryExpr;
|
||||||
|
|
||||||
|
@ -370,6 +372,8 @@ void vnodeFreeQInfo(void *, bool);
|
||||||
void vnodeFreeQInfoInQueue(void *param);
|
void vnodeFreeQInfoInQueue(void *param);
|
||||||
|
|
||||||
bool vnodeIsQInfoValid(void *param);
|
bool vnodeIsQInfoValid(void *param);
|
||||||
|
void vnodeDecRefCount(void *param);
|
||||||
|
void vnodeAddRefCount(void *param);
|
||||||
|
|
||||||
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
|
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo {
|
||||||
* the header file info for one vnode
|
* the header file info for one vnode
|
||||||
*/
|
*/
|
||||||
typedef struct SHeaderFileInfo {
|
typedef struct SHeaderFileInfo {
|
||||||
int32_t fileID; // file id
|
int32_t fileID; // file id
|
||||||
} SHeaderFileInfo;
|
} SHeaderFileInfo;
|
||||||
|
|
||||||
typedef struct SQueryCostSummary {
|
typedef struct SQueryCostSummary {
|
||||||
|
@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo {
|
||||||
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
|
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
|
||||||
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
|
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
|
||||||
int32_t vnodeId;
|
int32_t vnodeId;
|
||||||
|
|
||||||
int32_t headerFd; // header file fd
|
int32_t headerFd; // header file fd
|
||||||
char* pHeaderFileData; // mmap header files
|
char* pHeaderFileData; // mmap header files
|
||||||
int64_t headFileSize;
|
int64_t headFileSize;
|
||||||
int32_t dataFd;
|
int32_t dataFd;
|
||||||
int32_t lastFd;
|
int32_t lastFd;
|
||||||
|
|
||||||
char headerFilePath[PATH_MAX]; // current opened header file name
|
char headerFilePath[PATH_MAX]; // current opened header file name
|
||||||
char dataFilePath[PATH_MAX]; // current opened data file name
|
char dataFilePath[PATH_MAX]; // current opened data file name
|
||||||
char lastFilePath[PATH_MAX]; // current opened last file path
|
char lastFilePath[PATH_MAX]; // current opened last file path
|
||||||
char dbFilePathPrefix[PATH_MAX];
|
char dbFilePathPrefix[PATH_MAX];
|
||||||
} SQueryFilesInfo;
|
} SQueryFilesInfo;
|
||||||
|
|
||||||
typedef struct RuntimeEnvironment {
|
typedef struct RuntimeEnvironment {
|
||||||
|
@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment {
|
||||||
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
|
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
|
||||||
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
|
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
|
||||||
SQueryFilesInfo vnodeFileInfo;
|
SQueryFilesInfo vnodeFileInfo;
|
||||||
int16_t numOfRowsPerPage;
|
int16_t numOfRowsPerPage;
|
||||||
int16_t offset[TSDB_MAX_COLUMNS];
|
int16_t offset[TSDB_MAX_COLUMNS];
|
||||||
int16_t scanFlag; // denotes reversed scan of data or not
|
int16_t scanFlag; // denotes reversed scan of data or not
|
||||||
SInterpolationInfo interpoInfo;
|
SInterpolationInfo interpoInfo;
|
||||||
SData** pInterpoBuf;
|
SData** pInterpoBuf;
|
||||||
SOutputRes* pResult; // reference to SQuerySupporter->pResult
|
SOutputRes* pResult; // reference to SQuerySupporter->pResult
|
||||||
void* hashList;
|
void* hashList;
|
||||||
int32_t usedIndex; // assigned SOutputRes in list
|
int32_t usedIndex; // assigned SOutputRes in list
|
||||||
STSBuf* pTSBuf;
|
STSBuf* pTSBuf;
|
||||||
STSCursor cur;
|
STSCursor cur;
|
||||||
SQueryCostSummary summary;
|
SQueryCostSummary summary;
|
||||||
} SQueryRuntimeEnv;
|
} SQueryRuntimeEnv;
|
||||||
|
|
||||||
/* intermediate result during multimeter query involves interval */
|
/* intermediate result during multimeter query involves interval */
|
||||||
|
@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj {
|
||||||
|
|
||||||
SMeterDataInfo* pMeterDataInfo;
|
SMeterDataInfo* pMeterDataInfo;
|
||||||
|
|
||||||
TSKEY* tsList;
|
TSKEY* tsList;
|
||||||
int32_t tsNum;
|
|
||||||
|
|
||||||
} SMeterQuerySupportObj;
|
} SMeterQuerySupportObj;
|
||||||
|
|
||||||
typedef struct _qinfo {
|
typedef struct _qinfo {
|
||||||
uint64_t signature;
|
uint64_t signature;
|
||||||
|
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
|
||||||
char user[TSDB_METER_ID_LEN + 1];
|
char user[TSDB_METER_ID_LEN + 1];
|
||||||
char sql[TSDB_SHOW_SQL_LEN];
|
char sql[TSDB_SHOW_SQL_LEN];
|
||||||
uint8_t stream;
|
uint8_t stream;
|
||||||
|
@ -231,24 +229,21 @@ typedef struct _qinfo {
|
||||||
int64_t useconds;
|
int64_t useconds;
|
||||||
int killed;
|
int killed;
|
||||||
struct _qinfo *prev, *next;
|
struct _qinfo *prev, *next;
|
||||||
|
SQuery query;
|
||||||
|
int num;
|
||||||
|
int totalPoints;
|
||||||
|
int pointsRead;
|
||||||
|
int pointsReturned;
|
||||||
|
int pointsInterpo;
|
||||||
|
int code;
|
||||||
|
char bufIndex;
|
||||||
|
char changed;
|
||||||
|
char over;
|
||||||
|
SMeterObj* pObj;
|
||||||
|
sem_t dataReady;
|
||||||
|
|
||||||
SQuery query;
|
|
||||||
int num;
|
|
||||||
int totalPoints;
|
|
||||||
int pointsRead;
|
|
||||||
int pointsReturned;
|
|
||||||
int pointsInterpo;
|
|
||||||
int code;
|
|
||||||
char bufIndex;
|
|
||||||
char changed;
|
|
||||||
char over;
|
|
||||||
SMeterObj* pObj;
|
|
||||||
|
|
||||||
int (*fp)(SMeterObj*, SQuery*);
|
|
||||||
|
|
||||||
sem_t dataReady;
|
|
||||||
SMeterQuerySupportObj* pMeterQuerySupporter;
|
SMeterQuerySupportObj* pMeterQuerySupporter;
|
||||||
|
int (*fp)(SMeterObj*, SQuery*);
|
||||||
} SQInfo;
|
} SQInfo;
|
||||||
|
|
||||||
int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
|
int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
|
||||||
|
|
|
@ -890,12 +890,16 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
||||||
|
|
||||||
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
|
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
|
||||||
pQInfo->pointsReturned);
|
pQInfo->pointsReturned);
|
||||||
|
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters);
|
pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters);
|
||||||
if (pSupporter->pMeterDataInfo == NULL) {
|
if (pSupporter->pMeterDataInfo == NULL) {
|
||||||
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
|
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
|
||||||
|
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -912,6 +916,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
||||||
// failed to save all intermediate results into disk, abort further query processing
|
// failed to save all intermediate results into disk, abort further query processing
|
||||||
if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) {
|
if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) {
|
||||||
dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo);
|
dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -919,6 +924,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
||||||
|
|
||||||
if (isQueryKilled(pQuery)) {
|
if (isQueryKilled(pQuery)) {
|
||||||
dTrace("QInfo:%p query killed, abort", pQInfo);
|
dTrace("QInfo:%p query killed, abort", pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -940,6 +946,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
|
||||||
pQInfo->pointsRead += pQuery->pointsRead;
|
pQInfo->pointsRead += pQuery->pointsRead;
|
||||||
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
|
dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead,
|
||||||
pQInfo->pointsReturned);
|
pQInfo->pointsReturned);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1174,10 +1181,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
assert(pQInfo->refCount >= 1);
|
||||||
|
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
||||||
|
|
||||||
SQuery * pQuery = &pQInfo->query;
|
SQuery * pQuery = &pQInfo->query;
|
||||||
SMeterObj *pMeterObj = pQInfo->pObj;
|
SMeterObj *pMeterObj = pQInfo->pObj;
|
||||||
|
@ -1215,6 +1225,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1237,6 +1248,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1249,7 +1262,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
|
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1284,8 +1297,10 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
|
||||||
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
|
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
|
||||||
}
|
}
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
sem_post(&pQInfo->dataReady);
|
||||||
|
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
@ -1297,11 +1312,13 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
assert(pQInfo->refCount >= 1);
|
||||||
|
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
||||||
|
|
||||||
SQuery *pQuery = &pQInfo->query;
|
SQuery *pQuery = &pQInfo->query;
|
||||||
pQuery->pointsRead = 0;
|
pQuery->pointsRead = 0;
|
||||||
|
@ -1337,4 +1354,5 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -392,10 +392,10 @@ __clean_memory:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
|
//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
|
||||||
SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
|
// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
|
||||||
vnodeFreeQInfo(pQInfo, true);
|
// vnodeFreeQInfo(pQInfo, true);
|
||||||
}
|
//}
|
||||||
|
|
||||||
void vnodeFreeQInfoInQueue(void *param) {
|
void vnodeFreeQInfoInQueue(void *param) {
|
||||||
SQInfo *pQInfo = (SQInfo *)param;
|
SQInfo *pQInfo = (SQInfo *)param;
|
||||||
|
@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) {
|
||||||
if (!vnodeIsQInfoValid(pQInfo)) return;
|
if (!vnodeIsQInfoValid(pQInfo)) return;
|
||||||
|
|
||||||
pQInfo->killed = 1;
|
pQInfo->killed = 1;
|
||||||
|
dTrace("QInfo:%p set kill flag to free QInfo");
|
||||||
|
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
|
|
||||||
|
// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
|
||||||
|
// SSchedMsg schedMsg = {0};
|
||||||
|
// schedMsg.fp = vnodeFreeQInfoInQueueImpl;
|
||||||
|
|
||||||
dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
|
// schedMsg.msg = NULL;
|
||||||
SSchedMsg schedMsg = {0};
|
// schedMsg.thandle = (void *)1;
|
||||||
schedMsg.fp = vnodeFreeQInfoInQueueImpl;
|
// schedMsg.ahandle = param;
|
||||||
|
// taosScheduleTask(queryQhandle, &schedMsg);
|
||||||
schedMsg.msg = NULL;
|
|
||||||
schedMsg.thandle = (void *)1;
|
|
||||||
schedMsg.ahandle = param;
|
|
||||||
taosScheduleTask(queryQhandle, &schedMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeFreeQInfo(void *param, bool decQueryRef) {
|
void vnodeFreeQInfo(void *param, bool decQueryRef) {
|
||||||
|
@ -499,7 +502,30 @@ bool vnodeIsQInfoValid(void *param) {
|
||||||
* into local variable, then compare by using local variable
|
* into local variable, then compare by using local variable
|
||||||
*/
|
*/
|
||||||
uint64_t sig = pQInfo->signature;
|
uint64_t sig = pQInfo->signature;
|
||||||
return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG);
|
// return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG);
|
||||||
|
return (sig == (uint64_t)pQInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeDecRefCount(void *param) {
|
||||||
|
SQInfo *pQInfo = (SQInfo*) param;
|
||||||
|
|
||||||
|
assert(vnodeIsQInfoValid(pQInfo));
|
||||||
|
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1);
|
||||||
|
dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref);
|
||||||
|
|
||||||
|
if (ref == 0) {
|
||||||
|
vnodeFreeQInfo(pQInfo, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeAddRefCount(void *param) {
|
||||||
|
SQInfo *pQInfo = (SQInfo*) param;
|
||||||
|
|
||||||
|
assert(vnodeIsQInfoValid(pQInfo));
|
||||||
|
|
||||||
|
int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1);
|
||||||
|
dTrace("QInfo:%p add refcount, %d", pQInfo, ref);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeQueryData(SSchedMsg *pMsg) {
|
void vnodeQueryData(SSchedMsg *pMsg) {
|
||||||
|
@ -511,10 +537,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
|
||||||
if (pQInfo->killed) {
|
if (pQInfo->killed) {
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
|
||||||
pQuery = &(pQInfo->query);
|
pQuery = &(pQInfo->query);
|
||||||
|
|
||||||
SMeterObj *pObj = pQInfo->pObj;
|
SMeterObj *pObj = pQInfo->pObj;
|
||||||
|
@ -586,6 +613,7 @@ void vnodeQueryData(SSchedMsg *pMsg) {
|
||||||
dTrace("QInfo:%p reset signature", pQInfo);
|
dTrace("QInfo:%p reset signature", pQInfo);
|
||||||
TSDB_QINFO_RESET_SIG(pQInfo);
|
TSDB_QINFO_RESET_SIG(pQInfo);
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
|
vnodeDecRefCount(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
|
void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
|
||||||
|
@ -668,14 +696,22 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
|
||||||
}
|
}
|
||||||
|
|
||||||
// set in query flag
|
// set in query flag
|
||||||
pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
// pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The reference count, which is 2, is for both the current query thread and the future retrieve request,
|
||||||
|
* which will always be issued by client to acquire data or free SQInfo struct.
|
||||||
|
*/
|
||||||
|
vnodeAddRefCount(pQInfo);
|
||||||
|
vnodeAddRefCount(pQInfo);
|
||||||
|
|
||||||
schedMsg.msg = NULL;
|
schedMsg.msg = NULL;
|
||||||
schedMsg.thandle = (void *)1;
|
schedMsg.thandle = (void *)1;
|
||||||
schedMsg.ahandle = pQInfo;
|
schedMsg.ahandle = pQInfo;
|
||||||
|
|
||||||
dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo);
|
dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
|
||||||
|
pQInfo->refCount);
|
||||||
|
|
||||||
taosScheduleTask(queryQhandle, &schedMsg);
|
taosScheduleTask(queryQhandle, &schedMsg);
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
|
|
||||||
|
@ -779,8 +815,10 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
// pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
|
||||||
|
vnodeAddRefCount(pQInfo);
|
||||||
|
vnodeAddRefCount(pQInfo);
|
||||||
|
|
||||||
schedMsg.msg = NULL;
|
schedMsg.msg = NULL;
|
||||||
schedMsg.thandle = (void *)1;
|
schedMsg.thandle = (void *)1;
|
||||||
schedMsg.ahandle = pQInfo;
|
schedMsg.ahandle = pQInfo;
|
||||||
|
@ -862,19 +900,22 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
|
||||||
if (pQInfo->over == 0) {
|
if (pQInfo->over == 0) {
|
||||||
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
|
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
|
||||||
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
|
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
|
||||||
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
|
// uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
|
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
|
||||||
* since in release function, the original value has been destroyed. However, this memory area may be reused
|
* since in release function, the original value has been destroyed. However, this memory area may be reused
|
||||||
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
|
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
|
||||||
*/
|
*/
|
||||||
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
|
// if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
|
||||||
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
|
if (pQInfo->killed == 1) {
|
||||||
|
// dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
|
||||||
|
dTrace("%p freed or killed, abort query", pQInfo);
|
||||||
} else {
|
} else {
|
||||||
|
vnodeAddRefCount(pQInfo);
|
||||||
dTrace("%p add query into task queue for schedule", pQInfo);
|
dTrace("%p add query into task queue for schedule", pQInfo);
|
||||||
|
|
||||||
SSchedMsg schedMsg;
|
SSchedMsg schedMsg = {0};
|
||||||
|
|
||||||
if (pQInfo->pMeterQuerySupporter != NULL) {
|
if (pQInfo->pMeterQuerySupporter != NULL) {
|
||||||
if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) {
|
if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) {
|
||||||
|
|
|
@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
|
||||||
pMsg += size;
|
pMsg += size;
|
||||||
msgLen = pMsg - pStart;
|
msgLen = pMsg - pStart;
|
||||||
|
|
||||||
|
assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
|
||||||
|
|
||||||
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
|
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
|
||||||
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
|
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
|
||||||
vnodeFreeQInfo(pObj->qhandle, true);
|
vnodeDecRefCount(pObj->qhandle);
|
||||||
pObj->qhandle = NULL;
|
pObj->qhandle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue