From ecfb5e5f4c1bbffbce40bfc94bcf58f713c92594 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 20 Jun 2024 09:28:04 +0800 Subject: [PATCH] adj par name catch --- include/libs/executor/storageapi.h | 2 +- include/libs/stream/streamState.h | 2 +- source/libs/executor/src/groupoperator.c | 4 ++-- source/libs/executor/src/scanoperator.c | 4 ++-- source/libs/executor/src/streamfilloperator.c | 2 +- source/libs/executor/src/streamtimewindowoperator.c | 8 ++++---- source/libs/stream/src/streamState.c | 5 ++++- 7 files changed, 15 insertions(+), 12 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 1b2280115a..45f9f73fb1 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -329,7 +329,7 @@ typedef struct { typedef struct SStateStore { int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname); - int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal); + int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache); int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); int32_t (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 106efa8947..5768160fdb 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -101,7 +101,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); -int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); +int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache); void streamStateReloadInfo(SStreamState* pState, TSKEY ts); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8196a7ccde..3a124197e0 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1157,7 +1157,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { pDest->info.parTbName[0] = 0; if (pInfo->tbnameCalSup.numOfExprs > 0) { void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) { + if (pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname, false) == 0) { memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); pAPI->stateStore.streamStateFreeVal(tbname); } @@ -1178,7 +1178,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, SStateStore* pAPI) { void* pValue = NULL; - if (pAPI->streamStateGetParName(pState, groupId, &pValue) != 0) { + if (pAPI->streamStateGetParName(pState, groupId, &pValue, true) != 0) { SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId); memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1b90088605..baf98d8069 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1892,7 +1892,7 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock* groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, preJ); if (pInfo->pPartTbnameSup) { void* parTbname = NULL; - int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname, false); if (code != TSDB_CODE_SUCCESS) { calBlockTbName(pInfo, pPreRes, preJ); memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName)); @@ -1938,7 +1938,7 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock } if (pInfo->tbnameCalSup.pExprInfo) { void* parTbname = NULL; - int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname, false); if (code != TSDB_CODE_SUCCESS) { SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver); printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo)); diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index b54802e2c6..1cdd7d2d87 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -707,7 +707,7 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_ SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); void* tbname = NULL; - pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname); + pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false); if (tbname == NULL) { colDataSetNULL(pTableCol, pBlock->info.rows); } else { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2d73cf3cf6..4d567f729e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -376,7 +376,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); void* tbname = NULL; - pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); + pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname, false); if (tbname == NULL) { appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { @@ -750,7 +750,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = groupId; void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, false) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -2276,7 +2276,7 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); void* tbname = NULL; - pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname); + pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname, false); if (tbname == NULL) { colDataSetNULL(pTableCol, pBlock->info.rows); } else { @@ -2446,7 +2446,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa void* tbname = NULL; if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, - &tbname) < 0) { + &tbname, false) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 550eac3ef3..23100362ac 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1106,10 +1106,13 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char #endif } -int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { +int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache) { #ifdef USE_ROCKSDB void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)); if (!pStr) { + if (onlyCache) { + return TSDB_CODE_FAILED; + } int32_t code = streamStateGetParName_rocksdb(pState, groupId, pVal); if (code == TSDB_CODE_SUCCESS) { tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);