From 3a0ea21342749f91c8e4d80a35c91c40b1e73c9e Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Thu, 13 Oct 2022 19:54:25 +0800 Subject: [PATCH 01/24] fix: get SLastCol from array instead of SColVal --- source/dnode/vnode/src/tsdb/tsdbCache.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index ac6be9af2d..1615edb6ce 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1189,7 +1189,8 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { setNoneCol = false; for (iCol = noneCol; iCol < nCol; ++iCol) { // high version's column value - SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol); + SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); + SColVal *tColVal = &lastColVal->colVal; tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { From 977fc4b15cd93a1c7d5144e75237c5c3d4398ac5 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 14 Oct 2022 15:28:19 +0800 Subject: [PATCH 02/24] fix(tsdb/cache): release lock when table's empty --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 1615edb6ce..4088d35d3e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1311,6 +1311,8 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand // if table's empty or error, return code of -1 // if (code < 0 || pRow == NULL) { if (code < 0 || pLastArray == NULL) { + taosThreadMutexUnlock(&pTsdb->lruMutex); + *handle = NULL; return 0; } From b62fa20b53fd9640d92250ed59ce22e5e9821d3a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 17 Oct 2022 17:20:32 +0800 Subject: [PATCH 03/24] tsdb/cache: return last row as array of SLastCol --- source/dnode/vnode/src/tsdb/tsdbCache.c | 122 ++++++++++++++------ source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 3 +- 2 files changed, 91 insertions(+), 34 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4088d35d3e..e27fc252a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -61,8 +61,6 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { *len = sizeof(uint64_t); } -static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); } - static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); } int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { @@ -75,13 +73,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { - STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); - if (pRow->ts <= eKey) { + SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); + bool invalidate = false; + int16_t nCol = taosArrayGetSize(pLast); + + for (int16_t iCol = 0; iCol < nCol; ++iCol) { + SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); + if (eKey >= tTsVal->ts) { + invalidate = true; + break; + } + } + + if (invalidate) { taosLRUCacheRelease(pCache, h, true); } else { taosLRUCacheRelease(pCache, h, false); } - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } @@ -130,14 +138,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { - STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h); - if (pRow->ts <= eKey) { + SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); + bool invalidate = false; + int16_t nCol = taosArrayGetSize(pLast); + + for (int16_t iCol = 0; iCol < nCol; ++iCol) { + SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); + if (eKey >= tTsVal->ts) { + invalidate = true; + break; + } + } + + if (invalidate) { taosLRUCacheRelease(pCache, h, true); } else { taosLRUCacheRelease(pCache, h, false); } - - // void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen); } // getTableCacheKey(uid, "l", key, &keyLen); @@ -177,6 +194,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST getTableCacheKey(uid, 0, key, &keyLen); LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen); if (h) { + STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + TSKEY keyTs = row->ts; + bool invalidate = false; + + SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h); + int16_t nCol = taosArrayGetSize(pLast); + int16_t iCol = 0; + + SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol); + if (keyTs > tTsVal->ts) { + STColumn *pTColumn = &pTSchema->columns[0]; + SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs}); + + taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal}); + } + + for (++iCol; iCol < nCol; ++iCol) { + SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol); + if (keyTs >= tTsVal1->ts) { + SColVal *tColVal = &tTsVal1->colVal; + + SColVal colVal = {0}; + tTSRowGetVal(row, pTSchema, iCol, &colVal); + if (!COL_VAL_IS_NONE(&colVal)) { + if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) { + invalidate = true; + + break; + } + } else { + taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal}); + } + } + } + + _invalidate: + taosMemoryFreeClear(pTSchema); + + taosLRUCacheRelease(pCache, h, invalidate); + /* cacheRow = (STSRow *)taosLRUCacheValue(pCache, h); if (row->ts >= cacheRow->ts) { if (row->ts == cacheRow->ts) { @@ -218,9 +275,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST if (status != TAOS_LRU_STATUS_OK) { code = -1; } - /* tsdbCacheInsertLastrow(pCache, uid, row, dup); */ + // tsdbCacheInsertLastrow(pCache, uid, row, dup); } - } + }*/ } /*else { if (dup) { cacheRow = tdRowDup(row); @@ -1031,7 +1088,7 @@ _err: return code; } -static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) { +static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) { int32_t code = 0; STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); @@ -1055,12 +1112,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo break; } + TSKEY rowTs = TSDBROW_TS(pRow); + if (lastRowTs == TSKEY_MAX) { - lastRowTs = TSDBROW_TS(pRow); + lastRowTs = rowTs; STColumn *pTColumn = &pTSchema->columns[0]; *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); - if (taosArrayPush(pColArray, pColVal) == NULL) { + if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1068,7 +1127,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo for (iCol = 1; iCol < nCol; ++iCol) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (taosArrayPush(pColArray, pColVal) == NULL) { + if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1079,15 +1138,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo } } if (!setNoneCol) { - // goto build the result ts row + // done, goto return pColArray break; } else { continue; } } - if ((TSDBROW_TS(pRow) < lastRowTs)) { - // goto build the result ts row + if ((rowTs < lastRowTs)) { + // done, goto return pColArray break; } @@ -1099,7 +1158,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - taosArraySet(pColArray, iCol, pColVal); + taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; @@ -1110,14 +1169,13 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo // build the result ts row here *dup = false; if (taosArrayGetSize(pColArray) == nCol) { - code = tdSTSRowNew(pColArray, pTSchema, ppRow); - if (code) goto _err; + *ppColArray = NULL; + taosArrayDestroy(pColArray); } else { - *ppRow = NULL; + *ppColArray = pColArray; } nextRowIterClose(&iter); - taosArrayDestroy(pColArray); taosMemoryFreeClear(pTSchema); return code; @@ -1178,7 +1236,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { } } if (!setNoneCol) { - // goto build the result ts row + // done, goto return pColArray break; } else { continue; @@ -1202,7 +1260,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { } } while (setNoneCol); - // build the result ts row here if (taosArrayGetSize(pColArray) <= 0) { *ppLastArray = NULL; taosArrayDestroy(pColArray); @@ -1233,13 +1290,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH h = taosLRUCacheLookup(pCache, key, keyLen); if (!h) { - STSRow *pRow = NULL; + SArray *pArray = NULL; bool dup = false; // which is always false for now - code = mergeLastRow(uid, pTsdb, &dup, &pRow); + code = mergeLastRow(uid, pTsdb, &dup, &pArray); // if table's empty or error, return code of -1 - if (code < 0 || pRow == NULL) { - if (!dup && pRow) { - taosMemoryFree(pRow); + if (code < 0 || pArray == NULL) { + if (!dup && pArray) { + taosArrayDestroy(pArray); } taosThreadMutexUnlock(&pTsdb->lruMutex); @@ -1249,9 +1306,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH return 0; } - _taos_lru_deleter_t deleter = deleteTableCacheLastrow; + _taos_lru_deleter_t deleter = deleteTableCacheLast; LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW); + taosLRUCacheInsert(pCache, key, keyLen, pArray, pArray->capacity, deleter, NULL, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } @@ -1309,7 +1366,6 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand SArray *pLastArray = NULL; code = mergeLast(uid, pTsdb, &pLastArray); // if table's empty or error, return code of -1 - // if (code < 0 || pRow == NULL) { if (code < 0 || pLastArray == NULL) { taosThreadMutexUnlock(&pTsdb->lruMutex); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 81219e1442..d0d0563649 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -128,7 +128,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint // no data in the table of Uid if (*h != NULL) { - *pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); + //*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); + SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h); } } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); From da9df994615e5fd6634c829ecf4d8076c729a493 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Oct 2022 17:59:43 +0800 Subject: [PATCH 04/24] fix(query): opt last query. --- include/common/tcommon.h | 21 +++++ source/dnode/vnode/src/inc/tsdb.h | 5 ++ source/dnode/vnode/src/tsdb/tsdbCache.c | 5 -- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 97 +++++++++++++++++---- source/libs/function/src/builtinsimpl.c | 21 ----- 5 files changed, 106 insertions(+), 43 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index eaddf4e983..c32fe81335 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -78,6 +78,27 @@ typedef struct { int32_t exprIdx; } STupleKey; +typedef struct STuplePos { + union { + struct { + int32_t pageId; + int32_t offset; + }; + STupleKey streamTupleKey; + }; +} STuplePos; + +typedef struct SFirstLastRes { + bool hasResult; + // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, + // this attribute is required + bool isNull; + int32_t bytes; + int64_t ts; + STuplePos pos; + char buf[]; +} SFirstLastRes; + static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { STupleKey* pTuple1 = (STupleKey*)pKey1; STupleKey* pTuple2 = (STupleKey*)pKey2; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bf110f1ae3..91761663e8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -306,6 +306,11 @@ int32_t tsdbMerge(STsdb *pTsdb); #define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) // tsdbCache ============================================================================================== +typedef struct { + TSKEY ts; + SColVal colVal; +} SLastCol; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index de26df6c54..b5624db51c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -15,11 +15,6 @@ #include "tsdb.h" -typedef struct { - TSKEY ts; - SColVal colVal; -} SLastCol; - int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index d0d0563649..2d4a1b54cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -29,7 +29,7 @@ typedef struct SCacheRowsReader { SArray* pTableList; // table id list } SCacheRowsReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { +static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; @@ -37,20 +37,35 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE); + SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i); + if (slotIds[i] == -1) { - colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false); + pRes->ts = pColVal->ts; + pRes->bytes = TSDB_KEYSIZE; + pRes->isNull = false; + pRes->hasResult = true; + + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } else { int32_t slotId = slotIds[i]; - tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal); + int32_t bytes = pReader->pSchema->columns[slotId].bytes; + pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes); + pRes->bytes = bytes; + pRes->hasResult = true; if (IS_VAR_DATA_TYPE(colVal.type)) { if (!COL_VAL_IS_VALUE(&colVal)) { - colDataAppendNULL(pColInfoData, numOfRows); + pRes->isNull = true; + pRes->ts = pColVal->ts; + + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } else { - varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); - memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); - colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + varDataSetLen(pRes->buf, colVal.value.nData); + memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData); + pRes->bytes = colVal.value.nData; + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } } else { colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal)); @@ -117,7 +132,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { return NULL; } -static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, +static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow, LRUHandle** h) { int32_t code = TSDB_CODE_SUCCESS; if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { @@ -127,9 +142,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint } // no data in the table of Uid - if (*h != NULL) { - //*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); - SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h); + if (*h != NULL) { // todo convert to SArray + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); @@ -139,8 +153,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint // no data in the table of Uid if (*h != NULL) { - SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h); - tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema); + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } @@ -157,13 +170,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 int32_t code = TSDB_CODE_SUCCESS; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; - STSRow* pRow = NULL; + SArray* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); + int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols); + for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { + lastTs[i] = INT64_MIN; + } + // retrieve the only one last row of all tables in the uid list. if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { - int64_t lastKey = INT64_MIN; - bool internalResult = false; + bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -176,7 +193,53 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } + { + SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + for(int32_t j = 0; j < pr->numOfCols; ++j) { + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); + pRes[j]->ts = INT64_MIN; + } + + for (int32_t k = 0; k < pr->numOfCols; ++k) { + SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); + + if (slotIds[k] == -1) { // the primary timestamp + SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k); + if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + pRes[k]->hasResult = true; + pRes[k]->ts = pColVal->ts; + memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); + + colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + } + } else { + int32_t slotId = slotIds[k]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + + if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + pRes[k]->hasResult = true; + pRes[k]->ts = pColVal->ts; + + pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + if (!pRes[k]->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + } else { + memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes); + } + } + + colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + } + } + } + } + +/* if (pRow->ts > lastKey) { + printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey); + // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. if (internalResult) { @@ -189,7 +252,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 internalResult = true; lastKey = pRow->ts; } - +*/ tsdbCacheRelease(lruCache, h); } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7077a9b780..493561ca1e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -57,16 +57,6 @@ typedef struct SAvgRes { int16_t type; // store the original input type, used in merge function } SAvgRes; -typedef struct STuplePos { - union { - struct { - int32_t pageId; - int32_t offset; - }; - STupleKey streamTupleKey; - }; -} STuplePos; - typedef struct SMinmaxResInfo { bool assign; // assign the first value or not int64_t v; @@ -93,17 +83,6 @@ typedef struct STopBotRes { STopBotResItem* pItems; } STopBotRes; -typedef struct SFirstLastRes { - bool hasResult; - // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, - // this attribute is required - bool isNull; - int32_t bytes; - int64_t ts; - STuplePos pos; - char buf[]; -} SFirstLastRes; - typedef struct SStddevRes { double result; int64_t count; From 109d50a4cc7af9cb0e5decef9b8b47a2b1f96850 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 17 Oct 2022 18:13:38 +0800 Subject: [PATCH 05/24] fix: last row array form bug --- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index b5624db51c..f67a5cc444 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1181,7 +1181,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo // build the result ts row here *dup = false; - if (taosArrayGetSize(pColArray) == nCol) { + if (taosArrayGetSize(pColArray) != nCol) { *ppColArray = NULL; taosArrayDestroy(pColArray); } else { From c8b180b129cf0155884c78d3e88762bd987aaf21 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Oct 2022 19:14:06 +0800 Subject: [PATCH 06/24] fix(query): update the cached last query. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 97 +++++++------------ source/libs/executor/src/cachescanoperator.c | 1 + source/libs/executor/src/timewindowoperator.c | 28 +++--- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 93 ------------------ 5 files changed, 54 insertions(+), 167 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2d4a1b54cd..be36848fec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -29,48 +29,37 @@ typedef struct SCacheRowsReader { SArray* pTableList; // table id list } SCacheRowsReader; -static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { +static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, + SFirstLastRes** pRes) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; - SColVal colVal = {0}; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE); - SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i); - - if (slotIds[i] == -1) { - pRes->ts = pColVal->ts; - pRes->bytes = TSDB_KEYSIZE; - pRes->isNull = false; - pRes->hasResult = true; - - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + if (slotIds[i] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + pRes[i]->ts = pColVal->ts; + memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE); } else { - int32_t slotId = slotIds[i]; + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - int32_t bytes = pReader->pSchema->columns[slotId].bytes; - pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes); - pRes->bytes = bytes; - pRes->hasResult = true; + pRes[i]->ts = pColVal->ts; + pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - if (IS_VAR_DATA_TYPE(colVal.type)) { - if (!COL_VAL_IS_VALUE(&colVal)) { - pRes->isNull = true; - pRes->ts = pColVal->ts; - - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + if (!pRes[i]->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); } else { - varDataSetLen(pRes->buf, colVal.value.nData); - memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData); - pRes->bytes = colVal.value.nData; - colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); + memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); } - } else { - colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal)); } } + + pRes[i]->hasResult = true; + colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } pBlock->info.rows += 1; @@ -142,7 +131,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint } // no data in the table of Uid - if (*h != NULL) { // todo convert to SArray + if (*h != NULL) { *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } else { @@ -172,15 +161,16 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 LRUHandle* h = NULL; SArray* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); + bool hasRes = false; - int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols); - for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { - lastTs[i] = INT64_MIN; + SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + for (int32_t j = 0; j < pr->numOfCols; ++j) { + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); + pRes[j]->ts = INT64_MIN; } // retrieve the only one last row of all tables in the uid list. if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { - bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -194,18 +184,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } { - SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); - for(int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); - pRes[j]->ts = INT64_MIN; - } - for (int32_t k = 0; k < pr->numOfCols; ++k) { SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); - if (slotIds[k] == -1) { // the primary timestamp - SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k); + if (slotIds[k] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + hasRes = true; pRes[k]->hasResult = true; pRes[k]->ts = pColVal->ts; memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); @@ -217,6 +202,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + hasRes = true; pRes[k]->hasResult = true; pRes[k]->ts = pColVal->ts; @@ -236,25 +222,13 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } -/* - if (pRow->ts > lastKey) { - printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey); - - // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already - // appended or not. - if (internalResult) { - pResBlock->info.rows -= 1; - taosArrayClear(pTableUidList); - } - - saveOneRow(pRow, pResBlock, pr, slotIds); - taosArrayPush(pTableUidList, &pKeyInfo->uid); - internalResult = true; - lastKey = pRow->ts; - } -*/ tsdbCacheRelease(lruCache, h); } + + if (hasRes) { + pResBlock->info.rows = 1; + } + } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -267,9 +241,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } - saveOneRow(pRow, pResBlock, pr, slotIds); - taosArrayPush(pTableUidList, &pKeyInfo->uid); + saveOneRow(pRow, pResBlock, pr, slotIds, pRes); + // TODO reset the pRes + taosArrayPush(pTableUidList, &pKeyInfo->uid); tsdbCacheRelease(lruCache, h); pr->tableIndex += 1; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6106544a35..5d63892b7b 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -193,6 +193,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->currentGroupIndex += 1; // check for tag values + // TODO NOTE: The uid of pInfo->pRes is required. if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c1478bc935..9c219d2765 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1420,8 +1420,9 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* pGpDatas = (uint64_t*)pGpCol->pData; for (int32_t i = 0; i < pBlock->info.rows; i++) { - SResultRowInfo dumyInfo; + SResultRowInfo dumyInfo = {0}; dumyInfo.cur.pageId = -1; + STimeWindow win = {0}; if (IS_FINAL_OP(pInfo)) { win.skey = startTsCols[i]; @@ -5828,27 +5829,30 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->isFinal = false; - if (pIntervalPhyNode->window.pExprs != NULL) { - int32_t numOfScalar = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - } + pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); SExprSupp* pSup = &pOperator->exprSupp; + + initBasicInfo(&pInfo->binfo, pResBlock); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResBlock); - initStreamFunciton(pSup->pCtx, pSup->numOfExprs); - initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + if (pIntervalPhyNode->window.pExprs != NULL) { + int32_t numOfScalar = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } pInfo->invertible = allInvertible(pSup->pCtx, numOfCols); pInfo->invertible = false; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 2308aaf214..54e8c27d5b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2402,7 +2402,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = cachedLastRowFunction, + .processFunc = lastFunctionMerge, .finalizeFunc = firstLastFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 493561ca1e..3255ef7d26 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6148,99 +6148,6 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } -int32_t interpFunction(SqlFunctionCtx* pCtx) { -#if 0 - int32_t fillType = (int32_t) pCtx->param[2].i64; - //bool ascQuery = (pCtx->order == TSDB_ORDER_ASC); - - if (pCtx->start.key == pCtx->startTs) { - assert(pCtx->start.key != INT64_MIN); - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); - - goto interp_success_exit; - } else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); - - goto interp_success_exit; - } - - switch (fillType) { - case TSDB_FILL_NULL: - setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); - break; - - case TSDB_FILL_SET_VALUE: - tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true); - break; - - case TSDB_FILL_LINEAR: - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs - || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { - goto interp_exit; - } - - double v1 = -1, v2 = -1; - GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); - GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); - - SPoint point1 = {.key = pCtx->start.key, .val = &v1}; - SPoint point2 = {.key = pCtx->end.key, .val = &v2}; - SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; - - int32_t srcType = pCtx->inputType; - if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { - setNull(pCtx->pOutput, srcType, pCtx->inputBytes); - } else { - bool exceedMax = false, exceedMin = false; - taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); - if (exceedMax || exceedMin) { - __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); - if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); - } else { - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val); - } - } - } - break; - - case TSDB_FILL_PREV: - if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) { - goto interp_exit; - } - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val); - break; - - case TSDB_FILL_NEXT: - if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { - goto interp_exit; - } - - COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val); - break; - - case TSDB_FILL_NONE: - // do nothing - default: - goto interp_exit; - } - - - interp_success_exit: - *(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs; - INC_INIT_VAL(pCtx, 1); - - interp_exit: - pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; - pCtx->endTs = pCtx->startTs; -#endif - - return TSDB_CODE_SUCCESS; -} - int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; From 0228caaa0d23944c3ff0eb1f842bbabcd6490257 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 18 Oct 2022 11:38:59 +0800 Subject: [PATCH 07/24] enh: last cache optimize --- include/libs/function/functionMgt.h | 3 + source/libs/function/src/builtins.c | 10 +++ source/libs/function/src/functionMgt.c | 6 ++ source/libs/planner/src/planOptimizer.c | 78 +++++++++++++++++-- source/libs/planner/test/planBasicTest.cpp | 18 ----- source/libs/planner/test/planOptimizeTest.cpp | 19 +++++ 6 files changed, 109 insertions(+), 25 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 569c16675d..6968f1712c 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -129,6 +129,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_TO_COLUMN, FUNCTION_TYPE_GROUP_KEY, FUNCTION_TYPE_CACHE_LAST_ROW, + FUNCTION_TYPE_CACHE_LAST, // distributed splitting functions FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, @@ -216,6 +217,8 @@ bool fmIsKeepOrderFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId); +void getLastCacheDataType(SDataType* pType); + int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); typedef enum EFuncDataRequired { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 54e8c27d5b..f03d8354fa 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2405,6 +2405,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunctionMerge, .finalizeFunc = firstLastFinalize }, + { + .name = "_cache_last", + .type = FUNCTION_TYPE_CACHE_LAST, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .translateFunc = translateFirstLast, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunctionMerge, + .finalizeFunc = firstLastFinalize + }, { .name = "_last_row_partial", .type = FUNCTION_TYPE_LAST_PARTIAL, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index ca8ddbc60a..40af7bb567 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -16,6 +16,7 @@ #include "functionMgt.h" #include "builtins.h" +#include "builtinsimpl.h" #include "functionMgtInt.h" #include "taos.h" #include "taoserror.h" @@ -314,6 +315,11 @@ bool fmIsSameInOutType(int32_t funcId) { return res; } +void getLastCacheDataType(SDataType* pType) { + pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE; + pType->type = TSDB_DATA_TYPE_BINARY; +} + static int32_t getFuncInfo(SFunctionNode* pFunc) { char msg[128] = {0}; return fmGetFuncInfo(pFunc, msg, sizeof(msg)); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 9a2cfa3339..0fc1be8a70 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2120,6 +2120,22 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef); } +static EDealRes lastRowScanOptHasTagImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + } + return DEAL_RES_CONTINUE; +} + +static bool lastRowScanOptHasTag(SNode* pExpr) { + bool hasTag = false; + nodesWalkExpr(pExpr, lastRowScanOptHasTagImpl, &hasTag); + return hasTag; +} + static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { @@ -2134,12 +2150,22 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { return false; } + bool hasLastFunc = false; + bool hasSelectFunc = false; SNode* pFunc = NULL; FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { - if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType && - FUNCTION_TYPE_LAST != ((SFunctionNode*)pFunc)->funcType && - FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType && - FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) { + SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; + if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { + if (hasSelectFunc || lastRowScanOptHasTag(nodesListGetNode(pAggFunc->pParameterList, 0))) { + return false; + } + hasLastFunc = true; + } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) { + if (hasLastFunc) { + return false; + } + hasSelectFunc = true; + } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { return false; } } @@ -2147,6 +2173,31 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { return true; } +typedef struct SLastRowScanOptSetColDataTypeCxt { + bool doAgg; + SNodeList* pLastCols; +} SLastRowScanOptSetColDataTypeCxt; + +static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SLastRowScanOptSetColDataTypeCxt* pCxt = pContext; + if (pCxt->doAgg) { + nodesListMakeAppend(&pCxt->pLastCols, pNode); + getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType)); + } else { + SNode* pCol = NULL; + FOREACH(pCol, pCxt->pLastCols) { + if (nodesEqualNode(pCol, pNode)) { + getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType)); + break; + } + } + } + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized); @@ -2154,22 +2205,35 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic return TSDB_CODE_SUCCESS; } - SNode* pNode = NULL; + SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL}; + SNode* pNode = NULL; FOREACH(pNode, pAgg->pAggFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pNode; - if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) { - int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row"); + int32_t funcType = pFunc->funcType; + if (FUNCTION_TYPE_LAST_ROW == funcType || FUNCTION_TYPE_LAST == funcType) { + int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), + FUNCTION_TYPE_LAST_ROW == funcType ? "_cache_last_row" : "_cache_last"); pFunc->functionName[len] = '\0'; int32_t code = fmGetFuncInfo(pFunc, NULL, 0); if (TSDB_CODE_SUCCESS != code) { + nodesClearList(cxt.pLastCols); return code; } + if (FUNCTION_TYPE_LAST == funcType) { + nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); + } } } SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); pScan->scanType = SCAN_TYPE_LAST_ROW; pScan->igLastNull = pAgg->hasLast ? true : false; + if (NULL != cxt.pLastCols) { + cxt.doAgg = false; + nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt); + nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); + nodesClearList(cxt.pLastCols); + } pAgg->hasLastRow = false; pAgg->hasLast = false; diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 0baec147a2..c5a9a447c7 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -105,24 +105,6 @@ TEST_F(PlanBasicTest, interpFunc) { run("SELECT _IROWTS, INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); } -TEST_F(PlanBasicTest, lastRowFunc) { - useDb("root", "cache_db"); - - run("SELECT LAST_ROW(c1) FROM t1"); - - run("SELECT LAST_ROW(*) FROM t1"); - - run("SELECT LAST_ROW(c1, c2) FROM t1"); - - run("SELECT LAST_ROW(c1), c2 FROM t1"); - - run("SELECT LAST_ROW(c1) FROM st1"); - - run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME"); - - run("SELECT LAST_ROW(c1), SUM(c3) FROM t1"); -} - TEST_F(PlanBasicTest, lastRowFuncWithoutCache) { useDb("root", "test"); diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index c2a0aee847..fb4f32a9bf 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -109,9 +109,28 @@ TEST_F(PlanOptimizeTest, mergeProjects) { TEST_F(PlanOptimizeTest, pushDownProjectCond) { useDb("root", "test"); + run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first"); } +TEST_F(PlanOptimizeTest, LastRowScan) { + useDb("root", "cache_db"); + + run("SELECT LAST_ROW(c1), c2 FROM t1"); + + run("SELECT LAST_ROW(c1), c2, tag1, tbname FROM st1"); + + run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME"); + + run("SELECT LAST_ROW(c1), SUM(c3) FROM t1"); + + run("SELECT LAST_ROW(tag1) FROM st1"); + + run("SELECT LAST(c1) FROM st1"); + + run("SELECT LAST(c1), c2 FROM st1"); +} + TEST_F(PlanOptimizeTest, tagScan) { useDb("root", "test"); run("select tag1 from st1 group by tag1"); From 60e7f2800bc350f53333efd0c341bab0febf5dc7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Oct 2022 17:39:28 +0800 Subject: [PATCH 08/24] fix(query): fix bug in cached last query. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 151 ++++++++++++-------- source/libs/planner/src/planOptimizer.c | 1 + 2 files changed, 95 insertions(+), 57 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index be36848fec..791c73047c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,36 +30,69 @@ typedef struct SCacheRowsReader { } SCacheRowsReader; static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, - SFirstLastRes** pRes) { + void** pRes) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; - for (int32_t i = 0; i < pReader->numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if ((pReader->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) { + for (int32_t i = 0; i < pReader->numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); - if (slotIds[i] == -1) { // the primary timestamp - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); - pRes[i]->ts = pColVal->ts; - memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE); - } else { - int32_t slotId = slotIds[i]; - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + if (slotIds[i] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + p->ts = pColVal->ts; + p->bytes = TSDB_KEYSIZE; + *(int64_t*)p->buf = pColVal->ts; + } else { + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - pRes[i]->ts = pColVal->ts; - pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + p->ts = pColVal->ts; + p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + if (!p->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(p->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->bytes = pColVal->colVal.value.nData; + } else { + memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); + p->bytes = pReader->pSchema->columns[slotId].bytes; + } + } + } - if (!pRes[i]->isNull) { - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData); - memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->hasResult = true; + varDataSetLen(pRes[i], pColInfoData->info.bytes); + colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); + } + } else { + ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); + + SColVal colVal = {0}; + for (int32_t i = 0; i < pReader->numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + if (slotIds[i] == -1) { + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false); + } else { + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + + if (IS_VAR_DATA_TYPE(colVal.type)) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { + colDataAppendNULL(pColInfoData, numOfRows); + } else { + varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); + memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + } } else { - memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); + colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&pColVal->colVal)); } } } - - pRes[i]->hasResult = true; - colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } pBlock->info.rows += 1; @@ -126,24 +159,17 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint int32_t code = TSDB_CODE_SUCCESS; if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // no data in the table of Uid - if (*h != NULL) { - *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); - } } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + } - // no data in the table of Uid - if (*h != NULL) { - *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); - } + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } return code; @@ -163,10 +189,26 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 size_t numOfTables = taosArrayGetSize(pr->pTableList); bool hasRes = false; - SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); for (int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); - pRes[j]->ts = INT64_MIN; + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE); + SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); + p->ts = INT64_MIN; + } + + SArray* pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); + for (int32_t i = 0; i < pr->numOfCols; ++i) { + SLastCol p = {0}; + p.ts = INT64_MIN; + + struct STColumn* pCol = &pr->pSchema->columns[i]; + p.colVal.type = pCol->type; + + if (IS_VAR_DATA_TYPE(pCol->type)) { + p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); + } + + taosArrayPush(pLastCols, &p); } // retrieve the only one last row of all tables in the uid list. @@ -185,38 +227,32 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 { for (int32_t k = 0; k < pr->numOfCols; ++k) { - SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); - + SLastCol* p = taosArrayGet(pLastCols, k); if (slotIds[k] == -1) { // the primary timestamp - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); - if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, k); + if (pCol->ts > p->ts) { hasRes = true; - pRes[k]->hasResult = true; - pRes[k]->ts = pColVal->ts; - memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); - - colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + p->ts = pCol->ts; + p->colVal = pCol->colVal; } } else { int32_t slotId = slotIds[k]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + if (pColVal->ts > p->ts) { hasRes = true; - pRes[k]->hasResult = true; - pRes[k]->ts = pColVal->ts; + p->ts = pColVal->ts; - pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - if (!pRes[k]->isNull) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData); - memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + uint8_t* px = p->colVal.value.pData; + p->colVal = pColVal->colVal; + p->colVal.value.pData = px; + memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData); } else { - memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes); + p->colVal = pColVal->colVal; } } - - colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); } } } @@ -227,6 +263,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (hasRes) { pResBlock->info.rows = 1; + saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0fc1be8a70..7a145e7dde 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2232,6 +2232,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic cxt.doAgg = false; nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt); nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); + nodesWalkExprs(pScan->node.pTargets, lastRowScanOptSetColDataType, &cxt); nodesClearList(cxt.pLastCols); } pAgg->hasLastRow = false; From e4940e82c53ac6d616077768438fe15189705d6d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Oct 2022 17:54:29 +0800 Subject: [PATCH 09/24] fix(query): update last_row callback function. --- source/libs/function/src/builtins.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index f03d8354fa..4644b15dea 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2402,7 +2402,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = lastFunctionMerge, + .processFunc = lastRowFunction, .finalizeFunc = firstLastFinalize }, { From d7e01b30c7ba5e67bc71b1387d11376347daecba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Oct 2022 19:13:42 +0800 Subject: [PATCH 10/24] fix(query): update last_row callback function. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 791c73047c..38c10e8354 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -54,7 +54,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { varDataSetLen(p->buf, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); - p->bytes = pColVal->colVal.value.nData; + p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; } else { memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); p->bytes = pReader->pSchema->columns[slotId].bytes; From c7c99ae34634be4ec6852bdc6f6961c132264c8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 09:33:54 +0800 Subject: [PATCH 11/24] fix(query): update last_row callback function. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 27 +++++++++++++++++---- source/libs/function/src/builtins.c | 2 +- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 38c10e8354..63c889985a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -188,15 +188,26 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SArray* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); bool hasRes = false; + SArray* pLastCols = NULL; void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + if (pRes == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + for (int32_t j = 0; j < pr->numOfCols; ++j) { pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); p->ts = INT64_MIN; } - SArray* pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); + pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); + if (pLastCols == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + for (int32_t i = 0; i < pr->numOfCols; ++i) { SLastCol p = {0}; p.ts = INT64_MIN; @@ -262,7 +273,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } if (hasRes) { - pResBlock->info.rows = 1; saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); } @@ -286,12 +296,19 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 pr->tableIndex += 1; if (pResBlock->info.rows >= pResBlock->info.capacity) { - return TSDB_CODE_SUCCESS; + goto _end; } } } else { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; } - return TSDB_CODE_SUCCESS; + _end: + for (int32_t j = 0; j < pr->numOfCols; ++j) { + taosMemoryFree(pRes[j]); + } + + taosMemoryFree(pRes); + taosArrayDestroy(pLastCols); + return code; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4644b15dea..ce440a525a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2402,7 +2402,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, - .processFunc = lastRowFunction, + .processFunc = cachedLastRowFunction, .finalizeFunc = firstLastFinalize }, { From 5dcd319d3b566b16e25ae4ee83c982de133a9a79 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 10:07:20 +0800 Subject: [PATCH 12/24] fix(query): update last_row callback function. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 63c889985a..53f8d5be16 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -69,7 +69,6 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } else { ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); - SColVal colVal = {0}; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -79,17 +78,18 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } else { int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + SColVal* pVal = &pColVal->colVal; - if (IS_VAR_DATA_TYPE(colVal.type)) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { colDataAppendNULL(pColInfoData, numOfRows); } else { - varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); - memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); + varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); + memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); } } else { - colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&pColVal->colVal)); + colDataAppend(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); } } } @@ -157,6 +157,8 @@ void* tsdbCacherowsReaderClose(void* pReader) { static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow, LRUHandle** h) { int32_t code = TSDB_CODE_SUCCESS; + *pRow = NULL; + if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); } else { @@ -208,12 +210,9 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - for (int32_t i = 0; i < pr->numOfCols; ++i) { - SLastCol p = {0}; - p.ts = INT64_MIN; - + for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { struct STColumn* pCol = &pr->pSchema->columns[i]; - p.colVal.type = pCol->type; + SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type}; if (IS_VAR_DATA_TYPE(pCol->type)) { p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); @@ -254,7 +253,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 hasRes = true; p->ts = pColVal->ts; - if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { + if (COL_VAL_IS_VALUE(&pColVal->colVal)) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { uint8_t* px = p->colVal.value.pData; p->colVal = pColVal->colVal; From 467c6386edd71c116e99acec2091650bff9e4972 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 11:06:08 +0800 Subject: [PATCH 13/24] fix: last_row array element type --- source/dnode/vnode/src/tsdb/tsdbCache.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index f67a5cc444..7f664f9e37 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1109,7 +1109,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo int16_t iCol = 0; int16_t noneCol = 0; bool setNoneCol = false; - SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal)); + SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol)); SColVal *pColVal = &(SColVal){0}; TSKEY lastRowTs = TSKEY_MAX; @@ -1140,7 +1140,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo for (iCol = 1; iCol < nCol; ++iCol) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { + SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; + /* + if (IS_VAR_DATA_TYPE(pColVal->type)) { + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + */ + if (taosArrayPush(pColArray, &lastCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } From 12fd11c481b21dbb48194c53609674d25c8341cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 11:41:30 +0800 Subject: [PATCH 14/24] fix(query): fix syntax error. --- source/libs/executor/src/timewindowoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index db9ebf12c2..1bcf274460 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5462,9 +5462,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys } SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; + int32_t code = TSDB_CODE_SUCCESS; int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); ASSERT(numOfCols > 0); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SInterval interval = { .interval = pIntervalPhyNode->interval, @@ -5474,6 +5476,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .offset = pIntervalPhyNode->offset, .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; + STimeWindowAggSupp twAggSupp = { .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, @@ -5481,6 +5484,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .minTs = INT64_MAX, .deleteMark = INT64_MAX, }; + ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY); pOperator->pTaskInfo = pTaskInfo; pInfo->interval = interval; @@ -5502,7 +5506,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys } size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } From a1c18bba33dc550dc5f6690cc4cf7f4212023482 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 13:58:59 +0800 Subject: [PATCH 15/24] fix: cache load calculation --- source/dnode/vnode/src/tsdb/tsdbCache.c | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 7f664f9e37..d3051f78c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1328,9 +1328,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH return 0; } + size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray); _taos_lru_deleter_t deleter = deleteTableCacheLast; - LRUStatus status = - taosLRUCacheInsert(pCache, key, keyLen, pArray, pArray->capacity, deleter, NULL, TAOS_LRU_PRIORITY_LOW); + LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } @@ -1395,9 +1395,10 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand return 0; } + size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray); _taos_lru_deleter_t deleter = deleteTableCacheLast; - LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pLastArray, pLastArray->capacity, deleter, NULL, - TAOS_LRU_PRIORITY_LOW); + LRUStatus status = + taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW); if (status != TAOS_LRU_STATUS_OK) { code = -1; } From ceec953498ad481a885c835644921c47c785e625 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 14:50:36 +0800 Subject: [PATCH 16/24] fix: copy binary type column data out --- source/dnode/vnode/src/tsdb/tsdbCache.c | 46 +++++++++++++++++++++---- 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d3051f78c7..d4cca40b89 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -56,7 +56,18 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) { *len = sizeof(uint64_t); } -static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); } +static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { + SArray *pLastArray = (SArray *)value; + int16_t nCol = taosArrayGetSize(pLastArray); + for (int16_t iCol = 0; iCol < nCol; ++iCol) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol); + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + taosMemoryFree(pLastCol->colVal.value.pData); + } + } + + taosArrayDestroy(value); +} int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) { int32_t code = 0; @@ -1141,12 +1152,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; - /* if (IS_VAR_DATA_TYPE(pColVal->type)) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } - */ + if (taosArrayPush(pColArray, &lastCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -1178,7 +1188,16 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { - taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); + taosMemoryFree(pLastCol->colVal.value.pData); + + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + taosArraySet(pColArray, iCol, &lastCol); } else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; @@ -1245,7 +1264,13 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { for (iCol = 1; iCol < nCol; ++iCol) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); - if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { + SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + if (taosArrayPush(pColArray, &lastCol) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1272,7 +1297,16 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { - taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal}); + SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type)) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); + taosMemoryFree(pLastCol->colVal.value.pData); + + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + taosArraySet(pColArray, iCol, &lastCol); } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { noneCol = iCol; setNoneCol = true; From 640338893df2ea7feb5cfe58f9b97b87cff4723a Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 15:01:49 +0800 Subject: [PATCH 17/24] tsdb/cache: handle OOM exceptions --- source/dnode/vnode/src/tsdb/tsdbCache.c | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index d4cca40b89..e2bc9d0bec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1144,6 +1144,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs}); if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -1154,6 +1155,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->type)) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } @@ -1194,6 +1200,11 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo taosMemoryFree(pLastCol->colVal.value.pData); lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } @@ -1267,6 +1278,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; if (IS_VAR_DATA_TYPE(pColVal->type)) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } @@ -1303,6 +1319,11 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { taosMemoryFree(pLastCol->colVal.value.pData); lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); } From 116d1451e4c3fb4488d29c4b79a007548ad6087a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 15:43:58 +0800 Subject: [PATCH 18/24] fix(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 35 +++++++++++++------ tests/script/tsim/parser/last_cache_query.sim | 5 +-- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 53f8d5be16..99c314fa45 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -29,12 +29,14 @@ typedef struct SCacheRowsReader { SArray* pTableList; // table id list } SCacheRowsReader; +#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t)) + static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, void** pRes) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; - if ((pReader->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) { + if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); @@ -54,7 +56,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { varDataSetLen(p->buf, pColVal->colVal.value.nData); memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); - p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; + p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size } else { memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); p->bytes = pReader->pSchema->columns[slotId].bytes; @@ -62,12 +64,13 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } } + // pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it p->hasResult = true; - varDataSetLen(pRes[i], pColInfoData->info.bytes); + varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE); colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } } else { - ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); + ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)); for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -177,6 +180,13 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint return code; } +static void freeItem(void* pItem) { + SLastCol* pCol = (SLastCol*) pItem; + if (IS_VAR_DATA_TYPE(pCol->colVal.type)) { + taosMemoryFree(pCol->colVal.value.pData); + } +} + int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; @@ -217,7 +227,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (IS_VAR_DATA_TYPE(pCol->type)) { p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); } - taosArrayPush(pLastCols, &p); } @@ -237,19 +246,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 { for (int32_t k = 0; k < pr->numOfCols; ++k) { - SLastCol* p = taosArrayGet(pLastCols, k); - if (slotIds[k] == -1) { // the primary timestamp - SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, k); + int32_t slotId = slotIds[k]; + + if (slotId == -1) { // the primary timestamp + SLastCol* p = taosArrayGet(pLastCols, 0); + SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0); if (pCol->ts > p->ts) { hasRes = true; p->ts = pCol->ts; p->colVal = pCol->colVal; } } else { - int32_t slotId = slotIds[k]; + SLastCol* p = taosArrayGet(pLastCols, slotId); SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); if (pColVal->ts > p->ts) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) { + continue; + } + hasRes = true; p->ts = pColVal->ts; @@ -308,6 +323,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } taosMemoryFree(pRes); - taosArrayDestroy(pLastCols); + taosArrayDestroyEx(pLastCols, freeItem); return code; } diff --git a/tests/script/tsim/parser/last_cache_query.sim b/tests/script/tsim/parser/last_cache_query.sim index f32435960c..6cd5309590 100644 --- a/tests/script/tsim/parser/last_cache_query.sim +++ b/tests/script/tsim/parser/last_cache_query.sim @@ -39,6 +39,7 @@ if $data02 != 5.000000000 then return -1 endi if $data03 != 3 then + print expect 3, actual: $data03 return -1 endi if $data04 != @70-01-01 07:59:57.000@ then @@ -210,7 +211,7 @@ if $data01 != 6 then return -1 endi if $data02 != 37.000000000 then - print $data02 + print expect 37.000000000 actual: $data02 return -1 endi if $data03 != 27 then @@ -233,7 +234,7 @@ if $data01 != 6 then return -1 endi if $data02 != 37.000000000 then - print $data02 + print expect 37.000000000, acutal: $data02 return -1 endi if $data03 != 27 then From 325712fdf9454ffe19dca9a8333c656483f44408 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 15:54:43 +0800 Subject: [PATCH 19/24] fix(query): add some init functions. --- source/libs/executor/src/timewindowoperator.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1bcf274460..8792569f14 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5492,9 +5492,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->isFinal = false; + SExprSupp* pSup = &pOperator->exprSupp; + initBasicInfo(&pInfo->binfo, pResBlock); + initStreamFunciton(pSup->pCtx, pSup->numOfExprs); + initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); + pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); - SExprSupp* pSup = &pOperator->exprSupp; if (pIntervalPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0; From 35e829dc28e4b262d9661a610d236c70942c9cb5 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 19 Oct 2022 15:58:51 +0800 Subject: [PATCH 20/24] fix: cache inapplicability --- source/libs/planner/src/planOptimizer.c | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f5d32796b2..2e2e4e6dcf 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2155,6 +2155,22 @@ static bool lastRowScanOptHasTag(SNode* pExpr) { return hasTag; } +static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) { + switch (cacheLastMode) { + case TSDB_CACHE_MODEL_NONE: + return false; + case TSDB_CACHE_MODEL_LAST_ROW: + return hasLastRow; + case TSDB_CACHE_MODEL_LAST_VALUE: + return hasLast; + case TSDB_CACHE_MODEL_BOTH: + return true; + default: + break; + } + return false; +} + static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) || QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { @@ -2165,7 +2181,8 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0); // Only one of LAST and LASTROW can appear if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || - 0 == pScan->cacheLastMode || IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { + !hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) || + IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { return false; } From 6dea891571dca633d930e24428b18ff9dc499674 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 16:07:53 +0800 Subject: [PATCH 21/24] fix: release data file reader --- source/dnode/vnode/src/tsdb/tsdbCache.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index e2bc9d0bec..96504f343a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -519,6 +519,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { return code; } + if (state->pDataFReader != NULL) { + tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; + } + code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); if (code) goto _err; @@ -662,6 +667,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { */ state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ); if (!state->pBlockIdx) { + tsdbDataFReaderClose(&state->pDataFReader); + state->pDataFReader = NULL; goto _next_fileset; } From b20e2ed5afe9fc7f37fdf03686f8ca2c723b4122 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Oct 2022 17:43:23 +0800 Subject: [PATCH 22/24] fix(query): set correct last value. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 30 ++++++++++++-------- source/libs/executor/src/cachescanoperator.c | 17 ++++++----- tests/system-test/2-query/last_row.py | 2 +- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 99c314fa45..1437811521 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -162,7 +162,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint int32_t code = TSDB_CODE_SUCCESS; *pRow = NULL; - if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { + if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); @@ -231,7 +231,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } // retrieve the only one last row of all tables in the uid list. - if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { + if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -255,6 +255,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 hasRes = true; p->ts = pCol->ts; p->colVal = pCol->colVal; + + // only set value for last row query + if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) { + if (taosArrayGetSize(pTableUidList) == 0) { + taosArrayPush(pTableUidList, &pKeyInfo->uid); + } else { + taosArraySet(pTableUidList, 0, &pKeyInfo->uid); + } + } } } else { SLastCol* p = taosArrayGet(pLastCols, slotId); @@ -268,15 +277,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 hasRes = true; p->ts = pColVal->ts; - if (COL_VAL_IS_VALUE(&pColVal->colVal)) { - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - uint8_t* px = p->colVal.value.pData; - p->colVal = pColVal->colVal; - p->colVal.value.pData = px; - memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData); - } else { - p->colVal = pColVal->colVal; - } + uint8_t* px = p->colVal.value.pData; + p->colVal = pColVal->colVal; + + if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + p->colVal.value.pData = px; + memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData); } } } @@ -290,7 +296,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); } - } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { + } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 5d63892b7b..814f3871db 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -193,20 +193,23 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->currentGroupIndex += 1; // check for tag values - // TODO NOTE: The uid of pInfo->pRes is required. if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); pInfo->pRes->info.groupId = pKeyInfo->groupId; - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, - GET_TASKID(pTaskInfo)); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; - return NULL; + if (taosArrayGetSize(pInfo->pUidList) > 0) { + ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); + + pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = code; + return NULL; + } } } diff --git a/tests/system-test/2-query/last_row.py b/tests/system-test/2-query/last_row.py index 5d435b068f..6286852641 100644 --- a/tests/system-test/2-query/last_row.py +++ b/tests/system-test/2-query/last_row.py @@ -13,7 +13,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), False) + tdSql.init(conn.cursor(), True) self.tb_nums = 10 self.row_nums = 20 self.ts = 1434938400000 From 60b85028afe7ca9b609d47f2afafd1d24c2f7bb7 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 19 Oct 2022 18:53:51 +0800 Subject: [PATCH 23/24] fix: do not copy binary when nData is 0 --- source/dnode/vnode/src/tsdb/tsdbCache.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 96504f343a..bb394e8acc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -61,7 +61,7 @@ static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { int16_t nCol = taosArrayGetSize(pLastArray); for (int16_t iCol = 0; iCol < nCol; ++iCol) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol); - if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) { + if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) { taosMemoryFree(pLastCol->colVal.value.pData); } } @@ -1160,7 +1160,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); if (lastCol.colVal.value.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1202,7 +1202,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); @@ -1283,7 +1283,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); if (lastCol.colVal.value.pData == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1321,7 +1321,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) { tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; - if (IS_VAR_DATA_TYPE(pColVal->type)) { + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); taosMemoryFree(pLastCol->colVal.value.pData); From b3658f983f4ceecb7f789c49c823039fa2b21217 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 20 Oct 2022 14:35:34 +0800 Subject: [PATCH 24/24] fix: last constant error --- source/libs/planner/src/planOptimizer.c | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 2e2e4e6dcf..3171bb531b 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2139,20 +2139,28 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef); } -static EDealRes lastRowScanOptHasTagImpl(SNode* pNode, void* pContext) { +typedef struct SLastRowScanOptLastParaCkCxt { + bool hasTag; + bool hasCol; +} SLastRowScanOptLastParaCkCxt; + +static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SLastRowScanOptLastParaCkCxt* pCxt = pContext; if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) { - *(bool*)pContext = true; - return DEAL_RES_END; + pCxt->hasTag = true; + } else { + pCxt->hasCol = true; } + return DEAL_RES_END; } return DEAL_RES_CONTINUE; } -static bool lastRowScanOptHasTag(SNode* pExpr) { - bool hasTag = false; - nodesWalkExpr(pExpr, lastRowScanOptHasTagImpl, &hasTag); - return hasTag; +static bool lastRowScanOptLastParaCheck(SNode* pExpr) { + SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false}; + nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt); + return !cxt.hasTag && cxt.hasCol; } static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) { @@ -2192,7 +2200,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { - if (hasSelectFunc || lastRowScanOptHasTag(nodesListGetNode(pAggFunc->pParameterList, 0))) { + if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) { return false; } hasLastFunc = true;