refact task backend

This commit is contained in:
yihaoDeng 2023-10-07 12:17:55 +08:00
parent b8e265e502
commit fa70221238
7 changed files with 230 additions and 203 deletions

View File

@ -726,7 +726,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -738,7 +738,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
if (code != TSDB_CODE_SUCCESS) return code;
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr);
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, &pTask->backendRefId);
if (pTask->pBackend == NULL) return -1;
streamTaskOpenAllUpstreamInput(pTask);

View File

@ -69,17 +69,20 @@ typedef struct {
} STaskBackendWrapper;
void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
STaskBackendWrapper* taskBackendOpen(char* path, char* key);
void* streamBackendInit(const char* path, int64_t chkpId);
void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, uint64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
void taskBackendAddRef(void* pTaskBackend);
STaskBackendWrapper* taskBackendOpen(char* path, char* key);
void taskBackendDestroy(void* pBackend);
void* taskBackendAddRef(void* pTaskBackend);
void taskBackendRemoveRef(void* pTaskBackend);
int streamStateOpenBackend(void* backend, SStreamState* pState);
void streamStateCloseBackend(SStreamState* pState, bool remove);

View File

@ -18,9 +18,9 @@
#include "executor.h"
#include "query.h"
#include "tstream.h"
#include "streamBackendRocksdb.h"
#include "trpc.h"
#include "tstream.h"
#ifdef __cplusplus
extern "C" {
@ -41,8 +41,9 @@ typedef struct {
} SStreamContinueExecInfo;
extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskBackendWrapperId;
const char* streamGetBlockTypeStr(int32_t type);
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
@ -68,10 +69,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);

View File

@ -770,39 +770,40 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
}
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
while (pIter) {
int64_t id = *(int64_t*)pIter;
return 0;
// SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
// void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
// while (pIter) {
// int64_t id = *(int64_t*)pIter;
SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
if (wrapper == NULL) {
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
continue;
}
// SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// if (wrapper == NULL) {
// pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
// continue;
// }
taosThreadRwlockRdlock(&wrapper->rwLock);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (wrapper->pHandle[i]) {
rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
taosArrayPush(pHandle, &p);
}
}
taosThreadRwlockUnlock(&wrapper->rwLock);
// taosThreadRwlockRdlock(&wrapper->rwLock);
// for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
// if (wrapper->pHandle[i]) {
// rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
// taosArrayPush(pHandle, &p);
// }
// }
// taosThreadRwlockUnlock(&wrapper->rwLock);
taosArrayPush(refs, &id);
}
// taosArrayPush(refs, &id);
// }
int32_t nCf = taosArrayGetSize(pHandle);
// int32_t nCf = taosArrayGetSize(pHandle);
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
ppCf[i] = taosArrayGetP(pHandle, i);
}
taosArrayDestroy(pHandle);
// rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
// for (int i = 0; i < nCf; i++) {
// ppCf[i] = taosArrayGetP(pHandle, i);
// }
// taosArrayDestroy(pHandle);
*ppHandle = ppCf;
return nCf;
// *ppHandle = ppCf;
// return nCf;
}
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
int32_t code = -1;
@ -826,172 +827,176 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
int code = 0;
char* err = NULL;
return 0;
// int code = 0;
// char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
// rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
// rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
code = -1;
}
rocksdb_flushoptions_destroy(flushOpt);
return code;
// rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
// if (err != NULL) {
// qError("failed to flush db before streamBackend clean up, reason:%s", err);
// taosMemoryFree(err);
// code = -1;
// }
// rocksdb_flushoptions_destroy(flushOpt);
// return code;
}
int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256);
char* pChkpIdDir = taosMemoryCalloc(1, 256);
sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(pChkpDir, 0755, true);
if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
code = -1;
return code;
}
sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
if (taosIsDir(pChkpIdDir)) {
qInfo("stream rm exist checkpoint%s", pChkpIdDir);
taosRemoveFile(pChkpIdDir);
}
*chkpDir = pChkpDir;
*chkpIdDir = pChkpIdDir;
return 0;
// int32_t code = 0;
// char* pChkpDir = taosMemoryCalloc(1, 256);
// char* pChkpIdDir = taosMemoryCalloc(1, 256);
// sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints");
// code = taosMulModeMkDir(pChkpDir, 0755, true);
// if (code != 0) {
// qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
// taosMemoryFree(pChkpDir);
// taosMemoryFree(pChkpIdDir);
// code = -1;
// return code;
// }
// sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
// if (taosIsDir(pChkpIdDir)) {
// qInfo("stream rm exist checkpoint%s", pChkpIdDir);
// taosRemoveFile(pChkpIdDir);
// }
// *chkpDir = pChkpDir;
// *chkpIdDir = pChkpIdDir;
// return 0;
}
int32_t streamBackendTriggerChkp(void* arg, char* dst) {
SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid;
int32_t code = -1;
return 0;
// SStreamMeta* pMeta = arg;
// int64_t backendRid = pMeta->streamBackendRid;
// int32_t code = -1;
SArray* refs = taosArrayInit(16, sizeof(int64_t));
rocksdb_column_family_handle_t** ppCf = NULL;
// SArray* refs = taosArrayInit(16, sizeof(int64_t));
// rocksdb_column_family_handle_t** ppCf = NULL;
int64_t st = taosGetTimestampMs();
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
// int64_t st = taosGetTimestampMs();
// SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL || pHandle->db == NULL) {
goto _ERROR;
}
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf);
// if (pHandle == NULL || pHandle->db == NULL) {
// goto _ERROR;
// }
// int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
// qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, dst, nCf);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
code = chkpDoDbCheckpoint(pHandle->db, dst);
if (code != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
} else {
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
taosGetTimestampMs() - st);
}
} else {
qError("stream backend:%p failed to flush db at:%s", pHandle, dst);
}
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
// if (code == 0) {
// code = chkpDoDbCheckpoint(pHandle->db, dst);
// if (code != 0) {
// qError("stream backend:%p failed to do checkpoint at:%s", pHandle, dst);
// } else {
// qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, dst,
// taosGetTimestampMs() - st);
// }
// } else {
// qError("stream backend:%p failed to flush db at:%s", pHandle, dst);
// }
// release all ref to cfWrapper;
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);
}
// // release all ref to cfWrapper;
// // for (int i = 0; i < taosArrayGetSize(refs); i++) {
// // int64_t id = *(int64_t*)taosArrayGet(refs, i);
// // taosReleaseRef(streamBackendCfWrapperId, id);
// // }
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
return code;
// _ERROR:
// // taosReleaseRef(streamBackendId, backendRid);
// // taosArrayDestroy(refs);
// return code;
}
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) {
if (arg == NULL) return 0;
// if (arg == NULL) return 0;
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpInUse, &chkpId);
taosWUnLockLatch(&pMeta->chkpDirLock);
// SStreamMeta* pMeta = arg;
// taosWLockLatch(&pMeta->chkpDirLock);
// taosArrayPush(pMeta->chkpInUse, &chkpId);
// taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
}
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
if (arg == NULL) return 0;
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->chkpDirLock);
if (taosArrayGetSize(pMeta->chkpInUse) > 0) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
if (id == chkpId) {
taosArrayPopFrontBatch(pMeta->chkpInUse, 1);
}
}
taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
// if (arg == NULL) return 0;
// SStreamMeta* pMeta = arg;
// taosWLockLatch(&pMeta->chkpDirLock);
// if (taosArrayGetSize(pMeta->chkpInUse) > 0) {
// int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
// if (id == chkpId) {
// taosArrayPopFrontBatch(pMeta->chkpInUse, 1);
// }
// }
// taosWUnLockLatch(&pMeta->chkpDirLock);
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SStreamMeta* pMeta = arg;
int64_t backendRid = pMeta->streamBackendRid;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
return 0;
// SStreamMeta* pMeta = arg;
// int64_t backendRid = pMeta->streamBackendRid;
// int64_t st = taosGetTimestampMs();
// int32_t code = -1;
SArray* refs = taosArrayInit(16, sizeof(int64_t));
// SArray* refs = taosArrayInit(16, sizeof(int64_t));
rocksdb_column_family_handle_t** ppCf = NULL;
// rocksdb_column_family_handle_t** ppCf = NULL;
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
taosArrayDestroy(refs);
return code;
}
// char* pChkpDir = NULL;
// char* pChkpIdDir = NULL;
// if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) {
// taosArrayDestroy(refs);
// return code;
// }
SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL || pHandle->db == NULL) {
goto _ERROR;
}
// SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid);
// if (pHandle == NULL || pHandle->db == NULL) {
// goto _ERROR;
// }
// Get all cf and acquire cfWrappter
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
// // Get all cf and acquire cfWrappter
// int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
// qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf);
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
if (code != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
} else {
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
taosGetTimestampMs() - st);
}
} else {
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);
}
if (code == 0) {
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock);
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
// if (code == 0) {
// code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
// if (code != 0) {
// qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir);
// } else {
// qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir,
// taosGetTimestampMs() - st);
// }
// } else {
// qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
// }
// // release all ref to cfWrapper;
// for (int i = 0; i < taosArrayGetSize(refs); i++) {
// int64_t id = *(int64_t*)taosArrayGet(refs, i);
// taosReleaseRef(streamBackendCfWrapperId, id);
// }
// if (code == 0) {
// taosWLockLatch(&pMeta->chkpDirLock);
// taosArrayPush(pMeta->chkpSaved, &checkpointId);
// taosWUnLockLatch(&pMeta->chkpDirLock);
// delete obsolte checkpoint
delObsoleteCheckpoint(arg, pChkpDir);
pMeta->chkpId = checkpointId;
}
// // delete obsolte checkpoint
// delObsoleteCheckpoint(arg, pChkpDir);
// pMeta->chkpId = checkpointId;
// }
_ERROR:
taosReleaseRef(streamBackendId, backendRid);
taosArrayDestroy(refs);
taosMemoryFree(ppCf);
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
return code;
// _ERROR:
// taosReleaseRef(streamBackendId, backendRid);
// taosArrayDestroy(refs);
// taosMemoryFree(ppCf);
// taosMemoryFree(pChkpDir);
// taosMemoryFree(pChkpIdDir);
// return code;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) {
@ -1499,12 +1504,15 @@ _EXIT:
taosMemoryFree(cfHandle);
return code;
}
void taskBackendAddRef(void* pTaskBackend) {
void* taskBackendAddRef(void* pTaskBackend) {
STaskBackendWrapper* pBackend = pTaskBackend;
taosAcquireRef(streamBackendCfWrapperId, pBackend->refId);
return;
return taosAcquireRef(taskBackendWrapperId, pBackend->refId);
}
void taskBackendDestroy(STaskBackendWrapper* wrapper);
void taskBackendRemoveRef(void* pTaskBackend) {
// STaskBackendWrapper* pBackend = pTaskBackend;
// taosReleaseRef(taskBackendWrapperId, pBackend->refId);
}
// void taskBackendDestroy(STaskBackendWrapper* wrapper);
void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) {
rocksdb_env_t* env = rocksdb_create_default_env();
@ -1579,44 +1587,49 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
char* taskPath = NULL;
char* err = NULL;
int32_t code = taskBackendBuildFullPath(path, key, &taskPath);
if (code != 0) return NULL;
char** cfNames = NULL;
size_t nCf = 0;
if (0 != taskBackendBuildFullPath(path, key, &taskPath)) return NULL;
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
taskBackendInitOpt(pTaskBackend);
char** cfs = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
if (nCf == 0 || nCf == 1 || err != NULL) {
taosMemoryFreeClear(err);
pTaskBackend->db = rocksdb_open(pTaskBackend->dbOpt, taskPath, &err);
if (err != NULL) {
qError("failed to open rocksdb, path:%s, reason:%s", taskPath, err);
taosMemoryFreeClear(err);
code = -1;
goto _EXIT;
}
} else {
code = taskBackendOpenCfs(pTaskBackend, taskPath, cfs, nCf);
if (code != 0) goto _EXIT;
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
if (nCf == 0) {
pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], taskPath, &err);
rocksdb_close(pTaskBackend->db);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf);
taosMemoryFree(err);
}
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
ASSERT(err != NULL);
if (0 != taskBackendOpenCfs(pTaskBackend, taskPath, cfNames, nCf)) {
goto _EXIT;
}
if (cfs != NULL) rocksdb_list_column_families_destroy(cfs, nCf);
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
taosMemoryFree(taskPath);
pTaskBackend->refId = taosAddRef(streamBackendCfWrapperId, pTaskBackend);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf);
return pTaskBackend;
_EXIT:
taskBackendDestroy(pTaskBackend);
if (err != NULL) taosMemoryFree(err);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf);
return NULL;
}
void taskBackendDestroy(STaskBackendWrapper* wrapper) {
void taskBackendDestroy(void* pBackend) {
STaskBackendWrapper* wrapper = pBackend;
if (wrapper == NULL) return;
if (wrapper->db && wrapper->pCf) {

View File

@ -30,6 +30,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
int32_t taskBackendWrapperId = 0;
static int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
static void metaHbToMnode(void* param, void* tmrId);
@ -52,6 +53,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskBackendWrapperId = taosOpenRef(64, taskBackendDestroy);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
@ -220,19 +222,22 @@ int32_t streamMetaMayConvertBackendFormat(SStreamMeta* pMeta) {
return 0;
}
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key) {
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) {
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
taskBackendAddRef(*ppBackend);
*ref = ((STaskBackendWrapper*)*ppBackend)->refId;
taosThreadMutexUnlock(&pMeta->backendMutex);
return *ppBackend;
}
void* pBackend = taskBackendOpen(pMeta->path, key);
if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex);
return NULL;
}
*ref = taosAddRef(taskBackendWrapperId, pBackend);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
return pBackend;

View File

@ -354,7 +354,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem);
pTask->pUpstreamInfoList = NULL;
}
if (pTask->pBackend) {
taskBackendRemoveRef(pTask->pBackend);
pTask->pBackend = NULL;
}
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);