refact task backend

This commit is contained in:
yihaoDeng 2023-10-07 17:17:30 +08:00
parent e41da13d5b
commit bdc4afec83
2 changed files with 16 additions and 14 deletions

View File

@ -1639,50 +1639,50 @@ void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) {
taosThreadRwlockDestroy(&pBackend->chkpDirLock); taosThreadRwlockDestroy(&pBackend->chkpDirLock);
} }
int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** taskFullPath) { int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** stateFullPath) {
int32_t code = 0; int32_t code = 0;
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128); char* statePath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s", path, TD_DIRSEP, key); sprintf(statePath, "%s%s%s", path, TD_DIRSEP, key);
if (!taosDirExist(taskPath)) { if (!taosDirExist(statePath)) {
code = taosMulMkDir(taskPath); code = taosMulMkDir(statePath);
if (code != 0) { if (code != 0) {
qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code)); qError("failed to create dir: %s, reason:%s", statePath, tstrerror(code));
taosMemoryFree(taskPath); taosMemoryFree(statePath);
return code; return code;
} }
} }
char* dbPath = taosMemoryCalloc(1, strlen(taskPath) + 128); char* dbPath = taosMemoryCalloc(1, strlen(statePath) + 128);
sprintf(dbPath, "%s%s%s", taskPath, TD_DIRSEP, "state"); sprintf(dbPath, "%s%s%s", statePath, TD_DIRSEP, "state");
if (!taosDirExist(dbPath)) { if (!taosDirExist(dbPath)) {
code = taosMulMkDir(dbPath); code = taosMulMkDir(dbPath);
if (code != 0) { if (code != 0) {
qError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code)); qError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
taosMemoryFree(taskPath); taosMemoryFree(statePath);
taosMemoryFree(dbPath); taosMemoryFree(dbPath);
return code; return code;
} }
} }
*dbFullPath = dbPath; *dbFullPath = dbPath;
*taskFullPath = taskPath; *stateFullPath = statePath;
return 0; return 0;
} }
STaskBackendWrapper* taskBackendOpen(char* path, char* key) { STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
char* taskPath = NULL; char* statePath = NULL;
char* err = NULL; char* err = NULL;
char* dbPath = NULL; char* dbPath = NULL;
char** cfNames = NULL; char** cfNames = NULL;
size_t nCf = 0; size_t nCf = 0;
if (taskBackendBuildFullPath(path, key, &dbPath, &taskPath) != 0) { if (taskBackendBuildFullPath(path, key, &dbPath, &statePath) != 0) {
return NULL; return NULL;
} }
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper)); STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
pTaskBackend->path = taskPath; pTaskBackend->path = statePath;
taskBackendInitChkpOpt(pTaskBackend); taskBackendInitChkpOpt(pTaskBackend);
taskBackendInitDBOpt(pTaskBackend); taskBackendInitDBOpt(pTaskBackend);

View File

@ -237,9 +237,11 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref)
taosThreadMutexUnlock(&pMeta->backendMutex); taosThreadMutexUnlock(&pMeta->backendMutex);
return NULL; return NULL;
} }
*ref = taosAddRef(taskBackendWrapperId, pBackend); *ref = taosAddRef(taskBackendWrapperId, pBackend);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*)); taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);
return pBackend; return pBackend;
} }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {