fix stream state transfer

This commit is contained in:
yihaoDeng 2023-07-20 03:49:32 +00:00
parent 733db123ad
commit 6e5be0d30e
5 changed files with 24 additions and 7 deletions

View File

@ -368,6 +368,8 @@ typedef struct SStreamMeta {
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t checkpointId;
SArray* checkpointSaved; SArray* checkpointSaved;
SArray* checkpointInUse; SArray* checkpointInUse;
int32_t checkpointCap; int32_t checkpointCap;

View File

@ -41,13 +41,25 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
SStreamMeta* meta = pTq->pStreamMeta;
pReader->pTq = pTq; pReader->pTq = pTq;
pReader->sver = sver; pReader->sver = sver;
pReader->ever = ever; pReader->ever = ever;
int64_t checkpointId = meta ? meta->checkpointId : 0;
SStreamSnapReader* pSnapReader = NULL; SStreamSnapReader* pSnapReader = NULL;
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints");
if (streamSnapReaderOpen(pTq, sver, ever, tdir, &pSnapReader) == 0) { // restore from checkpoint if checkpointid != 0
if (checkpointId != 0) {
sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints",
TD_DIRSEP, checkpointId);
} else {
sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "state");
}
if (streamSnapReaderOpen(pTq, sver, checkpointId, tdir, &pSnapReader) == 0) {
pReader->complete = 1; pReader->complete = 1;
} else { } else {
code = -1; code = -1;

View File

@ -163,8 +163,8 @@ int32_t copyFiles(const char* src, const char* dst) {
char* name = taosGetDirEntryName(de); char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue; if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
sprintf(absSrcPath, "%s/%s", src, name); sprintf(absSrcPath, "%s%s%s", src, TD_DIRSEP, name);
sprintf(absDstPath, "%s/%s", dst, name); sprintf(absDstPath, "%s%s%s", dst, TD_DIRSEP, name);
if (!taosDirEntryIsDir(de)) { if (!taosDirEntryIsDir(de)) {
code = taosCopyFile(absSrcPath, absDstPath); code = taosCopyFile(absSrcPath, absDstPath);
if (code == -1) { if (code == -1) {

View File

@ -64,6 +64,7 @@ struct SStreamSnapReader {
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
SStreamSnapHandle handle; SStreamSnapHandle handle;
int64_t checkpointId;
}; };
struct SStreamSnapWriter { struct SStreamSnapWriter {
void* pMeta; void* pMeta;
@ -217,6 +218,7 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path
if (pReader == NULL) { if (pReader == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pReader->handle.checkpointId = ever;
// const char* path = NULL; // const char* path = NULL;
if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) {
taosMemoryFree(pReader); taosMemoryFree(pReader);

View File

@ -108,7 +108,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
SStreamTask* pStreamTask = pTask; SStreamTask* pStreamTask = pTask;
char statePath[1024]; char statePath[1024];
if (!specPath) { if (!specPath) {
sprintf(statePath, "%s/%d", path, pStreamTask->id.taskId); sprintf(statePath, "%s%s%d", path, TD_DIRSEP, pStreamTask->id.taskId);
} else { } else {
memset(statePath, 0, 1024); memset(statePath, 0, 1024);
tstrncpy(statePath, path, 1024); tstrncpy(statePath, path, 1024);
@ -762,7 +762,8 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey,key->win.ekey, key->groupId); qDebug("===stream===delete skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey,
key->groupId);
return streamStateSessionDel_rocksdb(pState, key); return streamStateSessionDel_rocksdb(pState, key);
#else #else
SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; SStateSessionKey sKey = {.key = *key, .opNum = pState->number};