Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
5b29a5ec97
|
@ -3816,14 +3816,13 @@ void* decodeSSessionKey(void* buf, SSessionKey* key) {
|
||||||
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
tlen += taosEncodeFixedBool(buf, key->isOutput);
|
||||||
tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen);
|
|
||||||
tlen += encodeSSessionKey(buf, &key->sessionWin);
|
tlen += encodeSSessionKey(buf, &key->sessionWin);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) {
|
||||||
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
buf = taosDecodeFixedBool(buf, &key->isOutput);
|
||||||
buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen);
|
key->pOutputBuf = NULL;
|
||||||
buf = decodeSSessionKey(buf, &key->sessionWin);
|
buf = decodeSSessionKey(buf, &key->sessionWin);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -210,6 +210,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
||||||
qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
taosMkDir(state);
|
taosMkDir(state);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(chkp);
|
||||||
}
|
}
|
||||||
*dst = state;
|
*dst = state;
|
||||||
|
|
||||||
|
@ -1323,8 +1324,8 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName,
|
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKeyName, rocksdb_snapshot_t** snapshot,
|
||||||
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt) {
|
rocksdb_readoptions_t** readOpt) {
|
||||||
int idx = streamStateGetCfIdx(pState, cfKeyName);
|
int idx = streamStateGetCfIdx(pState, cfKeyName);
|
||||||
|
|
||||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||||
|
|
Loading…
Reference in New Issue