From 981466a0c7a98f640a406a69467eb4a5eb78c068 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 22 Mar 2023 14:35:24 +0000 Subject: [PATCH] failed to read from --- source/libs/stream/src/streamState.c | 1 + source/libs/stream/src/streamStateRocksdb.c | 68 ++++++++++++++------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 721d72f7c4..a7862fd0ce 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1038,6 +1038,7 @@ int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVa } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { + qWarn("try to write to cf parname"); #ifdef USE_ROCKSDB return streamStatePutParName_rocksdb(pState, groupId, tbname); #else diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index ab0f5cd6a3..de330e4f59 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -65,9 +65,9 @@ int stateKeyDecode(void* k, char* buf) { int stateKeyToString(void* k, char* buf) { SStateKey* key = k; int n = 0; - n += sprintf(buf + n, "groupId:%" PRId64 " ", key->key.groupId); - n += sprintf(buf + n, "ts:%" PRIi64 " ", key->key.ts); - n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum); + n += sprintf(buf + n, "[groupId:%" PRId64 ",", key->key.groupId); + n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts); + n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum); return n; } @@ -124,10 +124,10 @@ int stateSessionKeyDecode(void* ses, char* buf) { int stateSessionKeyToString(void* k, char* buf) { SStateSessionKey* key = k; int n = 0; - n += sprintf(buf + n, "skey:%" PRIi64 " ", key->key.win.skey); - n += sprintf(buf + n, "ekey:%" PRIi64 " ", key->key.win.ekey); - n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->key.groupId); - n += sprintf(buf + n, "opNum:%" PRIi64 " ", key->opNum); + n += sprintf(buf + n, "[skey:%" PRIi64 ",", key->key.win.skey); + n += sprintf(buf + n, "ekey:%" PRIi64 ",", key->key.win.ekey); + n += sprintf(buf + n, "groupId:%" PRIu64 ",", key->key.groupId); + n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum); return n; } @@ -173,8 +173,8 @@ int winKeyDecode(void* k, char* buf) { int winKeyToString(void* k, char* buf) { SWinKey* key = k; int n = 0; - n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId); - n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts); + n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId); + n += sprintf(buf + n, "ts:%" PRIi64 "]", key->ts); return n; } /* @@ -222,9 +222,9 @@ int tupleKeyDecode(void* k, char* buf) { int tupleKeyToString(void* k, char* buf) { int n = 0; STupleKey* key = k; - n += sprintf(buf + n, "groupId:%" PRIu64 " ", key->groupId); - n += sprintf(buf + n, "ts:%" PRIi64 " ", key->ts); - n += sprintf(buf + n, "exprIdx:%d ", key->exprIdx); + n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->groupId); + n += sprintf(buf + n, "ts:%" PRIi64 ",", key->ts); + n += sprintf(buf + n, "exprIdx:%d]", key->exprIdx); return n; } @@ -258,7 +258,7 @@ int parKeyDecode(void* k, char* buf) { int parKeyToString(void* k, char* buf) { int64_t* key = k; int n = 0; - n = sprintf(buf + n, "groupId:%" PRIi64 " ", *key); + n = sprintf(buf + n, "[groupId:%" PRIi64 "]", *key); return n; } @@ -360,7 +360,7 @@ int streamGetInit(const char* funcName) { char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ - qWarn("failed to get cf name: %s", funcname); \ + qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ char toString[128] = {0}; \ @@ -372,10 +372,10 @@ int streamGetInit(const char* funcName) { rocksdb_put_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (const char*)value, (size_t)vLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ - qWarn("str: %s failed to write to %s, err: %s", toString, funcname, err); \ + qWarn("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - qWarn("str:%s succ to write to %s", toString, funcname); \ + qWarn("streamState str:%s succ to write to %s", toString, funcname); \ } \ } while (0); @@ -386,7 +386,7 @@ int streamGetInit(const char* funcName) { char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ - qWarn("failed to get cf name: %s", funcname); \ + qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ char toString[128] = {0}; \ @@ -397,17 +397,17 @@ int streamGetInit(const char* funcName) { rocksdb_readoptions_t* opts = pState->pTdbState->ropts; \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), (size_t*)vLen, &err); \ if (val == NULL) { \ - qWarn("str: %s failed to read from %s, err: not exist", toString, funcname); \ + qWarn("streamState str: %s failed to read from %s, err: not exist", toString, funcname); \ code = -1; \ } else { \ *pVal = val; \ } \ if (err != NULL) { \ taosMemoryFree(err); \ - qWarn("str: %s failed to read from %s, err: %s", toString, funcname, err); \ + qWarn("streamState str: %s failed to read from %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - if (code == 0) qWarn("str: %s succ to read from %s", toString, funcname); \ + if (code == 0) qWarn("streamState str: %s succ to read from %s", toString, funcname); \ } \ } while (0); @@ -418,7 +418,7 @@ int streamGetInit(const char* funcName) { char* err = NULL; \ int i = streamGetInit(funcname); \ if (i < 0) { \ - qWarn("failed to get cf name: %s", funcname); \ + qWarn("streamState failed to get cf name: %s", funcname); \ return -1; \ } \ char toString[128] = {0}; \ @@ -429,11 +429,11 @@ int streamGetInit(const char* funcName) { rocksdb_writeoptions_t* opts = pState->pTdbState->wopts; \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, sizeof(*key), &err); \ if (err != NULL) { \ - qWarn("str: %s failed to del from %s, err: %s", toString, funcname, err); \ + qWarn("streamState str: %s failed to del from %s, err: %s", toString, funcname, err); \ taosMemoryFree(err); \ code = -1; \ } else { \ - qWarn("str: %s succ to del from %s", toString, funcname); \ + qWarn("streamState str: %s succ to del from %s", toString, funcname); \ } \ } while (0); @@ -485,6 +485,7 @@ int32_t streamStateFillPut_rocksdb(SStreamState* pState, const SWinKey* key, con // todo refactor int32_t streamStateClear_rocksdb(SStreamState* pState) { + qWarn("streamStateClear_rocksdb"); SWinKey key = {.ts = 0, .groupId = 0}; // batch clear later streamStatePut_rocksdb(pState, &key, NULL, 0); @@ -521,6 +522,7 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k return code; } SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { + qWarn("streamStateSessionSeekKeyCurrentPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -550,6 +552,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { + qWarn("streamStateSessionSeekKeyCurrentNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -580,6 +583,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { + qWarn("streamStateSessionSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -610,6 +614,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con } int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + qWarn("streamStateAddIfNotExist_rocksdb"); int32_t size = *pVLen; if (streamStateGet_rocksdb(pState, key, pVal, pVLen) == 0) { return 0; @@ -619,6 +624,7 @@ int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* ke return 0; } SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qWarn("streamStateGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; pCur->iter = @@ -643,6 +649,7 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* return NULL; } SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { + qWarn("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; @@ -665,6 +672,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK return NULL; } SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) { + qWarn("streamStateGetAndCheckCur_rocksdb"); SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key); if (pCur) { int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0); @@ -674,6 +682,7 @@ SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey return NULL; } int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qWarn("streamStateGetKVByCur_rocksdb"); if (!pCur) return -1; SStateKey tkey; SStateKey* pKtmp = &tkey; @@ -691,6 +700,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons return -1; } int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qWarn("streamStateFillGetKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -707,6 +717,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, return 0; } int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + qWarn("streamStateGetGroupKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -721,6 +732,7 @@ int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, return -1; } int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { + qWarn("streamStateGetFirst_rocksdb"); SWinKey tmp = {.ts = 0, .groupId = 0}; streamStatePut_rocksdb(pState, &tmp, NULL, 0); SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); @@ -730,6 +742,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { return code; } int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + qWarn("streamStateSessionGetKVByCur_rocksdb"); if (!pCur) { return -1; } @@ -758,6 +771,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* } SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + qWarn("streamStateSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -790,6 +804,7 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin return NULL; } SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { + qWarn("streamStateFillSeekKeyNext_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (!pCur) { return NULL; @@ -820,6 +835,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return NULL; } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { + qWarn("streamStateFillSeekKeyPrev_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; @@ -851,6 +867,7 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const return NULL; } int32_t streamStateCurPrev_rocksdb(SStreamState* pState, SStreamStateCur* pCur) { + qWarn("streamStateCurPrev_rocksdb"); if (!pCur) return -1; rocksdb_iter_prev(pCur->iter); if (!rocksdb_iter_valid(pCur->iter)) { @@ -869,6 +886,7 @@ int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) return 0; } int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { + qWarn("streamStateSessionGetKeyByRange_rocksdb"); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return -1; @@ -925,6 +943,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes } int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { + qWarn("streamStateSessionGet_rocksdb"); int code = 0; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, key); SSessionKey resKey = *key; @@ -953,6 +972,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k } int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen) { + qWarn("streamStateSessionAddIfNotExist_rocksdb"); // todo refactor int32_t res = 0; SSessionKey originKey = *key; @@ -999,6 +1019,7 @@ _end: } int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { + qWarn("streamStateStateAddIfNotExist_rocksdb"); // todo refactor int32_t res = 0; SSessionKey tmpKey = *key; @@ -1054,6 +1075,7 @@ _end: } int32_t streamStateSessionClear_rocksdb(SStreamState* pState) { + qWarn("streamStateSessionClear_rocksdb"); SSessionKey key = {.win.skey = 0, .win.ekey = 0, .groupId = 0}; SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, &key);