refact task backend

This commit is contained in:
yihaoDeng 2023-10-07 16:48:36 +08:00
parent 66315fe405
commit c146327da8
3 changed files with 213 additions and 53 deletions

View File

@ -419,9 +419,9 @@ typedef struct SStreamMeta {
int64_t rid;
int64_t chkpId;
int32_t chkpCap;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
int32_t pauseTaskNum;

View File

@ -66,6 +66,13 @@ typedef struct {
TdThreadMutex mutex;
char* idstr;
int64_t refId;
char* path;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
TdThreadRwlock chkpDirLock;
} STaskBackendWrapper;

View File

@ -709,6 +709,60 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
taosArrayDestroy(chkpDel);
return 0;
}
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* chkpInUse: |--cp2--|--cp4--|
* chkpInUse is doing translation, cannot del until
* replication is finished
*/
int32_t chkpDelObsolete(void* arg, char* path) {
STaskBackendWrapper* pBackend = arg;
taosThreadRwlockWrlock(&pBackend->chkpDirLock);
SArray* chkpDel = taosArrayInit(8, sizeof(int64_t));
SArray* chkpDup = taosArrayInit(8, sizeof(int64_t));
int64_t firsId = 0;
if (taosArrayGetSize(pBackend->chkpInUse) >= 1) {
firsId = *(int64_t*)taosArrayGet(pBackend->chkpInUse, 0);
for (int i = 0; i < taosArrayGetSize(pBackend->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
if (id >= firsId) {
taosArrayPush(chkpDup, &id);
} else {
taosArrayPush(chkpDel, &id);
}
}
} else {
int32_t sz = taosArrayGetSize(pBackend->chkpSaved);
int32_t dsz = sz - pBackend->chkpCap; // del size
for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
taosArrayPush(chkpDel, &id);
}
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pBackend->chkpSaved, i);
taosArrayPush(chkpDup, &id);
}
}
taosArrayDestroy(pBackend->chkpSaved);
pBackend->chkpSaved = chkpDup;
taosThreadRwlockUnlock(&pBackend->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
char tbuf[256] = {0};
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
if (taosIsDir(tbuf)) {
taosRemoveDir(tbuf);
}
}
taosArrayDestroy(chkpDel);
return 0;
}
static int32_t compareCheckpoint(const void* a, const void* b) {
int64_t x = *(int64_t*)a;
@ -805,6 +859,27 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
// *ppHandle = ppCf;
// return nCf;
}
int32_t chkpGetAllDbCfHandle2(STaskBackendWrapper* pBackend, rocksdb_column_family_handle_t*** ppHandle) {
SArray* pHandle = taosArrayInit(8, POINTER_BYTES);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (pBackend->pCf[i]) {
rocksdb_column_family_handle_t* p = pBackend->pCf[i];
taosArrayPush(pHandle, &p);
}
}
int32_t nCf = taosArrayGetSize(pHandle);
rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
for (int i = 0; i < nCf; i++) {
ppCf[i] = taosArrayGetP(pHandle, i);
}
taosArrayDestroy(pHandle);
*ppHandle = ppCf;
return nCf;
}
int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) {
int32_t code = -1;
char* err = NULL;
@ -827,47 +902,45 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
return 0;
// int code = 0;
// char* err = NULL;
int code = 0;
char* err = NULL;
// rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
// rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
// rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
// if (err != NULL) {
// qError("failed to flush db before streamBackend clean up, reason:%s", err);
// taosMemoryFree(err);
// code = -1;
// }
// rocksdb_flushoptions_destroy(flushOpt);
// return code;
rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
code = -1;
}
rocksdb_flushoptions_destroy(flushOpt);
return code;
}
int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) {
int32_t code = 0;
char* pChkpDir = taosMemoryCalloc(1, 256);
char* pChkpIdDir = taosMemoryCalloc(1, 256);
sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints");
code = taosMulModeMkDir(pChkpDir, 0755, true);
if (code != 0) {
qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
code = -1;
return code;
}
sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
if (taosIsDir(pChkpIdDir)) {
qInfo("stream rm exist checkpoint%s", pChkpIdDir);
taosRemoveFile(pChkpIdDir);
}
*chkpDir = pChkpDir;
*chkpIdDir = pChkpIdDir;
return 0;
// int32_t code = 0;
// char* pChkpDir = taosMemoryCalloc(1, 256);
// char* pChkpIdDir = taosMemoryCalloc(1, 256);
// sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints");
// code = taosMulModeMkDir(pChkpDir, 0755, true);
// if (code != 0) {
// qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code));
// taosMemoryFree(pChkpDir);
// taosMemoryFree(pChkpIdDir);
// code = -1;
// return code;
// }
// sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
// if (taosIsDir(pChkpIdDir)) {
// qInfo("stream rm exist checkpoint%s", pChkpIdDir);
// taosRemoveFile(pChkpIdDir);
// }
// *chkpDir = pChkpDir;
// *chkpIdDir = pChkpIdDir;
// return 0;
}
int32_t streamBackendTriggerChkp(void* arg, char* dst) {
@ -936,6 +1009,49 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
// taosWUnLockLatch(&pMeta->chkpDirLock);
}
int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) {
STaskBackendWrapper* pBackend = arg;
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pBackend->refId;
if (taosAcquireRef(taskBackendWrapperId, refId) == NULL) {
return -1;
}
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreCheckDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
goto _EXIT;
}
// Get all cf and acquire cfWrappter
rocksdb_column_family_handle_t** ppCf = NULL;
int32_t nCf = chkpGetAllDbCfHandle2(pBackend, &ppCf);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pBackend, pChkpIdDir, nCf);
if ((code = chkpPreFlushDb(pBackend->db, ppCf, nCf)) == 0) {
if ((code = chkpDoDbCheckpoint(pBackend->db, pChkpIdDir)) != 0) {
qError("stream backend:%p failed to do checkpoint at:%s", pBackend, pChkpIdDir);
} else {
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pBackend, pChkpIdDir,
taosGetTimestampMs() - st);
}
} else {
qError("stream backend:%p failed to flush db at:%s", pBackend, pChkpIdDir);
}
code = chkpDelObsolete(pBackend, pChkpDir);
taosReleaseRef(taskBackendWrapperId, refId);
return code;
_EXIT:
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
taosReleaseRef(taskBackendWrapperId, refId);
return -1;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
return 0;
// SStreamMeta* pMeta = arg;
@ -1514,7 +1630,7 @@ void taskBackendRemoveRef(void* pTaskBackend) {
}
// void taskBackendDestroy(STaskBackendWrapper* wrapper);
void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) {
void taskBackendInitDBOpt(STaskBackendWrapper* pTaskBackend) {
rocksdb_env_t* env = rocksdb_create_default_env();
rocksdb_cache_t* cache = rocksdb_cache_create_lru(256);
@ -1568,10 +1684,26 @@ void taskBackendInitOpt(STaskBackendWrapper* pTaskBackend) {
}
return;
}
int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) {
void taskBackendInitChkpOpt(STaskBackendWrapper* pBackend) {
pBackend->chkpId = -1;
pBackend->chkpCap = 4;
pBackend->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pBackend->chkpInUse = taosArrayInit(4, sizeof(int64_t));
taosThreadRwlockInit(&pBackend->chkpDirLock, NULL);
}
void taskBackendDestroyChkpOpt(STaskBackendWrapper* pBackend) {
taosArrayDestroy(pBackend->chkpSaved);
taosArrayDestroy(pBackend->chkpInUse);
taosThreadRwlockDestroy(&pBackend->chkpDirLock);
}
int32_t taskBackendBuildFullPath(char* path, char* key, char** dbFullPath, char** taskFullPath) {
int32_t code = 0;
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s%s%s", path, TD_DIRSEP, "state", TD_DIRSEP, key);
char* taskPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(taskPath, "%s%s%s", path, TD_DIRSEP, key);
if (!taosDirExist(taskPath)) {
code = taosMulMkDir(taskPath);
if (code != 0) {
@ -1580,36 +1712,53 @@ int32_t taskBackendBuildFullPath(char* path, char* key, char** fullPath) {
return code;
}
}
*fullPath = taskPath;
char* dbPath = taosMemoryCalloc(1, strlen(taskPath) + 128);
sprintf(dbPath, "%s%s%s", taskPath, TD_DIRSEP, "state");
if (!taosDirExist(dbPath)) {
code = taosMulMkDir(dbPath);
if (code != 0) {
qError("failed to create dir: %s, reason:%s", dbPath, tstrerror(code));
taosMemoryFree(taskPath);
taosMemoryFree(dbPath);
return code;
}
}
*dbFullPath = dbPath;
*taskFullPath = taskPath;
return 0;
}
STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
char* taskPath = NULL;
char* err = NULL;
char* taskPath = NULL;
char* err = NULL;
char* dbPath = NULL;
char** cfNames = NULL;
size_t nCf = 0;
if (taskBackendBuildFullPath(path, key, &taskPath) != 0) {
if (taskBackendBuildFullPath(path, key, &dbPath, &taskPath) != 0) {
return NULL;
}
STaskBackendWrapper* pTaskBackend = taosMemoryCalloc(1, sizeof(STaskBackendWrapper));
taskBackendInitOpt(pTaskBackend);
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
pTaskBackend->path = taskPath;
taskBackendInitChkpOpt(pTaskBackend);
taskBackendInitDBOpt(pTaskBackend);
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) {
pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], taskPath, &err);
pTaskBackend->db = rocksdb_open(pTaskBackend->pCfOpts[0], dbPath, &err);
rocksdb_close(pTaskBackend->db);
if (cfNames != NULL) rocksdb_list_column_families_destroy(cfNames, nCf);
taosMemoryFree(err);
}
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, taskPath, &nCf, &err);
cfNames = rocksdb_list_column_families(pTaskBackend->dbOpt, dbPath, &nCf, &err);
ASSERT(err != NULL);
if (taskBackendOpenCfs(pTaskBackend, taskPath, cfNames, nCf) != 0) {
if (taskBackendOpenCfs(pTaskBackend, dbPath, cfNames, nCf) != 0) {
goto _EXIT;
}
@ -1617,11 +1766,12 @@ STaskBackendWrapper* taskBackendOpen(char* path, char* key) {
rocksdb_list_column_families_destroy(cfNames, nCf);
}
qDebug("succ to init stream backend at %s, backend:%p", taskPath, pTaskBackend);
taosMemoryFree(taskPath);
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskBackend);
taosMemoryFree(dbPath);
pTaskBackend->idstr = taosStrdup(key);
taosThreadMutexInit(&pTaskBackend->mutex, NULL);
return pTaskBackend;
_EXIT:
@ -1673,7 +1823,10 @@ void taskBackendDestroy(void* pBackend) {
if (wrapper->db) rocksdb_close(wrapper->db);
taskBackendDestroyChkpOpt(pBackend);
taosMemoryFree(wrapper->idstr);
taosMemoryFree(wrapper->path);
taosMemoryFree(wrapper);
return;