support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-10 01:49:26 +00:00
parent a20b299f9b
commit ef247cdb1d
4 changed files with 58 additions and 58 deletions

View File

@ -369,11 +369,11 @@ typedef struct SStreamMeta {
int32_t chkptNotReadyTasks;
int64_t checkpointId;
SArray* checkpointSaved;
SArray* checkpointInUse;
int32_t checkpointCap;
SRWLatch checkpointDirLock;
int64_t chkpId;
SArray* chkpSaved;
SArray* chkpInUse;
int32_t chkpCap;
SRWLatch chkpDirLock;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);

View File

@ -47,11 +47,11 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
pReader->sver = sver;
pReader->ever = ever;
int64_t checkpointId = meta ? meta->checkpointId : 0;
int64_t chkpId = meta ? meta->chkpId : 0;
SStreamSnapReader* pSnapReader = NULL;
if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) {
if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) {
pReader->complete = 1;
} else {
code = -1;

View File

@ -372,8 +372,8 @@ int32_t copyFiles(const char* src, const char* dst) {
// opt later, just hard link
int32_t sLen = strlen(src);
int32_t dLen = strlen(dst);
char* absSrcPath = taosMemoryCalloc(1, sLen + 64);
char* absDstPath = taosMemoryCalloc(1, dLen + 64);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) return 0;
@ -383,22 +383,22 @@ int32_t copyFiles(const char* src, const char* dst) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name);
sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name);
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(absSrcPath, absDstPath);
code = taosCopyFile(srcName, dstName);
if (code == -1) {
goto _err;
}
}
memset(absSrcPath, 0, sLen + 64);
memset(absDstPath, 0, dLen + 64);
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
_err:
taosMemoryFreeClear(absSrcPath);
taosMemoryFreeClear(absDstPath);
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
return code >= 0 ? 0 : -1;
}
@ -626,75 +626,75 @@ void streamBackendHandleCleanup(void* arg) {
int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) {
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
int64_t tc = 0;
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
if (sz <= 0) {
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock);
return -1;
} else {
tc = *(int64_t*)taosArrayGetLast(pMeta->checkpointSaved);
tc = *(int64_t*)taosArrayGetLast(pMeta->chkpSaved);
}
taosArrayPush(pMeta->checkpointInUse, &tc);
taosArrayPush(pMeta->chkpInUse, &tc);
*checkpoint = tc;
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
}
/*
* checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--|
* checkpointInUse: |--cp2--|--cp4--|
* checkpointInUse is doing translation, cannot del until
* chkpInUse: |--cp2--|--cp4--|
* chkpInUse is doing translation, cannot del until
* replication is finished
*/
int32_t delObsoleteCheckpoint(void* arg, const char* path) {
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t));
SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t));
SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
int64_t minId = 0;
if (taosArrayGetSize(pMeta->checkpointInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->checkpointInUse, 0);
if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
for (int i = 0; i < taosArrayGetSize(pMeta->checkpointSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
if (id >= minId) {
taosArrayPush(checkpointDup, &id);
taosArrayPush(chkpDup, &id);
} else {
taosArrayPush(checkpointDel, &id);
taosArrayPush(chkpDel, &id);
}
}
} else {
int32_t sz = taosArrayGetSize(pMeta->checkpointSaved);
int32_t dsz = sz - pMeta->checkpointCap; // del size
int32_t sz = taosArrayGetSize(pMeta->chkpSaved);
int32_t dsz = sz - pMeta->chkpCap; // del size
for (int i = 0; i < dsz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
taosArrayPush(checkpointDel, &id);
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(chkpDel, &id);
}
for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i);
taosArrayPush(checkpointDup, &id);
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
taosArrayPush(chkpDup, &id);
}
}
taosArrayDestroy(pMeta->checkpointSaved);
pMeta->checkpointSaved = checkpointDup;
taosArrayDestroy(pMeta->chkpSaved);
pMeta->chkpSaved = chkpDup;
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWUnLockLatch(&pMeta->chkpDirLock);
for (int i = 0; i < taosArrayGetSize(checkpointDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(checkpointDel, i);
for (int i = 0; i < taosArrayGetSize(chkpDel); i++) {
int64_t id = *(int64_t*)taosArrayGet(chkpDel, i);
char tbuf[256] = {0};
sprintf(tbuf, "%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, id);
if (taosIsDir(tbuf)) {
taosRemoveDir(tbuf);
}
}
taosArrayDestroy(checkpointDel);
taosArrayDestroy(chkpDel);
return 0;
}
@ -742,7 +742,7 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
taosArrayPush(pMeta->checkpointSaved, &id);
taosArrayPush(pMeta->chkpSaved, &id);
}
taosArrayDestroy(suffix);
@ -794,9 +794,9 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
}
rocksdb_checkpoint_object_destroy(cp);
}
taosWLockLatch(&pMeta->checkpointDirLock);
taosArrayPush(pMeta->checkpointSaved, &checkpointId);
taosWUnLockLatch(&pMeta->checkpointDirLock);
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId);
taosWUnLockLatch(&pMeta->chkpDirLock);
delObsoleteCheckpoint(arg, path);

View File

@ -91,12 +91,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->checkpointSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->checkpointCap = 8;
taosInitRWLatch(&pMeta->checkpointDirLock);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->chkpDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = chkpId;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
if (pMeta->streamBackend == NULL) {
@ -109,7 +110,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
@ -182,9 +182,9 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
taosHashClear(pMeta->pTaskBackendUnique);
taosArrayClear(pMeta->checkpointSaved);
taosArrayClear(pMeta->chkpSaved);
taosArrayClear(pMeta->checkpointInUse);
taosArrayClear(pMeta->chkpInUse);
return 0;
}
@ -222,8 +222,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosArrayDestroy(pMeta->checkpointSaved);
taosArrayDestroy(pMeta->checkpointInUse);
taosArrayDestroy(pMeta->chkpSaved);
taosArrayDestroy(pMeta->chkpInUse);
taosMemoryFree(pMeta);
}