refactor backend

This commit is contained in:
yihaoDeng 2023-10-12 20:16:41 +08:00
parent 861d26e356
commit a56d831a14
8 changed files with 120 additions and 46 deletions

View File

@ -410,7 +410,7 @@ typedef struct SStreamMeta {
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
SHashObj* pTaskDbUnique;
TdThreadMutex backendMutex;
SMetaHbInfo hbInfo;
int32_t closedTask;
@ -727,7 +727,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -738,7 +738,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
if (code != TSDB_CODE_SUCCESS) return code;
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, &pTask->backendRefId);
pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, pTask->checkpointingId,
&pTask->backendRefId);
// taskDbUpdateChkpId(pTask->pBackend, pTask->checkpointingId);
if (pTask->pBackend == NULL) return -1;
streamTaskOpenAllUpstreamInput(pTask);

View File

@ -83,12 +83,14 @@ int32_t streamBackendLoadCheckpointInfo(void* pMeta);
int32_t streamBackendDoCheckpoint(void* pMeta, int64_t checkpointId);
SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateConvertDataFormat(char* path, char* key, void* cfInst);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(char* path, char* key);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
void taskDbDestroy(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId);
void* taskDbAddRef(void* pTaskDb);
void taskDbRemoveRef(void* pTaskDb);
@ -182,7 +184,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst);
int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId);
int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg, int64_t chkpId);
int32_t taskDbBuildSnap(void* arg);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif

View File

@ -450,6 +450,60 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
return 0;
}
int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) {
// impl later
int32_t code = 0;
/*param@1: checkpointId dir
param@2: state
copy pChkpIdDir's file to state dir
opt to set hard link to previous file
*/
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
sprintf(prefixPath, "%s%s%s", path, TD_DIRSEP, key);
if (!taosIsDir(prefixPath)) {
code = taosMkDir(prefixPath);
ASSERT(code == 0);
}
char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "defaultPath");
if (!taosIsDir(defaultPath)) {
taosMulMkDir(defaultPath);
}
if (chkpId != 0) {
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(chkpPath, "%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
if (taosIsDir(defaultPath)) {
// remove dir if exists
// taosRenameFile(const char *oldName, const char *newName)
taosRemoveDir(defaultPath);
}
taosMkDir(defaultPath);
code = copyFiles(chkpPath, defaultPath);
if (code != 0) {
qError("failed to restart stream backend from %s, reason: %s", chkpPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {
qInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
}
} else {
qError("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
taosMkDir(defaultPath);
}
taosMemoryFree(chkpPath);
}
*dbPath = defaultPath;
*dbPrefixPath = prefixPath;
return 0;
}
void* streamBackendInit(const char* streamPath, int64_t chkpId) {
char* backendPath = NULL;
@ -829,13 +883,13 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) {
int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) {
return 0;
// SArray* pHandle = taosArrayInit(16, POINTER_BYTES);
// void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
// void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
// while (pIter) {
// int64_t id = *(int64_t*)pIter;
// SBackendCfWrapper* wrapper = taosAcquireRef(streamBackendCfWrapperId, id);
// if (wrapper == NULL) {
// pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
// pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
// continue;
// }
@ -940,7 +994,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return code;
}
sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId);
sprintf(pChkpIdDir, "%s%s%s%" PRId64, pChkpDir, TD_DIRSEP, "checkpoint", chkpId);
if (taosIsDir(pChkpIdDir)) {
qInfo("stream rm exist checkpoint%s", pChkpIdDir);
taosRemoveFile(pChkpIdDir);
@ -950,19 +1004,19 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0;
}
int32_t taskDbBuildSnap(void* arg, int64_t chkpId) {
int32_t taskDbBuildSnap(void* arg) {
SStreamMeta* pMeta = arg;
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL);
int32_t code = 0;
while (pIter) {
STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter;
taskDbAddRef(pTaskDb);
code = taskDbDoCheckpoint((STaskDbWrapper*)pTaskDb, chkpId);
code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId);
taskDbRemoveRef(pTaskDb);
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter);
}
return 0;
}
@ -1070,6 +1124,8 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
code = chkpMayDelObsolete(pTaskDb, chkpId, pChkpDir);
pTaskDb->chkpId = chkpId;
_EXIT:
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
@ -1692,14 +1748,20 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta
*stateFullPath = statePath;
return 0;
}
STaskDbWrapper* taskDbOpen(char* path, char* key) {
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
STaskDbWrapper* p = pTaskDb;
taosThreadMutexLock(&p->mutex);
p->chkpId = chkpId;
taosThreadMutexUnlock(&p->mutex);
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
char* statePath = NULL;
char* err = NULL;
char* dbPath = NULL;
char** cfNames = NULL;
size_t nCf = 0;
if (taskDbBuildFullPath(path, key, &dbPath, &statePath) != 0) {
if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) {
return NULL;
}
@ -1861,12 +1923,12 @@ _EXIT:
return code;
}
int32_t streamStateConvertDataFormat(char* path, char* key, void* pCfInst) {
int32_t streamStateCvtDataFormat(char* path, char* key, void* pCfInst) {
int nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
int32_t code = 0;
STaskDbWrapper* pTaskDb = taskDbOpen(path, key);
STaskDbWrapper* pTaskDb = taskDbOpen(path, key, 0);
RocksdbCfInst* pSrcBackend = pCfInst;
for (int i = 0; i < nCf; i++) {

View File

@ -182,7 +182,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
return ret;
}
int32_t streamMetaConvertBackendFormat(SStreamMeta* pMeta) {
int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
int32_t code = 0;
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId);
@ -190,7 +190,7 @@ int32_t streamMetaConvertBackendFormat(SStreamMeta* pMeta) {
void* pIter = taosHashIterate(pBackend->cfInst, NULL);
while (pIter) {
void* key = taosHashGetKey(pIter, NULL);
code = streamStateConvertDataFormat(pMeta->path, key, *(void**)pIter);
code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) {
qError("failed to cvt data");
goto _EXIT;
@ -210,7 +210,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qInfo("stream state need covert backend format");
return streamMetaConvertBackendFormat(pMeta);
return streamMetaCvtDbFormat(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
@ -222,9 +222,9 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0;
}
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref) {
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref) {
taosThreadMutexLock(&pMeta->backendMutex);
void** ppBackend = taosHashGet(pMeta->pTaskBackendUnique, key, strlen(key));
void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key));
if (ppBackend != NULL && *ppBackend != NULL) {
taskDbAddRef(*ppBackend);
*ref = ((STaskDbWrapper*)*ppBackend)->refId;
@ -232,7 +232,7 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref)
return *ppBackend;
}
void* pBackend = taskDbOpen(pMeta->path, key);
void* pBackend = taskDbOpen(pMeta->path, key, chkpId);
if (pBackend == NULL) {
taosThreadMutexUnlock(&pMeta->backendMutex);
return NULL;
@ -240,7 +240,7 @@ void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref)
*ref = taosAddRef(taskDbWrapperId, pBackend);
taosHashPut(pMeta->pTaskBackendUnique, key, strlen(key), &pBackend, sizeof(void*));
taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
taosThreadMutexUnlock(&pMeta->backendMutex);
return pBackend;
}
@ -301,8 +301,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0;
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// start backend
// taosInitRWLatch(&pMeta->chkpDirLock);
@ -411,7 +410,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasks);
taosHashClear(pMeta->pTaskBackendUnique);
taosHashClear(pMeta->pTaskDbUnique);
taosArrayClear(pMeta->pTaskList);
taosArrayClear(pMeta->chkpSaved);
@ -452,7 +451,7 @@ void streamMetaCloseImpl(void* arg) {
taosArrayDestroy(pMeta->chkpInUse);
taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pTaskDbUnique);
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);

View File

@ -110,7 +110,7 @@ const char* ROCKSDB_CURRENT = "CURRENT";
const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT";
static int64_t kBlockSize = 64 * 1024;
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId, void* pMeta);
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
// static void streamBuildFname(char* path, char* file, char* fullname)
@ -139,7 +139,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
return taosOpenFile(fullname, opt);
}
int32_t streamBackendGetSnapInfo(void* arg, char* path, int64_t chkpId) { return taskDbBuildSnap(arg, chkpId); }
int32_t streamTaskDbGetSnapInfo(void* arg, char* path) { return taskDbBuildSnap(arg); }
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
if (qDebugFlag & DEBUG_DEBUG) {
@ -235,20 +235,23 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
taosCloseDir(&pDir);
return 0;
}
int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) {
// SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile));
int32_t code = -1;
char* snapPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(snapPath, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, pSnap->streamId,
pSnap->taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, pSnap->chkpId);
if (taosIsDir(snapPath)) {
char* path = taosMemoryCalloc(1, strlen(metaPath) + 256);
char idstr[64] = {0};
sprintf(idstr, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)(pSnap->taskId));
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", pSnap->chkpId);
if (taosIsDir(path)) {
goto _ERROR;
}
pSnapFile->pSst = taosArrayInit(16, sizeof(void*));
pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem));
pSnapFile->path = snapPath;
pSnapFile->path = path;
pSnapFile->snapInfo = *pSnap;
if ((code = snapFileReadMeta(pSnapFile)) != 0) {
goto _ERROR;
@ -262,7 +265,7 @@ int32_t streamBackendSnapInitFile(char* path, SStreamTaskSnap* pSnap, SBackendSn
code = 0;
_ERROR:
taosMemoryFree(snapPath);
taosMemoryFree(path);
return code;
}
void snapFileDestroy(SBackendSnapFile2* pSnap) {
@ -288,11 +291,11 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) {
return;
}
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId, void* pMeta) {
int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) {
// impl later
SArray* pSnapSet = NULL;
int32_t code = streamBackendGetSnapInfo(pMeta, path, chkpId);
int32_t code = streamTaskDbGetSnapInfo(pMeta, path);
if (code != 0) {
return -1;
}
@ -339,7 +342,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa
return TSDB_CODE_OUT_OF_MEMORY;
}
if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId, pMeta) < 0) {
if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) {
taosMemoryFree(pReader);
return -1;
}
@ -531,9 +534,16 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
if (pDbSnapFile->inited == 0) {
char idstr[64] = {0};
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
sprintf(path, "%s%s%" PRId64 "_%" PRId64 "%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, snapInfo.streamId,
snapInfo.taskId, TD_DIRSEP, "state", TD_DIRSEP, "checkpoints", TD_DIRSEP, snapInfo.chkpId);
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", path, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
"checkpoint", snapInfo.chkpId);
if (!taosIsDir(path)) {
code = taosMulMkDir(path);
ASSERT(code == 0);
}
pDbSnapFile->path = path;
pDbSnapFile->snapInfo = snapInfo;

View File

@ -116,7 +116,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
// taosWLockLatch(&pMeta->lock);
// taosThreadMutexLock(&pMeta->backendMutex);
// void* uniqueId =
// taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
// taosHashGet(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
// if (uniqueId == NULL) {
// int code = streamStateOpenBackend(pMeta->streamBackend, pState);
// if (code == -1) {
@ -124,7 +124,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
// taosMemoryFree(pState);
// return NULL;
// }
// taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
// taosHashPut(pMeta->pTaskDbUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
// &pState->pTdbState->backendCfWrapperId, sizeof(pState->pTdbState->backendCfWrapperId));
// } else {
// int64_t id = *(int64_t*)uniqueId;

View File

@ -392,8 +392,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
// pTask->pBackend = taskDbOpen(pMeta->path, (char*)pTask->id.idStr);
return TSDB_CODE_SUCCESS;
}