correct colid in blockinfo
This commit is contained in:
parent
829ad64205
commit
a6600ab23a
|
@ -388,6 +388,7 @@ typedef struct SLastRowScanPhysiNode {
|
||||||
SNodeList* pGroupTags;
|
SNodeList* pGroupTags;
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
bool ignoreNull;
|
bool ignoreNull;
|
||||||
|
SNodeList* pTargets;
|
||||||
} SLastRowScanPhysiNode;
|
} SLastRowScanPhysiNode;
|
||||||
|
|
||||||
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
|
typedef SLastRowScanPhysiNode STableCountScanPhysiNode;
|
||||||
|
|
|
@ -30,10 +30,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
||||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
SFirstLastRes* p;
|
SFirstLastRes* p;
|
||||||
|
col_id_t colId;
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
|
||||||
int32_t slotId = slotIds[i];
|
int32_t slotId = slotIds[i];
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
|
||||||
|
colId = pColVal->colVal.cid;
|
||||||
p = (SFirstLastRes*)varDataVal(pRes[i]);
|
p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||||
|
|
||||||
p->ts = pColVal->ts;
|
p->ts = pColVal->ts;
|
||||||
|
@ -63,8 +65,7 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
|
||||||
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pCol->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID && pCol->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
colDataSetVal(pCol, numOfRows, (const char*)&ts, false);
|
||||||
continue;
|
continue;
|
||||||
}
|
} else if (pReader->numOfCols == 1 && idx != dstSlotIds[0] && pCol->info.colId == colId) {
|
||||||
if (pReader->numOfCols == 1 && dstSlotIds[0] != idx) {
|
|
||||||
if (!p->isNull) {
|
if (!p->isNull) {
|
||||||
colDataSetVal(pCol, numOfRows, p->buf, false);
|
colDataSetVal(pCol, numOfRows, p->buf, false);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -54,6 +54,19 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM
|
||||||
|
|
||||||
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
|
||||||
|
|
||||||
|
static void setColIdForCacheReadBlock(SSDataBlock* pBlock, SNodeList* pTargets) {
|
||||||
|
SNode* pNode;
|
||||||
|
int32_t idx = 0;
|
||||||
|
FOREACH(pNode, pTargets) {
|
||||||
|
if (nodeType(pNode) == QUERY_NODE_COLUMN) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, idx);
|
||||||
|
pColInfo->info.colId = pCol->colId;
|
||||||
|
}
|
||||||
|
idx++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -114,10 +127,12 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
capacity = TMIN(totalTables, 4096);
|
capacity = TMIN(totalTables, 4096);
|
||||||
|
|
||||||
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
|
||||||
|
setColIdForCacheReadBlock(pInfo->pBufferredRes, pScanNode->pTargets);
|
||||||
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
|
blockDataEnsureCapacity(pInfo->pBufferredRes, capacity);
|
||||||
} else { // by tags
|
} else { // by tags
|
||||||
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE | SCAN_ROW_TYPE(pScanNode->ignoreNull);
|
||||||
capacity = 1; // only one row output
|
capacity = 1; // only one row output
|
||||||
|
setColIdForCacheReadBlock(pInfo->pRes, pScanNode->pTargets);
|
||||||
}
|
}
|
||||||
|
|
||||||
initResultSizeInfo(&pOperator->resultInfo, capacity);
|
initResultSizeInfo(&pOperator->resultInfo, capacity);
|
||||||
|
@ -192,7 +207,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferredRes->pDataBlock); ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pInfo->pBufferredRes->pDataBlock, i);
|
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
|
||||||
int32_t slotId = pCol->info.slotId;
|
int32_t slotId = pCol->info.slotId;
|
||||||
|
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId);
|
||||||
|
|
|
@ -1773,6 +1773,7 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
|
||||||
|
|
||||||
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
||||||
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
||||||
|
static const char* jkLastRowScanPhysiPlanTargets = "Targets";
|
||||||
|
|
||||||
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
|
const SLastRowScanPhysiNode* pNode = (const SLastRowScanPhysiNode*)pObj;
|
||||||
|
@ -1784,6 +1785,9 @@ static int32_t physiLastRowScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort);
|
code = tjsonAddBoolToObject(pJson, jkLastRowScanPhysiPlanGroupSort, pNode->groupSort);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkLastRowScanPhysiPlanTargets, pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1798,6 +1802,9 @@ static int32_t jsonToPhysiLastRowScanNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort);
|
code = tjsonGetBoolValue(pJson, jkLastRowScanPhysiPlanGroupSort, &pNode->groupSort);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkLastRowScanPhysiPlanTargets, &pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2052,7 +2052,8 @@ enum {
|
||||||
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
||||||
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
||||||
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
|
PHY_LAST_ROW_SCAN_CODE_GROUP_SORT,
|
||||||
PHY_LAST_ROW_SCAN_CODE_IGNULL
|
PHY_LAST_ROW_SCAN_CODE_IGNULL,
|
||||||
|
PHY_LAST_ROW_SCAN_CODE_TARGETS
|
||||||
};
|
};
|
||||||
|
|
||||||
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
@ -2068,6 +2069,9 @@ static int32_t physiLastRowScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
|
code = tlvEncodeBool(pEncoder, PHY_LAST_ROW_SCAN_CODE_IGNULL, pNode->ignoreNull);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_LAST_ROW_SCAN_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2091,6 +2095,9 @@ static int32_t msgToPhysiLastRowScanNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
|
case PHY_LAST_ROW_SCAN_CODE_IGNULL:
|
||||||
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
|
code = tlvDecodeBool(pTlv, &pNode->ignoreNull);
|
||||||
break;
|
break;
|
||||||
|
case PHY_LAST_ROW_SCAN_CODE_TARGETS:
|
||||||
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1285,6 +1285,7 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
SLastRowScanPhysiNode* pPhyNode = (SLastRowScanPhysiNode*)pNode;
|
||||||
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
destroyScanPhysiNode((SScanPhysiNode*)pNode);
|
||||||
nodesDestroyList(pPhyNode->pGroupTags);
|
nodesDestroyList(pPhyNode->pGroupTags);
|
||||||
|
nodesDestroyList(pPhyNode->pTargets);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||||
|
|
|
@ -552,6 +552,7 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu
|
||||||
if (NULL == pScan) {
|
if (NULL == pScan) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
pScan->pTargets = nodesCloneList(pScanLogicNode->node.pTargets);
|
||||||
|
|
||||||
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
|
pScan->pGroupTags = nodesCloneList(pScanLogicNode->pGroupTags);
|
||||||
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
|
if (NULL != pScanLogicNode->pGroupTags && NULL == pScan->pGroupTags) {
|
||||||
|
|
|
@ -283,6 +283,42 @@ class TDTestCase:
|
||||||
tdSql.checkData(0, 3, 1001)
|
tdSql.checkData(0, 3, 1001)
|
||||||
tdSql.checkData(0, 4, "2018-11-25 19:30:00.000")
|
tdSql.checkData(0, 4, "2018-11-25 19:30:00.000")
|
||||||
|
|
||||||
|
sql_template = 'select %s from meters partition by tbname'
|
||||||
|
select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, tbname", "last(c10), c10, ts"]
|
||||||
|
has_last_row_scan_res = [1,1,1]
|
||||||
|
sqls = self.format_sqls(sql_template, select_items)
|
||||||
|
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||||
|
tdSql.query(sqls[0], queryTimes=1)
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:00.000')
|
||||||
|
|
||||||
|
tdSql.query(sqls[1], queryTimes=1)
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:01.000')
|
||||||
|
|
||||||
|
sql_template = 'select %s from meters partition by t1'
|
||||||
|
select_items = ["ts, last(c10), c10, ts", "ts, ts, last(c10), c10, t1", "last(c10), c10, ts"]
|
||||||
|
has_last_row_scan_res = [1,1,1]
|
||||||
|
sqls = self.format_sqls(sql_template, select_items)
|
||||||
|
self.explain_and_check_res(sqls, has_last_row_scan_res)
|
||||||
|
tdSql.query(sqls[0], queryTimes=1)
|
||||||
|
tdSql.checkRows(5)
|
||||||
|
tdSql.checkData(0,0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0,1, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,2, '2018-11-25 19:30:01.000')
|
||||||
|
tdSql.checkData(0,3, '2018-11-25 19:30:00.000')
|
||||||
|
|
||||||
|
tdSql.query("select ts, last(c10), t1, t2 from meters partition by t1, t2")
|
||||||
|
tdSql.checkRows(10)
|
||||||
|
tdSql.checkData(0, 0, '2018-11-25 19:30:00.000')
|
||||||
|
tdSql.checkData(0, 1, '2018-11-25 19:30:01.000')
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
#time.sleep(99999999)
|
#time.sleep(99999999)
|
||||||
|
|
Loading…
Reference in New Issue