From e49b9b2276bb7b64b961d85015e70974daeaf2c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 18 Oct 2023 11:34:11 +0800 Subject: [PATCH] fix transfer crash --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 10 +++---- source/libs/stream/src/streamBackendRocksdb.c | 26 +++++++++++++++++-- source/libs/stream/src/streamSnapshot.c | 11 +++++++- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index b7c440dfa5..885d12a6aa 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -167,6 +167,11 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) return code; } + +int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { + tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); + return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); +} int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); int32_t code = streamStateLoadTasks(pWriter); @@ -178,8 +183,3 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); } - -int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { - tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); - return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); -} diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 8aa6878b10..d1ddd5c81a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -492,8 +492,30 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** } } else { - qError("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, - tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + qInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, + tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); + taosMkDir(defaultPath); + } + taosMemoryFree(chkpPath); + } else { + char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256); + sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", + (int64_t)-1); + qInfo("no chkp id specified, try to restart from received chkp id -1, dir: %s", chkpPath); + if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) { + if (taosIsDir(defaultPath)) { + taosRemoveDir(defaultPath); + } + taosMkDir(defaultPath); + code = copyFiles(chkpPath, defaultPath); + if (code != 0) { + qError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + qInfo("start to restart stream backend at checkpoint path: %s", chkpPath); + } + } else { + qInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, + tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); taosMkDir(defaultPath); } taosMemoryFree(chkpPath); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 70c3031eb2..3993fe09a1 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -400,7 +400,13 @@ _NEXT: uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); - if (nread == -1) { + + qInfo("%s read impl %d, file name: %s", STREAM_STATE_TRANSFER, (int)nread, item->name); + if (nread == 0) { + code = TAOS_SYSTEM_ERROR(errno); + qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, + item->type, tstrerror(code)); + } else if (nread == -1) { code = TAOS_SYSTEM_ERROR(terrno); qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, item->type, tstrerror(code)); @@ -509,6 +515,8 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t code = TAOS_SYSTEM_ERROR(terrno); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); return code; + } else { + qInfo("succ to write data %s", pItem->name); } pSnapFile->offset += bytes; } else { @@ -531,6 +539,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t } taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); + qInfo("succ to write data %s", pItem->name); pSnapFile->offset += pHdr->size; } code = 0;