Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
This commit is contained in:
commit
a9757637d9
|
@ -100,7 +100,7 @@ typedef struct {
|
||||||
} SStreamQueueItem;
|
} SStreamQueueItem;
|
||||||
|
|
||||||
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||||
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
|
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
|
|
@ -168,7 +168,7 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
|
||||||
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
|
|
||||||
// tqStream
|
// tqStream
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId);
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqStreamTasksScanWal(STQ* pTq);
|
int32_t tqStreamTasksScanWal(STQ* pTq);
|
||||||
int32_t tqStreamTasksStatusCheck(STQ* pTq);
|
int32_t tqStreamTasksStatusCheck(STQ* pTq);
|
||||||
|
|
||||||
|
|
|
@ -743,7 +743,7 @@ end:
|
||||||
|
|
||||||
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
|
||||||
|
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpointId) {
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct {
|
||||||
int64_t defaultCfInit;
|
int64_t defaultCfInit;
|
||||||
} SBackendWrapper;
|
} SBackendWrapper;
|
||||||
|
|
||||||
void* streamBackendInit(const char* path);
|
void* streamBackendInit(const char* path, int64_t chkpId);
|
||||||
void streamBackendCleanup(void* arg);
|
void streamBackendCleanup(void* arg);
|
||||||
void streamBackendHandleCleanup(void* arg);
|
void streamBackendHandleCleanup(void* arg);
|
||||||
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
int32_t streamBackendLoadCheckpointInfo(void* pMeta);
|
||||||
|
|
|
@ -144,10 +144,81 @@ SCfInit ginitDict[] = {
|
||||||
encodeValueFunc, decodeValueFunc},
|
encodeValueFunc, decodeValueFunc},
|
||||||
};
|
};
|
||||||
|
|
||||||
void* streamBackendInit(const char* path) {
|
bool isValidCheckpoint(const char* dir) { return true; }
|
||||||
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
|
|
||||||
|
|
||||||
qDebug("start to init stream backend at %s", path);
|
int32_t copyFiles(const char* src, const char* dst) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// opt later, just hard link
|
||||||
|
int32_t sLen = strlen(src);
|
||||||
|
int32_t dLen = strlen(dst);
|
||||||
|
char* absSrcPath = taosMemoryCalloc(1, sLen + 64);
|
||||||
|
char* absDstPath = taosMemoryCalloc(1, dLen + 64);
|
||||||
|
|
||||||
|
TdDirPtr pDir = taosOpenDir(src);
|
||||||
|
if (pDir == NULL) return 0;
|
||||||
|
|
||||||
|
TdDirEntryPtr de = NULL;
|
||||||
|
|
||||||
|
while ((de = taosReadDir(pDir)) != NULL) {
|
||||||
|
char* name = taosGetDirEntryName(de);
|
||||||
|
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
|
||||||
|
|
||||||
|
sprintf(absSrcPath, "%s/%s", src, name);
|
||||||
|
sprintf(absDstPath, "%s/%s", dst, name);
|
||||||
|
if (!taosDirEntryIsDir(de)) {
|
||||||
|
code = taosCopyFile(absSrcPath, absDstPath);
|
||||||
|
if (code == -1) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(absSrcPath, 0, sLen + 64);
|
||||||
|
memset(absDstPath, 0, dLen + 64);
|
||||||
|
}
|
||||||
|
|
||||||
|
_err:
|
||||||
|
taosMemoryFreeClear(absSrcPath);
|
||||||
|
taosMemoryFreeClear(absDstPath);
|
||||||
|
taosCloseDir(&pDir);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
|
||||||
|
// impl later
|
||||||
|
int32_t code = 0;
|
||||||
|
char* state = taosMemoryCalloc(1, strlen(path) + 32);
|
||||||
|
|
||||||
|
sprintf(state, "%s/%s", path, "state");
|
||||||
|
if (chkpId != 0) {
|
||||||
|
char* chkp = taosMemoryCalloc(1, strlen(path) + 64);
|
||||||
|
sprintf(chkp, "%s/%s/checkpoint-%" PRId64 "", path, "checkpoints", chkpId);
|
||||||
|
if (taosIsDir(chkp) && isValidCheckpoint(chkp)) {
|
||||||
|
if (taosIsDir(state)) {
|
||||||
|
// remove dir if exists
|
||||||
|
// taosRenameFile(const char *oldName, const char *newName)
|
||||||
|
taosRemoveDir(state);
|
||||||
|
}
|
||||||
|
taosMkDir(state);
|
||||||
|
code = copyFiles(chkp, state);
|
||||||
|
if (code != 0) {
|
||||||
|
qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
qError("failed to start stream backend at %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||||
|
taosMkDir(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*dst = state;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
void* streamBackendInit(const char* streamPath, int64_t chkpId) {
|
||||||
|
char* backendPath = NULL;
|
||||||
|
int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath);
|
||||||
|
|
||||||
|
qDebug("start to init stream backend at %s", backendPath);
|
||||||
|
|
||||||
|
uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20;
|
||||||
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
|
SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper));
|
||||||
pHandle->list = tdListNew(sizeof(SCfComparator));
|
pHandle->list = tdListNew(sizeof(SCfComparator));
|
||||||
taosThreadMutexInit(&pHandle->mutex, NULL);
|
taosThreadMutexInit(&pHandle->mutex, NULL);
|
||||||
|
@ -183,12 +254,12 @@ void* streamBackendInit(const char* path) {
|
||||||
char* err = NULL;
|
char* err = NULL;
|
||||||
size_t nCf = 0;
|
size_t nCf = 0;
|
||||||
|
|
||||||
char** cfs = rocksdb_list_column_families(opts, path, &nCf, &err);
|
char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err);
|
||||||
if (nCf == 0 || nCf == 1 || err != NULL) {
|
if (nCf == 0 || nCf == 1 || err != NULL) {
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
pHandle->db = rocksdb_open(opts, path, &err);
|
pHandle->db = rocksdb_open(opts, backendPath, &err);
|
||||||
if (err != NULL) {
|
if (err != NULL) {
|
||||||
qError("failed to open rocksdb, path:%s, reason:%s", path, err);
|
qError("failed to open rocksdb, path:%s, reason:%s", backendPath, err);
|
||||||
taosMemoryFreeClear(err);
|
taosMemoryFreeClear(err);
|
||||||
goto _EXIT;
|
goto _EXIT;
|
||||||
}
|
}
|
||||||
|
@ -196,12 +267,13 @@ void* streamBackendInit(const char* path) {
|
||||||
/*
|
/*
|
||||||
list all cf and get prefix
|
list all cf and get prefix
|
||||||
*/
|
*/
|
||||||
streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf);
|
streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf);
|
||||||
}
|
}
|
||||||
if (cfs != NULL) {
|
if (cfs != NULL) {
|
||||||
rocksdb_list_column_families_destroy(cfs, nCf);
|
rocksdb_list_column_families_destroy(cfs, nCf);
|
||||||
}
|
}
|
||||||
qDebug("succ to init stream backend at %s, backend:%p", path, pHandle);
|
qDebug("succ to init stream backend at %s, backend:%p", backendPath, pHandle);
|
||||||
|
taosMemoryFreeClear(backendPath);
|
||||||
|
|
||||||
return (void*)pHandle;
|
return (void*)pHandle;
|
||||||
_EXIT:
|
_EXIT:
|
||||||
|
@ -213,8 +285,9 @@ _EXIT:
|
||||||
taosHashCleanup(pHandle->cfInst);
|
taosHashCleanup(pHandle->cfInst);
|
||||||
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
rocksdb_compactionfilterfactory_destroy(pHandle->filterFactory);
|
||||||
tdListFree(pHandle->list);
|
tdListFree(pHandle->list);
|
||||||
|
taosMemoryFree(backendPath);
|
||||||
taosMemoryFree(pHandle);
|
taosMemoryFree(pHandle);
|
||||||
qDebug("failed to init stream backend at %s", path);
|
qDebug("failed to init stream backend at %s", backendPath);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
void streamBackendCleanup(void* arg) {
|
void streamBackendCleanup(void* arg) {
|
||||||
|
|
|
@ -23,6 +23,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||||
int32_t streamBackendId = 0;
|
int32_t streamBackendId = 0;
|
||||||
int32_t streamBackendCfWrapperId = 0;
|
int32_t streamBackendCfWrapperId = 0;
|
||||||
|
|
||||||
|
int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||||
|
|
||||||
static void streamMetaEnvInit() {
|
static void streamMetaEnvInit() {
|
||||||
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
streamBackendId = taosOpenRef(64, streamBackendCleanup);
|
||||||
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
|
||||||
|
@ -49,8 +51,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
|
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
memset(streamPath, 0, len);
|
|
||||||
|
|
||||||
|
memset(streamPath, 0, len);
|
||||||
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
|
||||||
code = taosMulModeMkDir(streamPath, 0755);
|
code = taosMulModeMkDir(streamPath, 0755);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -88,13 +90,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->expandFunc = expandFunc;
|
||||||
|
|
||||||
memset(streamPath, 0, len);
|
// memset(streamPath, 0, len);
|
||||||
sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
// sprintf(streamPath, "%s/%s", pMeta->path, "state");
|
||||||
code = taosMulModeMkDir(streamPath, 0755);
|
// code = taosMulModeMkDir(streamPath, 0755);
|
||||||
if (code != 0) {
|
// if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
// terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
goto _err;
|
// goto _err;
|
||||||
}
|
// }
|
||||||
|
|
||||||
pMeta->pTaskBackendUnique =
|
pMeta->pTaskBackendUnique =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -103,7 +105,9 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->checkpointCap = 4;
|
pMeta->checkpointCap = 4;
|
||||||
taosInitRWLatch(&pMeta->checkpointDirLock);
|
taosInitRWLatch(&pMeta->checkpointDirLock);
|
||||||
|
|
||||||
pMeta->streamBackend = streamBackendInit(streamPath);
|
int64_t chkpId = streamGetLatestCheckpointId(pMeta);
|
||||||
|
|
||||||
|
pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
|
||||||
if (pMeta->streamBackend == NULL) {
|
if (pMeta->streamBackend == NULL) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -245,7 +249,7 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
|
||||||
|
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver, checkpointId) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -443,8 +447,6 @@ _err:
|
||||||
return chkpId;
|
return chkpId;
|
||||||
}
|
}
|
||||||
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
int64_t checkpointId = streamGetLatestCheckpointId(pMeta);
|
|
||||||
|
|
||||||
TBC* pCur = NULL;
|
TBC* pCur = NULL;
|
||||||
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -473,7 +475,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
|
||||||
// remove duplicate
|
// remove duplicate
|
||||||
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer, checkpointId) < 0) {
|
if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) {
|
||||||
tdbFree(pKey);
|
tdbFree(pKey);
|
||||||
tdbFree(pVal);
|
tdbFree(pVal);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
|
|
Loading…
Reference in New Issue