fix(query): add ts in cache_scan_operator if pk exists and only retrieve ts column.

This commit is contained in:
Haojun Liao 2024-04-25 09:29:42 +08:00
parent a37667968f
commit ac860f7fe8
7 changed files with 29 additions and 24 deletions

View File

@ -367,11 +367,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end;
}
int32_t pkBufLen = 0;
if (pr->rowKey.numOfPKs > 0) {
pkBufLen = pr->pkColumn.bytes;
}
int32_t pkBufLen = (pr->rowKey.numOfPKs > 0)? pr->pkColumn.bytes:0;
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes = (slotIds[j] == -1) ? 1 : pr->pSchema->columns[slotIds[j]].bytes;

View File

@ -278,7 +278,7 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
int32_t code = TSDB_CODE_SUCCESS;
for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check
// todo add a dummy function to avoid process check
if (pCtx[k].fpSet.process == NULL) {
continue;
}

View File

@ -221,6 +221,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
SStoreCacheReader* pReaderFn = &pInfo->readHandle.api.cacheFn;
SSDataBlock* pBufRes = pInfo->pBufferedRes;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
@ -237,18 +238,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
if (pInfo->indexOfBufferedRes >= pInfo->pBufferedRes->info.rows) {
blockDataCleanup(pInfo->pBufferedRes);
if (pInfo->indexOfBufferedRes >= pBufRes->info.rows) {
blockDataCleanup(pBufRes);
taosArrayClear(pInfo->pUidList);
int32_t code = pReaderFn->retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
int32_t code =
pReaderFn->retrieveRows(pInfo->pLastrowReader, pBufRes, pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
// check for tag values
int32_t resultRows = pInfo->pBufferedRes->info.rows;
int32_t resultRows = pBufRes->info.rows;
// the results may be null, if last values are all null
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
@ -257,12 +258,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->pRes;
if (pInfo->indexOfBufferedRes < pInfo->pBufferedRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pBufferedRes->pDataBlock); ++i) {
if (pInfo->indexOfBufferedRes < pBufRes->info.rows) {
for (int32_t i = 0; i < taosArrayGetSize(pBufRes->pDataBlock); ++i) {
SColumnInfoData* pCol = taosArrayGet(pRes->pDataBlock, i);
int32_t slotId = pCol->info.slotId;
SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferedRes->pDataBlock, slotId);
SColumnInfoData* pSrc = taosArrayGet(pBufRes->pDataBlock, slotId);
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId);
if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) {

View File

@ -2837,7 +2837,7 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
if (pInput->pkData) {
pOutput->pkBytes = pInput->pkBytes;
memcpy(pOutput->buf+pOutput->bytes, pInput->pkData, pOutput->pkBytes);
memcpy(pOutput->buf + pOutput->bytes, pInput->pkData, pOutput->pkBytes);
pOutput->pkData = pOutput->buf + pOutput->bytes;
}
return TSDB_CODE_SUCCESS;
@ -2885,7 +2885,8 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
} else {
pInputInfo->pkData = NULL;
}
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -2343,7 +2343,7 @@ static EDealRes collectFuncs(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
}
SExprNode* pExpr = (SExprNode*)pNode;
bool bFound = false;
SNode* pn = NULL;
FOREACH(pn, pCxt->pFuncs) {

View File

@ -19,6 +19,9 @@
#include "tglobal.h"
#include "parser.h"
// primary key column always the second column if exists
#define PRIMARY_COLUMN_SLOT 1
typedef struct SLogicPlanContext {
SPlanContext* pPlanCxt;
SLogicNode* pCurrRoot;
@ -304,7 +307,7 @@ static SNode* createFirstCol(SRealTableNode* pTable, const SSchema* pSchema) {
return (SNode*)pCol;
}
static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) {
static int32_t addPrimaryTsCol(SRealTableNode* pTable, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
FOREACH(pCol, *pCols) {
@ -327,10 +330,10 @@ static int32_t addSystableFirstCol(SRealTableNode* pTable, SNodeList** pCols) {
return nodesListMakeStrictAppend(pCols, createFirstCol(pTable, pTable->pMeta->schema));
}
static int32_t addPkCol(SRealTableNode* pTable, SNodeList** pCols) {
static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
SSchema* pSchema = &pTable->pMeta->schema[1];
SSchema* pSchema = &pTable->pMeta->schema[PRIMARY_COLUMN_SLOT];
FOREACH(pCol, *pCols) {
if (pSchema->colId == ((SColumnNode*)pCol)->colId) {
found = true;
@ -348,9 +351,9 @@ static int32_t addDefaultScanCol(SRealTableNode* pTable, SNodeList** pCols) {
if (TSDB_SYSTEM_TABLE == pTable->pMeta->tableType) {
return addSystableFirstCol(pTable, pCols);
}
int32_t code = addPrimaryKeyCol(pTable, pCols);
int32_t code = addPrimaryTsCol(pTable, pCols);
if (code == TSDB_CODE_SUCCESS && hasPkInTable(pTable->pMeta)) {
code = addPkCol(pTable, pCols);
code = addPrimaryKeyCol(pTable, pCols);
}
return code;
}
@ -1802,7 +1805,7 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p
STableMeta* pMeta = ((SRealTableNode*)pDelete->pFromTable)->pMeta;
if (TSDB_CODE_SUCCESS == code && hasPkInTable(pMeta)) {
code = addPkCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols);
code = addPrimaryKeyCol((SRealTableNode*)pDelete->pFromTable, &pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code && NULL != pDelete->pTagCond) {

View File

@ -3966,21 +3966,25 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
cxt.funcType = FUNCTION_TYPE_CACHE_LAST;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, pLastRowCols, true, cxt.pkBytes);
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, pLastRowCols, false, cxt.pkBytes);
lastRowScanOptRemoveUslessTargets(pScan->node.pTargets, cxt.pLastCols, cxt.pOtherCols, pLastRowCols);
if (pPKTsCol && pScan->node.pTargets->length == 1) {
if (pPKTsCol && ((pScan->node.pTargets->length == 1) || (pScan->node.pTargets->length == 2 && cxt.pkBytes > 0))) {
// when select last(ts),ts from ..., we add another ts to targets
sprintf(pPKTsCol->colName, "#sel_val.%p", pPKTsCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pPKTsCol));
}
if (pNonPKCol && cxt.pLastCols->length == 1 &&
nodesEqualNode((SNode*)pNonPKCol, nodesListGetNode(cxt.pLastCols, 0))) {
// when select last(c1), c1 from ..., we add c1 to targets
sprintf(pNonPKCol->colName, "#sel_val.%p", pNonPKCol);
nodesListAppend(pScan->node.pTargets, nodesCloneNode((SNode*)pNonPKCol));
}
nodesClearList(cxt.pLastCols);
}
nodesClearList(cxt.pOtherCols);