diff --git a/include/common/tcommon.h b/include/common/tcommon.h index eaddf4e983..c32fe81335 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -78,6 +78,27 @@ typedef struct { int32_t exprIdx; } STupleKey; +typedef struct STuplePos { + union { + struct { + int32_t pageId; + int32_t offset; + }; + STupleKey streamTupleKey; + }; +} STuplePos; + +typedef struct SFirstLastRes { + bool hasResult; + // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, + // this attribute is required + bool isNull; + int32_t bytes; + int64_t ts; + STuplePos pos; + char buf[]; +} SFirstLastRes; + static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { STupleKey* pTuple1 = (STupleKey*)pKey1; STupleKey* pTuple2 = (STupleKey*)pKey2; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bf110f1ae3..91761663e8 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -306,6 +306,11 @@ int32_t tsdbMerge(STsdb *pTsdb); #define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0) // tsdbCache ============================================================================================== +typedef struct { + TSKEY ts; + SColVal colVal; +} SLastCol; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index de26df6c54..b5624db51c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -15,11 +15,6 @@ #include "tsdb.h" -typedef struct { - TSKEY ts; - SColVal colVal; -} SLastCol; - int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t code = 0; SLRUCache *pCache = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index d0d0563649..2d4a1b54cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -29,7 +29,7 @@ typedef struct SCacheRowsReader { SArray* pTableList; // table id list } SCacheRowsReader; -static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { +static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; @@ -37,20 +37,35 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SFirstLastRes *pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + TSDB_KEYSIZE); + SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, i); + if (slotIds[i] == -1) { - colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false); + pRes->ts = pColVal->ts; + pRes->bytes = TSDB_KEYSIZE; + pRes->isNull = false; + pRes->hasResult = true; + + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } else { int32_t slotId = slotIds[i]; - tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal); + int32_t bytes = pReader->pSchema->columns[slotId].bytes; + pRes = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes); + pRes->bytes = bytes; + pRes->hasResult = true; if (IS_VAR_DATA_TYPE(colVal.type)) { if (!COL_VAL_IS_VALUE(&colVal)) { - colDataAppendNULL(pColInfoData, numOfRows); + pRes->isNull = true; + pRes->ts = pColVal->ts; + + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } else { - varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); - memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); - colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + varDataSetLen(pRes->buf, colVal.value.nData); + memcpy(varDataVal(pRes->buf), colVal.value.pData, colVal.value.nData); + pRes->bytes = colVal.value.nData; + colDataAppend(pColInfoData, numOfRows, (const char*)pRes, false); } } else { colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal)); @@ -117,7 +132,7 @@ void* tsdbCacherowsReaderClose(void* pReader) { return NULL; } -static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, +static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow, LRUHandle** h) { int32_t code = TSDB_CODE_SUCCESS; if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { @@ -127,9 +142,8 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint } // no data in the table of Uid - if (*h != NULL) { - //*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h); - SArray* pLastrow = (SArray*)taosLRUCacheValue(lruCache, *h); + if (*h != NULL) { // todo convert to SArray + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); @@ -139,8 +153,7 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint // no data in the table of Uid if (*h != NULL) { - SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h); - tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema); + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } } @@ -157,13 +170,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 int32_t code = TSDB_CODE_SUCCESS; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; - STSRow* pRow = NULL; + SArray* pRow = NULL; size_t numOfTables = taosArrayGetSize(pr->pTableList); + int64_t* lastTs = taosMemoryMalloc(TSDB_KEYSIZE * pr->pSchema->numOfCols); + for(int32_t i = 0; i < pr->pSchema->numOfCols; ++i) { + lastTs[i] = INT64_MIN; + } + // retrieve the only one last row of all tables in the uid list. if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) { - int64_t lastKey = INT64_MIN; - bool internalResult = false; + bool internalResult = false; for (int32_t i = 0; i < numOfTables; ++i) { STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); @@ -176,7 +193,53 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 continue; } + { + SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + for(int32_t j = 0; j < pr->numOfCols; ++j) { + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); + pRes[j]->ts = INT64_MIN; + } + + for (int32_t k = 0; k < pr->numOfCols; ++k) { + SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); + + if (slotIds[k] == -1) { // the primary timestamp + SLastCol *pColVal = (SLastCol *)taosArrayGet(pRow, k); + if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + pRes[k]->hasResult = true; + pRes[k]->ts = pColVal->ts; + memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); + + colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + } + } else { + int32_t slotId = slotIds[k]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + + if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + pRes[k]->hasResult = true; + pRes[k]->ts = pColVal->ts; + + pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + if (!pRes[k]->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + } else { + memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes); + } + } + + colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + } + } + } + } + +/* if (pRow->ts > lastKey) { + printf("qualified:%ld, old Value:%ld\n", pRow->ts, lastKey); + // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // appended or not. if (internalResult) { @@ -189,7 +252,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 internalResult = true; lastKey = pRow->ts; } - +*/ tsdbCacheRelease(lruCache, h); } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 7077a9b780..493561ca1e 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -57,16 +57,6 @@ typedef struct SAvgRes { int16_t type; // store the original input type, used in merge function } SAvgRes; -typedef struct STuplePos { - union { - struct { - int32_t pageId; - int32_t offset; - }; - STupleKey streamTupleKey; - }; -} STuplePos; - typedef struct SMinmaxResInfo { bool assign; // assign the first value or not int64_t v; @@ -93,17 +83,6 @@ typedef struct STopBotRes { STopBotResItem* pItems; } STopBotRes; -typedef struct SFirstLastRes { - bool hasResult; - // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, - // this attribute is required - bool isNull; - int32_t bytes; - int64_t ts; - STuplePos pos; - char buf[]; -} SFirstLastRes; - typedef struct SStddevRes { double result; int64_t count;