Merge pull request #14836 from taosdata/feature/3_liaohj
fix(query): always generated results for last_row query if rows exists.
This commit is contained in:
commit
06c208fc89
|
@ -22,7 +22,6 @@ typedef struct SLastrowReader {
|
|||
SVnode* pVnode;
|
||||
STSchema* pSchema;
|
||||
uint64_t uid;
|
||||
// int32_t* pSlotIds;
|
||||
char** transferBuf; // todo remove it soon
|
||||
int32_t numOfCols;
|
||||
int32_t type;
|
||||
|
@ -31,6 +30,7 @@ typedef struct SLastrowReader {
|
|||
} SLastrowReader;
|
||||
|
||||
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) {
|
||||
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
|
||||
SColVal colVal = {0};
|
||||
|
@ -40,15 +40,17 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
|
|||
if (slotIds[i] == -1) {
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false);
|
||||
} else {
|
||||
tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal);
|
||||
int32_t slotId = slotIds[i];
|
||||
|
||||
tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(colVal.type)) {
|
||||
if (colVal.isNull || colVal.isNone) {
|
||||
colDataAppendNULL(pColInfoData, numOfRows);
|
||||
} else {
|
||||
varDataSetLen(pReader->transferBuf[i], colVal.value.nData);
|
||||
memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData);
|
||||
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false);
|
||||
varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
|
||||
memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
|
||||
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
}
|
||||
} else {
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone);
|
||||
|
@ -68,7 +70,6 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
|||
p->type = type;
|
||||
p->pVnode = pVnode;
|
||||
p->numOfCols = numOfCols;
|
||||
p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES);
|
||||
|
||||
if (taosArrayGetSize(pTableIdList) == 0) {
|
||||
*pReader = p;
|
||||
|
@ -79,7 +80,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
|||
p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1);
|
||||
p->pTableList = pTableIdList;
|
||||
|
||||
for (int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
||||
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
|
||||
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
|
||||
}
|
||||
|
@ -92,10 +94,11 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
|
|||
int32_t tsdbLastrowReaderClose(void* pReader) {
|
||||
SLastrowReader* p = pReader;
|
||||
|
||||
for (int32_t i = 0; i < p->numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
|
||||
taosMemoryFreeClear(p->transferBuf[i]);
|
||||
}
|
||||
|
||||
taosMemoryFree(p->pSchema);
|
||||
taosMemoryFree(p->transferBuf);
|
||||
taosMemoryFree(pReader);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -63,10 +63,10 @@ typedef struct SBlockLoadSuppInfo {
|
|||
} SBlockLoadSuppInfo;
|
||||
|
||||
typedef struct SFilesetIter {
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
SArray* pFileList; // data file list
|
||||
int32_t order;
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
SArray* pFileList; // data file list
|
||||
int32_t order;
|
||||
} SFilesetIter;
|
||||
|
||||
typedef struct SFileDataBlockInfo {
|
||||
|
@ -302,7 +302,10 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
|
|||
STimeWindow win = {0};
|
||||
|
||||
while (1) {
|
||||
/*if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);*/
|
||||
// if (pReader->pFileReader != NULL) {
|
||||
// tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
// }
|
||||
|
||||
pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index);
|
||||
|
||||
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset);
|
||||
|
@ -2434,6 +2437,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd
|
|||
SVersionRange* pVerRange, int32_t step) {
|
||||
while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
|
||||
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
|
||||
rowIndex += step;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -2832,7 +2836,10 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
|||
destroyBlockScanInfo(pReader->status.pTableMap);
|
||||
blockDataDestroy(pReader->pResBlock);
|
||||
|
||||
if (pReader->pFileReader != NULL) tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
if (pReader->pFileReader != NULL) {
|
||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
}
|
||||
|
||||
#if 0
|
||||
// if (pReader->status.pTableScanInfo != NULL) {
|
||||
// pReader->status.pTableScanInfo = destroyTableCheckInfo(pReader->status.pTableScanInfo);
|
||||
|
@ -3020,6 +3027,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
|
||||
|
||||
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
|
||||
// todo set the correct numOfTables
|
||||
int32_t numOfTables = 1;
|
||||
|
|
|
@ -2217,7 +2217,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
{
|
||||
.name = "last_row",
|
||||
.type = FUNCTION_TYPE_LAST_ROW,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
|
||||
.translateFunc = translateFirstLast,
|
||||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
|
|
|
@ -80,8 +80,10 @@ typedef struct STopBotRes {
|
|||
} STopBotRes;
|
||||
|
||||
typedef struct SFirstLastRes {
|
||||
bool hasResult;
|
||||
bool isNull; // used for last_row function only
|
||||
bool hasResult;
|
||||
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
|
||||
// this attribute is required
|
||||
bool isNull;
|
||||
int32_t bytes;
|
||||
char buf[];
|
||||
} SFirstLastRes;
|
||||
|
@ -2948,7 +2950,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|||
pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0;
|
||||
|
||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes);
|
||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes);
|
||||
// handle selectivity
|
||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
||||
|
@ -5988,24 +5990,28 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) {
|
|||
|
||||
int32_t type = pInputCol->info.type;
|
||||
int32_t bytes = pInputCol->info.bytes;
|
||||
|
||||
pInfo->bytes = bytes;
|
||||
|
||||
// last_row function does not ignore the null value
|
||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||
if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
|
||||
char* data = colDataGetData(pInputCol, i);
|
||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
bytes = varDataTLen(data);
|
||||
pInfo->bytes = bytes;
|
||||
|
||||
if (colDataIsNull_s(pInputCol, i)) {
|
||||
pInfo->isNull = true;
|
||||
} else {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
bytes = varDataTLen(data);
|
||||
pInfo->bytes = bytes;
|
||||
}
|
||||
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
}
|
||||
|
||||
memcpy(pInfo->buf, data, bytes);
|
||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||
|
||||
pInfo->hasResult = true;
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 6dccac192a2ae7dd78718ab926201aab5419327a
|
||||
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6
|
Loading…
Reference in New Issue