From 1570fadf99cb6fb76c8d059f8237cd0b7b7a1b2c Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 11:55:22 +0800 Subject: [PATCH 1/6] [tbase-1326] --- src/system/detail/inc/vnode.h | 20 +++-- src/system/detail/inc/vnodeRead.h | 83 ++++++++++----------- src/system/detail/src/vnodeQueryProcess.c | 26 ++++++- src/system/detail/src/vnodeRead.c | 91 ++++++++++++++++------- src/system/detail/src/vnodeShell.c | 4 +- 5 files changed, 142 insertions(+), 82 deletions(-) diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 6e366b9b7f..4ac0848427 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -213,18 +213,20 @@ typedef struct { * Only the QInfo.signature == QInfo, this structure can be released safely. */ #define TSDB_QINFO_QUERY_FLAG 0x1 -#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) -#define TSDB_QINFO_SET_QUERY_FLAG(x) \ - atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); +//#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) +#define TSDB_QINFO_RESET_SIG(x) +#define TSDB_QINFO_SET_QUERY_FLAG(x) +//#define TSDB_QINFO_SET_QUERY_FLAG(x) \ +// atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); // live lock: wait for query reaching a safe-point, release all resources // belongs to this query #define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ - { \ - while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ - taosMsleep(1); \ - } \ - } +// { \ +// while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ +// taosMsleep(1); \ +// } \ +// } struct tSQLBinaryExpr; @@ -370,6 +372,8 @@ void vnodeFreeQInfo(void *, bool); void vnodeFreeQInfoInQueue(void *param); bool vnodeIsQInfoValid(void *param); +void vnodeDecRefCount(void *param); +void vnodeAddRefCount(void *param); int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index d059075142..3a1b6f7949 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo { * the header file info for one vnode */ typedef struct SHeaderFileInfo { - int32_t fileID; // file id + int32_t fileID; // file id } SHeaderFileInfo; typedef struct SQueryCostSummary { @@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo { uint32_t numOfFiles; // the total available number of files for this virtual node during query execution int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. int32_t vnodeId; - - int32_t headerFd; // header file fd - char* pHeaderFileData; // mmap header files - int64_t headFileSize; - int32_t dataFd; - int32_t lastFd; - - char headerFilePath[PATH_MAX]; // current opened header file name - char dataFilePath[PATH_MAX]; // current opened data file name - char lastFilePath[PATH_MAX]; // current opened last file path - char dbFilePathPrefix[PATH_MAX]; + + int32_t headerFd; // header file fd + char* pHeaderFileData; // mmap header files + int64_t headFileSize; + int32_t dataFd; + int32_t lastFd; + + char headerFilePath[PATH_MAX]; // current opened header file name + char dataFilePath[PATH_MAX]; // current opened data file name + char lastFilePath[PATH_MAX]; // current opened last file path + char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; typedef struct RuntimeEnvironment { @@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment { SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryFilesInfo vnodeFileInfo; - int16_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; - int16_t scanFlag; // denotes reversed scan of data or not - SInterpolationInfo interpoInfo; - SData** pInterpoBuf; - SOutputRes* pResult; // reference to SQuerySupporter->pResult - void* hashList; - int32_t usedIndex; // assigned SOutputRes in list - STSBuf* pTSBuf; - STSCursor cur; - SQueryCostSummary summary; + int16_t numOfRowsPerPage; + int16_t offset[TSDB_MAX_COLUMNS]; + int16_t scanFlag; // denotes reversed scan of data or not + SInterpolationInfo interpoInfo; + SData** pInterpoBuf; + SOutputRes* pResult; // reference to SQuerySupporter->pResult + void* hashList; + int32_t usedIndex; // assigned SOutputRes in list + STSBuf* pTSBuf; + STSCursor cur; + SQueryCostSummary summary; } SQueryRuntimeEnv; /* intermediate result during multimeter query involves interval */ @@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj { SMeterDataInfo* pMeterDataInfo; - TSKEY* tsList; - int32_t tsNum; - + TSKEY* tsList; } SMeterQuerySupportObj; typedef struct _qinfo { - uint64_t signature; - + uint64_t signature; + int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely char user[TSDB_METER_ID_LEN + 1]; char sql[TSDB_SHOW_SQL_LEN]; uint8_t stream; @@ -231,24 +229,21 @@ typedef struct _qinfo { int64_t useconds; int killed; struct _qinfo *prev, *next; + SQuery query; + int num; + int totalPoints; + int pointsRead; + int pointsReturned; + int pointsInterpo; + int code; + char bufIndex; + char changed; + char over; + SMeterObj* pObj; + sem_t dataReady; - SQuery query; - int num; - int totalPoints; - int pointsRead; - int pointsReturned; - int pointsInterpo; - int code; - char bufIndex; - char changed; - char over; - SMeterObj* pObj; - - int (*fp)(SMeterObj*, SQuery*); - - sem_t dataReady; SMeterQuerySupportObj* pMeterQuerySupporter; - + int (*fp)(SMeterObj*, SQuery*); } SQInfo; int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj, diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 6fa0891baa..f39ca0dea1 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -890,12 +890,16 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); + + vnodeDecRefCount(pQInfo); return; } pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters); if (pSupporter->pMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + vnodeDecRefCount(pQInfo); return; } @@ -912,6 +916,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { // failed to save all intermediate results into disk, abort further query processing if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -919,6 +924,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query killed, abort", pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -940,6 +946,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { pQInfo->pointsRead += pQuery->pointsRead; dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); + vnodeDecRefCount(pQInfo); } /* @@ -1174,10 +1181,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + vnodeDecRefCount(pQInfo); + return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -1215,6 +1225,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -1237,6 +1248,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); + return; } } @@ -1249,7 +1262,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); - + vnodeDecRefCount(pQInfo); return; } @@ -1284,8 +1297,10 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } - sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + sem_post(&pQInfo->dataReady); + + vnodeDecRefCount(pQInfo); } void vnodeMultiMeterQuery(SSchedMsg *pMsg) { @@ -1297,11 +1312,13 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery *pQuery = &pQInfo->query; pQuery->pointsRead = 0; @@ -1337,4 +1354,5 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 21e83c5198..2ea41d9d58 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -392,10 +392,10 @@ __clean_memory: return NULL; } -static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { - SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; - vnodeFreeQInfo(pQInfo, true); -} +//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { +// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; +// vnodeFreeQInfo(pQInfo, true); +//} void vnodeFreeQInfoInQueue(void *param) { SQInfo *pQInfo = (SQInfo *)param; @@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) { if (!vnodeIsQInfoValid(pQInfo)) return; pQInfo->killed = 1; + dTrace("QInfo:%p set kill flag to free QInfo"); + + vnodeDecRefCount(pQInfo); + +// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); +// SSchedMsg schedMsg = {0}; +// schedMsg.fp = vnodeFreeQInfoInQueueImpl; - dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); - SSchedMsg schedMsg = {0}; - schedMsg.fp = vnodeFreeQInfoInQueueImpl; - - schedMsg.msg = NULL; - schedMsg.thandle = (void *)1; - schedMsg.ahandle = param; - taosScheduleTask(queryQhandle, &schedMsg); +// schedMsg.msg = NULL; +// schedMsg.thandle = (void *)1; +// schedMsg.ahandle = param; +// taosScheduleTask(queryQhandle, &schedMsg); } void vnodeFreeQInfo(void *param, bool decQueryRef) { @@ -499,7 +502,30 @@ bool vnodeIsQInfoValid(void *param) { * into local variable, then compare by using local variable */ uint64_t sig = pQInfo->signature; - return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); +// return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); + return (sig == (uint64_t)pQInfo); +} + +void vnodeDecRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1); + dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref); + + if (ref == 0) { + vnodeFreeQInfo(pQInfo, true); + } +} + +void vnodeAddRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1); + dTrace("QInfo:%p add refcount, %d", pQInfo, ref); } void vnodeQueryData(SSchedMsg *pMsg) { @@ -511,10 +537,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + vnodeDecRefCount(pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); pQuery = &(pQInfo->query); SMeterObj *pObj = pQInfo->pObj; @@ -586,6 +613,7 @@ void vnodeQueryData(SSchedMsg *pMsg) { dTrace("QInfo:%p reset signature", pQInfo); TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + vnodeDecRefCount(pQInfo); } void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, @@ -668,14 +696,22 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp } // set in query flag - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - +// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; + + /* + * The reference count, which is 2, is for both the current query thread and the future retrieve request, + * which will always be issued by client to acquire data or free SQInfo struct. + */ + vnodeAddRefCount(pQInfo); + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; - dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo); - + dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, + pQInfo->refCount); + taosScheduleTask(queryQhandle, &schedMsg); return pQInfo; @@ -779,8 +815,10 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE return pQInfo; } - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - +// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; + vnodeAddRefCount(pQInfo); + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; @@ -862,19 +900,22 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { if (pQInfo->over == 0) { //dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature); - uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); +// uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); /* * If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo, * since in release function, the original value has been destroyed. However, this memory area may be reused * by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo. */ - if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { - dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); +// if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { + if (pQInfo->killed == 1) { +// dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); + dTrace("%p freed or killed, abort query", pQInfo); } else { + vnodeAddRefCount(pQInfo); dTrace("%p add query into task queue for schedule", pQInfo); - - SSchedMsg schedMsg; + + SSchedMsg schedMsg = {0}; if (pQInfo->pMeterQuerySupporter != NULL) { if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) { diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index cfdd956768..6ff3f4d27c 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { pMsg += size; msgLen = pMsg - pStart; + assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeFreeQInfo(pObj->qhandle, true); + vnodeDecRefCount(pObj->qhandle); pObj->qhandle = NULL; } From 992a449ed6167c3144b990accd4626d415def36d Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 15:30:54 +0800 Subject: [PATCH 2/6] [tbase-1327] --- src/client/src/tscServer.c | 18 ++++++++---------- src/system/detail/src/mgmtMeter.c | 7 +++++-- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 17d870fcb2..e10865b642 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1507,7 +1507,6 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->uid = pMeterMeta->uid; pQueryMsg->numOfTagsCols = 0; } else { // query on metric - SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; if (pMeterMetaInfo->vnodeIndex < 0) { tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); return -1; @@ -2872,15 +2871,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { pElem->groupbyTagColumnList = htonl(offset); for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) { SColIndexEx *pCol = &pCmd->groupbyExpr.columnInfo[j]; - - *((int16_t *)pMsg) = pCol->colId; - pMsg += sizeof(pCol->colId); - - *((int16_t *)pMsg) += pCol->colIdx; - pMsg += sizeof(pCol->colIdx); - - *((int16_t *)pMsg) += pCol->flag; - pMsg += sizeof(pCol->flag); + SColIndexEx* pDestCol = (SColIndexEx*) pMsg; + + pDestCol->colIdxInBuf = 0; + pDestCol->colIdx = htons(pCol->colIdx); + pDestCol->colId = htons(pDestCol->colId); + pDestCol->flag = htons(pDestCol->flag); + + pMsg += sizeof(SColIndexEx); } } } diff --git a/src/system/detail/src/mgmtMeter.c b/src/system/detail/src/mgmtMeter.c index 5ef9f61b0b..be141f278d 100644 --- a/src/system/detail/src/mgmtMeter.c +++ b/src/system/detail/src/mgmtMeter.c @@ -1094,9 +1094,12 @@ static SMetricMetaElemMsg *doConvertMetricMetaMsg(SMetricMetaMsg *pMetricMetaMsg pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList); - int16_t *groupColIds = (int16_t*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList); + SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList); for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) { - groupColIds[i] = htons(groupColIds[i]); + groupColIds[i].colId = htons(groupColIds[i].colId); + groupColIds[i].colIdx = htons(groupColIds[i].colIdx); + groupColIds[i].flag = htons(groupColIds[i].flag); + groupColIds[i].colIdxInBuf = 0; } return pElem; From 38d377b421fa99a06de872fa48c4abda9f24ef3c Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 16:49:46 +0800 Subject: [PATCH 3/6] [tbase-1328] --- src/system/detail/src/mgmtSupertableQuery.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/system/detail/src/mgmtSupertableQuery.c b/src/system/detail/src/mgmtSupertableQuery.c index b56173f14f..10a64408ce 100644 --- a/src/system/detail/src/mgmtSupertableQuery.c +++ b/src/system/detail/src/mgmtSupertableQuery.c @@ -784,14 +784,15 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer } // todo refactor!!!!! -static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param) { +static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) { if (offset == TSDB_TBNAME_COLUMN_INDEX) { extractMeterName(pMeter->meterId, param); - return param; } else { - char* tags = pMeter->pTagData + TSDB_METER_ID_LEN; // tag start position - return (tags + offset); + char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position + memcpy(param, tags, len); // make sure the value is null-terminated string } + + return param; } bool tSkipListNodeFilterCallback(const void* pNode, void* param) { @@ -799,8 +800,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*)param; STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData); - char name[TSDB_METER_NAME_LEN + 1] = {0}; - char* val = getTagValueFromMeter(pMeter, pInfo->offset, name); + char buf[TSDB_MAX_TAGS_LEN] = {0}; + + char* val = getTagValueFromMeter(pMeter, pInfo->offset, pInfo->sch.bytes, buf); int8_t type = pInfo->sch.type; int32_t ret = 0; From 8636bc95aaf7aac1596abfdc8d45c8b6da4b250d Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 20:23:34 +0800 Subject: [PATCH 4/6] [tbase-1326] --- src/client/src/tscJoinProcess.c | 3 +-- src/system/detail/inc/vnode.h | 2 +- src/system/detail/src/vnodeQueryProcess.c | 6 ------ src/system/detail/src/vnodeRead.c | 8 +++++--- src/system/detail/src/vnodeShell.c | 2 +- 5 files changed, 8 insertions(+), 13 deletions(-) diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 375e0066b1..a94b308e87 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -528,8 +528,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { numOfFetch++; } } else { - if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && - tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) { + if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i]))) { numOfFetch++; } } diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 4ac0848427..37e31803d0 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -339,7 +339,7 @@ extern void * vnodeTmrCtrl; // read API extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order); -void *vnodeQueryInTimeRange(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs, +void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs, SQueryMeterMsg *pQueryMsg, int *code); void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index f39ca0dea1..231b47206e 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -890,8 +890,6 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); - - vnodeDecRefCount(pQInfo); return; } @@ -899,7 +897,6 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (pSupporter->pMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; - vnodeDecRefCount(pQInfo); return; } @@ -916,7 +913,6 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { // failed to save all intermediate results into disk, abort further query processing if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); - vnodeDecRefCount(pQInfo); return; } @@ -924,7 +920,6 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query killed, abort", pQInfo); - vnodeDecRefCount(pQInfo); return; } @@ -946,7 +941,6 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { pQInfo->pointsRead += pQuery->pointsRead; dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); - vnodeDecRefCount(pQInfo); } /* diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 2ea41d9d58..5640c6bc0d 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -512,8 +512,9 @@ void vnodeDecRefCount(void *param) { assert(vnodeIsQInfoValid(pQInfo)); int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1); - dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref); + assert(ref >= 0); + dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref); if (ref == 0) { vnodeFreeQInfo(pQInfo, true); } @@ -616,7 +617,7 @@ void vnodeQueryData(SSchedMsg *pMsg) { vnodeDecRefCount(pQInfo); } -void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, +void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, SQueryMeterMsg *pQueryMsg, int32_t *code) { SQInfo *pQInfo; SQuery *pQuery; @@ -687,6 +688,7 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp } if (pQInfo->over == 1) { + vnodeAddRefCount(pQInfo); // for retrieve procedure return pQInfo; } @@ -811,13 +813,13 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE goto _error; } + vnodeAddRefCount(pQInfo); if (pQInfo->over == 1) { return pQInfo; } // pQInfo->signature = TSDB_QINFO_QUERY_FLAG; vnodeAddRefCount(pQInfo); - vnodeAddRefCount(pQInfo); schedMsg.msg = NULL; schedMsg.thandle = (void *)1; diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index 6ff3f4d27c..5efbb41014 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -376,7 +376,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) { pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); } else { - pObj->qhandle = vnodeQueryInTimeRange(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); + pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); } _query_over: From c060be25b937f946eabcfceaac6342e022398733 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 21:02:32 +0800 Subject: [PATCH 5/6] [tbase-1327] --- src/client/src/tscServer.c | 6 ++++++ src/client/src/tscUtil.c | 8 +++----- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index e10865b642..40399d85b7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1290,6 +1290,12 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; + assert(pNew->cmd.numOfTables == 1); + + //launch subquery for each vnode, so the subquery index equals to the vnodeIndex. + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); + pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex; + pSql->pSubs[trsupport->subqueryIndex] = pNew; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9876dad906..4b0df76784 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1706,12 +1706,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } pNew->fp = fp; - pNew->param = param; - SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - + char key[TSDB_MAX_TAGS_LEN + 1] = {0}; - tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pCmd, key, uid); #ifdef _DEBUG_VIEW printf("the metricmeta key is:%s\n", key); @@ -1736,7 +1734,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } assert(pFinalInfo->pMeterMeta != NULL); - if (UTIL_METER_IS_METRIC(pMetermetaInfo)) { + if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { assert(pFinalInfo->pMetricMeta != NULL); } From e03f3364b80e7b7ba7a4b1479390216c826df6d2 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 11 Dec 2019 21:03:52 +0800 Subject: [PATCH 6/6] [tbase-1326] --- src/system/detail/inc/vnode.h | 22 ------------------ src/system/detail/src/vnodeQueryProcess.c | 28 ++++++----------------- src/system/detail/src/vnodeRead.c | 25 ++------------------ 3 files changed, 9 insertions(+), 66 deletions(-) diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 37e31803d0..3a5e7e7260 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -206,28 +206,6 @@ typedef struct { char cont[]; } SVMsgHeader; -/* - * The value of QInfo.signature is used to denote that a query is executing, it isn't safe to release QInfo yet. - * The release operations will be blocked in a busy-waiting until the query operation reach a safepoint. - * Then it will reset the signature in a atomic operation, followed by release operation. - * Only the QInfo.signature == QInfo, this structure can be released safely. - */ -#define TSDB_QINFO_QUERY_FLAG 0x1 -//#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) -#define TSDB_QINFO_RESET_SIG(x) -#define TSDB_QINFO_SET_QUERY_FLAG(x) -//#define TSDB_QINFO_SET_QUERY_FLAG(x) \ -// atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); - -// live lock: wait for query reaching a safe-point, release all resources -// belongs to this query -#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ -// { \ -// while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ -// taosMsleep(1); \ -// } \ -// } - struct tSQLBinaryExpr; typedef struct SColumnInfoEx { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 231b47206e..a545645a09 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -1173,15 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { } if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); vnodeDecRefCount(pQInfo); return; } assert(pQInfo->refCount >= 1); -// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -1215,10 +1213,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); - dTrace("QInfo:%p reset signature", pQInfo); - sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo); return; @@ -1238,10 +1233,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); - dTrace("QInfo:%p reset signature", pQInfo); - sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo); return; @@ -1250,12 +1242,12 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { } pQInfo->over = 1; - dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned, reset signature", pQInfo, + dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -1284,16 +1276,14 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { - dTrace("QInfo:%p query is killed, reset signature", pQInfo); + dTrace("QInfo:%p query is killed", pQInfo); pQInfo->over = 1; } else { - dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned, reset signature", + dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); - vnodeDecRefCount(pQInfo); } @@ -1305,14 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { } if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } assert(pQInfo->refCount >= 1); -// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery *pQuery = &pQInfo->query; pQuery->pointsRead = 0; @@ -1333,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; - dTrace("QInfo:%p reset signature", pQInfo); taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, pQInfo->query.interpoType); @@ -1341,12 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { if (pQuery->pointsRead == 0) { pQInfo->over = 1; - dTrace("QInfo:%p over, %d meters queried, %d points are returned, reset signature", pQInfo, pSupporter->numOfMeters, + dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, pQInfo->pointsRead); vnodePrintQueryStatistics(pSupporter); } sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 5640c6bc0d..ccd59f356b 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -422,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) { if (!vnodeIsQInfoValid(param)) return; pQInfo->killed = 1; - TSDB_WAIT_TO_SAFE_DROP_QINFO(pQInfo); - SMeterObj *pObj = pQInfo->pObj; dTrace("QInfo:%p start to free SQInfo", pQInfo); @@ -502,7 +500,6 @@ bool vnodeIsQInfoValid(void *param) { * into local variable, then compare by using local variable */ uint64_t sig = pQInfo->signature; -// return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); return (sig == (uint64_t)pQInfo); } @@ -536,13 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { pQInfo = (SQInfo *)pMsg->ahandle; if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); vnodeDecRefCount(pQInfo); return; } -// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); pQuery = &(pQInfo->query); SMeterObj *pObj = pQInfo->pObj; @@ -610,9 +605,6 @@ void vnodeQueryData(SSchedMsg *pMsg) { tclose(pQInfo->query.lfd); } - /* reset QInfo signature */ - dTrace("QInfo:%p reset signature", pQInfo); - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo); } @@ -697,9 +689,6 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE schedMsg.fp = vnodeQueryData; } - // set in query flag -// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - /* * The reference count, which is 2, is for both the current query thread and the future retrieve request, * which will always be issued by client to acquire data or free SQInfo struct. @@ -818,7 +807,6 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE return pQInfo; } -// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; vnodeAddRefCount(pQInfo); schedMsg.msg = NULL; @@ -900,18 +888,9 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { pQInfo->pointsRead); if (pQInfo->over == 0) { - //dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); - dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature); -// uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); + dTrace("QInfo:%p set query flag, sig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); - /* - * If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo, - * since in release function, the original value has been destroyed. However, this memory area may be reused - * by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo. - */ -// if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { if (pQInfo->killed == 1) { -// dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); dTrace("%p freed or killed, abort query", pQInfo); } else { vnodeAddRefCount(pQInfo);