refactor backend

This commit is contained in:
yihaoDeng 2023-10-11 20:26:19 +08:00
parent f264ffdcd6
commit 4c94fbb44f
6 changed files with 44 additions and 44 deletions

View File

@ -85,9 +85,9 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskBackendOpen(char* path, char* key);
STaskDbWrapper* taskDbOpen(char* path, char* key);
void taskDbDestroy(void* pBackend);
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb);
@ -182,7 +182,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskBackendBuildSnap(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, int64_t chkpId);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif

View File

@ -48,7 +48,7 @@ typedef struct {
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskBackendWrapperId;
extern int32_t taskDbWrapperId;
const char* streamGetBlockTypeStr(int32_t type);
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);

View File

@ -950,18 +950,18 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0;
}
int32_t taskBackendBuildSnap(void* arg, int64_t chkpId) {
int32_t taskDbBuildSnap(void* arg, int64_t chkpId) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
int32_t code = 0;
while (pIter) {
STaskDbWrapper* pBackend = *(STaskDbWrapper**)pIter;
taskDbAddRef(pBackend);
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
taskDbAddRef(pTaskDb);
code = taskBackendDoCheckpoint((STaskDbWrapper*)pBackend, chkpId);
code = taskDbDoCheckpoint((STaskDbWrapper*)pTaskDb, chkpId);
taskDbRemoveRef(pBackend);
taskDbRemoveRef(pTaskDb);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
return 0;
@ -1035,48 +1035,48 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
/*
0
*/
int32_t taskBackendDoCheckpoint(void* arg, int64_t chkpId) {
STaskDbWrapper* pBackend = arg;
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
STaskDbWrapper* pTaskDb = arg;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pBackend->refId;
int64_t refId = pTaskDb->refId;
if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) {
if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
return -1;
}
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
if (chkpPreBuildDir(pTaskDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
code = -1;
goto _EXIT;
}
// Get all cf and acquire cfWrappter
rocksdb_column_family_handle_t** ppCf = NULL;
int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf);
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) {
if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir);
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
} else {
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir,
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
taosGetTimestampMs() - st);
}
} else {
qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir);
qError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
}
code = chkpMayDelObsolete(pBackend, chkpId, pChkpDir);
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
_EXIT:
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
taosReleaseRef(taskBackendWrapperId, refId);
taosReleaseRef(taskDbWrapperId, refId);
return code;
}
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskBackendDoCheckpoint(arg, chkpId); }
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); }
SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendWrapper* pHandle = (SBackendWrapper*)backend;
@ -1585,11 +1585,11 @@ _EXIT:
}
void* taskDbAddRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
return taosAcquireRef(taskBackendWrapperId, pBackend->refId);
return taosAcquireRef(taskDbWrapperId, pBackend->refId);
}
void taskDbRemoveRef(void* pTaskDb) {
STaskDbWrapper* pBackend = pTaskDb;
taosReleaseRef(taskBackendWrapperId, pBackend->refId);
taosReleaseRef(taskDbWrapperId, pBackend->refId);
}
// void taskDbDestroy(STaskDbWrapper* wrapper);
@ -1647,19 +1647,19 @@ void taskDbInitDBOpt(STaskDbWrapper* pTaskDb) {
}
return;
}
void taskDbInitChkpOpt(STaskDbWrapper* pBackend) {
pBackend->chkpId = -1;
pBackend->chkpCap = 4;
pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pBackend->chkpInUse = taosArrayInit(4, sizeof(int64_t));
void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) {
pTaskDb->chkpId = -1;
pTaskDb->chkpCap = 4;
pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t));
taosThreadRwlockInit(&pBackend->chkpDirLock, NULL);
taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL);
}
void taskDbDestroyChkpOpt(STaskDbWrapper* pBackend) {
taosArrayDestroy(pBackend->chkpSaved);
taosArrayDestroy(pBackend->chkpInUse);
taosThreadRwlockDestroy(&pBackend->chkpDirLock);
void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) {
taosArrayDestroy(pTaskDb->chkpSaved);
taosArrayDestroy(pTaskDb->chkpInUse);
taosThreadRwlockDestroy(&pTaskDb->chkpDirLock);
}
int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
@ -1692,7 +1692,7 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta
*stateFullPath = statePath;
return 0;
}
STaskDbWrapper* taskBackendOpen(char* path, char* key) {
STaskDbWrapper* taskDbOpen(char* path, char* key) {
char* statePath = NULL;
char* err = NULL;
char* dbPath = NULL;
@ -1866,7 +1866,7 @@ int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) {
int32_t code = 0;
STaskDbWrapper* pTaskDb = taskBackendOpen(path, key);
STaskDbWrapper* pTaskDb = taskDbOpen(path, key);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) {

View File

@ -30,7 +30,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
int32_t taskBackendWrapperId = 0;
int32_t taskDbWrapperId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
@ -53,7 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskBackendWrapperId = taosOpenRef(64, taskDbDestroy);
taskDbWrapperId = taosOpenRef(64, taskDbDestroy);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
@ -232,13 +232,13 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref)
return *ppBackend;
}
void* pBackend = taskBackendOpen(pMeta->path, key);
void* pBackend = taskDbOpen(pMeta->path, key);
if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex);
return NULL;
}
*ref = taosAddRef(taskBackendWrapperId, pBackend);
*ref = taosAddRef(taskDbWrapperId, pBackend);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);

View File

@ -137,7 +137,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskBackendBuildSnap(arg, chkpId); }
int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskDbBuildSnap(arg, chkpId); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {

View File

@ -392,7 +392,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
// pTask->pBackend = taskBackendOpen(pMeta->path, (char*)pTask->id.idStr);
// pTask->pBackend = taskDbOpen(pMeta->path, (char*)pTask->id.idStr);
return TSDB_CODE_SUCCESS;
}