refact task backend
This commit is contained in:
parent
54e3ac2c1e
commit
bb265887e6
|
@ -990,7 +990,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
|
||||
if (code < 0) {
|
||||
tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -1565,7 +1565,7 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) {
|
|||
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
|
||||
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
|
||||
if (!taosDirExist(taskPath)) {
|
||||
code = taosMkDir(taskPath);
|
||||
code = taosMulMkDir(taskPath);
|
||||
if (code != 0) {
|
||||
qError("failed to create dir: %s, reason:%s", taskPath, tstrerror(code));
|
||||
taosMemoryFree(taskPath);
|
||||
|
|
|
@ -229,9 +229,12 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) {
|
|||
return *ppBackend;
|
||||
}
|
||||
void* pBackend = taskBackendOpen(pMeta->path, key);
|
||||
if (pBackend == NULL) {
|
||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
|
||||
taosThreadMutexLock(&pMeta->backendMutex);
|
||||
return pBackend;
|
||||
}
|
||||
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
|
||||
|
|
Loading…
Reference in New Issue