upload by rsync

This commit is contained in:
yihaoDeng 2023-11-09 17:39:34 +08:00
parent f74edc7618
commit dd2252598e
3 changed files with 84 additions and 0 deletions

View File

@ -17,6 +17,7 @@
#define _STREAM_BACKEDN_ROCKSDB_H_
#include "rocksdb/c.h"
//#include "streamInt.h"
#include "streamState.h"
#include "tcoding.h"
#include "tcommon.h"
@ -252,4 +253,6 @@ int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** pathkj);
#endif

View File

@ -1691,6 +1691,44 @@ void taskDbDestroy(void* pDb) {
return;
}
int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1;
int64_t refId = pDb->refId;
if (taosAcquireRef(taskDbWrapperId, refId) == NULL) {
return -1;
}
char* pChkpDir = NULL;
char* pChkpIdDir = NULL;
if (chkpPreBuildDir(pDb->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) {
code = -1;
}
if (taosIsDir(pChkpIdDir) && isValidCheckpoint(pChkpIdDir)) {
code = 0;
*path = pChkpIdDir;
pChkpIdDir = NULL;
}
taosMemoryFree(pChkpDir);
taosMemoryFree(pChkpIdDir);
taosReleaseRef(taskDbWrapperId, refId);
return code;
}
int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** path) {
STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type;
if (utype == UPLOAD_RSYNC) {
return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path);
} else if (utype == UPLOAD_S3) {
return 0;
}
return -1;
}
int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) {
int32_t code = 0;
char* err = NULL;

View File

@ -15,8 +15,16 @@
#include "cos.h"
#include "rsync.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
typedef struct {
UPLOAD_TYPE type;
char* taskId;
int64_t chkpId;
SStreamTask* pTask;
} SAsyncUploadArg;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -328,6 +336,36 @@ void streamTaskSetFailedId(SStreamTask* pTask) {
pTask->chkInfo.checkpointId = pTask->chkInfo.checkpointingId;
}
int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->chkpId, (int8_t)(arg->type), &path)) != 0) {
stError("s-task:%s faile to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) {
stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
taosMemoryFree(path);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg);
return 0;
}
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
// async upload
UPLOAD_TYPE type = getUploadType();
if (type == UPLOAD_DISABLE) {
return 0;
}
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type;
arg->taskId = taosStrdup(taskId);
arg->chkpId = chkpId;
arg->pTask = pTask;
return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL);
}
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t startTs = pTask->chkInfo.startTs;
@ -363,6 +401,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
tstrerror(terrno));
} else {
code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr);
if (code != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", pTask->id.idStr, ckId);
}
}
}