diff --git a/attachment/store.go b/attachment/store.go index 70fb55c0..14b39f81 100644 --- a/attachment/store.go +++ b/attachment/store.go @@ -14,9 +14,8 @@ import ( ) const ( - tagStore = "attachment_store" - syncInterval = 15 * time.Minute // How often to run the background sync loop - orphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads + tagStore = "attachment_store" + syncInterval = 15 * time.Minute // How often to run the background sync loop ) var errInvalidFileID = errors.New("invalid file ID") @@ -29,36 +28,38 @@ type Store struct { size int64 // Current size of the store in bytes sizes map[string]int64 // File ID -> size, for subtracting on Remove attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments + orphanGracePeriod time.Duration // Don't delete orphaned objects younger than this closeChan chan struct{} mu sync.RWMutex // Protects size and sizes } // NewFileStore creates a new file-system backed attachment cache -func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func NewFileStore(dir string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { b, err := newFileBackend(dir) if err != nil { return nil, err } - return newStore(b, totalSizeLimit, attachmentsWithSizes) + return newStore(b, totalSizeLimit, orphanGracePeriod, attachmentsWithSizes) } // NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format: // // s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT] -func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func NewS3Store(s3URL string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { config, err := s3.ParseURL(s3URL) if err != nil { return nil, err } - return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes) + return newStore(newS3Backend(s3.New(config)), totalSizeLimit, orphanGracePeriod, attachmentsWithSizes) } -func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { +func newStore(backend backend, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { c := &Store{ backend: backend, limit: totalSizeLimit, sizes: make(map[string]int64), attachmentsWithSizes: attachmentsWithSizes, + orphanGracePeriod: orphanGracePeriod, closeChan: make(chan struct{}), } // Hydrate sizes from the database immediately so that Size()/Remaining()/Remove() @@ -140,9 +141,14 @@ func (c *Store) Remove(ids ...string) error { return nil } +// Sync triggers an immediate reconciliation of storage with the database. +func (c *Store) Sync() error { + return c.sync() +} + // sync reconciles the backend storage with the database. It lists all objects, -// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes -// the total size from the existing attachments in the database. +// deletes orphans (not in the valid ID set and older than the grace period), and +// recomputes the total size from the existing attachments in the database. func (c *Store) sync() error { if c.attachmentsWithSizes == nil { return nil @@ -157,7 +163,7 @@ func (c *Store) sync() error { } // Calculate total cache size and collect orphaned attachments, excluding objects younger // than the grace period to account for races, and skipping objects with invalid IDs. - cutoff := time.Now().Add(-orphanGracePeriod) + cutoff := time.Now().Add(-c.orphanGracePeriod) var orphanIDs []string var count, totalSize int64 sizes := make(map[string]int64, len(remoteObjects)) diff --git a/attachment/store_file_test.go b/attachment/store_file_test.go index d0b6e135..0f7495b4 100644 --- a/attachment/store_file_test.go +++ b/attachment/store_file_test.go @@ -2,6 +2,7 @@ package attachment import ( "testing" + "time" "github.com/stretchr/testify/require" ) @@ -9,7 +10,7 @@ import ( func newTestFileStore(t *testing.T, totalSizeLimit int64) (dir string, cache *Store) { t.Helper() dir = t.TempDir() - cache, err := NewFileStore(dir, totalSizeLimit, nil) + cache, err := NewFileStore(dir, totalSizeLimit, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { cache.Close() }) return dir, cache diff --git a/attachment/store_s3_test.go b/attachment/store_s3_test.go index 6615f4e9..22c1d6bf 100644 --- a/attachment/store_s3_test.go +++ b/attachment/store_s3_test.go @@ -24,7 +24,7 @@ func TestS3Store_WriteWithPrefix(t *testing.T) { client := s3.New(cfg) deleteAllObjects(t, client) backend := newS3Backend(client) - cache, err := newStore(backend, 10*1024, nil) + cache, err := newStore(backend, 10*1024, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { deleteAllObjects(t, client) @@ -62,7 +62,7 @@ func newTestRealS3Store(t *testing.T, totalSizeLimit int64) (*Store, *modTimeOve inner := newS3Backend(client) wrapper := &modTimeOverrideBackend{backend: inner, modTimes: make(map[string]time.Time)} deleteAllObjects(t, client) - store, err := newStore(wrapper, totalSizeLimit, nil) + store, err := newStore(wrapper, totalSizeLimit, time.Hour, nil) require.Nil(t, err) t.Cleanup(func() { deleteAllObjects(t, client) diff --git a/docs/releases.md b/docs/releases.md index 101ff949..70783a9f 100644 --- a/docs/releases.md +++ b/docs/releases.md @@ -1815,7 +1815,7 @@ for details. **Features:** -* Add S3-compatible object storage as an alternative [attachment store](config.md#attachments) via `attachment-cache-dir` config option +* Add S3-compatible object storage as an alternative [attachment store](config.md#attachments) via `attachment-cache-dir` config option ([#1656](https://github.com/binwiederhier/ntfy/pull/1656)/[#1672](https://github.com/binwiederhier/ntfy/pull/1672)) **Bug fixes + maintenance:** diff --git a/message/cache.go b/message/cache.go index 76ba7926..90fbf51d 100644 --- a/message/cache.go +++ b/message/cache.go @@ -24,7 +24,6 @@ var errNoRows = errors.New("no rows found") // queries holds the database-specific SQL queries type queries struct { insertMessage string - deleteMessage string selectScheduledMessageIDsBySeqID string deleteScheduledBySequenceID string updateMessagesForTopicExpiry string @@ -35,12 +34,11 @@ type queries struct { selectMessagesSinceIDScheduled string selectMessagesLatest string selectMessagesDue string - selectMessagesExpired string + deleteExpiredMessages string updateMessagePublished string selectMessagesCount string selectTopics string - updateAttachmentDeleted string - selectAttachmentsExpired string + markExpiredAttachmentsDeleted string selectAttachmentsSizeBySender string selectAttachmentsSizeByUserID string selectAttachmentsWithSizes string @@ -246,14 +244,16 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) { return readMessages(rows) } -// MessagesExpired returns a list of IDs for messages that have expired (should be deleted) -func (c *Cache) MessagesExpired() ([]string, error) { - rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix()) +// DeleteExpiredMessages deletes up to `limit` expired messages in a single query +// and returns the number of deleted rows. +func (c *Cache) DeleteExpiredMessages(limit int) (int64, error) { + c.maybeLock() + defer c.maybeUnlock() + result, err := c.db.Exec(c.queries.deleteExpiredMessages, time.Now().Unix(), limit) if err != nil { - return nil, err + return 0, err } - defer rows.Close() - return readStrings(rows) + return result.RowsAffected() } // Message returns the message with the given ID, or ErrMessageNotFound if not found @@ -262,10 +262,10 @@ func (c *Cache) Message(id string) (*model.Message, error) { if err != nil { return nil, err } + defer rows.Close() if !rows.Next() { return nil, model.ErrMessageNotFound } - defer rows.Close() return readMessage(rows) } @@ -312,20 +312,6 @@ func (c *Cache) Topics() ([]string, error) { return readStrings(rows) } -// DeleteMessages deletes the messages with the given IDs -func (c *Cache) DeleteMessages(ids ...string) error { - c.maybeLock() - defer c.maybeUnlock() - return db.ExecTx(c.db, func(tx *sql.Tx) error { - for _, id := range ids { - if _, err := tx.Exec(c.queries.deleteMessage, id); err != nil { - return err - } - } - return nil - }) -} - // DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID. // It returns the message IDs of the deleted messages, which can be used to clean up attachment files. func (c *Cache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) { @@ -363,28 +349,16 @@ func (c *Cache) ExpireMessages(topics ...string) error { }) } -// AttachmentsExpired returns message IDs with expired attachments that have not been deleted -func (c *Cache) AttachmentsExpired() ([]string, error) { - rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix()) - if err != nil { - return nil, err - } - defer rows.Close() - return readStrings(rows) -} - -// MarkAttachmentsDeleted marks the attachments for the given message IDs as deleted -func (c *Cache) MarkAttachmentsDeleted(ids ...string) error { +// MarkExpiredAttachmentsDeleted marks up to `limit` expired attachments as deleted in a single +// query and returns the number of updated rows. +func (c *Cache) MarkExpiredAttachmentsDeleted(limit int) (int64, error) { c.maybeLock() defer c.maybeUnlock() - return db.ExecTx(c.db, func(tx *sql.Tx) error { - for _, id := range ids { - if _, err := tx.Exec(c.queries.updateAttachmentDeleted, id); err != nil { - return err - } - } - return nil - }) + result, err := c.db.Exec(c.queries.markExpiredAttachmentsDeleted, time.Now().Unix(), limit) + if err != nil { + return 0, err + } + return result.RowsAffected() } // AttachmentBytesUsedBySender returns the total size of active attachments sent by the given sender diff --git a/message/cache_postgres.go b/message/cache_postgres.go index f0a32036..4d7c3f93 100644 --- a/message/cache_postgres.go +++ b/message/cache_postgres.go @@ -12,7 +12,6 @@ const ( INSERT INTO message (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user_id, content_type, encoding, published) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) ` - postgresDeleteMessageQuery = `DELETE FROM message WHERE mid = $1` postgresSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresDeleteScheduledBySequenceIDQuery = `DELETE FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresUpdateMessagesForTopicExpiryQuery = `UPDATE message SET expires = $1 WHERE topic = $2` @@ -61,13 +60,12 @@ const ( WHERE time <= $1 AND published = FALSE ORDER BY time, id ` - postgresSelectMessagesExpiredQuery = `SELECT mid FROM message WHERE expires <= $1 AND published = TRUE` postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1` postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message` postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic` - postgresUpdateAttachmentDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid = $1` - postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE` + postgresDeleteExpiredMessagesQuery = `DELETE FROM message WHERE mid IN (SELECT mid FROM message WHERE expires <= $1 AND published = TRUE LIMIT $2)` + postgresMarkExpiredAttachmentsDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid IN (SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE LIMIT $2)` postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2` postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2` postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE` @@ -79,7 +77,6 @@ const ( var postgresQueries = queries{ insertMessage: postgresInsertMessageQuery, - deleteMessage: postgresDeleteMessageQuery, selectScheduledMessageIDsBySeqID: postgresSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: postgresDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: postgresUpdateMessagesForTopicExpiryQuery, @@ -90,12 +87,11 @@ var postgresQueries = queries{ selectMessagesSinceIDScheduled: postgresSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesLatest: postgresSelectMessagesLatestQuery, selectMessagesDue: postgresSelectMessagesDueQuery, - selectMessagesExpired: postgresSelectMessagesExpiredQuery, + deleteExpiredMessages: postgresDeleteExpiredMessagesQuery, updateMessagePublished: postgresUpdateMessagePublishedQuery, selectMessagesCount: postgresSelectMessagesCountQuery, selectTopics: postgresSelectTopicsQuery, - updateAttachmentDeleted: postgresUpdateAttachmentDeletedQuery, - selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery, + markExpiredAttachmentsDeleted: postgresMarkExpiredAttachmentsDeletedQuery, selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery, selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery, diff --git a/message/cache_postgres_schema.go b/message/cache_postgres_schema.go index 4a539d0e..994df9b0 100644 --- a/message/cache_postgres_schema.go +++ b/message/cache_postgres_schema.go @@ -5,6 +5,7 @@ import ( "fmt" "heckel.io/ntfy/v2/db" + "heckel.io/ntfy/v2/log" ) // Initial PostgreSQL schema @@ -41,6 +42,7 @@ const ( CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id); CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id); CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires); + CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE; CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = ''; CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires); CREATE TABLE IF NOT EXISTS message_stats ( @@ -57,21 +59,57 @@ const ( // PostgreSQL schema management queries const ( - postgresCurrentSchemaVersion = 14 + postgresCurrentSchemaVersion = 15 postgresInsertSchemaVersionQuery = `INSERT INTO schema_version (store, version) VALUES ('message', $1)` + postgresUpdateSchemaVersionQuery = `UPDATE schema_version SET version = $1 WHERE store = 'message'` postgresSelectSchemaVersionQuery = `SELECT version FROM schema_version WHERE store = 'message'` ) -func setupPostgres(db *sql.DB) error { +// PostgreSQL schema migrations +const ( + // 14 -> 15 + postgresMigrate14To15CreateIndexQuery = ` + CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires) WHERE attachment_deleted = FALSE; + ` +) + +var postgresMigrations = map[int]func(d *sql.DB) error{ + 14: postgresMigrateFrom14, +} + +func setupPostgres(d *sql.DB) error { var schemaVersion int - if err := db.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { - return setupNewPostgresDB(db) + if err := d.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { + return setupNewPostgresDB(d) + } else if schemaVersion == postgresCurrentSchemaVersion { + return nil } else if schemaVersion > postgresCurrentSchemaVersion { return fmt.Errorf("unexpected schema version: version %d is higher than current version %d", schemaVersion, postgresCurrentSchemaVersion) } + for i := schemaVersion; i < postgresCurrentSchemaVersion; i++ { + fn, ok := postgresMigrations[i] + if !ok { + return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1) + } else if err := fn(d); err != nil { + return err + } + } return nil } +func postgresMigrateFrom14(d *sql.DB) error { + log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15") + return db.ExecTx(d, func(tx *sql.Tx) error { + if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil { + return err + } + if _, err := tx.Exec(postgresUpdateSchemaVersionQuery, 15); err != nil { + return err + } + return nil + }) +} + func setupNewPostgresDB(sqlDB *sql.DB) error { return db.ExecTx(sqlDB, func(tx *sql.Tx) error { if _, err := tx.Exec(postgresCreateTablesQuery); err != nil { diff --git a/message/cache_sqlite.go b/message/cache_sqlite.go index b39095e0..b9d7394f 100644 --- a/message/cache_sqlite.go +++ b/message/cache_sqlite.go @@ -18,7 +18,6 @@ const ( INSERT INTO messages (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` - sqliteDeleteMessageQuery = `DELETE FROM messages WHERE mid = ?` sqliteSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteDeleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteUpdateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` @@ -64,13 +63,12 @@ const ( WHERE time <= ? AND published = 0 ORDER BY time, id ` - sqliteSelectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages` sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` - sqliteUpdateAttachmentDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid = ?` - sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0` + sqliteDeleteExpiredMessagesQuery = `DELETE FROM messages WHERE mid IN (SELECT mid FROM messages WHERE expires <= ? AND published = 1 LIMIT ?)` + sqliteMarkExpiredAttachmentsDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid IN (SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0 LIMIT ?)` sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?` sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?` sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0` @@ -82,7 +80,6 @@ const ( var sqliteQueries = queries{ insertMessage: sqliteInsertMessageQuery, - deleteMessage: sqliteDeleteMessageQuery, selectScheduledMessageIDsBySeqID: sqliteSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: sqliteDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: sqliteUpdateMessagesForTopicExpiryQuery, @@ -93,12 +90,11 @@ var sqliteQueries = queries{ selectMessagesSinceIDScheduled: sqliteSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesLatest: sqliteSelectMessagesLatestQuery, selectMessagesDue: sqliteSelectMessagesDueQuery, - selectMessagesExpired: sqliteSelectMessagesExpiredQuery, + deleteExpiredMessages: sqliteDeleteExpiredMessagesQuery, updateMessagePublished: sqliteUpdateMessagePublishedQuery, selectMessagesCount: sqliteSelectMessagesCountQuery, selectTopics: sqliteSelectTopicsQuery, - updateAttachmentDeleted: sqliteUpdateAttachmentDeletedQuery, - selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery, + markExpiredAttachmentsDeleted: sqliteMarkExpiredAttachmentsDeletedQuery, selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery, selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery, diff --git a/message/cache_sqlite_schema.go b/message/cache_sqlite_schema.go index 8c68bad8..b19bfca1 100644 --- a/message/cache_sqlite_schema.go +++ b/message/cache_sqlite_schema.go @@ -57,7 +57,7 @@ const ( // Schema version management for SQLite const ( - sqliteCurrentSchemaVersion = 14 + sqliteCurrentSchemaVersion = 15 sqliteCreateSchemaVersionTableQuery = ` CREATE TABLE IF NOT EXISTS schemaVersion ( id INT PRIMARY KEY, @@ -208,6 +208,7 @@ var ( 11: sqliteMigrateFrom11, 12: sqliteMigrateFrom12, 13: sqliteMigrateFrom13, + 14: sqliteMigrateFrom14, } ) @@ -451,3 +452,15 @@ func sqliteMigrateFrom13(sqlDB *sql.DB, _ time.Duration) error { return nil }) } + +// sqliteMigrateFrom14 is a no-op; the corresponding Postgres migration adds +// idx_message_attachment_expires, which SQLite already has from the initial schema. +func sqliteMigrateFrom14(sqlDB *sql.DB, _ time.Duration) error { + log.Tag(tagMessageCache).Info("Migrating cache database schema: from 14 to 15") + return db.ExecTx(sqlDB, func(tx *sql.Tx) error { + if _, err := tx.Exec(sqliteUpdateSchemaVersionQuery, 15); err != nil { + return err + } + return nil + }) +} diff --git a/message/cache_sqlite_test.go b/message/cache_sqlite_test.go index e69488e6..95ff7e48 100644 --- a/message/cache_sqlite_test.go +++ b/message/cache_sqlite_test.go @@ -209,7 +209,7 @@ func TestSqliteStore_Migration_From9(t *testing.T) { require.True(t, rows.Next()) var version int require.Nil(t, rows.Scan(&version)) - require.Equal(t, 14, version) + require.Equal(t, 15, version) require.Nil(t, rows.Close()) messages, err := s.Messages("mytopic", model.SinceAllMessages, false) @@ -287,6 +287,6 @@ func checkSqliteSchemaVersion(t *testing.T, filename string) { require.True(t, rows.Next()) var schemaVersion int require.Nil(t, rows.Scan(&schemaVersion)) - require.Equal(t, 14, schemaVersion) + require.Equal(t, 15, schemaVersion) require.Nil(t, rows.Close()) } diff --git a/message/cache_test.go b/message/cache_test.go index 0fddc88b..059a1f62 100644 --- a/message/cache_test.go +++ b/message/cache_test.go @@ -3,7 +3,6 @@ package message_test import ( "net/netip" "path/filepath" - "sort" "sync" "testing" "time" @@ -274,9 +273,9 @@ func TestStore_Prune(t *testing.T) { require.Nil(t, err) require.Equal(t, 3, count) - expiredMessageIDs, err := s.MessagesExpired() + deleted, err := s.DeleteExpiredMessages(10) require.Nil(t, err) - require.Nil(t, s.DeleteMessages(expiredMessageIDs...)) + require.Equal(t, int64(2), deleted) count, err = s.MessagesCount() require.Nil(t, err) @@ -414,10 +413,9 @@ func TestStore_AttachmentsExpired(t *testing.T) { } require.Nil(t, s.AddMessage(m)) - ids, err := s.AttachmentsExpired() + count, err := s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 1, len(ids)) - require.Equal(t, "m4", ids[0]) + require.Equal(t, int64(1), count) }) } @@ -583,13 +581,9 @@ func TestStore_ExpireMessages(t *testing.T) { require.Nil(t, s.ExpireMessages("topic1")) // topic1 messages should now be expired (expires set to past) - expiredIDs, err := s.MessagesExpired() + deleted, err := s.DeleteExpiredMessages(100) require.Nil(t, err) - require.Equal(t, 2, len(expiredIDs)) - sort.Strings(expiredIDs) - expectedIDs := []string{m1.ID, m2.ID} - sort.Strings(expectedIDs) - require.Equal(t, expectedIDs, expiredIDs) + require.Equal(t, int64(2), deleted) // topic2 should be unaffected messages, err = s.Messages("topic2", model.SinceAllMessages, false) @@ -629,27 +623,15 @@ func TestStore_MarkAttachmentsDeleted(t *testing.T) { } require.Nil(t, s.AddMessage(m2)) - // Both should show as expired attachments needing cleanup - ids, err := s.AttachmentsExpired() + // Both should be marked as deleted in one batch + count, err := s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 2, len(ids)) - - // Mark msg1's attachment as deleted (file cleaned up) - require.Nil(t, s.MarkAttachmentsDeleted("msg1")) - - // Now only msg2 should show as needing cleanup - ids, err = s.AttachmentsExpired() - require.Nil(t, err) - require.Equal(t, 1, len(ids)) - require.Equal(t, "msg2", ids[0]) - - // Mark msg2 too - require.Nil(t, s.MarkAttachmentsDeleted("msg2")) + require.Equal(t, int64(2), count) // No more expired attachments to clean up - ids, err = s.AttachmentsExpired() + count, err = s.MarkExpiredAttachmentsDeleted(10) require.Nil(t, err) - require.Equal(t, 0, len(ids)) + require.Equal(t, int64(0), count) // Messages themselves still exist messages, err := s.Messages("mytopic", model.SinceAllMessages, false) diff --git a/server/config.go b/server/config.go index 8ead312c..8497b18e 100644 --- a/server/config.go +++ b/server/config.go @@ -20,6 +20,7 @@ const ( DefaultCacheBatchTimeout = time.Duration(0) DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!) DefaultManagerInterval = time.Minute + DefaultManagerBatchSize = 30000 DefaultDelayedSenderInterval = 10 * time.Second DefaultMessageDelayMin = 10 * time.Second DefaultMessageDelayMax = 3 * 24 * time.Hour @@ -46,11 +47,13 @@ const ( // - total topic limit: max number of topics overall // - various attachment limits const ( - DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message - DefaultTotalTopicLimit = 15000 - DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB - DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB - DefaultAttachmentExpiryDuration = 3 * time.Hour + DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message + DefaultTotalTopicLimit = 15000 + DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB + DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB + DefaultAttachmentExpiryDuration = 3 * time.Hour + DefaultAttachmentOrphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads + ) // Defines all per-visitor limits @@ -115,9 +118,11 @@ type Config struct { AttachmentTotalSizeLimit int64 AttachmentFileSizeLimit int64 AttachmentExpiryDuration time.Duration + AttachmentOrphanGracePeriod time.Duration TemplateDir string // Directory to load named templates from KeepaliveInterval time.Duration ManagerInterval time.Duration + ManagerBatchSize int DisallowedTopics []string WebRoot string // empty to disable DelayedSenderInterval time.Duration @@ -217,9 +222,11 @@ func NewConfig() *Config { AttachmentTotalSizeLimit: DefaultAttachmentTotalSizeLimit, AttachmentFileSizeLimit: DefaultAttachmentFileSizeLimit, AttachmentExpiryDuration: DefaultAttachmentExpiryDuration, + AttachmentOrphanGracePeriod: DefaultAttachmentOrphanGracePeriod, TemplateDir: DefaultTemplateDir, KeepaliveInterval: DefaultKeepaliveInterval, ManagerInterval: DefaultManagerInterval, + ManagerBatchSize: DefaultManagerBatchSize, DisallowedTopics: DefaultDisallowedTopics, WebRoot: "/", DelayedSenderInterval: DefaultDelayedSenderInterval, diff --git a/server/server.go b/server/server.go index dc56d57f..71d08b01 100644 --- a/server/server.go +++ b/server/server.go @@ -302,9 +302,9 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) { func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) { if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") { - return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) + return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes) } else if conf.AttachmentCacheDir != "" { - return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) + return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes) } return nil, nil } @@ -1426,6 +1426,9 @@ func (s *Server) handleBodyAsAttachment(r *http.Request, v *visitor, m *model.Me return err } attachmentExpiry := time.Now().Add(vinfo.Limits.AttachmentExpiryDuration).Unix() + if m.Expires > 0 && attachmentExpiry > m.Expires { + attachmentExpiry = m.Expires // Attachment must never outlive the message + } if m.Time > attachmentExpiry { return errHTTPBadRequestAttachmentsExpiryBeforeDelivery.With(m) } diff --git a/server/server_account_test.go b/server/server_account_test.go index 0360fcd4..58f4d5d4 100644 --- a/server/server_account_test.go +++ b/server/server_account_test.go @@ -673,6 +673,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { t.Parallel() conf := newTestConfigWithAuthFile(t, databaseURL) conf.AuthDefault = user.PermissionReadWrite + conf.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, conf) // Create user with tier @@ -742,6 +743,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) { // Verify that messages and attachments were deleted // This does not explicitly call the manager! waitFor(t, func() bool { + s.attachment.Sync() // File cleanup is done by sync, not by the manager ms, err := s.messageCache.Messages("mytopic1", model.SinceAllMessages, false) require.Nil(t, err) return len(ms) == 0 && !util.FileExists(filepath.Join(s.config.AttachmentCacheDir, m1.ID)) diff --git a/server/server_manager.go b/server/server_manager.go index 89ff38c2..387ad2b8 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -3,7 +3,6 @@ package server import ( "heckel.io/ntfy/v2/log" "heckel.io/ntfy/v2/util" - "strings" ) func (s *Server) execManager() { @@ -120,7 +119,7 @@ func (s *Server) pruneVisitors() { } }). Field("stale_visitors", staleVisitors). - Debug("Deleted %d stale visitor(s)", staleVisitors) + Debug("Finished deleting stale visitors") } func (s *Server) pruneTokens() { @@ -135,7 +134,7 @@ func (s *Server) pruneTokens() { log.Tag(tagManager).Err(err).Warn("Error deleting soft-deleted users") } }). - Debug("Removed expired tokens and users") + Debug("Finished deleting expired tokens and users") } } @@ -143,48 +142,39 @@ func (s *Server) pruneAttachments() { if s.attachment == nil { return } + // Only mark as deleted in DB. The actual storage files are cleaned up + // by the attachment store's sync() loop, which periodically reconciles + // storage with the database and removes orphaned files. log. Tag(tagManager). Timing(func() { - ids, err := s.messageCache.AttachmentsExpired() + count, err := s.messageCache.MarkExpiredAttachmentsDeleted(s.config.ManagerBatchSize) if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") - } else if len(ids) > 0 { - if log.Tag(tagManager).IsDebug() { - log.Tag(tagManager).Debug("Deleting attachments %s", strings.Join(ids, ", ")) - } - if err := s.attachment.Remove(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments") - } - if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") - } + log.Tag(tagManager).Err(err).Warn("Error marking expired attachments as deleted") + } else if count > 0 { + log.Tag(tagManager).Debug("Marked %d expired attachment(s) as deleted", count) } else { log.Tag(tagManager).Debug("No expired attachments to delete") } }). - Debug("Deleted expired attachments") + Debug("Finished marking expired attachments as deleted") } func (s *Server) pruneMessages() { + // Only delete DB rows. Attachment storage files are cleaned up by the + // attachment store's sync() loop, which periodically reconciles storage + // with the database and removes orphaned files. log. Tag(tagManager). Timing(func() { - expiredMessageIDs, err := s.messageCache.MessagesExpired() + count, err := s.messageCache.DeleteExpiredMessages(s.config.ManagerBatchSize) if err != nil { - log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") - } else if len(expiredMessageIDs) > 0 { - if s.attachment != nil { - if err := s.attachment.Remove(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error deleting attachments for expired messages") - } - } - if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil { - log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted") - } + log.Tag(tagManager).Err(err).Warn("Error deleting expired messages") + } else if count > 0 { + log.Tag(tagManager).Debug("Deleted %d expired message(s)", count) } else { log.Tag(tagManager).Debug("No expired messages to delete") } }). - Debug("Pruned messages") + Debug("Finished deleting expired messages") } diff --git a/server/server_payments_test.go b/server/server_payments_test.go index 9873d6d8..30b1a22e 100644 --- a/server/server_payments_test.go +++ b/server/server_payments_test.go @@ -443,6 +443,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active( c := newTestConfigWithAuthFile(t, databaseURL) c.StripeSecretKey = "secret key" c.StripeWebhookKey = "webhook key" + c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, c) s.stripe = stripeMock @@ -546,6 +547,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active( // Verify that messages and attachments were deleted time.Sleep(time.Second) s.execManager() + s.attachment.Sync() // File cleanup is done by sync, not by the manager ms, err := s.messageCache.Messages("atopic", model.SinceAllMessages, false) require.Nil(t, err) diff --git a/server/server_test.go b/server/server_test.go index 44b9ac94..75d61772 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -2285,6 +2285,7 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) { c := newTestConfig(t, databaseURL) c.AttachmentExpiryDuration = time.Millisecond // Hack + c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately s := newTestServer(t, c) // Publish and make sure we can retrieve it @@ -2301,7 +2302,8 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) { // Prune and makes sure it's gone waitFor(t, func() bool { - s.execManager() // May run many times + s.execManager() + s.attachment.Sync() // File cleanup is done by sync, not by the manager return !util.FileExists(file) }) response = request(t, s, "GET", path, "", nil) @@ -2698,7 +2700,7 @@ func TestServer_PublishWhileUpdatingStatsWithLotsOfMessages(t *testing.T) { response := request(t, s, "PUT", "/mytopic", "some body", nil) m := toMessage(t, response.Body.String()) require.Equal(t, "some body", m.Message) - require.True(t, time.Since(start) < 100*time.Millisecond) + require.True(t, time.Since(start) < 500*time.Millisecond) log.Info("Done: Publishing message; took %s", time.Since(start).Round(time.Millisecond)) // Wait for all Goroutines diff --git a/web/package-lock.json b/web/package-lock.json index 175ef11b..b04b2b41 100644 --- a/web/package-lock.json +++ b/web/package-lock.json @@ -9514,24 +9514,6 @@ "dev": true, "license": "ISC" }, - "node_modules/yaml": { - "version": "2.8.3", - "resolved": "https://registry.npmjs.org/yaml/-/yaml-2.8.3.tgz", - "integrity": "sha512-AvbaCLOO2Otw/lW5bmh9d/WEdcDFdQp2Z2ZUH3pX9U2ihyUY0nvLv7J6TrWowklRGPYbB/IuIMfYgxaCPg5Bpg==", - "dev": true, - "license": "ISC", - "optional": true, - "peer": true, - "bin": { - "yaml": "bin.mjs" - }, - "engines": { - "node": ">= 14.6" - }, - "funding": { - "url": "https://github.com/sponsors/eemeli" - } - }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",