support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 07:20:05 +00:00
parent 4ca39d0f3c
commit 7ef4df8752
1 changed files with 24 additions and 30 deletions

View File

@ -408,6 +408,7 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
int64_t tc = 0;
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
if (sz <= 0) {
taosWUnLockLatch(&pMeta->checkpointDirLock);
return -1;
} else {
tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved);
@ -623,9 +624,9 @@ int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf,
return ret;
}
}
int streamStateValueIsStale(char* vv) {
int streamStateValueIsStale(char* v) {
int64_t ts = 0;
taosDecodeFixedI64(vv, &ts);
taosDecodeFixedI64(v, &ts);
return (ts != 0 && ts < taosGetTimestampMs()) ? 1 : 0;
}
int iterValueIsStale(rocksdb_iterator_t* iter) {
@ -956,33 +957,23 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
SStreamValue key = {0};
char* p = value;
if (streamStateValueIsStale(p)) {
if (dest != NULL) *dest = NULL;
return -1;
goto _EXCEPT;
}
p = taosDecodeFixedI64(p, &key.unixTimestamp);
p = taosDecodeFixedI32(p, &key.len);
if (vlen != (sizeof(int64_t) + sizeof(int32_t) + key.len)) {
if (dest != NULL) *dest = NULL;
qError("vlen: %d, read len: %d", vlen, key.len);
return -1;
goto _EXCEPT;
}
if (key.len != 0 && dest != NULL) p = taosDecodeBinary(p, (void**)dest, key.len);
if (key.len == 0) {
key.data = NULL;
} else {
p = taosDecodeBinary(p, (void**)&(key.data), key.len);
}
if (ttl != NULL) {
int64_t now = taosGetTimestampMs();
*ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - now;
}
if (dest != NULL) {
*dest = key.data;
} else {
taosMemoryFree(key.data);
}
if (ttl != NULL) *ttl = key.unixTimestamp == 0 ? 0 : key.unixTimestamp - taosGetTimestampMs();
return key.len;
_EXCEPT:
if (dest != NULL) *dest = NULL;
if (ttl != NULL) *ttl = 0;
return -1;
}
const char* compareDefaultName(void* arg) {
@ -1096,9 +1087,10 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
} else {
qDebug("succ to open rocksdb cf");
}
// close default cf
// close default cf and destroy default cfOpts
if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]);
rocksdb_options_destroy(cfOpts[0]);
handle->db = db;
static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
@ -2354,9 +2346,7 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
SStreamStateCur* pCur = iter;
bool val = rocksdb_iter_valid(pCur->iter);
return val ? 1 : 0;
return rocksdb_iter_valid(pCur->iter) ? 1 : 0;
}
void streamDefaultIterSeek_rocksdb(void* iter, const char* key) {
SStreamStateCur* pCur = iter;
@ -2372,13 +2362,16 @@ char* streamDefaultIterKey_rocksdb(void* iter, int32_t* len) {
}
char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
SStreamStateCur* pCur = iter;
int32_t vlen = 0;
char* dst = NULL;
const char* vval = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
if (decodeValueFunc((void*)vval, vlen, NULL, &dst) < 0) {
char* ret = NULL;
int32_t vlen = 0;
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vlen);
*len = decodeValueFunc((void*)val, vlen, NULL, &ret);
if (*len < 0) {
return NULL;
}
return dst;
return ret;
}
// batch func
void* streamStateCreateBatch() {
@ -2433,6 +2426,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
if (tmpBuf == NULL) {
taosMemoryFree(ttlV);
}
{
char tbuf[256] = {0};
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);