diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9c94799237..7ea80ad8d8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -201,7 +201,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { void* key = taosHashGetKey(pIter, NULL); code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter); if (code != 0) { - qError("failed to cvt data"); + stError("failed to cvt data"); goto _EXIT; } @@ -225,11 +225,11 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { if (compatible == STREAM_STATA_COMPATIBLE) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { - qInfo("stream state need covert backend format"); + stInfo("stream state need covert backend format"); return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { - qError( + stError( "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "manually", tsDataDir); diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 5f19b47e1f..2cb77c60dc 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -17,17 +17,14 @@ #include "query.h" #include "streamBackendRocksdb.h" -#include "taos.h" #include "tcommon.h" -#include "thash.h" #include "tsimplehash.h" - -typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos); +typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos); int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) { SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos); - SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey; + SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey; return sessionWinKeyCmpr(pWin1, pWin2); } @@ -61,12 +58,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_ int64_t getSessionWindowEndkey(void* data, int32_t index) { SArray* pWinInfos = (SArray*)data; SRowBuffPos** ppos = taosArrayGet(pWinInfos, index); - SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); + SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey); return pWin->win.ekey; } bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { - if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) { + if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) { return true; } return false; @@ -80,7 +77,8 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW return pNewPos; } -static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) { +static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, + int32_t index) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); ASSERT(pNewPos->pRowBuff); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); @@ -97,11 +95,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe return pNewPos; } -int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) { - int32_t code = TSDB_CODE_SUCCESS; +int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, + int32_t* pVLen) { + int32_t code = TSDB_CODE_SUCCESS; SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -146,7 +145,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (index + 1 < size) { pPos = taosArrayGetP(pWinStates, index + 1); - if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) { + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; pPos->beUsed = true; @@ -157,9 +156,9 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, if (index + 1 == 0) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) { - void* p = NULL; - void* pFileStore = getStateFileStore(pFileState); - int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); + void* p = NULL; + void* pFileStore = getStateFileStore(pFileState); + int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); code = code_file; @@ -184,10 +183,10 @@ _end: } int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pKey = pPos->pKey; - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -202,7 +201,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) } // find the first position which is smaller than the pKey - int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); + int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare); if (index >= 0) { taosArrayInsert(pWinStates, index, &pPos); } else { @@ -218,7 +217,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; - void* pBuff = NULL; + void* pBuff = NULL; int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); if (code != TSDB_CODE_SUCCESS) { return code; @@ -229,16 +228,16 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v return TSDB_CODE_SUCCESS; } -int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) { - SSHashObj* pSessionBuff = (SSHashObj*) pBuff; - SSessionKey* pWinKey = (SSessionKey*) key; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); +int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) { + SSHashObj* pSessionBuff = (SSHashObj*)pBuff; + SSessionKey* pWinKey = (SSessionKey*)key; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (!ppBuff) { return TSDB_CODE_SUCCESS; } SArray* pWinStates = (SArray*)(*ppBuff); int32_t size = taosArrayGetSize(pWinStates); - TSKEY gap = 0; + TSKEY gap = 0; int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); if (index >= 0) { SRowBuffPos* pPos = taosArrayGetP(pWinStates, index); @@ -251,15 +250,15 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) } int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SSessionKey* pWinKey = (SSessionKey*) pPos->pKey; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SSessionKey* pWinKey = (SSessionKey*)pPos->pKey; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (!ppBuff) { return TSDB_CODE_SUCCESS; } SArray* pWinStates = (SArray*)(*ppBuff); int32_t size = taosArrayGetSize(pWinStates); - TSKEY gap = 0; + TSKEY gap = 0; int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare); if (index >= 0) { SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index); @@ -270,11 +269,12 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP return TSDB_CODE_SUCCESS; } -int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) { +int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, + const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) { SRowBuffPos* pNewPos = NULL; - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); - SArray* pWinStates = NULL; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + SArray* pWinStates = NULL; if (!ppBuff) { pWinStates = taosArrayInit(16, POINTER_BYTES); tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES); @@ -297,10 +297,10 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } else { if (size > 0) { SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0); - if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) { + if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) { // pCur is invalid SSessionKey pTmpKey = *pWinKey; - int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen); + int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen); ASSERT(code == TSDB_CODE_FAILED); goto _end; } @@ -320,7 +320,7 @@ void sessionWinStateClear(SStreamFileState* pFileState) { void* pIte = NULL; size_t keyLen = 0; int32_t iter = 0; - void* pBuff = getRowStateBuff(pFileState); + void* pBuff = getRowStateBuff(pFileState); while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { SArray* pWinStates = *((void**)pIte); int32_t size = taosArrayGetSize(pWinStates); @@ -336,7 +336,7 @@ void sessionWinStateCleanup(void* pBuff) { size_t keyLen = 0; int32_t iter = 0; while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) { - SArray* pWinStates = (SArray*) (*(void**)pIte); + SArray* pWinStates = (SArray*)(*(void**)pIte); taosArrayDestroy(pWinStates); } tSimpleHashCleanup(pBuff); @@ -395,11 +395,13 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) pCur->pStreamFileState = pFileState; } -static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { +static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, + SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); - if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { - if ( !(*ppCur) ) { + int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL); + if (taosArrayGetSize(pWinStates) > 0 && + (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { + if (!(*ppCur)) { (*ppCur) = createStreamStateCursor(); } transformCursor(pFileState, *ppCur); @@ -410,8 +412,8 @@ static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t } SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { - SArray* pWinStates = NULL; - int32_t index = -1; + SArray* pWinStates = NULL; + int32_t index = -1; SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); if (pCur) { if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) { @@ -427,8 +429,8 @@ SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, } SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) { - SArray* pWinStates = NULL; - int32_t index = -1; + SArray* pWinStates = NULL; + int32_t index = -1; SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index); if (pCur) { sessionWinStateMoveToNext(pCur); @@ -449,7 +451,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState); void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t)); - SArray* pWinStates = NULL; + SArray* pWinStates = NULL; if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } @@ -467,7 +469,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } else { void* pData = NULL; code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); - if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { + if (taosArrayGetSize(pWinStates) > 0 && + (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); if (pVal) { @@ -498,9 +501,9 @@ int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) { SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key); - SSessionKey tmpKey = *key; - int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); - bool hasCurrentPrev = true; + SSessionKey tmpKey = *key; + int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL); + bool hasCurrentPrev = true; if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); pCur = sessionWinStateSeekKeyNext(pFileState, key); @@ -535,13 +538,13 @@ _end: } int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, - state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { + state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { SSessionKey* pWinKey = key; - TSKEY gap = 0; - int32_t code = TSDB_CODE_SUCCESS; - SSHashObj* pSessionBuff = getRowStateBuff(pFileState); - SArray* pWinStates = NULL; - void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); + TSKEY gap = 0; + int32_t code = TSDB_CODE_SUCCESS; + SSHashObj* pSessionBuff = getRowStateBuff(pFileState); + SArray* pWinStates = NULL; + void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (ppBuff) { pWinStates = (SArray*)(*ppBuff); } else { @@ -560,7 +563,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; - qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); } else { (*pVal) = addNewSessionWindow(pFileState, pWinStates, key); code = TSDB_CODE_FAILED; @@ -589,7 +593,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (index + 1 < size) { pPos = taosArrayGetP(pWinStates, index + 1); void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen); - if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) { + if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || + fn(pKeyData, stateKey) == true) { (*pVal) = pPos; SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey; pPos->beUsed = true; @@ -607,7 +612,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); code = code_file; - qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); + qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, + pWinKey->win.ekey, code_file); goto _end; } else { taosMemoryFree(p); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index e29f2ba7de..a69cb75dad 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -15,10 +15,8 @@ #include "streamSnapshot.h" #include "query.h" -#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "streamInt.h" -#include "tcommon.h" enum SBackendFileType { ROCKSDB_OPTIONS_TYPE = 1, @@ -158,7 +156,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { } sprintf(buf + strlen(buf) - 1, "]"); - qInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, + stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, pSnapFile->snapInfo.taskId, buf); taosMemoryFree(buf); } @@ -203,7 +201,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) { int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { TdDirPtr pDir = taosOpenDir(pSnapFile->path); if (NULL == pDir) { - qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); + stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path); return -1; } @@ -397,7 +395,7 @@ _NEXT: } item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); - qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 + stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d, total set:%d, current set idx: %d", STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx, (int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); @@ -515,7 +513,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } else { - qInfo("succ to write data %s", pItem->name); + stInfo("succ to write data %s", pItem->name); } pSnapFile->offset += bytes; } else { @@ -538,7 +536,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t } taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); - qInfo("succ to write data %s", pItem->name); + stInfo("succ to write data %s", pItem->name); pSnapFile->offset += pHdr->size; } code = 0; @@ -563,7 +561,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa "checkpoint", snapInfo.chkpId); if (!taosIsDir(path)) { code = taosMulMkDir(path); - qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); + stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); ASSERT(code == 0); } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index f78f6f4df1..d607f26de3 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#include "query.h" -#include "tdatablock.h" #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -387,8 +385,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); - bool res = true; - if ( pMapMaxTs && ts < *pMapMaxTs ) { + bool res = true; + if (pMapMaxTs && ts < *pMapMaxTs) { res = false; } else { taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));