diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9bfec5577c..b89664a6c1 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -19,10 +19,7 @@ #include "rocksdb/c.h" //#include "streamInt.h" #include "streamState.h" -#include "tcoding.h" #include "tcommon.h" -#include "tcompare.h" -#include "ttimer.h" typedef struct SCfComparator { rocksdb_comparator_t** comp; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a5a4bd4dc1..4b2a9e4a65 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -14,8 +14,6 @@ */ #include "streamBackendRocksdb.h" -#include "executor.h" -#include "query.h" #include "streamInt.h" #include "tcommon.h" #include "tref.h" @@ -1059,14 +1057,14 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { rocksdb_column_family_handle_t** ppCf = NULL; int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); + stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) { if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); } else { - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, - taosGetTimestampMs() - st); + stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, + taosGetTimestampMs() - st); } } else { stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir); @@ -1797,7 +1795,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { cfNames = NULL; } - qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); + stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); return pTaskDb; _EXIT: @@ -1826,7 +1824,7 @@ void taskDbDestroy(void* pDb, bool flush) { streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr); - qDebug("succ to destroy stream backend:%p", wrapper); + stDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -2323,7 +2321,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err); taosMemoryFree(err); } else { - qDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName); + stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName); wrapper->pCf[idx] = cf; } } @@ -2365,14 +2363,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ + stWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ wrapper->dataWritten += 1; \ char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ @@ -2385,54 +2383,54 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(err); \ code = -1; \ } else { \ - qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ - ttlVLen, wrapper); \ + stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ + ttlVLen, wrapper); \ } \ taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->db; \ - rocksdb_readoptions_t* opts = wrapper->readOpt; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL || len == 0) { \ - if (err == NULL) { \ - qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ - } else { \ - stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ - taosMemoryFreeClear(err); \ - } \ - code = -1; \ - } else { \ - char* p = NULL; \ - int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (tlen <= 0) { \ - stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \ - wrapper); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = tlen; \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + stWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->db; \ + rocksdb_readoptions_t* opts = wrapper->readOpt; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL || len == 0) { \ + if (err == NULL) { \ + stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + } else { \ + stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ + stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ + code = -1; \ + } else { \ + stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, \ + tlen, wrapper); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = tlen; \ + } \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -2442,14 +2440,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ wrapper->dataWritten += 1; \ char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->db; \ @@ -2460,7 +2458,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(err); \ code = -1; \ } else { \ - qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ } \ } while (0); @@ -2681,7 +2679,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { } #ifdef BUILD_NO_CALL SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateGetCur_rocksdb"); + stDebug("streamStateGetCur_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); @@ -2775,7 +2773,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k } SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { - qDebug("streamStateSessionSeekToLast_rocksdb"); + stDebug("streamStateSessionSeekToLast_rocksdb"); int32_t code = 0; @@ -2812,7 +2810,7 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { } int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) { - qDebug("streamStateCurPrev_rocksdb"); + stDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; rocksdb_iter_prev(pCur->iter); @@ -2862,7 +2860,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { - qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); + stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { @@ -2900,7 +2898,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { - qDebug("streamStateSessionSeekKeyNext_rocksdb"); + stDebug("streamStateSessionSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { @@ -3004,7 +3002,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { } SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillGetCur_rocksdb"); + stDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -3064,7 +3062,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, } SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillSeekKeyNext_rocksdb"); + stDebug("streamStateFillSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (!pCur) { @@ -3102,7 +3100,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return NULL; } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillSeekKeyPrev_rocksdb"); + stDebug("streamStateFillSeekKeyPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) {