feat(archive): add support for 7z and bz2 / extract rar and 7zip files protected with password (#2668)
This commit is contained in:
@@ -163,6 +163,10 @@ type (
|
||||
rsc io.ReadCloser
|
||||
pos int64
|
||||
o *EntitySourceOptions
|
||||
|
||||
// Cache for resetRequest URL and expiry
|
||||
cachedUrl string
|
||||
cachedExpiry time.Time
|
||||
}
|
||||
)
|
||||
|
||||
@@ -215,6 +219,10 @@ func NewEntitySource(
|
||||
}
|
||||
|
||||
func (f *entitySource) Apply(opts ...EntitySourceOption) {
|
||||
if len(opts) > 0 {
|
||||
// Clear cache when options are applied as they might affect URL generation
|
||||
f.clearUrlCache()
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt.Apply(f.o)
|
||||
}
|
||||
@@ -247,6 +255,10 @@ func (f *entitySource) LocalPath(ctx context.Context) string {
|
||||
}
|
||||
|
||||
func (f *entitySource) Serve(w http.ResponseWriter, r *http.Request, opts ...EntitySourceOption) {
|
||||
if len(opts) > 0 {
|
||||
// Clear cache when options are applied as they might affect URL generation
|
||||
f.clearUrlCache()
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt.Apply(f.o)
|
||||
}
|
||||
@@ -478,16 +490,22 @@ func (f *entitySource) Read(p []byte) (n int, err error) {
|
||||
}
|
||||
|
||||
func (f *entitySource) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
if f.IsLocal() {
|
||||
if f.rsc == nil {
|
||||
err = f.resetRequest()
|
||||
}
|
||||
if readAt, ok := f.rsc.(io.ReaderAt); ok {
|
||||
return readAt.ReadAt(p, off)
|
||||
if f.rsc == nil {
|
||||
err = f.resetRequest()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
if readAt, ok := f.rsc.(io.ReaderAt); ok {
|
||||
return readAt.ReadAt(p, off)
|
||||
}
|
||||
|
||||
return 0, errors.New("source does not support ReadAt")
|
||||
// For non-local sources, use HTTP range request to read at specific offset
|
||||
rsc, err := f.getRsc(off)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return io.ReadFull(rsc, p)
|
||||
}
|
||||
|
||||
func (f *entitySource) Seek(offset int64, whence int) (int64, error) {
|
||||
@@ -524,6 +542,12 @@ func (f *entitySource) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// clearUrlCache clears the cached URL and expiry
|
||||
func (f *entitySource) clearUrlCache() {
|
||||
f.cachedUrl = ""
|
||||
f.cachedExpiry = time.Time{}
|
||||
}
|
||||
|
||||
func (f *entitySource) ShouldInternalProxy(opts ...EntitySourceOption) bool {
|
||||
for _, opt := range opts {
|
||||
opt.Apply(f.o)
|
||||
@@ -534,6 +558,10 @@ func (f *entitySource) ShouldInternalProxy(opts ...EntitySourceOption) bool {
|
||||
}
|
||||
|
||||
func (f *entitySource) Url(ctx context.Context, opts ...EntitySourceOption) (*EntityUrl, error) {
|
||||
if len(opts) > 0 {
|
||||
// Clear cache when options are applied as they might affect URL generation
|
||||
f.clearUrlCache()
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt.Apply(f.o)
|
||||
}
|
||||
@@ -613,50 +641,75 @@ func (f *entitySource) Url(ctx context.Context, opts ...EntitySourceOption) (*En
|
||||
|
||||
func (f *entitySource) resetRequest() error {
|
||||
// For inbound files, we can use the handler to open the file directly
|
||||
if f.IsLocal() {
|
||||
if f.rsc == nil {
|
||||
file, err := f.handler.Open(f.o.Ctx, f.e.Source())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open inbound file: %w", err)
|
||||
}
|
||||
|
||||
if f.pos > 0 {
|
||||
_, err = file.Seek(f.pos, io.SeekStart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to seek inbound file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
f.rsc = file
|
||||
|
||||
if f.o.SpeedLimit > 0 {
|
||||
bucket := ratelimit.NewBucketWithRate(float64(f.o.SpeedLimit), f.o.SpeedLimit)
|
||||
f.rsc = lrs{f.rsc, ratelimit.Reader(f.rsc, bucket)}
|
||||
}
|
||||
}
|
||||
|
||||
if f.IsLocal() && f.rsc != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
expire := time.Now().Add(defaultUrlExpire)
|
||||
u, err := f.Url(driver.WithForcePublicEndpoint(f.o.Ctx, false), WithNoInternalProxy(), WithExpire(&expire))
|
||||
rsc, err := f.getRsc(f.pos)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate download url: %w", err)
|
||||
return fmt.Errorf("failed to get rsc: %w", err)
|
||||
}
|
||||
f.rsc = rsc
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *entitySource) getRsc(pos int64) (io.ReadCloser, error) {
|
||||
// For inbound files, we can use the handler to open the file directly
|
||||
if f.IsLocal() {
|
||||
file, err := f.handler.Open(f.o.Ctx, f.e.Source())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open inbound file: %w", err)
|
||||
}
|
||||
|
||||
if pos > 0 {
|
||||
_, err = file.Seek(pos, io.SeekStart)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to seek inbound file: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if f.o.SpeedLimit > 0 {
|
||||
bucket := ratelimit.NewBucketWithRate(float64(f.o.SpeedLimit), f.o.SpeedLimit)
|
||||
return lrs{f.rsc, ratelimit.Reader(f.rsc, bucket)}, nil
|
||||
} else {
|
||||
return file, nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
var urlStr string
|
||||
now := time.Now()
|
||||
|
||||
// Check if we have a valid cached URL and expiry
|
||||
if f.cachedUrl != "" && now.Before(f.cachedExpiry.Add(-time.Minute)) {
|
||||
// Use cached URL if it's still valid (with 1 minute buffer before expiry)
|
||||
urlStr = f.cachedUrl
|
||||
} else {
|
||||
// Generate new URL and cache it
|
||||
expire := now.Add(defaultUrlExpire)
|
||||
u, err := f.Url(driver.WithForcePublicEndpoint(f.o.Ctx, false), WithNoInternalProxy(), WithExpire(&expire))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to generate download url: %w", err)
|
||||
}
|
||||
|
||||
// Cache the URL and expiry
|
||||
f.cachedUrl = u.Url
|
||||
f.cachedExpiry = expire
|
||||
urlStr = u.Url
|
||||
}
|
||||
|
||||
h := http.Header{}
|
||||
h.Set("Range", fmt.Sprintf("bytes=%d-", f.pos))
|
||||
resp := f.c.Request(http.MethodGet, u.Url, nil,
|
||||
h.Set("Range", fmt.Sprintf("bytes=%d-", pos))
|
||||
resp := f.c.Request(http.MethodGet, urlStr, nil,
|
||||
request.WithContext(f.o.Ctx),
|
||||
request.WithLogger(f.l),
|
||||
request.WithHeader(h),
|
||||
).CheckHTTPResponse(http.StatusOK, http.StatusPartialContent)
|
||||
if resp.Err != nil {
|
||||
return fmt.Errorf("failed to request download url: %w", resp.Err)
|
||||
return nil, fmt.Errorf("failed to request download url: %w", resp.Err)
|
||||
}
|
||||
|
||||
f.rsc = resp.Response.Body
|
||||
return nil
|
||||
return resp.Response.Body, nil
|
||||
}
|
||||
|
||||
// capExpireTime make sure expire time is not too long or too short (if min or max is set)
|
||||
|
||||
@@ -26,7 +26,14 @@ import (
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
||||
"github.com/gofrs/uuid"
|
||||
"github.com/mholt/archiver/v4"
|
||||
"github.com/mholt/archives"
|
||||
"golang.org/x/text/encoding"
|
||||
"golang.org/x/text/encoding/charmap"
|
||||
"golang.org/x/text/encoding/japanese"
|
||||
"golang.org/x/text/encoding/korean"
|
||||
"golang.org/x/text/encoding/simplifiedchinese"
|
||||
"golang.org/x/text/encoding/traditionalchinese"
|
||||
"golang.org/x/text/encoding/unicode"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -47,6 +54,7 @@ type (
|
||||
TempZipFilePath string `json:"temp_zip_file_path,omitempty"`
|
||||
ProcessedCursor string `json:"processed_cursor,omitempty"`
|
||||
SlaveTaskID int `json:"slave_task_id,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
NodeState `json:",inline"`
|
||||
Phase ExtractArchiveTaskPhase `json:"phase,omitempty"`
|
||||
}
|
||||
@@ -70,13 +78,54 @@ func init() {
|
||||
queue.RegisterResumableTaskFactory(queue.ExtractArchiveTaskType, NewExtractArchiveTaskFromModel)
|
||||
}
|
||||
|
||||
var encodings = map[string]encoding.Encoding{
|
||||
"ibm866": charmap.CodePage866,
|
||||
"iso8859_2": charmap.ISO8859_2,
|
||||
"iso8859_3": charmap.ISO8859_3,
|
||||
"iso8859_4": charmap.ISO8859_4,
|
||||
"iso8859_5": charmap.ISO8859_5,
|
||||
"iso8859_6": charmap.ISO8859_6,
|
||||
"iso8859_7": charmap.ISO8859_7,
|
||||
"iso8859_8": charmap.ISO8859_8,
|
||||
"iso8859_8I": charmap.ISO8859_8I,
|
||||
"iso8859_10": charmap.ISO8859_10,
|
||||
"iso8859_13": charmap.ISO8859_13,
|
||||
"iso8859_14": charmap.ISO8859_14,
|
||||
"iso8859_15": charmap.ISO8859_15,
|
||||
"iso8859_16": charmap.ISO8859_16,
|
||||
"koi8r": charmap.KOI8R,
|
||||
"koi8u": charmap.KOI8U,
|
||||
"macintosh": charmap.Macintosh,
|
||||
"windows874": charmap.Windows874,
|
||||
"windows1250": charmap.Windows1250,
|
||||
"windows1251": charmap.Windows1251,
|
||||
"windows1252": charmap.Windows1252,
|
||||
"windows1253": charmap.Windows1253,
|
||||
"windows1254": charmap.Windows1254,
|
||||
"windows1255": charmap.Windows1255,
|
||||
"windows1256": charmap.Windows1256,
|
||||
"windows1257": charmap.Windows1257,
|
||||
"windows1258": charmap.Windows1258,
|
||||
"macintoshcyrillic": charmap.MacintoshCyrillic,
|
||||
"gbk": simplifiedchinese.GBK,
|
||||
"gb18030": simplifiedchinese.GB18030,
|
||||
"big5": traditionalchinese.Big5,
|
||||
"eucjp": japanese.EUCJP,
|
||||
"iso2022jp": japanese.ISO2022JP,
|
||||
"shiftjis": japanese.ShiftJIS,
|
||||
"euckr": korean.EUCKR,
|
||||
"utf16be": unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM),
|
||||
"utf16le": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
|
||||
}
|
||||
|
||||
// NewExtractArchiveTask creates a new ExtractArchiveTask
|
||||
func NewExtractArchiveTask(ctx context.Context, src, dst, encoding string) (queue.Task, error) {
|
||||
func NewExtractArchiveTask(ctx context.Context, src, dst, encoding, password string) (queue.Task, error) {
|
||||
state := &ExtractArchiveTaskState{
|
||||
Uri: src,
|
||||
Dst: dst,
|
||||
Encoding: encoding,
|
||||
NodeState: NodeState{},
|
||||
Password: password,
|
||||
}
|
||||
stateBytes, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
@@ -197,6 +246,7 @@ func (m *ExtractArchiveTask) createSlaveExtractTask(ctx context.Context, dep dep
|
||||
Encoding: m.state.Encoding,
|
||||
Dst: m.state.Dst,
|
||||
UserID: user.ID,
|
||||
Password: m.state.Password,
|
||||
}
|
||||
|
||||
payloadStr, err := json.Marshal(payload)
|
||||
@@ -277,20 +327,21 @@ func (m *ExtractArchiveTask) masterExtractArchive(ctx context.Context, dep depen
|
||||
|
||||
m.l.Info("Extracting archive %q to %q", uri, m.state.Dst)
|
||||
// Identify file format
|
||||
format, readStream, err := archiver.Identify(archiveFile.DisplayName(), es)
|
||||
format, readStream, err := archives.Identify(ctx, archiveFile.DisplayName(), es)
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to identify archive format: %w", err)
|
||||
}
|
||||
|
||||
m.l.Info("Archive file %q format identified as %q", uri, format.Name())
|
||||
m.l.Info("Archive file %q format identified as %q", uri, format.Extension())
|
||||
|
||||
extractor, ok := format.(archiver.Extractor)
|
||||
extractor, ok := format.(archives.Extractor)
|
||||
if !ok {
|
||||
return task.StatusError, fmt.Errorf("format not an extractor %s")
|
||||
}
|
||||
|
||||
if format.Name() == ".zip" {
|
||||
// Zip extractor requires a Seeker+ReadAt
|
||||
formatExt := format.Extension()
|
||||
if formatExt == ".zip" || formatExt == ".7z" {
|
||||
// Zip/7Z extractor requires a Seeker+ReadAt
|
||||
if m.state.TempZipFilePath == "" && !es.IsLocal() {
|
||||
m.state.Phase = ExtractArchivePhaseDownloadZip
|
||||
m.ResumeAfter(0)
|
||||
@@ -315,11 +366,25 @@ func (m *ExtractArchiveTask) masterExtractArchive(ctx context.Context, dep depen
|
||||
|
||||
readStream = es
|
||||
}
|
||||
}
|
||||
|
||||
if zipExtractor, ok := extractor.(archives.Zip); ok {
|
||||
if m.state.Encoding != "" {
|
||||
m.l.Info("Using encoding %q for zip archive", m.state.Encoding)
|
||||
extractor = archiver.Zip{TextEncoding: m.state.Encoding}
|
||||
encoding, ok := encodings[strings.ToLower(m.state.Encoding)]
|
||||
if !ok {
|
||||
m.l.Warning("Unknown encoding %q, fallback to default encoding", m.state.Encoding)
|
||||
} else {
|
||||
zipExtractor.TextEncoding = encoding
|
||||
extractor = zipExtractor
|
||||
}
|
||||
}
|
||||
} else if rarExtractor, ok := extractor.(archives.Rar); ok && m.state.Password != "" {
|
||||
rarExtractor.Password = m.state.Password
|
||||
extractor = rarExtractor
|
||||
} else if sevenZipExtractor, ok := extractor.(archives.SevenZip); ok && m.state.Password != "" {
|
||||
sevenZipExtractor.Password = m.state.Password
|
||||
extractor = sevenZipExtractor
|
||||
}
|
||||
|
||||
needSkipToCursor := false
|
||||
@@ -332,7 +397,7 @@ func (m *ExtractArchiveTask) masterExtractArchive(ctx context.Context, dep depen
|
||||
m.Unlock()
|
||||
|
||||
// extract and upload
|
||||
err = extractor.Extract(ctx, readStream, nil, func(ctx context.Context, f archiver.File) error {
|
||||
err = extractor.Extract(ctx, readStream, func(ctx context.Context, f archives.FileInfo) error {
|
||||
if needSkipToCursor && f.NameInArchive != m.state.ProcessedCursor {
|
||||
atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
|
||||
atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
|
||||
@@ -533,6 +598,7 @@ type (
|
||||
TempPath string `json:"temp_path,omitempty"`
|
||||
TempZipFilePath string `json:"temp_zip_file_path,omitempty"`
|
||||
ProcessedCursor string `json:"processed_cursor,omitempty"`
|
||||
Password string `json:"password,omitempty"`
|
||||
}
|
||||
)
|
||||
|
||||
@@ -602,18 +668,19 @@ func (m *SlaveExtractArchiveTask) Do(ctx context.Context) (task.Status, error) {
|
||||
defer es.Close()
|
||||
|
||||
// 2. Identify file format
|
||||
format, readStream, err := archiver.Identify(m.state.FileName, es)
|
||||
format, readStream, err := archives.Identify(ctx, m.state.FileName, es)
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to identify archive format: %w", err)
|
||||
}
|
||||
m.l.Info("Archive file %q format identified as %q", m.state.FileName, format.Name())
|
||||
m.l.Info("Archive file %q format identified as %q", m.state.FileName, format.Extension())
|
||||
|
||||
extractor, ok := format.(archiver.Extractor)
|
||||
extractor, ok := format.(archives.Extractor)
|
||||
if !ok {
|
||||
return task.StatusError, fmt.Errorf("format not an extractor %s")
|
||||
return task.StatusError, fmt.Errorf("format not an extractor %q", format.Extension())
|
||||
}
|
||||
|
||||
if format.Name() == ".zip" {
|
||||
formatExt := format.Extension()
|
||||
if formatExt == ".zip" || formatExt == ".7z" {
|
||||
if _, err = es.Seek(0, 0); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to seek entity source: %w", err)
|
||||
}
|
||||
@@ -666,11 +733,25 @@ func (m *SlaveExtractArchiveTask) Do(ctx context.Context) (task.Status, error) {
|
||||
if es.IsLocal() {
|
||||
readStream = es
|
||||
}
|
||||
}
|
||||
|
||||
if zipExtractor, ok := extractor.(archives.Zip); ok {
|
||||
if m.state.Encoding != "" {
|
||||
m.l.Info("Using encoding %q for zip archive", m.state.Encoding)
|
||||
extractor = archiver.Zip{TextEncoding: m.state.Encoding}
|
||||
encoding, ok := encodings[strings.ToLower(m.state.Encoding)]
|
||||
if !ok {
|
||||
m.l.Warning("Unknown encoding %q, fallback to default encoding", m.state.Encoding)
|
||||
} else {
|
||||
zipExtractor.TextEncoding = encoding
|
||||
extractor = zipExtractor
|
||||
}
|
||||
}
|
||||
} else if rarExtractor, ok := extractor.(archives.Rar); ok && m.state.Password != "" {
|
||||
rarExtractor.Password = m.state.Password
|
||||
extractor = rarExtractor
|
||||
} else if sevenZipExtractor, ok := extractor.(archives.SevenZip); ok && m.state.Password != "" {
|
||||
sevenZipExtractor.Password = m.state.Password
|
||||
extractor = sevenZipExtractor
|
||||
}
|
||||
|
||||
needSkipToCursor := false
|
||||
@@ -679,7 +760,7 @@ func (m *SlaveExtractArchiveTask) Do(ctx context.Context) (task.Status, error) {
|
||||
}
|
||||
|
||||
// 3. Extract and upload
|
||||
err = extractor.Extract(ctx, readStream, nil, func(ctx context.Context, f archiver.File) error {
|
||||
err = extractor.Extract(ctx, readStream, func(ctx context.Context, f archives.FileInfo) error {
|
||||
if needSkipToCursor && f.NameInArchive != m.state.ProcessedCursor {
|
||||
atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
|
||||
atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
|
||||
|
||||
Reference in New Issue
Block a user