From 32ad0c6810636e202b6efd09c59d3db9d2307764 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 18 Apr 2020 18:16:08 +0800 Subject: [PATCH 1/6] [td-98] fix empty table in super table caused client crash. --- src/client/src/tscSQLParser.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index c541a35d12..befb8fcad1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4675,8 +4675,8 @@ int32_t parseLimitClause(SQueryInfo* pQueryInfo, int32_t clauseIndex, SQuerySQL* } // No tables included. No results generated. Query results are empty. - if (pTableMetaInfo->pTableMeta == NULL) { - tscTrace("%p no table in metricmeta, no output result", pSql); + if (pTableMetaInfo->vgroupList->numOfVgroups == 0) { + tscTrace("%p no table in super table, no output result", pSql); pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; } From 912d3baab4b477867c029fab584a53f9f29c60e2 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 18 Apr 2020 18:17:06 +0800 Subject: [PATCH 2/6] [td-98] refactor code, change the query function signature. --- src/inc/tsdb.h | 19 +-- src/query/src/queryExecutor.c | 257 +++++++++++++++------------------- src/tsdb/src/tsdbRead.c | 23 ++- 3 files changed, 126 insertions(+), 173 deletions(-) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index cc1e4daf12..8f4e1db590 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -144,6 +144,7 @@ typedef struct STableGroupList { // qualified table object list in group typedef struct STsdbQueryCond { STimeWindow twindow; int32_t order; // desc/asc order to iterate the data block + int32_t numOfCols; SColumnInfoData *colList; } STsdbQueryCond; @@ -188,7 +189,7 @@ typedef void *tsdbpos_t; * @param pTableList table sid list * @return */ -tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, SArray *pColumnInfo); +tsdb_query_handle_t *tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo); /** * move to next block @@ -239,20 +240,6 @@ SArray *tsdbRetrieveDataBlock(tsdb_query_handle_t *pQueryHandle, SArray *pIdList */ int32_t tsdbResetQuery(tsdb_query_handle_t *pQueryHandle, STimeWindow *window, tsdbpos_t position, int16_t order); -/** - * return the access position of current query handle - * @param pQueryHandle - * @return - */ -int32_t tsdbDataBlockSeek(tsdb_query_handle_t *pQueryHandle, tsdbpos_t pos); - -/** - * todo remove this function later - * @param pQueryHandle - * @return - */ -tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t *pQueryHandle); - /** * todo remove this function later * @param pQueryHandle @@ -290,7 +277,7 @@ SArray *tsdbGetTableList(tsdb_query_handle_t *pQueryHandle); * @param pTagCond. tag query condition * */ -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList, +int32_t tsdbQueryByTagsCond(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupList, SColIndex* pColIndex, int32_t numOfCols); int32_t tsdbGetOneTableGroup(tsdb_repo_t* tsdb, int64_t uid, STableGroupInfo* pGroupInfo); diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index f234e2cf46..02a3916b9e 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2226,105 +2226,96 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { - if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { - dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, - pQuery->window.ekey, pQuery->order.order); - - sem_post(&pQInfo->dataReady); - return TSDB_CODE_SUCCESS; - } - - pQuery->status = 0; - pQuery->rec = (SResultRec){0}; - - changeExecuteScanOrder(pQuery, true); - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - - /* - * since we employ the output control mechanism in main loop. - * so, disable it during data block scan procedure. - */ - setScanLimitationByResultBuffer(pQuery); - - // save raw query range for applying to each subgroup - pQuery->lastKey = pQuery->window.skey; - - // create runtime environment - // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; - - // get one queried meter - assert(0); - // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); - - pRuntimeEnv->pTSBuf = param; - pRuntimeEnv->cur.vnodeIndex = -1; - - // set the ts-comp file traverse order - if (param != NULL) { - int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); - } - - assert(0); - // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); - // if (ret != TSDB_CODE_SUCCESS) { - // return ret; - // } - - // createTableGroup(pQInfo->pSidSet); - - int32_t size = getInitialPageNum(pQInfo); - int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - if (pQuery->intervalTime == 0) { - int16_t type = TSDB_DATA_TYPE_NULL; - - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; - type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); - } else { - type = TSDB_DATA_TYPE_INT; // group id - } - - initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); - } - - pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); - - STsdbQueryCond cond = { - .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, - .order = pQuery->order.order, - .colList = pQuery->colList, - }; - - // for(int32_t i = 0; i < pQInfo->pSidSet->numOfTables; ++i) { - // SMeterObj *p1 = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[i]->sid); - // taosArrayPush(sa, &p1); - // } - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - - pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo, cols); - - // metric query do not invoke interpolation, it will be done at the second-stage merge - if (!isPointInterpoQuery(pQuery)) { - pQuery->interpoType = TSDB_INTERPO_NONE; - } - - TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, - pQuery->precision); - taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); - pRuntimeEnv->stableQuery = true; - - return TSDB_CODE_SUCCESS; -} +//int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { +// if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || +// (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { +// dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, +// pQuery->window.ekey, pQuery->order.order); +// +// sem_post(&pQInfo->dataReady); +// return TSDB_CODE_SUCCESS; +// } +// +// pQuery->status = 0; +// pQuery->rec = (SResultRec){0}; +// +// changeExecuteScanOrder(pQuery, true); +// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; +// +// /* +// * since we employ the output control mechanism in main loop. +// * so, disable it during data block scan procedure. +// */ +// setScanLimitationByResultBuffer(pQuery); +// +// // save raw query range for applying to each subgroup +// pQuery->lastKey = pQuery->window.skey; +// +// // create runtime environment +// // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; +// +// // get one queried meter +// assert(0); +// // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); +// +// pRuntimeEnv->pTSBuf = param; +// pRuntimeEnv->cur.vnodeIndex = -1; +// +// // set the ts-comp file traverse order +// if (param != NULL) { +// int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; +// tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); +// } +// +// assert(0); +// // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); +// // if (ret != TSDB_CODE_SUCCESS) { +// // return ret; +// // } +// +// // createTableGroup(pQInfo->pSidSet); +// +// int32_t size = getInitialPageNum(pQInfo); +// int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); +// if (ret != TSDB_CODE_SUCCESS) { +// return ret; +// } +// +// if (pQuery->intervalTime == 0) { +// int16_t type = TSDB_DATA_TYPE_NULL; +// +// if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; +// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); +// } else { +// type = TSDB_DATA_TYPE_INT; // group id +// } +// +// initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); +// } +// +// pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); +// +// STsdbQueryCond cond = { +// .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, +// .order = pQuery->order.order, +// .colList = pQuery->colList, +// +// }; +// +// pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo); +// +// // metric query do not invoke interpolation, it will be done at the second-stage merge +// if (!isPointInterpoQuery(pQuery)) { +// pQuery->interpoType = TSDB_INTERPO_NONE; +// } +// +// TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, +// pQuery->precision); +// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); +// pRuntimeEnv->stableQuery = true; +// +// return TSDB_CODE_SUCCESS; +//} /** * decrease the refcount for each table involved in this query @@ -3440,11 +3431,11 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order); queryStatusSave(pRuntimeEnv, &qStatus); - STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; +// STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; // reverse scan from current position - tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); +// tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); +// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); doScanAllDataBlocks(pRuntimeEnv); @@ -3535,26 +3526,17 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { break; } - // set the correct start position, and load the corresponding block in buffer for next round scan all data blocks. -// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos); - STsdbQueryCond cond = { - .twindow = {pQuery->window.skey, pQuery->lastKey}, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = {pQuery->window.skey, pQuery->lastKey}, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - if (pRuntimeEnv->pSecQueryHandle != NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo, cols); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); } - taosArrayDestroy(cols); - status = pQuery->status; pRuntimeEnv->windowResInfo.curIndex = activeSlot; @@ -3573,7 +3555,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->window.skey = skey; pQuery->window.ekey = pQuery->lastKey - step; - /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); +// /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); doSingleMeterSupplementScan(pRuntimeEnv); @@ -4176,18 +4158,13 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery) pQuery->lastKey = pQuery->window.skey; STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } - pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo, cols); - taosArrayDestroy(cols); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo); pQInfo->tsdb = tsdb; pRuntimeEnv->pQuery = pQuery; @@ -4405,25 +4382,19 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { .twindow = {pInfo->pTableQInfo->lastKey, pInfo->pTableQInfo->win.ekey}, .order = pQuery->order.order, .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, }; - SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0])); - for (int32_t i = 0; i < pQuery->numOfCols; ++i) { - taosArrayPush(cols, &pQuery->colList[i]); - } SArray* g1 = taosArrayInit(1, POINTER_BYTES); + SArray* tx = taosArrayInit(1, sizeof(SPair)); + + taosArrayPush(tx, p); + taosArrayPush(g1, &tx); STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1}; - SArray* tx = taosArrayInit(1, sizeof(SPair)); - taosArrayPush(tx, p); - - taosArrayPush(g1, &tx); // include only current table - pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, cols); - -// vnodeUpdateQueryColumnIndex(pQuery, pRuntimeEnv->pMeterObj); -// vnodeUpdateFilterColumnIndex(pQuery); + pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp); if (pRuntimeEnv->pTSBuf != NULL) { if (pRuntimeEnv->cur.vnodeIndex == -1) { @@ -4587,13 +4558,11 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(taosArrayGetSize(group) == pQInfo->groupInfo.numOfTables && 1 == taosArrayGetSize(pQInfo->groupInfo.pGroupList)); while (pQInfo->tableIndex < pQInfo->groupInfo.numOfTables) { - int32_t k = pQInfo->tableIndex; - if (isQueryKilled(pQInfo)) { return; } - SPair *p = taosArrayGet(group, k); + SPair *p = taosArrayGet(group, pQInfo->tableIndex); STableDataInfo* pInfo = p->sec; TSKEY skey = pInfo->pTableQInfo->lastKey; @@ -4601,7 +4570,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQuery->window.skey = skey; } - if (!multiTableMultioutputHelper(pQInfo, k)) { + if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; } @@ -6046,7 +6015,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; //todo fix me - /*int32_t ret =*/ tsdbQueryTags(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); + /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 600ed2ba8d..b2177a4cbc 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -134,7 +134,7 @@ static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { pCompBlockLoadInfo->fileListIndex = -1; } -tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, SArray* pColumnInfo) { +tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList) { // todo 1. filter not exist table // todo 2. add the reference count for each table that is involved in query @@ -147,7 +147,7 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->cur.fid = -1; size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList); - assert(sizeOfGroup >= 1); + assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0); pQueryHandle->pTableCheckInfo = taosArrayInit(groupList->numOfTables, sizeof(STableCheckInfo)); @@ -181,16 +181,15 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S pQueryHandle->activeIndex = 0; // allocate buffer in order to load data blocks from file - int32_t numOfCols = taosArrayGetSize(pColumnInfo); + int32_t numOfCols = pCond->numOfCols; size_t bufferCapacity = 4096; pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pColumnInfo, i); + for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData pDest = {{0}, 0}; - pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCol->info.bytes); - pDest.info = pCol->info; + pDest.info = pCond->colList[i].info; + pDest.pData = calloc(1, EXTRA_BYTES + bufferCapacity * pCond->colList[i].info.bytes); taosArrayPush(pQueryHandle->pColumns, &pDest); } @@ -1114,10 +1113,6 @@ int32_t tsdbResetQuery(tsdb_query_handle_t* pQueryHandle, STimeWindow* window, t return 0; } -int32_t tsdbDataBlockSeek(tsdb_query_handle_t* pQueryHandle, tsdbpos_t pos) { return 0; } - -tsdbpos_t tsdbDataBlockTell(tsdb_query_handle_t* pQueryHandle) { return NULL; } - SArray* tsdbRetrieveDataRow(tsdb_query_handle_t* pQueryHandle, SArray* pIdList, SQueryRowCond* pCond) { return NULL; } tsdb_query_handle_t* tsdbQueryFromTagConds(STsdbQueryCond* pCond, int16_t stableId, const char* pTagFilterStr) { @@ -1263,12 +1258,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { SColIndex* pColIndex = &pTableGroupSupp->pCols[i]; int32_t colIndex = pColIndex->colIndex; + assert(colIndex >= 0 && colIndex < schemaNCols(pTableGroupSupp->pTagSchema)); + char * f1 = NULL; char * f2 = NULL; int32_t type = 0; int32_t bytes = 0; - if (colIndex == -1) { // table name, todo fix me + if (colIndex == -1) { // todo fix me, table name // f1 = s1->tags; // f2 = s2->tags; type = TSDB_DATA_TYPE_BINARY; @@ -1435,7 +1432,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr) return TSDB_CODE_SUCCESS; } -int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, +int32_t tsdbQueryByTagsCond(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size_t len, STableGroupInfo* pGroupInfo, SColIndex* pColIndex, int32_t numOfCols) { STable* pSTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid); From 119549c463486d44bf73e431861bc61740ccc276 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sat, 18 Apr 2020 22:43:54 +0800 Subject: [PATCH 3/6] [td-98] fix memory leak --- src/query/src/queryExecutor.c | 51 ++++++++++++----------------------- 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 02a3916b9e..9206becaab 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -3208,7 +3208,7 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * } } -void disableFunctForTableSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { SQuery *pQuery = pRuntimeEnv->pQuery; // group by normal columns and interval query on normal table @@ -3416,7 +3416,7 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); } -static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { +static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryStatus qStatus = {0}; @@ -3424,11 +3424,11 @@ static void doSingleMeterSupplementScan(SQueryRuntimeEnv *pRuntimeEnv) { return; } - dTrace("QInfo:%p start to supp scan", GET_QINFO_ADDR(pQuery)); + dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); // close necessary function execution during supplementary scan - disableFunctForTableSuppleScan(pRuntimeEnv, pQuery->order.order); + disableFuncInReverseScan(pRuntimeEnv, pQuery->order.order); queryStatusSave(pRuntimeEnv, &qStatus); // STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; @@ -3557,7 +3557,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->window.ekey = pQuery->lastKey - step; // /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - doSingleMeterSupplementScan(pRuntimeEnv); + doReverseScan(pRuntimeEnv); // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan pQuery->lastKey = lkey; @@ -5866,13 +5866,6 @@ static void freeQInfo(SQInfo *pQInfo) { sem_destroy(&(pQInfo->dataReady)); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); -// if (pQInfo->pTableDataInfo != NULL) { - // size_t num = taosHashGetSize(pQInfo->groupInfo); -// for (int32_t j = 0; j < 0; ++j) { -// destroyMeterQueryInfo(pQInfo->pTableDataInfo[j].pTableQInfo, pQuery->numOfOutputCols); -// } -// } - for (int32_t i = 0; i < pQuery->numOfFilterCols; ++i) { SSingleColumnFilterInfo *pColFilter = &pQuery->pFilterInfo[i]; if (pColFilter->numOfFilters > 0) { @@ -5904,8 +5897,13 @@ static void freeQInfo(SQInfo *pQInfo) { tfree(pQuery->pGroupbyExpr); tfree(pQuery); - taosArrayDestroy(pQInfo->groupInfo.pGroupList); + int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); + for(int32_t i = 0; i < numOfGroups; ++i) { + SArray* p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i); + taosArrayDestroy(p); + } + taosArrayDestroy(pQInfo->groupInfo.pGroupList); dTrace("QInfo:%p QInfo is freed", pQInfo); // destroy signature, in order to avoid the query process pass the object safety check @@ -6007,7 +6005,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) } bool isSTableQuery = false; - STableGroupInfo* groupInfo = calloc(1, sizeof(STableGroupInfo)); + STableGroupInfo groupInfo = {0}; if ((pQueryMsg->queryType & TSDB_QUERY_TYPE_STABLE_QUERY) != 0) { isSTableQuery = true; @@ -6015,8 +6013,8 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) STableId* id = taosArrayGet(pTableIdList, 0); id->uid = -1; //todo fix me - /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); - if (groupInfo->numOfTables == 0) { // no qualified tables no need to do query + /*int32_t ret =*/ tsdbQueryByTagsCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, &groupInfo, pGroupColIndex, pQueryMsg->numOfGroupCols); + if (groupInfo.numOfTables == 0) { // no qualified tables no need to do query code = TSDB_CODE_SUCCESS; goto _query_over; } @@ -6024,12 +6022,12 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) assert(taosArrayGetSize(pTableIdList) == 1); STableId* id = taosArrayGet(pTableIdList, 0); - if ((code = tsdbGetOneTableGroup(tsdb, id->uid, groupInfo)) != TSDB_CODE_SUCCESS) { + if ((code = tsdbGetOneTableGroup(tsdb, id->uid, &groupInfo)) != TSDB_CODE_SUCCESS) { goto _query_over; } } - (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, groupInfo); + (*pQInfo) = createQInfoImpl(pQueryMsg, pGroupbyExpr, pExprs, &groupInfo); if ((*pQInfo) == NULL) { code = TSDB_CODE_SERV_OUT_OF_MEMORY; } @@ -6037,24 +6035,9 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo) code = initQInfo(pQueryMsg, tsdb, *pQInfo, isSTableQuery); _query_over: - if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pTableIdList); - } + taosArrayDestroy(pTableIdList); // if failed to add ref for all meters in this query, abort current query - // if (code != TSDB_CODE_SUCCESS) { - // vnodeDecQueryRefCount(pQueryMsg, pMeterObjList, incNumber); - // } - // - // tfree(pQueryMsg->pSqlFuncExprs); - // tfree(pMeterObjList); - // ret = vnodeSendQueryRspMsg(pObj, code, pObj->qhandle); - // - // tfree(pQueryMsg->pSidExtInfo); - // for(int32_t i = 0; i < pQueryMsg->numOfCols; ++i) { - // vnodeFreeColumnInfo(&pQueryMsg->colList[i]); - // } - // // atomic_fetch_add_32(&vnodeSelectReqNum, 1); return TSDB_CODE_SUCCESS; } From 47a983fbf7640d397ba575f7446f7d47a9cc4298 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 19 Apr 2020 01:15:08 +0800 Subject: [PATCH 4/6] [td-98] fix bugs in reversed single table query --- src/query/inc/queryExecutor.h | 4 +- src/query/src/queryExecutor.c | 206 ++++++++++++++++++++-------------- 2 files changed, 121 insertions(+), 89 deletions(-) diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index 244f15e1dd..c07897abf6 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -170,11 +170,11 @@ typedef struct SQInfo { int32_t pointsInterpo; int32_t code; // error code to returned to client sem_t dataReady; - STableGroupInfo groupInfo; // table id list void* tsdb; + STableGroupInfo groupInfo; // table id list SQueryRuntimeEnv runtimeEnv; - int32_t subgroupIdx; + int32_t groupIndex; int32_t offset; /* offset in group result set of subgroup */ T_REF_DECLARE() diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 9206becaab..6ca8b3ab4a 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2900,14 +2900,14 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList); - while (pQInfo->subgroupIdx < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + while (pQInfo->groupIndex < numOfGroups) { + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); ret = mergeIntoGroupResultImpl(pQInfo, group); if (ret < 0) { // not enough disk space to save the data into disk return -1; } - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; // this group generates at least one result, return results if (ret > 0) { @@ -2915,11 +2915,11 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) { } assert(pQInfo->numOfGroupResultPages == 0); - dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->subgroupIdx - 1); + dTrace("QInfo:%p no result in group %d, continue", pQInfo, pQInfo->groupIndex - 1); } dTrace("QInfo:%p merge res data into group, index:%d, total group:%d, elapsed time:%lldms", - pQInfo, pQInfo->subgroupIdx - 1, numOfGroups, taosGetTimestampMs() - st); + pQInfo, pQInfo->groupIndex - 1, numOfGroups, taosGetTimestampMs() - st); return TSDB_CODE_SUCCESS; } @@ -2934,7 +2934,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { } // set current query completed - // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->subgroupIdx == pQInfo->pSidSet->numOfSubSet) { + // if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == pQInfo->pSidSet->numOfSubSet) { // pQInfo->tableIndex = pQInfo->pSidSet->numOfTables; // return; // } @@ -2943,7 +2943,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) { SQueryRuntimeEnv * pRuntimeEnv = &pQInfo->runtimeEnv; SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - int32_t id = getGroupResultId(pQInfo->subgroupIdx - 1); + int32_t id = getGroupResultId(pQInfo->groupIndex - 1); SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id); int32_t total = 0; @@ -3149,7 +3149,7 @@ int32_t flushFromResultBuf(SQInfo *pQInfo) { r = capacity; } - int32_t id = getGroupResultId(pQInfo->subgroupIdx) + pQInfo->numOfGroupResultPages; + int32_t id = getGroupResultId(pQInfo->groupIndex) + pQInfo->numOfGroupResultPages; tFilePage *buf = getNewDataBuf(pResultBuf, id, &pageId); // pagewise copy to dest buffer @@ -3198,8 +3198,8 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { buf->resultInfo[j].complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { buf->resultInfo[j].complete = true; @@ -3208,32 +3208,28 @@ static void doDisableFunctsForSupplementaryScan(SQuery *pQuery, SWindowResInfo * } } -void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void disableFuncInReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - + int32_t order = pQuery->order.order; + // group by normal columns and interval query on normal table - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; - } - SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { doDisableFunctsForSupplementaryScan(pQuery, pWindowResInfo, order); } else { // for simple result of table query, for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + int32_t functId = pQuery->pSelectExpr[j].pBase.functionId; + SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[j]; - if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_DESC) || - ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_ASC)) { + if (((functId == TSDB_FUNC_FIRST || functId == TSDB_FUNC_FIRST_DST) && order == TSDB_ORDER_ASC) || + ((functId == TSDB_FUNC_LAST || functId == TSDB_FUNC_LAST_DST) && order == TSDB_ORDER_DESC)) { pCtx->resultInfo->complete = false; } else if (functId != TSDB_FUNC_TS && functId != TSDB_FUNC_TAG) { pCtx->resultInfo->complete = true; } } } - - pQuery->order.order = pQuery->order.order ^ 1u; } void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { @@ -3259,14 +3255,11 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void enableFuncForForwardScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { +void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1u; + pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } - - pQuery->order.order = (pQuery->order.order) ^ 1u; } void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTableQuery, SPosInfo *posInfo) { @@ -3381,8 +3374,8 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } typedef struct SQueryStatus { - int8_t overStatus; - TSKEY lastKey; + int8_t status; +// TSKEY lastKey; STSCursor cur; } SQueryStatus; @@ -3390,8 +3383,8 @@ typedef struct SQueryStatus { static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; - pStatus->overStatus = pQuery->status; - pStatus->lastKey = pQuery->lastKey; + pStatus->status = pQuery->status; +// pStatus->lastKey = pQuery->lastKey; pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor @@ -3402,21 +3395,21 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - pQuery->lastKey = pQuery->window.skey; +// SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); +// pQuery->lastKey = pQuery->window.skey; } static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - pQuery->lastKey = pStatus->lastKey; - pQuery->status = pStatus->overStatus; +// pQuery->lastKey = pStatus->lastKey; + pQuery->status = pStatus->status; tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); } -static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { +static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SQuery * pQuery = pRuntimeEnv->pQuery; SQueryStatus qStatus = {0}; @@ -3428,19 +3421,15 @@ static void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); // close necessary function execution during supplementary scan - disableFuncInReverseScan(pRuntimeEnv, pQuery->order.order); + disableFuncInReverseScan(pRuntimeEnv); queryStatusSave(pRuntimeEnv, &qStatus); -// STimeWindow w = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; - // reverse scan from current position -// tsdbpos_t current = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); -// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &w, current, pQuery->order.order); - doScanAllDataBlocks(pRuntimeEnv); queryStatusRestore(pRuntimeEnv, &qStatus); - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); + setCtxOrder(pRuntimeEnv); + SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -3508,7 +3497,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { int64_t skey = pQuery->lastKey; int32_t status = pQuery->status; - int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex; + int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex; SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -3517,55 +3506,98 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { doScanAllDataBlocks(pRuntimeEnv); if (!needScanDataBlocksAgain(pRuntimeEnv)) { - // restore the status if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = status; + pQuery->status = status; // restore the status code when abort from repeat scan } break; } STsdbQueryCond cond = { - .twindow = {pQuery->window.skey, pQuery->lastKey}, + .twindow = {.skey = skey, .ekey = pQuery->lastKey - step}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - if (pRuntimeEnv->pSecQueryHandle != NULL) { + if (pRuntimeEnv->pSecQueryHandle == NULL) { pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); } - status = pQuery->status; - pRuntimeEnv->windowResInfo.curIndex = activeSlot; + status = pQuery->status; // backup the status + pRuntimeEnv->windowResInfo.curIndex = prevSlot; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; // check if query is killed or not - if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) { + if (isQueryKilled(pQInfo)) { return; } } + + if (!needReverseScan(pQuery)) { + return; + } + + // save the query time window + STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}; - // no need to set the end key - TSKEY lkey = pQuery->lastKey; - TSKEY ekey = pQuery->window.ekey; + // reverse order time range + pQuery->window.skey = pQuery->lastKey - step; + pQuery->window.ekey = skey; + + pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } - pQuery->window.skey = skey; - pQuery->window.ekey = pQuery->lastKey - step; -// /*tsdbpos_t current =*/ tsdbDataBlockTell(pRuntimeEnv->pQueryHandle); - - doReverseScan(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query during reverse scan - pQuery->lastKey = lkey; - pQuery->window.ekey = ekey; - -// STimeWindow win = {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}; -// tsdbResetQuery(pRuntimeEnv->pQueryHandle, &win, current, pQuery->order.order); -// tsdbNextDataBlock(pRuntimeEnv->pQueryHandle); + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + + dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); + + int32_t status1 = pQuery->status; + + STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + tsBufNextPos(pRuntimeEnv->pTSBuf); + } + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + + setCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pRuntimeEnv); + + // reverse scan from current position + doScanAllDataBlocks(pRuntimeEnv); + + pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur); + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + } + + setCtxOrder(pRuntimeEnv); + + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query + // during reverse scan + pQuery->lastKey = prev.skey; + pQuery->status = status1; + pQuery->window.ekey = prev.ekey; } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -3859,17 +3891,17 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde int32_t totalSubset = getNumOfSubset(pQInfo); if (orderType == TSDB_ORDER_ASC) { - startIdx = pQInfo->subgroupIdx; + startIdx = pQInfo->groupIndex; step = 1; } else { // desc order copy all data - startIdx = totalSubset - pQInfo->subgroupIdx - 1; + startIdx = totalSubset - pQInfo->groupIndex - 1; step = -1; } for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) { if (result[i].numOfRows == 0) { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; continue; } @@ -3887,7 +3919,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SWindowResult *result, int32_t orde pQInfo->offset += numOfRowsToCopy; } else { pQInfo->offset = 0; - pQInfo->subgroupIdx += 1; + pQInfo->groupIndex += 1; } for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -4474,14 +4506,14 @@ static void sequentialTableProcess(SQInfo *pQInfo) { assert(pQuery->limit.offset == 0 && pQuery->limit.limit != 0); #if 0 - while (pQInfo->subgroupIdx < numOfGroups) { + while (pQInfo->groupIndex < numOfGroups) { - SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->subgroupIdx); + SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex); size_t numOfTable = taosArrayGetSize(group); if (isFirstLastRowQuery(pQuery)) { dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); TSKEY key = -1; int32_t index = -1; @@ -4502,7 +4534,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // assert(num >= 0); } else { dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pTableIdList->numOfSubSet, - pQInfo->subgroupIdx); + pQInfo->groupIndex); for (int32_t k = start; k <= end; ++k) { if (isQueryKilled(pQInfo)) { @@ -4520,7 +4552,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pSupporter->subgroupIdx++; + pSupporter->groupIndex++; // output buffer is full, return to client if (pQuery->size >= pQuery->pointsToRead) { @@ -4537,7 +4569,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { * if the subgroup index is larger than 0, results generated by group by tbname,k is existed. * we need to return it to client in the first place. */ - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.total += pQuery->rec.rows; @@ -4667,7 +4699,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } } - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } @@ -4744,7 +4776,7 @@ static void doRestoreContext(SQInfo* pQInfo) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - enableFuncForForwardScan(pRuntimeEnv, pQuery->order.order); + setCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } @@ -4777,9 +4809,9 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; - if (pQInfo->subgroupIdx > 0) { + if (pQInfo->groupIndex > 0) { /* - * if the subgroupIdx > 0, the query process must be completed yet, we only need to + * if the groupIndex > 0, the query process must be completed yet, we only need to * copy the data into output buffer */ if (isIntervalQuery(pQuery)) { @@ -4841,7 +4873,7 @@ static void multiTableQueryProcess(SQInfo *pQInfo) { } if (isIntervalQuery(pQuery) || isSumAvgRateQuery(pQuery)) { -// assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); +// assert(pSupporter->groupIndex == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeIntoGroupResult(pQInfo) == TSDB_CODE_SUCCESS) { copyResToQueryResultBuf(pQInfo, pQuery); @@ -4979,11 +5011,11 @@ static void tableIntervalProcess(SQInfo *pQInfo) { tableIntervalProcessImpl(pRuntimeEnv); if (isIntervalQuery(pQuery)) { - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } // the offset is handled at prepare stage if no interpolation involved @@ -5015,10 +5047,10 @@ static void tableIntervalProcess(SQInfo *pQInfo) { // all data scanned, the group by normal column can return if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // todo refactor with merge interval time result - pQInfo->subgroupIdx = 0; + pQInfo->groupIndex = 0; pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); } pQInfo->pointsInterpo += numOfInterpo; @@ -5054,13 +5086,13 @@ static void tableQueryImpl(SQInfo* pQInfo) { // todo limit the output for interval query? pQuery->rec.rows = 0; - pQInfo->subgroupIdx = 0; // always start from 0 + pQInfo->groupIndex = 0; // always start from 0 if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQuery->rec.rows += pQuery->rec.rows; - clearFirstNTimeWindow(pRuntimeEnv, pQInfo->subgroupIdx); + clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { dTrace("QInfo:%p %d rows returned from group results, total:%d", pQInfo, pQuery->rec.rows, pQuery->rec.total); From ec902f8b1340919bf33d9cede464e04d9e653741 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 19 Apr 2020 22:40:02 +0800 Subject: [PATCH 5/6] [td-98] refactor code in reversed scan --- src/query/inc/qast.h | 2 - src/query/src/queryExecutor.c | 329 +++++++++++----------------------- src/tsdb/src/tsdbRead.c | 2 +- 3 files changed, 107 insertions(+), 226 deletions(-) diff --git a/src/query/inc/qast.h b/src/query/inc/qast.h index bd5e61c321..f3484509f8 100644 --- a/src/query/inc/qast.h +++ b/src/query/inc/qast.h @@ -30,8 +30,6 @@ extern "C" { struct tExprNode; struct SSchema; -struct tSkipList; -struct tSkipListNode; enum { TSQL_NODE_EXPR = 0x1, diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 6ca8b3ab4a..8ee337b8ed 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -50,6 +50,7 @@ #define GET_QINFO_ADDR(x) ((void*)((char *)(x)-offsetof(SQInfo, runtimeEnv))) #define GET_COL_DATA_POS(query, index, step) ((query)->pos + (index) * (step)) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC)) /* get the qinfo struct address from the query struct address */ #define GET_COLUMN_BYTES(query, colidx) \ @@ -84,15 +85,24 @@ typedef enum { QUERY_OVER = 0x8u, } vnodeQueryStatus; -static void setQueryStatus(SQuery *pQuery, int8_t status); -bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, TS_JOIN_TAG_NOT_EQUALS = 2, }; +typedef struct { + int32_t status; // query status + TSKEY lastKey; // the lastKey value before query executed + STimeWindow w; // whole query time window + STimeWindow current; // current query window + int32_t windowIndex; // index of active time window result for interval query + STSCursor cur; +} SQueryStatusInfo; + +static void setQueryStatus(SQuery *pQuery, int8_t status); +bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; } + static int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray* group); static void setWindowResOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pResult); @@ -2226,102 +2236,11 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi pQuery->pSelectExpr[columnIndex].resBytes * realRowId; } -//int32_t UNUSED_FUNC vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { -// if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) || -// (!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) { -// dTrace("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey, -// pQuery->window.ekey, pQuery->order.order); -// -// sem_post(&pQInfo->dataReady); -// return TSDB_CODE_SUCCESS; -// } -// -// pQuery->status = 0; -// pQuery->rec = (SResultRec){0}; -// -// changeExecuteScanOrder(pQuery, true); -// SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; -// -// /* -// * since we employ the output control mechanism in main loop. -// * so, disable it during data block scan procedure. -// */ -// setScanLimitationByResultBuffer(pQuery); -// -// // save raw query range for applying to each subgroup -// pQuery->lastKey = pQuery->window.skey; -// -// // create runtime environment -// // SColumnModel *pTagSchemaInfo = pQInfo->pSidSet->pColumnModel; -// -// // get one queried meter -// assert(0); -// // SMeterObj *pMeter = getMeterObj(pQInfo->groupInfo, pQInfo->pSidSet->pTableIdList[0]->sid); -// -// pRuntimeEnv->pTSBuf = param; -// pRuntimeEnv->cur.vnodeIndex = -1; -// -// // set the ts-comp file traverse order -// if (param != NULL) { -// int16_t order = (pQuery->order.order == pRuntimeEnv->pTSBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; -// tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order); -// } -// -// assert(0); -// // int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pQInfo->runtimeEnv, pTagSchemaInfo, TSDB_ORDER_ASC, true); -// // if (ret != TSDB_CODE_SUCCESS) { -// // return ret; -// // } -// -// // createTableGroup(pQInfo->pSidSet); -// -// int32_t size = getInitialPageNum(pQInfo); -// int32_t ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, size, pQuery->rowSize); -// if (ret != TSDB_CODE_SUCCESS) { -// return ret; -// } -// -// if (pQuery->intervalTime == 0) { -// int16_t type = TSDB_DATA_TYPE_NULL; -// -// if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group by columns not tags; -// type = getGroupbyColumnType(pQuery, pQuery->pGroupbyExpr); -// } else { -// type = TSDB_DATA_TYPE_INT; // group id -// } -// -// initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); -// } -// -// pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); -// -// STsdbQueryCond cond = { -// .twindow = (STimeWindow) {.skey = pQuery->window.skey, .ekey = pQuery->window.ekey}, -// .order = pQuery->order.order, -// .colList = pQuery->colList, -// -// }; -// -// pRuntimeEnv->pQueryHandle = tsdbQueryTables(NULL, &cond, &pQInfo->groupInfo); -// -// // metric query do not invoke interpolation, it will be done at the second-stage merge -// if (!isPointInterpoQuery(pQuery)) { -// pQuery->interpoType = TSDB_INTERPO_NONE; -// } -// -// TSKEY revisedStime = taosGetIntervalStartTimestamp(pQuery->window.skey, pQuery->intervalTime, pQuery->slidingTimeUnit, -// pQuery->precision); -// taosInitInterpoInfo(&pRuntimeEnv->interpoInfo, pQuery->order.order, revisedStime, 0, 0); -// pRuntimeEnv->stableQuery = true; -// -// return TSDB_CODE_SUCCESS; -//} - /** * decrease the refcount for each table involved in this query * @param pQInfo */ -void vnodeDecMeterRefcnt(SQInfo *pQInfo) { +UNUSED_FUNC void vnodeDecMeterRefcnt(SQInfo *pQInfo) { if (pQInfo != NULL) { // assert(taosHashGetSize(pQInfo->groupInfo) >= 1); } @@ -2355,7 +2274,7 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) { #endif } -void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { +UNUSED_FUNC void setTimestampRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t stime, int64_t etime) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { @@ -3255,10 +3174,10 @@ void disableFuncForReverseScan(SQInfo *pQInfo, int32_t order) { pQuery->order.order = (pQuery->order.order) ^ 1u; } -void setCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { +void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; + SWITCH_ORDER(pRuntimeEnv->pCtx[i].order);// = (pRuntimeEnv->pCtx[i].order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; } } @@ -3373,66 +3292,6 @@ void doSkipResults(SQueryRuntimeEnv *pRuntimeEnv) { } } -typedef struct SQueryStatus { - int8_t status; -// TSKEY lastKey; - STSCursor cur; -} SQueryStatus; - -// todo refactor -static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - - pStatus->status = pQuery->status; -// pStatus->lastKey = pQuery->lastKey; - - pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor - - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order ^= 1u; - tsBufNextPos(pRuntimeEnv->pTSBuf); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - -// SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); -// pQuery->lastKey = pQuery->window.skey; -} - -static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { - SQuery *pQuery = pRuntimeEnv->pQuery; - SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY); - -// pQuery->lastKey = pStatus->lastKey; - pQuery->status = pStatus->status; - - tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); -} - -static UNUSED_FUNC void doReverseScan(SQueryRuntimeEnv *pRuntimeEnv) { - SQuery * pQuery = pRuntimeEnv->pQuery; - SQueryStatus qStatus = {0}; - - if (!needReverseScan(pQuery)) { - return; - } - - dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); - SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - - // close necessary function execution during supplementary scan - disableFuncInReverseScan(pRuntimeEnv); - queryStatusSave(pRuntimeEnv, &qStatus); - - // reverse scan from current position - doScanAllDataBlocks(pRuntimeEnv); - - queryStatusRestore(pRuntimeEnv, &qStatus); - setCtxOrder(pRuntimeEnv); - - SET_MASTER_SCAN_FLAG(pRuntimeEnv); -} - void setQueryStatus(SQuery *pQuery, int8_t status) { if (status == QUERY_NOT_COMPLETED) { pQuery->status = status; @@ -3488,45 +3347,118 @@ bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) { return toContinue; } +static SQueryStatusInfo getQueryStatusInfo(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SQueryStatusInfo info = { + .status = pQuery->status, + .windowIndex = pRuntimeEnv->windowResInfo.curIndex, + .lastKey = pQuery->lastKey, + .w = pQuery->window, + }; + + return info; +} + +static void setEnvBeforeReverseScan(SQueryRuntimeEnv* pRuntimeEnv, SQueryStatusInfo* pStatus) { + SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQuery* pQuery = pRuntimeEnv->pQuery; + + // the step should be placed before order changed + int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); + + pStatus->cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor + if (pRuntimeEnv->pTSBuf) { + SWITCH_ORDER(pRuntimeEnv->pTSBuf->cur.order); + tsBufNextPos(pRuntimeEnv->pTSBuf); + } + + // reverse order time range + pQuery->window.skey = pQuery->lastKey - step; + pQuery->window.ekey = pStatus->lastKey; // the start timestamp of current query + + SWITCH_ORDER(pQuery->order.order); + SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); + + STsdbQueryCond cond = { + .twindow = pQuery->window, + .order = pQuery->order.order, + .colList = pQuery->colList, + .numOfCols = pQuery->numOfCols, + }; + + // clean unused handle + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); + } + + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + + setQueryStatus(pQuery, QUERY_NOT_COMPLETED); + switchCtxOrder(pRuntimeEnv); + disableFuncInReverseScan(pRuntimeEnv); +} + +static void clearEnvAfterReverseScan(SQueryRuntimeEnv* pRuntimeEnv, TSKEY lastKey, SQueryStatusInfo* pStatus) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + SWITCH_ORDER(pQuery->order.order); + switchCtxOrder(pRuntimeEnv); + + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pStatus->cur); + if (pRuntimeEnv->pTSBuf) { + pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; + } + + SET_MASTER_SCAN_FLAG(pRuntimeEnv); + + // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query + // during reverse scan + pQuery->lastKey = lastKey; + pQuery->status = pStatus->status; + pQuery->window = pStatus->w; +} + void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); // store the start query position SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); - - int64_t skey = pQuery->lastKey; - int32_t status = pQuery->status; - int32_t prevSlot = pRuntimeEnv->windowResInfo.curIndex; + SQueryStatusInfo qstatus = getQueryStatusInfo(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); while (1) { doScanAllDataBlocks(pRuntimeEnv); + + if (pRuntimeEnv->scanFlag == MASTER_SCAN) { + qstatus.status = pQuery->status; + } if (!needScanDataBlocksAgain(pRuntimeEnv)) { - // restore the status + // restore the status code and jump out of loop if (pRuntimeEnv->scanFlag == REPEAT_SCAN) { - pQuery->status = status; // restore the status code when abort from repeat scan + pQuery->status = qstatus.status; } break; } STsdbQueryCond cond = { - .twindow = {.skey = skey, .ekey = pQuery->lastKey - step}, + .twindow = {.skey = qstatus.lastKey, .ekey = pQuery->lastKey - step}, .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; - if (pRuntimeEnv->pSecQueryHandle == NULL) { - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + if (pRuntimeEnv->pSecQueryHandle != NULL) { + tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); } - status = pQuery->status; // backup the status - pRuntimeEnv->windowResInfo.curIndex = prevSlot; + pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); + pRuntimeEnv->windowResInfo.curIndex = qstatus.windowIndex; setQueryStatus(pQuery, QUERY_NOT_COMPLETED); pRuntimeEnv->scanFlag = REPEAT_SCAN; @@ -3541,63 +3473,14 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { return; } - // save the query time window - STimeWindow prev = {.skey = pQuery->lastKey, .ekey = pQuery->window.ekey}; + TSKEY lastKey = pQuery->lastKey; + setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); - // reverse order time range - pQuery->window.skey = pQuery->lastKey - step; - pQuery->window.ekey = skey; - - pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - - STsdbQueryCond cond = { - .twindow = pQuery->window, - .order = pQuery->order.order, - .colList = pQuery->colList, - .numOfCols = pQuery->numOfCols, - }; - - // clean unused handle - if (pRuntimeEnv->pSecQueryHandle != NULL) { - tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); - } - - pRuntimeEnv->pSecQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &pQInfo->groupInfo); - - dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); - SET_SUPPLEMENT_SCAN_FLAG(pRuntimeEnv); - - int32_t status1 = pQuery->status; - - STSCursor cur = tsBufGetCursor(pRuntimeEnv->pTSBuf); // save the cursor - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; - tsBufNextPos(pRuntimeEnv->pTSBuf); - } - - setQueryStatus(pQuery, QUERY_NOT_COMPLETED); - - setCtxOrder(pRuntimeEnv); - disableFuncInReverseScan(pRuntimeEnv); - // reverse scan from current position + dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); doScanAllDataBlocks(pRuntimeEnv); - pQuery->order.order = (pQuery->order.order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC; - tsBufSetCursor(pRuntimeEnv->pTSBuf, &cur); - if (pRuntimeEnv->pTSBuf) { - pRuntimeEnv->pTSBuf->cur.order = pQuery->order.order; - } - - setCtxOrder(pRuntimeEnv); - - SET_MASTER_SCAN_FLAG(pRuntimeEnv); - - // update the pQuery->window.skey and pQuery->window.ekey to limit the scan scope of sliding query - // during reverse scan - pQuery->lastKey = prev.skey; - pQuery->status = status1; - pQuery->window.ekey = prev.ekey; + clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus); } void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { @@ -4776,7 +4659,7 @@ static void doRestoreContext(SQInfo* pQInfo) { pRuntimeEnv->pTSBuf->cur.order = pRuntimeEnv->pTSBuf->cur.order ^ 1; } - setCtxOrder(pRuntimeEnv); + switchCtxOrder(pRuntimeEnv); SET_MASTER_SCAN_FLAG(pRuntimeEnv); } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index b2177a4cbc..c9b3a655c7 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1049,7 +1049,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) { if (pTable->mem != NULL) { // create mem table iterator if it is not created yet assert(pCheckInfo->iter != NULL); - rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle); + rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 4000, &skey, &ekey, pHandle); // update the last key value pCheckInfo->lastKey = ekey + step; From 3a3ccf4065f15f3c247ee3147941e5f0754271bc Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Sun, 19 Apr 2020 23:26:32 +0800 Subject: [PATCH 6/6] [td-98] fix compiler error --- src/query/src/queryExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a4895e8f85..6ded52d11d 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -3475,7 +3475,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { setEnvBeforeReverseScan(pRuntimeEnv, &qstatus); // reverse scan from current position - dTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); + qTrace("QInfo:%p start to reverse scan", GET_QINFO_ADDR(pRuntimeEnv)); doScanAllDataBlocks(pRuntimeEnv); clearEnvAfterReverseScan(pRuntimeEnv, lastKey, &qstatus);