refactor backend

This commit is contained in:
yihaoDeng 2023-10-11 16:03:04 +08:00
parent 4cff121a38
commit 0baef335d7
1 changed files with 89 additions and 36 deletions

View File

@ -59,6 +59,7 @@ typedef struct SBackendSnapFiles2 {
SArray* pFileList; SArray* pFileList;
int32_t currFileIdx; int32_t currFileIdx;
SStreamTaskSnap snapInfo; SStreamTaskSnap snapInfo;
int8_t inited;
} SBackendSnapFile2; } SBackendSnapFile2;
struct SStreamSnapHandle { struct SStreamSnapHandle {
@ -71,6 +72,7 @@ struct SStreamSnapHandle {
int8_t filetype; int8_t filetype;
SArray* pFileList; SArray* pFileList;
int32_t currFileIdx; int32_t currFileIdx;
char* metaPath;
SArray* pBackendSnapSet; SArray* pBackendSnapSet;
int32_t currIdx; int32_t currIdx;
@ -310,6 +312,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
} }
taosArrayDestroy(handle->pBackendSnapSet); taosArrayDestroy(handle->pBackendSnapSet);
} }
taosMemoryFree(handle->metaPath);
// if (handle->checkpointId == 0) { // if (handle->checkpointId == 0) {
// // del tmp dir // // del tmp dir
@ -455,76 +458,126 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
if (pWriter == NULL) { if (pWriter == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SBackendSnapFile2 snapFile = {0};
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
pHandle->pBackendSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); taosArrayPush(pHandle->pBackendSnapSet, &snapFile);
pFile->path = taosStrdup(path); pHandle->currIdx = 0;
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); pHandle->metaPath = taosStrdup(path);
SBackendFileItem item; // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
item.name = taosStrdup((char*)ROCKSDB_CURRENT); // pFile->path = taosStrdup(path);
item.type = ROCKSDB_CURRENT_TYPE; // SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
taosArrayPush(list, &item);
pHandle->pBackendFile = pFile; // SBackendFileItem item;
// item.name = taosStrdup((char*)ROCKSDB_CURRENT);
// item.type = ROCKSDB_CURRENT_TYPE;
// taosArrayPush(list, &item);
pHandle->pFileList = list; // pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->offset = 0; // pHandle->pFileList = list;
// pHandle->currFileIdx = 0;
// pHandle->offset = 0;
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;
} }
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
int32_t code = 0; if (a->streamId != b->streamId || a->taskId != b->taskId || a->chkpId != b->chkpId) {
return 0;
}
return 1;
}
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData,
SBackendSnapFile2* pBackendFile) {
int code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
SBanckendFile* pFile = pHandle->pBackendFile; SStreamTaskSnap snapInfo = pHdr->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx);
if (pHandle->fd == NULL) { SStreamTaskSnap* pSnapInfo = &pBackendFile->snapInfo;
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx);
if (pBackendFile->fd == 0) {
pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pBackendFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
tstrerror(code)); pHdr->name, tstrerror(code));
} }
} }
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); int64_t bytes = taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset);
if (bytes != pHdr->size) { if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code; return code;
} }
pHandle->offset += bytes; pBackendFile->offset += bytes;
} else { } else {
taosCloseFile(&pHandle->fd); taosCloseFile(&pBackendFile->fd);
pHandle->offset = 0; pBackendFile->offset = 0;
pHandle->currFileIdx += 1; pBackendFile->currFileIdx += 1;
SBackendFileItem item; SBackendFileItem item;
item.name = taosStrdup(pHdr->name); item.name = taosStrdup(pHdr->name);
item.type = pHdr->type; item.type = pHdr->type;
taosArrayPush(pHandle->pFileList, &item); taosArrayPush(pBackendFile->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx);
pHandle->fd = streamOpenFile(pFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pHandle->fd == NULL) { if (pBackendFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pFile->path, TD_DIRSEP, pHdr->name, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pBackendFile->path, TD_DIRSEP,
tstrerror(code)); pHdr->name, tstrerror(code));
} }
taosPWriteFile(pHandle->fd, pHdr->data, pHdr->size, pHandle->offset); taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset);
pHandle->offset += pHdr->size; pBackendFile->offset += pHdr->size;
} }
code = 0;
_EXIT:
return code;
}
int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
// impl later SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
return 0; SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo;
SBackendSnapFile2* pBackendFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx);
if (pBackendFile->inited == 0) {
pBackendFile->snapInfo = snapInfo;
pBackendFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pBackendFile->currFileIdx = 0;
pBackendFile->offset = 0;
SBackendFileItem item;
item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(pBackendFile->pFileList, &item);
pBackendFile->inited = 1;
return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile);
} else {
if (snapInfoEqual(&snapInfo, &pBackendFile->snapInfo)) {
return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile);
} else {
SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pBackendSnapSet, &snapFile);
pHandle->currIdx += 1;
return streamSnapWrite(pWriter, pData, nData);
}
}
return code;
} }
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle; SStreamSnapHandle* handle = &pWriter->handle;