Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

This commit is contained in:
Haojun Liao 2023-08-25 19:21:00 +08:00
commit d8a6ec024a
2 changed files with 17 additions and 9 deletions

View File

@ -96,7 +96,7 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader) {
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
int32_t code = 0;
const void* pKey = NULL;
const void* pVal = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
SDecoder decoder;
@ -110,9 +110,15 @@ int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData) {
NextTbl:
except = 0;
for (;;) {
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
const void* tVal = NULL;
int32_t tLen = 0;
if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &tVal, &tLen)) {
except = 1;
break;
} else {
pVal = taosMemoryCalloc(1, tLen);
memcpy(pVal, tVal, tLen);
vLen = tLen;
}
tdbTbcMoveToNext(pReader->pCur);
break;
@ -144,6 +150,7 @@ NextTbl:
pHdr->type = pPair->type;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
taosMemoryFree(pVal);
tqDebug("vgId:%d, vnode stream-task snapshot read data vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);

View File

@ -1727,17 +1727,17 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
rocksdb_readoptions_t** readOpt) {
int idx = streamStateGetCfIdx(pState, cfKeyName);
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt;
*readOpt = rocksdb_readoptions_create();
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb);
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
rocksdb_readoptions_set_snapshot(*readOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(*readOpt, 0);
}
return rocksdb_create_iterator_cf(wrapper->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
return rocksdb_create_iterator_cf(wrapper->rocksdb, *readOpt,
((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
@ -2806,8 +2806,9 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
char buf[128] = {0};
int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV);