refactor backend

This commit is contained in:
yihaoDeng 2023-10-12 11:31:39 +08:00
parent 4c94fbb44f
commit bfd205b3f7
1 changed files with 45 additions and 73 deletions

View File

@ -32,6 +32,7 @@ typedef struct SBackendFileItem {
char* name; char* name;
int8_t type; int8_t type;
int64_t size; int64_t size;
int8_t ref;
} SBackendFileItem; } SBackendFileItem;
typedef struct SBackendFile { typedef struct SBackendFile {
char* pCurrent; char* pCurrent;
@ -74,7 +75,7 @@ struct SStreamSnapHandle {
int32_t currFileIdx; int32_t currFileIdx;
char* metaPath; char* metaPath;
SArray* pBackendSnapSet; SArray* pDbSnapSet;
int32_t currIdx; int32_t currIdx;
}; };
struct SStreamSnapBlockHdr { struct SStreamSnapBlockHdr {
@ -142,9 +143,11 @@ int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {
char* buf = taosMemoryCalloc(1, 512); char* buf = taosMemoryCalloc(1, 512);
sprintf(buf, "[current: %s,", pSnapFile->pCurrent); sprintf(buf + strlen(buf), "[");
sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
if (pSnapFile->pSst) { if (pSnapFile->pSst) {
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i); char* name = taosArrayGetP(pSnapFile->pSst, i);
@ -160,7 +163,8 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
} }
int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) { int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) {
SBackendFileItem item; SBackendFileItem item = {0};
item.ref = 1;
// current // current
item.name = pSnapFile->pCurrent; item.name = pSnapFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
@ -270,6 +274,13 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
char* sst = taosArrayGetP(pSnap->pSst, i); char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst); taosMemoryFree(sst);
} }
// unite read/write snap file
for (int i = 0; i < taosArrayGetSize(pSnap->pFileList); i++) {
SBackendFileItem* pItem = taosArrayGet(pSnap->pFileList, i);
if (pItem->ref == 0) {
taosMemoryFree(pItem->name);
}
}
taosArrayDestroy(pSnap->pFileList); taosArrayDestroy(pSnap->pFileList);
taosArrayDestroy(pSnap->pSst); taosArrayDestroy(pSnap->pSst);
taosCloseFile(&pSnap->fd); taosCloseFile(&pSnap->fd);
@ -285,7 +296,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
return -1; return -1;
} }
SArray* pBdSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
@ -293,10 +304,10 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile); code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0); ASSERT(code == 0);
taosArrayPush(pBdSnapSet, &snapFile); taosArrayPush(pDbSnapSet, &snapFile);
} }
pHandle->pBackendSnapSet = pBdSnapSet; pHandle->pDbSnapSet = pDbSnapSet;
pHandle->currIdx = 0; pHandle->currIdx = 0;
return 0; return 0;
@ -308,40 +319,15 @@ _err:
} }
void streamSnapHandleDestroy(SStreamSnapHandle* handle) { void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
// SBanckendFile* pFile = handle->pBackendFile; if (handle->pDbSnapSet) {
if (handle->pBackendSnapSet) { for (int i = 0; i < taosArrayGetSize(handle->pDbSnapSet); i++) {
for (int i = 0; i < taosArrayGetSize(handle->pBackendSnapSet); i++) { SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pDbSnapSet, i);
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pBackendSnapSet, i);
snapFileDebugInfo(pSnapFile); snapFileDebugInfo(pSnapFile);
snapFileDestroy(pSnapFile); snapFileDestroy(pSnapFile);
} }
taosArrayDestroy(handle->pBackendSnapSet); taosArrayDestroy(handle->pDbSnapSet);
} }
taosMemoryFree(handle->metaPath); taosMemoryFree(handle->metaPath);
// if (handle->checkpointId == 0) {
// // del tmp dir
// if (pFile && taosIsDir(pFile->path)) {
// taosRemoveDir(pFile->path);
// }
// } else {
// streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
// }
// if (pFile) {
// taosMemoryFree(pFile->pCheckpointMeta);
// taosMemoryFree(pFile->pCurrent);
// taosMemoryFree(pFile->pMainfest);
// taosMemoryFree(pFile->pOptions);
// taosMemoryFree(pFile->path);
// for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
// char* sst = taosArrayGetP(pFile->pSst, i);
// taosMemoryFree(sst);
// }
// taosArrayDestroy(pFile->pSst);
// taosMemoryFree(pFile);
// }
// taosArrayDestroy(handle->pFileList);
// taosCloseFile(&handle->fd);
return; return;
} }
@ -374,17 +360,17 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
int32_t code = 0; int32_t code = 0;
SStreamSnapHandle* pHandle = &pReader->handle; SStreamSnapHandle* pHandle = &pReader->handle;
int32_t idx = pHandle->currIdx; int32_t idx = pHandle->currIdx;
SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, idx); SBackendSnapFile2* pSnapFile = taosArrayGet(pHandle->pDbSnapSet, idx);
SBackendFileItem* item = NULL; SBackendFileItem* item = NULL;
_NEXT: _NEXT:
if (pSnapFile->fd == NULL) { if (pSnapFile->fd == NULL) {
if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) { if (pSnapFile->currFileIdx >= taosArrayGetSize(pSnapFile->pFileList)) {
if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pBackendSnapSet)) { if (pHandle->currIdx + 1 < taosArrayGetSize(pHandle->pDbSnapSet)) {
pHandle->currIdx += 1; pHandle->currIdx += 1;
pSnapFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); pSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
goto _NEXT; goto _NEXT;
} else { } else {
*ppData = NULL; *ppData = NULL;
@ -466,27 +452,12 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
pHandle->pBackendSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
taosArrayPush(pHandle->pBackendSnapSet, &snapFile); taosArrayPush(pHandle->pDbSnapSet, &snapFile);
pHandle->currIdx = 0; pHandle->currIdx = 0;
pHandle->metaPath = taosStrdup(path); pHandle->metaPath = taosStrdup(path);
// SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
// pFile->path = taosStrdup(path);
// SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
// SBackendFileItem item;
// item.name = taosStrdup((char*)ROCKSDB_CURRENT);
// item.type = ROCKSDB_CURRENT_TYPE;
// taosArrayPush(list, &item);
// pHandle->pBackendFile = pFile;
// pHandle->pFileList = list;
// pHandle->currFileIdx = 0;
// pHandle->offset = 0;
*ppWriter = pWriter; *ppWriter = pWriter;
return 0; return 0;
} }
@ -530,9 +501,10 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
pBackendFile->offset = 0; pBackendFile->offset = 0;
pBackendFile->currFileIdx += 1; pBackendFile->currFileIdx += 1;
SBackendFileItem item; SBackendFileItem item = {0};
item.name = taosStrdup(pHdr->name); item.name = taosStrdup(pHdr->name);
item.type = pHdr->type; item.type = pHdr->type;
taosArrayPush(pBackendFile->pFileList, &item); taosArrayPush(pBackendFile->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx);
@ -557,26 +529,26 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo; SStreamTaskSnap snapInfo = pHdr->snapInfo;
SBackendSnapFile2* pBackendFile = taosArrayGet(pHandle->pBackendSnapSet, pHandle->currIdx); SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
if (pBackendFile->inited == 0) { if (pDbSnapFile->inited == 0) {
pBackendFile->snapInfo = snapInfo; pDbSnapFile->snapInfo = snapInfo;
pBackendFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pBackendFile->currFileIdx = 0; pDbSnapFile->currFileIdx = 0;
pBackendFile->offset = 0; pDbSnapFile->offset = 0;
SBackendFileItem item; SBackendFileItem item = {0};
item.name = taosStrdup((char*)ROCKSDB_CURRENT); item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(pBackendFile->pFileList, &item); taosArrayPush(pDbSnapFile->pFileList, &item);
pBackendFile->inited = 1; pDbSnapFile->inited = 1;
return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
} else { } else {
if (snapInfoEqual(&snapInfo, &pBackendFile->snapInfo)) { if (snapInfoEqual(&snapInfo, &pDbSnapFile->snapInfo)) {
return streamSnapWriteImpl(pWriter, pData, nData, pBackendFile); return streamSnapWriteImpl(pWriter, pData, nData, pDbSnapFile);
} else { } else {
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
taosArrayPush(pHandle->pBackendSnapSet, &snapFile); taosArrayPush(pHandle->pDbSnapSet, &snapFile);
pHandle->currIdx += 1; pHandle->currIdx += 1;
return streamSnapWrite(pWriter, pData, nData); return streamSnapWrite(pWriter, pData, nData);
@ -585,8 +557,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
return code; return code;
} }
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) { int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle; if (pWriter == NULL) return 0;
streamSnapHandleDestroy(handle); streamSnapHandleDestroy(&pWriter->handle);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return 0; return 0;