add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-08 09:28:34 +00:00
parent 8b91617145
commit 9828ca9d7f
4 changed files with 57 additions and 2 deletions

View File

@ -46,6 +46,7 @@ typedef struct {
void* streamBackendInit(const char* path); void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
int32_t streamBackendDoCheckpoint(int64_t rid, const char* cpPath);
SListNode* streamBackendAddCompare(void* backend, void* arg); SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg);

View File

@ -41,6 +41,7 @@ typedef struct {
} RocksdbCfInst; } RocksdbCfInst;
uint32_t nextPow2(uint32_t x) { uint32_t nextPow2(uint32_t x) {
if (x <= 1) return 2;
x = x - 1; x = x - 1;
x = x | (x >> 1); x = x | (x >> 1);
x = x | (x >> 2); x = x | (x >> 2);
@ -189,6 +190,37 @@ void streamBackendCleanup(void* arg) {
qDebug("destroy stream backend backend:%p", pHandle); qDebug("destroy stream backend backend:%p", pHandle);
return; return;
} }
int32_t streamBackendDoCheckpoint(int64_t backendRid, const char* path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1;
SBackendHandle* pHandle = taosAcquireRef(streamBackendId, backendRid);
if (pHandle == NULL) {
return -1;
}
qDebug("stream backend:%p start to do checkpoint at:%s ", pHandle, path);
if (pHandle->db != NULL) {
char* err = NULL;
rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err);
if (cp == NULL || err != NULL) {
taosMemoryFree(err);
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err);
}
rocksdb_checkpoint_create(cp, path, 64 << 20, &err);
if (err != NULL) {
taosMemoryFree(err);
qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err);
} else {
code = 0;
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, path,
taosGetTimestampMs() - st);
}
rocksdb_checkpoint_object_destroy(cp);
}
taosReleaseRef(streamBackendId, backendRid);
return code;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend; SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL; SListNode* node = NULL;
@ -810,7 +842,11 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
} }
int streamStateOpenBackend(void* backend, SStreamState* pState) { int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid); void* arg = taosAcquireRef(streamBackendId, pState->streamBackendRid);
if (arg == NULL) {
return -1;
}
SBackendHandle* handle = backend; SBackendHandle* handle = backend;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId); sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);

View File

@ -89,6 +89,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
} }
pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackend = streamBackendInit(streamPath);
if (pMeta->streamBackend == NULL) {
goto _err;
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
taosMemoryFree(streamPath); taosMemoryFree(streamPath);
@ -104,7 +108,6 @@ _err:
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
// if (pMeta->streamBackend) streamBackendCleanup(pMeta->streamBackend);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
qError("failed to open stream meta"); qError("failed to open stream meta");
return NULL; return NULL;
@ -399,3 +402,17 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return 0; return 0;
} }
int32_t streamDoCheckpoint(SStreamMeta* pMeta) {
int code = -1;
char buf[256];
sprintf(buf, "%s/%s", pMeta->path, "checkpoints");
code = taosMulModeMkDir(buf, 0755);
if (code != 0) {
qError("failed to create chechpoint %s, reason:%s", buf, tstrerror(code));
return code;
}
code = streamBackendDoCheckpoint(pMeta->streamBackendRid, buf);
return code;
}

View File

@ -120,6 +120,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta; SStreamMeta* pMeta = pStreamTask->pMeta;
pState->streamBackendRid = pMeta->streamBackendRid; pState->streamBackendRid = pMeta->streamBackendRid;
int code = streamStateOpenBackend(pMeta->streamBackend, pState); int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
taosReleaseRef(streamBackendId, pMeta->streamBackendRid); taosReleaseRef(streamBackendId, pMeta->streamBackendRid);