diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9f32964fa9..f0a6b6505d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds); +int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 5c09c7663f..5855468f31 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -104,7 +104,7 @@ int32_t tsdbLastrowReaderClose(void* pReader) { return TSDB_CODE_SUCCESS; } -int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) { +int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -141,14 +141,15 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t // appended or not. if (internalResult) { pResBlock->info.rows -= 1; + taosArrayClear(pTableUidList); } saveOneRow(pRow, pResBlock, pr, slotIds); + taosArrayPush(pTableUidList, &pKeyInfo->uid); internalResult = true; lastKey = pRow->ts; } - // taosMemoryFree(pRow); tsdbCacheRelease(lruCache, h); } } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { @@ -171,6 +172,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); saveOneRow(pRow, pResBlock, pr, slotIds); + taosArrayPush(pTableUidList, &pKeyInfo->uid); // taosMemoryFree(pRow); tsdbCacheRelease(lruCache, h); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index aab2f51421..3da8e298a6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -319,6 +319,7 @@ typedef struct SLastrowScanInfo { void *pLastrowReader; SArray *pColMatchInfo; int32_t *pSlotIds; + SExprSupp pseudoExprSup; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -787,6 +788,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, + SSDataBlock* pBlock, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 7b1351a024..0f6817cd6b 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -45,20 +45,20 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); - int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t)); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); - pCols[i] = pColMatch->colId; - } - int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } - tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), + tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); - taosMemoryFree(pCols); + + if (pScanNode->pScanPseudoCols != NULL) { + SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; + + pPseudoExpr->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs); + pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); + } pOperator->name = "LastrowScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; @@ -100,7 +100,20 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { // check if it is a group by tbname if (size == taosArrayGetSize(pInfo->pTableList)) { blockDataCleanup(pInfo->pRes); - tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds); + SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t)); + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + // check for tag values + if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0); + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + } + + doSetOperatorCompleted(pOperator); return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } else { // todo fetch the result for each group diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64c740decf..66703502eb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -39,8 +39,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); -static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr); static bool processBlockWithProbability(const SSampleExecInfo* pInfo); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { @@ -320,8 +318,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); - - colInfoDataEnsureCapacity(pColInfoData, pBlock->info.rows); colInfoDataCleanup(pColInfoData, pBlock->info.rows); int32_t functionId = pExpr->pExpr->_function.functionId; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ccf28bfd78..c1143020f0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,11 +80,12 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; + bool hasResult; // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, // this attribute is required - bool isNull; + bool isNull; int32_t bytes; + int64_t ts; char buf[]; } SFirstLastRes; @@ -2951,6 +2952,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes); + // handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); @@ -5988,7 +5990,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; @@ -5999,7 +6001,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (colDataIsNull_s(pInputCol, i)) { pInfo->isNull = true; @@ -6012,8 +6014,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); } - *(TSKEY*)(pInfo->buf + bytes) = cts; - + pInfo->ts = cts; pInfo->hasResult = true; pResInfo->numOfRes = 1;