diff --git a/attachment/store.go b/attachment/store.go index c48a1e90..302eb585 100644 --- a/attachment/store.go +++ b/attachment/store.go @@ -10,6 +10,11 @@ import ( "heckel.io/ntfy/v2/util" ) +var ( + fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength)) + errInvalidFileID = errors.New("invalid file ID") +) + // Store is an interface for storing and retrieving attachment files type Store interface { Write(id string, in io.Reader, limiters ...util.Limiter) (int64, error) @@ -18,8 +23,3 @@ type Store interface { Size() int64 Remaining() int64 } - -var ( - fileIDRegex = regexp.MustCompile(fmt.Sprintf(`^[-_A-Za-z0-9]{%d}$`, model.MessageIDLength)) - errInvalidFileID = errors.New("invalid file ID") -) diff --git a/s3/client.go b/s3/client.go index ffc4fa8a..56f9608e 100644 --- a/s3/client.go +++ b/s3/client.go @@ -19,6 +19,12 @@ import ( "strconv" "strings" "time" + + "heckel.io/ntfy/v2/log" +) + +const ( + tagS3Client = "s3_client" ) // Client is a minimal S3-compatible client. It supports PutObject, GetObject, DeleteObjects, @@ -60,11 +66,13 @@ func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) erro first := make([]byte, partSize) n, err := io.ReadFull(body, first) if errors.Is(err, io.ErrUnexpectedEOF) || err == io.EOF { + log.Tag(tagS3Client).Debug("PutObject key=%s size=%d (simple)", key, n) return c.putObject(ctx, key, bytes.NewReader(first[:n]), int64(n)) } if err != nil { return fmt.Errorf("s3: PutObject read: %w", err) } + log.Tag(tagS3Client).Debug("PutObject key=%s (multipart)", key) combined := io.MultiReader(bytes.NewReader(first), body) return c.putObjectMultipart(ctx, key, combined) } @@ -72,6 +80,7 @@ func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) erro // GetObject downloads an object. The key is automatically prefixed with the client's configured // prefix. The caller must close the returned ReadCloser. func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int64, error) { + log.Tag(tagS3Client).Debug("GetObject key=%s", key) fullKey := c.objectKey(key) req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(fullKey), nil) if err != nil { @@ -97,6 +106,7 @@ func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int6 // Even when S3 returns HTTP 200, individual keys may fail. If any per-key errors are present // in the response, they are returned as a combined error. func (c *Client) DeleteObjects(ctx context.Context, keys []string) error { + log.Tag(tagS3Client).Debug("DeleteObjects keys=%d", len(keys)) var body bytes.Buffer body.WriteString("true") for _, key := range keys { @@ -152,6 +162,7 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error { // ListObjects performs a single ListObjectsV2 request using the client's configured prefix. // Use continuationToken for pagination. Set maxKeys to 0 for the server default (typically 1000). func (c *Client) ListObjects(ctx context.Context, continuationToken string, maxKeys int) (*ListResult, error) { + log.Tag(tagS3Client).Debug("ListObjects continuation=%s maxKeys=%d", continuationToken, maxKeys) query := url.Values{"list-type": {"2"}} if prefix := c.prefixForList(); prefix != "" { query.Set("prefix", prefix) @@ -197,7 +208,6 @@ func (c *Client) ListObjects(ctx context.Context, continuationToken string, maxK // ListAllObjects returns all objects under the client's configured prefix by paginating through // ListObjectsV2 results automatically. It stops after 10,000 pages as a safety valve. func (c *Client) ListAllObjects(ctx context.Context) ([]Object, error) { - const maxPages = 10000 var all []Object var token string for page := 0; page < maxPages; page++ { @@ -299,11 +309,13 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, fullKey string) (s if err := xml.Unmarshal(respBody, &result); err != nil { return "", fmt.Errorf("s3: InitiateMultipartUpload XML: %w", err) } + log.Tag(tagS3Client).Debug("InitiateMultipartUpload key=%s uploadId=%s", fullKey, result.UploadID) return result.UploadID, nil } // uploadPart uploads a single part of a multipart upload and returns the ETag. func (c *Client) uploadPart(ctx context.Context, fullKey, uploadID string, partNumber int, data []byte) (string, error) { + log.Tag(tagS3Client).Debug("UploadPart key=%s part=%d size=%d", fullKey, partNumber, len(data)) reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.objectURL(fullKey), partNumber, url.QueryEscape(uploadID)) req, err := http.NewRequestWithContext(ctx, http.MethodPut, reqURL, bytes.NewReader(data)) if err != nil { @@ -325,6 +337,7 @@ func (c *Client) uploadPart(ctx context.Context, fullKey, uploadID string, partN // completeMultipartUpload finalizes a multipart upload with the given parts. func (c *Client) completeMultipartUpload(ctx context.Context, fullKey, uploadID string, parts []completedPart) error { + log.Tag(tagS3Client).Debug("CompleteMultipartUpload key=%s uploadId=%s parts=%d", fullKey, uploadID, len(parts)) var body bytes.Buffer body.WriteString("") for _, p := range parts { @@ -366,6 +379,7 @@ func (c *Client) completeMultipartUpload(ctx context.Context, fullKey, uploadID // abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up. func (c *Client) abortMultipartUpload(ctx context.Context, fullKey, uploadID string) { + log.Tag(tagS3Client).Debug("AbortMultipartUpload key=%s uploadId=%s", fullKey, uploadID) reqURL := fmt.Sprintf("%s?uploadId=%s", c.objectURL(fullKey), url.QueryEscape(uploadID)) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil) if err != nil { diff --git a/s3/util.go b/s3/util.go index 2e3fc233..546a940a 100644 --- a/s3/util.go +++ b/s3/util.go @@ -27,6 +27,9 @@ const ( // above which PutObject switches from a simple PUT to multipart upload. S3 requires a minimum // part size of 5 MB for all parts except the last. partSize = 5 * 1024 * 1024 + + // maxPages is the max number of pages to iterate through when listing objects + maxPages = 10000 ) // ParseURL parses an S3 URL of the form: