diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index dbbe1d92dc..4ffcb616dd 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -388,6 +388,7 @@ typedef struct SLastRowScanPhysiNode { SNodeList* pGroupTags; bool groupSort; bool ignoreNull; + SNodeList* pTargets; } SLastRowScanPhysiNode; typedef SLastRowScanPhysiNode STableCountScanPhysiNode; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2909b550d7..b6aa791cf0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,10 +30,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) { uint64_t ts = 0; SFirstLastRes* p; + col_id_t colId; for (int32_t i = 0; i < pReader->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]); int32_t slotId = slotIds[i]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i); + colId = pColVal->colVal.cid; p = (SFirstLastRes*)varDataVal(pRes[i]); p->ts = pColVal->ts; @@ -63,8 +65,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) { colDataSetVal(pCol, numOfRows, (const char*)&ts, false); continue; - } - if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) { + } else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) { if (!p->isNull) { colDataSetVal(pCol, numOfRows, p->buf, false); } else { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index a7b4fe02f6..6d59698855 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -54,6 +54,19 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) +static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) { + SNode* pNode; + int32_t idx = 0; + FOREACH(pNode, pTargets) { + if (nodeType(pNode) == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)pNode; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx); + pColInfo->info.colId = pCol->colId; + } + idx++; + } +} + SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -114,10 +127,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe capacity = TMIN(totalTables, 4096); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); + setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets); blockDataEnsureCapacity(pInfo->pBufferredRes, capacity); } else { // by tags pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull); capacity = 1; // only one row output + setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets); } initResultSizeInfo(&pOperator->resultInfo, capacity); @@ -192,7 +207,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) { - SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i); + SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i); int32_t slotId = pCol->info.slotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c97c920a3b..c9b49ee30f 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags"; static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort"; +static const char* jkLastRowScanPhysiPlanTargets = "Targets"; static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj; @@ -1784,6 +1785,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets); + } return code; } @@ -1798,6 +1802,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 99100b2a1d..ea59d93d7f 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2052,7 +2052,8 @@ enum { PHY_LAST_ROW_SCAN_CODE_SCAN = 1, PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS, PHY_LAST_ROW_SCAN_CODE_GROUP_SORT, - PHY_LAST_ROW_SCAN_CODE_IGNULL + PHY_LAST_ROW_SCAN_CODE_IGNULL, + PHY_LAST_ROW_SCAN_CODE_TARGETS }; static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2068,6 +2069,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets); + } return code; } @@ -2091,6 +2095,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) { case PHY_LAST_ROW_SCAN_CODE_IGNULL: code = tlvDecodeBool(pTlv, &pNode->ignoreNull); break; + case PHY_LAST_ROW_SCAN_CODE_TARGETS: + code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); + break; default: break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4f6d3d95e1..ee22caf574 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1285,6 +1285,7 @@ void nodesDestroyNode(SNode* pNode) { SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode; destroyScanPhysiNode((SScanPhysiNode*)pNode); nodesDestroyList(pPhyNode->pGroupTags); + nodesDestroyList(pPhyNode->pTargets); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d6799a25a7..5cf3426e6f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -552,6 +552,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu if (NULL == pScan) { return TSDB_CODE_OUT_OF_MEMORY; } + pScan->pTargets = nodesCloneList(pScanLogicNode->node.pTargets); pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags); if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) { diff --git a/tests/system-test/2-query/last_cache_scan.py b/tests/system-test/2-query/last_cache_scan.py index e75729f960..0f0936ebab 100644 --- a/tests/system-test/2-query/last_cache_scan.py +++ b/tests/system-test/2-query/last_cache_scan.py @@ -283,6 +283,42 @@ class TDTestCase: tdSql.checkData(0, 3, 1001) tdSql.checkData(0, 4, "2018-11-25 19:30:00.000") + sql_template = 'select %s from meters partition by tbname' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, tbname", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query(sqls[1], queryTimes=1) + tdSql.checkRows(10) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:00.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:01.000') + + sql_template = 'select %s from meters partition by t1' + select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, t1", "last(c10), c10, ts"] + has_last_row_scan_res = [1,1,1] + sqls = self.format_sqls(sql_template, select_items) + self.explain_and_check_res(sqls, has_last_row_scan_res) + tdSql.query(sqls[0], queryTimes=1) + tdSql.checkRows(5) + tdSql.checkData(0,0, '2018-11-25 19:30:00.000') + tdSql.checkData(0,1, '2018-11-25 19:30:01.000') + tdSql.checkData(0,2, '2018-11-25 19:30:01.000') + tdSql.checkData(0,3, '2018-11-25 19:30:00.000') + + tdSql.query("select ts, last(c10), t1, t2 from meters partition by t1, t2") + tdSql.checkRows(10) + tdSql.checkData(0, 0, '2018-11-25 19:30:00.000') + tdSql.checkData(0, 1, '2018-11-25 19:30:01.000') + def run(self): self.prepareTestEnv() #time.sleep(99999999)