support s3

This commit is contained in:
yihaoDeng 2023-11-09 20:15:08 +08:00
parent 757425bff8
commit 5f9b922a04
5 changed files with 46 additions and 10 deletions

View File

@ -513,6 +513,8 @@ typedef struct SStreamMeta {
void* qHandle;
int32_t pauseTaskNum;
void* bkdChkptMgt;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);

View File

@ -250,9 +250,9 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);
SBkdMgt* bkdMgtCreate(char* path);
int32_t bkdMgtAddChkp(SBkdMgt* bm, char* task, char* path);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list);
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* name);
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** pathkj);
int32_t taskDbGenChkpUploadPath(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** pathkj);
#endif

View File

@ -1717,14 +1717,30 @@ int32_t taskDbGenChkpUplaodPath__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code;
}
int32_t taskDbGenChkpUploadPath(void* arg, int64_t chkpId, int8_t type, char** path) {
int32_t taskDbGenChkpUplaodPath__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path) {
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
char* temp = taosMemoryCalloc(1, strlen(pDb->path));
sprintf(temp, "%s%s%s", pDb->path, TD_DIRSEP, "tmp");
if (!taosDirExist(temp)) {
taosMkDir(temp);
}
bkdMgtGetDelta(p, pDb->idstr, chkpId, NULL, temp);
*path = temp;
return 0;
}
int32_t taskDbGenChkpUploadPath(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path) {
STaskDbWrapper* pDb = arg;
UPLOAD_TYPE utype = type;
if (utype == UPLOAD_RSYNC) {
return taskDbGenChkpUplaodPath__rsync(pDb, chkpId, path);
return taskDbGenChkpUplaodPath__rsync(pDb,chkpId, path);
} else if (utype == UPLOAD_S3) {
return 0;
return taskDbGenChkpUplaodPath__s3(pDb,mgt, chkpId, path);
}
return -1;
}
@ -3603,14 +3619,28 @@ void bkdMgtDestroy(SBkdMgt* bm) {
taosMemoryFree(bm);
}
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list) {
int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list, char* dname) {
int32_t code = 0;
taosThreadRwlockWrlock(&bm->rwLock);
taosThreadRwlockWrlock(&bm->rwLock);
SDbChkp* pChkp = taosHashGet(bm->pDbChkpTbl, taskId, strlen(taskId));
code = dbChkpGetDelta(pChkp, chkpId, list);
if (pChkp == NULL) {
char* taskPath = taosMemoryCalloc(1, strlen(bm->path) + 64);
sprintf(taskPath, "%s%s%s", bm->path, TD_DIRSEP, taskId);
SDbChkp* p = dbChkpCreate(taskPath, chkpId);
taosHashPut(bm->pDbChkpTbl, taskId, strlen(taskId), &p, sizeof(void*));
taosMemoryFree(taskPath);
}
taosThreadRwlockUnlock(&bm->rwLock);
code = dbChkpGetDelta(pChkp, chkpId, list);
code = dbChkpDumpTo(pChkp, dname);
return code;
}

View File

@ -340,8 +340,10 @@ int32_t doUploadChkp(void* param) {
SAsyncUploadArg* arg = param;
char* path = NULL;
int32_t code = 0;
if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->chkpId, (int8_t)(arg->type), &path)) != 0) {
stError("s-task:%s faile to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
if ((code = taskDbGenChkpUploadPath(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (code == 0 && uploadCheckpoint(arg->taskId, path) != 0) {
stError("s-task:%s faile to upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);

View File

@ -356,6 +356,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
return pMeta;
_err: