refactor backend

This commit is contained in:
yihaoDeng 2023-09-20 12:53:58 +00:00
parent 13e8a79294
commit ee8c09667e
1 changed files with 100 additions and 35 deletions

View File

@ -103,6 +103,76 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
return 0;
}
typedef struct {
int64_t chkpId;
char* path;
char* taskId;
SArray* pChkpSave;
SArray* pChkpInUse;
int8_t chkpCap;
void* backend;
} StreamMetaTaskState;
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
return -1;
// goto _err;
}
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
return -1;
}
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
return -1;
}
return 0;
}
//
// impl later
//
enum STREAM_STATE_VER {
STREAM_STATA_NO_COMPATIBLE,
STREAM_STATA_COMPATIBLE,
STREAM_STATA_NEED_CONVERT,
};
int32_t streamMetaCheckStateVer(SStreamMeta* pMeta) {
TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
// no task info, no stream
return STREAM_STATA_COMPATIBLE;
}
void* pKey = NULL;
int32_t kLen = 0;
void* pVal = NULL;
int32_t vLen = 0;
SDecoder decoder;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (pVal == NULL || vLen == 0) {
break;
}
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
continue;
}
tDecoderClear(&decoder);
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return STREAM_STATA_NEED_CONVERT;
}
int32_t streamMetaDoStateDataConvert(SStreamMeta* pMeta) { return 0; }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) {
int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -118,15 +188,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
pMeta->path = tpath;
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
if (streamMetaOpenTdb(pMeta) < 0) {
goto _err;
}
if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) {
goto _err;
}
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
if (streamMetaDoStateDataConvert(pMeta) < 0) {
goto _err;
}
@ -134,8 +200,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err;
}
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
goto _err;
}
@ -153,41 +218,43 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
// send heartbeat every 5sec.
pMeta->rid = taosAddRef(streamMetaId, pMeta);
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
pMeta->rid = taosAddRef(streamMetaId, pMeta);
*pRid = pMeta->rid;
metaRefMgtAdd(pMeta->vgId, pRid);
// send heartbeat every 5sec.
// TODO: start hb later
pMeta->hbInfo.hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->hbInfo.tickCounter = 0;
pMeta->hbInfo.stopFlag = 0;
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
pMeta->chkpCap = 8;
taosInitRWLatch(&pMeta->chkpDirLock);
// start backend
// taosInitRWLatch(&pMeta->chkpDirLock);
pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
taosMsleep(2 * 1000);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
qError("vgId:%d failed to init stream backend", pMeta->vgId);
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
// pMeta->pTaskBackendUnique =
// taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
// pMeta->chkpSaved = taosArrayInit(4, sizeof(int64_t));
// pMeta->chkpInUse = taosArrayInit(4, sizeof(int64_t));
// pMeta->chkpCap = 8;
code = streamBackendLoadCheckpointInfo(pMeta);
// pMeta->chkpId = streamGetLatestCheckpointId(pMeta);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// while (pMeta->streamBackend == NULL) {
// qError("vgId:%d failed to init stream backend", pMeta->vgId);
// taosMsleep(2 * 1000);
// qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
// pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
// if (pMeta->streamBackend == NULL) {
// }
// }
// pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
// code = streamBackendLoadCheckpointInfo(pMeta);
// taosThreadMutexInit(&pMeta->backendMutex, NULL);
taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
pMeta->pauseTaskNum = 0;
qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId,
@ -241,13 +308,11 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
while (pMeta->streamBackend == NULL) {
qError("vgId:%d failed to init stream backend", pMeta->vgId);
taosMsleep(2 * 1000);
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId);
if (pMeta->streamBackend == NULL) {
qError("vgId:%d failed to init stream backend", pMeta->vgId);
qInfo("vgId:%d retry to init stream backend", pMeta->vgId);
// return -1;
}
}
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
streamBackendLoadCheckpointInfo(pMeta);