diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c88971ab75..4326ac250a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3289,13 +3289,31 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "state", &sKey, (void*)dst, size); + + taosMemoryFree(dst); return code; } int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; - STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, pVal, pVLen); + + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "state", &sKey, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); return code; } int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -3541,14 +3559,31 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* // func cf int32_t streamStateFuncPut_rocksdb(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)value, vLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "func", key, (void*)dst, size); + taosMemoryFree(dst); + return code; } int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "func", key, pVal, pVLen); - return 0; + int code = 0; + char* tVal = NULL; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "func", key, tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + + taosMemoryFree(tVal); + return code; } int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { int code = 0; @@ -3563,7 +3598,15 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k if (value == NULL || vLen == 0) { stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen); } - STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, dst, size); + taosMemoryFree(dst); + return code; } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { @@ -3861,13 +3904,30 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, value, vLen); + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "fill", key, dst, size); + + taosMemoryFree(dst); return code; } int32_t streamStateFillGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "fill", key, pVal, pVLen); + + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "fill", key, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); return code; } int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { @@ -4204,21 +4264,44 @@ _end: #ifdef BUILD_NO_CALL // partag cf int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, tag, tagLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, value, vLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "partag", &groupId, dst, size); + taosMemoryFree(dst); return code; } int32_t streamStateGetParTag_rocksdb(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { - int code = 0; - STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, tagVal, tagLen); + int code = 0; + char* tVal; + size_t tValLen = 0; + STREAM_STATE_GET_ROCKSDB(pState, "partag", &groupId, &tVal, &tValLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)tagVal, (size_t*)tagLen); + taosMemoryFree(tVal); + return code; } #endif // parname cfg int32_t streamStatePutParName_rocksdb(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)tbname, TSDB_TABLE_NAME_LEN); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, tbname, TSDB_TABLE_NAME_LEN, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "parname", &groupId, (char*)dst, size); + taosMemoryFree(dst); return code; } int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, void** pVal) { @@ -4229,13 +4312,30 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi } int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen) { - int code = 0; - STREAM_STATE_PUT_ROCKSDB(pState, "default", key, pVal, pVLen); + int code = 0; + char* dst = NULL; + size_t size = 0; + code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, pVal, pVLen, &dst, &size); + if (code != 0) { + return code; + } + STREAM_STATE_PUT_ROCKSDB(pState, "default", key, dst, size); + taosMemoryFree(dst); return code; } int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen) { - int code = 0; + int code = 0; + char* tVal; + size_t tValLen = 0; STREAM_STATE_GET_ROCKSDB(pState, "default", key, pVal, pVLen); + if (code != 0) { + taosMemoryFree(tVal); + return code; + } + + code = (pState->pResultRowStore->resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + taosMemoryFree(tVal); + return code; } int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { @@ -4377,10 +4477,18 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { - char buf[128] = {0}; + char buf[128] = {0}; + + char* dst = NULL; + size_t size = 0; + int32_t code = (pState->pResultRowStore->resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); + if (code != 0) { + return code; + } + int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); char* ttlV = tmpBuf; - int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); + int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -4389,6 +4497,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb rocksdb_column_family_handle_t* pCf = wrapper->pCf[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); + taosMemoryFree(dst); + if (tmpBuf == NULL) { taosMemoryFree(ttlV); } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index cf5f1b2b91..7237f23671 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -698,7 +698,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo int idx = streamStateGetCfIdx(pFileState->pFileStore, pFileState->cfName); - int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64; + int32_t len = (pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 64) * 2; char* buf = taosMemoryCalloc(1, len); if (!buf) { code = terrno;