diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9bfec5577c..b89664a6c1 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -19,10 +19,7 @@ #include "rocksdb/c.h" //#include "streamInt.h" #include "streamState.h" -#include "tcoding.h" #include "tcommon.h" -#include "tcompare.h" -#include "ttimer.h" typedef struct SCfComparator { rocksdb_comparator_t** comp; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index a5a4bd4dc1..4b2a9e4a65 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -14,8 +14,6 @@ */ #include "streamBackendRocksdb.h" -#include "executor.h" -#include "query.h" #include "streamInt.h" #include "tcommon.h" #include "tref.h" @@ -1059,14 +1057,14 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { rocksdb_column_family_handle_t** ppCf = NULL; int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); + stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf); if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) { if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) { stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir); } else { - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, - taosGetTimestampMs() - st); + stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir, + taosGetTimestampMs() - st); } } else { stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir); @@ -1797,7 +1795,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) { cfNames = NULL; } - qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); + stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb); return pTaskDb; _EXIT: @@ -1826,7 +1824,7 @@ void taskDbDestroy(void* pDb, bool flush) { streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr); - qDebug("succ to destroy stream backend:%p", wrapper); + stDebug("succ to destroy stream backend:%p", wrapper); int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]); @@ -2323,7 +2321,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) { stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err); taosMemoryFree(err); } else { - qDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName); + stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName); wrapper->pCf[idx] = cf; } } @@ -2365,14 +2363,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ + stWarn("streamState failed to get cf name: %s", funcname); \ code = -1; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ wrapper->dataWritten += 1; \ char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ @@ -2385,54 +2383,54 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(err); \ code = -1; \ } else { \ - qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ - ttlVLen, wrapper); \ + stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ + ttlVLen, wrapper); \ } \ taosMemoryFree(ttlV); \ } while (0); -#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - qWarn("streamState failed to get cf name: %s", funcname); \ - code = -1; \ - break; \ - } \ - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ - rocksdb_t* db = wrapper->db; \ - rocksdb_readoptions_t* opts = wrapper->readOpt; \ - size_t len = 0; \ - char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ - if (val == NULL || len == 0) { \ - if (err == NULL) { \ - qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ - } else { \ - stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ - taosMemoryFreeClear(err); \ - } \ - code = -1; \ - } else { \ - char* p = NULL; \ - int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ - if (tlen <= 0) { \ - stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ - funcname); \ - code = -1; \ - } else { \ - qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \ - wrapper); \ - } \ - taosMemoryFree(val); \ - if (vLen != NULL) *vLen = tlen; \ - } \ +#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + stWarn("streamState failed to get cf name: %s", funcname); \ + code = -1; \ + break; \ + } \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + char toString[128] = {0}; \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_t* db = wrapper->db; \ + rocksdb_readoptions_t* opts = wrapper->readOpt; \ + size_t len = 0; \ + char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ + if (val == NULL || len == 0) { \ + if (err == NULL) { \ + stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \ + } else { \ + stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \ + taosMemoryFreeClear(err); \ + } \ + code = -1; \ + } else { \ + char* p = NULL; \ + int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ + if (tlen <= 0) { \ + stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \ + funcname); \ + code = -1; \ + } else { \ + stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, \ + tlen, wrapper); \ + } \ + taosMemoryFree(val); \ + if (vLen != NULL) *vLen = tlen; \ + } \ } while (0); #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ @@ -2442,14 +2440,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe char* err = NULL; \ int i = streamStateGetCfIdx(pState, funcname); \ if (i < 0) { \ - qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ + stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \ code = -1; \ break; \ } \ STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ wrapper->dataWritten += 1; \ char toString[128] = {0}; \ - if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ + if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->db; \ @@ -2460,7 +2458,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe taosMemoryFree(err); \ code = -1; \ } else { \ - qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ + stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \ } \ } while (0); @@ -2681,7 +2679,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) { } #ifdef BUILD_NO_CALL SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateGetCur_rocksdb"); + stDebug("streamStateGetCur_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); @@ -2775,7 +2773,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k } SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { - qDebug("streamStateSessionSeekToLast_rocksdb"); + stDebug("streamStateSessionSeekToLast_rocksdb"); int32_t code = 0; @@ -2812,7 +2810,7 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { } int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) { - qDebug("streamStateCurPrev_rocksdb"); + stDebug("streamStateCurPrev_rocksdb"); if (!pCur) return -1; rocksdb_iter_prev(pCur->iter); @@ -2862,7 +2860,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta return pCur; } SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { - qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); + stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { @@ -2900,7 +2898,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta } SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { - qDebug("streamStateSessionSeekKeyNext_rocksdb"); + stDebug("streamStateSessionSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { @@ -3004,7 +3002,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) { } SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillGetCur_rocksdb"); + stDebug("streamStateFillGetCur_rocksdb"); SStreamStateCur* pCur = createStreamStateCursor(); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; @@ -3064,7 +3062,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, } SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillSeekKeyNext_rocksdb"); + stDebug("streamStateFillSeekKeyNext_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (!pCur) { @@ -3102,7 +3100,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const return NULL; } SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { - qDebug("streamStateFillSeekKeyPrev_rocksdb"); + stDebug("streamStateFillSeekKeyPrev_rocksdb"); STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; SStreamStateCur* pCur = createStreamStateCursor(); if (pCur == NULL) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9c94799237..7ea80ad8d8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -201,7 +201,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { void* key = taosHashGetKey(pIter, NULL); code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter); if (code != 0) { - qError("failed to cvt data"); + stError("failed to cvt data"); goto _EXIT; } @@ -225,11 +225,11 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { if (compatible == STREAM_STATA_COMPATIBLE) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { - qInfo("stream state need covert backend format"); + stInfo("stream state need covert backend format"); return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { - qError( + stError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "manually", tsDataDir); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 5f19b47e1f..2cb77c60dc 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -17,17 +17,14 @@ #include "query.h" #include "streamBackendRocksdb.h" -#include "taos.h" #include "tcommon.h" -#include "thash.h" #include "tsimplehash.h" - -typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos); +typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos); int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) { SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos); - SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey; + SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey; return sessionWinKeyCmpr(pWin1, pWin2); } @@ -61,12 +58,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_ int64_t getSessionWindowEndkey(void* data, int32_t index) { SArray* pWinInfos = (SArray*)data; SRowBuffPos** ppos = taosArrayGet(pWinInfos, index); - SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); + SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); return pWin->win.ekey; } bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { - if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) { + if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) { return true; } return false; @@ -80,7 +77,8 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW return pNewPos; } -static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) { +static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, + int32_t index) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); @@ -97,11 +95,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe return pNewPos; } -int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { - int32_t code = TSDB_CODE_SUCCESS; +int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, + int32_t* pVLen) { + int32_t code = TSDB_CODE_SUCCESS; SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -146,7 +145,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (index + 1 < size) { pPos = taosArrayGetP(pWinStates, index + 1); - if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) { + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; pPos->beUsed = true; @@ -157,9 +156,9 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (index + 1 == 0) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { - void* p = NULL; - void* pFileStore = getStateFileStore(pFileState); - int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); + void* p = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); code = code_file; @@ -184,10 +183,10 @@ _end: } int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -202,7 +201,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) } // find the first position which is smaller than the pKey - int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); + int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); if (index >= 0) { taosArrayInsert(pWinStates, index, &pPos); } else { @@ -218,7 +217,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; - void* pBuff = NULL; + void* pBuff = NULL; int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); if (code != TSDB_CODE_SUCCESS) { return code; @@ -229,16 +228,16 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v return TSDB_CODE_SUCCESS; } -int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) { - SSHashObj* pSessionBuff = (SSHashObj*) pBuff; - SSessionKey* pWinKey = (SSessionKey*) key; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); +int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) { + SSHashObj* pSessionBuff = (SSHashObj*)pBuff; + SSessionKey* pWinKey = (SSessionKey*)key; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (!ppBuff) { return TSDB_CODE_SUCCESS; } SArray* pWinStates = (SArray*)(*ppBuff); int32_t size = taosArrayGetSize(pWinStates); - TSKEY gap = 0; + TSKEY gap = 0; int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); if (index >= 0) { SRowBuffPos* pPos = taosArrayGetP(pWinStates, index); @@ -251,15 +250,15 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) } int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SSessionKey* pWinKey = (SSessionKey*) pPos->pKey; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSessionKey* pWinKey = (SSessionKey*)pPos->pKey; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (!ppBuff) { return TSDB_CODE_SUCCESS; } SArray* pWinStates = (SArray*)(*ppBuff); int32_t size = taosArrayGetSize(pWinStates); - TSKEY gap = 0; + TSKEY gap = 0; int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); if (index >= 0) { SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index); @@ -270,11 +269,12 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP return TSDB_CODE_SUCCESS; } -int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) { +int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, + const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) { SRowBuffPos* pNewPos = NULL; - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); - SArray* pWinStates = NULL; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; if (!ppBuff) { pWinStates = taosArrayInit(16, POINTER_BYTES); tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); @@ -297,10 +297,10 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } else { if (size > 0) { SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0); - if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) { + if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) { // pCur is invalid SSessionKey pTmpKey = *pWinKey; - int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen); + int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen); ASSERT(code == TSDB_CODE_FAILED); goto _end; } @@ -320,7 +320,7 @@ void sessionWinStateClear(SStreamFileState* pFileState) { void* pIte = NULL; size_t keyLen = 0; int32_t iter = 0; - void* pBuff = getRowStateBuff(pFileState); + void* pBuff = getRowStateBuff(pFileState); while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { SArray* pWinStates = *((void**)pIte); int32_t size = taosArrayGetSize(pWinStates); @@ -336,7 +336,7 @@ void sessionWinStateCleanup(void* pBuff) { size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { - SArray* pWinStates = (SArray*) (*(void**)pIte); + SArray* pWinStates = (SArray*)(*(void**)pIte); taosArrayDestroy(pWinStates); } tSimpleHashCleanup(pBuff); @@ -395,11 +395,13 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) pCur->pStreamFileState = pFileState; } -static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { +static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, + SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); - if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { - if ( !(*ppCur) ) { + int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); + if (taosArrayGetSize(pWinStates) > 0 && + (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { + if (!(*ppCur)) { (*ppCur) = createStreamStateCursor(); } transformCursor(pFileState, *ppCur); @@ -410,8 +412,8 @@ static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t } SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { - SArray* pWinStates = NULL; - int32_t index = -1; + SArray* pWinStates = NULL; + int32_t index = -1; SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); if (pCur) { if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) { @@ -427,8 +429,8 @@ SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, } SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { - SArray* pWinStates = NULL; - int32_t index = -1; + SArray* pWinStates = NULL; + int32_t index = -1; SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); if (pCur) { sessionWinStateMoveToNext(pCur); @@ -449,7 +451,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); - SArray* pWinStates = NULL; + SArray* pWinStates = NULL; if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } @@ -467,7 +469,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } else { void* pData = NULL; code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); - if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { + if (taosArrayGetSize(pWinStates) > 0 && + (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); if (pVal) { @@ -498,9 +501,9 @@ int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) { SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key); - SSessionKey tmpKey = *key; - int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); - bool hasCurrentPrev = true; + SSessionKey tmpKey = *key; + int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); + bool hasCurrentPrev = true; if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); pCur = sessionWinStateSeekKeyNext(pFileState, key); @@ -535,13 +538,13 @@ _end: } int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, - state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { SSessionKey* pWinKey = key; - TSKEY gap = 0; - int32_t code = TSDB_CODE_SUCCESS; - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + TSKEY gap = 0; + int32_t code = TSDB_CODE_SUCCESS; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -560,7 +563,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); code = TSDB_CODE_FAILED; @@ -589,7 +593,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (index + 1 < size) { pPos = taosArrayGetP(pWinStates, index + 1); void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen); - if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) { + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || + fn(pKeyData, stateKey) == true) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; pPos->beUsed = true; @@ -607,7 +612,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; - qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); goto _end; } else { taosMemoryFree(p); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index e29f2ba7de..a69cb75dad 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -15,10 +15,8 @@ #include "streamSnapshot.h" #include "query.h" -#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "streamInt.h" -#include "tcommon.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -158,7 +156,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { } sprintf(buf + strlen(buf) - 1, "]"); - qInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, + stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, pSnapFile->snapInfo.taskId, buf); taosMemoryFree(buf); } @@ -203,7 +201,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { TdDirPtr pDir = taosOpenDir(pSnapFile->path); if (NULL == pDir) { - qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); + stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); return -1; } @@ -397,7 +395,7 @@ _NEXT: } item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); - qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 + stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d, total set:%d, current set idx: %d", STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, (int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); @@ -515,7 +513,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } else { - qInfo("succ to write data %s", pItem->name); + stInfo("succ to write data %s", pItem->name); } pSnapFile->offset += bytes; } else { @@ -538,7 +536,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t } taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); - qInfo("succ to write data %s", pItem->name); + stInfo("succ to write data %s", pItem->name); pSnapFile->offset += pHdr->size; } code = 0; @@ -563,7 +561,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa "checkpoint", snapInfo.chkpId); if (!taosIsDir(path)) { code = taosMulMkDir(path); - qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); + stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); ASSERT(code == 0); } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index f78f6f4df1..d607f26de3 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#include "query.h" -#include "tdatablock.h" #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -387,8 +385,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); - bool res = true; - if ( pMapMaxTs && ts < *pMapMaxTs ) { + bool res = true; + if (pMapMaxTs && ts < *pMapMaxTs) { res = false; } else { taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));