fix compile error

This commit is contained in:
yihaoDeng 2023-07-13 10:38:21 +08:00
parent f3f3893872
commit 0e1a91d134
2 changed files with 8 additions and 10 deletions

View File

@ -1490,6 +1490,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
if (pKtmp->opNum != pCur->number) { if (pKtmp->opNum != pCur->number) {
return -1; return -1;
} }
size_t vlen = 0; size_t vlen = 0;
if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen); if (pVal != NULL) *pVal = (char*)rocksdb_iter_value(pCur->iter, &vlen);
if (pVLen != NULL) *pVLen = vlen; if (pVLen != NULL) *pVLen = vlen;
@ -1555,19 +1556,18 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; pCur->db = ((SBackendCfWrapper*)pState->pTdbState->pBackendCfWrapper)->rocksdb;
pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) { while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);

View File

@ -386,7 +386,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
0, buf); 0, buf);
// todo handle failure // todo handle failure
memset(buf, 0, len); memset(buf, 0, len);
// qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); // qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
taosMemoryFree(buf); taosMemoryFree(buf);
@ -397,8 +397,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateClearBatch(batch); streamStateClearBatch(batch);
int64_t elapsed = taosGetTimestampMs() - st; int64_t elapsed = taosGetTimestampMs() - st;
qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%"PRId64"ms", pFileState->id, numOfElems, qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
BATCH_LIMIT, elapsed); pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
const char* taskKey = "streamFileState"; const char* taskKey = "streamFileState";
@ -488,8 +488,6 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
: pFileState->maxTs - pFileState->deleteMark; : pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark); deleteExpiredCheckPoint(pFileState, mark);
} }
void* pStVal = NULL;
int32_t len = 0;
SWinKey key = {.groupId = 0, .ts = 0}; SWinKey key = {.groupId = 0, .ts = 0};
SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key); SStreamStateCur* pCur = streamStateSeekToLast_rocksdb(pFileState->pFileStore, &key);