Fix TD-27003: use last row scan when select last_row, last

This commit is contained in:
fullhonest 2024-01-14 13:44:40 +08:00
parent ed1a3d6ab3
commit cb2ab749b5
23 changed files with 1172 additions and 56 deletions

View File

@ -189,7 +189,8 @@ typedef struct TsdReader {
typedef struct SStoreCacheReader {
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray *pFuncTypeList);
void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);

View File

@ -149,6 +149,8 @@ void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon
void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext);
bool nodesEqualNode(const SNode* a, const SNode* b);
bool nodeListNodeEqual(const SNodeList* a, const SNode* b);
bool nodesMatchNode(const SNode* pSub, const SNode* pNode);
SNode* nodesCloneNode(const SNode* pNode);

View File

@ -120,6 +120,7 @@ typedef struct SScanLogicNode {
bool onlyMetaCtbIdx; // for tag scan with no tbname
bool filesetDelimited; // returned blocks delimited by fileset
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
SArray* pFuncTypes; // for last, last_row
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -401,6 +402,7 @@ typedef struct SLastRowScanPhysiNode {
bool groupSort;
bool ignoreNull;
SNodeList* pTargets;
SArray* pFuncTypes;
} SLastRowScanPhysiNode;
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;

View File

@ -175,7 +175,8 @@ void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn not
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr);
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray* pFuncTypeList);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cos.h"
#include "functionMgt.h"
#include "tsdb.h"
#include "tsdbDataFileRW.h"
#include "tsdbReadUtil.h"
@ -894,19 +895,56 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
}
int num_keys = TARRAY_SIZE(remainCols);
int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t));
SArray* lastTmpColArray = NULL;
SArray* lastTmpIndexArray = NULL;
SArray* lastrowTmpColArray = NULL;
SArray* lastrowTmpIndexArray = NULL;
int lastIndex = 0;
int lastrowIndex = 0;
for (int i = 0; i < num_keys; ++i) {
SIdxKey *idxKey = taosArrayGet(remainCols, i);
aCols[i] = idxKey->key.cid;
slotIds[i] = pr->pSlotIds[idxKey->idx];
if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) {
if(NULL == lastTmpIndexArray) {
lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
}
taosArrayPush(lastTmpIndexArray, &(i));
lastColIds[lastIndex] = idxKey->key.cid;
lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx];
lastIndex++;
} else {
if(NULL == lastrowTmpIndexArray) {
lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t));
}
taosArrayPush(lastrowTmpIndexArray, &(i));
lastrowColIds[lastrowIndex] = idxKey->key.cid;
lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx];
lastrowIndex++;
}
}
if (ltype) {
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds);
} else {
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds);
pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol));
if(lastTmpIndexArray != NULL) {
mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds);
for(int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i));
}
}
if(lastrowTmpIndexArray != NULL) {
mergeLastCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds);
for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) {
taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i));
}
}
SLRUCache *pCache = pTsdb->lruCache;
@ -965,9 +1003,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr
rocksMayWrite(pTsdb, false, true, false);
}
taosArrayDestroy(lastrowTmpIndexArray);
taosArrayDestroy(lastrowTmpColArray);
taosArrayDestroy(lastTmpIndexArray);
taosArrayDestroy(lastTmpColArray);
taosMemoryFree(lastColIds);
taosMemoryFree(lastSlotIds);
taosMemoryFree(lastrowColIds);
taosMemoryFree(lastrowSlotIds);
taosArrayDestroy(pTmpColArray);
taosMemoryFree(aCols);
taosMemoryFree(slotIds);
return code;
@ -1057,6 +1104,15 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i];
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
// for select last_row, last case
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) {
funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i];
}
if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST);
key->ltype = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3;
}
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (h) {

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "functionMgt.h"
#include "taoserror.h"
#include "tarray.h"
#include "tcommon.h"
@ -33,31 +34,69 @@ static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) {
taosMemoryFree(buf);
}
static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, const int32_t slotId,
SColumnInfoData* pColInfoData, int32_t numOfRows) {
SColVal* pVal = &pColVal->colVal;
// allNullRow = false;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataSetNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
} else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
}
return;
}
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
// bool allNullRow = true;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
uint64_t ts = TSKEY_MIN;
SFirstLastRes* p = NULL;
col_id_t colId = -1;
SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t));
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
int32_t funcType = FUNCTION_TYPE_CACHE_LAST;
if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) {
funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i);
}
taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i));
if (slotIds[i] == -1) {
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
setFirstLastResColToNull(pColInfoData, numOfRows);
continue;
}
int32_t slotId = slotIds[i];
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
colId = pColVal->colVal.cid;
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
continue;
}
p = (SFirstLastRes*)varDataVal(pRes[i]);
p->ts = pColVal->ts;
ts = p->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
// allNullRow = p->isNull & allNullRow;
if (!p->isNull) {
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
varDataSetLen(p->buf, pColVal->colVal.value.nData);
@ -77,6 +116,13 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
}
for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx);
if (idx < funcTypeBlockArray->size) {
int32_t funcType = *(int32_t*)taosArrayGet(funcTypeBlockArray, idx);
if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) {
continue;
}
}
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
if (ts == TSKEY_MIN) {
colDataSetNULL(pCol, numOfRows);
@ -95,6 +141,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
// pBlock->info.rows += allNullRow ? 0 : 1;
++pBlock->info.rows;
taosArrayDestroy(funcTypeBlockArray);
} else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
@ -105,21 +152,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
SColVal* pVal = &pColVal->colVal;
// allNullRow = false;
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
colDataSetNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData);
memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData);
colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
}
} else {
colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal));
}
saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows);
}
// pBlock->info.rows += allNullRow ? 0 : 1;
@ -175,7 +209,8 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
}
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) {
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr,
SArray* pFuncTypeList) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
@ -190,6 +225,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->numOfCols = numOfCols;
p->pCidList = pCidList;
p->pSlotIds = pSlotIds;
p->pFuncTypeList = pFuncTypeList;
if (numOfTables == 0) {
*pReader = p;

View File

@ -348,6 +348,7 @@ typedef struct SCacheRowsReader {
STsdbReadSnap* pReadSnap;
char* idstr;
int64_t lastTs;
SArray* pFuncTypeList;
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -82,6 +82,7 @@ typedef struct SColMatchItem {
int32_t dstSlotId;
bool needOutput;
SDataType dataType;
int32_t funcType;
} SColMatchItem;
typedef struct SColMatchInfo {

View File

@ -21,6 +21,7 @@
#include "tmsg.h"
#include "executorInt.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
@ -44,6 +45,7 @@ typedef struct SCacheRowsScanInfo {
SArray* pCidList;
int32_t indexOfBufferedRes;
STableListInfo* pTableList;
SArray* pFuncTypeList;
} SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
@ -105,9 +107,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
}
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i);
taosArrayPush(pCidList, &pColInfo->colId);
if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) {
pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i);
}
}
pInfo->pCidList = pCidList;
@ -132,7 +140,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo);
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -274,7 +282,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (NULL == pInfo->pLastrowReader) {
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
pTaskInfo->id.str, pInfo->pFuncTypeList);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);
@ -333,6 +341,7 @@ void destroyCacheScanOperator(void* param) {
taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList);
taosArrayDestroy(pInfo->pFuncTypeList);
taosArrayDestroy(pInfo->pUidList);
taosArrayDestroy(pInfo->matchInfo.pList);
tableListDestroy(pInfo->pTableList);
@ -405,6 +414,8 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
taosArrayPush(pMatchInfo, pColInfo);
} else if (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType){
taosArrayPush(pMatchInfo, pColInfo);
}
}

View File

@ -1343,7 +1343,6 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.dstSlotId = pNode->slotId;
c.dataType = pColNode->node.resType;
taosArrayPush(pList, &c);
}
}

View File

@ -621,6 +621,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
} else {
pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
}
if(pRes == NULL) {
st = taosGetTimestampUs();
}
int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {

View File

@ -2125,7 +2125,7 @@ bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
}
static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) {
if (pTsColInfo == NULL) {
if (pTsColInfo == NULL || pTsColInfo->pData == NULL) {
return 0;
}

View File

@ -241,6 +241,29 @@ static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) {
return pDst;
}
static SArray* functParamClone(const SArray* pSrc) {
int32_t len = sizeof(SArray) + pSrc->capacity * pSrc->elemSize;
SArray* pDst = taosArrayInit(pSrc->capacity, pSrc->elemSize);
if (NULL == pDst) {
return NULL;
}
for (int i = 0; i < TARRAY_SIZE(pSrc); ++i) {
SFunctParam* pFunctParam = taosArrayGet(pSrc, i);
SFunctParam* pNewFunctParam = (SFunctParam*)taosArrayPush(pDst, pFunctParam);
if (NULL == pNewFunctParam) {
return NULL;
}
pNewFunctParam->type = pFunctParam->type;
pNewFunctParam->pCol = taosMemoryCalloc(1, sizeof(SColumn));
memcpy(pNewFunctParam->pCol, pFunctParam->pCol, sizeof(SColumn));
}
return pDst;
}
static int32_t realTableNodeCopy(const SRealTableNode* pSrc, SRealTableNode* pDst) {
COPY_BASE_OBJECT_FIELD(table, tableNodeCopy);
CLONE_OBJECT_FIELD(pMeta, tableMetaClone);
@ -425,6 +448,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(isCountByTag);
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
return TSDB_CODE_SUCCESS;
}

View File

@ -1784,6 +1784,24 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
static const char* jkLastRowScanPhysiPlanTargets = "Targets";
static const char* jkLastRowScanPhysiPlanFuncType = "FuncType";
static const char* jkLastRowScanPhysiPlanFuncTypes = "FuncTypes";
static int32_t funcTypeToJson(const void* pObj, SJson* pJson) {
const int32_t* pNode = (const int32_t*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkLastRowScanPhysiPlanFuncType, *pNode);
return code;
}
static int32_t jsonToFuncType(const SJson* pJson, void* pObj) {
int32_t* pNode = (int32_t*)pObj;
int32_t code = tjsonGetIntValue(pJson, jkLastRowScanPhysiPlanFuncType, pNode);
return code;
}
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
@ -1798,6 +1816,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddTArray(pJson, jkLastRowScanPhysiPlanFuncTypes, funcTypeToJson, pNode->pFuncTypes);
}
return code;
}
@ -1815,6 +1836,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToTArray(pJson, jkLastRowScanPhysiPlanFuncTypes, jsonToFuncType, &pNode->pFuncTypes, sizeof(int32_t));
}
return code;
}

View File

@ -194,3 +194,21 @@ bool nodesEqualNode(const SNode* a, const SNode* b) {
return false;
}
bool nodeListNodeEqual(const SNodeList* a, const SNode* b) {
if (NULL == a || NULL == b) {
return false;
}
if (LIST_LENGTH(a) < 1) {
return false;
}
SNode *na;
FOREACH(na, a) {
if (nodesEqualNode(na, b)) {
return true;
}
}
return false;
}

View File

@ -65,10 +65,14 @@ typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj);
static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder);
static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder);
static int32_t SArrayToMsg(const void* pObj, STlvEncoder* pEncoder);
static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj);
static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj);
static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj);
static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj);
static int32_t msgToSArray(STlv* pTlv, void** pObj);
static int32_t initTlvEncoder(STlvEncoder* pEncoder) {
pEncoder->allocSize = NODES_MSG_DEFAULT_LEN;
@ -2053,7 +2057,8 @@ enum {
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
PHY_LAST_ROW_SCAN_CODE_IGNULL,
PHY_LAST_ROW_SCAN_CODE_TARGETS
PHY_LAST_ROW_SCAN_CODE_TARGETS,
PHY_LAST_ROW_SCAN_CODE_FUNCTYPES
};
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2072,6 +2077,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_FUNCTYPES, SArrayToMsg, pNode->pFuncTypes);
}
return code;
}
@ -2098,6 +2106,10 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_LAST_ROW_SCAN_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_LAST_ROW_SCAN_CODE_FUNCTYPES:
code = msgToSArray(pTlv, (void**)&pNode->pFuncTypes);
break;
default:
break;
}
@ -4391,6 +4403,31 @@ static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) {
return TSDB_CODE_SUCCESS;
}
enum {
SARRAY_CODE_CAPACITY = 1,
SARRAY_CODE_ELEMSIZE,
SARRAY_CODE_SIZE,
SARRAY_CODE_PDATA
};
static int32_t SArrayToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SArray* pArray = (const SArray*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, SARRAY_CODE_CAPACITY, pArray->capacity);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, SARRAY_CODE_ELEMSIZE, pArray->elemSize);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, SARRAY_CODE_SIZE, pArray->size);
}
if (TSDB_CODE_SUCCESS == code && pArray->capacity * pArray->elemSize > 0 && pArray->pData != NULL) {
code = tlvEncodeBinary(pEncoder, SARRAY_CODE_PDATA, pArray->pData, pArray->capacity * pArray->elemSize);
}
return code;
}
static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) {
SNodeList* pList = nodesMakeList();
@ -4411,6 +4448,67 @@ static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) {
return code;
}
static int32_t msgToSArray(STlv* pTlv, void** pObj){
SArray* pArray = NULL;
uint32_t capacity = 0;
uint32_t elemSize = 0;
uint32_t actualSize;
int32_t decodeFieldNum = 0;;
int32_t code = TSDB_CODE_SUCCESS;
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
STlv* pTlvTemp = NULL;
STlv* pDataTlv = NULL;
tlvForEach(&decoder, pTlvTemp, code) {
switch (pTlvTemp->type) {
case SARRAY_CODE_CAPACITY:
code = tlvDecodeI32(pTlvTemp, &capacity);
break;
case SARRAY_CODE_ELEMSIZE:
code = tlvDecodeI32(pTlvTemp, &elemSize);
break;
case SARRAY_CODE_SIZE:
code = tlvDecodeI32(pTlvTemp, &actualSize);
break;
case SARRAY_CODE_PDATA:
if (decodeFieldNum < 3) {
pDataTlv = pTlvTemp;
break;
}
pArray = taosArrayInit(capacity, elemSize);
if (NULL == pArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pArray->size = actualSize;
if (TSDB_CODE_SUCCESS != code || pTlvTemp == NULL) {
taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
code = tlvDecodeBinary(pTlvTemp, pArray->pData);
break;
default:
break;
}
decodeFieldNum++;
}
if (pDataTlv != NULL) {
pArray = taosArrayInit(capacity, elemSize);
if (NULL == pArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pArray->size = actualSize;
if (TSDB_CODE_SUCCESS != code || pTlvTemp == NULL) {
taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
code = tlvDecodeBinary(pDataTlv, pArray->pData);
}
*pObj = pArray;
return code;
}
static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) {
STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value};
return msgToNodeList(&decoder, pObj);

View File

@ -674,6 +674,8 @@ static void destroyTableCfg(STableCfg* pCfg) {
static void destroySmaIndex(void* pIndex) { taosMemoryFree(((STableIndexInfo*)pIndex)->expr); }
static void destroyFuncParam(void* pValue) { taosMemoryFree(((SFunctParam*)pValue)->pCol); }
static void destroyHintValue(EHintOption option, void* value) {
switch (option) {
default:
@ -1173,6 +1175,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pLogicNode->pGroupTags);
nodesDestroyList(pLogicNode->pTags);
nodesDestroyNode(pLogicNode->pSubtable);
taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam);
break;
}
case QUERY_NODE_LOGIC_PLAN_JOIN: {
@ -1300,6 +1303,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyScanPhysiNode((SScanPhysiNode*)pNode);
nodesDestroyList(pPhyNode->pGroupTags);
nodesDestroyList(pPhyNode->pTargets);
taosArrayDestroy(pPhyNode->pFuncTypes);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:

View File

@ -2501,17 +2501,30 @@ static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId,
return true;
}
static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) {
static bool isNeedSplitCacheLastFunc(SFunctionNode* pFunc, SScanLogicNode* pScan) {
int32_t funcType = pFunc->funcType;
if ((FUNCTION_TYPE_LAST_ROW != funcType || (FUNCTION_TYPE_LAST_ROW == funcType && TSDB_CACHE_MODEL_LAST_VALUE == pScan->cacheLastMode)) &&
(FUNCTION_TYPE_LAST != funcType || (FUNCTION_TYPE_LAST == funcType && (TSDB_CACHE_MODEL_LAST_ROW == pScan->cacheLastMode ||
QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))))) &&
FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) {
return true;
}
return false;
}
static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel, bool* hasOtherFunc) {
bool hasNonPKSelectFunc = false;
SNode* pFunc = NULL;
int32_t lastColNum = 0, selectNonPKColNum = 0;
col_id_t lastColId = -1, selectNonPKColId = -1;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(((SAggLogicNode*)pNode)->node.pChildren, 0);
uint32_t needSplitFuncCount = 0;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
SNode* pParam = nodesListGetNode(pAggFunc->pParameterList, 0);
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
SNode* pPar = nodesListGetNode(pAggFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN == nodeType(pPar)) {
SColumnNode* pCol = (SColumnNode*)pPar;
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
SColumnNode* pCol = (SColumnNode*)pParam;
if (pCol->colType != COLUMN_TYPE_COLUMN) {
return false;
}
@ -2520,13 +2533,18 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) {
lastColNum++;
}
}
if (QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
else if (QUERY_NODE_VALUE == nodeType(pParam) || QUERY_NODE_OPERATOR == nodeType(pParam)) {
needSplitFuncCount++;
*hasOtherFunc = true;
}
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) {
return false;
}
if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId))
return false;
if (TSDB_CACHE_MODEL_LAST_ROW == cacheLastModel) {
needSplitFuncCount++;
*hasOtherFunc = true;
}
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
SNode* pParam = nodesListGetNode(pAggFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
SColumnNode* pCol = (SColumnNode*)pParam;
if (COLUMN_TYPE_COLUMN == pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
@ -2548,15 +2566,21 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) {
}
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
*hasOtherFunc = true;
needSplitFuncCount++;
} else if (FUNCTION_TYPE_LAST_ROW == pAggFunc->funcType && TSDB_CACHE_MODEL_LAST_VALUE == cacheLastModel) {
*hasOtherFunc = true;
needSplitFuncCount++;
}
}
if (needSplitFuncCount >= ((SAggLogicNode*)pNode)->pAggFuncs->length) {
return false;
}
return true;
}
static bool lastRowScanOptCheckLastCache(SAggLogicNode* pAgg, SScanLogicNode* pScan) {
// Only one of LAST and LASTROW can appear
if (pAgg->hasLastRow == pAgg->hasLast || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
if ((pAgg->hasLastRow == pAgg->hasLast && !pAgg->hasLastRow) || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions ||
!hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) ||
IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
return false;
@ -2578,7 +2602,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
}
bool hasOtherFunc = false;
if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) {
if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) {
return false;
}
@ -2593,6 +2617,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt {
bool doAgg;
SNodeList* pLastCols;
SNodeList* pOtherCols;
int32_t funcType;
} SLastRowScanOptSetColDataTypeCxt;
static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
@ -2615,7 +2640,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) {
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase) {
SNode* pTarget = NULL;
WHERE_EACH(pTarget, pTargets) {
bool found = false;
@ -2627,6 +2652,10 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
break;
}
}
if (!found && nodeListNodeEqual(pLastRowCols, pTarget)) {
found = true;
}
if (!found && erase) {
ERASE_NODE(pTargets);
continue;
@ -2635,7 +2664,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
}
}
static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2) {
static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2, SNodeList* pList3) {
SNode* pTarget = NULL;
WHERE_EACH(pTarget, pTargets) {
bool found = false;
@ -2654,6 +2683,11 @@ static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pL
}
}
}
if (!found && nodeListNodeEqual(pList3, pTarget)) {
found = true;
}
if (!found) {
ERASE_NODE(pTargets);
continue;
@ -2662,6 +2696,33 @@ static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pL
}
}
static int32_t lastRowScanBuildFuncTypes(SScanLogicNode* pScan, SColumnNode* pColNode, int32_t funcType) {
SFunctParam* pFuncTypeParam = taosMemoryCalloc(1, sizeof(SFunctParam));
if (NULL == pFuncTypeParam) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pFuncTypeParam->type = funcType;
if (NULL == pScan->pFuncTypes) {
pScan->pFuncTypes = taosArrayInit(pScan->pScanCols->length, sizeof(SFunctParam));
if (NULL == pScan->pFuncTypes) {
taosMemoryFree(pFuncTypeParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
pFuncTypeParam->pCol = taosMemoryCalloc(1, sizeof(SColumn));
if (NULL == pFuncTypeParam->pCol) {
taosMemoryFree(pFuncTypeParam);
return TSDB_CODE_OUT_OF_MEMORY;
}
pFuncTypeParam->pCol->colId = pColNode->colId;
strcpy(pFuncTypeParam->pCol->name, pColNode->colName);
taosArrayPush(pScan->pFuncTypes, pFuncTypeParam);
taosMemoryFree(pFuncTypeParam);
return TSDB_CODE_SUCCESS;
}
static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized);
@ -2673,10 +2734,16 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
SNode* pNode = NULL;
SColumnNode* pPKTsCol = NULL;
SColumnNode* pNonPKCol = NULL;
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
pScan->scanType = SCAN_TYPE_LAST_ROW;
pScan->igLastNull = pAgg->hasLast ? true : false;
SArray* isDuplicateCol = taosArrayInit(pScan->pScanCols->length, sizeof(bool));
SNodeList* pLastRowCols = NULL;
FOREACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t funcType = pFunc->funcType;
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 0);
if (FUNCTION_TYPE_LAST_ROW == funcType || FUNCTION_TYPE_LAST == funcType) {
int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName),
FUNCTION_TYPE_LAST_ROW == funcType ? "_cache_last_row" : "_cache_last");
@ -2686,6 +2753,61 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
nodesClearList(cxt.pLastCols);
return code;
}
cxt.funcType = pFunc->funcType;
// add duplicate cols which be removed for both last_row, last
if (pAgg->hasLast && pAgg->hasLastRow) {
if (QUERY_NODE_COLUMN == nodeType(pParamNode)) {
SNode* pColNode = NULL;
int i = 0;
FOREACH(pColNode, pScan->pScanCols) {
bool isDup = false;
bool* isDuplicate = taosArrayGet(isDuplicateCol, i);
if (NULL == isDuplicate) {
taosArrayInsert(isDuplicateCol, i, &isDup);
isDuplicate = taosArrayGet(isDuplicateCol, i);
}
i++;
if (nodesEqualNode(pParamNode, pColNode)) {
if (*isDuplicate) {
if (0 == strncmp(((SColumnNode*)pColNode)->colName, "#dup_col.", 9)) {
continue;
}
SNode* newColNode = nodesCloneNode(pColNode);
sprintf(((SColumnNode*)newColNode)->colName, "#dup_col.%p", newColNode);
sprintf(((SColumnNode*)pParamNode)->colName, "#dup_col.%p", newColNode);
nodesListAppend(pScan->pScanCols, newColNode);
isDup = true;
taosArrayInsert(isDuplicateCol, pScan->pScanCols->length, &isDup);
nodesListAppend(pScan->node.pTargets, nodesCloneNode(newColNode));
if (funcType != FUNCTION_TYPE_LAST) {
nodesListMakeAppend(&pLastRowCols, nodesCloneNode(newColNode));
}
lastRowScanBuildFuncTypes(pScan, (SColumnNode*)newColNode, pFunc->funcType);
} else {
isDup = true;
*isDuplicate = isDup;
if (funcType != FUNCTION_TYPE_LAST && !nodeListNodeEqual(cxt.pLastCols, pColNode)) {
nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode));
}
lastRowScanBuildFuncTypes(pScan, (SColumnNode*)pColNode, pFunc->funcType);
}
continue;
}else if (nodeListNodeEqual(pFunc->pParameterList, pColNode)) {
if (funcType != FUNCTION_TYPE_LAST && ((SColumnNode*)pColNode)->colId == PRIMARYKEY_TIMESTAMP_COL_ID &&
!nodeListNodeEqual(pLastRowCols, pColNode)) {
nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode));
lastRowScanBuildFuncTypes(pScan, (SColumnNode*)pColNode, pFunc->funcType);
isDup = true;
*isDuplicate = isDup;
}
}
}
}
}
if (FUNCTION_TYPE_LAST == funcType) {
nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt);
nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1));
@ -2707,15 +2829,13 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
}
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
pScan->scanType = SCAN_TYPE_LAST_ROW;
pScan->igLastNull = pAgg->hasLast ? true : false;
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
cxt.funcType = FUNCTION_TYPE_CACHE_LAST;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true);
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
if (pPKTsCol && pScan->node.pTargets->length == 1) {
// when select last(ts),ts from ..., we add another ts to targets
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
@ -2728,10 +2848,12 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
}
nodesClearList(cxt.pLastCols);
}
pAgg->hasLastRow = false;
pAgg->hasLast = false;
pCxt->optimized = true;
taosArrayDestroy(isDuplicateCol);
return TSDB_CODE_SUCCESS;
}
@ -2749,7 +2871,7 @@ static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) {
}
bool hasOtherFunc = false;
if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) {
if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) {
return false;
}
@ -2770,6 +2892,16 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg,
pNew->hasLastRow = false;
pNew->hasLast = false;
SNode* pFuncNode = NULL;
FOREACH(pFuncNode, pFunc) {
SFunctionNode* pFunc = (SFunctionNode*)pFuncNode;
if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType) {
pNew->hasLastRow = true;
} else if (FUNCTION_TYPE_LAST == pFunc->funcType) {
pNew->hasLast = true;
}
}
pNew->hasTimeLineFunc = pAgg->hasTimeLineFunc;
pNew->hasGroupKeyOptimized = false;
pNew->onlyHasKeepOrderFunc = pAgg->onlyHasKeepOrderFunc;
@ -2894,21 +3026,31 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan*
if (NULL == pAgg) {
return TSDB_CODE_SUCCESS;
}
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0);
SNode* pNode = NULL;
SNodeList* pAggFuncList = NULL;
{
bool hasLast = false;
bool hasLastRow = false;
WHERE_EACH(pNode, pAgg->pAggFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pNode;
int32_t funcType = pFunc->funcType;
if (FUNCTION_TYPE_LAST_ROW != funcType && FUNCTION_TYPE_LAST != funcType &&
FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) {
if (isNeedSplitCacheLastFunc(pFunc, pScan)) {
nodesListMakeStrictAppend(&pAggFuncList, nodesCloneNode(pNode));
ERASE_NODE(pAgg->pAggFuncs);
continue;
}
if (FUNCTION_TYPE_LAST_ROW == funcType ) {
hasLastRow = true;
} else if (FUNCTION_TYPE_LAST == funcType) {
hasLast = true;
}
WHERE_NEXT;
}
pAgg->hasLast = hasLast;
pAgg->hasLastRow = hasLastRow;
}
if (NULL == pAggFuncList) {

View File

@ -562,9 +562,36 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
pScan->groupSort = pScanLogicNode->groupSort;
pScan->ignoreNull = pScanLogicNode->igLastNull;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) {
pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t));
if (NULL == pScan->pFuncTypes) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SNode* pTargetNode = NULL;
int funcTypeIndex = 0;
FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) {
if (((STargetNode*)pTargetNode)->pExpr->type != QUERY_NODE_COLUMN) {
continue;
}
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pTargetNode)->pExpr;
for (int i = 0; i < TARRAY_SIZE(pScanLogicNode->pFuncTypes); ++i) {
SFunctParam* pFunctParam = taosArrayGet(pScanLogicNode->pFuncTypes, i);
if (pColNode->colId == pFunctParam->pCol->colId &&
0 == strncmp(pColNode->colName, pFunctParam->pCol->name, strlen(pColNode->colName))) {
taosArrayInsert(pScan->pFuncTypes, funcTypeIndex, &pFunctParam->type);
break;
}
}
funcTypeIndex++;
}
}
return code;
}
static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,

View File

@ -422,6 +422,11 @@ e
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/leastsquares.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/leastsquares.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/length.py

View File

@ -55,7 +55,7 @@ if $rows != 1 then
return -1
endi
sql explain select count(*), last_row(f1), last(f1) from sta;
if $data00 != @-> Aggragate (functions=3 width=24 input_order=desc )@ then
if $data00 != @-> Merge (columns=3 width=24 input_order=unknown output_order=unknown mode=column)@ then
return -1
endi
sql_error select count(*), last_row(f1), min(f1), f1 from sta;

View File

@ -0,0 +1,660 @@
import datetime
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf
class TDTestCase:
def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
def check_explain_res_has_row(self, plan_str_expect: str, rows, sql):
plan_found = False
for row in rows:
if str(row).find(plan_str_expect) >= 0:
tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row)))
plan_found = True
break
if not plan_found:
tdLog.exit("plan: %s not found in res: [%s] in sql: %s" % (plan_str_expect, str(rows), sql))
def check_explain_res_no_row(self, plan_str_not_expect: str, res, sql):
for row in res:
if str(row).find(plan_str_not_expect) >= 0:
tdLog.exit('plan: [%s] found in: [%s] for sql: %s' % (plan_str_not_expect, str(row), sql))
def explain_sql(self, sql: str):
sql = "explain " + sql
tdSql.query(sql, queryTimes=1)
return tdSql.queryResult
def explain_and_check_res(self, sqls, hasLastRowScanRes):
for sql, has_last in zip(sqls, hasLastRowScanRes):
res = self.explain_sql(sql)
if has_last == 1:
self.check_explain_res_has_row("Last Row Scan", res, sql)
else:
self.check_explain_res_no_row("Last Row Scan", res, sql)
def none_model_test(self):
tdSql.execute("drop database if exists last_test_none_model ;")
tdSql.execute("create database last_test_none_model cachemodel 'none';")
tdSql.execute("use last_test_none_model;")
tdSql.execute("create stable last_test_none_model.st(ts timestamp, id int) tags(tid int);")
tdSql.execute("create table last_test_none_model.test_t1 using last_test_none_model.st tags(1);")
tdSql.execute("create table last_test_none_model.test_t2 using last_test_none_model.st tags(2);")
tdSql.execute("create table last_test_none_model.test_t3 using last_test_none_model.st tags(3);")
tdSql.execute("create table last_test_none_model.test_t4 using last_test_none_model.st tags(4);")
maxRange = 100
# 2023-11-13 00:00:00.000
startTs = 1699804800000
for i in range(maxRange):
insertSqlString = "insert into last_test_none_model.test_t1 values(%d, %d);" % (startTs + i, i)
tdSql.execute(insertSqlString)
last_ts = startTs + maxRange
tdSql.execute("insert into last_test_none_model.test_t1 (ts) values(%d)" % (last_ts))
sql = f'select last_row(ts), last(*) from last_test_none_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_none_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_none_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, maxRange - 1)
tdSql.checkData(0, 2, last_ts)
tdSql.checkData(0, 3, maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_none_model.test_t1;')
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_none_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange + 1)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, maxRange - 1)
tdSql.checkData(0, 5, last_ts)
tdSql.checkData(0, 6, maxRange - 1)
startTs2 = startTs + 86400000
for i in range(maxRange):
i = i + 2 * maxRange
insertSqlString = "insert into last_test_none_model.test_t2 values(%d, %d);" % (startTs2 + i, i)
tdSql.execute(insertSqlString)
last_ts2 = startTs2 + maxRange
startTs3 = startTs + 2 * 86400000
for i in range(maxRange):
i = i + 3 * maxRange
insertSqlString = "insert into last_test_none_model.test_t3 values(%d, %d);" % (startTs3 + i, i)
tdSql.execute(insertSqlString)
last_ts3 = startTs3 + 4 * maxRange - 1
startTs4 = startTs + 3 * 86400000
for i in range(maxRange):
i = i + 4 * maxRange
insertSqlString = "insert into last_test_none_model.test_t4 values(%d, %d);" % (startTs4 + i, i)
tdSql.execute(insertSqlString)
last_ts4 = startTs4 + 5 * maxRange - 1
sql = f'select last_row(ts), last(*) from last_test_none_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_none_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_none_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, 5 * maxRange - 1)
tdSql.checkData(0, 2, last_ts4)
tdSql.checkData(0, 3, 4 * maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_no_row("Last Row Scan", explain_res, sql)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_none_model.st;')
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_none_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_none_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 2)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
tdSql.execute("drop table if exists last_test_none_model.test_t4 ;")
tdSql.execute("drop table if exists last_test_none_model.test_t3 ;")
tdSql.execute("drop table if exists last_test_none_model.test_t2 ;")
tdSql.execute("drop table if exists last_test_none_model.test_t1 ;")
tdSql.execute("drop stable if exists last_test_none_model.st;")
tdSql.execute("drop database if exists last_test_none_model;")
def last_value_model_test(self):
tdSql.execute("create database last_test_last_value_model cachemodel 'last_value' ;")
tdSql.execute("use last_test_last_value_model;")
tdSql.execute("create stable last_test_last_value_model.st(ts timestamp, id int) tags(tid int);")
tdSql.execute("create table last_test_last_value_model.test_t1 using last_test_last_value_model.st tags(1);")
tdSql.execute("create table last_test_last_value_model.test_t2 using last_test_last_value_model.st tags(2);")
tdSql.execute("create table last_test_last_value_model.test_t3 using last_test_last_value_model.st tags(3);")
tdSql.execute("create table last_test_last_value_model.test_t4 using last_test_last_value_model.st tags(4);")
maxRange = 100
# 2023-11-13 00:00:00.000
startTs = 1699804800000
for i in range(maxRange):
insertSqlString = "insert into last_test_last_value_model.test_t1 values(%d, %d);" % (startTs + i, i)
tdSql.execute(insertSqlString)
last_ts = startTs + maxRange
tdSql.execute("insert into last_test_last_value_model.test_t1 (ts) values(%d)" % (last_ts))
sql = f'select last_row(ts), last(*) from last_test_last_value_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_value_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_last_value_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, maxRange - 1)
tdSql.checkData(0, 2, last_ts)
tdSql.checkData(0, 3, maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange + 1)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, maxRange - 1)
tdSql.checkData(0, 5, last_ts)
tdSql.checkData(0, 6, maxRange - 1)
startTs2 = startTs + 86400000
for i in range(maxRange):
i = i + 2 * maxRange
insertSqlString = "insert into last_test_last_value_model.test_t2 values(%d, %d);" % (startTs2 + i, i)
tdSql.execute(insertSqlString)
last_ts2 = startTs2 + maxRange
startTs3 = startTs + 2 * 86400000
for i in range(maxRange):
i = i + 3 * maxRange
insertSqlString = "insert into last_test_last_value_model.test_t3 values(%d, %d);" % (startTs3 + i, i)
tdSql.execute(insertSqlString)
last_ts3 = startTs3 + 4 * maxRange - 1
startTs4 = startTs + 3 * 86400000
for i in range(maxRange):
i = i + 4 * maxRange
insertSqlString = "insert into last_test_last_value_model.test_t4 values(%d, %d);" % (startTs4 + i, i)
tdSql.execute(insertSqlString)
last_ts4 = startTs4 + 5 * maxRange - 1
sql = f'select last_row(ts), last(*) from last_test_last_value_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_value_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_last_value_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, 5 * maxRange - 1)
tdSql.checkData(0, 2, last_ts4)
tdSql.checkData(0, 3, 4 * maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_last_value_model.st;')
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 2)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
tdSql.execute("drop table if exists last_test_last_value_model.test_t4 ;")
tdSql.execute("drop table if exists last_test_last_value_model.test_t3 ;")
tdSql.execute("drop table if exists last_test_last_value_model.test_t2 ;")
tdSql.execute("drop table if exists last_test_last_value_model.test_t1 ;")
tdSql.execute("drop stable if exists last_test_last_value_model.st;")
tdSql.execute("drop database if exists last_test_last_value_model;")
def last_row_model_test(self):
tdSql.execute("create database last_test_last_row_model cachemodel 'last_row';")
tdSql.execute("use last_test_last_row_model;")
tdSql.execute("create stable last_test_last_row_model.st(ts timestamp, id int) tags(tid int);")
tdSql.execute("create table last_test_last_row_model.test_t1 using last_test_last_row_model.st tags(1);")
tdSql.execute("create table last_test_last_row_model.test_t2 using last_test_last_row_model.st tags(2);")
tdSql.execute("create table last_test_last_row_model.test_t3 using last_test_last_row_model.st tags(3);")
tdSql.execute("create table last_test_last_row_model.test_t4 using last_test_last_row_model.st tags(4);")
maxRange = 100
# 2023-11-13 00:00:00.000
startTs = 1699804800000
for i in range(maxRange):
insertSqlString = "insert into last_test_last_row_model.test_t1 values(%d, %d);" % (startTs + i, i)
tdSql.execute(insertSqlString)
last_ts = startTs + maxRange
tdSql.execute("insert into last_test_last_row_model.test_t1 (ts) values(%d)" % (last_ts))
sql = f'select last_row(ts), last(*) from last_test_last_row_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_row_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_last_row_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, maxRange - 1)
tdSql.checkData(0, 2, last_ts)
tdSql.checkData(0, 3, maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange + 1)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, maxRange - 1)
tdSql.checkData(0, 5, last_ts)
tdSql.checkData(0, 6, maxRange - 1)
startTs2 = startTs + 86400000
for i in range(maxRange):
i = i + 2 * maxRange
insertSqlString = "insert into last_test_last_row_model.test_t2 values(%d, %d);" % (startTs2 + i, i)
tdSql.execute(insertSqlString)
last_ts2 = startTs2 + maxRange
startTs3 = startTs + 2 * 86400000
for i in range(maxRange):
i = i + 3 * maxRange
insertSqlString = "insert into last_test_last_row_model.test_t3 values(%d, %d);" % (startTs3 + i, i)
tdSql.execute(insertSqlString)
last_ts3 = startTs3 + 4 * maxRange - 1
startTs4 = startTs + 3 * 86400000
for i in range(maxRange):
i = i + 4 * maxRange
insertSqlString = "insert into last_test_last_row_model.test_t4 values(%d, %d);" % (startTs4 + i, i)
tdSql.execute(insertSqlString)
last_ts4 = startTs4 + 5 * maxRange - 1
sql = f'select last_row(ts), last(*) from last_test_last_row_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_row_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_last_row_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, 5 * maxRange - 1)
tdSql.checkData(0, 2, last_ts4)
tdSql.checkData(0, 3, 4 * maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_has_row("Table Scan", explain_res, sql)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_last_row_model.st;')
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 2)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
tdSql.execute("drop table if exists last_test_last_row_model.test_t4 ;")
tdSql.execute("drop table if exists last_test_last_row_model.test_t3 ;")
tdSql.execute("drop table if exists last_test_last_row_model.test_t2 ;")
tdSql.execute("drop table if exists last_test_last_row_model.test_t1 ;")
tdSql.execute("drop stable if exists last_test_last_row_model.st;")
tdSql.execute("drop database if exists last_test_last_row_model;")
def both_model_test(self):
tdSql.execute("create database last_test_both_model cachemodel 'both';")
tdSql.execute("use last_test_both_model;")
tdSql.execute("create stable last_test_both_model.st(ts timestamp, id int) tags(tid int);")
tdSql.execute("create table last_test_both_model.test_t1 using last_test_both_model.st tags(1);")
tdSql.execute("create table last_test_both_model.test_t2 using last_test_both_model.st tags(2);")
tdSql.execute("create table last_test_both_model.test_t3 using last_test_both_model.st tags(3);")
tdSql.execute("create table last_test_both_model.test_t4 using last_test_both_model.st tags(4);")
maxRange = 100
# 2023-11-13 00:00:00.000
startTs = 1699804800000
for i in range(maxRange):
insertSqlString = "insert into last_test_both_model.test_t1 values(%d, %d);" % (startTs + i, i)
tdSql.execute(insertSqlString)
last_ts = startTs + maxRange
tdSql.execute("insert into last_test_both_model.test_t1 (ts) values(%d)" % (last_ts))
sql = f'select last_row(ts), last(*) from last_test_both_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_no_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, None)
tdSql.checkData(0, 3, maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_no_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_both_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, maxRange - 1)
tdSql.checkData(0, 2, last_ts)
tdSql.checkData(0, 3, maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_both_model.test_t1;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts)
tdSql.checkData(0, 1, last_ts)
tdSql.checkData(0, 2, maxRange + 1)
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, maxRange - 1)
tdSql.checkData(0, 5, last_ts)
tdSql.checkData(0, 6, maxRange - 1)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_both_model.test_t1;')
startTs2 = startTs + 86400000
for i in range(maxRange):
i = i + 2 * maxRange
insertSqlString = "insert into last_test_both_model.test_t2 values(%d, %d);" % (startTs2 + i, i)
tdSql.execute(insertSqlString)
last_ts2 = startTs2 + maxRange
startTs3 = startTs + 2 * 86400000
for i in range(maxRange):
i = i + 3 * maxRange
insertSqlString = "insert into last_test_both_model.test_t3 values(%d, %d);" % (startTs3 + i, i)
tdSql.execute(insertSqlString)
last_ts3 = startTs3 + 4 * maxRange - 1
startTs4 = startTs + 3 * 86400000
for i in range(maxRange):
i = i + 4 * maxRange
insertSqlString = "insert into last_test_both_model.test_t4 values(%d, %d);" % (startTs4 + i, i)
tdSql.execute(insertSqlString)
last_ts4 = startTs4 + 5 * maxRange - 1
sql = f'select last_row(ts), last(*) from last_test_both_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_no_row("Table Scan", explain_res, sql)
sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 5 * maxRange - 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
self.check_explain_res_no_row("Table Scan", explain_res, sql)
sql = f'select last(*), last_row(ts), count(*) from last_test_both_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, 5 * maxRange - 1)
#tdSql.checkData(0, 2, last_ts4)
tdSql.checkData(0, 3, 4 * maxRange + 1)
explain_res = self.explain_sql(sql)
self.check_explain_res_has_row("Last Row Scan", explain_res, sql)
tdSql.error(f'select last(*), last_row(ts), ts from last_test_both_model.st;')
sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_both_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, last_ts4)
tdSql.checkData(0, 1, last_ts4)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_both_model.st;'
tdSql.query(sql)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 2)
tdSql.checkData(0, 2, 4 * maxRange + 1)
tdSql.checkData(0, 3, 5 * maxRange - 1)
tdSql.checkData(0, 4, 5 * maxRange - 1)
tdSql.checkData(0, 5, last_ts4)
tdSql.checkData(0, 6, 5 * maxRange - 1)
tdSql.execute("drop table if exists last_test_both_model.test_t4 ;")
tdSql.execute("drop table if exists last_test_both_model.test_t3 ;")
tdSql.execute("drop table if exists last_test_both_model.test_t2 ;")
tdSql.execute("drop table if exists last_test_both_model.test_t1 ;")
tdSql.execute("drop stable if exists last_test_both_model.st;")
tdSql.execute("drop database if exists last_test_both_model;")
def run(self):
self.none_model_test()
self.last_value_model_test()
self.last_row_model_test()
self.both_model_test()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -250,7 +250,7 @@ class TDTestCase:
"last_row(c1), last(c1)",
"last_row(c1), c1,c3, ts"
]
has_last_row_scan_res = [0,0,1]
has_last_row_scan_res = [1,1,1]
sqls = self.format_sqls(sql_template, select_items)
self.explain_and_check_res(sqls, has_last_row_scan_res)
#res_expect = [None, None, [999, 999, 499, "2018-11-25 19:30:00.000"]]