s3store.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package storage
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "strings"
  10. "time"
  11. "github.com/aws/aws-sdk-go-v2/aws"
  12. "github.com/aws/aws-sdk-go-v2/config"
  13. "github.com/aws/aws-sdk-go-v2/credentials"
  14. "github.com/aws/aws-sdk-go-v2/service/s3"
  15. "github.com/aws/aws-sdk-go-v2/service/s3/types"
  16. cfgpkg "viewer/internal/config"
  17. )
  18. type S3Store struct {
  19. bucket string
  20. client *s3.Client
  21. presigner *s3.PresignClient
  22. }
  23. func NewS3Store(ctx context.Context, cfg cfgpkg.Config) (*S3Store, error) {
  24. awsCfg, err := config.LoadDefaultConfig(
  25. ctx,
  26. config.WithRegion(cfg.S3Region),
  27. config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(cfg.S3AccessKey, cfg.S3SecretKey, "")),
  28. )
  29. if err != nil {
  30. return nil, fmt.Errorf("load aws config: %w", err)
  31. }
  32. awsCfg.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(
  33. func(service, region string, options ...interface{}) (aws.Endpoint, error) {
  34. if service == s3.ServiceID {
  35. return aws.Endpoint{URL: cfg.S3Endpoint, HostnameImmutable: true}, nil
  36. }
  37. return aws.Endpoint{}, &aws.EndpointNotFoundError{}
  38. },
  39. )
  40. client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
  41. o.UsePathStyle = cfg.S3UsePathStyle
  42. })
  43. return &S3Store{
  44. bucket: cfg.S3Bucket,
  45. client: client,
  46. presigner: s3.NewPresignClient(client),
  47. }, nil
  48. }
  49. func (s *S3Store) PresignPut(ctx context.Context, key string, ttl time.Duration) (string, map[string]string, error) {
  50. out, err := s.presigner.PresignPutObject(ctx, &s3.PutObjectInput{
  51. Bucket: aws.String(s.bucket),
  52. Key: aws.String(key),
  53. }, s3.WithPresignExpires(ttl))
  54. if err != nil {
  55. return "", nil, fmt.Errorf("presign put: %w", err)
  56. }
  57. return out.URL, map[string]string{}, nil
  58. }
  59. func (s *S3Store) PutObject(ctx context.Context, key string, body io.Reader, contentType string) error {
  60. _, err := s.client.PutObject(ctx, &s3.PutObjectInput{
  61. Bucket: aws.String(s.bucket),
  62. Key: aws.String(key),
  63. Body: body,
  64. ContentType: aws.String(contentType),
  65. })
  66. if err != nil {
  67. return fmt.Errorf("put object %s: %w", key, err)
  68. }
  69. return nil
  70. }
  71. func (s *S3Store) PutJSON(ctx context.Context, key string, v any) error {
  72. b, err := json.Marshal(v)
  73. if err != nil {
  74. return fmt.Errorf("marshal json: %w", err)
  75. }
  76. return s.PutObject(ctx, key, bytes.NewReader(b), "application/json")
  77. }
  78. func (s *S3Store) GetObject(ctx context.Context, key string) (io.ReadCloser, string, error) {
  79. out, err := s.client.GetObject(ctx, &s3.GetObjectInput{
  80. Bucket: aws.String(s.bucket),
  81. Key: aws.String(key),
  82. })
  83. if err != nil {
  84. return nil, "", fmt.Errorf("get object %s: %w", key, err)
  85. }
  86. ct := "application/octet-stream"
  87. if out.ContentType != nil {
  88. ct = *out.ContentType
  89. }
  90. return out.Body, ct, nil
  91. }
  92. func (s *S3Store) ReadJSON(ctx context.Context, key string, out any) error {
  93. body, _, err := s.GetObject(ctx, key)
  94. if err != nil {
  95. return err
  96. }
  97. defer body.Close()
  98. if err := json.NewDecoder(body).Decode(out); err != nil {
  99. return fmt.Errorf("decode json %s: %w", key, err)
  100. }
  101. return nil
  102. }
  103. func (s *S3Store) HeadObject(ctx context.Context, key string) (bool, int64, error) {
  104. o, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
  105. Bucket: aws.String(s.bucket),
  106. Key: aws.String(key),
  107. })
  108. if err != nil {
  109. var noSuch *types.NotFound
  110. if errors.As(err, &noSuch) || strings.Contains(strings.ToLower(err.Error()), "not found") {
  111. return false, 0, nil
  112. }
  113. return false, 0, fmt.Errorf("head object %s: %w", key, err)
  114. }
  115. var size int64
  116. if o.ContentLength != nil {
  117. size = *o.ContentLength
  118. }
  119. return true, size, nil
  120. }
  121. func (s *S3Store) ListAlbumIndexKeys(ctx context.Context) ([]string, error) {
  122. keys := make([]string, 0, 128)
  123. var token *string
  124. for {
  125. out, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
  126. Bucket: aws.String(s.bucket),
  127. Prefix: aws.String("albums/"),
  128. ContinuationToken: token,
  129. })
  130. if err != nil {
  131. return nil, fmt.Errorf("list objects: %w", err)
  132. }
  133. for _, obj := range out.Contents {
  134. if obj.Key == nil {
  135. continue
  136. }
  137. if strings.HasSuffix(*obj.Key, "/index.json") {
  138. keys = append(keys, *obj.Key)
  139. }
  140. }
  141. if out.IsTruncated == nil || !*out.IsTruncated {
  142. break
  143. }
  144. token = out.NextContinuationToken
  145. }
  146. return keys, nil
  147. }