From 4625dd98f02a6fc206a74ceb39ca374195d12a83 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Apr 2023 07:39:54 +0000 Subject: [PATCH] fix write crash --- source/libs/stream/src/streamStateRocksdb.c | 13 +++++++++++-- source/libs/stream/src/tstreamFileState.c | 9 ++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index fb5ad34d0d..9977732a9a 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -18,8 +18,17 @@ #include "tcommon.h" int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - // - return memcmp(aBuf, bBuf, aLen); + int ret = memcmp(aBuf, bBuf, aLen); + if (ret == 0) { + if (aLen < bLen) + return -1; + else if (aLen > bLen) + return 1; + else + return 0; + } else { + return ret; + } } int defaultKeyEncode(void* k, char* buf) { int len = strlen((char*)k); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 4c6c1e1afa..eab95d1214 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); taosMemoryFree(valBuf); } { @@ -363,14 +363,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, char valBuf[64] = {0}; int32_t len = 0; memcpy(keyBuf, taskKey, strlen(taskKey)); - sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - - streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); + len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); } streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } - streamStateDestroyBatch(batch); + return code; } int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {