refactor backend

This commit is contained in:
yihaoDeng 2023-10-10 20:51:41 +08:00
parent 2aa88dfe9b
commit 28421cd35e
4 changed files with 204 additions and 162 deletions

View File

@ -80,13 +80,14 @@ void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskBackendWrapper* taskBackendOpen(char* path, char* key);
void taskBackendDestroy(void* pBackend);
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId);
void* taskBackendAddRef(void* pTaskBackend);
void taskBackendRemoveRef(void* pTaskBackend);

View File

@ -40,6 +40,11 @@ typedef struct {
SRpcMsg msg;
} SStreamContinueExecInfo;
typedef struct {
int64_t streamId;
int64_t taskId;
int64_t chkpId;
} SStreamTaskSnap;
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;

View File

@ -953,7 +953,15 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
int32_t code = 0;
while (pIter) {
STaskBackendWrapper* pBackend = *(STaskBackendWrapper**)pIter;
taskBackendAddRef(pBackend);
code = taskBackendDoCheckpoint((STaskBackendWrapper*)pBackend, chkpId);
taskBackendRemoveRef(pBackend);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
return 0;
@ -1024,7 +1032,10 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
// taosWUnLockLatch(&pMeta->chkpDirLock);
}
int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) {
/*
0
*/
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) {
STaskBackendWrapper* pBackend = arg;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
@ -1065,7 +1076,7 @@ _EXIT:
taosReleaseRef(taskBackendWrapperId, refId);
return code;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)backend;
@ -1577,8 +1588,8 @@ void* taskBackendAddRef(void* pTaskBackend) {
return taosAcquireRef(taskBackendWrapperId, pBackend->refId);
}
void taskBackendRemoveRef(void* pTaskBackend) {
// STaskBackendWrapper* pBackend = pTaskBackend;
// taosReleaseRef(taskBackendWrapperId, pBackend->refId);
STaskBackendWrapper* pBackend = pTaskBackend;
taosReleaseRef(taskBackendWrapperId, pBackend->refId);
}
// void taskBackendDestroy(STaskBackendWrapper* wrapper);

View File

@ -17,6 +17,7 @@
#include "query.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tcommon.h"
enum SBackendFileType {
@ -39,7 +40,26 @@ typedef struct SBackendFile {
SArray* pSst;
char* pCheckpointMeta;
char* path;
} SBanckendFile;
typedef struct SBackendSnapFiles2 {
char* pCurrent;
char* pMainfest;
char* pOptions;
SArray* pSst;
char* pCheckpointMeta;
char* path;
int64_t checkpointId;
int64_t seraial;
int64_t offset;
TdFilePtr fd;
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
} SBackendSnapFile2;
struct SStreamSnapHandle {
void* handle;
SBanckendFile* pBackendFile;
@ -50,6 +70,9 @@ struct SStreamSnapHandle {
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
SArray* pBackendSnapSet;
int32_t currIdx;
};
struct SStreamSnapBlockHdr {
int8_t type;
@ -108,202 +131,204 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamStateSnapBuild(void* arg, char* path, int64_t chkpId) {
return taskBackendBuildSnap(arg, chkpId);
// int32_t code = 0;
// int8_t validChkp = 0;
int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskBackendBuildSnap(arg, chkpId); }
// int len = strlen(path);
// char* tpath = taosMemoryCalloc(1, len + 256);
// memcpy(tpath, path, len);
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
char* buf = taosMemoryCalloc(1, 512);
sprintf(buf, "[current: %s,", pSnapFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
// SStreamMeta *pMeta = arg;
// if (chkpId != 0) {
// sprintf(tpath, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints",
// TD_DIRSEP,
// chkpId);
// if (taosIsDir(tpath)) {
// validChkp = 1;
// qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tpath);
// streamBackendAddInUseChkp(pMeta, chkpId);
// } else {
// qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER,
// tpath);
// }
// }
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* name = taosArrayGetP(pSnapFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
sprintf(buf + strlen(buf) - 1, "]");
// no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it
// if (validChkp == 0) {
// sprintf(tpath, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
// char* chkpdir = taosMemoryCalloc(1, len + 256);
// sprintf(chkpdir, "%s%s%s", tpath, TD_DIRSEP, "tmp");
// taosMemoryFree(tpath);
// tpath = chkpdir;
// qInfo("%s start to trigger checkpoint on %s", STREAM_STATE_TRANSFER, tpath);
// code = streamBackendTriggerChkp(arg, tpath);
// if (code != 0) {
// qError("%s failed to trigger chekckpoint at %s", STREAM_STATE_TRANSFER, tpath);
// taosMemoryFree(tpath);
// return code;
// }
// chkpId = 0;
// }
//*dstPath = tpath;
qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf);
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later
char* tdir = NULL;
int32_t code = streamStateSnapBuild(pMeta, path, chkpId);
if (code != 0) {
int32_t snapFileCvtMeta(SBackendSnapFile2* pSnapFile) {
SBackendFileItem item;
// current
item.name = pSnapFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// mainfest
item.name = pSnapFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// options
item.name = pSnapFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
char* sst = taosArrayGetP(pSnapFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
streamGetFileSize(pSnapFile->path, item.name, &item.size);
taosArrayPush(pSnapFile->pFileList, &item);
}
// meta
item.name = pSnapFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pSnapFile->path, item.name, &item.size) == 0) {
taosArrayPush(pSnapFile->pFileList, &item);
}
return 0;
}
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
if (NULL == pDir) {
qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
return -1;
}
qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);
TdDirPtr pDir = taosOpenDir(tdir);
if (NULL == pDir) {
qError("%s failed to open %s", STREAM_STATE_TRANSFER, tdir);
goto _err;
}
SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
pHandle->pBackendFile = pFile;
pHandle->checkpointId = chkpId;
pHandle->seraial = 0;
pFile->path = tdir;
pFile->pSst = taosArrayInit(16, sizeof(void*));
TdDirEntryPtr pDirEntry;
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(pDirEntry);
if (strlen(name) >= strlen(ROCKSDB_CURRENT) && 0 == strncmp(name, ROCKSDB_CURRENT, strlen(ROCKSDB_CURRENT))) {
pFile->pCurrent = taosStrdup(name);
pSnapFile->pCurrent = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_MAINFEST) && 0 == strncmp(name, ROCKSDB_MAINFEST, strlen(ROCKSDB_MAINFEST))) {
pFile->pMainfest = taosStrdup(name);
pSnapFile->pMainfest = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_OPTIONS) && 0 == strncmp(name, ROCKSDB_OPTIONS, strlen(ROCKSDB_OPTIONS))) {
pFile->pOptions = taosStrdup(name);
pSnapFile->pOptions = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_CHECKPOINT_META) &&
0 == strncmp(name, ROCKSDB_CHECKPOINT_META, strlen(ROCKSDB_CHECKPOINT_META))) {
pFile->pCheckpointMeta = taosStrdup(name);
pSnapFile->pCheckpointMeta = taosStrdup(name);
continue;
}
if (strlen(name) >= strlen(ROCKSDB_SST) &&
0 == strncmp(name + strlen(name) - strlen(ROCKSDB_SST), ROCKSDB_SST, strlen(ROCKSDB_SST))) {
char* sst = taosStrdup(name);
taosArrayPush(pFile->pSst, &sst);
taosArrayPush(pSnapFile->pSst, &sst);
}
}
{
char* buf = taosMemoryCalloc(1, 512);
sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* name = taosArrayGetP(pFile->pSst, i);
sprintf(buf + strlen(buf), "%s,", name);
}
sprintf(buf + strlen(buf) - 1, "]");
qInfo("%s get file list: %s", STREAM_STATE_TRANSFER, buf);
taosMemoryFree(buf);
}
taosCloseDir(&pDir);
if (pFile->pCurrent == NULL) {
qError("%s failed to open %s, reason: no valid file", STREAM_STATE_TRANSFER, tdir);
code = -1;
tdir = NULL;
goto _err;
}
SArray* list = taosArrayInit(64, sizeof(SBackendFileItem));
SBackendFileItem item;
// current
item.name = pFile->pCurrent;
item.type = ROCKSDB_CURRENT_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// mainfest
item.name = pFile->pMainfest;
item.type = ROCKSDB_MAINFEST_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// options
item.name = pFile->pOptions;
item.type = ROCKSDB_OPTIONS_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
// sst
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
item.name = sst;
item.type = ROCKSDB_SST_TYPE;
streamGetFileSize(pFile->path, item.name, &item.size);
taosArrayPush(list, &item);
}
// meta
item.name = pFile->pCheckpointMeta;
item.type = ROCKSDB_CHECKPOINT_META_TYPE;
if (streamGetFileSize(pFile->path, item.name, &item.size) == 0) {
taosArrayPush(list, &item);
}
pHandle->pBackendFile = pFile;
pHandle->currFileIdx = 0;
pHandle->pFileList = list;
pHandle->seraial = 0;
pHandle->offset = 0;
pHandle->handle = pMeta;
return 0;
}
int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
// SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
int32_t code = -1;
char* snapPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(snapPath, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, pSnap->streamId,
pSnap->taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, pSnap->chkpId);
if (taosIsDir(snapPath)) {
goto _ERROR;
}
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pSnapFile->path = snapPath;
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
goto _ERROR;
}
if ((code = snapFileCvtMeta(pSnapFile)) != 0) {
goto _ERROR;
}
snapFileDebugInfo(pSnapFile);
code = 0;
_ERROR:
taosMemoryFree(snapPath);
return code;
}
void snapFileDestroy(SBackendSnapFile2* pSnap) {
taosMemoryFree(pSnap->pCheckpointMeta);
taosMemoryFree(pSnap->pCurrent);
taosMemoryFree(pSnap->pMainfest);
taosMemoryFree(pSnap->pOptions);
taosMemoryFree(pSnap->path);
for (int i = 0; i < taosArrayGetSize(pSnap->pSst); i++) {
char* sst = taosArrayGetP(pSnap->pSst, i);
taosMemoryFree(sst);
}
taosArrayDestroy(pSnap->pFileList);
taosArrayDestroy(pSnap->pSst);
taosCloseFile(&pSnap->fd);
return;
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
// impl later
SArray* pSnapSet = NULL;
int32_t code = streamBackendGetSnapInfo(pMeta, path, chkpId);
if (code != 0) {
return -1;
}
SArray* pBdSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2));
for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) {
SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i);
SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0);
taosArrayPush(pBdSnapSet, &snapFile);
}
pHandle->pBackendSnapSet = pBdSnapSet;
pHandle->currIdx = 0;
return 0;
_err:
streamSnapHandleDestroy(pHandle);
taosMemoryFreeClear(tdir);
code = -1;
return code;
}
void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile;
// SBanckendFile* pFile = handle->pBackendFile;
if (handle->pBackendSnapSet) {
for (int i = 0; i < taosArrayGetSize(handle->pBackendSnapSet); i++) {
SBackendSnapFile2* pSnapFile = taosArrayGet(handle->pBackendSnapSet, i);
snapFileDestroy(pSnapFile);
}
taosArrayDestroy(handle->pBackendSnapSet);
}
if (handle->checkpointId == 0) {
// del tmp dir
if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
}
} else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
}
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta);
taosMemoryFree(pFile->pCurrent);
taosMemoryFree(pFile->pMainfest);
taosMemoryFree(pFile->pOptions);
taosMemoryFree(pFile->path);
for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
char* sst = taosArrayGetP(pFile->pSst, i);
taosMemoryFree(sst);
}
taosArrayDestroy(pFile->pSst);
taosMemoryFree(pFile);
}
taosArrayDestroy(handle->pFileList);
taosCloseFile(&handle->fd);
// if (handle->checkpointId == 0) {
// // del tmp dir
// if (pFile && taosIsDir(pFile->path)) {
// taosRemoveDir(pFile->path);
// }
// } else {
// streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
// }
// if (pFile) {
// taosMemoryFree(pFile->pCheckpointMeta);
// taosMemoryFree(pFile->pCurrent);
// taosMemoryFree(pFile->pMainfest);
// taosMemoryFree(pFile->pOptions);
// taosMemoryFree(pFile->path);
// for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) {
// char* sst = taosArrayGetP(pFile->pSst, i);
// taosMemoryFree(sst);
// }
// taosArrayDestroy(pFile->pSst);
// taosMemoryFree(pFile);
// }
// taosArrayDestroy(handle->pFileList);
// taosCloseFile(&handle->fd);
return;
}