From 9d9a88b3edc2bdd04e4cc7923c5ac5d1a18cb3b7 Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Thu, 23 Nov 2023 19:52:32 +0800 Subject: [PATCH] set datablock table name --- source/libs/executor/src/scanoperator.c | 36 ++++++++++++++----------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 28832ffec8..448c585869 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1525,6 +1525,17 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS return TSDB_CODE_SUCCESS; } +static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { + SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; + blockDataCleanup(pInfo->pCreateTbRes); + if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { + pBlock->info.parTbName[0] = 0; + } else { + appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, + pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore); + } +} + static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) { blockDataCleanup(pDestBlock); int32_t rows = pSrcBlock->info.rows; @@ -1549,15 +1560,21 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t srcUid = srcUidData[i]; uint64_t groupId = srcGp[i]; - char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; + char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0}; if (groupId == 0) { groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver); } if (pInfo->tbnameCalSup.pExprInfo) { void* parTbname = NULL; - pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); - - memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); + code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + if (code != TSDB_CODE_SUCCESS) { + SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver); + printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo)); + calBlockTbName(pInfo, pPreRes); + memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName)); + } else { + memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); + } varDataSetLen(tbname, strlen(varDataVal(tbname))); pInfo->stateStore.streamStateFreeVal(parTbname); } @@ -1583,17 +1600,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; - blockDataCleanup(pInfo->pCreateTbRes); - if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { - pBlock->info.parTbName[0] = 0; - } else { - appendCreateTableRow(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, &pInfo->tbnameCalSup, &pInfo->tagCalSup, - pBlock->info.id.groupId, pBlock, 0, pInfo->pCreateTbRes, &pInfo->stateStore); - } -} - void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);