fix transfer crash

This commit is contained in:
yihaoDeng 2023-10-18 11:34:11 +08:00
parent d67fdb5e46
commit e49b9b2276
3 changed files with 39 additions and 8 deletions

View File

@ -167,6 +167,11 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
return code; return code;
} }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
int32_t code = streamStateLoadTasks(pWriter); int32_t code = streamStateLoadTasks(pWriter);
@ -178,8 +183,3 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
} }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
return streamSnapWrite(pWriter->pWriterImpl, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
}

View File

@ -492,7 +492,29 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
} }
} else { } else {
qError("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath, qInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath);
}
taosMemoryFree(chkpPath);
} else {
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
(int64_t)-1);
qInfo("no chkp id specified, try to restart from received chkp id -1, dir: %s", chkpPath);
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
if (code != 0) {
qError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
qInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
} else {
qInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath); tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath); taosMkDir(defaultPath);
} }

View File

@ -400,7 +400,13 @@ _NEXT:
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset);
if (nread == -1) {
qInfo("%s read impl %d, file name: %s", STREAM_STATE_TRANSFER, (int)nread, item->name);
if (nread == 0) {
code = TAOS_SYSTEM_ERROR(errno);
qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
item->type, tstrerror(code));
} else if (nread == -1) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name, qError("%s snap failed to read snap, file name:%s, type:%d,reason:%s", STREAM_STATE_TRANSFER, item->name,
item->type, tstrerror(code)); item->type, tstrerror(code));
@ -509,6 +515,8 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code; return code;
} else {
qInfo("succ to write data %s", pItem->name);
} }
pSnapFile->offset += bytes; pSnapFile->offset += bytes;
} else { } else {
@ -531,6 +539,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
} }
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset); taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
qInfo("succ to write data %s", pItem->name);
pSnapFile->offset += pHdr->size; pSnapFile->offset += pHdr->size;
} }
code = 0; code = 0;