recoverSnapshot
This commit is contained in:
parent
5adf580764
commit
f7732ddd18
|
@ -690,7 +690,7 @@ int stateKeyDecode(void* k, char* buf) {
|
||||||
int stateKeyToString(void* k, char* buf) {
|
int stateKeyToString(void* k, char* buf) {
|
||||||
SStateKey* key = k;
|
SStateKey* key = k;
|
||||||
int n = 0;
|
int n = 0;
|
||||||
n += sprintf(buf + n, "[groupId:%" PRId64 ",", key->key.groupId);
|
n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
|
||||||
n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
|
n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts);
|
||||||
n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
|
n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum);
|
||||||
return n;
|
return n;
|
||||||
|
@ -1630,6 +1630,12 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
|
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
|
||||||
|
|
||||||
|
{
|
||||||
|
char tbuf[256] = {0};
|
||||||
|
stateKeyToString((void*)&maxStateKey, tbuf);
|
||||||
|
qDebug("seek to last:%s", tbuf);
|
||||||
|
}
|
||||||
|
|
||||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||||
if (pCur == NULL) return NULL;
|
if (pCur == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -2408,14 +2414,18 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
char* ttlV = tmpBuf;
|
char* ttlV = tmpBuf;
|
||||||
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
|
||||||
|
|
||||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||||
|
|
||||||
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx];
|
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx];
|
||||||
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
|
||||||
|
|
||||||
if (tmpBuf == NULL) {
|
if (tmpBuf == NULL) {
|
||||||
taosMemoryFree(ttlV);
|
taosMemoryFree(ttlV);
|
||||||
}
|
}
|
||||||
|
{
|
||||||
|
char tbuf[256] = {0};
|
||||||
|
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
||||||
|
qDebug("stream state: %s succ to state", tbuf);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
|
||||||
|
|
Loading…
Reference in New Issue