From a1b403d23cf1a478e52c4c99dd79f9cc92bfa36a Mon Sep 17 00:00:00 2001
From: binwiederhier
Date: Thu, 19 Mar 2026 21:11:36 -0400
Subject: [PATCH] Remove s3 config option, reduce size when removing files

---
 attachment/backend.go         |  3 +-
 attachment/backend_file.go    | 41 +++++++++++---------
 attachment/backend_s3.go      | 49 +++++++++++++-----------
 attachment/store.go           | 69 +++++++++++++++++++++++-----------
 attachment/store_file_test.go |  4 +-
 attachment/store_s3_test.go   |  5 +--
 cmd/serve.go                  |  9 +----
 docs/config.md                | 17 ++++-----
 docs/releases.md              |  2 +-
 s3/client.go                  | 71 +++++++++++++++++++++++++++++++++++
 s3/types.go                   | 21 +++++++++++
 server/config.go              |  1 -
 server/server.go              |  8 ++--
 server/server.yml             |  4 +-
 14 files changed, 211 insertions(+), 93 deletions(-)

diff --git a/attachment/backend.go b/attachment/backend.go
index 8989b890..e95fc91e 100644
--- a/attachment/backend.go
+++ b/attachment/backend.go
@@ -17,6 +17,7 @@ type object struct {
 type backend interface {
 	Put(id string, in io.Reader) error
 	Get(id string) (io.ReadCloser, int64, error)
-	Delete(ids ...string) error
 	List() ([]object, error)
+	Delete(ids ...string) error
+	DeleteIncomplete(cutoff time.Time) error
 }
diff --git a/attachment/backend_file.go b/attachment/backend_file.go
index b0afb2ca..8aaf20b9 100644
--- a/attachment/backend_file.go
+++ b/attachment/backend_file.go
@@ -4,6 +4,7 @@ import (
 	"io"
 	"os"
 	"path/filepath"
+	"time"
 
 	"heckel.io/ntfy/v2/log"
 )
@@ -41,6 +42,26 @@ func (b *fileBackend) Put(id string, in io.Reader) error {
 	return nil
 }
 
+func (b *fileBackend) List() ([]object, error) {
+	entries, err := os.ReadDir(b.dir)
+	if err != nil {
+		return nil, err
+	}
+	objects := make([]object, 0, len(entries))
+	for _, e := range entries {
+		info, err := e.Info()
+		if err != nil {
+			return nil, err
+		}
+		objects = append(objects, object{
+			ID:           e.Name(),
+			Size:         info.Size(),
+			LastModified: info.ModTime(),
+		})
+	}
+	return objects, nil
+}
+
 func (b *fileBackend) Get(id string) (io.ReadCloser, int64, error) {
 	file := filepath.Join(b.dir, id)
 	stat, err := os.Stat(file)
@@ -64,22 +85,6 @@ func (b *fileBackend) Delete(ids ...string) error {
 	return nil
 }
 
-func (b *fileBackend) List() ([]object, error) {
-	entries, err := os.ReadDir(b.dir)
-	if err != nil {
-		return nil, err
-	}
-	objects := make([]object, 0, len(entries))
-	for _, e := range entries {
-		info, err := e.Info()
-		if err != nil {
-			return nil, err
-		}
-		objects = append(objects, object{
-			ID:           e.Name(),
-			Size:         info.Size(),
-			LastModified: info.ModTime(),
-		})
-	}
-	return objects, nil
+func (b *fileBackend) DeleteIncomplete(_ time.Time) error {
+	return nil
 }
diff --git a/attachment/backend_s3.go b/attachment/backend_s3.go
index 8fcd8ccb..11d2254b 100644
--- a/attachment/backend_s3.go
+++ b/attachment/backend_s3.go
@@ -4,13 +4,16 @@ import (
 	"context"
 	"io"
 	"strings"
-
-	"heckel.io/ntfy/v2/s3"
+	"time"
 
 	"heckel.io/ntfy/v2/log"
+	"heckel.io/ntfy/v2/s3"
 )
 
-const tagS3Backend = "s3_backend"
+const (
+	tagS3Backend    = "s3_backend"
+	deleteBatchSize = 1000
+)
 
 type s3Backend struct {
 	client *s3.Client

 func (b *s3Backend) Get(id string) (io.ReadCloser, int64, error) {
 	return b.client.GetObject(context.Background(), id)
 }
 
-func (b *s3Backend) Delete(ids ...string) error {
-	// S3 DeleteObjects supports up to 1000 keys per call
-	for i := 0; i < len(ids); i += 1000 {
-		end := i + 1000
-		if end > len(ids) {
-			end = len(ids)
-		}
-		batch := ids[i:end]
-		for _, id := range batch {
-			log.Tag(tagS3Backend).Field("message_id", id).Debug("Deleting attachment from S3")
-		}
- if err := b.client.DeleteObjects(context.Background(), batch); err != nil { - return err - } - } - return nil -} - func (b *s3Backend) List() ([]object, error) { objects, err := b.client.ListAllObjects(context.Background()) if err != nil { @@ -68,3 +53,25 @@ func (b *s3Backend) List() ([]object, error) { } return result, nil } + +func (b *s3Backend) Delete(ids ...string) error { + // S3 DeleteObjects supports up to 1000 keys per call + for i := 0; i < len(ids); i += deleteBatchSize { + end := i + deleteBatchSize + if end > len(ids) { + end = len(ids) + } + batch := ids[i:end] + for _, id := range batch { + log.Tag(tagS3Backend).Field("message_id", id).Debug("Deleting attachment from S3") + } + if err := b.client.DeleteObjects(context.Background(), batch); err != nil { + return err + } + } + return nil +} + +func (b *s3Backend) DeleteIncomplete(cutoff time.Time) error { + return b.client.AbortIncompleteUploads(context.Background(), cutoff) +} diff --git a/attachment/store.go b/attachment/store.go index f66cd35c..78b8a7cc 100644 --- a/attachment/store.go +++ b/attachment/store.go @@ -28,21 +28,22 @@ var ( // Store manages attachment storage with shared logic for size tracking, limiting, // ID validation, and background sync to reconcile storage with the database. type Store struct { - backend backend - totalSizeCurrent int64 - totalSizeLimit int64 - localIDs func() ([]string, error) // returns IDs that should exist - closeChan chan struct{} - mu sync.Mutex // Protects totalSizeCurrent + backend backend + limit int64 // Defined limit of the store in bytes + size int64 // Current size of the store in bytes + sizes map[string]int64 // File ID -> size, for subtracting on Remove + localIDs func() ([]string, error) // Returns file IDs that should exist locally, used for sync() + closeChan chan struct{} + mu sync.Mutex // Protects size and sizes } // NewFileStore creates a new file-system backed attachment cache func NewFileStore(dir string, totalSizeLimit int64, localIDsFn func() ([]string, error)) (*Store, error) { - backend, err := newFileBackend(dir) + b, err := newFileBackend(dir) if err != nil { return nil, err } - return newStore(backend, totalSizeLimit, localIDsFn) + return newStore(b, totalSizeLimit, localIDsFn) } // NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format: @@ -58,10 +59,11 @@ func NewS3Store(s3URL string, totalSizeLimit int64, localIDs func() ([]string, e func newStore(backend backend, totalSizeLimit int64, localIDs func() ([]string, error)) (*Store, error) { c := &Store{ - backend: backend, - totalSizeLimit: totalSizeLimit, - localIDs: localIDs, - closeChan: make(chan struct{}), + backend: backend, + limit: totalSizeLimit, + sizes: make(map[string]int64), + localIDs: localIDs, + closeChan: make(chan struct{}), } if localIDs != nil { go c.syncLoop() @@ -85,7 +87,8 @@ func (c *Store) Write(id string, in io.Reader, limiters ...util.Limiter) (int64, } size := cr.Total() c.mu.Lock() - c.totalSizeCurrent += size + c.size += size + c.sizes[id] = size c.mu.Unlock() return size, nil } @@ -98,15 +101,30 @@ func (c *Store) Read(id string) (io.ReadCloser, int64, error) { return c.backend.Get(id) } -// Remove deletes attachment files by ID. It does NOT recompute the total size; -// the next sync() call will correct it. +// Remove deletes attachment files by ID and subtracts their known sizes from +// the total. Sizes for objects not tracked (e.g. written before this process +// started and before the first sync) are corrected by the next sync() call. 
 func (c *Store) Remove(ids ...string) error {
 	for _, id := range ids {
 		if !fileIDRegex.MatchString(id) {
 			return errInvalidFileID
 		}
 	}
-	return c.backend.Delete(ids...)
+	if err := c.backend.Delete(ids...); err != nil {
+		return err
+	}
+	c.mu.Lock()
+	for _, id := range ids {
+		if size, ok := c.sizes[id]; ok {
+			c.size -= size
+			delete(c.sizes, id)
+		}
+	}
+	if c.size < 0 {
+		c.size = 0
+	}
+	c.mu.Unlock()
+	return nil
 }
 
 // sync reconciles the backend storage with the database. It lists all objects,
@@ -130,7 +148,8 @@
 	// than the grace period to account for races, and skipping objects with invalid IDs.
 	cutoff := time.Now().Add(-orphanGracePeriod)
 	var orphanIDs []string
-	var totalSize int64
+	var size int64
+	sizes := make(map[string]int64, len(remoteObjects))
 	for _, obj := range remoteObjects {
 		if !fileIDRegex.MatchString(obj.ID) {
 			continue
 		}
 		if _, ok := localIDMap[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
 			orphanIDs = append(orphanIDs, obj.ID)
 		} else {
-			totalSize += obj.Size
+			size += obj.Size
+			sizes[obj.ID] = obj.Size
 		}
 	}
-	log.Tag(tagStore).Debug("Sync: cache size updated to %s", util.FormatSizeHuman(totalSize))
+	log.Tag(tagStore).Debug("Sync: cache size updated to %s", util.FormatSizeHuman(size))
 	c.mu.Lock()
-	c.totalSizeCurrent = totalSize
+	c.size = size
+	c.sizes = sizes
 	c.mu.Unlock()
 	// Delete orphaned attachments
 	if len(orphanIDs) > 0 {
@@ -152,6 +173,10 @@
 			return fmt.Errorf("attachment sync: failed to delete orphaned objects: %w", err)
 		}
 	}
+	// Clean up incomplete uploads (S3 only)
+	if err := c.backend.DeleteIncomplete(cutoff); err != nil {
+		log.Tag(tagStore).Err(err).Warn("Sync: failed to abort incomplete uploads")
+	}
 	return nil
 }
 
@@ -159,14 +184,14 @@
 func (c *Store) Size() int64 {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	return c.totalSizeCurrent
+	return c.size
 }
 
 // Remaining returns the remaining capacity for attachments
 func (c *Store) Remaining() int64 {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	remaining := c.totalSizeLimit - c.totalSizeCurrent
+	remaining := c.limit - c.size
 	if remaining < 0 {
 		return 0
 	}
diff --git a/attachment/store_file_test.go b/attachment/store_file_test.go
index ceac09d7..c65bad92 100644
--- a/attachment/store_file_test.go
+++ b/attachment/store_file_test.go
@@ -57,8 +57,8 @@ func TestFileStore_Write_Remove_Success(t *testing.T) {
 	require.Nil(t, c.Remove("abcdefghijk1", "abcdefghijk5"))
 	require.NoFileExists(t, dir+"/abcdefghijk1")
 	require.NoFileExists(t, dir+"/abcdefghijk5")
-	// Size is not recomputed by Remove; it stays stale until next sync
-	require.Equal(t, int64(9990), c.Size())
+	require.Equal(t, int64(8*999), c.Size())
+	require.Equal(t, int64(10240-8*999), c.Remaining())
 }
 
 func TestFileStore_Write_FailedTotalSizeLimit(t *testing.T) {
diff --git a/attachment/store_s3_test.go b/attachment/store_s3_test.go
index e29f9ed6..7ae122ae 100644
--- a/attachment/store_s3_test.go
+++ b/attachment/store_s3_test.go
@@ -42,7 +42,7 @@ func TestS3Store_WriteReadRemove(t *testing.T) {
 
 	// Remove
 	require.Nil(t, cache.Remove("abcdefghijkl"))
-	// Size is not recomputed by Remove; stays stale until next sync
+	require.Equal(t, int64(0), cache.Size())
 
 	// Read after remove should fail
 	_, _, err = cache.Read("abcdefghijkl")
@@ -107,8 +107,7 @@ func TestS3Store_WriteRemoveMultiple(t *testing.T) {
 	require.Equal(t, int64(500), cache.Size())
 	require.Nil(t, cache.Remove("abcdefghijk1", "abcdefghijk3"))
-	// Size not recomputed by Remove
-	require.Equal(t, int64(500), cache.Size())
+	require.Equal(t, int64(300), cache.Size())
 }
 
 func TestS3Store_ReadNotFound(t *testing.T) {
diff --git a/cmd/serve.go b/cmd/serve.go
index 26a08c81..52794a07 100644
--- a/cmd/serve.go
+++ b/cmd/serve.go
@@ -52,8 +52,7 @@ var flagsServe = append(
 	altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "auth-users", Aliases: []string{"auth_users"}, EnvVars: []string{"NTFY_AUTH_USERS"}, Usage: "pre-provisioned declarative users"}),
 	altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "auth-access", Aliases: []string{"auth_access"}, EnvVars: []string{"NTFY_AUTH_ACCESS"}, Usage: "pre-provisioned declarative access control entries"}),
 	altsrc.NewStringSliceFlag(&cli.StringSliceFlag{Name: "auth-tokens", Aliases: []string{"auth_tokens"}, EnvVars: []string{"NTFY_AUTH_TOKENS"}, Usage: "pre-provisioned declarative access tokens"}),
-	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-cache-dir", Aliases: []string{"attachment_cache_dir"}, EnvVars: []string{"NTFY_ATTACHMENT_CACHE_DIR"}, Usage: "cache directory for attached files"}),
-	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-s3-url", Aliases: []string{"attachment_s3_url"}, EnvVars: []string{"NTFY_ATTACHMENT_S3_URL"}, Usage: "S3 URL for attachment storage (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION)"}),
+	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-cache-dir", Aliases: []string{"attachment_cache_dir"}, EnvVars: []string{"NTFY_ATTACHMENT_CACHE_DIR"}, Usage: "cache directory for attached files, or S3 URL (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION)"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-total-size-limit", Aliases: []string{"attachment_total_size_limit", "A"}, EnvVars: []string{"NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT"}, Value: util.FormatSize(server.DefaultAttachmentTotalSizeLimit), Usage: "limit of the on-disk attachment cache"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-file-size-limit", Aliases: []string{"attachment_file_size_limit", "Y"}, EnvVars: []string{"NTFY_ATTACHMENT_FILE_SIZE_LIMIT"}, Value: util.FormatSize(server.DefaultAttachmentFileSizeLimit), Usage: "per-file attachment size limit (e.g. 300k, 2M, 100M)"}),
 	altsrc.NewStringFlag(&cli.StringFlag{Name: "attachment-expiry-duration", Aliases: []string{"attachment_expiry_duration", "X"}, EnvVars: []string{"NTFY_ATTACHMENT_EXPIRY_DURATION"}, Value: util.FormatDuration(server.DefaultAttachmentExpiryDuration), Usage: "duration after which uploaded attachments will be deleted (e.g. 3h, 20h)"}),
3h, 20h)"}), @@ -167,7 +166,6 @@ func execServe(c *cli.Context) error { authAccessRaw := c.StringSlice("auth-access") authTokensRaw := c.StringSlice("auth-tokens") attachmentCacheDir := c.String("attachment-cache-dir") - attachmentS3URL := c.String("attachment-s3-url") attachmentTotalSizeLimitStr := c.String("attachment-total-size-limit") attachmentFileSizeLimitStr := c.String("attachment-file-size-limit") attachmentExpiryDurationStr := c.String("attachment-expiry-duration") @@ -316,10 +314,6 @@ func execServe(c *cli.Context) error { return errors.New("if smtp-server-listen is set, smtp-server-domain must also be set") } else if attachmentCacheDir != "" && baseURL == "" { return errors.New("if attachment-cache-dir is set, base-url must also be set") - } else if attachmentS3URL != "" && baseURL == "" { - return errors.New("if attachment-s3-url is set, base-url must also be set") - } else if attachmentS3URL != "" && attachmentCacheDir != "" { - return errors.New("attachment-cache-dir and attachment-s3-url are mutually exclusive") } else if baseURL != "" { u, err := url.Parse(baseURL) if err != nil { @@ -463,7 +457,6 @@ func execServe(c *cli.Context) error { conf.AuthAccess = authAccess conf.AuthTokens = authTokens conf.AttachmentCacheDir = attachmentCacheDir - conf.AttachmentS3URL = attachmentS3URL conf.AttachmentTotalSizeLimit = attachmentTotalSizeLimit conf.AttachmentFileSizeLimit = attachmentFileSizeLimit conf.AttachmentExpiryDuration = attachmentExpiryDuration diff --git a/docs/config.md b/docs/config.md index 34484a51..edfa43ff 100644 --- a/docs/config.md +++ b/docs/config.md @@ -490,7 +490,7 @@ Subscribers can retrieve cached messaging using the [`poll=1` parameter](subscri ## Attachments If desired, you may allow users to upload and [attach files to notifications](publish.md#attachments). To enable this feature, you have to configure an attachment storage backend and a base URL (`base-url`). Attachments can be stored -either on the local filesystem (`attachment-cache-dir`) or in an S3-compatible object store (`attachment-s3-url`). +either on the local filesystem or in an S3-compatible object store, both using the `attachment-cache-dir` option. Once configured, you can upload attachments via PUT. By default, attachments are stored **for only 3 hours**. The main reason for this is to avoid legal issues @@ -498,8 +498,7 @@ and such when hosting user controlled content. Typically, this is more than enou feature) to download the file. The following config options are relevant to attachments: * `base-url` is the root URL for the ntfy server; this is needed for the generated attachment URLs -* `attachment-cache-dir` is the cache directory for attached files (mutually exclusive with `attachment-s3-url`) -* `attachment-s3-url` is the S3 URL for attachment storage (mutually exclusive with `attachment-cache-dir`) +* `attachment-cache-dir` is the cache directory for attached files, or an S3 URL for object storage * `attachment-total-size-limit` is the size limit of the attachment storage (default: 5G) * `attachment-file-size-limit` is the per-file attachment size limit (e.g. 300k, 2M, 100M, default: 15M) * `attachment-expiry-duration` is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h, default: 3h) @@ -528,7 +527,7 @@ Here's an example config using the local filesystem for attachment storage: As an alternative to the local filesystem, you can store attachments in an S3-compatible object store (e.g. AWS S3, MinIO, DigitalOcean Spaces). 
 This is useful for HA/cloud deployments where you don't want to rely on local disk storage.
-The `attachment-s3-url` option uses the following format:
+To use S3, set `attachment-cache-dir` to an S3 URL with the following format:
 
 ```
 s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
 ```
 
 When `endpoint` is specified, path-style addressing is enabled automatically (used by MinIO and other self-hosted object stores).
 
 === "/etc/ntfy/server.yml (AWS S3)"
     ``` yaml
     base-url: "https://ntfy.sh"
-    attachment-s3-url: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1"
+    attachment-cache-dir: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1"
     ```
 
 === "/etc/ntfy/server.yml (MinIO/custom endpoint)"
     ``` yaml
     base-url: "https://ntfy.sh"
-    attachment-s3-url: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1&endpoint=https://s3.example.com"
+    attachment-cache-dir: "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1&endpoint=https://s3.example.com"
     ```
 
 Please also refer to the [rate limiting](#rate-limiting) settings below, specifically `visitor-attachment-total-size-limit`
@@ -2143,8 +2142,7 @@ variable before running the `ntfy` command (e.g. `export NTFY_LISTEN_HTTP=:80`).
 | `behind-proxy` | `NTFY_BEHIND_PROXY` | *bool* | false | If set, use forwarded header (e.g. X-Forwarded-For, X-Client-IP) to determine visitor IP address (for rate limiting) |
 | `proxy-forwarded-header` | `NTFY_PROXY_FORWARDED_HEADER` | *string* | `X-Forwarded-For` | Use specified header to determine visitor IP address (for rate limiting) |
 | `proxy-trusted-hosts` | `NTFY_PROXY_TRUSTED_HOSTS` | *comma-separated host/IP/CIDR list* | - | Comma-separated list of trusted IP addresses, hosts, or CIDRs to remove from forwarded header |
-| `attachment-cache-dir` | `NTFY_ATTACHMENT_CACHE_DIR` | *directory* | - | Cache directory for attached files. Mutually exclusive with `attachment-s3-url`. |
-| `attachment-s3-url` | `NTFY_ATTACHMENT_S3_URL` | *URL* | - | S3 URL for attachment storage (format: `s3://KEY:SECRET@BUCKET[/PREFIX]?region=REGION`). Mutually exclusive with `attachment-cache-dir`. |
+| `attachment-cache-dir` | `NTFY_ATTACHMENT_CACHE_DIR` | *directory or S3 URL* | - | Cache directory for attached files, or S3 URL for object storage (format: `s3://KEY:SECRET@BUCKET[/PREFIX]?region=REGION`). |
 | `attachment-total-size-limit` | `NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT` | *size* | 5G | Limit of the on-disk attachment cache directory. If the limit is exceeded, new attachments will be rejected. |
 | `attachment-file-size-limit` | `NTFY_ATTACHMENT_FILE_SIZE_LIMIT` | *size* | 15M | Per-file attachment size limit (e.g. 300k, 2M, 100M). Larger attachments will be rejected. |
 | `attachment-expiry-duration` | `NTFY_ATTACHMENT_EXPIRY_DURATION` | *duration* | 3h | Duration after which uploaded attachments will be deleted (e.g. 3h, 20h). Strongly affects `visitor-attachment-total-size-limit`. |
@@ -2246,8 +2244,7 @@ OPTIONS:
    --auth-file value, --auth_file value, -H value                                     auth database file used for access control [$NTFY_AUTH_FILE]
    --auth-startup-queries value, --auth_startup_queries value                         queries run when the auth database is initialized [$NTFY_AUTH_STARTUP_QUERIES]
    --auth-default-access value, --auth_default_access value, -p value                 default permissions if no matching entries in the auth database are found (default: "read-write") [$NTFY_AUTH_DEFAULT_ACCESS]
-   --attachment-cache-dir value, --attachment_cache_dir value                         cache directory for attached files [$NTFY_ATTACHMENT_CACHE_DIR]
-   --attachment-s3-url value, --attachment_s3_url value                               S3 URL for attachment storage (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION) [$NTFY_ATTACHMENT_S3_URL]
+   --attachment-cache-dir value, --attachment_cache_dir value                         cache directory for attached files, or S3 URL (s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION) [$NTFY_ATTACHMENT_CACHE_DIR]
    --attachment-total-size-limit value, --attachment_total_size_limit value, -A value limit of the on-disk attachment cache (default: "5G") [$NTFY_ATTACHMENT_TOTAL_SIZE_LIMIT]
    --attachment-file-size-limit value, --attachment_file_size_limit value, -Y value   per-file attachment size limit (e.g. 300k, 2M, 100M) (default: "15M") [$NTFY_ATTACHMENT_FILE_SIZE_LIMIT]
    --attachment-expiry-duration value, --attachment_expiry_duration value, -X value   duration after which uploaded attachments will be deleted (e.g. 3h, 20h) (default: "3h") [$NTFY_ATTACHMENT_EXPIRY_DURATION]
diff --git a/docs/releases.md b/docs/releases.md
index dbf9bd41..b16608c6 100644
--- a/docs/releases.md
+++ b/docs/releases.md
@@ -1802,7 +1802,7 @@ and the [ntfy Android app](https://github.com/binwiederhier/ntfy-android/release
 
 **Features:**
 
-* Add S3-compatible object storage as an alternative attachment backend via `attachment-s3-url` config option
+* Add S3-compatible object storage as an alternative attachment backend via `attachment-cache-dir` config option
 
 **Bug fixes + maintenance:**
 
diff --git a/s3/client.go b/s3/client.go
index 5ec8caf6..41d940f3 100644
--- a/s3/client.go
+++ b/s3/client.go
@@ -232,6 +232,77 @@ func (c *Client) ListAllObjects(ctx context.Context) ([]Object, error) {
 	return nil, fmt.Errorf("s3: ListAllObjects exceeded %d pages", maxPages)
 }
 
+// ListMultipartUploads returns in-progress multipart uploads for the client's prefix.
+// It paginates automatically, stopping after 10,000 pages as a safety valve.
+func (c *Client) ListMultipartUploads(ctx context.Context) ([]MultipartUpload, error) { + var all []MultipartUpload + var keyMarker, uploadIDMarker string + for page := 0; page < maxPages; page++ { + query := url.Values{"uploads": {""}} + if prefix := c.prefixForList(); prefix != "" { + query.Set("prefix", prefix) + } + if keyMarker != "" { + query.Set("key-marker", keyMarker) + query.Set("upload-id-marker", uploadIDMarker) + } + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.bucketURL()+"?"+query.Encode(), nil) + if err != nil { + return nil, fmt.Errorf("s3: ListMultipartUploads request: %w", err) + } + c.signV4(req, emptyPayloadHash) + resp, err := c.httpClient().Do(req) + if err != nil { + return nil, fmt.Errorf("s3: ListMultipartUploads: %w", err) + } + respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes)) + resp.Body.Close() + if err != nil { + return nil, fmt.Errorf("s3: ListMultipartUploads read: %w", err) + } + if !isHTTPSuccess(resp) { + return nil, parseErrorFromBytes(resp.StatusCode, respBody) + } + var result listMultipartUploadsResult + if err := xml.Unmarshal(respBody, &result); err != nil { + return nil, fmt.Errorf("s3: ListMultipartUploads XML: %w", err) + } + for _, u := range result.Uploads { + var initiated time.Time + if u.Initiated != "" { + initiated, _ = time.Parse(time.RFC3339, u.Initiated) + } + all = append(all, MultipartUpload{ + Key: u.Key, + UploadID: u.UploadID, + Initiated: initiated, + }) + } + if !result.IsTruncated { + return all, nil + } + keyMarker = result.NextKeyMarker + uploadIDMarker = result.NextUploadIDMarker + } + return nil, fmt.Errorf("s3: ListMultipartUploads exceeded %d pages", maxPages) +} + +// AbortIncompleteUploads lists all in-progress multipart uploads and aborts those initiated +// before the given cutoff time. This cleans up orphaned upload parts from interrupted uploads. +func (c *Client) AbortIncompleteUploads(ctx context.Context, cutoff time.Time) error { + uploads, err := c.ListMultipartUploads(ctx) + if err != nil { + return err + } + for _, u := range uploads { + if !u.Initiated.IsZero() && u.Initiated.Before(cutoff) { + log.Tag(tagS3Client).Debug("DeleteIncomplete key=%s uploadId=%s initiated=%s", u.Key, u.UploadID, u.Initiated) + c.abortMultipartUpload(ctx, u.Key, u.UploadID) + } + } + return nil +} + // putObject uploads a body with known size using a simple PUT with UNSIGNED-PAYLOAD. func (c *Client) putObject(ctx context.Context, key string, body io.Reader, size int64) error { fullKey := c.objectKey(key) diff --git a/s3/types.go b/s3/types.go index 65615fcd..f78c4a8b 100644 --- a/s3/types.go +++ b/s3/types.go @@ -69,6 +69,27 @@ type deleteError struct { Message string `xml:"Message"` } +// MultipartUpload represents an in-progress multipart upload returned by ListMultipartUploads. 
+type MultipartUpload struct { + Key string + UploadID string + Initiated time.Time +} + +// listMultipartUploadsResult is the XML response from S3 ListMultipartUploads +type listMultipartUploadsResult struct { + Uploads []listUpload `xml:"Upload"` + IsTruncated bool `xml:"IsTruncated"` + NextKeyMarker string `xml:"NextKeyMarker"` + NextUploadIDMarker string `xml:"NextUploadIdMarker"` +} + +type listUpload struct { + Key string `xml:"Key"` + UploadID string `xml:"UploadId"` + Initiated string `xml:"Initiated"` +} + // initiateMultipartUploadResult is the XML response from S3 InitiateMultipartUpload type initiateMultipartUploadResult struct { UploadID string `xml:"UploadId"` diff --git a/server/config.go b/server/config.go index 97f72a1c..8ead312c 100644 --- a/server/config.go +++ b/server/config.go @@ -112,7 +112,6 @@ type Config struct { AuthBcryptCost int AuthStatsQueueWriterInterval time.Duration AttachmentCacheDir string - AttachmentS3URL string AttachmentTotalSizeLimit int64 AttachmentFileSizeLimit int64 AttachmentExpiryDuration time.Duration diff --git a/server/server.go b/server/server.go index b43b71ef..99a61906 100644 --- a/server/server.go +++ b/server/server.go @@ -301,13 +301,13 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) { } func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) { - idProvider := func() ([]string, error) { + attachmentIDs := func() ([]string, error) { return messageCache.AttachmentIDs() } - if conf.AttachmentS3URL != "" { - return attachment.NewS3Store(conf.AttachmentS3URL, conf.AttachmentTotalSizeLimit, idProvider) + if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") { + return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs) } else if conf.AttachmentCacheDir != "" { - return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, idProvider) + return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, attachmentIDs) } return nil, nil } diff --git a/server/server.yml b/server/server.yml index e6f7afee..9dc92968 100644 --- a/server/server.yml +++ b/server/server.yml @@ -153,13 +153,13 @@ # If enabled, clients can attach files to notifications as attachments. Minimum settings to enable attachments # are "attachment-cache-dir" and "base-url". # -# - attachment-cache-dir is the cache directory for attached files +# - attachment-cache-dir is the cache directory for attached files, or an S3 URL for object storage +# e.g. /var/cache/ntfy/attachments, or s3://ACCESS_KEY:SECRET_KEY@bucket/prefix?region=us-east-1&endpoint=https://... # - attachment-total-size-limit is the limit of the on-disk attachment cache directory (total size) # - attachment-file-size-limit is the per-file attachment size limit (e.g. 300k, 2M, 100M) # - attachment-expiry-duration is the duration after which uploaded attachments will be deleted (e.g. 3h, 20h) # # attachment-cache-dir: -# attachment-s3-url: "s3://ACCESS_KEY:SECRET_KEY@bucket/prefix?region=us-east-1" # attachment-total-size-limit: "5G" # attachment-file-size-limit: "15M" # attachment-expiry-duration: "3h"
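
A note on the URL format above: the `s3://` value now accepted by `attachment-cache-dir` decomposes cleanly with Go's standard `net/url` package. The sketch below is illustrative only; the actual parsing in the `s3` package may differ:

``` go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
	raw := "s3://AKID:SECRET@my-bucket/attachments?region=us-east-1&endpoint=https://s3.example.com"
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	secret, _ := u.User.Password()
	fmt.Println(u.User.Username())               // AKID (ACCESS_KEY)
	fmt.Println(secret)                          // SECRET (SECRET_KEY)
	fmt.Println(u.Host)                          // my-bucket (BUCKET)
	fmt.Println(strings.TrimPrefix(u.Path, "/")) // attachments (PREFIX)
	fmt.Println(u.Query().Get("region"))         // us-east-1
	fmt.Println(u.Query().Get("endpoint"))       // https://s3.example.com
}
```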
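
With `attachment-s3-url` removed, backend selection hinges entirely on the shape of the `attachment-cache-dir` value, as `createAttachmentStore` in `server/server.go` shows. A standalone sketch of that dispatch rule (the `backendKind` helper and its output are illustrative, not part of the patch):

``` go
package main

import (
	"fmt"
	"strings"
)

// backendKind mirrors the dispatch in createAttachmentStore: an s3:// URL
// selects the S3 store, any other non-empty value selects the file store,
// and an empty value disables attachments entirely.
func backendKind(attachmentCacheDir string) string {
	switch {
	case strings.HasPrefix(attachmentCacheDir, "s3://"):
		return "s3"
	case attachmentCacheDir != "":
		return "file"
	default:
		return "disabled"
	}
}

func main() {
	fmt.Println(backendKind("s3://AKID:SECRET@my-bucket/attachments?region=us-east-1")) // s3
	fmt.Println(backendKind("/var/cache/ntfy/attachments"))                             // file
	fmt.Println(backendKind(""))                                                        // disabled
}
```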
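
The heart of the "reduce size when removing files" change in `attachment/store.go`: `Write` records each file's size in a map so that `Remove` can subtract it immediately, rather than leaving the total stale until the next `sync()`. A self-contained sketch of that bookkeeping under a mutex, with illustrative names:

``` go
package main

import (
	"fmt"
	"sync"
)

// sizeTracker mirrors the size/sizes bookkeeping Store now keeps under its
// mutex: a running total plus a per-ID size map for later subtraction.
type sizeTracker struct {
	mu    sync.Mutex
	size  int64            // current total size of the store in bytes
	sizes map[string]int64 // file ID -> size, for subtracting on remove
}

func newSizeTracker() *sizeTracker {
	return &sizeTracker{sizes: make(map[string]int64)}
}

// add records a newly written file, as Store.Write does after a successful put.
func (t *sizeTracker) add(id string, n int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.size += n
	t.sizes[id] = n
}

// remove subtracts the known sizes of the given IDs; unknown IDs (e.g. files
// written before the first sync) are skipped and corrected by the next sync.
func (t *sizeTracker) remove(ids ...string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, id := range ids {
		if n, ok := t.sizes[id]; ok {
			t.size -= n
			delete(t.sizes, id)
		}
	}
	if t.size < 0 { // defensive clamp, as in Store.Remove
		t.size = 0
	}
}

func main() {
	t := newSizeTracker()
	t.add("abcdefghijk1", 999)
	t.add("abcdefghijk2", 999)
	t.remove("abcdefghijk1", "unknown-id")
	fmt.Println(t.size) // 999
}
```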
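
Finally, `s3Backend.Delete` batches deletions because the S3 DeleteObjects API accepts at most 1000 keys per request. The chunking rule in isolation (the `chunks` helper is illustrative):

``` go
package main

import "fmt"

// deleteBatchSize matches the S3 DeleteObjects limit of 1000 keys per call.
const deleteBatchSize = 1000

// chunks splits keys into slices of at most n elements, preserving order,
// which is the same loop shape used by s3Backend.Delete.
func chunks(keys []string, n int) [][]string {
	var out [][]string
	for i := 0; i < len(keys); i += n {
		end := i + n
		if end > len(keys) {
			end = len(keys)
		}
		out = append(out, keys[i:end])
	}
	return out
}

func main() {
	keys := make([]string, 2500)
	for i := range keys {
		keys[i] = fmt.Sprintf("key-%04d", i)
	}
	for _, batch := range chunks(keys, deleteBatchSize) {
		fmt.Println(len(batch)) // 1000, 1000, 500
	}
}
```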