diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 1dbc8f2f76..669340f9e5 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -189,7 +189,8 @@ typedef struct TsdReader { typedef struct SStoreCacheReader { int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); + SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, + SArray *pFuncTypeList); void *(*closeReader)(void *pReader); int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUidList); diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 7fbdbfb211..891084419b 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -149,6 +149,8 @@ void nodesRewriteExprPostOrder(SNode** pNode, FNodeRewriter rewriter, void* pCon void nodesRewriteExprsPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* pContext); bool nodesEqualNode(const SNode* a, const SNode* b); +bool nodeListNodeEqual(const SNodeList* a, const SNode* b); + bool nodesMatchNode(const SNode* pSub, const SNode* pNode); SNode* nodesCloneNode(const SNode* pNode); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index f0383e8168..6cb83ebb51 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -120,6 +120,7 @@ typedef struct SScanLogicNode { bool onlyMetaCtbIdx; // for tag scan with no tbname bool filesetDelimited; // returned blocks delimited by fileset bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname + SArray* pFuncTypes; // for last, last_row } SScanLogicNode; typedef struct SJoinLogicNode { @@ -401,6 +402,7 @@ typedef struct SLastRowScanPhysiNode { bool groupSort; bool ignoreNull; SNodeList* pTargets; + SArray* pFuncTypes; } SLastRowScanPhysiNode; typedef SLastRowScanPhysiNode STableCountScanPhysiNode; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9a4e2edf8d..97cf0ffebc 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -175,7 +175,8 @@ void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn not int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr); + SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr, + SArray* pFuncTypeList); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 9b29329cf3..d86219542f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "cos.h" +#include "functionMgt.h" #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbReadUtil.h" @@ -894,19 +895,56 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr } int num_keys = TARRAY_SIZE(remainCols); - int16_t *aCols = taosMemoryMalloc(num_keys * sizeof(int16_t)); int16_t *slotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *lastColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *lastSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *lastrowColIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + int16_t *lastrowSlotIds = taosMemoryMalloc(num_keys * sizeof(int16_t)); + SArray* lastTmpColArray = NULL; + SArray* lastTmpIndexArray = NULL; + SArray* lastrowTmpColArray = NULL; + SArray* lastrowTmpIndexArray = NULL; + + int lastIndex = 0; + int lastrowIndex = 0; + for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = taosArrayGet(remainCols, i); - aCols[i] = idxKey->key.cid; slotIds[i] = pr->pSlotIds[idxKey->idx]; + if (idxKey->key.ltype == CACHESCAN_RETRIEVE_LAST >> 3) { + if(NULL == lastTmpIndexArray) { + lastTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); + } + taosArrayPush(lastTmpIndexArray, &(i)); + lastColIds[lastIndex] = idxKey->key.cid; + lastSlotIds[lastIndex] = pr->pSlotIds[idxKey->idx]; + lastIndex++; + } else { + if(NULL == lastrowTmpIndexArray) { + lastrowTmpIndexArray = taosArrayInit(num_keys, sizeof(int32_t)); + } + taosArrayPush(lastrowTmpIndexArray, &(i)); + lastrowColIds[lastrowIndex] = idxKey->key.cid; + lastrowSlotIds[lastrowIndex] = pr->pSlotIds[idxKey->idx]; + lastrowIndex++; + } } - if (ltype) { - mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); - } else { - mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, num_keys, slotIds); + pTmpColArray = taosArrayInit(lastIndex + lastrowIndex, sizeof(SLastCol)); + + if(lastTmpIndexArray != NULL) { + mergeLastCid(uid, pTsdb, &lastTmpColArray, pr, lastColIds, lastIndex, lastSlotIds); + for(int i = 0; i < taosArrayGetSize(lastTmpColArray); i++) { + taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastTmpIndexArray, i), taosArrayGet(lastTmpColArray, i)); + } + } + + if(lastrowTmpIndexArray != NULL) { + mergeLastCid(uid, pTsdb, &lastrowTmpColArray, pr, lastrowColIds, lastrowIndex, lastrowSlotIds); + for(int i = 0; i < taosArrayGetSize(lastrowTmpColArray); i++) { + taosArrayInsert(pTmpColArray, *(int32_t*)taosArrayGet(lastrowTmpIndexArray, i), taosArrayGet(lastrowTmpColArray, i)); + } } SLRUCache *pCache = pTsdb->lruCache; @@ -965,9 +1003,18 @@ static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArr rocksMayWrite(pTsdb, false, true, false); } + taosArrayDestroy(lastrowTmpIndexArray); + taosArrayDestroy(lastrowTmpColArray); + taosArrayDestroy(lastTmpIndexArray); + taosArrayDestroy(lastTmpColArray); + + taosMemoryFree(lastColIds); + taosMemoryFree(lastSlotIds); + taosMemoryFree(lastrowColIds); + taosMemoryFree(lastrowSlotIds); + taosArrayDestroy(pTmpColArray); - taosMemoryFree(aCols); taosMemoryFree(slotIds); return code; @@ -1057,6 +1104,15 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache int16_t cid = ((int16_t *)TARRAY_DATA(pCidList))[i]; SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}; + // for select last_row, last case + int32_t funcType = FUNCTION_TYPE_CACHE_LAST; + if (pr->pFuncTypeList != NULL && taosArrayGetSize(pr->pFuncTypeList) > i) { + funcType = ((int32_t *)TARRAY_DATA(pr->pFuncTypeList))[i]; + } + if (((pr->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) && FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { + int8_t tempType = CACHESCAN_RETRIEVE_LAST_ROW | (pr->type ^ CACHESCAN_RETRIEVE_LAST); + key->ltype = (tempType & CACHESCAN_RETRIEVE_LAST) >> 3; + } LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN); if (h) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index f668ea5f72..3df2bb20d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "functionMgt.h" #include "taoserror.h" #include "tarray.h" #include "tcommon.h" @@ -33,31 +34,69 @@ static void setFirstLastResColToNull(SColumnInfoData* pCol, int32_t row) { taosMemoryFree(buf); } +static void saveOneRowForLastRaw(SLastCol* pColVal, SCacheRowsReader* pReader, const int32_t slotId, + SColumnInfoData* pColInfoData, int32_t numOfRows) { + SColVal* pVal = &pColVal->colVal; + + // allNullRow = false; + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { + colDataSetNULL(pColInfoData, numOfRows); + } else { + varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); + + memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); + colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + } + } else { + colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); + } + return; +} + static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, const int32_t* dstSlotIds, void** pRes, const char* idStr) { int32_t numOfRows = pBlock->info.rows; // bool allNullRow = true; if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { + uint64_t ts = TSKEY_MIN; SFirstLastRes* p = NULL; col_id_t colId = -1; + + SArray* funcTypeBlockArray = taosArrayInit(pReader->numOfCols, sizeof(int32_t)); for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); + int32_t funcType = FUNCTION_TYPE_CACHE_LAST; + if (pReader->pFuncTypeList != NULL && taosArrayGetSize(pReader->pFuncTypeList) > i) { + funcType = *(int32_t*)taosArrayGet(pReader->pFuncTypeList, i); + } + taosArrayInsert(funcTypeBlockArray, dstSlotIds[i], taosArrayGet(pReader->pFuncTypeList, i)); + if (slotIds[i] == -1) { + if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { + colDataSetNULL(pColInfoData, numOfRows); + continue; + } setFirstLastResColToNull(pColInfoData, numOfRows); continue; } int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); colId = pColVal->colVal.cid; + + if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { + saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows); + continue; + } + p = (SFirstLastRes*)varDataVal(pRes[i]); p->ts = pColVal->ts; ts = p->ts; p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); // allNullRow = p->isNull & allNullRow; - if (!p->isNull) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { varDataSetLen(p->buf, pColVal->colVal.value.nData); @@ -77,6 +116,13 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p } for (int32_t idx = 0; idx < taosArrayGetSize(pBlock->pDataBlock); ++idx) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, idx); + if (idx < funcTypeBlockArray->size) { + int32_t funcType = *(int32_t*)taosArrayGet(funcTypeBlockArray, idx); + if (FUNCTION_TYPE_CACHE_LAST_ROW == funcType) { + continue; + } + } + if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { if (ts == TSKEY_MIN) { colDataSetNULL(pCol, numOfRows); @@ -95,6 +141,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p // pBlock->info.rows += allNullRow ? 0 : 1; ++pBlock->info.rows; + taosArrayDestroy(funcTypeBlockArray); } else if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW)) { for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); @@ -105,21 +152,8 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p continue; } SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); - SColVal* pVal = &pColVal->colVal; - // allNullRow = false; - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { - colDataSetNULL(pColInfoData, numOfRows); - } else { - varDataSetLen(pReader->transferBuf[slotId], pVal->value.nData); - - memcpy(varDataVal(pReader->transferBuf[slotId]), pVal->value.pData, pVal->value.nData); - colDataSetVal(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); - } - } else { - colDataSetVal(pColInfoData, numOfRows, (const char*)&pVal->value.val, !COL_VAL_IS_VALUE(pVal)); - } + saveOneRowForLastRaw(pColVal, pReader, slotId, pColInfoData, numOfRows); } // pBlock->info.rows += allNullRow ? 0 : 1; @@ -175,7 +209,8 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf } int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, - SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr) { + SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr, + SArray* pFuncTypeList) { *pReader = NULL; SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { @@ -190,6 +225,7 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, p->numOfCols = numOfCols; p->pCidList = pCidList; p->pSlotIds = pSlotIds; + p->pFuncTypeList = pFuncTypeList; if (numOfTables == 0) { *pReader = p; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 16ee90823b..c27e9ebe04 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -348,6 +348,7 @@ typedef struct SCacheRowsReader { STsdbReadSnap* pReadSnap; char* idstr; int64_t lastTs; + SArray* pFuncTypeList; } SCacheRowsReader; int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 640ed2f2f9..92de5c4364 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -82,6 +82,7 @@ typedef struct SColMatchItem { int32_t dstSlotId; bool needOutput; SDataType dataType; + int32_t funcType; } SColMatchItem; typedef struct SColMatchInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 9d4c20493a..63fcfba7c1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -21,6 +21,7 @@ #include "tmsg.h" #include "executorInt.h" +#include "functionMgt.h" #include "operator.h" #include "querytask.h" #include "tcompare.h" @@ -44,6 +45,7 @@ typedef struct SCacheRowsScanInfo { SArray* pCidList; int32_t indexOfBufferedRes; STableListInfo* pTableList; + SArray* pFuncTypeList; } SCacheRowsScanInfo; static SSDataBlock* doScanCache(SOperatorInfo* pOperator); @@ -105,9 +107,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe } SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t)); + pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t)); + taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes); + for (int i = 0; i < TARRAY_SIZE(pInfo->matchInfo.pList); ++i) { SColMatchItem* pColInfo = taosArrayGet(pInfo->matchInfo.pList, i); taosArrayPush(pCidList, &pColInfo->colId); + if (pInfo->pFuncTypeList != NULL && taosArrayGetSize(pInfo->pFuncTypeList) > i) { + pColInfo->funcType = *(int32_t*)taosArrayGet(pInfo->pFuncTypeList, i); + } } pInfo->pCidList = pCidList; @@ -132,7 +140,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe uint64_t suid = tableListGetSuid(pTableListInfo); code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds, - suid, &pInfo->pLastrowReader, pTaskInfo->id.str); + suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -274,7 +282,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (NULL == pInfo->pLastrowReader) { code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, - pTaskInfo->id.str); + pTaskInfo->id.str, pInfo->pFuncTypeList); if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; taosArrayClear(pInfo->pUidList); @@ -333,6 +341,7 @@ void destroyCacheScanOperator(void* param) { taosMemoryFree(pInfo->pSlotIds); taosMemoryFree(pInfo->pDstSlotIds); taosArrayDestroy(pInfo->pCidList); + taosArrayDestroy(pInfo->pFuncTypeList); taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->matchInfo.pList); tableListDestroy(pInfo->pTableList); @@ -405,6 +414,8 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId); if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) { taosArrayPush(pMatchInfo, pColInfo); + } else if (FUNCTION_TYPE_CACHE_LAST_ROW == pColInfo->funcType){ + taosArrayPush(pMatchInfo, pColInfo); } } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index efab09f02b..1da7818c3c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1343,7 +1343,6 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod c.colId = pColNode->colId; c.srcSlotId = pColNode->slotId; c.dstSlotId = pNode->slotId; - c.dataType = pColNode->node.resType; taosArrayPush(pList, &c); } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index d27daad550..eb84cb0639 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -621,6 +621,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo } else { pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); } + + if(pRes == NULL) { + st = taosGetTimestampUs(); + } int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold; if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 390190e8db..daf409070a 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2125,7 +2125,7 @@ bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { } static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { - if (pTsColInfo == NULL) { + if (pTsColInfo == NULL || pTsColInfo->pData == NULL) { return 0; } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index c68fd81d22..c090cb4b63 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -241,6 +241,29 @@ static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) { return pDst; } +static SArray* functParamClone(const SArray* pSrc) { + int32_t len = sizeof(SArray) + pSrc->capacity * pSrc->elemSize; + + SArray* pDst = taosArrayInit(pSrc->capacity, pSrc->elemSize); + if (NULL == pDst) { + return NULL; + } + for (int i = 0; i < TARRAY_SIZE(pSrc); ++i) { + SFunctParam* pFunctParam = taosArrayGet(pSrc, i); + SFunctParam* pNewFunctParam = (SFunctParam*)taosArrayPush(pDst, pFunctParam); + + if (NULL == pNewFunctParam) { + return NULL; + } + pNewFunctParam->type = pFunctParam->type; + pNewFunctParam->pCol = taosMemoryCalloc(1, sizeof(SColumn)); + memcpy(pNewFunctParam->pCol, pFunctParam->pCol, sizeof(SColumn)); + } + + return pDst; +} + + static int32_t realTableNodeCopy(const SRealTableNode* pSrc, SRealTableNode* pDst) { COPY_BASE_OBJECT_FIELD(table, tableNodeCopy); CLONE_OBJECT_FIELD(pMeta, tableMetaClone); @@ -425,6 +448,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(onlyMetaCtbIdx); COPY_SCALAR_FIELD(filesetDelimited); COPY_SCALAR_FIELD(isCountByTag); + CLONE_OBJECT_FIELD(pFuncTypes, functParamClone); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 402a6c6e3d..6696aab4c1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1784,6 +1784,24 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; static const char* jkLastRowScanPhysiPlanTargets = "Targets"; +static const char* jkLastRowScanPhysiPlanFuncType = "FuncType"; +static const char* jkLastRowScanPhysiPlanFuncTypes = "FuncTypes"; + +static int32_t funcTypeToJson(const void* pObj, SJson* pJson) { + const int32_t* pNode = (const int32_t*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkLastRowScanPhysiPlanFuncType, *pNode); + return code; +} + +static int32_t jsonToFuncType(const SJson* pJson, void* pObj) { + int32_t* pNode = (int32_t*)pObj; + + int32_t code = tjsonGetIntValue(pJson, jkLastRowScanPhysiPlanFuncType, pNode); + return code; +} + + static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; @@ -1798,6 +1816,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddTArray(pJson, jkLastRowScanPhysiPlanFuncTypes, funcTypeToJson, pNode->pFuncTypes); + } return code; } @@ -1815,6 +1836,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToTArray(pJson, jkLastRowScanPhysiPlanFuncTypes, jsonToFuncType, &pNode->pFuncTypes, sizeof(int32_t)); + } return code; } diff --git a/source/libs/nodes/src/nodesEqualFuncs.c b/source/libs/nodes/src/nodesEqualFuncs.c index f755b8cb8c..241da85267 100644 --- a/source/libs/nodes/src/nodesEqualFuncs.c +++ b/source/libs/nodes/src/nodesEqualFuncs.c @@ -194,3 +194,21 @@ bool nodesEqualNode(const SNode* a, const SNode* b) { return false; } + + bool nodeListNodeEqual(const SNodeList* a, const SNode* b) { + if (NULL == a || NULL == b) { + return false; + } + + if (LIST_LENGTH(a) < 1) { + return false; + } + + SNode *na; + FOREACH(na, a) { + if (nodesEqualNode(na, b)) { + return true; + } + } + return false; +} diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index b36e2695f6..3cff7c76aa 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -65,10 +65,14 @@ typedef int32_t (*FSetObject)(STlv* pTlv, void* pObj); static int32_t nodeToMsg(const void* pObj, STlvEncoder* pEncoder); static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder); +static int32_t SArrayToMsg(const void* pObj, STlvEncoder* pEncoder); + static int32_t msgToNode(STlvDecoder* pDecoder, void** pObj); static int32_t msgToNodeFromTlv(STlv* pTlv, void** pObj); static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj); static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj); +static int32_t msgToSArray(STlv* pTlv, void** pObj); + static int32_t initTlvEncoder(STlvEncoder* pEncoder) { pEncoder->allocSize = NODES_MSG_DEFAULT_LEN; @@ -2053,7 +2057,8 @@ enum { PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, PHY_LAST_ROW_SCAN_CODE_IGNULL, - PHY_LAST_ROW_SCAN_CODE_TARGETS + PHY_LAST_ROW_SCAN_CODE_TARGETS, + PHY_LAST_ROW_SCAN_CODE_FUNCTYPES }; static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2072,6 +2077,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_FUNCTYPES, SArrayToMsg, pNode->pFuncTypes); + } return code; } @@ -2098,6 +2106,10 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { case PHY_LAST_ROW_SCAN_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_LAST_ROW_SCAN_CODE_FUNCTYPES: + code = msgToSArray(pTlv, (void**)&pNode->pFuncTypes); + break; + default: break; } @@ -4391,6 +4403,31 @@ static int32_t nodeListToMsg(const void* pObj, STlvEncoder* pEncoder) { return TSDB_CODE_SUCCESS; } +enum { + SARRAY_CODE_CAPACITY = 1, + SARRAY_CODE_ELEMSIZE, + SARRAY_CODE_SIZE, + SARRAY_CODE_PDATA +}; + +static int32_t SArrayToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SArray* pArray = (const SArray*)pObj; + int32_t code = TSDB_CODE_SUCCESS; + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SARRAY_CODE_CAPACITY, pArray->capacity); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SARRAY_CODE_ELEMSIZE, pArray->elemSize); + } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI32(pEncoder, SARRAY_CODE_SIZE, pArray->size); + } + if (TSDB_CODE_SUCCESS == code && pArray->capacity * pArray->elemSize > 0 && pArray->pData != NULL) { + code = tlvEncodeBinary(pEncoder, SARRAY_CODE_PDATA, pArray->pData, pArray->capacity * pArray->elemSize); + } + return code; +} + static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { SNodeList* pList = nodesMakeList(); @@ -4411,6 +4448,67 @@ static int32_t msgToNodeList(STlvDecoder* pDecoder, void** pObj) { return code; } +static int32_t msgToSArray(STlv* pTlv, void** pObj){ + SArray* pArray = NULL; + uint32_t capacity = 0; + uint32_t elemSize = 0; + uint32_t actualSize; + int32_t decodeFieldNum = 0;; + int32_t code = TSDB_CODE_SUCCESS; + STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; + STlv* pTlvTemp = NULL; + STlv* pDataTlv = NULL; + + tlvForEach(&decoder, pTlvTemp, code) { + switch (pTlvTemp->type) { + case SARRAY_CODE_CAPACITY: + code = tlvDecodeI32(pTlvTemp, &capacity); + break; + case SARRAY_CODE_ELEMSIZE: + code = tlvDecodeI32(pTlvTemp, &elemSize); + break; + case SARRAY_CODE_SIZE: + code = tlvDecodeI32(pTlvTemp, &actualSize); + break; + case SARRAY_CODE_PDATA: + if (decodeFieldNum < 3) { + pDataTlv = pTlvTemp; + break; + } + pArray = taosArrayInit(capacity, elemSize); + if (NULL == pArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pArray->size = actualSize; + if (TSDB_CODE_SUCCESS != code || pTlvTemp == NULL) { + taosArrayDestroy(pArray); + return TSDB_CODE_OUT_OF_MEMORY; + } + code = tlvDecodeBinary(pTlvTemp, pArray->pData); + break; + default: + break; + } + decodeFieldNum++; + } + + if (pDataTlv != NULL) { + pArray = taosArrayInit(capacity, elemSize); + if (NULL == pArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pArray->size = actualSize; + if (TSDB_CODE_SUCCESS != code || pTlvTemp == NULL) { + taosArrayDestroy(pArray); + return TSDB_CODE_OUT_OF_MEMORY; + } + code = tlvDecodeBinary(pDataTlv, pArray->pData); + } + *pObj = pArray; + return code; +} + + static int32_t msgToNodeListFromTlv(STlv* pTlv, void** pObj) { STlvDecoder decoder = {.bufSize = pTlv->len, .offset = 0, .pBuf = pTlv->value}; return msgToNodeList(&decoder, pObj); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8f2e82385b..5bd466af43 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -674,6 +674,8 @@ static void destroyTableCfg(STableCfg* pCfg) { static void destroySmaIndex(void* pIndex) { taosMemoryFree(((STableIndexInfo*)pIndex)->expr); } +static void destroyFuncParam(void* pValue) { taosMemoryFree(((SFunctParam*)pValue)->pCol); } + static void destroyHintValue(EHintOption option, void* value) { switch (option) { default: @@ -1173,6 +1175,7 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pLogicNode->pGroupTags); nodesDestroyList(pLogicNode->pTags); nodesDestroyNode(pLogicNode->pSubtable); + taosArrayDestroyEx(pLogicNode->pFuncTypes, destroyFuncParam); break; } case QUERY_NODE_LOGIC_PLAN_JOIN: { @@ -1300,6 +1303,7 @@ void nodesDestroyNode(SNode* pNode) { destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); nodesDestroyList(pPhyNode->pTargets); + taosArrayDestroy(pPhyNode->pFuncTypes); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 035377d22e..af1475aea2 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2501,17 +2501,30 @@ static bool lastRowScanOptCheckColNum(int32_t lastColNum, col_id_t lastColId, return true; } -static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) { +static bool isNeedSplitCacheLastFunc(SFunctionNode* pFunc, SScanLogicNode* pScan) { + int32_t funcType = pFunc->funcType; + if ((FUNCTION_TYPE_LAST_ROW != funcType || (FUNCTION_TYPE_LAST_ROW == funcType && TSDB_CACHE_MODEL_LAST_VALUE == pScan->cacheLastMode)) && + (FUNCTION_TYPE_LAST != funcType || (FUNCTION_TYPE_LAST == funcType && (TSDB_CACHE_MODEL_LAST_ROW == pScan->cacheLastMode || + QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))))) && + FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) { + return true; + } + return false; +} + +static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel, bool* hasOtherFunc) { bool hasNonPKSelectFunc = false; SNode* pFunc = NULL; int32_t lastColNum = 0, selectNonPKColNum = 0; col_id_t lastColId = -1, selectNonPKColId = -1; + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(((SAggLogicNode*)pNode)->node.pChildren, 0); + uint32_t needSplitFuncCount = 0; FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { SFunctionNode* pAggFunc = (SFunctionNode*)pFunc; + SNode* pParam = nodesListGetNode(pAggFunc->pParameterList, 0); if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { - SNode* pPar = nodesListGetNode(pAggFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN == nodeType(pPar)) { - SColumnNode* pCol = (SColumnNode*)pPar; + if (QUERY_NODE_COLUMN == nodeType(pParam)) { + SColumnNode* pCol = (SColumnNode*)pParam; if (pCol->colType != COLUMN_TYPE_COLUMN) { return false; } @@ -2520,13 +2533,18 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) { lastColNum++; } } - if (QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) { + else if (QUERY_NODE_VALUE == nodeType(pParam) || QUERY_NODE_OPERATOR == nodeType(pParam)) { + needSplitFuncCount++; + *hasOtherFunc = true; + } + if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) { return false; } - if (!lastRowScanOptCheckColNum(lastColNum, lastColId, selectNonPKColNum, selectNonPKColId)) - return false; + if (TSDB_CACHE_MODEL_LAST_ROW == cacheLastModel) { + needSplitFuncCount++; + *hasOtherFunc = true; + } } else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) { - SNode* pParam = nodesListGetNode(pAggFunc->pParameterList, 0); if (QUERY_NODE_COLUMN == nodeType(pParam)) { SColumnNode* pCol = (SColumnNode*)pParam; if (COLUMN_TYPE_COLUMN == pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) { @@ -2548,15 +2566,21 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, bool* hasOtherFunc) { } } else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) { *hasOtherFunc = true; + needSplitFuncCount++; + } else if (FUNCTION_TYPE_LAST_ROW == pAggFunc->funcType && TSDB_CACHE_MODEL_LAST_VALUE == cacheLastModel) { + *hasOtherFunc = true; + needSplitFuncCount++; } } + if (needSplitFuncCount >= ((SAggLogicNode*)pNode)->pAggFuncs->length) { + return false; + } return true; } static bool lastRowScanOptCheckLastCache(SAggLogicNode* pAgg, SScanLogicNode* pScan) { - // Only one of LAST and LASTROW can appear - if (pAgg->hasLastRow == pAgg->hasLast || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || + if ((pAgg->hasLastRow == pAgg->hasLast && !pAgg->hasLastRow) || (!pAgg->hasLast && !pAgg->hasLastRow) || NULL != pAgg->pGroupKeys || NULL != pScan->node.pConditions || !hasSuitableCache(pScan->cacheLastMode, pAgg->hasLastRow, pAgg->hasLast) || IS_TSWINDOW_SPECIFIED(pScan->scanRange)) { return false; @@ -2578,7 +2602,7 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) { } bool hasOtherFunc = false; - if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) { + if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) { return false; } @@ -2593,6 +2617,7 @@ typedef struct SLastRowScanOptSetColDataTypeCxt { bool doAgg; SNodeList* pLastCols; SNodeList* pOtherCols; + int32_t funcType; } SLastRowScanOptSetColDataTypeCxt; static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { @@ -2615,7 +2640,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) { +static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, SNodeList* pLastRowCols, bool erase) { SNode* pTarget = NULL; WHERE_EACH(pTarget, pTargets) { bool found = false; @@ -2627,6 +2652,10 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo break; } } + if (!found && nodeListNodeEqual(pLastRowCols, pTarget)) { + found = true; + } + if (!found && erase) { ERASE_NODE(pTargets); continue; @@ -2635,7 +2664,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo } } -static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2) { +static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pList1, SNodeList* pList2, SNodeList* pList3) { SNode* pTarget = NULL; WHERE_EACH(pTarget, pTargets) { bool found = false; @@ -2654,6 +2683,11 @@ static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pL } } } + + if (!found && nodeListNodeEqual(pList3, pTarget)) { + found = true; + } + if (!found) { ERASE_NODE(pTargets); continue; @@ -2662,6 +2696,33 @@ static void lastRowScanOptRemoveUslessTargets(SNodeList* pTargets, SNodeList* pL } } +static int32_t lastRowScanBuildFuncTypes(SScanLogicNode* pScan, SColumnNode* pColNode, int32_t funcType) { + SFunctParam* pFuncTypeParam = taosMemoryCalloc(1, sizeof(SFunctParam)); + if (NULL == pFuncTypeParam) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pFuncTypeParam->type = funcType; + if (NULL == pScan->pFuncTypes) { + pScan->pFuncTypes = taosArrayInit(pScan->pScanCols->length, sizeof(SFunctParam)); + if (NULL == pScan->pFuncTypes) { + taosMemoryFree(pFuncTypeParam); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + pFuncTypeParam->pCol = taosMemoryCalloc(1, sizeof(SColumn)); + if (NULL == pFuncTypeParam->pCol) { + taosMemoryFree(pFuncTypeParam); + return TSDB_CODE_OUT_OF_MEMORY; + } + pFuncTypeParam->pCol->colId = pColNode->colId; + strcpy(pFuncTypeParam->pCol->name, pColNode->colName); + taosArrayPush(pScan->pFuncTypes, pFuncTypeParam); + + taosMemoryFree(pFuncTypeParam); + return TSDB_CODE_SUCCESS; +} + static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SAggLogicNode* pAgg = (SAggLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, lastRowScanOptMayBeOptimized); @@ -2673,10 +2734,16 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic SNode* pNode = NULL; SColumnNode* pPKTsCol = NULL; SColumnNode* pNonPKCol = NULL; + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); + pScan->scanType = SCAN_TYPE_LAST_ROW; + pScan->igLastNull = pAgg->hasLast ? true : false; + SArray* isDuplicateCol = taosArrayInit(pScan->pScanCols->length, sizeof(bool)); + SNodeList* pLastRowCols = NULL; FOREACH(pNode, pAgg->pAggFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pNode; int32_t funcType = pFunc->funcType; + SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 0); if (FUNCTION_TYPE_LAST_ROW == funcType || FUNCTION_TYPE_LAST == funcType) { int32_t len = snprintf(pFunc->functionName, sizeof(pFunc->functionName), FUNCTION_TYPE_LAST_ROW == funcType ? "_cache_last_row" : "_cache_last"); @@ -2686,6 +2753,61 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic nodesClearList(cxt.pLastCols); return code; } + cxt.funcType = pFunc->funcType; + // add duplicate cols which be removed for both last_row, last + if (pAgg->hasLast && pAgg->hasLastRow) { + if (QUERY_NODE_COLUMN == nodeType(pParamNode)) { + SNode* pColNode = NULL; + int i = 0; + FOREACH(pColNode, pScan->pScanCols) { + bool isDup = false; + bool* isDuplicate = taosArrayGet(isDuplicateCol, i); + if (NULL == isDuplicate) { + taosArrayInsert(isDuplicateCol, i, &isDup); + isDuplicate = taosArrayGet(isDuplicateCol, i); + } + i++; + if (nodesEqualNode(pParamNode, pColNode)) { + if (*isDuplicate) { + if (0 == strncmp(((SColumnNode*)pColNode)->colName, "#dup_col.", 9)) { + continue; + } + SNode* newColNode = nodesCloneNode(pColNode); + sprintf(((SColumnNode*)newColNode)->colName, "#dup_col.%p", newColNode); + sprintf(((SColumnNode*)pParamNode)->colName, "#dup_col.%p", newColNode); + + nodesListAppend(pScan->pScanCols, newColNode); + isDup = true; + taosArrayInsert(isDuplicateCol, pScan->pScanCols->length, &isDup); + nodesListAppend(pScan->node.pTargets, nodesCloneNode(newColNode)); + if (funcType != FUNCTION_TYPE_LAST) { + nodesListMakeAppend(&pLastRowCols, nodesCloneNode(newColNode)); + } + + lastRowScanBuildFuncTypes(pScan, (SColumnNode*)newColNode, pFunc->funcType); + } else { + isDup = true; + *isDuplicate = isDup; + if (funcType != FUNCTION_TYPE_LAST && !nodeListNodeEqual(cxt.pLastCols, pColNode)) { + nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode)); + } + lastRowScanBuildFuncTypes(pScan, (SColumnNode*)pColNode, pFunc->funcType); + } + continue; + }else if (nodeListNodeEqual(pFunc->pParameterList, pColNode)) { + if (funcType != FUNCTION_TYPE_LAST && ((SColumnNode*)pColNode)->colId == PRIMARYKEY_TIMESTAMP_COL_ID && + !nodeListNodeEqual(pLastRowCols, pColNode)) { + nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode)); + + lastRowScanBuildFuncTypes(pScan, (SColumnNode*)pColNode, pFunc->funcType); + isDup = true; + *isDuplicate = isDup; + } + } + } + } + } + if (FUNCTION_TYPE_LAST == funcType) { nodesWalkExpr(nodesListGetNode(pFunc->pParameterList, 0), lastRowScanOptSetColDataType, &cxt); nodesListErase(pFunc->pParameterList, nodesListGetCell(pFunc->pParameterList, 1)); @@ -2707,15 +2829,13 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic } } - SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); - pScan->scanType = SCAN_TYPE_LAST_ROW; - pScan->igLastNull = pAgg->hasLast ? true : false; if (NULL != cxt.pLastCols) { cxt.doAgg = false; - lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true); + cxt.funcType = FUNCTION_TYPE_CACHE_LAST; + lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true); nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); - lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false); - lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols); + lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false); + lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols); if (pPKTsCol && pScan->node.pTargets->length == 1) { // when select last(ts),ts from ..., we add another ts to targets sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol); @@ -2728,10 +2848,12 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic } nodesClearList(cxt.pLastCols); } + pAgg->hasLastRow = false; pAgg->hasLast = false; pCxt->optimized = true; + taosArrayDestroy(isDuplicateCol); return TSDB_CODE_SUCCESS; } @@ -2749,7 +2871,7 @@ static bool splitCacheLastFuncOptMayBeOptimized(SLogicNode* pNode) { } bool hasOtherFunc = false; - if (!lastRowScanOptCheckFuncList(pNode, &hasOtherFunc)) { + if (!lastRowScanOptCheckFuncList(pNode, pScan->cacheLastMode, &hasOtherFunc)) { return false; } @@ -2770,6 +2892,16 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, pNew->hasLastRow = false; pNew->hasLast = false; + SNode* pFuncNode = NULL; + FOREACH(pFuncNode, pFunc) { + SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; + if (FUNCTION_TYPE_LAST_ROW == pFunc->funcType) { + pNew->hasLastRow = true; + } else if (FUNCTION_TYPE_LAST == pFunc->funcType) { + pNew->hasLast = true; + } + } + pNew->hasTimeLineFunc = pAgg->hasTimeLineFunc; pNew->hasGroupKeyOptimized = false; pNew->onlyHasKeepOrderFunc = pAgg->onlyHasKeepOrderFunc; @@ -2894,21 +3026,31 @@ static int32_t splitCacheLastFuncOptimize(SOptimizeContext* pCxt, SLogicSubplan* if (NULL == pAgg) { return TSDB_CODE_SUCCESS; } - + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0); SNode* pNode = NULL; SNodeList* pAggFuncList = NULL; + { + bool hasLast = false; + bool hasLastRow = false; WHERE_EACH(pNode, pAgg->pAggFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pNode; int32_t funcType = pFunc->funcType; - if (FUNCTION_TYPE_LAST_ROW != funcType && FUNCTION_TYPE_LAST != funcType && - FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) { + + if (isNeedSplitCacheLastFunc(pFunc, pScan)) { nodesListMakeStrictAppend(&pAggFuncList, nodesCloneNode(pNode)); ERASE_NODE(pAgg->pAggFuncs); continue; } + if (FUNCTION_TYPE_LAST_ROW == funcType ) { + hasLastRow = true; + } else if (FUNCTION_TYPE_LAST == funcType) { + hasLast = true; + } WHERE_NEXT; } + pAgg->hasLast = hasLast; + pAgg->hasLastRow = hasLastRow; } if (NULL == pAggFuncList) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e266c55425..21c698fda5 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -562,9 +562,36 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu pScan->groupSort = pScanLogicNode->groupSort; pScan->ignoreNull = pScanLogicNode->igLastNull; + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); - return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); + int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); + + if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) { + pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t)); + if (NULL == pScan->pFuncTypes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SNode* pTargetNode = NULL; + int funcTypeIndex = 0; + FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) { + if (((STargetNode*)pTargetNode)->pExpr->type != QUERY_NODE_COLUMN) { + continue; + } + SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pTargetNode)->pExpr; + + for (int i = 0; i < TARRAY_SIZE(pScanLogicNode->pFuncTypes); ++i) { + SFunctParam* pFunctParam = taosArrayGet(pScanLogicNode->pFuncTypes, i); + if (pColNode->colId == pFunctParam->pCol->colId && + 0 == strncmp(pColNode->colName, pFunctParam->pCol->name, strlen(pColNode->colName))) { + taosArrayInsert(pScan->pFuncTypes, funcTypeIndex, &pFunctParam->type); + break; + } + } + funcTypeIndex++; + } + } + return code; } static int32_t createTableCountScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 63745f75ab..b6aff95571 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -422,6 +422,11 @@ e ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -R +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_and_last_row.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/leastsquares.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/leastsquares.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/length.py diff --git a/tests/script/tsim/query/cache_last.sim b/tests/script/tsim/query/cache_last.sim index 8247a2f723..65eb46de69 100644 --- a/tests/script/tsim/query/cache_last.sim +++ b/tests/script/tsim/query/cache_last.sim @@ -55,7 +55,7 @@ if $rows != 1 then return -1 endi sql explain select count(*), last_row(f1), last(f1) from sta; -if $data00 != @-> Aggragate (functions=3 width=24 input_order=desc )@ then +if $data00 != @-> Merge (columns=3 width=24 input_order=unknown output_order=unknown mode=column)@ then return -1 endi sql_error select count(*), last_row(f1), min(f1), f1 from sta; diff --git a/tests/system-test/2-query/last_and_last_row.py b/tests/system-test/2-query/last_and_last_row.py new file mode 100644 index 0000000000..b04b3a75f3 --- /dev/null +++ b/tests/system-test/2-query/last_and_last_row.py @@ -0,0 +1,660 @@ +import datetime +import sys +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import tdDnodes +from math import inf + + +class TDTestCase: + def init(self, conn, logSql, replicaVer=1): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), True) + + def check_explain_res_has_row(self, plan_str_expect: str, rows, sql): + plan_found = False + for row in rows: + if str(row).find(plan_str_expect) >= 0: + tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row))) + plan_found = True + break + if not plan_found: + tdLog.exit("plan: %s not found in res: [%s] in sql: %s" % (plan_str_expect, str(rows), sql)) + + def check_explain_res_no_row(self, plan_str_not_expect: str, res, sql): + for row in res: + if str(row).find(plan_str_not_expect) >= 0: + tdLog.exit('plan: [%s] found in: [%s] for sql: %s' % (plan_str_not_expect, str(row), sql)) + + def explain_sql(self, sql: str): + sql = "explain " + sql + tdSql.query(sql, queryTimes=1) + return tdSql.queryResult + + def explain_and_check_res(self, sqls, hasLastRowScanRes): + for sql, has_last in zip(sqls, hasLastRowScanRes): + res = self.explain_sql(sql) + if has_last == 1: + self.check_explain_res_has_row("Last Row Scan", res, sql) + else: + self.check_explain_res_no_row("Last Row Scan", res, sql) + + def none_model_test(self): + tdSql.execute("drop database if exists last_test_none_model ;") + tdSql.execute("create database last_test_none_model cachemodel 'none';") + tdSql.execute("use last_test_none_model;") + tdSql.execute("create stable last_test_none_model.st(ts timestamp, id int) tags(tid int);") + tdSql.execute("create table last_test_none_model.test_t1 using last_test_none_model.st tags(1);") + tdSql.execute("create table last_test_none_model.test_t2 using last_test_none_model.st tags(2);") + tdSql.execute("create table last_test_none_model.test_t3 using last_test_none_model.st tags(3);") + tdSql.execute("create table last_test_none_model.test_t4 using last_test_none_model.st tags(4);") + + maxRange = 100 + # 2023-11-13 00:00:00.000 + startTs = 1699804800000 + for i in range(maxRange): + insertSqlString = "insert into last_test_none_model.test_t1 values(%d, %d);" % (startTs + i, i) + tdSql.execute(insertSqlString) + + last_ts = startTs + maxRange + tdSql.execute("insert into last_test_none_model.test_t1 (ts) values(%d)" % (last_ts)) + sql = f'select last_row(ts), last(*) from last_test_none_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_none_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_none_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, maxRange - 1) + tdSql.checkData(0, 2, last_ts) + tdSql.checkData(0, 3, maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_none_model.test_t1;') + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_none_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange + 1) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, maxRange - 1) + tdSql.checkData(0, 5, last_ts) + tdSql.checkData(0, 6, maxRange - 1) + + + startTs2 = startTs + 86400000 + for i in range(maxRange): + i = i + 2 * maxRange + insertSqlString = "insert into last_test_none_model.test_t2 values(%d, %d);" % (startTs2 + i, i) + tdSql.execute(insertSqlString) + last_ts2 = startTs2 + maxRange + + startTs3 = startTs + 2 * 86400000 + for i in range(maxRange): + i = i + 3 * maxRange + insertSqlString = "insert into last_test_none_model.test_t3 values(%d, %d);" % (startTs3 + i, i) + tdSql.execute(insertSqlString) + last_ts3 = startTs3 + 4 * maxRange - 1 + + startTs4 = startTs + 3 * 86400000 + for i in range(maxRange): + i = i + 4 * maxRange + insertSqlString = "insert into last_test_none_model.test_t4 values(%d, %d);" % (startTs4 + i, i) + tdSql.execute(insertSqlString) + + last_ts4 = startTs4 + 5 * maxRange - 1 + sql = f'select last_row(ts), last(*) from last_test_none_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_none_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_none_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, 5 * maxRange - 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_no_row("Last Row Scan", explain_res, sql) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_none_model.st;') + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_none_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_none_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 2) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + tdSql.execute("drop table if exists last_test_none_model.test_t4 ;") + tdSql.execute("drop table if exists last_test_none_model.test_t3 ;") + tdSql.execute("drop table if exists last_test_none_model.test_t2 ;") + tdSql.execute("drop table if exists last_test_none_model.test_t1 ;") + tdSql.execute("drop stable if exists last_test_none_model.st;") + tdSql.execute("drop database if exists last_test_none_model;") + + def last_value_model_test(self): + tdSql.execute("create database last_test_last_value_model cachemodel 'last_value' ;") + tdSql.execute("use last_test_last_value_model;") + tdSql.execute("create stable last_test_last_value_model.st(ts timestamp, id int) tags(tid int);") + tdSql.execute("create table last_test_last_value_model.test_t1 using last_test_last_value_model.st tags(1);") + tdSql.execute("create table last_test_last_value_model.test_t2 using last_test_last_value_model.st tags(2);") + tdSql.execute("create table last_test_last_value_model.test_t3 using last_test_last_value_model.st tags(3);") + tdSql.execute("create table last_test_last_value_model.test_t4 using last_test_last_value_model.st tags(4);") + + maxRange = 100 + # 2023-11-13 00:00:00.000 + startTs = 1699804800000 + for i in range(maxRange): + insertSqlString = "insert into last_test_last_value_model.test_t1 values(%d, %d);" % (startTs + i, i) + tdSql.execute(insertSqlString) + + last_ts = startTs + maxRange + tdSql.execute("insert into last_test_last_value_model.test_t1 (ts) values(%d)" % (last_ts)) + sql = f'select last_row(ts), last(*) from last_test_last_value_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_value_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_last_value_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, maxRange - 1) + tdSql.checkData(0, 2, last_ts) + tdSql.checkData(0, 3, maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange + 1) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, maxRange - 1) + tdSql.checkData(0, 5, last_ts) + tdSql.checkData(0, 6, maxRange - 1) + + startTs2 = startTs + 86400000 + for i in range(maxRange): + i = i + 2 * maxRange + insertSqlString = "insert into last_test_last_value_model.test_t2 values(%d, %d);" % (startTs2 + i, i) + tdSql.execute(insertSqlString) + last_ts2 = startTs2 + maxRange + + startTs3 = startTs + 2 * 86400000 + for i in range(maxRange): + i = i + 3 * maxRange + insertSqlString = "insert into last_test_last_value_model.test_t3 values(%d, %d);" % (startTs3 + i, i) + tdSql.execute(insertSqlString) + last_ts3 = startTs3 + 4 * maxRange - 1 + + startTs4 = startTs + 3 * 86400000 + for i in range(maxRange): + i = i + 4 * maxRange + insertSqlString = "insert into last_test_last_value_model.test_t4 values(%d, %d);" % (startTs4 + i, i) + tdSql.execute(insertSqlString) + + last_ts4 = startTs4 + 5 * maxRange - 1 + sql = f'select last_row(ts), last(*) from last_test_last_value_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_value_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_last_value_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, 5 * maxRange - 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_last_value_model.st;') + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_last_value_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 2) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + tdSql.execute("drop table if exists last_test_last_value_model.test_t4 ;") + tdSql.execute("drop table if exists last_test_last_value_model.test_t3 ;") + tdSql.execute("drop table if exists last_test_last_value_model.test_t2 ;") + tdSql.execute("drop table if exists last_test_last_value_model.test_t1 ;") + tdSql.execute("drop stable if exists last_test_last_value_model.st;") + tdSql.execute("drop database if exists last_test_last_value_model;") + + def last_row_model_test(self): + tdSql.execute("create database last_test_last_row_model cachemodel 'last_row';") + tdSql.execute("use last_test_last_row_model;") + tdSql.execute("create stable last_test_last_row_model.st(ts timestamp, id int) tags(tid int);") + tdSql.execute("create table last_test_last_row_model.test_t1 using last_test_last_row_model.st tags(1);") + tdSql.execute("create table last_test_last_row_model.test_t2 using last_test_last_row_model.st tags(2);") + tdSql.execute("create table last_test_last_row_model.test_t3 using last_test_last_row_model.st tags(3);") + tdSql.execute("create table last_test_last_row_model.test_t4 using last_test_last_row_model.st tags(4);") + + maxRange = 100 + # 2023-11-13 00:00:00.000 + startTs = 1699804800000 + for i in range(maxRange): + insertSqlString = "insert into last_test_last_row_model.test_t1 values(%d, %d);" % (startTs + i, i) + tdSql.execute(insertSqlString) + + last_ts = startTs + maxRange + tdSql.execute("insert into last_test_last_row_model.test_t1 (ts) values(%d)" % (last_ts)) + sql = f'select last_row(ts), last(*) from last_test_last_row_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_row_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_last_row_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, maxRange - 1) + tdSql.checkData(0, 2, last_ts) + tdSql.checkData(0, 3, maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange + 1) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, maxRange - 1) + tdSql.checkData(0, 5, last_ts) + tdSql.checkData(0, 6, maxRange - 1) + + startTs2 = startTs + 86400000 + for i in range(maxRange): + i = i + 2 * maxRange + insertSqlString = "insert into last_test_last_row_model.test_t2 values(%d, %d);" % (startTs2 + i, i) + tdSql.execute(insertSqlString) + last_ts2 = startTs2 + maxRange + + startTs3 = startTs + 2 * 86400000 + for i in range(maxRange): + i = i + 3 * maxRange + insertSqlString = "insert into last_test_last_row_model.test_t3 values(%d, %d);" % (startTs3 + i, i) + tdSql.execute(insertSqlString) + last_ts3 = startTs3 + 4 * maxRange - 1 + + startTs4 = startTs + 3 * 86400000 + for i in range(maxRange): + i = i + 4 * maxRange + insertSqlString = "insert into last_test_last_row_model.test_t4 values(%d, %d);" % (startTs4 + i, i) + tdSql.execute(insertSqlString) + + last_ts4 = startTs4 + 5 * maxRange - 1 + sql = f'select last_row(ts), last(*) from last_test_last_row_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_last_row_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_last_row_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, 5 * maxRange - 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_last_row_model.st;') + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_last_row_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 2) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + tdSql.execute("drop table if exists last_test_last_row_model.test_t4 ;") + tdSql.execute("drop table if exists last_test_last_row_model.test_t3 ;") + tdSql.execute("drop table if exists last_test_last_row_model.test_t2 ;") + tdSql.execute("drop table if exists last_test_last_row_model.test_t1 ;") + tdSql.execute("drop stable if exists last_test_last_row_model.st;") + tdSql.execute("drop database if exists last_test_last_row_model;") + + def both_model_test(self): + tdSql.execute("create database last_test_both_model cachemodel 'both';") + tdSql.execute("use last_test_both_model;") + tdSql.execute("create stable last_test_both_model.st(ts timestamp, id int) tags(tid int);") + tdSql.execute("create table last_test_both_model.test_t1 using last_test_both_model.st tags(1);") + tdSql.execute("create table last_test_both_model.test_t2 using last_test_both_model.st tags(2);") + tdSql.execute("create table last_test_both_model.test_t3 using last_test_both_model.st tags(3);") + tdSql.execute("create table last_test_both_model.test_t4 using last_test_both_model.st tags(4);") + + maxRange = 100 + # 2023-11-13 00:00:00.000 + startTs = 1699804800000 + for i in range(maxRange): + insertSqlString = "insert into last_test_both_model.test_t1 values(%d, %d);" % (startTs + i, i) + tdSql.execute(insertSqlString) + + last_ts = startTs + maxRange + tdSql.execute("insert into last_test_both_model.test_t1 (ts) values(%d)" % (last_ts)) + sql = f'select last_row(ts), last(*) from last_test_both_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_no_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, None) + tdSql.checkData(0, 3, maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_no_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_both_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, maxRange - 1) + tdSql.checkData(0, 2, last_ts) + tdSql.checkData(0, 3, maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_both_model.test_t1;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts) + tdSql.checkData(0, 1, last_ts) + tdSql.checkData(0, 2, maxRange + 1) + tdSql.checkData(0, 3, None) + tdSql.checkData(0, 4, maxRange - 1) + tdSql.checkData(0, 5, last_ts) + tdSql.checkData(0, 6, maxRange - 1) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_both_model.test_t1;') + + startTs2 = startTs + 86400000 + for i in range(maxRange): + i = i + 2 * maxRange + insertSqlString = "insert into last_test_both_model.test_t2 values(%d, %d);" % (startTs2 + i, i) + tdSql.execute(insertSqlString) + last_ts2 = startTs2 + maxRange + + startTs3 = startTs + 2 * 86400000 + for i in range(maxRange): + i = i + 3 * maxRange + insertSqlString = "insert into last_test_both_model.test_t3 values(%d, %d);" % (startTs3 + i, i) + tdSql.execute(insertSqlString) + last_ts3 = startTs3 + 4 * maxRange - 1 + + startTs4 = startTs + 3 * 86400000 + for i in range(maxRange): + i = i + 4 * maxRange + insertSqlString = "insert into last_test_both_model.test_t4 values(%d, %d);" % (startTs4 + i, i) + tdSql.execute(insertSqlString) + + last_ts4 = startTs4 + 5 * maxRange - 1 + + sql = f'select last_row(ts), last(*) from last_test_both_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_no_row("Table Scan", explain_res, sql) + + sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 5 * maxRange - 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + self.check_explain_res_no_row("Table Scan", explain_res, sql) + + sql = f'select last(*), last_row(ts), count(*) from last_test_both_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, 5 * maxRange - 1) + #tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) + + explain_res = self.explain_sql(sql) + self.check_explain_res_has_row("Last Row Scan", explain_res, sql) + + tdSql.error(f'select last(*), last_row(ts), ts from last_test_both_model.st;') + + sql = f'select last_row(ts), last(ts), count(*) , last_row(id), last(id), last(*) from last_test_both_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, last_ts4) + tdSql.checkData(0, 1, last_ts4) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + sql = f'select last_row(1), last(2), count(*) , last_row(id), last(id), last(*) from last_test_both_model.st;' + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 2) + tdSql.checkData(0, 2, 4 * maxRange + 1) + tdSql.checkData(0, 3, 5 * maxRange - 1) + tdSql.checkData(0, 4, 5 * maxRange - 1) + tdSql.checkData(0, 5, last_ts4) + tdSql.checkData(0, 6, 5 * maxRange - 1) + + tdSql.execute("drop table if exists last_test_both_model.test_t4 ;") + tdSql.execute("drop table if exists last_test_both_model.test_t3 ;") + tdSql.execute("drop table if exists last_test_both_model.test_t2 ;") + tdSql.execute("drop table if exists last_test_both_model.test_t1 ;") + tdSql.execute("drop stable if exists last_test_both_model.st;") + tdSql.execute("drop database if exists last_test_both_model;") + + def run(self): + self.none_model_test() + + self.last_value_model_test() + + self.last_row_model_test() + + self.both_model_test() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index 3881607437..3efa8b8b74 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -250,7 +250,7 @@ class TDTestCase: "last_row(c1), last(c1)", "last_row(c1), c1,c3, ts" ] - has_last_row_scan_res = [0,0,1] + has_last_row_scan_res = [1,1,1] sqls = self.format_sqls(sql_template, select_items) self.explain_and_check_res(sqls, has_last_row_scan_res) #res_expect = [None, None, [999, 999, 499, "2018-11-25 19:30:00.000"]]