diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index db0d6339c8..feb7bcc25e 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -336,6 +336,7 @@ typedef struct SStateStore { int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname); int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode); + int32_t (*streamStateDeleteParName)(SStreamState* pState, int64_t groupId); int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode); diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index a50451c3eb..2179547352 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -116,6 +116,7 @@ void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache, int32_t* pWinCode); +int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId); // group id int32_t streamStateGroupPut(SStreamState* pState, int64_t groupId, void* value, int32_t vLen); diff --git a/source/dnode/snode/src/snodeInitApi.c b/source/dnode/snode/src/snodeInitApi.c index 680a2fd83c..4fe4333534 100644 --- a/source/dnode/snode/src/snodeInitApi.c +++ b/source/dnode/snode/src/snodeInitApi.c @@ -31,6 +31,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStatePutParName = streamStatePutParName; pStore->streamStateGetParName = streamStateGetParName; + pStore->streamStateDeleteParName = streamStateDeleteParName; pStore->streamStateAddIfNotExist = streamStateAddIfNotExist; pStore->streamStateReleaseBuf = streamStateReleaseBuf; diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index d688d1323d..0ac0ee1b8f 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -147,6 +147,7 @@ void initStateStoreAPI(SStateStore* pStore) { pStore->streamStatePutParName = streamStatePutParName; pStore->streamStateGetParName = streamStateGetParName; + pStore->streamStateDeleteParName = streamStateDeleteParName; pStore->streamStateAddIfNotExist = streamStateAddIfNotExist; pStore->streamStateReleaseBuf = streamStateReleaseBuf; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5b5d5c5d11..8c04492162 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3535,6 +3535,23 @@ static int32_t copyGetResultBlock(SSDataBlock* dest, TSKEY start, TSKEY end) { return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL); } +static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + int64_t* gpIdCol = (int64_t*)pGpIdCol->pData; + code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]); + QUERY_CHECK_CODE(code, lino, _end); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; +} + static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { // NOTE: this operator does never check if current status is done or not int32_t code = TSDB_CODE_SUCCESS; @@ -3774,6 +3791,15 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; } break; + case STREAM_DELETE_GROUP_DATA: { + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete group recv", + GET_TASKID(pTaskInfo)); + code = setBlockGroupIdByUid(pInfo, pBlock); + QUERY_CHECK_CODE(code, lino, _end); + + deletePartName(pInfo, pBlock); + pBlock->info.type = STREAM_DELETE_DATA; + } break; case STREAM_CHECKPOINT: { qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); } break; diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index d313acc61d..6a10b21c53 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -223,6 +223,7 @@ int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGro // parname cf int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]); int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal); +int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId); void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 09f4e95376..65746b3100 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -4432,6 +4432,12 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi return code; } +int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) { + int code = 0; + STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId); + return code; +} + int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { int code = 0; STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 794fc346bf..ff52e1cb54 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -525,6 +525,14 @@ _end: return code; } +int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) { + int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t)); + qTrace("%s at line %d res %d", __func__, __LINE__, code); + code = streamStateDeleteParName_rocksdb(pState, groupId); + qTrace("%s at line %d res %d", __func__, __LINE__, code); + return TSDB_CODE_SUCCESS; +} + void streamStateDestroy(SStreamState* pState, bool remove) { streamFileStateDestroy(pState->pFileState); // streamStateDestroy_rocksdb(pState, remove);