From 649cf7e55dbe7182e914d377c6889f3ef49e7e81 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Jul 2022 14:21:44 +0800 Subject: [PATCH 1/4] fix(query): support last_row(tags) for super table query. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 6 ++-- source/libs/executor/inc/executorimpl.h | 3 ++ source/libs/executor/src/cachescanoperator.c | 31 ++++++++++++++------ source/libs/executor/src/scanoperator.c | 4 --- source/libs/function/src/builtinsimpl.c | 13 ++++---- 6 files changed, 37 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 9f32964fa9..f0a6b6505d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -138,7 +138,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); -int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds); +int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray* pTableUids); int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 5c09c7663f..5855468f31 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -104,7 +104,7 @@ int32_t tsdbLastrowReaderClose(void* pReader) { return TSDB_CODE_SUCCESS; } -int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds) { +int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { if (pReader == NULL || pResBlock == NULL) { return TSDB_CODE_INVALID_PARA; } @@ -141,14 +141,15 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t // appended or not. if (internalResult) { pResBlock->info.rows -= 1; + taosArrayClear(pTableUidList); } saveOneRow(pRow, pResBlock, pr, slotIds); + taosArrayPush(pTableUidList, &pKeyInfo->uid); internalResult = true; lastKey = pRow->ts; } - // taosMemoryFree(pRow); tsdbCacheRelease(lruCache, h); } } else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { @@ -171,6 +172,7 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t // tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema); saveOneRow(pRow, pResBlock, pr, slotIds); + taosArrayPush(pTableUidList, &pKeyInfo->uid); // taosMemoryFree(pRow); tsdbCacheRelease(lruCache, h); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index aab2f51421..3da8e298a6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -319,6 +319,7 @@ typedef struct SLastrowScanInfo { void *pLastrowReader; SArray *pColMatchInfo; int32_t *pSlotIds; + SExprSupp pseudoExprSup; } SLastrowScanInfo; typedef enum EStreamScanMode { @@ -787,6 +788,8 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, + SSDataBlock* pBlock, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 7b1351a024..0f6817cd6b 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -45,20 +45,20 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead int32_t numOfCols = 0; pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfCols, COL_MATCH_FROM_COL_ID); - int32_t* pCols = taosMemoryMalloc(numOfCols * sizeof(int32_t)); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatch = taosArrayGet(pInfo->pColMatchInfo, i); - pCols[i] = pColMatch->colId; - } - int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); if (code != TSDB_CODE_SUCCESS) { goto _error; } - tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_ALL, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), + tsdbLastRowReaderOpen(readHandle->vnode, LASTROW_RETRIEVE_TYPE_SINGLE, pTableList, taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); - taosMemoryFree(pCols); + + if (pScanNode->pScanPseudoCols != NULL) { + SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup; + + pPseudoExpr->pExprInfo = createExprInfo(pScanNode->pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs); + pPseudoExpr->pCtx = createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset); + } pOperator->name = "LastrowScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; @@ -100,7 +100,20 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { // check if it is a group by tbname if (size == taosArrayGetSize(pInfo->pTableList)) { blockDataCleanup(pInfo->pRes); - tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds); + SArray* pUidList = taosArrayInit(1, sizeof(tb_uid_t)); + int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pUidList); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + // check for tag values + if (pInfo->pRes->info.rows > 0 && pInfo->pseudoExprSup.numOfExprs > 0) { + SExprSupp* pSup = &pInfo->pseudoExprSup; + pInfo->pRes->info.uid = *(tb_uid_t*) taosArrayGet(pUidList, 0); + addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, GET_TASKID(pTaskInfo)); + } + + doSetOperatorCompleted(pOperator); return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } else { // todo fetch the result for each group diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 64c740decf..66703502eb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -39,8 +39,6 @@ static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capac static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size, const char* dbName); -static int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr); static bool processBlockWithProbability(const SSampleExecInfo* pInfo); bool processBlockWithProbability(const SSampleExecInfo* pInfo) { @@ -320,8 +318,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); - - colInfoDataEnsureCapacity(pColInfoData, pBlock->info.rows); colInfoDataCleanup(pColInfoData, pBlock->info.rows); int32_t functionId = pExpr->pExpr->_function.functionId; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index ccf28bfd78..c1143020f0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,11 +80,12 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; + bool hasResult; // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, // this attribute is required - bool isNull; + bool isNull; int32_t bytes; + int64_t ts; char buf[]; } SFirstLastRes; @@ -2951,6 +2952,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes); + // handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); @@ -5988,7 +5990,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - int32_t type = pInputCol->info.type; + int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; pInfo->bytes = bytes; @@ -5999,7 +6001,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); - if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (colDataIsNull_s(pInputCol, i)) { pInfo->isNull = true; @@ -6012,8 +6014,7 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { memcpy(pInfo->buf, data, bytes); } - *(TSKEY*)(pInfo->buf + bytes) = cts; - + pInfo->ts = cts; pInfo->hasResult = true; pResInfo->numOfRes = 1; From 56b1d11beb1f36188681af3c2ea92b07388102fa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Jul 2022 15:06:27 +0800 Subject: [PATCH 2/4] fix(query): prepare the output buffer before assign daata. --- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6f4a6b805d..760d7e55c8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4334,6 +4334,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo->code = code; return NULL; } + code = extractTableSchemaInfo(pHandle, pTableScanNode->scan.uid, pTaskInfo); if (code) { pTaskInfo->code = terrno; @@ -4349,7 +4350,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); - } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STimeWindowAggSupp twSup = { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 66703502eb..c7112ab8a6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1153,10 +1153,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo; + blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); + pInfo->pRes->info.rows = pBlock->info.rows; pInfo->pRes->info.uid = pBlock->info.uid; pInfo->pRes->info.type = STREAM_NORMAL; - pInfo->pRes->info.capacity = pBlock->info.rows; uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { @@ -2415,6 +2416,7 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId); return TSDB_CODE_SUCCESS; } + pTableListInfo->needSortTableByGroupId = pTableScanNode->groupSort; code = generateGroupIdMap(pTableListInfo, pHandle, pTableScanNode->pGroupTags); if (code != TSDB_CODE_SUCCESS) { From afb20f79c71c97a9d6b95905bb89e0161fa5689b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Jul 2022 19:58:07 +0800 Subject: [PATCH 3/4] fix(query): copy the value instead of assign data. --- source/libs/executor/src/scanoperator.c | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c7112ab8a6..8786d30007 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1183,7 +1183,10 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock for (int32_t j = 0; j < blockDataGetNumOfCols(pBlock); ++j) { SColumnInfoData* pResCol = bdGetColumnInfoData(pBlock, j); if (pResCol->info.colId == pColMatchInfo->colId) { - taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); + + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId); + colDataAssign(pDst, pResCol, pBlock->info.rows, &pInfo->pRes->info); +// taosArraySet(pInfo->pRes->pDataBlock, pColMatchInfo->targetSlotId, pResCol); colExists = true; break; } @@ -2590,9 +2593,11 @@ static SSDataBlock* getTableDataBlock(void* param) { SDataBlockInfo binfo = pBlock->info; tsdbRetrieveDataBlockInfo(reader, &binfo); - binfo.capacity = binfo.rows; blockDataEnsureCapacity(pBlock, binfo.capacity); - pBlock->info = binfo; + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); From d40191b62c9daaefc6e98d1b8c1a11ffdec66b23 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Jul 2022 20:58:14 +0800 Subject: [PATCH 4/4] fix(query): prepare the buffer before loading data. --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8786d30007..ba89592189 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2593,7 +2593,7 @@ static SSDataBlock* getTableDataBlock(void* param) { SDataBlockInfo binfo = pBlock->info; tsdbRetrieveDataBlockInfo(reader, &binfo); - blockDataEnsureCapacity(pBlock, binfo.capacity); + blockDataEnsureCapacity(pBlock, binfo.rows); pBlock->info.type = binfo.type; pBlock->info.uid = binfo.uid; pBlock->info.window = binfo.window;