fix write crash
This commit is contained in:
parent
d0246a80cf
commit
4625dd98f0
|
@ -18,8 +18,17 @@
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
|
|
||||||
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
|
||||||
//
|
int ret = memcmp(aBuf, bBuf, aLen);
|
||||||
return memcmp(aBuf, bBuf, aLen);
|
if (ret == 0) {
|
||||||
|
if (aLen < bLen)
|
||||||
|
return -1;
|
||||||
|
else if (aLen > bLen)
|
||||||
|
return 1;
|
||||||
|
else
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
int defaultKeyEncode(void* k, char* buf) {
|
int defaultKeyEncode(void* k, char* buf) {
|
||||||
int len = strlen((char*)k);
|
int len = strlen((char*)k);
|
||||||
|
|
|
@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
||||||
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
|
||||||
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
|
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
|
||||||
taosMemoryFree(valBuf);
|
taosMemoryFree(valBuf);
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
|
@ -363,14 +363,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
char valBuf[64] = {0};
|
char valBuf[64] = {0};
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
memcpy(keyBuf, taskKey, strlen(taskKey));
|
memcpy(keyBuf, taskKey, strlen(taskKey));
|
||||||
sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
|
||||||
|
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len);
|
||||||
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf));
|
|
||||||
}
|
}
|
||||||
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamStateDestroyBatch(batch);
|
streamStateDestroyBatch(batch);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
|
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
|
||||||
|
|
Loading…
Reference in New Issue