Merge pull request #17336 from taosdata/feature/3_liaohj

other:merge 3.0, and support last query cache
This commit is contained in:
Haojun Liao 2022-10-20 18:15:38 +08:00 committed by GitHub
commit abff4fb1f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 545 additions and 169 deletions

View File

@ -83,6 +83,27 @@ typedef struct {
int32_t exprIdx;
} STupleKey;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
STupleKey* pTuple1 = (STupleKey*)pKey1;
STupleKey* pTuple2 = (STupleKey*)pKey2;

View File

@ -129,6 +129,7 @@ typedef enum EFunctionType {
FUNCTION_TYPE_TO_COLUMN,
FUNCTION_TYPE_GROUP_KEY,
FUNCTION_TYPE_CACHE_LAST_ROW,
FUNCTION_TYPE_CACHE_LAST,
// distributed splitting functions
FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000,
@ -216,6 +217,8 @@ bool fmIsKeepOrderFunc(int32_t funcId);
bool fmIsCumulativeFunc(int32_t funcId);
bool fmIsInterpPseudoColumnFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);
typedef enum EFuncDataRequired {

View File

@ -299,6 +299,11 @@ int32_t tsdbMerge(STsdb *pTsdb);
#define TSDB_CACHE_LAST(c) (((c).cacheLast & 2) > 0)
// tsdbCache ==============================================================================================
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);

View File

@ -15,11 +15,6 @@
#include "tsdb.h"
typedef struct {
TSKEY ts;
SColVal colVal;
} SLastCol;
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
@ -61,9 +56,18 @@ static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
*len = sizeof(uint64_t);
}
static void deleteTableCacheLastrow(const void *key, size_t keyLen, void *value) { taosMemoryFree(value); }
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) {
SArray *pLastArray = (SArray *)value;
int16_t nCol = taosArrayGetSize(pLastArray);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pLastArray, iCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type) && pLastCol->colVal.value.nData > 0) {
taosMemoryFree(pLastCol->colVal.value.pData);
}
}
static void deleteTableCacheLast(const void *key, size_t keyLen, void *value) { taosArrayDestroy(value); }
taosArrayDestroy(value);
}
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
@ -75,13 +79,23 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
@ -130,14 +144,23 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSRow *pRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (pRow->ts <= eKey) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
// getTableCacheKey(uid, "l", key, &keyLen);
@ -177,6 +200,46 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
TSKEY keyTs = row->ts;
bool invalidate = false;
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
int16_t nCol = taosArrayGetSize(pLast);
int16_t iCol = 0;
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs > tTsVal->ts) {
STColumn *pTColumn = &pTSchema->columns[0];
SColVal tColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = keyTs});
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = tColVal});
}
for (++iCol; iCol < nCol; ++iCol) {
SLastCol *tTsVal1 = (SLastCol *)taosArrayGet(pLast, iCol);
if (keyTs >= tTsVal1->ts) {
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
if (!COL_VAL_IS_NONE(&colVal)) {
if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
invalidate = true;
break;
}
} else {
taosArraySet(pLast, iCol, &(SLastCol){.ts = keyTs, .colVal = colVal});
}
}
}
_invalidate:
taosMemoryFreeClear(pTSchema);
taosLRUCacheRelease(pCache, h, invalidate);
/*
cacheRow = (STSRow *)taosLRUCacheValue(pCache, h);
if (row->ts >= cacheRow->ts) {
if (row->ts == cacheRow->ts) {
@ -218,9 +281,9 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
/* tsdbCacheInsertLastrow(pCache, uid, row, dup); */
// tsdbCacheInsertLastrow(pCache, uid, row, dup);
}
}
}*/
} /*else {
if (dup) {
cacheRow = tdRowDup(row);
@ -456,6 +519,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) {
return code;
}
if (state->pDataFReader != NULL) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
}
code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet);
if (code) goto _err;
@ -599,6 +667,8 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
*/
state->pBlockIdx = taosArraySearch(state->aBlockIdx, state->pBlockIdxExp, tCmprBlockIdx, TD_EQ);
if (!state->pBlockIdx) {
tsdbDataFReaderClose(&state->pDataFReader);
state->pDataFReader = NULL;
goto _next_fileset;
}
@ -1049,7 +1119,7 @@ _err:
return code;
}
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRow) {
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppColArray) {
int32_t code = 0;
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1);
@ -1057,7 +1127,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
int16_t iCol = 0;
int16_t noneCol = 0;
bool setNoneCol = false;
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
SArray *pColArray = taosArrayInit(nCol, sizeof(SLastCol));
SColVal *pColVal = &(SColVal){0};
TSKEY lastRowTs = TSKEY_MAX;
@ -1073,12 +1143,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
break;
}
TSKEY rowTs = TSDBROW_TS(pRow);
if (lastRowTs == TSKEY_MAX) {
lastRowTs = TSDBROW_TS(pRow);
lastRowTs = rowTs;
STColumn *pTColumn = &pTSchema->columns[0];
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = lastRowTs});
if (taosArrayPush(pColArray, pColVal) == NULL) {
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1086,7 +1159,18 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
for (iCol = 1; iCol < nCol; ++iCol) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pColArray, pColVal) == NULL) {
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (taosArrayPush(pColArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1097,15 +1181,15 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
}
}
if (!setNoneCol) {
// goto build the result ts row
// done, goto return pColArray
break;
} else {
continue;
}
}
if ((TSDBROW_TS(pRow) < lastRowTs)) {
// goto build the result ts row
if ((rowTs < lastRowTs)) {
// done, goto return pColArray
break;
}
@ -1117,7 +1201,21 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
taosArraySet(pColArray, iCol, pColVal);
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
} else if (COL_VAL_IS_NONE(tColVal) && COL_VAL_IS_NONE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
@ -1127,15 +1225,14 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
// build the result ts row here
*dup = false;
if (taosArrayGetSize(pColArray) == nCol) {
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
if (taosArrayGetSize(pColArray) != nCol) {
*ppColArray = NULL;
taosArrayDestroy(pColArray);
} else {
*ppRow = NULL;
*ppColArray = pColArray;
}
nextRowIterClose(&iter);
taosArrayDestroy(pColArray);
taosMemoryFreeClear(pTSchema);
return code;
@ -1185,7 +1282,18 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
for (iCol = 1; iCol < nCol; ++iCol) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (taosArrayPush(pColArray, &(SLastCol){.ts = lastRowTs, .colVal = *pColVal}) == NULL) {
SLastCol lastCol = {.ts = lastRowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
if (taosArrayPush(pColArray, &lastCol) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
@ -1196,7 +1304,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
}
}
if (!setNoneCol) {
// goto build the result ts row
// done, goto return pColArray
break;
} else {
continue;
@ -1207,11 +1315,26 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
setNoneCol = false;
for (iCol = noneCol; iCol < nCol; ++iCol) {
// high version's column value
SColVal *tColVal = (SColVal *)taosArrayGet(pColArray, iCol);
SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol);
SColVal *tColVal = &lastColVal->colVal;
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
taosArraySet(pColArray, iCol, &(SLastCol){.ts = rowTs, .colVal = *pColVal});
SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal};
if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) {
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
taosMemoryFree(pLastCol->colVal.value.pData);
lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData);
if (lastCol.colVal.value.pData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData);
}
taosArraySet(pColArray, iCol, &lastCol);
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
noneCol = iCol;
setNoneCol = true;
@ -1219,7 +1342,6 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
}
} while (setNoneCol);
// build the result ts row here
if (taosArrayGetSize(pColArray) <= 0) {
*ppLastArray = NULL;
taosArrayDestroy(pColArray);
@ -1252,13 +1374,13 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STSRow *pRow = NULL;
SArray *pArray = NULL;
bool dup = false; // which is always false for now
code = mergeLastRow(uid, pTsdb, &dup, &pRow);
code = mergeLastRow(uid, pTsdb, &dup, &pArray);
// if table's empty or error, return code of -1
if (code < 0 || pRow == NULL) {
if (!dup && pRow) {
taosMemoryFree(pRow);
if (code < 0 || pArray == NULL) {
if (!dup && pArray) {
taosArrayDestroy(pArray);
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
@ -1268,9 +1390,9 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUH
return 0;
}
_taos_lru_deleter_t deleter = deleteTableCacheLastrow;
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pRow, TD_ROW_LEN(pRow), deleter, NULL, TAOS_LRU_PRIORITY_LOW);
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
@ -1328,15 +1450,17 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHand
SArray *pLastArray = NULL;
code = mergeLast(uid, pTsdb, &pLastArray);
// if table's empty or error, return code of -1
// if (code < 0 || pRow == NULL) {
if (code < 0 || pLastArray == NULL) {
taosThreadMutexUnlock(&pTsdb->lruMutex);
*handle = NULL;
return 0;
}
size_t charge = pLastArray->capacity * pLastArray->elemSize + sizeof(*pLastArray);
_taos_lru_deleter_t deleter = deleteTableCacheLast;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pLastArray, pLastArray->capacity, deleter, NULL,
TAOS_LRU_PRIORITY_LOW);
LRUStatus status =
taosLRUCacheInsert(pCache, key, keyLen, pLastArray, charge, deleter, NULL, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}

View File

@ -29,31 +29,71 @@ typedef struct SCacheRowsReader {
SArray* pTableList; // table id list
} SCacheRowsReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows;
SColVal colVal = {0};
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
if (slotIds[i] == -1) {
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
} else {
int32_t slotId = slotIds[i];
tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (!COL_VAL_IS_VALUE(&colVal)) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
if (slotIds[i] == -1) { // the primary timestamp
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
p->ts = pColVal->ts;
p->bytes = TSDB_KEYSIZE;
*(int64_t*)p->buf = pColVal->ts;
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&colVal));
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
p->ts = pColVal->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData);
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
} else {
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
p->bytes = pReader->pSchema->columns[slotId].bytes;
}
}
}
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
}
} else {
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (slotIds[i] == -1) {
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
} else {
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
SColVal* pVal = &pColVal->colVal;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
}
}
}
}
@ -118,35 +158,36 @@ void* tsdbCacherowsReaderClose(void* pReader) {
return NULL;
}
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow,
static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, SArray** pRow,
LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
*pRow = NULL;
// no data in the table of Uid
if (*h != NULL) {
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
}
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// no data in the table of Uid
if (*h != NULL) {
SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
}
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
}
return code;
}
static void freeItem(void* pItem) {
SLastCol* pCol = (SLastCol*) pItem;
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
taosMemoryFree(pCol->colVal.value.pData);
}
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
@ -157,13 +198,41 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
int32_t code = TSDB_CODE_SUCCESS;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL;
STSRow* pRow = NULL;
SArray* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList);
bool hasRes = false;
SArray* pLastCols = NULL;
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
if (pRes == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t j = 0; j < pr->numOfCols; ++j) {
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
p->ts = INT64_MIN;
}
pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
if (pLastCols == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
for (int32_t i = 0; i < pr->pSchema->numOfCols; ++i) {
struct STColumn* pCol = &pr->pSchema->columns[i];
SLastCol p = {.ts = INT64_MIN, .colVal.type = pCol->type};
if (IS_VAR_DATA_TYPE(pCol->type)) {
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
}
taosArrayPush(pLastCols, &p);
}
// retrieve the only one last row of all tables in the uid list.
if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN;
bool internalResult = false;
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
@ -176,23 +245,59 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not.
if (internalResult) {
pResBlock->info.rows -= 1;
taosArrayClear(pTableUidList);
}
{
for (int32_t k = 0; k < pr->numOfCols; ++k) {
int32_t slotId = slotIds[k];
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
internalResult = true;
lastKey = pRow->ts;
if (slotId == -1) { // the primary timestamp
SLastCol* p = taosArrayGet(pLastCols, 0);
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
if (pCol->ts > p->ts) {
hasRes = true;
p->ts = pCol->ts;
p->colVal = pCol->colVal;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
}
} else {
SLastCol* p = taosArrayGet(pLastCols, slotId);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
if (pColVal->ts > p->ts) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
continue;
}
hasRes = true;
p->ts = pColVal->ts;
uint8_t* px = p->colVal.value.pData;
p->colVal = pColVal->colVal;
if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
p->colVal.value.pData = px;
memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
}
}
}
}
}
tsdbCacheRelease(lruCache, h);
}
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes);
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
@ -204,19 +309,27 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
continue;
}
saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
saveOneRow(pRow, pResBlock, pr, slotIds, pRes);
// TODO reset the pRes
taosArrayPush(pTableUidList, &pKeyInfo->uid);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
return TSDB_CODE_SUCCESS;
goto _end;
}
}
} else {
return TSDB_CODE_INVALID_PARA;
code = TSDB_CODE_INVALID_PARA;
}
return TSDB_CODE_SUCCESS;
_end:
for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]);
}
taosMemoryFree(pRes);
taosArrayDestroyEx(pLastCols, freeItem);
return code;
}

View File

@ -58,7 +58,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | CACHESCAN_RETRIEVE_LAST_ROW;
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
@ -67,8 +67,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | CACHESCAN_RETRIEVE_LAST_ROW;
} else { // by tags
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|(pScanNode->ignoreNull? CACHESCAN_RETRIEVE_LAST:CACHESCAN_RETRIEVE_LAST_ROW);
}
if (pScanNode->scan.pScanPseudoCols != NULL) {
@ -198,16 +198,20 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pInfo->pRes->info.rows > 0) {
if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup;
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
}
}

View File

@ -5475,9 +5475,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
}
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
ASSERT(numOfCols > 0);
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
SInterval interval = {
.interval = pIntervalPhyNode->interval,
@ -5487,6 +5489,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
};
STimeWindowAggSupp twAggSupp = {
.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
@ -5494,6 +5497,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.minTs = INT64_MAX,
.deleteMark = INT64_MAX,
};
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
pOperator->pTaskInfo = pTaskInfo;
pInfo->interval = interval;
@ -5501,16 +5505,25 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->isFinal = false;
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pSup = &pOperator->exprSupp;
initBasicInfo(&pInfo->binfo, pResBlock);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
initResultSizeInfo(&pOperator->resultInfo, 4096);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -2405,6 +2405,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = cachedLastRowFunction,
.finalizeFunc = firstLastFinalize
},
{
.name = "_cache_last",
.type = FUNCTION_TYPE_CACHE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup,
.processFunc = lastFunctionMerge,
.finalizeFunc = firstLastFinalize
},
{
.name = "_last_row_partial",
.type = FUNCTION_TYPE_LAST_PARTIAL,

View File

@ -57,16 +57,6 @@ typedef struct SAvgRes {
int16_t type; // store the original input type, used in merge function
} SAvgRes;
typedef struct STuplePos {
union {
struct {
int32_t pageId;
int32_t offset;
};
STupleKey streamTupleKey;
};
} STuplePos;
typedef struct SMinmaxResInfo {
bool assign; // assign the first value or not
int64_t v;
@ -93,17 +83,6 @@ typedef struct STopBotRes {
STopBotResItem* pItems;
} STopBotRes;
typedef struct SFirstLastRes {
bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required
bool isNull;
int32_t bytes;
int64_t ts;
STuplePos pos;
char buf[];
} SFirstLastRes;
typedef struct SStddevRes {
double result;
int64_t count;

View File

@ -16,6 +16,7 @@
#include "functionMgt.h"
#include "builtins.h"
#include "builtinsimpl.h"
#include "functionMgtInt.h"
#include "taos.h"
#include "taoserror.h"
@ -314,6 +315,11 @@ bool fmIsSameInOutType(int32_t funcId) {
return res;
}
void getLastCacheDataType(SDataType* pType) {
pType->bytes = getFirstLastInfoSize(pType->bytes) + VARSTR_HEADER_SIZE;
pType->type = TSDB_DATA_TYPE_BINARY;
}
static int32_t getFuncInfo(SFunctionNode* pFunc) {
char msg[128] = {0};
return fmGetFuncInfo(pFunc, msg, sizeof(msg));

View File

@ -1969,7 +1969,7 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT };
enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, PHY_LAST_ROW_SCAN_CODE_IGNULL };
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
@ -1981,6 +1981,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, pNode->groupSort);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
}
return code;
}
@ -2001,6 +2004,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_LAST_ROW_SCAN_CODE_GROUP_SORT:
code = tlvDecodeBool(pTlv, &pNode->groupSort);
break;
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
break;
default:
break;
}

View File

@ -2139,6 +2139,46 @@ static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef);
}
typedef struct SLastRowScanOptLastParaCkCxt {
bool hasTag;
bool hasCol;
} SLastRowScanOptLastParaCkCxt;
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
pCxt->hasTag = true;
} else {
pCxt->hasCol = true;
}
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static bool lastRowScanOptLastParaCheck(SNode* pExpr) {
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt);
return !cxt.hasTag && cxt.hasCol;
}
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
switch (cacheLastMode) {
case TSDB_CACHE_MODEL_NONE:
return false;
case TSDB_CACHE_MODEL_LAST_ROW:
return hasLastRow;
case TSDB_CACHE_MODEL_LAST_VALUE:
return hasLast;
case TSDB_CACHE_MODEL_BOTH:
return true;
default:
break;
}
return false;
}
static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
@ -2149,16 +2189,27 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
// Only one of LAST and LASTROW can appear
if (pAgg->hasLastRow == pAgg->hasLast || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
0 == pScan->cacheLastMode || IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
!hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) ||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
return false;
}
bool hasLastFunc = false;
bool hasSelectFunc = false;
SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType &&
// FUNCTION_TYPE_LAST != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
hasLastFunc = true;
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (hasLastFunc) {
return false;
}
hasSelectFunc = true;
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
return false;
}
}
@ -2166,6 +2217,31 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
return true;
}
typedef struct SLastRowScanOptSetColDataTypeCxt {
bool doAgg;
SNodeList* pLastCols;
} SLastRowScanOptSetColDataTypeCxt;
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptSetColDataTypeCxt* pCxt = pContext;
if (pCxt->doAgg) {
nodesListMakeAppend(&pCxt->pLastCols, pNode);
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
} else {
SNode* pCol = NULL;
FOREACH(pCol, pCxt->pLastCols) {
if (nodesEqualNode(pCol, pNode)) {
getLastCacheDataType(&(((SColumnNode*)pNode)->node.resType));
break;
}
}
}
return DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;
}
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
@ -2173,22 +2249,36 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
return TSDB_CODE_SUCCESS;
}
SNode* pNode = NULL;
SLastRowScanOptSetColDataTypeCxt cxt = {.doAgg = true, .pLastCols = NULL};
SNode* pNode = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), "_cache_last_row");
int32_t funcType = pFunc->funcType;
if (FUNCTION_TYPE_LAST_ROW == funcType || FUNCTION_TYPE_LAST == funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName),
FUNCTION_TYPE_LAST_ROW == funcType ? "_cache_last_row" : "_cache_last");
pFunc->functionName[len] = '\0';
int32_t code = fmGetFuncInfo(pFunc, NULL, 0);
if (TSDB_CODE_SUCCESS != code) {
nodesClearList(cxt.pLastCols);
return code;
}
if (FUNCTION_TYPE_LAST == funcType) {
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
}
}
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
pScan->scanType = SCAN_TYPE_LAST_ROW;
pScan->igLastNull = pAgg->hasLast ? true : false;
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt);
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
nodesWalkExprs(pScan->node.pTargets, lastRowScanOptSetColDataType, &cxt);
nodesClearList(cxt.pLastCols);
}
pAgg->hasLastRow = false;
pAgg->hasLast = false;

View File

@ -105,24 +105,6 @@ TEST_F(PlanBasicTest, interpFunc) {
run("SELECT _IROWTS, INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
}
TEST_F(PlanBasicTest, lastRowFunc) {
useDb("root", "cache_db");
run("SELECT LAST_ROW(c1) FROM t1");
run("SELECT LAST_ROW(*) FROM t1");
run("SELECT LAST_ROW(c1, c2) FROM t1");
run("SELECT LAST_ROW(c1), c2 FROM t1");
run("SELECT LAST_ROW(c1) FROM st1");
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
}
TEST_F(PlanBasicTest, lastRowFuncWithoutCache) {
useDb("root", "test");

View File

@ -109,9 +109,28 @@ TEST_F(PlanOptimizeTest, mergeProjects) {
TEST_F(PlanOptimizeTest, pushDownProjectCond) {
useDb("root", "test");
run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first");
}
TEST_F(PlanOptimizeTest, LastRowScan) {
useDb("root", "cache_db");
run("SELECT LAST_ROW(c1), c2 FROM t1");
run("SELECT LAST_ROW(c1), c2, tag1, tbname FROM st1");
run("SELECT LAST_ROW(c1) FROM st1 PARTITION BY TBNAME");
run("SELECT LAST_ROW(c1), SUM(c3) FROM t1");
run("SELECT LAST_ROW(tag1) FROM st1");
run("SELECT LAST(c1) FROM st1");
run("SELECT LAST(c1), c2 FROM st1");
}
TEST_F(PlanOptimizeTest, tagScan) {
useDb("root", "test");
run("select tag1 from st1 group by tag1");

View File

@ -39,6 +39,7 @@ if $data02 != 5.000000000 then
return -1
endi
if $data03 != 3 then
print expect 3, actual: $data03
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
@ -210,7 +211,7 @@ if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print $data02
print expect 37.000000000 actual: $data02
return -1
endi
if $data03 != 27 then
@ -233,7 +234,7 @@ if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print $data02
print expect 37.000000000, acutal: $data02
return -1
endi
if $data03 != 27 then

View File

@ -13,7 +13,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
tdSql.init(conn.cursor(), True)
self.tb_nums = 10
self.row_nums = 20
self.ts = 1434938400000