fix(query): always generated results for last_row query if rows exists.
This commit is contained in:
parent
e99231c1c8
commit
6f2ac5ba72
|
@ -30,6 +30,7 @@ typedef struct SLastrowReader {
|
||||||
} SLastrowReader;
|
} SLastrowReader;
|
||||||
|
|
||||||
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
|
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
|
||||||
|
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
SColVal colVal = {0};
|
SColVal colVal = {0};
|
||||||
|
@ -69,7 +70,6 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
||||||
p->type = type;
|
p->type = type;
|
||||||
p->pVnode = pVnode;
|
p->pVnode = pVnode;
|
||||||
p->numOfCols = numOfCols;
|
p->numOfCols = numOfCols;
|
||||||
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pTableIdList) == 0) {
|
if (taosArrayGetSize(pTableIdList) == 0) {
|
||||||
*pReader = p;
|
*pReader = p;
|
||||||
|
@ -80,6 +80,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
||||||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
|
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
|
||||||
p->pTableList = pTableIdList;
|
p->pTableList = pTableIdList;
|
||||||
|
|
||||||
|
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
||||||
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
|
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
|
||||||
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
|
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
|
||||||
|
|
|
@ -2217,7 +2217,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
{
|
{
|
||||||
.name = "last_row",
|
.name = "last_row",
|
||||||
.type = FUNCTION_TYPE_LAST_ROW,
|
.type = FUNCTION_TYPE_LAST_ROW,
|
||||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||||
.translateFunc = translateFirstLast,
|
.translateFunc = translateFirstLast,
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
|
|
|
@ -81,7 +81,9 @@ typedef struct STopBotRes {
|
||||||
|
|
||||||
typedef struct SFirstLastRes {
|
typedef struct SFirstLastRes {
|
||||||
bool hasResult;
|
bool hasResult;
|
||||||
bool isNull; // used for last_row function only
|
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
|
||||||
|
// this attribute is required
|
||||||
|
bool isNull;
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
char buf[];
|
char buf[];
|
||||||
} SFirstLastRes;
|
} SFirstLastRes;
|
||||||
|
@ -2900,7 +2902,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||||
|
|
||||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
|
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes);
|
||||||
// handle selectivity
|
// handle selectivity
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
||||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
||||||
|
@ -5956,24 +5958,28 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
int32_t type = pInputCol->info.type;
|
int32_t type = pInputCol->info.type;
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
int32_t bytes = pInputCol->info.bytes;
|
||||||
|
|
||||||
pInfo->bytes = bytes;
|
pInfo->bytes = bytes;
|
||||||
|
|
||||||
|
// last_row function does not ignore the null value
|
||||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||||
if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
|
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pInputCol, i)) {
|
||||||
|
pInfo->isNull = true;
|
||||||
|
} else {
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
bytes = varDataTLen(data);
|
bytes = varDataTLen(data);
|
||||||
pInfo->bytes = bytes;
|
pInfo->bytes = bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
memcpy(pInfo->buf, data, bytes);
|
||||||
|
}
|
||||||
|
|
||||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||||
|
|
||||||
pInfo->hasResult = true;
|
pInfo->hasResult = true;
|
||||||
|
|
Loading…
Reference in New Issue