add backend

This commit is contained in:
yihaoDeng 2023-03-26 05:46:48 +00:00
parent b9531fc9a7
commit c3d7f888e9
3 changed files with 183 additions and 93 deletions

View File

@ -158,6 +158,7 @@ int main(int argc, char const *argv[]) {
} }
rocksdb_write(db, wOpt, wBatch, &err); rocksdb_write(db, wOpt, wBatch, &err);
} }
{
{ {
char buf[128] = {0}; char buf[128] = {0};
KV kv = {.k1 = 0, .k2 = 0}; KV kv = {.k1 = 0, .k2 = 0};
@ -178,11 +179,43 @@ int main(int argc, char const *argv[]) {
rocksdb_iter_next(iter); rocksdb_iter_next(iter);
} }
rocksdb_iter_destroy(iter); rocksdb_iter_destroy(iter);
printf("iterator count %d\n", i); }
{
char buf[128] = {0};
KV kv = {.k1 = 0, .k2 = 0};
int len = kvSerial(&kv, buf);
rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]);
rocksdb_iter_seek(iter, buf, len);
if (!rocksdb_iter_valid(iter)) {
printf("invalid iter");
}
{
char buf[128] = {0};
KV kv = {.k1 = 100, .k2 = 0};
int len = kvSerial(&kv, buf);
rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]);
rocksdb_iter_seek(iter, buf, len);
if (!rocksdb_iter_valid(iter)) {
printf("invalid iter\n");
rocksdb_iter_seek_for_prev(iter, buf, len);
if (!rocksdb_iter_valid(iter)) {
printf("stay invalid iter\n");
} else {
size_t klen = 0, vlen = 0;
const char *key = rocksdb_iter_key(iter, &klen);
const char *value = rocksdb_iter_value(iter, &vlen);
KV kv;
kvDeserial(&kv, (char *)key);
printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value);
}
}
}
}
} }
char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); // char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err);
printf("Get value %s, and len = %d\n", v, (int)vlen); // printf("Get value %s, and len = %d\n", v, (int)vlen);
rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_column_family_handle_destroy(cfHandle[0]);
rocksdb_column_family_handle_destroy(cfHandle[1]); rocksdb_column_family_handle_destroy(cfHandle[1]);

View File

@ -398,7 +398,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void*
int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) {
// todo refactor // todo refactor
qWarn("streamStateReleaseBuf"); qDebug("streamStateReleaseBuf");
if (!pVal) { if (!pVal) {
return 0; return 0;
} }
@ -667,6 +667,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) {
if (!pCur) { if (!pCur) {
return; return;
} }
qDebug("streamStateFreeCur");
rocksdb_iter_destroy(pCur->iter); rocksdb_iter_destroy(pCur->iter);
tdbTbcClose(pCur->pCur); tdbTbcClose(pCur->pCur);
taosMemoryFree(pCur); taosMemoryFree(pCur);

View File

@ -375,7 +375,7 @@ int streamGetInit(const char* funcName) {
qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
code = -1; \ code = -1; \
} else { \ } else { \
qWarn("streamState str:%s succ to write to %s", toString, funcname); \ qDebug("streamState str:%s succ to write to %s", toString, funcname); \
} \ } \
} while (0); } while (0);
@ -409,7 +409,7 @@ int streamGetInit(const char* funcName) {
qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \
code = -1; \ code = -1; \
} else { \ } else { \
if (code == 0) qWarn("streamState str: %s succ to read from %s", toString, funcname); \ if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \
} \ } \
} while (0); } while (0);
@ -435,7 +435,7 @@ int streamGetInit(const char* funcName) {
taosMemoryFree(err); \ taosMemoryFree(err); \
code = -1; \ code = -1; \
} else { \ } else { \
qWarn("streamState str: %s succ to del from %s", toString, funcname); \ qDebug("streamState str: %s succ to del from %s", toString, funcname); \
} \ } \
} while (0); } while (0);
@ -498,7 +498,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
// todo refactor // todo refactor
int32_t streamStateClear_rocksdb(SStreamState* pState) { int32_t streamStateClear_rocksdb(SStreamState* pState) {
qWarn("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SWinKey key = {.ts = 0, .groupId = 0}; SWinKey key = {.ts = 0, .groupId = 0};
// batch clear later // batch clear later
streamStatePut_rocksdb(pState, &key, NULL, 0); streamStatePut_rocksdb(pState, &key, NULL, 0);
@ -525,7 +525,7 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
return code; return code;
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qWarn("streamStateSessionSeekKeyCurrentPrev_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -534,12 +534,23 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
pCur->iter = pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int len = stateSessionKeyEncode(&sKey, buf);
char toString[128] = {0};
stateSessionKeyToString(&sKey, toString);
// qWarn("streamState seek key %s", toString);
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
}
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
@ -548,10 +559,15 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur;
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
qWarn("streamState failed to seek key prev %s", toString);
streamStateFreeCur(pCur);
return NULL;
}
return pCur; return pCur;
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qWarn("streamStateSessionSeekKeyCurrentNext_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -563,13 +579,16 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
char buf[128] = {0}; char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
}
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
@ -577,11 +596,15 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur;
rocksdb_iter_next(pCur->iter); rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur; return pCur;
} }
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qWarn("streamStateSessionSeekKeyNext_rocksdb"); qDebug("streamStateSessionSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -590,13 +613,17 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]);
pCur->number = pState->number; pCur->number = pState->number;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0}; char buf[128] = {0};
stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); rocksdb_iter_seek(pCur->iter, (const char*)buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
}
size_t klen; size_t klen;
const char* iKey = rocksdb_iter_key(pCur->iter, &klen); const char* iKey = rocksdb_iter_key(pCur->iter, &klen);
SStateSessionKey curKey = {0}; SStateSessionKey curKey = {0};
@ -604,11 +631,15 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur;
rocksdb_iter_next(pCur->iter); rocksdb_iter_next(pCur->iter);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
return pCur; return pCur;
} }
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
qWarn("streamStateAddIfNotExist_rocksdb"); qDebug("streamStateAddIfNotExist_rocksdb");
int32_t size = *pVLen; int32_t size = *pVLen;
if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) {
return 0; return 0;
@ -618,43 +649,39 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke
return 0; return 0;
} }
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
qWarn("streamStateGetCur_rocksdb-->1");
pCur->iter = pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]);
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0}; char buf[128] = {0};
stateKeyEncode((void*)&sKey, buf); int len = stateKeyEncode((void*)&sKey, buf);
char sKeyStr[128] = {0}; char sKeyStr[128] = {0};
stateKeyToString(&sKey, sKeyStr); stateKeyToString(&sKey, sKeyStr);
rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); rocksdb_iter_seek(pCur->iter, buf, len);
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
qWarn("streamStateGetCur_rocksdb-->2");
SStateKey curKey; SStateKey curKey;
size_t kLen = 0; size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
stateKeyDecode((void*)&curKey, keyStr); stateKeyDecode((void*)&curKey, keyStr);
char tKeyStr[128] = {0};
stateKeyToString(&curKey, tKeyStr);
qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr); // char tKeyStr[128] = {0};
// stateKeyToString(&curKey, tKeyStr);
// qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr);
if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) {
pCur->number = pState->number; pCur->number = pState->number;
return pCur; return pCur;
} }
qWarn("streamStateGetCur_rocksdb-->3");
} }
qWarn("streamStateGetCur_rocksdb-->4");
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillGetCur_rocksdb"); qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
@ -662,8 +689,15 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
pCur->iter = pCur->iter =
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0}; char buf[128] = {0};
winKeyDecode((void*)key, buf); int len = winKeyDecode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
}
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
size_t kLen; size_t kLen;
SWinKey curKey; SWinKey curKey;
@ -678,7 +712,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
return NULL; return NULL;
} }
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
qWarn("streamStateGetAndCheckCur_rocksdb"); qDebug("streamStateGetAndCheckCur_rocksdb");
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) { if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
@ -688,29 +722,25 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey
return NULL; return NULL;
} }
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateGetKVByCur_rocksdb"); qDebug("streamStateGetKVByCur_rocksdb");
if (!pCur) return -1; if (!pCur) return -1;
SStateKey tkey; SStateKey tkey;
SStateKey* pKtmp = &tkey; SStateKey* pKtmp = &tkey;
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
qWarn("streamStateGetKVByCur_rocksdb-2");
size_t tlen; size_t tlen;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen);
stateKeyDecode((void*)pKtmp, keyStr); stateKeyDecode((void*)pKtmp, keyStr);
if (pKtmp->opNum != pCur->number) { if (pKtmp->opNum != pCur->number) {
qWarn("streamStateGetKVByCur_rocksdb-3");
return -1; return -1;
} }
qWarn("streamStateGetKVByCur_rocksdb-4");
*pKey = pKtmp->key; *pKey = pKtmp->key;
return 0; return 0;
} }
qWarn("streamStateGetKVByCur_rocksdb-5");
return -1; return -1;
} }
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateFillGetKVByCur_rocksdb"); qDebug("streamStateFillGetKVByCur_rocksdb");
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
@ -727,7 +757,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
return 0; return 0;
} }
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
qWarn("streamStateGetGroupKVByCur_rocksdb"); qDebug("streamStateGetGroupKVByCur_rocksdb");
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
@ -742,7 +772,7 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
return -1; return -1;
} }
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
qWarn("streamStateGetFirst_rocksdb"); qDebug("streamStateGetFirst_rocksdb");
SWinKey tmp = {.ts = 0, .groupId = 0}; SWinKey tmp = {.ts = 0, .groupId = 0};
streamStatePut_rocksdb(pState, &tmp, NULL, 0); streamStatePut_rocksdb(pState, &tmp, NULL, 0);
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
@ -752,7 +782,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
return code; return code;
} }
int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
qWarn("streamStateSessionGetKVByCur_rocksdb"); qDebug("streamStateSessionGetKVByCur_rocksdb");
if (!pCur) { if (!pCur) {
return -1; return -1;
} }
@ -767,7 +797,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
SStateSessionKey* pKTmp = &ktmp; SStateSessionKey* pKTmp = &ktmp;
const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen);
if (pVal != NULL) *pVal = (char*)val; if (pVal != NULL) {
*pVal = (char*)val;
}
if (pVLen != NULL) *pVLen = vLen; if (pVLen != NULL) *pVLen = vLen;
if (pKTmp->opNum != pCur->number) { if (pKTmp->opNum != pCur->number) {
@ -781,7 +813,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
} }
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateSeekKeyNext_rocksdb"); qDebug("streamStateSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -792,8 +824,16 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStateKey sKey = {.key = *key, .opNum = pState->number}; SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0}; char buf[128] = {0};
stateKeyEncode((void*)&sKey, buf); int len = stateKeyEncode((void*)&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur);
return NULL;
}
}
if (rocksdb_iter_valid(pCur->iter)) { if (rocksdb_iter_valid(pCur->iter)) {
SStateKey curKey; SStateKey curKey;
size_t kLen; size_t kLen;
@ -809,7 +849,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
return NULL; return NULL;
} }
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillSeekKeyNext_rocksdb"); qDebug("streamStateFillSeekKeyNext_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) { if (!pCur) {
return NULL; return NULL;
@ -818,12 +858,15 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0}; char buf[128] = {0};
winKeyEncode((void*)key, buf); int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
}
{ {
SWinKey curKey; SWinKey curKey;
size_t kLen = 0; size_t kLen = 0;
@ -839,7 +882,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
return NULL; return NULL;
} }
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qWarn("streamStateFillSeekKeyPrev_rocksdb"); qDebug("streamStateFillSeekKeyPrev_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
@ -848,12 +891,16 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]);
char buf[128] = {0}; char buf[128] = {0};
winKeyEncode((void*)key, buf); int len = winKeyEncode((void*)key, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(*key));
rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
}
{ {
SWinKey curKey; SWinKey curKey;
@ -870,7 +917,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
return NULL; return NULL;
} }
int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
qWarn("streamStateCurPrev_rocksdb"); qDebug("streamStateCurPrev_rocksdb");
if (!pCur) return -1; if (!pCur) return -1;
rocksdb_iter_prev(pCur->iter); rocksdb_iter_prev(pCur->iter);
@ -884,7 +931,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur)
return 0; return 0;
} }
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qWarn("streamStateSessionGetKeyByRange_rocksdb"); qDebug("streamStateSessionGetKeyByRange_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
@ -896,12 +943,15 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0; int32_t c = 0;
char buf[128] = {0}; char buf[128] = {0};
stateSessionKeyEncode(&sKey, buf); int len = stateSessionKeyEncode(&sKey, buf);
rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); rocksdb_iter_seek(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) {
rocksdb_iter_seek_for_prev(pCur->iter, buf, len);
if (!rocksdb_iter_valid(pCur->iter)) { if (!rocksdb_iter_valid(pCur->iter)) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return -1; return -1;
} }
}
int32_t kLen; int32_t kLen;
const char* iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen); const char* iKeyStr = rocksdb_iter_key(pCur->iter, (size_t*)&kLen);
@ -941,18 +991,21 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
} }
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
qWarn("streamStateSessionGet_rocksdb"); qDebug("streamStateSessionGet_rocksdb");
int code = 0; int code = 0;
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key);
SSessionKey resKey = *key; SSessionKey resKey = *key;
void* tmp = NULL; void* tmp = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, pVLen); int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0) { if (code == 0) {
if (pVLen != NULL) *pVLen = vLen;
if (key->win.skey != resKey.win.skey) { if (key->win.skey != resKey.win.skey) {
code = -1; code = -1;
} else { } else {
*key = resKey; *key = resKey;
*pVal = taosMemoryMalloc(*pVLen); *pVal = taosMemoryCalloc(1, *pVLen);
memcpy(*pVal, tmp, *pVLen); memcpy(*pVal, tmp, *pVLen);
} }
} }
@ -969,7 +1022,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
} }
int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen) { int32_t* pVLen) {
qWarn("streamStateSessionAddIfNotExist_rocksdb"); qDebug("streamStateSessionAddIfNotExist_rocksdb");
// todo refactor // todo refactor
int32_t res = 0; int32_t res = 0;
SSessionKey originKey = *key; SSessionKey originKey = *key;
@ -981,7 +1034,10 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
void* tmp = taosMemoryMalloc(valSize); void* tmp = taosMemoryMalloc(valSize);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key);
if (pCur == NULL) {
}
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen);
if (code == 0) { if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
@ -1016,7 +1072,7 @@ _end:
} }
int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData,
int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
qWarn("streamStateStateAddIfNotExist_rocksdb"); qDebug("streamStateStateAddIfNotExist_rocksdb");
// todo refactor // todo refactor
int32_t res = 0; int32_t res = 0;
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
@ -1072,7 +1128,7 @@ _end:
} }
int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
qWarn("streamStateSessionClear_rocksdb"); qDebug("streamStateSessionClear_rocksdb");
SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0};
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);