分片上传开发完成
Former-commit-id: fb304e3e7d3b5321b18197be1fba6d2a36acdbd2
This commit is contained in:
parent
b7e7b8cd09
commit
b3a0c91d5e
|
@ -181,7 +181,9 @@ service pcm {
|
||||||
@handler uploadImageHandler
|
@handler uploadImageHandler
|
||||||
post /image/upload () returns ()
|
post /image/upload () returns ()
|
||||||
|
|
||||||
|
@handler chunkImageHandler
|
||||||
|
post /image/chunk () returns ()
|
||||||
|
|
||||||
@handler imageListHandler
|
@handler imageListHandler
|
||||||
get /image/list () returns (imageListResp)
|
get /image/list () returns (imageListResp)
|
||||||
|
|
||||||
}
|
}
|
|
@ -0,0 +1,104 @@
|
||||||
|
package image
|
||||||
|
|
||||||
|
import (
|
||||||
|
result2 "PCM/common/result"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"path/filepath"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"PCM/adaptor/PCM-CORE/api/internal/logic/image"
|
||||||
|
"PCM/adaptor/PCM-CORE/api/internal/svc"
|
||||||
|
)
|
||||||
|
|
||||||
|
var dir, _ = os.Getwd()
|
||||||
|
var windowsUploadPath = strings.ReplaceAll(path.Join(dir, "uploads"), "/", "\\")
|
||||||
|
var linuxUploadPath = path.Join(dir, "uploads")
|
||||||
|
|
||||||
|
func ChunkImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
size, _ := strconv.ParseInt(r.PostFormValue("size"), 10, 64)
|
||||||
|
hash := r.PostFormValue("hash")
|
||||||
|
name := r.PostFormValue("name")
|
||||||
|
|
||||||
|
toSize, _ := getDirSize(path.Join(linuxUploadTempPath, hash))
|
||||||
|
if size != toSize {
|
||||||
|
fmt.Fprintf(w, "文件上传错误")
|
||||||
|
}
|
||||||
|
chunksPath := path.Join(linuxUploadTempPath, hash)
|
||||||
|
files, _ := ioutil.ReadDir(chunksPath)
|
||||||
|
// 排序
|
||||||
|
filesSort := make(map[string]string)
|
||||||
|
for _, f := range files {
|
||||||
|
nameArr := strings.Split(f.Name(), "-")
|
||||||
|
filesSort[nameArr[1]] = f.Name()
|
||||||
|
}
|
||||||
|
saveFile := path.Join(linuxUploadPath, name)
|
||||||
|
if exists, _ := PathExists(saveFile); exists {
|
||||||
|
os.Remove(saveFile)
|
||||||
|
}
|
||||||
|
fs, _ := os.OpenFile(saveFile, os.O_CREATE|os.O_RDWR|os.O_APPEND, os.ModeAppend|os.ModePerm)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
filesCount := len(files)
|
||||||
|
if filesCount != len(filesSort) {
|
||||||
|
fmt.Fprintf(w, "文件上传错误2")
|
||||||
|
}
|
||||||
|
wg.Add(filesCount)
|
||||||
|
for i := 0; i < filesCount; i++ {
|
||||||
|
// 这里一定要注意按顺序读取不然文件就会损坏
|
||||||
|
fileName := path.Join(chunksPath, "\\"+filesSort[strconv.Itoa(i)])
|
||||||
|
data, err := ioutil.ReadFile(fileName)
|
||||||
|
fmt.Println(err)
|
||||||
|
fs.Write(data)
|
||||||
|
|
||||||
|
wg.Done()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
os.RemoveAll(path.Join(chunksPath, "\\"))
|
||||||
|
defer fs.Close()
|
||||||
|
|
||||||
|
//// 加载镜像文件到docker
|
||||||
|
//body, err := svcCtx.DockerClient.ImageLoad(context.Background(), multipartFile, false)
|
||||||
|
//if err != nil {
|
||||||
|
// httpx.ErrorCtx(r.Context(), w, err)
|
||||||
|
// return
|
||||||
|
//}
|
||||||
|
//bytes, err := ioutil.ReadAll(body.Body)
|
||||||
|
//loadBody := LoadBody{}
|
||||||
|
//err = json.Unmarshal(bytes, &loadBody)
|
||||||
|
//if err != nil {
|
||||||
|
// httpx.ErrorCtx(r.Context(), w, err)
|
||||||
|
// return
|
||||||
|
//}
|
||||||
|
//imageName := strings.TrimSpace(loadBody.Stream[13:])
|
||||||
|
//req.Name = "hub.jcce.dev:18443/repository/docker-hub/jcce/" + imageName
|
||||||
|
//// 给镜像打上私有仓库的tag
|
||||||
|
//err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, req.Name)
|
||||||
|
//if err != nil {
|
||||||
|
// httpx.ErrorCtx(r.Context(), w, err)
|
||||||
|
// return
|
||||||
|
//}
|
||||||
|
//l := image.NewUploadImageLogic(r.Context(), svcCtx)
|
||||||
|
//err = l.UploadImage(&req)
|
||||||
|
l := image.NewChunkImageLogic(r.Context(), svcCtx)
|
||||||
|
err := l.ChunkImage()
|
||||||
|
result2.HttpResult(r, w, nil, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DirSize 获取整体文件夹大小
|
||||||
|
func getDirSize(path string) (int64, error) {
|
||||||
|
var size int64
|
||||||
|
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
|
||||||
|
if !info.IsDir() {
|
||||||
|
size += info.Size()
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
return size, err
|
||||||
|
}
|
|
@ -1,15 +1,18 @@
|
||||||
package image
|
package image
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"PCM/adaptor/PCM-CORE/api/internal/logic/image"
|
result2 "PCM/common/result"
|
||||||
types2 "PCM/adaptor/PCM-CORE/api/internal/types"
|
"bufio"
|
||||||
"PCM/common/result"
|
"encoding/json"
|
||||||
"context"
|
"fmt"
|
||||||
"github.com/zeromicro/go-zero/rest/httpx"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"k8s.io/apimachinery/pkg/util/json"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
"PCM/adaptor/PCM-CORE/api/internal/svc"
|
"PCM/adaptor/PCM-CORE/api/internal/svc"
|
||||||
)
|
)
|
||||||
|
@ -18,38 +21,71 @@ type LoadBody struct {
|
||||||
Stream string `json:"stream"`
|
Stream string `json:"stream"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var windowsUploadTempPath = strings.ReplaceAll(path.Join(windowsUploadPath, "temp"), "/", "\\")
|
||||||
|
var linuxUploadTempPath = path.Join(windowsUploadPath)
|
||||||
|
|
||||||
func UploadImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
func UploadImageHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
var req types2.UploadImageReq
|
file, _, err := r.FormFile("file")
|
||||||
// 解析yaml文件
|
index := r.PostFormValue("index")
|
||||||
multipartFile, _, err := r.FormFile("file")
|
hash := r.PostFormValue("hash")
|
||||||
if err != nil {
|
// 获取uploads下所有的文件夹
|
||||||
result.HttpResult(r, w, nil, err)
|
nameList, err := ioutil.ReadDir(linuxUploadPath)
|
||||||
return
|
m := map[string]interface{}{
|
||||||
|
"code": 46900,
|
||||||
|
"msg": "文件已上传",
|
||||||
}
|
}
|
||||||
// 加载镜像文件到docker
|
result, _ := json.MarshalIndent(m, "", " ")
|
||||||
body, err := svcCtx.DockerClient.ImageLoad(context.Background(), multipartFile, false)
|
// 循环判断hash是否在文件里如果有就返回上传已完成
|
||||||
if err != nil {
|
for _, name := range nameList {
|
||||||
httpx.ErrorCtx(r.Context(), w, err)
|
tmpName := strings.Split(name.Name(), "_")[0]
|
||||||
return
|
if tmpName == hash {
|
||||||
|
fmt.Fprintf(w, string(result))
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
bytes, err := ioutil.ReadAll(body.Body)
|
|
||||||
loadBody := LoadBody{}
|
chunksPath := path.Join(linuxUploadTempPath, hash)
|
||||||
err = json.Unmarshal(bytes, &loadBody)
|
|
||||||
if err != nil {
|
isPathExists, err := PathExists(chunksPath)
|
||||||
httpx.ErrorCtx(r.Context(), w, err)
|
if !isPathExists {
|
||||||
return
|
err = os.MkdirAll(chunksPath, os.ModePerm)
|
||||||
}
|
}
|
||||||
imageName := strings.TrimSpace(loadBody.Stream[13:])
|
destFile, err := os.OpenFile(path.Join(chunksPath+"\\"+hash+"-"+index), syscall.O_CREAT|syscall.O_WRONLY, 0777)
|
||||||
req.Name = "hub.jcce.dev:18443/repository/docker-hub/jcce/" + imageName
|
reader := bufio.NewReader(file)
|
||||||
// 给镜像打上私有仓库的tag
|
writer := bufio.NewWriter(destFile)
|
||||||
err = svcCtx.DockerClient.ImageTag(context.Background(), imageName, req.Name)
|
buf := make([]byte, 1024*1024) // 1M buf
|
||||||
if err != nil {
|
for {
|
||||||
httpx.ErrorCtx(r.Context(), w, err)
|
n, err := reader.Read(buf)
|
||||||
return
|
if err == io.EOF {
|
||||||
|
writer.Flush()
|
||||||
|
break
|
||||||
|
} else if err != nil {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
writer.Write(buf[:n])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
l := image.NewUploadImageLogic(r.Context(), svcCtx)
|
|
||||||
err = l.UploadImage(&req)
|
defer file.Close()
|
||||||
result.HttpResult(r, w, nil, err)
|
defer destFile.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal("%v", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("第%s:%s块上传完成\n", index, destFile.Name())
|
||||||
|
|
||||||
|
result2.HttpResult(r, w, nil, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PathExists 判断文件夹是否存在
|
||||||
|
func PathExists(path string) (bool, error) {
|
||||||
|
_, err := os.Stat(path)
|
||||||
|
if err == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
|
@ -266,12 +266,18 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
|
||||||
Path: "/image/upload",
|
Path: "/image/upload",
|
||||||
Handler: image.UploadImageHandler(serverCtx),
|
Handler: image.UploadImageHandler(serverCtx),
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
Method: http.MethodPost,
|
||||||
|
Path: "/image/chunk",
|
||||||
|
Handler: image.ChunkImageHandler(serverCtx),
|
||||||
|
},
|
||||||
{
|
{
|
||||||
Method: http.MethodGet,
|
Method: http.MethodGet,
|
||||||
Path: "/image/list",
|
Path: "/image/list",
|
||||||
Handler: image.ImageListHandler(serverCtx),
|
Handler: image.ImageListHandler(serverCtx),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
rest.WithMaxBytes(624288000),
|
||||||
rest.WithPrefix("/pcm/v1"),
|
rest.WithPrefix("/pcm/v1"),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
package image
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"PCM/adaptor/PCM-CORE/api/internal/svc"
|
||||||
|
"github.com/zeromicro/go-zero/core/logx"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ChunkImageLogic struct {
|
||||||
|
logx.Logger
|
||||||
|
ctx context.Context
|
||||||
|
svcCtx *svc.ServiceContext
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewChunkImageLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ChunkImageLogic {
|
||||||
|
return &ChunkImageLogic{
|
||||||
|
Logger: logx.WithContext(ctx),
|
||||||
|
ctx: ctx,
|
||||||
|
svcCtx: svcCtx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *ChunkImageLogic) ChunkImage() error {
|
||||||
|
// todo: add your logic here and delete this line
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -44,7 +44,7 @@ func main() {
|
||||||
serviceGroup := service.NewServiceGroup()
|
serviceGroup := service.NewServiceGroup()
|
||||||
defer serviceGroup.Stop()
|
defer serviceGroup.Stop()
|
||||||
|
|
||||||
server := rest.MustNewServer(c.RestConf)
|
server := rest.MustNewServer(c.RestConf, rest.WithCors())
|
||||||
|
|
||||||
ctx := svc.NewServiceContext(c)
|
ctx := svc.NewServiceContext(c)
|
||||||
// start log component
|
// start log component
|
||||||
|
|
Loading…
Reference in New Issue