refactor backend

This commit is contained in:
yihaoDeng 2023-10-12 15:24:50 +08:00
parent f704d4b82e
commit 861d26e356
2 changed files with 27 additions and 31 deletions

View File

@ -203,7 +203,7 @@ _EXIT:
streamBackendCleanup((void*)pBackend); streamBackendCleanup((void*)pBackend);
return code; return code;
} }
int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) { int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
int8_t compatible = streamMetaCheckBackendCompatible(pMeta); int8_t compatible = streamMetaCheckBackendCompatible(pMeta);
if (compatible == STREAM_STATA_COMPATIBLE) { if (compatible == STREAM_STATA_COMPATIBLE) {
return 0; return 0;
@ -263,7 +263,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
if (streamMetaMayConvertBackendFormat(pMeta) < 0) { if (streamMetaMayCvtDbFormat(pMeta) < 0) {
goto _err; goto _err;
} }
@ -759,16 +759,6 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
if (pMeta == NULL) return 0; if (pMeta == NULL) return 0;
// void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
// while (pIter) {
// STaskDbWrapper* taskBackend = *(STaskDbWrapper**)pIter;
// if (taskBackend != NULL) {
// taskDbRemoveRef(taskBackend);
// }
// pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
// }
// taosHashClear(pMeta->pTaskBackendUnique);
return streamMetaLoadAllTasks(pMeta); return streamMetaLoadAllTasks(pMeta);
} }
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {

View File

@ -134,6 +134,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
char fullname[256] = {0}; char fullname[256] = {0};
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
return taosOpenFile(fullname, opt); return taosOpenFile(fullname, opt);
} }
@ -469,54 +470,53 @@ int32_t snapInfoEqual(SStreamTaskSnap* a, SStreamTaskSnap* b) {
return 1; return 1;
} }
int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nData, SBackendSnapFile2* pSnapFile) {
SBackendSnapFile2* pBackendFile) {
int code = -1; int code = -1;
SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData; SStreamSnapBlockHdr* pHdr = (SStreamSnapBlockHdr*)pData;
SStreamSnapHandle* pHandle = &pWriter->handle; SStreamSnapHandle* pHandle = &pWriter->handle;
SStreamTaskSnap snapInfo = pHdr->snapInfo; SStreamTaskSnap snapInfo = pHdr->snapInfo;
SStreamTaskSnap* pSnapInfo = &pBackendFile->snapInfo; SStreamTaskSnap* pSnapInfo = &pSnapFile->snapInfo;
SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
if (pBackendFile->fd == 0) { if (pSnapFile->fd == 0) {
pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pBackendFile->fd == NULL) { if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pHandle->metaPath, TD_DIRSEP,
pHdr->name, tstrerror(code)); pHdr->name, tstrerror(code));
} }
} }
if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) { if (strlen(pHdr->name) == strlen(pItem->name) && strcmp(pHdr->name, pItem->name) == 0) {
int64_t bytes = taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); int64_t bytes = taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
if (bytes != pHdr->size) { if (bytes != pHdr->size) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code)); qError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code; return code;
} }
pBackendFile->offset += bytes; pSnapFile->offset += bytes;
} else { } else {
taosCloseFile(&pBackendFile->fd); taosCloseFile(&pSnapFile->fd);
pBackendFile->offset = 0; pSnapFile->offset = 0;
pBackendFile->currFileIdx += 1; pSnapFile->currFileIdx += 1;
SBackendFileItem item = {0}; SBackendFileItem item = {0};
item.name = taosStrdup(pHdr->name); item.name = taosStrdup(pHdr->name);
item.type = pHdr->type; item.type = pHdr->type;
taosArrayPush(pBackendFile->pFileList, &item); taosArrayPush(pSnapFile->pFileList, &item);
SBackendFileItem* pItem = taosArrayGet(pBackendFile->pFileList, pBackendFile->currFileIdx); SBackendFileItem* pItem = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
pBackendFile->fd = streamOpenFile(pHandle->metaPath, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); pSnapFile->fd = streamOpenFile(pSnapFile->path, pItem->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pBackendFile->fd == NULL) { if (pSnapFile->fd == NULL) {
code = TAOS_SYSTEM_ERROR(terrno); code = TAOS_SYSTEM_ERROR(terrno);
qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pBackendFile->path, TD_DIRSEP, qError("%s failed to open file name:%s%s%s, reason:%s", STREAM_STATE_TRANSFER, pSnapFile->path, TD_DIRSEP,
pHdr->name, tstrerror(code)); pHdr->name, tstrerror(code));
} }
taosPWriteFile(pBackendFile->fd, pHdr->data, pHdr->size, pBackendFile->offset); taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
pBackendFile->offset += pHdr->size; pSnapFile->offset += pHdr->size;
} }
code = 0; code = 0;
_EXIT: _EXIT:
@ -531,6 +531,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
if (pDbSnapFile->inited == 0) { if (pDbSnapFile->inited == 0) {
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
sprintf(path, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, snapInfo.streamId,
snapInfo.taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, snapInfo.chkpId);
pDbSnapFile->path = path;
pDbSnapFile->snapInfo = snapInfo; pDbSnapFile->snapInfo = snapInfo;
pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pDbSnapFile->currFileIdx = 0; pDbSnapFile->currFileIdx = 0;
@ -539,6 +544,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SBackendFileItem item = {0}; SBackendFileItem item = {0};
item.name = taosStrdup((char*)ROCKSDB_CURRENT); item.name = taosStrdup((char*)ROCKSDB_CURRENT);
item.type = ROCKSDB_CURRENT_TYPE; item.type = ROCKSDB_CURRENT_TYPE;
taosArrayPush(pDbSnapFile->pFileList, &item); taosArrayPush(pDbSnapFile->pFileList, &item);
pDbSnapFile->inited = 1; pDbSnapFile->inited = 1;