From f66f203caa0ca3ab53c05611e39f21581de57190 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 23 Aug 2023 19:04:11 +0800 Subject: [PATCH 1/4] reload semi session state --- .../executor/src/streamtimewindowoperator.c | 74 +++++++++++++++++-- 1 file changed, 67 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 816f460d15..a6326be62f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1874,6 +1874,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* return winNum; } +static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) { + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + // Just look for the window behind StartIndex + while (1) { + SResultWindowInfo winInfo = {0}; + SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo); + if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) || + !inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) { + taosMemoryFree(winInfo.pOutputBuf); + pAPI->stateStore.streamStateFreeCur(pCur); + break; + } + pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); + doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); + pAPI->stateStore.streamStateFreeCur(pCur); + taosMemoryFree(winInfo.pOutputBuf); + } +} + int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); @@ -2081,6 +2106,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin); if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); continue; } @@ -2089,6 +2115,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin); code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -2099,7 +2126,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true); saveResult(parentWin, pStUpdated); + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); } else { + releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore); break; } } @@ -2474,13 +2503,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } void streamSessionReleaseState(SOperatorInfo* pOperator) { - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); - pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, - resSize); - } + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, + resSize); SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.releaseStreamStateFn) { downstream->fpSet.releaseStreamStateFn(downstream); @@ -2492,6 +2519,33 @@ void resetWinRange(STimeWindow* winRange) { winRange->ekey = INT64_MAX; } +void streamSessionSemiReloadState(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + resetWinRange(&pAggSup->winRange); + + SResultWindowInfo winInfo = {0}; + int32_t size = 0; + void* pBuf = NULL; + int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + int32_t num = size / sizeof(SSessionKey); + SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf; + ASSERT(size == num * sizeof(SSessionKey)); + for (int32_t i = 0; i < num; i++) { + SResultWindowInfo winInfo = {0}; + setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); + compactSessionSemiWindow(pOperator, &winInfo); + saveSessionOutputBuf(pAggSup, &winInfo); + } + taosMemoryFree(pBuf); + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->fpSet.reloadStreamStateFn) { + downstream->fpSet.reloadStreamStateFn(downstream); + } +} + void streamSessionReloadState(SOperatorInfo* pOperator) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; @@ -2731,6 +2785,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated); tSimpleHashCleanup(pInfo->pStUpdated); pInfo->pStUpdated = NULL; + + if(pInfo->isHistoryOp) { + getMaxTsWins(pInfo->pUpdated, pInfo->historyWins); + } + initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated); pInfo->pUpdated = NULL; blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); @@ -2763,6 +2822,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState); } setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); From 0959758bd54ae332a90ef674218ae71fead17c63 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 23 Aug 2023 21:43:10 +0800 Subject: [PATCH 2/4] fix transfer error --- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 48 ++++++++++++++++++- source/libs/stream/src/streamMeta.c | 3 +- source/libs/stream/src/streamSnapshot.c | 46 +++++++++++++++--- source/libs/transport/inc/transComm.h | 1 - 5 files changed, 90 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9431327f56..0028efae17 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -137,5 +137,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb void* val, int32_t vlen, int64_t ttl, void* tmpBuf); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); +int32_t streamBackendTriggerChkp(void* pMeta, char* dst); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 90619cee44..1ff3878ed2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -738,6 +738,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { taosMemoryFree(chkpPath); return 0; } + taosArrayClear(pMeta->chkpSaved); TdDirPtr pDir = taosOpenDir(chkpPath); @@ -878,6 +879,49 @@ int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } + +int32_t streamBackendTriggerChkp(void* arg, char* dst) { + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int32_t code = -1; + + SArray* refs = taosArrayInit(16, sizeof(int64_t)); + rocksdb_column_family_handle_t** ppCf = NULL; + + int64_t st = taosGetTimestampMs(); + SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); + + if (pHandle == NULL || pHandle->db == NULL) { + goto _ERROR; + } + int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf); + + code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + if (code == 0) { + code = chkpDoDbCheckpoint(pHandle->db, dst); + if (code != 0) { + qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); + } else { + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, + taosGetTimestampMs() - st); + } + } else { + qError("stream backend:%p failed to flush db at:%s", pHandle, dst); + } + + // release all ref to cfWrapper; + for (int i = 0; i < taosArrayGetSize(refs); i++) { + int64_t id = *(int64_t*)taosArrayGet(refs, i); + taosReleaseRef(streamBackendCfWrapperId, id); + } + +_ERROR: + taosReleaseRef(streamBackendId, backendRid); + taosArrayDestroy(refs); + return code; +} + int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SStreamMeta* pMeta = arg; int64_t backendRid = pMeta->streamBackendRid; @@ -902,7 +946,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); code = chkpPreFlushDb(pHandle->db, ppCf, nCf); if (code == 0) { @@ -928,6 +972,8 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { // delete obsolte checkpoint delObsoleteCheckpoint(arg, pChkpDir); + + // pMeta->chkpId = checkpointId; } _ERROR: diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e4c133ac1e..9a0f6a03d3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -175,8 +175,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // return -1; } } - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + streamBackendLoadCheckpointInfo(pMeta); + return 0; } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index ff5ee2ca1b..e7ab4ee9cf 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -16,6 +16,7 @@ #include "streamSnapshot.h" #include "query.h" #include "rocksdb/c.h" +#include "streamBackendRocksdb.h" #include "tcommon.h" enum SBackendFileType { @@ -79,7 +80,7 @@ const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; static int64_t kBlockSize = 64 * 1024; -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId); +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta); void streamSnapHandleDestroy(SStreamSnapHandle* handle); // static void streamBuildFname(char* path, char* file, char* fullname) @@ -107,19 +108,33 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { // impl later int len = strlen(path); char* tdir = taosMemoryCalloc(1, len + 128); memcpy(tdir, path, len); + int32_t code = 0; + if (chkpId != 0) { sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + } else { sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); + char* chkpdir = taosMemoryCalloc(1, len + 256); + sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); + taosMemoryFree(tdir); + + tdir = chkpdir; + code = streamBackendTriggerChkp(pMeta, tdir); + if (code != 0) { + qError("failed to trigger chekckpoint at %s", tdir); + taosMemoryFree(tdir); + return code; + } } - int32_t code = 0; + qInfo("start to read dir: %s", tdir); TdDirPtr pDir = taosOpenDir(tdir); if (NULL == pDir) { @@ -156,11 +171,25 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk continue; } if (strlen(name) >= strlen(ROCKSDB_SST) && - 0 == strncmp(name - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { + 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); taosArrayPush(pFile->pSst, &sst); } } + { + char* buf = taosMemoryCalloc(1, 512); + sprintf(buf, "current: %s", pFile->pCurrent); + sprintf(buf + strlen(buf), "MANIFEST: %s", pFile->pMainfest); + sprintf(buf + strlen(buf), "options: %s", pFile->pOptions); + + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + char* name = taosArrayGetP(pFile->pSst, i); + sprintf(buf + strlen(buf), "sst: %s", name); + } + qInfo("get file list: %s", buf); + taosMemoryFree(buf); + } + taosCloseDir(&pDir); if (pFile->pCurrent == NULL) { @@ -221,6 +250,12 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { SBanckendFile* pFile = handle->pBackendFile; + + if (handle->checkpointId == 0) { + if (taosIsDir(pFile->path)) { + taosRemoveDir(pFile->path); + } + } if (pFile) { taosMemoryFree(pFile->pCheckpointMeta); taosMemoryFree(pFile->pCurrent); @@ -234,7 +269,6 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { taosArrayDestroy(pFile->pSst); taosMemoryFree(pFile); } - taosArrayDestroy(handle->pFileList); taosCloseFile(&handle->fd); return; @@ -247,7 +281,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa return TSDB_CODE_OUT_OF_MEMORY; } - if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) { + if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) { taosMemoryFree(pReader); return -1; } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a6b7a20f76..17ef6ce530 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -262,7 +262,6 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define ASYNC_CHECK_HANDLE(exh1, id) \ do { \ if (id > 0) { \ - tTrace("handle step1"); \ SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \ From 3afea998ac160d6b71d84722cb93539290cc0204 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 24 Aug 2023 10:15:15 +0800 Subject: [PATCH 3/4] refactor log --- include/libs/stream/streamSnapshot.h | 2 + source/dnode/vnode/src/tq/tqStreamStateSnap.c | 22 ++++--- source/libs/stream/src/streamBackendRocksdb.c | 9 +-- source/libs/stream/src/streamSnapshot.c | 66 +++++++++++-------- 4 files changed, 57 insertions(+), 42 deletions(-) diff --git a/include/libs/stream/streamSnapshot.h b/include/libs/stream/streamSnapshot.h index bc5fd63070..15d5f56ffd 100644 --- a/include/libs/stream/streamSnapshot.h +++ b/include/libs/stream/streamSnapshot.h @@ -16,6 +16,8 @@ #define _STREAM_BACKEDN_SNAPSHOT_H_ #include "tcommon.h" +#define STREAM_STATE_TRANSFER "stream-state-transfer" + typedef struct SStreamSnapReader SStreamSnapReader; typedef struct SStreamSnapWriter SStreamSnapWriter; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index aa1d9b0476..6469045621 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -26,7 +26,7 @@ struct SStreamStateReader { TBC* pCur; SStreamSnapReader* pReaderImpl; - int32_t complete; + int32_t complete; // open reader or not }; int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader) { @@ -60,26 +60,29 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS } pReader->pReaderImpl = pSnapReader; - tqDebug("vgId:%d, vnode stream-state snapshot reader opened", TD_VID(pTq->pVnode)); + tqDebug("vgId:%d, vnode %s snapshot reader opened", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER); *ppReader = pReader; return code; _err: - tqError("vgId:%d, vnode stream-state snapshot reader failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, + tstrerror(code)); *ppReader = NULL; return code; } int32_t streamStateSnapReaderClose(SStreamStateReader* pReader) { int32_t code = 0; - tqDebug("vgId:%d, vnode stream-state snapshot reader closed", TD_VID(pReader->pTq->pVnode)); + tqDebug("vgId:%d, vnode %s snapshot reader closed", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER); streamSnapReaderClose(pReader->pReaderImpl); taosMemoryFree(pReader); return code; } int32_t streamStateSnapRead(SStreamStateReader* pReader, uint8_t** ppData) { + tqDebug("vgId:%d, vnode %s snapshot read data", TD_VID(pReader->pTq->pVnode), STREAM_STATE_TRANSFER); + int32_t code = 0; if (pReader->complete == 0) { return 0; @@ -143,13 +146,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS goto _err; } - tqDebug("vgId:%d, vnode stream-state snapshot writer opened, path:%s", TD_VID(pTq->pVnode), tdir); + tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tdir); pWriter->pWriterImpl = pSnapWriter; *ppWriter = pWriter; return code; _err: - tqError("vgId:%d, vnode stream-state snapshot writer failed to open since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, + tstrerror(code)); taosMemoryFree(pWriter); *ppWriter = NULL; return -1; @@ -157,16 +161,18 @@ _err: int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) { int32_t code = 0; - tqDebug("vgId:%d, vnode stream-state snapshot writer closed", TD_VID(pWriter->pTq->pVnode)); + tqDebug("vgId:%d, vnode %s snapshot writer closed", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); code = streamSnapWriterClose(pWriter->pWriterImpl, rollback); return code; } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { + tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); if (code == 0) { code = streamStateLoadTasks(pWriter); } + tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); taosMemoryFree(pWriter); return code; } @@ -174,6 +180,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); } int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { - tqDebug("vgId:%d, vnode stream-state snapshot write", TD_VID(pWriter->pTq->pVnode)); + tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 1ff3878ed2..34d22f9ac2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -792,12 +792,6 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t* if (wrapper->pHandle[i]) { rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; taosArrayPush(pHandle, &p); - // size_t len = 0; - // char* name = rocksdb_column_family_handle_get_name(p, &len); - // char buf[64] = {0}; - // memcpy(buf, name, len); - // qError("column name: name: %s, len: %d", buf, (int)len); - // taosMemoryFree(name); } } taosThreadRwlockUnlock(&wrapper->rwLock); @@ -972,8 +966,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { // delete obsolte checkpoint delObsoleteCheckpoint(arg, pChkpDir); - - // pMeta->chkpId = checkpointId; + pMeta->chkpId = checkpointId; } _ERROR: diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index e7ab4ee9cf..367b45b9eb 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -111,34 +111,45 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { // impl later int len = strlen(path); - char* tdir = taosMemoryCalloc(1, len + 128); + char* tdir = taosMemoryCalloc(1, len + 256); memcpy(tdir, path, len); int32_t code = 0; + int8_t chkpFlag = 0; if (chkpId != 0) { sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + if (taosIsDir(tdir)) { + chkpFlag = 1; + qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); + } else { + qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); + } + } - } else { + if (chkpFlag == 0) { sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); char* chkpdir = taosMemoryCalloc(1, len + 256); sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); taosMemoryFree(tdir); tdir = chkpdir; + qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tdir); + code = streamBackendTriggerChkp(pMeta, tdir); if (code != 0) { - qError("failed to trigger chekckpoint at %s", tdir); + qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tdir); taosMemoryFree(tdir); return code; } } - qInfo("start to read dir: %s", tdir); + + qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir); TdDirPtr pDir = taosOpenDir(tdir); if (NULL == pDir) { - qError("stream-state failed to open %s", tdir); + qError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir); goto _err; } @@ -178,22 +189,24 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk } { char* buf = taosMemoryCalloc(1, 512); - sprintf(buf, "current: %s", pFile->pCurrent); - sprintf(buf + strlen(buf), "MANIFEST: %s", pFile->pMainfest); - sprintf(buf + strlen(buf), "options: %s", pFile->pOptions); + sprintf(buf, "[current: %s,", pFile->pCurrent); + sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest); + sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions); for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { char* name = taosArrayGetP(pFile->pSst, i); - sprintf(buf + strlen(buf), "sst: %s", name); + sprintf(buf + strlen(buf), "%s,", name); } - qInfo("get file list: %s", buf); + sprintf(buf + strlen(buf) - 1, "]"); + + qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf); taosMemoryFree(buf); } taosCloseDir(&pDir); if (pFile->pCurrent == NULL) { - qError("stream-state failed to open %s, reason: no valid file", tdir); + qError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir); code = -1; tdir = NULL; goto _err; @@ -313,24 +326,24 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si return 0; } else { pHandle->fd = streamOpenFile(pFile->path, item->name, TD_FILE_READ); - qDebug("stream-state open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("%s open file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } } - qDebug("stream-state start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state snap failed to read snap, file name:%s, type:%d,reason:%s", item->name, item->type, - tstrerror(code)); + qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, + item->type, tstrerror(code)); return -1; } else if (nread > 0 && nread <= kBlockSize) { // left bytes less than kBlockSize - qDebug("stream-state read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", item->name, - (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("%s read file %s, current offset:%" PRId64 ",size:% " PRId64 ", file no.%d", STREAM_STATE_TRANSFER, + item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); pHandle->offset += nread; if (pHandle->offset >= item->size || nread < kBlockSize) { taosCloseFile(&pHandle->fd); @@ -338,7 +351,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si pHandle->currFileIdx += 1; } } else { - qDebug("stream-state no data read, close file no.%d, move to next file, open and read", pHandle->currFileIdx); + qDebug("%s no data read, close file no.%d, move to next file, open and read", STREAM_STATE_TRANSFER, + pHandle->currFileIdx); taosCloseFile(&pHandle->fd); pHandle->offset = 0; pHandle->currFileIdx += 1; @@ -355,8 +369,8 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset); pHandle->offset += nread; - qDebug("stream-state open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", - item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); + qDebug("%s open file and read file %s, current offset:%" PRId64 ", size:% " PRId64 ", file no.%d", + STREAM_STATE_TRANSFER, item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx); } SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)buf; @@ -411,7 +425,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, tstrerror(code)); } } @@ -420,7 +434,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); if (bytes != pHdr->size) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state failed to write snap, file name:%s, reason:%s", pHdr->name, tstrerror(code)); + qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; } pHandle->offset += bytes; @@ -438,7 +452,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pHandle->fd == NULL) { code = TAOS_SYSTEM_ERROR(terrno); - qError("stream-state failed to open file name:%s%s%s, reason:%s", pFile->path, TD_DIRSEP, pHdr->name, + qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, tstrerror(code)); } @@ -462,7 +476,7 @@ int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { n += sprintf(buf + n, "%s %" PRId64 "]", item->name, item->size); } } - qDebug("stream snap get file list, %s", buf); + qDebug("%s snap get file list, %s", STREAM_STATE_TRANSFER, buf); taosMemoryFree(buf); } From 08553b778892081b1189057146bd3193c7a53758 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 24 Aug 2023 11:18:10 +0800 Subject: [PATCH 4/4] mem leak --- source/libs/executor/src/filloperator.c | 1 + source/libs/executor/src/streamtimewindowoperator.c | 3 +++ 2 files changed, 4 insertions(+) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4cf3b3239d..53fef3b7e3 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index a6326be62f..4ead0df159 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1535,6 +1535,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2861,6 +2862,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -2874,6 +2876,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); blockDataDestroy(pInfo->pCheckpointRes); taosMemoryFreeClear(param);