From 0959758bd54ae332a90ef674218ae71fead17c63 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 23 Aug 2023 21:43:10 +0800 Subject: [PATCH] fix transfer error --- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 48 ++++++++++++++++++- source/libs/stream/src/streamMeta.c | 3 +- source/libs/stream/src/streamSnapshot.c | 46 +++++++++++++++--- source/libs/transport/inc/transComm.h | 1 - 5 files changed, 90 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 9431327f56..0028efae17 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -137,5 +137,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb void* val, int32_t vlen, int64_t ttl, void* tmpBuf); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); +int32_t streamBackendTriggerChkp(void* pMeta, char* dst); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 90619cee44..1ff3878ed2 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -738,6 +738,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { taosMemoryFree(chkpPath); return 0; } + taosArrayClear(pMeta->chkpSaved); TdDirPtr pDir = taosOpenDir(chkpPath); @@ -878,6 +879,49 @@ int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } +// flush the backend db (chkpPreFlushDb) and checkpoint it into dst (chkpDoDbCheckpoint); returns 0 on success, non-zero on failure +int32_t streamBackendTriggerChkp(void* arg, char* dst) { + SStreamMeta* pMeta = arg; + int64_t backendRid = pMeta->streamBackendRid; + int32_t code = -1; + + SArray* refs = taosArrayInit(16, sizeof(int64_t)); + rocksdb_column_family_handle_t** ppCf = NULL; + + int64_t st = 
taosGetTimestampMs(); + SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); + + if (pHandle == NULL || pHandle->db == NULL) { + goto _ERROR; + } + int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf); + + code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + if (code == 0) { + code = chkpDoDbCheckpoint(pHandle->db, dst); + if (code != 0) { + qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst); + } else { + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst, + taosGetTimestampMs() - st); + } + } else { + qError("stream backend:%p failed to flush db at:%s", pHandle, dst); + } + + // release all ref to cfWrapper; + for (int i = 0; i < taosArrayGetSize(refs); i++) { + int64_t id = *(int64_t*)taosArrayGet(refs, i); + taosReleaseRef(streamBackendCfWrapperId, id); + } + // drop the backend ref only when taosAcquireRef above actually returned a handle +_ERROR: + if (pHandle != NULL) { taosReleaseRef(streamBackendId, backendRid); } + taosArrayDestroy(refs); + return code; +} + int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SStreamMeta* pMeta = arg; int64_t backendRid = pMeta->streamBackendRid; @@ -902,7 +946,7 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { // Get all cf and acquire cfWrappter int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); - qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); code = chkpPreFlushDb(pHandle->db, ppCf, nCf); if (code == 0) { @@ -928,6 +972,8 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { // delete obsolte checkpoint delObsoleteCheckpoint(arg, pChkpDir); + + // pMeta->chkpId = checkpointId; } _ERROR: diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e4c133ac1e..9a0f6a03d3 100644 --- 
a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -175,8 +175,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { // return -1; } } - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); + streamBackendLoadCheckpointInfo(pMeta); + return 0; } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index ff5ee2ca1b..e7ab4ee9cf 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -16,6 +16,7 @@ #include "streamSnapshot.h" #include "query.h" #include "rocksdb/c.h" +#include "streamBackendRocksdb.h" #include "tcommon.h" enum SBackendFileType { @@ -79,7 +80,7 @@ const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; static int64_t kBlockSize = 64 * 1024; -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId); +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta); void streamSnapHandleDestroy(SStreamSnapHandle* handle); // static void streamBuildFname(char* path, char* file, char* fullname) @@ -107,19 +108,33 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) { // impl later int len = strlen(path); char* tdir = taosMemoryCalloc(1, len + 128); memcpy(tdir, path, len); + int32_t code = 0; + if (chkpId != 0) { sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); + } else { sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); + char* chkpdir = taosMemoryCalloc(1, len + 256); + sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); + taosMemoryFree(tdir); + + tdir = chkpdir; + code 
= streamBackendTriggerChkp(pMeta, tdir); + if (code != 0) { + qError("failed to trigger chekckpoint at %s", tdir); + taosMemoryFree(tdir); + return code; + } } - int32_t code = 0; + + qInfo("start to read dir: %s", tdir); TdDirPtr pDir = taosOpenDir(tdir); if (NULL == pDir) { @@ -156,11 +171,25 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk continue; } if (strlen(name) >= strlen(ROCKSDB_SST) && - 0 == strncmp(name - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { + 0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) { char* sst = taosStrdup(name); taosArrayPush(pFile->pSst, &sst); } } + { + // log the collected file names; every write is bounded by the space left in buf, so a long + // sst list is truncated instead of overrunning it, and missing (NULL) names are logged as "" + char buf[512] = {0}; + snprintf(buf, sizeof(buf), "current: %s", pFile->pCurrent ? pFile->pCurrent : ""); + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "MANIFEST: %s", pFile->pMainfest ? pFile->pMainfest : ""); + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "options: %s", pFile->pOptions ? pFile->pOptions : ""); + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + char* name = taosArrayGetP(pFile->pSst, i); + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "sst: %s", name); + } + qInfo("get file list: %s", buf); + } + taosCloseDir(&pDir); if (pFile->pCurrent == NULL) { @@ -221,6 +250,12 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { SBanckendFile* pFile = handle->pBackendFile; + // pFile is NULL-checked further down, so it must also be guarded before pFile->path is read here + if (pFile != NULL && handle->checkpointId == 0) { + if (taosIsDir(pFile->path)) { + taosRemoveDir(pFile->path); + } + } if (pFile) { taosMemoryFree(pFile->pCheckpointMeta); taosMemoryFree(pFile->pCurrent); @@ -234,7 +269,6 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { taosArrayDestroy(pFile->pSst); taosMemoryFree(pFile); } - taosArrayDestroy(handle->pFileList); taosCloseFile(&handle->fd); return; @@ -247,7 +281,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa return TSDB_CODE_OUT_OF_MEMORY; } - if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) { + if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) { 
taosMemoryFree(pReader); return -1; } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index a6b7a20f76..17ef6ce530 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -262,7 +262,6 @@ bool transAsyncPoolIsEmpty(SAsyncPool* pool); #define ASYNC_CHECK_HANDLE(exh1, id) \ do { \ if (id > 0) { \ - tTrace("handle step1"); \ SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1:%" PRIu64 ", ref2:%" PRIu64, exh1, \