Refine, manual review, re-org
This commit is contained in:
parent
039ef19e6c
commit
02ea09ab0f
8 changed files with 206 additions and 193 deletions
|
|
@ -33,7 +33,7 @@ func (b *s3Backend) Get(id string) (io.ReadCloser, int64, error) {
|
|||
}
|
||||
|
||||
func (b *s3Backend) List() ([]object, error) {
|
||||
objects, err := b.client.ListAllObjects(context.Background())
|
||||
objects, err := b.client.ListObjectsV2(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
231
s3/client.go
231
s3/client.go
|
|
@ -47,9 +47,10 @@ func New(config *Config) *Client {
|
|||
// PutObject uploads body to the given key. The key is automatically prefixed with the client's
|
||||
// configured prefix. The body size does not need to be known in advance.
|
||||
//
|
||||
// If the entire body fits in a single part (5 MB), it is uploaded with a simple PUT request.
|
||||
// Otherwise, the body is uploaded using S3 multipart upload, reading one part at a time
|
||||
// into memory.
|
||||
// If the entire body fits in a single part (5 MB), it is uploaded with a simple PUT request
|
||||
// (https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html). Otherwise, the body
|
||||
// is uploaded using S3 multipart upload, reading one part at a time into memory
|
||||
// (https://docs.aws.amazon.com/AmazonS3/latest/API/API_CreateMultipartUpload.html).
|
||||
func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) error {
|
||||
first := make([]byte, partSize)
|
||||
n, err := io.ReadFull(body, first)
|
||||
|
|
@ -61,11 +62,33 @@ func (c *Client) PutObject(ctx context.Context, key string, body io.Reader) erro
|
|||
return c.putObjectMultipart(ctx, key, io.MultiReader(bytes.NewReader(first), body))
|
||||
}
|
||||
|
||||
// putObjectSimple uploads a body with known size using a simple PUT with UNSIGNED-PAYLOAD.
|
||||
func (c *Client) putObjectSimple(ctx context.Context, key string, body io.Reader, size int64) error {
|
||||
log.Tag(tagS3Client).Debug("Uploading object %s (%d bytes)", key, size)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.config.ObjectURL(key), body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating upload request object %s failed: %w", key, err)
|
||||
}
|
||||
req.ContentLength = size
|
||||
c.signV4(req, unsignedPayload)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("uploading object %s failed: %w", key, err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if !isHTTPSuccess(resp) {
|
||||
return parseError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetObject downloads an object. The key is automatically prefixed with the client's configured
|
||||
// prefix. The caller must close the returned ReadCloser.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html
|
||||
func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int64, error) {
|
||||
log.Tag(tagS3Client).Debug("Fetching object %s from backend", key)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.objectURL(key), nil)
|
||||
log.Tag(tagS3Client).Debug("Fetching object %s", key)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.config.ObjectURL(key), nil)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("error creating HTTP GET request for %s: %w", key, err)
|
||||
}
|
||||
|
|
@ -81,19 +104,88 @@ func (c *Client) GetObject(ctx context.Context, key string) (io.ReadCloser, int6
|
|||
return resp.Body, resp.ContentLength, nil
|
||||
}
|
||||
|
||||
// ListObjectsV2 returns all objects under the client's configured prefix by paginating through
|
||||
// ListObjectsV2 results automatically. Keys in the returned objects have the prefix stripped,
|
||||
// so they match the keys used with PutObject/GetObject/DeleteObjects. It stops after 10,000
|
||||
// pages as a safety valve.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
|
||||
func (c *Client) ListObjectsV2(ctx context.Context) ([]*Object, error) {
|
||||
var all []*Object
|
||||
var token string
|
||||
for page := 0; page < maxPages; page++ {
|
||||
result, err := c.listObjectsV2(ctx, token, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range result.Objects {
|
||||
obj.Key = c.config.StripPrefix(obj.Key)
|
||||
all = append(all, obj)
|
||||
}
|
||||
if !result.IsTruncated {
|
||||
return all, nil
|
||||
}
|
||||
token = result.NextContinuationToken
|
||||
}
|
||||
return nil, fmt.Errorf("listing objects exceeded %d pages", maxPages)
|
||||
}
|
||||
|
||||
// listObjectsV2 performs a single ListObjectsV2 request using the client's configured prefix.
|
||||
// Use continuationToken for pagination. Set maxKeys to 0 for the server default (typically 1000).
|
||||
func (c *Client) listObjectsV2(ctx context.Context, continuationToken string, maxKeys int) (*listObjectsV2Result, error) {
|
||||
log.Tag(tagS3Client).Debug("Listing remote objects with continuation token '%s'", continuationToken)
|
||||
query := url.Values{"list-type": {"2"}}
|
||||
if prefix := c.config.ListPrefix(); prefix != "" {
|
||||
query.Set("prefix", prefix)
|
||||
}
|
||||
if continuationToken != "" {
|
||||
query.Set("continuation-token", continuationToken)
|
||||
}
|
||||
if maxKeys > 0 {
|
||||
query.Set("max-keys", strconv.Itoa(maxKeys))
|
||||
}
|
||||
respBody, err := c.do(ctx, "ListObjects", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var result listObjectsV2Response
|
||||
if err := xml.Unmarshal(respBody, &result); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal list object response: %w", err)
|
||||
}
|
||||
objects := make([]*Object, len(result.Contents))
|
||||
for i, obj := range result.Contents {
|
||||
var lastModified time.Time
|
||||
if obj.LastModified != "" {
|
||||
lastModified, _ = time.Parse(time.RFC3339, obj.LastModified)
|
||||
}
|
||||
objects[i] = &Object{
|
||||
Key: obj.Key,
|
||||
Size: obj.Size,
|
||||
LastModified: lastModified,
|
||||
}
|
||||
}
|
||||
return &listObjectsV2Result{
|
||||
Objects: objects,
|
||||
IsTruncated: result.IsTruncated,
|
||||
NextContinuationToken: result.NextContinuationToken,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// DeleteObjects removes multiple objects in a single batch request. Keys are automatically
|
||||
// prefixed with the client's configured prefix. S3 supports up to 1000 keys per call; the
|
||||
// caller is responsible for batching if needed.
|
||||
//
|
||||
// Even when S3 returns HTTP 200, individual keys may fail. If any per-key errors are present
|
||||
// in the response, they are returned as a combined error.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
|
||||
func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
|
||||
log.Tag(tagS3Client).Debug("Deleting %d object(s)", len(keys))
|
||||
req := &deleteRequest{
|
||||
req := &deleteObjectsRequest{
|
||||
Quiet: true,
|
||||
}
|
||||
for _, key := range keys {
|
||||
req.Objects = append(req.Objects, &deleteObject{Key: c.objectKey(key)})
|
||||
req.Objects = append(req.Objects, &deleteObject{Key: c.config.ObjectKey(key)})
|
||||
}
|
||||
body, err := xml.Marshal(req)
|
||||
if err != nil {
|
||||
|
|
@ -105,14 +197,14 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
|
|||
headers := map[string]string{
|
||||
"Content-MD5": base64.StdEncoding.EncodeToString(md5Sum[:]),
|
||||
}
|
||||
reqURL := c.config.BucketURL() + "?delete="
|
||||
respBody, err := c.do(ctx, http.MethodPost, reqURL, body, headers, "DeleteObjects")
|
||||
reqURL := c.config.BucketURL() + "?delete"
|
||||
respBody, err := c.do(ctx, "DeleteObjects", http.MethodPost, reqURL, body, headers)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error deleting objects: %w", err)
|
||||
}
|
||||
|
||||
// S3 may return HTTP 200 with per-key errors in the response body
|
||||
var result deleteResult
|
||||
var result deleteObjectsResult
|
||||
if err := xml.Unmarshal(respBody, &result); err != nil {
|
||||
return nil // If we can't parse, assume success (Quiet mode returns empty body on success)
|
||||
}
|
||||
|
|
@ -126,95 +218,10 @@ func (c *Client) DeleteObjects(ctx context.Context, keys []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// listObjects performs a single ListObjectsV2 request using the client's configured prefix.
|
||||
// Use continuationToken for pagination. Set maxKeys to 0 for the server default (typically 1000).
|
||||
func (c *Client) listObjects(ctx context.Context, continuationToken string, maxKeys int) (*listResult, error) {
|
||||
log.Tag(tagS3Client).Debug("ListObjects continuation=%s maxKeys=%d", continuationToken, maxKeys)
|
||||
query := url.Values{"list-type": {"2"}}
|
||||
if prefix := c.prefixForList(); prefix != "" {
|
||||
query.Set("prefix", prefix)
|
||||
}
|
||||
if continuationToken != "" {
|
||||
query.Set("continuation-token", continuationToken)
|
||||
}
|
||||
if maxKeys > 0 {
|
||||
query.Set("max-keys", strconv.Itoa(maxKeys))
|
||||
}
|
||||
respBody, err := c.do(ctx, http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil, "ListObjects")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var result listObjectsV2Response
|
||||
if err := xml.Unmarshal(respBody, &result); err != nil {
|
||||
return nil, fmt.Errorf("s3: ListObjects XML: %w", err)
|
||||
}
|
||||
objects := make([]Object, len(result.Contents))
|
||||
for i, obj := range result.Contents {
|
||||
var lastModified time.Time
|
||||
if obj.LastModified != "" {
|
||||
lastModified, _ = time.Parse(time.RFC3339, obj.LastModified)
|
||||
}
|
||||
objects[i] = Object{
|
||||
Key: obj.Key,
|
||||
Size: obj.Size,
|
||||
LastModified: lastModified,
|
||||
}
|
||||
}
|
||||
return &listResult{
|
||||
Objects: objects,
|
||||
IsTruncated: result.IsTruncated,
|
||||
NextContinuationToken: result.NextContinuationToken,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListAllObjects returns all objects under the client's configured prefix by paginating through
|
||||
// ListObjectsV2 results automatically. Keys in the returned objects have the prefix stripped,
|
||||
// so they match the keys used with PutObject/GetObject/DeleteObjects. It stops after 10,000
|
||||
// pages as a safety valve.
|
||||
func (c *Client) ListAllObjects(ctx context.Context) ([]Object, error) {
|
||||
var all []Object
|
||||
var token string
|
||||
for page := 0; page < maxPages; page++ {
|
||||
result, err := c.listObjects(ctx, token, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range result.Objects {
|
||||
obj.Key = c.stripPrefix(obj.Key)
|
||||
all = append(all, obj)
|
||||
}
|
||||
if !result.IsTruncated {
|
||||
return all, nil
|
||||
}
|
||||
token = result.NextContinuationToken
|
||||
}
|
||||
return nil, fmt.Errorf("s3: ListAllObjects exceeded %d pages", maxPages)
|
||||
}
|
||||
|
||||
// putObjectSimple uploads a body with known size using a simple PUT with UNSIGNED-PAYLOAD.
|
||||
func (c *Client) putObjectSimple(ctx context.Context, key string, body io.Reader, size int64) error {
|
||||
log.Tag(tagS3Client).Debug("Uploading object %s (%d bytes)", key, size)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, c.objectURL(key), body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("uploading object %s failed: %w", key, err)
|
||||
}
|
||||
req.ContentLength = size
|
||||
c.signV4(req, unsignedPayload)
|
||||
resp, err := c.http.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: PutObject: %w", err)
|
||||
}
|
||||
resp.Body.Close()
|
||||
if !isHTTPSuccess(resp) {
|
||||
return parseError(resp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// do creates a signed request, executes it, reads the response body, and checks for errors.
|
||||
// If body is nil, the request is sent with an empty payload. If body is non-nil, it is sent
|
||||
// with a computed SHA-256 payload hash and Content-Type: application/xml.
|
||||
func (c *Client) do(ctx context.Context, method, reqURL string, body []byte, headers map[string]string, op string) ([]byte, error) {
|
||||
func (c *Client) do(ctx context.Context, op, method, reqURL string, body []byte, headers map[string]string) ([]byte, error) {
|
||||
var reader io.Reader
|
||||
var hash string
|
||||
if body != nil {
|
||||
|
|
@ -251,35 +258,3 @@ func (c *Client) do(ctx context.Context, method, reqURL string, body []byte, hea
|
|||
}
|
||||
return respBody, nil
|
||||
}
|
||||
|
||||
// prefixForList returns the prefix to use in ListObjectsV2 requests,
|
||||
// with a trailing slash so that only objects under the prefix directory are returned.
|
||||
func (c *Client) prefixForList() string {
|
||||
if c.config.Prefix != "" {
|
||||
return c.config.Prefix + "/"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// stripPrefix removes the configured prefix from a key returned by ListObjectsV2,
|
||||
// so keys match what was passed to PutObject/GetObject/DeleteObjects.
|
||||
func (c *Client) stripPrefix(key string) string {
|
||||
if c.config.Prefix != "" {
|
||||
return strings.TrimPrefix(key, c.config.Prefix+"/")
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// objectKey prepends the configured prefix to the given key.
|
||||
func (c *Client) objectKey(key string) string {
|
||||
if c.config.Prefix != "" {
|
||||
return c.config.Prefix + "/" + key
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// objectURL returns the full URL for an object, automatically prepending the configured prefix.
|
||||
func (c *Client) objectURL(key string) string {
|
||||
u, _ := url.JoinPath(c.config.BucketURL(), c.objectKey(key))
|
||||
return u
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,8 @@ import (
|
|||
|
||||
// signV4 signs req in place using AWS Signature V4. payloadHash is the hex-encoded SHA-256
|
||||
// of the request body, or the literal string "UNSIGNED-PAYLOAD" for streaming uploads.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/sig-v4-authenticating-requests.html
|
||||
func (c *Client) signV4(req *http.Request, hash string) {
|
||||
now := time.Now().UTC()
|
||||
datestamp := now.Format("20060102")
|
||||
|
|
|
|||
|
|
@ -16,6 +16,9 @@ import (
|
|||
|
||||
// AbortIncompleteUploads lists all in-progress multipart uploads and aborts those initiated
|
||||
// before the given cutoff time. This cleans up orphaned upload parts from interrupted uploads.
|
||||
//
|
||||
// See https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListMultipartUploads.html
|
||||
// and https://docs.aws.amazon.com/AmazonS3/latest/API/API_AbortMultipartUpload.html
|
||||
func (c *Client) AbortIncompleteUploads(ctx context.Context, cutoff time.Time) error {
|
||||
uploads, err := c.listMultipartUploads(ctx)
|
||||
if err != nil {
|
||||
|
|
@ -32,19 +35,19 @@ func (c *Client) AbortIncompleteUploads(ctx context.Context, cutoff time.Time) e
|
|||
|
||||
// listMultipartUploads returns in-progress multipart uploads for the client's prefix.
|
||||
// It paginates automatically, stopping after 10,000 pages as a safety valve.
|
||||
func (c *Client) listMultipartUploads(ctx context.Context) ([]MultipartUpload, error) {
|
||||
var all []MultipartUpload
|
||||
func (c *Client) listMultipartUploads(ctx context.Context) ([]*multipartUpload, error) {
|
||||
var all []*multipartUpload
|
||||
var keyMarker, uploadIDMarker string
|
||||
for page := 0; page < maxPages; page++ {
|
||||
query := url.Values{"uploads": {""}}
|
||||
if prefix := c.prefixForList(); prefix != "" {
|
||||
if prefix := c.config.ListPrefix(); prefix != "" {
|
||||
query.Set("prefix", prefix)
|
||||
}
|
||||
if keyMarker != "" {
|
||||
query.Set("key-marker", keyMarker)
|
||||
query.Set("upload-id-marker", uploadIDMarker)
|
||||
}
|
||||
respBody, err := c.do(ctx, http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil, "listMultipartUploads")
|
||||
respBody, err := c.do(ctx, "listMultipartUploads", http.MethodGet, c.config.BucketURL()+"?"+query.Encode(), nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -57,7 +60,7 @@ func (c *Client) listMultipartUploads(ctx context.Context) ([]MultipartUpload, e
|
|||
if u.Initiated != "" {
|
||||
initiated, _ = time.Parse(time.RFC3339, u.Initiated)
|
||||
}
|
||||
all = append(all, MultipartUpload{
|
||||
all = append(all, &multipartUpload{
|
||||
Key: u.Key,
|
||||
UploadID: u.UploadID,
|
||||
Initiated: initiated,
|
||||
|
|
@ -114,7 +117,7 @@ func (c *Client) putObjectMultipart(ctx context.Context, key string, body io.Rea
|
|||
|
||||
// initiateMultipartUpload starts a new multipart upload and returns the upload ID.
|
||||
func (c *Client) initiateMultipartUpload(ctx context.Context, key string) (string, error) {
|
||||
respBody, err := c.do(ctx, http.MethodPost, c.objectURL(key)+"?uploads", nil, nil, "InitiateMultipartUpload")
|
||||
respBody, err := c.do(ctx, "InitiateMultipartUpload", http.MethodPost, c.config.ObjectURL(key)+"?uploads", nil, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
@ -129,7 +132,7 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, key string) (strin
|
|||
// uploadPart uploads a single part of a multipart upload and returns the ETag.
|
||||
func (c *Client) uploadPart(ctx context.Context, key, uploadID string, partNumber int, data []byte) (string, error) {
|
||||
log.Tag(tagS3Client).Debug("UploadPart key=%s part=%d size=%d", key, partNumber, len(data))
|
||||
reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.objectURL(key), partNumber, url.QueryEscape(uploadID))
|
||||
reqURL := fmt.Sprintf("%s?partNumber=%d&uploadId=%s", c.config.ObjectURL(key), partNumber, url.QueryEscape(uploadID))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, reqURL, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("s3: UploadPart request: %w", err)
|
||||
|
|
@ -155,9 +158,8 @@ func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID stri
|
|||
if err != nil {
|
||||
return fmt.Errorf("s3: CompleteMultipartUpload marshal: %w", err)
|
||||
}
|
||||
respBody, err := c.do(ctx, http.MethodPost,
|
||||
fmt.Sprintf("%s?uploadId=%s", c.objectURL(key), url.QueryEscape(uploadID)),
|
||||
bodyBytes, nil, "CompleteMultipartUpload")
|
||||
reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID))
|
||||
respBody, err := c.do(ctx, "CompleteMultipartUpload", http.MethodPost, reqURL, bodyBytes, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -172,7 +174,7 @@ func (c *Client) completeMultipartUpload(ctx context.Context, key, uploadID stri
|
|||
// abortMultipartUpload cancels an in-progress multipart upload. Called on error to clean up.
|
||||
func (c *Client) abortMultipartUpload(ctx context.Context, key, uploadID string) {
|
||||
log.Tag(tagS3Client).Debug("AbortMultipartUpload key=%s uploadId=%s", key, uploadID)
|
||||
reqURL := fmt.Sprintf("%s?uploadId=%s", c.objectURL(key), url.QueryEscape(uploadID))
|
||||
reqURL := fmt.Sprintf("%s?uploadId=%s", c.config.ObjectURL(key), url.QueryEscape(uploadID))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, reqURL, nil)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
|
|||
|
|
@ -373,14 +373,14 @@ func TestConfig_BucketURL_VirtualHosted(t *testing.T) {
|
|||
require.Equal(t, "https://my-bucket.s3.us-east-1.amazonaws.com", c.BucketURL())
|
||||
}
|
||||
|
||||
func TestClient_ObjectURL_PathStyle(t *testing.T) {
|
||||
c := &Client{config: &Config{Endpoint: "s3.example.com", Bucket: "my-bucket", Prefix: "prefix", PathStyle: true}}
|
||||
require.Equal(t, "https://s3.example.com/my-bucket/prefix/obj", c.objectURL("obj"))
|
||||
func TestConfig_ObjectURL_PathStyle(t *testing.T) {
|
||||
c := &Config{Endpoint: "s3.example.com", Bucket: "my-bucket", Prefix: "prefix", PathStyle: true}
|
||||
require.Equal(t, "https://s3.example.com/my-bucket/prefix/obj", c.ObjectURL("obj"))
|
||||
}
|
||||
|
||||
func TestClient_ObjectURL_VirtualHosted(t *testing.T) {
|
||||
c := &Client{config: &Config{Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "my-bucket", Prefix: "prefix", PathStyle: false}}
|
||||
require.Equal(t, "https://my-bucket.s3.us-east-1.amazonaws.com/prefix/obj", c.objectURL("obj"))
|
||||
func TestConfig_ObjectURL_VirtualHosted(t *testing.T) {
|
||||
c := &Config{Endpoint: "s3.us-east-1.amazonaws.com", Bucket: "my-bucket", Prefix: "prefix", PathStyle: false}
|
||||
require.Equal(t, "https://my-bucket.s3.us-east-1.amazonaws.com/prefix/obj", c.ObjectURL("obj"))
|
||||
}
|
||||
|
||||
func TestConfig_HostHeader_PathStyle(t *testing.T) {
|
||||
|
|
@ -393,20 +393,20 @@ func TestConfig_HostHeader_VirtualHosted(t *testing.T) {
|
|||
require.Equal(t, "my-bucket.s3.us-east-1.amazonaws.com", c.HostHeader())
|
||||
}
|
||||
|
||||
func TestClient_ObjectKey(t *testing.T) {
|
||||
c := &Client{config: &Config{Prefix: "attachments"}}
|
||||
require.Equal(t, "attachments/file123", c.objectKey("file123"))
|
||||
func TestConfig_ObjectKey(t *testing.T) {
|
||||
c := &Config{Prefix: "attachments"}
|
||||
require.Equal(t, "attachments/file123", c.ObjectKey("file123"))
|
||||
|
||||
c2 := &Client{config: &Config{Prefix: ""}}
|
||||
require.Equal(t, "file123", c2.objectKey("file123"))
|
||||
c2 := &Config{Prefix: ""}
|
||||
require.Equal(t, "file123", c2.ObjectKey("file123"))
|
||||
}
|
||||
|
||||
func TestClient_PrefixForList(t *testing.T) {
|
||||
c := &Client{config: &Config{Prefix: "attachments"}}
|
||||
require.Equal(t, "attachments/", c.prefixForList())
|
||||
func TestConfig_ListPrefix(t *testing.T) {
|
||||
c := &Config{Prefix: "attachments"}
|
||||
require.Equal(t, "attachments/", c.ListPrefix())
|
||||
|
||||
c2 := &Client{config: &Config{Prefix: ""}}
|
||||
require.Equal(t, "", c2.prefixForList())
|
||||
c2 := &Config{Prefix: ""}
|
||||
require.Equal(t, "", c2.ListPrefix())
|
||||
}
|
||||
|
||||
// --- Integration tests using mock S3 server ---
|
||||
|
|
@ -512,13 +512,13 @@ func TestClient_ListObjects(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
|
||||
// List with prefix client: should only see 3
|
||||
result, err := client.listObjects(ctx, "", 0)
|
||||
result, err := client.listObjectsV2(ctx, "", 0)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 3)
|
||||
require.False(t, result.IsTruncated)
|
||||
|
||||
// List with no-prefix client: should see all 4
|
||||
result, err = clientNoPrefix.listObjects(ctx, "", 0)
|
||||
result, err = clientNoPrefix.listObjectsV2(ctx, "", 0)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 4)
|
||||
}
|
||||
|
|
@ -537,20 +537,20 @@ func TestClient_ListObjects_Pagination(t *testing.T) {
|
|||
}
|
||||
|
||||
// List with max-keys=2
|
||||
result, err := client.listObjects(ctx, "", 2)
|
||||
result, err := client.listObjectsV2(ctx, "", 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result.Objects, 2)
|
||||
require.True(t, result.IsTruncated)
|
||||
require.NotEmpty(t, result.NextContinuationToken)
|
||||
|
||||
// Get next page
|
||||
result2, err := client.listObjects(ctx, result.NextContinuationToken, 2)
|
||||
result2, err := client.listObjectsV2(ctx, result.NextContinuationToken, 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result2.Objects, 2)
|
||||
require.True(t, result2.IsTruncated)
|
||||
|
||||
// Get last page
|
||||
result3, err := client.listObjects(ctx, result2.NextContinuationToken, 2)
|
||||
result3, err := client.listObjectsV2(ctx, result2.NextContinuationToken, 2)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, result3.Objects, 1)
|
||||
require.False(t, result3.IsTruncated)
|
||||
|
|
@ -568,7 +568,7 @@ func TestClient_ListAllObjects(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
}
|
||||
|
||||
objects, err := client.ListAllObjects(ctx)
|
||||
objects, err := client.ListObjectsV2(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, 10)
|
||||
}
|
||||
|
|
@ -688,7 +688,7 @@ func TestClient_ListAllObjects_20k(t *testing.T) {
|
|||
}
|
||||
|
||||
// List all 20k objects with pagination
|
||||
objects, err := client.ListAllObjects(ctx)
|
||||
objects, err := client.ListObjectsV2(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, numObjects)
|
||||
|
||||
|
|
@ -708,7 +708,7 @@ func TestClient_ListAllObjects_20k(t *testing.T) {
|
|||
require.Nil(t, err)
|
||||
|
||||
// List again: should have 19000
|
||||
objects, err = client.ListAllObjects(ctx)
|
||||
objects, err = client.ListObjectsV2(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, numObjects-1000)
|
||||
}
|
||||
|
|
@ -757,7 +757,7 @@ func TestClient_RealBucket(t *testing.T) {
|
|||
ctx := context.Background()
|
||||
|
||||
// Clean up any leftover objects from previous runs
|
||||
existing, err := client.ListAllObjects(ctx)
|
||||
existing, err := client.ListObjectsV2(ctx)
|
||||
require.Nil(t, err)
|
||||
if len(existing) > 0 {
|
||||
keys := make([]string, len(existing))
|
||||
|
|
@ -823,7 +823,7 @@ func TestClient_RealBucket(t *testing.T) {
|
|||
}
|
||||
|
||||
// List
|
||||
objects, err := listClient.ListAllObjects(ctx)
|
||||
objects, err := listClient.ListObjectsV2(ctx)
|
||||
require.Nil(t, err)
|
||||
require.Len(t, objects, 10)
|
||||
|
||||
|
|
|
|||
78
s3/types.go
78
s3/types.go
|
|
@ -4,6 +4,8 @@ import (
|
|||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -19,7 +21,7 @@ type Config struct {
|
|||
HTTPClient *http.Client // if nil, http.DefaultClient is used
|
||||
}
|
||||
|
||||
// bucketURL returns the base URL for bucket-level operations.
|
||||
// BucketURL returns the base URL for bucket-level operations.
|
||||
func (c *Config) BucketURL() string {
|
||||
if c.PathStyle {
|
||||
return fmt.Sprintf("https://%s/%s", c.Endpoint, c.Bucket)
|
||||
|
|
@ -27,7 +29,7 @@ func (c *Config) BucketURL() string {
|
|||
return fmt.Sprintf("https://%s.%s", c.Bucket, c.Endpoint)
|
||||
}
|
||||
|
||||
// hostHeader returns the value for the Host header.
|
||||
// HostHeader returns the value for the Host header.
|
||||
func (c *Config) HostHeader() string {
|
||||
if c.PathStyle {
|
||||
return c.Endpoint
|
||||
|
|
@ -35,6 +37,38 @@ func (c *Config) HostHeader() string {
|
|||
return c.Bucket + "." + c.Endpoint
|
||||
}
|
||||
|
||||
// ListPrefix returns the prefix to use in ListObjectsV2 requests,
|
||||
// with a trailing slash so that only objects under the prefix directory are returned.
|
||||
func (c *Config) ListPrefix() string {
|
||||
if c.Prefix != "" {
|
||||
return c.Prefix + "/"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// StripPrefix removes the configured prefix from a key returned by ListObjectsV2,
|
||||
// so keys match what was passed to PutObject/GetObject/DeleteObjects.
|
||||
func (c *Config) StripPrefix(key string) string {
|
||||
if c.Prefix != "" {
|
||||
return strings.TrimPrefix(key, c.Prefix+"/")
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// ObjectKey prepends the configured prefix to the given key.
|
||||
func (c *Config) ObjectKey(key string) string {
|
||||
if c.Prefix != "" {
|
||||
return c.Prefix + "/" + key
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
// ObjectURL returns the full URL for an object, automatically prepending the configured prefix.
|
||||
func (c *Config) ObjectURL(key string) string {
|
||||
u, _ := url.JoinPath(c.BucketURL(), c.ObjectKey(key))
|
||||
return u
|
||||
}
|
||||
|
||||
// Object represents an S3 object returned by list operations.
|
||||
type Object struct {
|
||||
Key string
|
||||
|
|
@ -42,13 +76,6 @@ type Object struct {
|
|||
LastModified time.Time
|
||||
}
|
||||
|
||||
// listResult holds the response from a single ListObjectsV2 page.
|
||||
type listResult struct {
|
||||
Objects []Object
|
||||
IsTruncated bool
|
||||
NextContinuationToken string
|
||||
}
|
||||
|
||||
// ErrorResponse is returned when S3 responds with a non-2xx status code.
|
||||
type ErrorResponse struct {
|
||||
StatusCode int
|
||||
|
|
@ -66,9 +93,16 @@ func (e *ErrorResponse) Error() string {
|
|||
|
||||
// listObjectsV2Response is the XML response from S3 ListObjectsV2
|
||||
type listObjectsV2Response struct {
|
||||
Contents []listObject `xml:"Contents"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken"`
|
||||
Contents []*listObject `xml:"Contents"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextContinuationToken string `xml:"NextContinuationToken"`
|
||||
}
|
||||
|
||||
// listObjectsV2Result holds the response from a single ListObjectsV2 page.
|
||||
type listObjectsV2Result struct {
|
||||
Objects []*Object
|
||||
IsTruncated bool
|
||||
NextContinuationToken string
|
||||
}
|
||||
|
||||
type listObject struct {
|
||||
|
|
@ -77,8 +111,8 @@ type listObject struct {
|
|||
LastModified string `xml:"LastModified"`
|
||||
}
|
||||
|
||||
// deleteRequest is the XML request body for S3 DeleteObjects
|
||||
type deleteRequest struct {
|
||||
// deleteObjectsRequest is the XML request body for S3 DeleteObjects
|
||||
type deleteObjectsRequest struct {
|
||||
XMLName xml.Name `xml:"Delete"`
|
||||
Quiet bool `xml:"Quiet"`
|
||||
Objects []*deleteObject `xml:"Object"`
|
||||
|
|
@ -88,8 +122,8 @@ type deleteObject struct {
|
|||
Key string `xml:"Key"`
|
||||
}
|
||||
|
||||
// deleteResult is the XML response from S3 DeleteObjects
|
||||
type deleteResult struct {
|
||||
// deleteObjectsResult is the XML response from S3 DeleteObjects
|
||||
type deleteObjectsResult struct {
|
||||
Errors []deleteError `xml:"Error"`
|
||||
}
|
||||
|
||||
|
|
@ -99,8 +133,8 @@ type deleteError struct {
|
|||
Message string `xml:"Message"`
|
||||
}
|
||||
|
||||
// MultipartUpload represents an in-progress multipart upload returned by listMultipartUploads.
|
||||
type MultipartUpload struct {
|
||||
// multipartUpload represents an in-progress multipart upload returned by listMultipartUploads.
|
||||
type multipartUpload struct {
|
||||
Key string
|
||||
UploadID string
|
||||
Initiated time.Time
|
||||
|
|
@ -108,10 +142,10 @@ type MultipartUpload struct {
|
|||
|
||||
// listMultipartUploadsResult is the XML response from S3 listMultipartUploads
|
||||
type listMultipartUploadsResult struct {
|
||||
Uploads []listUpload `xml:"Upload"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextKeyMarker string `xml:"NextKeyMarker"`
|
||||
NextUploadIDMarker string `xml:"NextUploadIdMarker"`
|
||||
Uploads []*listUpload `xml:"Upload"`
|
||||
IsTruncated bool `xml:"IsTruncated"`
|
||||
NextKeyMarker string `xml:"NextKeyMarker"`
|
||||
NextUploadIDMarker string `xml:"NextUploadIdMarker"`
|
||||
}
|
||||
|
||||
type listUpload struct {
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ const (
|
|||
// Sent as the payload hash for streaming uploads where the body is not buffered in memory
|
||||
unsignedPayload = "UNSIGNED-PAYLOAD"
|
||||
|
||||
// maxResponseBytes caps the size of S3 response bodies we read into memory (10 MB)
|
||||
maxResponseBytes = 10 * 1024 * 1024
|
||||
// maxResponseBytes caps the size of S3 response bodies we read into memory
|
||||
maxResponseBytes = 2 * 1024 * 1024
|
||||
|
||||
// partSize is the size of each part for multipart uploads (5 MB). This is also the threshold
|
||||
// above which PutObject switches from a simple PUT to multipart upload. S3 requires a minimum
|
||||
|
|
@ -29,7 +29,7 @@ const (
|
|||
partSize = 5 * 1024 * 1024
|
||||
|
||||
// maxPages is the max number of pages to iterate through when listing objects
|
||||
maxPages = 10000
|
||||
maxPages = 500
|
||||
)
|
||||
|
||||
// ParseURL parses an S3 URL of the form:
|
||||
|
|
@ -88,7 +88,7 @@ func ParseURL(s3URL string) (*Config, error) {
|
|||
func parseError(resp *http.Response) error {
|
||||
body, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes))
|
||||
if err != nil {
|
||||
return fmt.Errorf("s3: reading error response: %w", err)
|
||||
return fmt.Errorf("error reading S3 error response: %w", err)
|
||||
}
|
||||
return parseErrorFromBytes(resp.StatusCode, body)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ func cmdRm(ctx context.Context, client *s3.Client) {
|
|||
}
|
||||
|
||||
func cmdLs(ctx context.Context, client *s3.Client) {
|
||||
objects, err := client.ListAllObjects(ctx)
|
||||
objects, err := client.ListObjectsV2(ctx)
|
||||
if err != nil {
|
||||
fail("ls: %s", err)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue