diff --git a/contrib/test/rocksdb/main.c b/contrib/test/rocksdb/main.c index 8b8317f635..33652386d7 100644 --- a/contrib/test/rocksdb/main.c +++ b/contrib/test/rocksdb/main.c @@ -159,30 +159,63 @@ int main(int argc, char const *argv[]) { rocksdb_write(db, wOpt, wBatch, &err); } { - char buf[128] = {0}; - KV kv = {.k1 = 0, .k2 = 0}; - kvSerial(&kv, buf); - char *v = rocksdb_get_cf(db, rOpt, cfHandle[1], buf, sizeof(kv), &vlen, &err); - printf("Get value %s, and len = %d, xxxx\n", v, (int)vlen); - rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]); - rocksdb_iter_seek_to_first(iter); - int i = 0; - while (rocksdb_iter_valid(iter)) { - size_t klen, vlen; - const char *key = rocksdb_iter_key(iter, &klen); - const char *value = rocksdb_iter_value(iter, &vlen); - KV kv; - kvDeserial(&kv, (char *)key); - printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value); - i++; - rocksdb_iter_next(iter); + { + char buf[128] = {0}; + KV kv = {.k1 = 0, .k2 = 0}; + kvSerial(&kv, buf); + char *v = rocksdb_get_cf(db, rOpt, cfHandle[1], buf, sizeof(kv), &vlen, &err); + printf("Get value %s, and len = %d, xxxx\n", v, (int)vlen); + rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]); + rocksdb_iter_seek_to_first(iter); + int i = 0; + while (rocksdb_iter_valid(iter)) { + size_t klen, vlen; + const char *key = rocksdb_iter_key(iter, &klen); + const char *value = rocksdb_iter_value(iter, &vlen); + KV kv; + kvDeserial(&kv, (char *)key); + printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value); + i++; + rocksdb_iter_next(iter); + } + rocksdb_iter_destroy(iter); + } + { + char buf[128] = {0}; + KV kv = {.k1 = 0, .k2 = 0}; + int len = kvSerial(&kv, buf); + rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]); + rocksdb_iter_seek(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + printf("invalid iter"); + } + { + char buf[128] = {0}; + KV kv = {.k1 = 100, .k2 = 0}; + int len = kvSerial(&kv, buf); + + rocksdb_iterator_t *iter = rocksdb_create_iterator_cf(db, rOpt, cfHandle[1]); + rocksdb_iter_seek(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + printf("invalid iter\n"); + rocksdb_iter_seek_for_prev(iter, buf, len); + if (!rocksdb_iter_valid(iter)) { + printf("stay invalid iter\n"); + } else { + size_t klen = 0, vlen = 0; + const char *key = rocksdb_iter_key(iter, &klen); + const char *value = rocksdb_iter_value(iter, &vlen); + KV kv; + kvDeserial(&kv, (char *)key); + printf("kv1: %d\t kv2: %d, len:%d, value = %s\n", (int)(kv.k1), (int)(kv.k2), (int)(klen), value); + } + } + } } - rocksdb_iter_destroy(iter); - printf("iterator count %d\n", i); } - char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); - printf("Get value %s, and len = %d\n", v, (int)vlen); + // char *v = rocksdb_get_cf(db, rOpt, cfHandle[0], "key", strlen("key"), &vlen, &err); + // printf("Get value %s, and len = %d\n", v, (int)vlen); rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_column_family_handle_destroy(cfHandle[1]); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 5776c3aa57..2c715b4adb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -398,7 +398,7 @@ int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void* int32_t streamStateReleaseBuf(SStreamState* pState, const SWinKey* key, void* pVal) { // todo refactor - qWarn("streamStateReleaseBuf"); + qDebug("streamStateReleaseBuf"); if (!pVal) { return 0; } @@ -667,6 +667,7 @@ void streamStateFreeCur(SStreamStateCur* pCur) { if (!pCur) { return; } + qDebug("streamStateFreeCur"); rocksdb_iter_destroy(pCur->iter); tdbTbcClose(pCur->pCur); taosMemoryFree(pCur); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 87cb3676be..2a26a9d648 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -375,7 +375,7 @@ int streamGetInit(const char* funcName) { qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - qWarn("streamState str:%s succ to write to %s", toString, funcname); \ + qDebug("streamState str:%s succ to write to %s", toString, funcname); \ } \ } while (0); @@ -409,7 +409,7 @@ int streamGetInit(const char* funcName) { qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - if (code == 0) qWarn("streamState str: %s succ to read from %s", toString, funcname); \ + if (code == 0) qDebug("streamState str: %s succ to read from %s", toString, funcname); \ } \ } while (0); @@ -435,7 +435,7 @@ int streamGetInit(const char* funcName) { taosMemoryFree(err); \ code = -1; \ } else { \ - qWarn("streamState str: %s succ to del from %s", toString, funcname); \ + qDebug("streamState str: %s succ to del from %s", toString, funcname); \ } \ } while (0); @@ -498,7 +498,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { // todo refactor int32_t streamStateClear_rocksdb(SStreamState* pState) { - qWarn("streamStateClear_rocksdb"); + qDebug("streamStateClear_rocksdb"); SWinKey key = {.ts = 0, .groupId = 0}; // batch clear later streamStatePut_rocksdb(pState, &key, NULL, 0); @@ -525,7 +525,7 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k return code; } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { - qWarn("streamStateSessionSeekKeyCurrentPrev_rocksdb"); + qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -534,12 +534,23 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); + + char buf[128] = {0}; + SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + int len = stateSessionKeyEncode(&sKey, buf); + char toString[128] = {0}; + stateSessionKeyToString(&sKey, toString); + // qWarn("streamState seek key %s", toString); + + rocksdb_iter_seek(pCur->iter, buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } } - SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); @@ -548,10 +559,15 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) >= 0) return pCur; rocksdb_iter_prev(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + qWarn("streamState failed to seek key prev %s", toString); + streamStateFreeCur(pCur); + return NULL; + } return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { - qWarn("streamStateSessionSeekKeyCurrentNext_rocksdb"); + qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -563,12 +579,15 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta char buf[128] = {0}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - stateSessionKeyEncode(&sKey, buf); + int len = stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); + rocksdb_iter_seek(pCur->iter, (const char*)buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); @@ -577,11 +596,15 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) <= 0) return pCur; rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } return pCur; } SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { - qWarn("streamStateSessionSeekKeyNext_rocksdb"); + qDebug("streamStateSessionSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -590,12 +613,16 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[2]); pCur->number = pState->number; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; - char buf[128] = {0}; - stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, (const char*)buf, sizeof(sKey)); + + char buf[128] = {0}; + int len = stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, (const char*)buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } } size_t klen; const char* iKey = rocksdb_iter_key(pCur->iter, &klen); @@ -604,11 +631,15 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con if (stateSessionKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) < 0) return pCur; rocksdb_iter_next(pCur->iter); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } return pCur; } int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { - qWarn("streamStateAddIfNotExist_rocksdb"); + qDebug("streamStateAddIfNotExist_rocksdb"); int32_t size = *pVLen; if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { return 0; @@ -618,43 +649,39 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke return 0; } SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qWarn("streamStateGetCur_rocksdb"); + qDebug("streamStateGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; - qWarn("streamStateGetCur_rocksdb-->1"); pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[0]); SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; - stateKeyEncode((void*)&sKey, buf); - char sKeyStr[128] = {0}; + int len = stateKeyEncode((void*)&sKey, buf); + char sKeyStr[128] = {0}; stateKeyToString(&sKey, sKeyStr); - rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + rocksdb_iter_seek(pCur->iter, buf, len); if (rocksdb_iter_valid(pCur->iter)) { - qWarn("streamStateGetCur_rocksdb-->2"); SStateKey curKey; size_t kLen = 0; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen); stateKeyDecode((void*)&curKey, keyStr); - char tKeyStr[128] = {0}; - stateKeyToString(&curKey, tKeyStr); - qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr); + // char tKeyStr[128] = {0}; + // stateKeyToString(&curKey, tKeyStr); + // qWarn("streamStateGetCur_rocksdb-->src:%s, dst:%s", sKeyStr, tKeyStr); if (stateKeyCmpr(&sKey, sizeof(sKey), &curKey, sizeof(curKey)) == 0) { pCur->number = pState->number; return pCur; } - qWarn("streamStateGetCur_rocksdb-->3"); } - qWarn("streamStateGetCur_rocksdb-->4"); streamStateFreeCur(pCur); return NULL; } SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qWarn("streamStateFillGetCur_rocksdb"); + qDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; @@ -662,8 +689,15 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK pCur->iter = rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); char buf[128] = {0}; - winKeyDecode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + int len = winKeyDecode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + } if (rocksdb_iter_valid(pCur->iter)) { size_t kLen; SWinKey curKey; @@ -678,7 +712,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK return NULL; } SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { - qWarn("streamStateGetAndCheckCur_rocksdb"); + qDebug("streamStateGetAndCheckCur_rocksdb"); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); if (pCur) { int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); @@ -688,29 +722,25 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey return NULL; } int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qWarn("streamStateGetKVByCur_rocksdb"); + qDebug("streamStateGetKVByCur_rocksdb"); if (!pCur) return -1; SStateKey tkey; SStateKey* pKtmp = &tkey; if (rocksdb_iter_valid(pCur->iter)) { - qWarn("streamStateGetKVByCur_rocksdb-2"); size_t tlen; char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &tlen); stateKeyDecode((void*)pKtmp, keyStr); if (pKtmp->opNum != pCur->number) { - qWarn("streamStateGetKVByCur_rocksdb-3"); return -1; } - qWarn("streamStateGetKVByCur_rocksdb-4"); *pKey = pKtmp->key; return 0; } - qWarn("streamStateGetKVByCur_rocksdb-5"); return -1; } int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qWarn("streamStateFillGetKVByCur_rocksdb"); + qDebug("streamStateFillGetKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -727,7 +757,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, return 0; } int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - qWarn("streamStateGetGroupKVByCur_rocksdb"); + qDebug("streamStateGetGroupKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -742,7 +772,7 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, return -1; } int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { - qWarn("streamStateGetFirst_rocksdb"); + qDebug("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); @@ -752,7 +782,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { return code; } int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { - qWarn("streamStateSessionGetKVByCur_rocksdb"); + qDebug("streamStateSessionGetKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -767,7 +797,9 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* SStateSessionKey* pKTmp = &ktmp; const char* val = rocksdb_iter_value(pCur->iter, (size_t*)&vLen); - if (pVal != NULL) *pVal = (char*)val; + if (pVal != NULL) { + *pVal = (char*)val; + } if (pVLen != NULL) *pVLen = vLen; if (pKTmp->opNum != pCur->number) { @@ -781,7 +813,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* } SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qWarn("streamStateSeekKeyNext_rocksdb"); + qDebug("streamStateSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -792,8 +824,16 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin SStateKey sKey = {.key = *key, .opNum = pState->number}; char buf[128] = {0}; - stateKeyEncode((void*)&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + int len = stateKeyEncode((void*)&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } + } + if (rocksdb_iter_valid(pCur->iter)) { SStateKey curKey; size_t kLen; @@ -809,7 +849,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qWarn("streamStateFillSeekKeyNext_rocksdb"); + qDebug("streamStateFillSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; @@ -818,11 +858,14 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); char buf[128] = {0}; - winKeyEncode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + int len = winKeyEncode((void*)key, buf); + rocksdb_iter_seek(pCur->iter, buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } } { SWinKey curKey; @@ -839,7 +882,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return NULL; } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { - qWarn("streamStateFillSeekKeyPrev_rocksdb"); + qDebug("streamStateFillSeekKeyPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -848,11 +891,15 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, pState->pTdbState->ropts, pState->pTdbState->pHandle[1]); char buf[128] = {0}; - winKeyEncode((void*)key, buf); - rocksdb_iter_seek(pCur->iter, buf, sizeof(*key)); + int len = winKeyEncode((void*)key, buf); + + rocksdb_iter_seek(pCur->iter, buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return NULL; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return NULL; + } } { @@ -870,7 +917,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const return NULL; } int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { - qWarn("streamStateCurPrev_rocksdb"); + qDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; rocksdb_iter_prev(pCur->iter); @@ -884,7 +931,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) return 0; } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { - qWarn("streamStateSessionGetKeyByRange_rocksdb"); + qDebug("streamStateSessionGetKeyByRange_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; @@ -896,11 +943,14 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; int32_t c = 0; char buf[128] = {0}; - stateSessionKeyEncode(&sKey, buf); - rocksdb_iter_seek(pCur->iter, buf, sizeof(sKey)); + int len = stateSessionKeyEncode(&sKey, buf); + rocksdb_iter_seek(pCur->iter, buf, len); if (!rocksdb_iter_valid(pCur->iter)) { - streamStateFreeCur(pCur); - return -1; + rocksdb_iter_seek_for_prev(pCur->iter, buf, len); + if (!rocksdb_iter_valid(pCur->iter)) { + streamStateFreeCur(pCur); + return -1; + } } int32_t kLen; @@ -941,18 +991,21 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { - qWarn("streamStateSessionGet_rocksdb"); + qDebug("streamStateSessionGet_rocksdb"); int code = 0; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); SSessionKey resKey = *key; void* tmp = NULL; - code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, pVLen); + int32_t vLen = 0; + code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); if (code == 0) { + if (pVLen != NULL) *pVLen = vLen; + if (key->win.skey != resKey.win.skey) { code = -1; } else { *key = resKey; - *pVal = taosMemoryMalloc(*pVLen); + *pVal = taosMemoryCalloc(1, *pVLen); memcpy(*pVal, tmp, *pVLen); } } @@ -969,7 +1022,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k } int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { - qWarn("streamStateSessionAddIfNotExist_rocksdb"); + qDebug("streamStateSessionAddIfNotExist_rocksdb"); // todo refactor int32_t res = 0; SSessionKey originKey = *key; @@ -981,7 +1034,10 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe void* tmp = taosMemoryMalloc(valSize); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); - int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (pCur == NULL) { + } + int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, key, pVal, pVLen); + if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); @@ -1016,7 +1072,7 @@ _end: } int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { - qWarn("streamStateStateAddIfNotExist_rocksdb"); + qDebug("streamStateStateAddIfNotExist_rocksdb"); // todo refactor int32_t res = 0; SSessionKey tmpKey = *key; @@ -1072,7 +1128,7 @@ _end: } int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { - qWarn("streamStateSessionClear_rocksdb"); + qDebug("streamStateSessionClear_rocksdb"); SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);