refactor code

This commit is contained in:
yihaoDeng 2023-08-28 18:52:10 +08:00
parent 1687f68156
commit a2901411b0
4 changed files with 53 additions and 20 deletions

View File

@ -138,5 +138,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif

View File

@ -671,13 +671,13 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) {
SArray* chkpDel = taosArrayInit(10, sizeof(int64_t));
SArray* chkpDup = taosArrayInit(10, sizeof(int64_t));
int64_t minId = 0;
int64_t firsId = 0;
if (taosArrayGetSize(pMeta->chkpInUse) >= 1) {
minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i);
if (id >= minId) {
if (id >= firsId) {
taosArrayPush(chkpDup, &id);
} else {
taosArrayPush(chkpDel, &id);
@ -722,6 +722,7 @@ static int32_t compareCheckpoint(const void* a, const void* b) {
int32_t streamBackendLoadCheckpointInfo(void* arg) {
SStreamMeta* pMeta = arg;
int32_t code = 0;
SArray* suffix = NULL;
int32_t len = strlen(pMeta->path) + 30;
char* chkpPath = taosMemoryCalloc(1, len);
@ -732,17 +733,15 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
taosMemoryFree(chkpPath);
return 0;
}
taosArrayClear(pMeta->chkpSaved);
TdDirPtr pDir = taosOpenDir(chkpPath);
if (pDir == NULL) {
taosMemoryFree(chkpPath);
return 0;
}
TdDirEntryPtr de = NULL;
SArray* suffix = taosArrayInit(4, sizeof(int64_t));
suffix = taosArrayInit(4, sizeof(int64_t));
while ((de = taosReadDir(pDir)) != NULL) {
if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue;
@ -760,7 +759,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
}
}
taosArraySort(suffix, compareCheckpoint);
// free previous chkpSaved
taosArrayClear(pMeta->chkpSaved);
for (int i = 0; i < taosArrayGetSize(suffix); i++) {
int64_t id = *(int64_t*)taosArrayGet(suffix, i);
taosArrayPush(pMeta->chkpSaved, &id);
@ -909,6 +909,29 @@ _ERROR:
taosArrayDestroy(refs);
return code;
}
int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId) {
if (arg == NULL) return 0;
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpInUse, &chkpId);
taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
}
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
if (arg == NULL) return 0;
SStreamMeta* pMeta = arg;
taosWLockLatch(&pMeta->chkpDirLock);
if (taosArrayGetSize(pMeta->chkpInUse) > 0) {
int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0);
if (id == chkpId) {
taosArrayPopFrontBatch(pMeta->chkpInUse, 1);
}
}
taosWUnLockLatch(&pMeta->chkpDirLock);
return 0;
}
int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
SStreamMeta* pMeta = arg;

View File

@ -169,10 +169,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->chkpDirLock);
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->chkpId = chkpId;
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(2 * 1000);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
@ -181,18 +179,15 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
code = streamBackendLoadCheckpointInfo(pMeta);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, chkpId, stage);
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
stage);
return pMeta;
_err:
@ -202,6 +197,10 @@ _err:
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
// taosThreadMutexDestroy(&pMeta->backendMutex);
// taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
qError("failed to open stream meta");

View File

@ -116,19 +116,21 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
int32_t code = 0;
int8_t chkpFlag = 0;
int8_t validChkp = 0;
if (chkpId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP,
chkpId);
if (taosIsDir(tdir)) {
chkpFlag = 1;
validChkp = 1;
qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir);
streamBackendAddInUseChkpPos(pMeta, chkpId);
} else {
qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir);
}
}
if (chkpFlag == 0) {
// no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it
if (validChkp == 0) {
sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state");
char* chkpdir = taosMemoryCalloc(1, len + 256);
sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp");
@ -143,6 +145,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosMemoryFree(tdir);
return code;
}
chkpId = 0;
}
qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir);
@ -252,6 +255,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
pHandle->pFileList = list;
pHandle->seraial = 0;
pHandle->offset = 0;
pHandle->handle = pMeta;
return 0;
_err:
streamSnapHandleDestroy(pHandle);
@ -265,9 +269,12 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
SBanckendFile* pFile = handle->pBackendFile;
if (handle->checkpointId == 0) {
// del tmp dir
if (taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
}
} else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
}
if (pFile) {
taosMemoryFree(pFile->pCheckpointMeta);