// attachment/store.go — attachment storage for ntfy (file-system or S3 backed).
package attachment
import (
"errors"
"fmt"
"io"
2026-03-17 20:53:41 -04:00
"sync"
"time"
2026-03-14 21:10:46 -04:00
2026-03-17 20:53:41 -04:00
"heckel.io/ntfy/v2/log"
2026-03-14 21:10:46 -04:00
"heckel.io/ntfy/v2/model"
2026-03-17 20:53:41 -04:00
"heckel.io/ntfy/v2/s3"
2026-03-14 21:10:46 -04:00
"heckel.io/ntfy/v2/util"
)
2026-03-17 20:53:41 -04:00
const (
	tagStore          = "attachment_store" // Log tag for all store log lines
	syncInterval      = 15 * time.Minute   // How often to run the background sync loop
	orphanGracePeriod = time.Hour          // Don't delete orphaned objects younger than this to avoid races with in-flight uploads
)

// errInvalidFileID is returned by Write/Read/Remove when an ID fails model.ValidMessageID
var errInvalidFileID = errors.New("invalid file ID")
2026-03-17 16:25:45 -04:00
2026-03-17 20:53:41 -04:00
// Store manages attachment storage with shared logic for size tracking, limiting,
// ID validation, and background sync to reconcile storage with the database.
type Store struct {
	backend              backend                          // Storage backend (file system or S3)
	limit                int64                            // Defined limit of the store in bytes
	size                 int64                            // Current size of the store in bytes
	sizes                map[string]int64                 // File ID -> size, for subtracting on Remove
	attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments
	closeChan            chan struct{}                    // Closed by Close() to stop the background sync loop
	mu                   sync.RWMutex                     // Protects size and sizes
}
// NewFileStore creates a new file-system backed attachment cache
2026-03-23 12:44:40 -04:00
func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
b, err := newFileBackend(dir)
2026-03-17 20:53:41 -04:00
if err != nil {
return nil, err
}
2026-03-23 12:44:40 -04:00
return newStore(b, totalSizeLimit, attachmentsWithSizes)
2026-03-17 20:53:41 -04:00
}
// NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format:
//
// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
2026-03-23 12:44:40 -04:00
func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
2026-03-17 20:53:41 -04:00
config, err := s3.ParseURL(s3URL)
if err != nil {
return nil, err
}
2026-03-23 12:44:40 -04:00
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes)
2026-03-17 20:53:41 -04:00
}
2026-03-23 12:44:40 -04:00
func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
2026-03-17 20:53:41 -04:00
c := &Store{
2026-03-23 12:44:40 -04:00
backend: backend,
limit: totalSizeLimit,
sizes: make(map[string]int64),
attachmentsWithSizes: attachmentsWithSizes,
closeChan: make(chan struct{}),
}
// Hydrate sizes from the database immediately so that Size()/Remaining()/Remove()
// are accurate from the start, without waiting for the first sync() call.
if attachmentsWithSizes != nil {
attachments, err := attachmentsWithSizes()
if err != nil {
return nil, fmt.Errorf("attachment store: failed to load existing attachments: %w", err)
}
for id, size := range attachments {
c.sizes[id] = size
c.size += size
}
2026-03-17 20:53:41 -04:00
go c.syncLoop()
}
return c, nil
}
// Write stores an attachment file. The id is validated, and the write is subject to
2026-03-21 21:14:49 -04:00
// the total size limit and any additional limiters. The untrustedLength is a hint
// from the client's Content-Length header; backends may use it to optimize uploads (e.g.
// streaming directly to S3 without buffering).
func (c *Store) Write(id string, reader io.Reader, untrustedLength int64, limiters ...util.Limiter) (int64, error) {
2026-03-23 12:54:13 -04:00
if !model.ValidMessageID(id) {
2026-03-17 20:53:41 -04:00
return 0, errInvalidFileID
}
log.Tag(tagStore).Field("message_id", id).Debug("Writing attachment")
limiters = append(limiters, util.NewFixedLimiter(c.Remaining()))
2026-03-21 21:14:49 -04:00
countingReader := util.NewCountingReader(reader)
limitReader := util.NewLimitReader(countingReader, limiters...)
if err := c.backend.Put(id, limitReader, untrustedLength); err != nil {
2026-03-17 20:53:41 -04:00
c.backend.Delete(id) //nolint:errcheck
return 0, err
}
2026-03-21 21:14:49 -04:00
size := countingReader.Total()
2026-03-17 20:53:41 -04:00
c.mu.Lock()
c.size += size
c.sizes[id] = size
2026-03-17 20:53:41 -04:00
c.mu.Unlock()
return size, nil
}
// Read retrieves an attachment file by ID
func (c *Store) Read(id string) (io.ReadCloser, int64, error) {
2026-03-23 12:54:13 -04:00
if !model.ValidMessageID(id) {
2026-03-17 20:53:41 -04:00
return nil, 0, errInvalidFileID
}
return c.backend.Get(id)
}
// Remove deletes attachment files by ID and subtracts their known sizes from
// the total. Sizes for objects not tracked (e.g. written before this process
// started and before the first sync) are corrected by the next sync() call.
2026-03-17 20:53:41 -04:00
func (c *Store) Remove(ids ...string) error {
for _, id := range ids {
2026-03-23 12:54:13 -04:00
if !model.ValidMessageID(id) {
2026-03-17 20:53:41 -04:00
return errInvalidFileID
}
}
2026-03-21 16:27:41 -04:00
// Remove from backend
2026-03-22 08:38:41 -04:00
for _, id := range ids {
log.Tag(tagStore).Field("message_id", id).Debug("Removing attachment")
}
if err := c.backend.Delete(ids...); err != nil {
return err
}
2026-03-21 16:27:41 -04:00
// Update total cache size
c.mu.Lock()
for _, id := range ids {
if size, ok := c.sizes[id]; ok {
c.size -= size
delete(c.sizes, id)
}
}
if c.size < 0 {
c.size = 0
}
c.mu.Unlock()
return nil
2026-03-17 20:53:41 -04:00
}
// sync reconciles the backend storage with the database. It lists all objects,
// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes
// the total size from the existing attachments in the database.
func (c *Store) sync() error {
	// Without a database callback there is nothing to reconcile against
	if c.attachmentsWithSizes == nil {
		return nil
	}
	attachmentsWithSizes, err := c.attachmentsWithSizes()
	if err != nil {
		return fmt.Errorf("attachment sync: failed to get existing attachments: %w", err)
	}
	remoteObjects, err := c.backend.List()
	if err != nil {
		return fmt.Errorf("attachment sync: failed to list objects: %w", err)
	}
	// Calculate total cache size and collect orphaned attachments, excluding objects younger
	// than the grace period to account for races, and skipping objects with invalid IDs.
	cutoff := time.Now().Add(-orphanGracePeriod)
	var orphanIDs []string
	var count, totalSize int64
	sizes := make(map[string]int64, len(remoteObjects))
	for _, obj := range remoteObjects {
		if !model.ValidMessageID(obj.ID) {
			continue // Not one of ours; leave it alone
		}
		if _, ok := attachmentsWithSizes[obj.ID]; !ok && obj.LastModified.Before(cutoff) {
			orphanIDs = append(orphanIDs, obj.ID)
		} else {
			// NOTE(review): an object inside the grace period that is not (yet) in the
			// database is counted here with a zero size; a later sync corrects this.
			count++
			totalSize += attachmentsWithSizes[obj.ID]
			sizes[obj.ID] = attachmentsWithSizes[obj.ID]
		}
	}
	log.Tag(tagStore).Debug("Attachment store updated: %d attachment(s), %s", count, util.FormatSizeHuman(totalSize))
	// Atomically replace the tracked sizes with the freshly computed view
	c.mu.Lock()
	c.size = totalSize
	c.sizes = sizes
	c.mu.Unlock()
	// Delete orphaned attachments
	if len(orphanIDs) > 0 {
		log.Tag(tagStore).Debug("Deleting %d orphaned attachment(s)", len(orphanIDs))
		if err := c.backend.Delete(orphanIDs...); err != nil {
			return fmt.Errorf("attachment sync: failed to delete orphaned objects: %w", err)
		}
	}
	// Clean up incomplete uploads (S3 only)
	if err := c.backend.DeleteIncomplete(cutoff); err != nil {
		// Non-fatal: the next sync run will retry the cleanup
		log.Tag(tagStore).Err(err).Warn("Failed to abort incomplete uploads from attachment cache")
	}
	return nil
}
// Size returns the current total size of all attachments
func (c *Store) Size() int64 {
2026-03-22 16:20:45 -04:00
c.mu.RLock()
defer c.mu.RUnlock()
return c.size
2026-03-17 20:53:41 -04:00
}
// Remaining returns the remaining capacity for attachments
func (c *Store) Remaining() int64 {
2026-03-22 16:20:45 -04:00
c.mu.RLock()
defer c.mu.RUnlock()
remaining := c.limit - c.size
2026-03-17 20:53:41 -04:00
if remaining < 0 {
return 0
}
return remaining
}
// Close stops the background sync goroutine. It must be called at most once;
// closing the already-closed channel a second time would panic.
func (c *Store) Close() {
	close(c.closeChan)
}
func (c *Store) syncLoop() {
if err := c.sync(); err != nil {
log.Tag(tagStore).Err(err).Warn("Attachment sync failed")
}
ticker := time.NewTicker(syncInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.sync(); err != nil {
log.Tag(tagStore).Err(err).Warn("Attachment sync failed")
}
case <-c.closeChan:
return
}
}
2026-03-14 21:10:46 -04:00
}