factor code

This commit is contained in:
yihaoDeng 2023-05-11 13:01:45 +00:00
parent b91981af59
commit 6cf1adf1b1
1 changed files with 7 additions and 13 deletions

View File

@ -39,18 +39,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(streamPath, "%s/%s", path, "stream"); sprintf(streamPath, "%s/%s", path, "stream");
pMeta->path = taosStrdup(streamPath); pMeta->path = taosStrdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
memset(streamPath, 0, len);
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(streamPath, 0755); code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
taosMemoryFree(streamPath);
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err; goto _err;
@ -83,24 +81,24 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
pMeta->streamBackendId = streamBackendId; pMeta->streamBackendId = streamBackendId;
char* statePath = taosMemoryCalloc(1, len); memset(streamPath, 0, len);
sprintf(statePath, "%s/%s", pMeta->path, "state"); sprintf(streamPath, "%s/%s", pMeta->path, "state");
code = taosMulModeMkDir(statePath, 0755); code = taosMulModeMkDir(streamPath, 0755);
if (code != 0) { if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
taosMemoryFree(streamPath);
goto _err; goto _err;
} }
pMeta->streamBackend = streamBackendInit(statePath); pMeta->streamBackend = streamBackendInit(streamPath);
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosMemoryFree(statePath); taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
return pMeta; return pMeta;
_err: _err:
taosMemoryFree(streamPath);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList); if (pMeta->pTaskList) taosArrayDestroy(pMeta->pTaskList);
@ -266,13 +264,9 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
// taosWLockLatch(&pMeta->lock);
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t)); taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn); tdbTbDelete(pMeta->pTaskDb, &taskId, sizeof(int32_t), pMeta->txn);
//
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);