From e1046015aecd99a513693510a7e176147d1b5878 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 28 Mar 2023 02:25:11 +0000 Subject: [PATCH] add backend --- source/libs/executor/src/executorimpl.c | 6 +-- source/libs/stream/src/streamStateRocksdb.c | 42 ++++++++++++++------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5c0ca4d31a..a28de52fab 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2709,7 +2709,7 @@ int32_t getOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow** pResult, } int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - qWarn("streamStateAddIfNotExist"); + // qWarn("streamStateAddIfNotExist"); char* tVal = NULL; int32_t size = 0; int32_t code = streamStateGet(pState, key, (void**)&tVal, &size); @@ -2803,8 +2803,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); SResultRowEntryInfo* pEnryInfo = pCtx[j].resultInfo; - qWarn("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, pEnryInfo->isNullRes, - pEnryInfo->numOfRes); + qDebug("initd:%d, complete:%d, null:%d, res:%d", pEnryInfo->initialized, pEnryInfo->complete, + pEnryInfo->isNullRes, pEnryInfo->numOfRes); if (pCtx[j].fpSet.finalize) { int32_t code1 = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code1)) { diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 8e6557254d..03c2473aa2 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -451,6 +451,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)&len, &err); \ if (val == NULL) { \ qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ + if (err != NULL) taosMemoryFree(err); \ code = -1; \ } else { \ if (pVal != NULL) *pVal = val; \ @@ -551,20 +552,35 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - SWinKey key = {.ts = 0, .groupId = 0}; - // batch clear later - streamStatePut_rocksdb(pState, &key, NULL, 0); - while (1) { - SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key); - SWinKey delKey = {0}; - int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0); - streamStateFreeCur(pCur); - if (code == 0) { - streamStateDel_rocksdb(pState, &delKey); - } else { - break; - } + + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = UINT64_MAX, .groupId = INT64_MAX}, .opNum = pState->number}; + char sKeyStr[128] = {0}; + char eKeyStr[128] = {0}; + + int sLen = stateKeyEncode(&sKey, sKeyStr); + int eLen = stateKeyEncode(&sKey, eKeyStr); + + char* err = NULL; + rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[0], + sKeyStr, sLen, eKeyStr, eLen, &err); + if (err != NULL) { + qWarn("failed to delete range cf(default)"); } + + // batch clear later + // streamStatePut_rocksdb(pState, &key, NULL, 0); + // while (1) { + // SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &key); + // SWinKey delKey = {0}; + // int32_t code = streamStateGetKVByCur_rocksdb(pCur, &delKey, NULL, 0); + // streamStateFreeCur(pCur); + // if (code == 0) { + // streamStateDel_rocksdb(pState, &delKey); + // } else { + // break; + // } + // } return 0; }