This commit is contained in:
binwiederhier 2026-03-25 15:28:23 -04:00
parent e55d1cee6b
commit 071543efda
15 changed files with 102 additions and 141 deletions

View file

@ -14,9 +14,8 @@ import (
) )
const ( const (
tagStore = "attachment_store" tagStore = "attachment_store"
syncInterval = 15 * time.Minute // How often to run the background sync loop syncInterval = 15 * time.Minute // How often to run the background sync loop
orphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads
) )
var errInvalidFileID = errors.New("invalid file ID") var errInvalidFileID = errors.New("invalid file ID")
@ -29,36 +28,38 @@ type Store struct {
size int64 // Current size of the store in bytes size int64 // Current size of the store in bytes
sizes map[string]int64 // File ID -> size, for subtracting on Remove sizes map[string]int64 // File ID -> size, for subtracting on Remove
attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments attachmentsWithSizes func() (map[string]int64, error) // Returns file ID -> size for active attachments
orphanGracePeriod time.Duration // Don't delete orphaned objects younger than this
closeChan chan struct{} closeChan chan struct{}
mu sync.RWMutex // Protects size and sizes mu sync.RWMutex // Protects size and sizes
} }
// NewFileStore creates a new file-system backed attachment cache // NewFileStore creates a new file-system backed attachment cache
func NewFileStore(dir string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { func NewFileStore(dir string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
b, err := newFileBackend(dir) b, err := newFileBackend(dir)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newStore(b, totalSizeLimit, attachmentsWithSizes) return newStore(b, totalSizeLimit, orphanGracePeriod, attachmentsWithSizes)
} }
// NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format: // NewS3Store creates a new S3-backed attachment cache. The s3URL must be in the format:
// //
// s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT] // s3://ACCESS_KEY:SECRET_KEY@BUCKET[/PREFIX]?region=REGION[&endpoint=ENDPOINT]
func NewS3Store(s3URL string, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { func NewS3Store(s3URL string, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
config, err := s3.ParseURL(s3URL) config, err := s3.ParseURL(s3URL)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newStore(newS3Backend(s3.New(config)), totalSizeLimit, attachmentsWithSizes) return newStore(newS3Backend(s3.New(config)), totalSizeLimit, orphanGracePeriod, attachmentsWithSizes)
} }
func newStore(backend backend, totalSizeLimit int64, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) { func newStore(backend backend, totalSizeLimit int64, orphanGracePeriod time.Duration, attachmentsWithSizes func() (map[string]int64, error)) (*Store, error) {
c := &Store{ c := &Store{
backend: backend, backend: backend,
limit: totalSizeLimit, limit: totalSizeLimit,
sizes: make(map[string]int64), sizes: make(map[string]int64),
attachmentsWithSizes: attachmentsWithSizes, attachmentsWithSizes: attachmentsWithSizes,
orphanGracePeriod: orphanGracePeriod,
closeChan: make(chan struct{}), closeChan: make(chan struct{}),
} }
// Hydrate sizes from the database immediately so that Size()/Remaining()/Remove() // Hydrate sizes from the database immediately so that Size()/Remaining()/Remove()
@ -140,9 +141,14 @@ func (c *Store) Remove(ids ...string) error {
return nil return nil
} }
// Sync triggers an immediate reconciliation of storage with the database.
func (c *Store) Sync() error {
return c.sync()
}
// sync reconciles the backend storage with the database. It lists all objects, // sync reconciles the backend storage with the database. It lists all objects,
// deletes orphans (not in the valid ID set and older than 1 hour), and recomputes // deletes orphans (not in the valid ID set and older than the grace period), and
// the total size from the existing attachments in the database. // recomputes the total size from the existing attachments in the database.
func (c *Store) sync() error { func (c *Store) sync() error {
if c.attachmentsWithSizes == nil { if c.attachmentsWithSizes == nil {
return nil return nil
@ -157,7 +163,7 @@ func (c *Store) sync() error {
} }
// Calculate total cache size and collect orphaned attachments, excluding objects younger // Calculate total cache size and collect orphaned attachments, excluding objects younger
// than the grace period to account for races, and skipping objects with invalid IDs. // than the grace period to account for races, and skipping objects with invalid IDs.
cutoff := time.Now().Add(-orphanGracePeriod) cutoff := time.Now().Add(-c.orphanGracePeriod)
var orphanIDs []string var orphanIDs []string
var count, totalSize int64 var count, totalSize int64
sizes := make(map[string]int64, len(remoteObjects)) sizes := make(map[string]int64, len(remoteObjects))

View file

@ -2,6 +2,7 @@ package attachment
import ( import (
"testing" "testing"
"time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -9,7 +10,7 @@ import (
func newTestFileStore(t *testing.T, totalSizeLimit int64) (dir string, cache *Store) { func newTestFileStore(t *testing.T, totalSizeLimit int64) (dir string, cache *Store) {
t.Helper() t.Helper()
dir = t.TempDir() dir = t.TempDir()
cache, err := NewFileStore(dir, totalSizeLimit, nil) cache, err := NewFileStore(dir, totalSizeLimit, time.Hour, nil)
require.Nil(t, err) require.Nil(t, err)
t.Cleanup(func() { cache.Close() }) t.Cleanup(func() { cache.Close() })
return dir, cache return dir, cache

View file

@ -24,7 +24,7 @@ func TestS3Store_WriteWithPrefix(t *testing.T) {
client := s3.New(cfg) client := s3.New(cfg)
deleteAllObjects(t, client) deleteAllObjects(t, client)
backend := newS3Backend(client) backend := newS3Backend(client)
cache, err := newStore(backend, 10*1024, nil) cache, err := newStore(backend, 10*1024, time.Hour, nil)
require.Nil(t, err) require.Nil(t, err)
t.Cleanup(func() { t.Cleanup(func() {
deleteAllObjects(t, client) deleteAllObjects(t, client)
@ -62,7 +62,7 @@ func newTestRealS3Store(t *testing.T, totalSizeLimit int64) (*Store, *modTimeOve
inner := newS3Backend(client) inner := newS3Backend(client)
wrapper := &modTimeOverrideBackend{backend: inner, modTimes: make(map[string]time.Time)} wrapper := &modTimeOverrideBackend{backend: inner, modTimes: make(map[string]time.Time)}
deleteAllObjects(t, client) deleteAllObjects(t, client)
store, err := newStore(wrapper, totalSizeLimit, nil) store, err := newStore(wrapper, totalSizeLimit, time.Hour, nil)
require.Nil(t, err) require.Nil(t, err)
t.Cleanup(func() { t.Cleanup(func() {
deleteAllObjects(t, client) deleteAllObjects(t, client)

View file

@ -24,7 +24,6 @@ var errNoRows = errors.New("no rows found")
// queries holds the database-specific SQL queries // queries holds the database-specific SQL queries
type queries struct { type queries struct {
insertMessage string insertMessage string
deleteMessage string
selectScheduledMessageIDsBySeqID string selectScheduledMessageIDsBySeqID string
deleteScheduledBySequenceID string deleteScheduledBySequenceID string
updateMessagesForTopicExpiry string updateMessagesForTopicExpiry string
@ -35,12 +34,11 @@ type queries struct {
selectMessagesSinceIDScheduled string selectMessagesSinceIDScheduled string
selectMessagesLatest string selectMessagesLatest string
selectMessagesDue string selectMessagesDue string
selectMessagesExpired string deleteExpiredMessages string
updateMessagePublished string updateMessagePublished string
selectMessagesCount string selectMessagesCount string
selectTopics string selectTopics string
updateAttachmentDeleted string markExpiredAttachmentsDeleted string
selectAttachmentsExpired string
selectAttachmentsSizeBySender string selectAttachmentsSizeBySender string
selectAttachmentsSizeByUserID string selectAttachmentsSizeByUserID string
selectAttachmentsWithSizes string selectAttachmentsWithSizes string
@ -246,14 +244,16 @@ func (c *Cache) MessagesDue() ([]*model.Message, error) {
return readMessages(rows) return readMessages(rows)
} }
// MessagesExpired returns a list of message IDs that have expired and should be deleted // DeleteExpiredMessages deletes up to `limit` expired messages in a single query
func (c *Cache) MessagesExpired() ([]string, error) { // and returns the number of deleted rows.
rows, err := c.db.Query(c.queries.selectMessagesExpired, time.Now().Unix()) func (c *Cache) DeleteExpiredMessages(limit int) (int64, error) {
c.maybeLock()
defer c.maybeUnlock()
result, err := c.db.Exec(c.queries.deleteExpiredMessages, time.Now().Unix(), limit)
if err != nil { if err != nil {
return nil, err return 0, err
} }
defer rows.Close() return result.RowsAffected()
return readStrings(rows)
} }
// Message returns the message with the given ID, or ErrMessageNotFound if not found // Message returns the message with the given ID, or ErrMessageNotFound if not found
@ -312,20 +312,6 @@ func (c *Cache) Topics() ([]string, error) {
return readStrings(rows) return readStrings(rows)
} }
// DeleteMessages deletes the messages with the given IDs
func (c *Cache) DeleteMessages(ids ...string) error {
c.maybeLock()
defer c.maybeUnlock()
return db.ExecTx(c.db, func(tx *sql.Tx) error {
for _, id := range ids {
if _, err := tx.Exec(c.queries.deleteMessage, id); err != nil {
return err
}
}
return nil
})
}
// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID. // DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID.
// It returns the message IDs of the deleted messages, which can be used to clean up attachment files. // It returns the message IDs of the deleted messages, which can be used to clean up attachment files.
func (c *Cache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) { func (c *Cache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) {
@ -363,28 +349,16 @@ func (c *Cache) ExpireMessages(topics ...string) error {
}) })
} }
// AttachmentsExpired returns message IDs with expired attachments that have not been deleted // MarkExpiredAttachmentsDeleted marks up to `limit` expired attachments as deleted in a single
func (c *Cache) AttachmentsExpired() ([]string, error) { // query and returns the number of updated rows.
rows, err := c.db.Query(c.queries.selectAttachmentsExpired, time.Now().Unix()) func (c *Cache) MarkExpiredAttachmentsDeleted(limit int) (int64, error) {
if err != nil {
return nil, err
}
defer rows.Close()
return readStrings(rows)
}
// MarkAttachmentsDeleted marks the attachments for the given message IDs as deleted
func (c *Cache) MarkAttachmentsDeleted(ids ...string) error {
c.maybeLock() c.maybeLock()
defer c.maybeUnlock() defer c.maybeUnlock()
return db.ExecTx(c.db, func(tx *sql.Tx) error { result, err := c.db.Exec(c.queries.markExpiredAttachmentsDeleted, time.Now().Unix(), limit)
for _, id := range ids { if err != nil {
if _, err := tx.Exec(c.queries.updateAttachmentDeleted, id); err != nil { return 0, err
return err }
} return result.RowsAffected()
}
return nil
})
} }
// AttachmentBytesUsedBySender returns the total size of active attachments sent by the given sender // AttachmentBytesUsedBySender returns the total size of active attachments sent by the given sender

View file

@ -12,7 +12,6 @@ const (
INSERT INTO message (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user_id, content_type, encoding, published) INSERT INTO message (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user_id, content_type, encoding, published)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24)
` `
postgresDeleteMessageQuery = `DELETE FROM message WHERE mid = $1`
postgresSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE`
postgresDeleteScheduledBySequenceIDQuery = `DELETE FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresDeleteScheduledBySequenceIDQuery = `DELETE FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE`
postgresUpdateMessagesForTopicExpiryQuery = `UPDATE message SET expires = $1 WHERE topic = $2` postgresUpdateMessagesForTopicExpiryQuery = `UPDATE message SET expires = $1 WHERE topic = $2`
@ -61,13 +60,12 @@ const (
WHERE time <= $1 AND published = FALSE WHERE time <= $1 AND published = FALSE
ORDER BY time, id ORDER BY time, id
` `
postgresSelectMessagesExpiredQuery = `SELECT mid FROM message WHERE expires <= $1 AND published = TRUE`
postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1` postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1`
postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message` postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message`
postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic` postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic`
postgresUpdateAttachmentDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid = $1` postgresDeleteExpiredMessagesQuery = `DELETE FROM message WHERE mid IN (SELECT mid FROM message WHERE expires <= $1 AND published = TRUE LIMIT $2)`
postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE` postgresMarkExpiredAttachmentsDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid IN (SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE LIMIT $2)`
postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2` postgresSelectAttachmentsSizeBySenderQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = '' AND sender = $1 AND attachment_expires >= $2`
postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2` postgresSelectAttachmentsSizeByUserIDQuery = `SELECT COALESCE(SUM(attachment_size), 0) FROM message WHERE user_id = $1 AND attachment_expires >= $2`
postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE` postgresSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM message WHERE attachment_expires > $1 AND attachment_deleted = FALSE`
@ -79,7 +77,6 @@ const (
var postgresQueries = queries{ var postgresQueries = queries{
insertMessage: postgresInsertMessageQuery, insertMessage: postgresInsertMessageQuery,
deleteMessage: postgresDeleteMessageQuery,
selectScheduledMessageIDsBySeqID: postgresSelectScheduledMessageIDsBySeqIDQuery, selectScheduledMessageIDsBySeqID: postgresSelectScheduledMessageIDsBySeqIDQuery,
deleteScheduledBySequenceID: postgresDeleteScheduledBySequenceIDQuery, deleteScheduledBySequenceID: postgresDeleteScheduledBySequenceIDQuery,
updateMessagesForTopicExpiry: postgresUpdateMessagesForTopicExpiryQuery, updateMessagesForTopicExpiry: postgresUpdateMessagesForTopicExpiryQuery,
@ -90,12 +87,11 @@ var postgresQueries = queries{
selectMessagesSinceIDScheduled: postgresSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesSinceIDScheduled: postgresSelectMessagesSinceIDIncludeScheduledQuery,
selectMessagesLatest: postgresSelectMessagesLatestQuery, selectMessagesLatest: postgresSelectMessagesLatestQuery,
selectMessagesDue: postgresSelectMessagesDueQuery, selectMessagesDue: postgresSelectMessagesDueQuery,
selectMessagesExpired: postgresSelectMessagesExpiredQuery, deleteExpiredMessages: postgresDeleteExpiredMessagesQuery,
updateMessagePublished: postgresUpdateMessagePublishedQuery, updateMessagePublished: postgresUpdateMessagePublishedQuery,
selectMessagesCount: postgresSelectMessagesCountQuery, selectMessagesCount: postgresSelectMessagesCountQuery,
selectTopics: postgresSelectTopicsQuery, selectTopics: postgresSelectTopicsQuery,
updateAttachmentDeleted: postgresUpdateAttachmentDeletedQuery, markExpiredAttachmentsDeleted: postgresMarkExpiredAttachmentsDeletedQuery,
selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery,
selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeBySender: postgresSelectAttachmentsSizeBySenderQuery,
selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery, selectAttachmentsSizeByUserID: postgresSelectAttachmentsSizeByUserIDQuery,
selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery, selectAttachmentsWithSizes: postgresSelectAttachmentsWithSizesQuery,

View file

@ -73,14 +73,14 @@ const (
` `
) )
var postgresMigrations = map[int]func(db *sql.DB) error{ var postgresMigrations = map[int]func(d *sql.DB) error{
14: postgresMigrateFrom14, 14: postgresMigrateFrom14,
} }
func setupPostgres(sqlDB *sql.DB) error { func setupPostgres(d *sql.DB) error {
var schemaVersion int var schemaVersion int
if err := sqlDB.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil { if err := d.QueryRow(postgresSelectSchemaVersionQuery).Scan(&schemaVersion); err != nil {
return setupNewPostgresDB(sqlDB) return setupNewPostgresDB(d)
} else if schemaVersion == postgresCurrentSchemaVersion { } else if schemaVersion == postgresCurrentSchemaVersion {
return nil return nil
} else if schemaVersion > postgresCurrentSchemaVersion { } else if schemaVersion > postgresCurrentSchemaVersion {
@ -90,16 +90,16 @@ func setupPostgres(sqlDB *sql.DB) error {
fn, ok := postgresMigrations[i] fn, ok := postgresMigrations[i]
if !ok { if !ok {
return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1) return fmt.Errorf("cannot find migration step from schema version %d to %d", i, i+1)
} else if err := fn(sqlDB); err != nil { } else if err := fn(d); err != nil {
return err return err
} }
} }
return nil return nil
} }
func postgresMigrateFrom14(sqlDB *sql.DB) error { func postgresMigrateFrom14(d *sql.DB) error {
log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15") log.Tag(tagMessageCache).Info("Migrating message cache database schema: from 14 to 15")
return db.ExecTx(sqlDB, func(tx *sql.Tx) error { return db.ExecTx(d, func(tx *sql.Tx) error {
if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil { if _, err := tx.Exec(postgresMigrate14To15CreateIndexQuery); err != nil {
return err return err
} }

View file

@ -18,7 +18,6 @@ const (
INSERT INTO messages (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published) INSERT INTO messages (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
` `
sqliteDeleteMessageQuery = `DELETE FROM messages WHERE mid = ?`
sqliteSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0`
sqliteDeleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteDeleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0`
sqliteUpdateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` sqliteUpdateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
@ -64,13 +63,12 @@ const (
WHERE time <= ? AND published = 0 WHERE time <= ? AND published = 0
ORDER BY time, id ORDER BY time, id
` `
sqliteSelectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1`
sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages` sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
sqliteUpdateAttachmentDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid = ?` sqliteDeleteExpiredMessagesQuery = `DELETE FROM messages WHERE mid IN (SELECT mid FROM messages WHERE expires <= ? AND published = 1 LIMIT ?)`
sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0` sqliteMarkExpiredAttachmentsDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid IN (SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0 LIMIT ?)`
sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?` sqliteSelectAttachmentsSizeBySenderQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = '' AND sender = ? AND attachment_expires >= ?`
sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?` sqliteSelectAttachmentsSizeByUserIDQuery = `SELECT IFNULL(SUM(attachment_size), 0) FROM messages WHERE user = ? AND attachment_expires >= ?`
sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0` sqliteSelectAttachmentsWithSizesQuery = `SELECT mid, attachment_size FROM messages WHERE attachment_expires > ? AND attachment_deleted = 0`
@ -82,7 +80,6 @@ const (
var sqliteQueries = queries{ var sqliteQueries = queries{
insertMessage: sqliteInsertMessageQuery, insertMessage: sqliteInsertMessageQuery,
deleteMessage: sqliteDeleteMessageQuery,
selectScheduledMessageIDsBySeqID: sqliteSelectScheduledMessageIDsBySeqIDQuery, selectScheduledMessageIDsBySeqID: sqliteSelectScheduledMessageIDsBySeqIDQuery,
deleteScheduledBySequenceID: sqliteDeleteScheduledBySequenceIDQuery, deleteScheduledBySequenceID: sqliteDeleteScheduledBySequenceIDQuery,
updateMessagesForTopicExpiry: sqliteUpdateMessagesForTopicExpiryQuery, updateMessagesForTopicExpiry: sqliteUpdateMessagesForTopicExpiryQuery,
@ -93,12 +90,11 @@ var sqliteQueries = queries{
selectMessagesSinceIDScheduled: sqliteSelectMessagesSinceIDIncludeScheduledQuery, selectMessagesSinceIDScheduled: sqliteSelectMessagesSinceIDIncludeScheduledQuery,
selectMessagesLatest: sqliteSelectMessagesLatestQuery, selectMessagesLatest: sqliteSelectMessagesLatestQuery,
selectMessagesDue: sqliteSelectMessagesDueQuery, selectMessagesDue: sqliteSelectMessagesDueQuery,
selectMessagesExpired: sqliteSelectMessagesExpiredQuery, deleteExpiredMessages: sqliteDeleteExpiredMessagesQuery,
updateMessagePublished: sqliteUpdateMessagePublishedQuery, updateMessagePublished: sqliteUpdateMessagePublishedQuery,
selectMessagesCount: sqliteSelectMessagesCountQuery, selectMessagesCount: sqliteSelectMessagesCountQuery,
selectTopics: sqliteSelectTopicsQuery, selectTopics: sqliteSelectTopicsQuery,
updateAttachmentDeleted: sqliteUpdateAttachmentDeletedQuery, markExpiredAttachmentsDeleted: sqliteMarkExpiredAttachmentsDeletedQuery,
selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery,
selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery, selectAttachmentsSizeBySender: sqliteSelectAttachmentsSizeBySenderQuery,
selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery, selectAttachmentsSizeByUserID: sqliteSelectAttachmentsSizeByUserIDQuery,
selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery, selectAttachmentsWithSizes: sqliteSelectAttachmentsWithSizesQuery,

View file

@ -209,7 +209,7 @@ func TestSqliteStore_Migration_From9(t *testing.T) {
require.True(t, rows.Next()) require.True(t, rows.Next())
var version int var version int
require.Nil(t, rows.Scan(&version)) require.Nil(t, rows.Scan(&version))
require.Equal(t, 14, version) require.Equal(t, 15, version)
require.Nil(t, rows.Close()) require.Nil(t, rows.Close())
messages, err := s.Messages("mytopic", model.SinceAllMessages, false) messages, err := s.Messages("mytopic", model.SinceAllMessages, false)
@ -287,6 +287,6 @@ func checkSqliteSchemaVersion(t *testing.T, filename string) {
require.True(t, rows.Next()) require.True(t, rows.Next())
var schemaVersion int var schemaVersion int
require.Nil(t, rows.Scan(&schemaVersion)) require.Nil(t, rows.Scan(&schemaVersion))
require.Equal(t, 14, schemaVersion) require.Equal(t, 15, schemaVersion)
require.Nil(t, rows.Close()) require.Nil(t, rows.Close())
} }

View file

@ -3,7 +3,6 @@ package message_test
import ( import (
"net/netip" "net/netip"
"path/filepath" "path/filepath"
"sort"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -274,9 +273,9 @@ func TestStore_Prune(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 3, count) require.Equal(t, 3, count)
expiredMessageIDs, err := s.MessagesExpired() deleted, err := s.DeleteExpiredMessages(10)
require.Nil(t, err) require.Nil(t, err)
require.Nil(t, s.DeleteMessages(expiredMessageIDs...)) require.Equal(t, int64(2), deleted)
count, err = s.MessagesCount() count, err = s.MessagesCount()
require.Nil(t, err) require.Nil(t, err)
@ -414,10 +413,9 @@ func TestStore_AttachmentsExpired(t *testing.T) {
} }
require.Nil(t, s.AddMessage(m)) require.Nil(t, s.AddMessage(m))
ids, err := s.AttachmentsExpired() count, err := s.MarkExpiredAttachmentsDeleted(10)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 1, len(ids)) require.Equal(t, int64(1), count)
require.Equal(t, "m4", ids[0])
}) })
} }
@ -583,13 +581,9 @@ func TestStore_ExpireMessages(t *testing.T) {
require.Nil(t, s.ExpireMessages("topic1")) require.Nil(t, s.ExpireMessages("topic1"))
// topic1 messages should now be expired (expires set to past) // topic1 messages should now be expired (expires set to past)
expiredIDs, err := s.MessagesExpired() deleted, err := s.DeleteExpiredMessages(100)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 2, len(expiredIDs)) require.Equal(t, int64(2), deleted)
sort.Strings(expiredIDs)
expectedIDs := []string{m1.ID, m2.ID}
sort.Strings(expectedIDs)
require.Equal(t, expectedIDs, expiredIDs)
// topic2 should be unaffected // topic2 should be unaffected
messages, err = s.Messages("topic2", model.SinceAllMessages, false) messages, err = s.Messages("topic2", model.SinceAllMessages, false)
@ -629,27 +623,15 @@ func TestStore_MarkAttachmentsDeleted(t *testing.T) {
} }
require.Nil(t, s.AddMessage(m2)) require.Nil(t, s.AddMessage(m2))
// Both should show as expired attachments needing cleanup // Both should be marked as deleted in one batch
ids, err := s.AttachmentsExpired() count, err := s.MarkExpiredAttachmentsDeleted(10)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 2, len(ids)) require.Equal(t, int64(2), count)
// Mark msg1's attachment as deleted (file cleaned up)
require.Nil(t, s.MarkAttachmentsDeleted("msg1"))
// Now only msg2 should show as needing cleanup
ids, err = s.AttachmentsExpired()
require.Nil(t, err)
require.Equal(t, 1, len(ids))
require.Equal(t, "msg2", ids[0])
// Mark msg2 too
require.Nil(t, s.MarkAttachmentsDeleted("msg2"))
// No more expired attachments to clean up // No more expired attachments to clean up
ids, err = s.AttachmentsExpired() count, err = s.MarkExpiredAttachmentsDeleted(10)
require.Nil(t, err) require.Nil(t, err)
require.Equal(t, 0, len(ids)) require.Equal(t, int64(0), count)
// Messages themselves still exist // Messages themselves still exist
messages, err := s.Messages("mytopic", model.SinceAllMessages, false) messages, err := s.Messages("mytopic", model.SinceAllMessages, false)

View file

@ -20,6 +20,7 @@ const (
DefaultCacheBatchTimeout = time.Duration(0) DefaultCacheBatchTimeout = time.Duration(0)
DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!) DefaultKeepaliveInterval = 45 * time.Second // Not too frequently to save battery (Android read timeout used to be 77s!)
DefaultManagerInterval = time.Minute DefaultManagerInterval = time.Minute
DefaultManagerBatchSize = 30000
DefaultDelayedSenderInterval = 10 * time.Second DefaultDelayedSenderInterval = 10 * time.Second
DefaultMessageDelayMin = 10 * time.Second DefaultMessageDelayMin = 10 * time.Second
DefaultMessageDelayMax = 3 * 24 * time.Hour DefaultMessageDelayMax = 3 * 24 * time.Hour
@ -46,11 +47,13 @@ const (
// - total topic limit: max number of topics overall // - total topic limit: max number of topics overall
// - various attachment limits // - various attachment limits
const ( const (
DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message DefaultMessageSizeLimit = 4096 // Bytes; note that FCM/APNS have a limit of ~4 KB for the entire message
DefaultTotalTopicLimit = 15000 DefaultTotalTopicLimit = 15000
DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB DefaultAttachmentTotalSizeLimit = int64(5 * 1024 * 1024 * 1024) // 5 GB
DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB DefaultAttachmentFileSizeLimit = int64(15 * 1024 * 1024) // 15 MB
DefaultAttachmentExpiryDuration = 3 * time.Hour DefaultAttachmentExpiryDuration = 3 * time.Hour
DefaultAttachmentOrphanGracePeriod = time.Hour // Don't delete orphaned objects younger than this to avoid races with in-flight uploads
) )
// Defines all per-visitor limits // Defines all per-visitor limits
@ -115,9 +118,11 @@ type Config struct {
AttachmentTotalSizeLimit int64 AttachmentTotalSizeLimit int64
AttachmentFileSizeLimit int64 AttachmentFileSizeLimit int64
AttachmentExpiryDuration time.Duration AttachmentExpiryDuration time.Duration
AttachmentOrphanGracePeriod time.Duration
TemplateDir string // Directory to load named templates from TemplateDir string // Directory to load named templates from
KeepaliveInterval time.Duration KeepaliveInterval time.Duration
ManagerInterval time.Duration ManagerInterval time.Duration
ManagerBatchSize int
DisallowedTopics []string DisallowedTopics []string
WebRoot string // empty to disable WebRoot string // empty to disable
DelayedSenderInterval time.Duration DelayedSenderInterval time.Duration
@ -217,9 +222,11 @@ func NewConfig() *Config {
AttachmentTotalSizeLimit: DefaultAttachmentTotalSizeLimit, AttachmentTotalSizeLimit: DefaultAttachmentTotalSizeLimit,
AttachmentFileSizeLimit: DefaultAttachmentFileSizeLimit, AttachmentFileSizeLimit: DefaultAttachmentFileSizeLimit,
AttachmentExpiryDuration: DefaultAttachmentExpiryDuration, AttachmentExpiryDuration: DefaultAttachmentExpiryDuration,
AttachmentOrphanGracePeriod: DefaultAttachmentOrphanGracePeriod,
TemplateDir: DefaultTemplateDir, TemplateDir: DefaultTemplateDir,
KeepaliveInterval: DefaultKeepaliveInterval, KeepaliveInterval: DefaultKeepaliveInterval,
ManagerInterval: DefaultManagerInterval, ManagerInterval: DefaultManagerInterval,
ManagerBatchSize: DefaultManagerBatchSize,
DisallowedTopics: DefaultDisallowedTopics, DisallowedTopics: DefaultDisallowedTopics,
WebRoot: "/", WebRoot: "/",
DelayedSenderInterval: DefaultDelayedSenderInterval, DelayedSenderInterval: DefaultDelayedSenderInterval,

View file

@ -302,9 +302,9 @@ func createMessageCache(conf *Config, pool *db.DB) (*message.Cache, error) {
func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) { func createAttachmentStore(conf *Config, messageCache *message.Cache) (*attachment.Store, error) {
if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") { if strings.HasPrefix(conf.AttachmentCacheDir, "s3://") {
return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) return attachment.NewS3Store(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes)
} else if conf.AttachmentCacheDir != "" { } else if conf.AttachmentCacheDir != "" {
return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, messageCache.AttachmentsWithSizes) return attachment.NewFileStore(conf.AttachmentCacheDir, conf.AttachmentTotalSizeLimit, conf.AttachmentOrphanGracePeriod, messageCache.AttachmentsWithSizes)
} }
return nil, nil return nil, nil
} }

View file

@ -673,6 +673,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
t.Parallel() t.Parallel()
conf := newTestConfigWithAuthFile(t, databaseURL) conf := newTestConfigWithAuthFile(t, databaseURL)
conf.AuthDefault = user.PermissionReadWrite conf.AuthDefault = user.PermissionReadWrite
conf.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately
s := newTestServer(t, conf) s := newTestServer(t, conf)
// Create user with tier // Create user with tier
@ -742,6 +743,7 @@ func TestAccount_Reservation_Delete_Messages_And_Attachments(t *testing.T) {
// Verify that messages and attachments were deleted // Verify that messages and attachments were deleted
// This does not explicitly call the manager! // This does not explicitly call the manager!
waitFor(t, func() bool { waitFor(t, func() bool {
s.attachment.Sync() // File cleanup is done by sync, not by the manager
ms, err := s.messageCache.Messages("mytopic1", model.SinceAllMessages, false) ms, err := s.messageCache.Messages("mytopic1", model.SinceAllMessages, false)
require.Nil(t, err) require.Nil(t, err)
return len(ms) == 0 && !util.FileExists(filepath.Join(s.config.AttachmentCacheDir, m1.ID)) return len(ms) == 0 && !util.FileExists(filepath.Join(s.config.AttachmentCacheDir, m1.ID))

View file

@ -142,22 +142,17 @@ func (s *Server) pruneAttachments() {
if s.attachment == nil { if s.attachment == nil {
return return
} }
// Only mark as deleted in DB. The actual storage files are cleaned up
// by the attachment store's sync() loop, which periodically reconciles
// storage with the database and removes orphaned files.
log. log.
Tag(tagManager). Tag(tagManager).
Timing(func() { Timing(func() {
ids, err := s.messageCache.AttachmentsExpired() count, err := s.messageCache.MarkExpiredAttachmentsDeleted(s.config.ManagerBatchSize)
if err != nil { if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired attachments") log.Tag(tagManager).Err(err).Warn("Error marking expired attachments as deleted")
} else if len(ids) > 0 { } else if count > 0 {
if log.Tag(tagManager).IsDebug() { log.Tag(tagManager).Debug("Marked %d expired attachment(s) as deleted", count)
log.Tag(tagManager).Debug("Marking %d expired attachment(s) as deleted", len(ids))
}
// Only mark as deleted in DB. The actual storage files are cleaned up
// by the attachment store's sync() loop, which periodically reconciles
// storage with the database and removes orphaned files.
if err := s.messageCache.MarkAttachmentsDeleted(ids...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error marking attachments deleted")
}
} else { } else {
log.Tag(tagManager).Debug("No expired attachments to delete") log.Tag(tagManager).Debug("No expired attachments to delete")
} }
@ -166,19 +161,17 @@ func (s *Server) pruneAttachments() {
} }
func (s *Server) pruneMessages() { func (s *Server) pruneMessages() {
// Only delete DB rows. Attachment storage files are cleaned up by the
// attachment store's sync() loop, which periodically reconciles storage
// with the database and removes orphaned files.
log. log.
Tag(tagManager). Tag(tagManager).
Timing(func() { Timing(func() {
expiredMessageIDs, err := s.messageCache.MessagesExpired() count, err := s.messageCache.DeleteExpiredMessages(s.config.ManagerBatchSize)
if err != nil { if err != nil {
log.Tag(tagManager).Err(err).Warn("Error retrieving expired messages") log.Tag(tagManager).Err(err).Warn("Error deleting expired messages")
} else if len(expiredMessageIDs) > 0 { } else if count > 0 {
// Only delete DB rows. Attachment storage files are cleaned up by the log.Tag(tagManager).Debug("Deleted %d expired message(s)", count)
// attachment store's sync() loop, which periodically reconciles storage
// with the database and removes orphaned files.
if err := s.messageCache.DeleteMessages(expiredMessageIDs...); err != nil {
log.Tag(tagManager).Err(err).Warn("Error deleting expired messages")
}
} else { } else {
log.Tag(tagManager).Debug("No expired messages to delete") log.Tag(tagManager).Debug("No expired messages to delete")
} }

View file

@ -443,6 +443,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active(
c := newTestConfigWithAuthFile(t, databaseURL) c := newTestConfigWithAuthFile(t, databaseURL)
c.StripeSecretKey = "secret key" c.StripeSecretKey = "secret key"
c.StripeWebhookKey = "webhook key" c.StripeWebhookKey = "webhook key"
c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately
s := newTestServer(t, c) s := newTestServer(t, c)
s.stripe = stripeMock s.stripe = stripeMock
@ -546,6 +547,7 @@ func TestPayments_Webhook_Subscription_Updated_Downgrade_From_PastDue_To_Active(
// Verify that messages and attachments were deleted // Verify that messages and attachments were deleted
time.Sleep(time.Second) time.Sleep(time.Second)
s.execManager() s.execManager()
s.attachment.Sync() // File cleanup is done by sync, not by the manager
ms, err := s.messageCache.Messages("atopic", model.SinceAllMessages, false) ms, err := s.messageCache.Messages("atopic", model.SinceAllMessages, false)
require.Nil(t, err) require.Nil(t, err)

View file

@ -2285,6 +2285,7 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) {
c := newTestConfig(t, databaseURL) c := newTestConfig(t, databaseURL)
c.AttachmentExpiryDuration = time.Millisecond // Hack c.AttachmentExpiryDuration = time.Millisecond // Hack
c.AttachmentOrphanGracePeriod = 0 // For testing: delete orphans immediately
s := newTestServer(t, c) s := newTestServer(t, c)
// Publish and make sure we can retrieve it // Publish and make sure we can retrieve it
@ -2301,7 +2302,8 @@ func TestServer_PublishAttachmentAndExpire(t *testing.T) {
// Prune and makes sure it's gone // Prune and makes sure it's gone
waitFor(t, func() bool { waitFor(t, func() bool {
s.execManager() // May run many times s.execManager()
s.attachment.Sync() // File cleanup is done by sync, not by the manager
return !util.FileExists(file) return !util.FileExists(file)
}) })
response = request(t, s, "GET", path, "", nil) response = request(t, s, "GET", path, "", nil)