add backend

This commit is contained in:
yihaoDeng 2023-03-26 12:06:06 +00:00
parent ffd272597c
commit 874b6916aa
1 changed files with 31 additions and 30 deletions

View File

@ -682,37 +682,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
streamStateFreeCur(pCur);
return NULL;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0};
int len = winKeyDecode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
}
if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur;
}
}
streamStateFreeCur(pCur);
return NULL;
}
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
@ -741,6 +711,37 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
}
return -1;
}
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
}
if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen;
SWinKey curKey;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
winKeyDecode((void*)&curKey, keyStr);
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) == 0) {
return pCur;
}
}
streamStateFreeCur(pCur);
return NULL;
}
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) {