lib/httpimagestore/configuration/s3.rb in httpimagestore-1.2.0 vs lib/httpimagestore/configuration/s3.rb in httpimagestore-1.3.0

- old
+ new

@@ -1,6 +1,8 @@
 require 'aws-sdk'
+require 'digest/sha2'
+require 'msgpack'
 require 'httpimagestore/aws_sdk_regions_hack'
 require 'httpimagestore/configuration/path'
 require 'httpimagestore/configuration/handler'
 
 module Configuration
@@ -59,10 +61,194 @@
 	Global.register_node_parser S3
 
 	class S3SourceStoreBase < SourceStoreBase
 		include ClassLogging
 
+		class CacheRoot
+			CacheRootError = Class.new ArgumentError
+			class CacheRootNotDirError < CacheRootError
+				def initialize(root_dir)
+					super "S3 object cache directory '#{root_dir}' does not exist or is not a directory"
+				end
+			end
+
+			class CacheRootNotWritableError < CacheRootError
+				def initialize(root_dir)
+					super "S3 object cache directory '#{root_dir}' is not writable"
+				end
+			end
+
+			class CacheRootNotAccessibleError < CacheRootError
+				def initialize(root_dir)
+					super "S3 object cache directory '#{root_dir}' is not readable"
+				end
+			end
+
+			def initialize(root_dir)
+				@root = Pathname.new(root_dir)
+				@root.directory? or raise CacheRootNotDirError.new(root_dir)
+				@root.executable? or raise CacheRootNotAccessibleError.new(root_dir)
+				@root.writable? or raise CacheRootNotWritableError.new(root_dir)
+			end
+
+			def cache_file(bucket, key)
+				File.join(Digest::SHA2.new.update("#{bucket}/#{key}").to_s[0,32].match(/(..)(..)(.*)/).captures)
+			end
+
+			def open(bucket, key)
+				# TODO: locking
+				file = @root + cache_file(bucket, key)
+
+				file.dirname.directory? or file.dirname.mkpath
+				if file.exist?
+					file.open('r+') do |io|
+						yield io
+					end
+				else
+					file.open('w+') do |io|
+						yield io
+					end
+				end
+			end
+		end
+
+		class S3Object
+			def initialize(client, bucket, key)
+				@client = client
+				@bucket = bucket
+				@key = key
+			end
+
+			def s3_object
+				return @s3_object if @s3_object
+				@s3_object = @client.buckets[@bucket].objects[@key]
+			end
+
+			def read(max_bytes = nil)
+				options = {}
+				options[:range] = 0..max_bytes if max_bytes
+				s3_object.read(options)
+			end
+
+			def write(data, options = {})
+				s3_object.write(data, options)
+			end
+
+			def private_url
+				s3_object.url_for(:read, expires: 60 * 60 * 24 * 365 * 20).to_s # expire in 20 years
+			end
+
+			def public_url
+				s3_object.public_url.to_s
+			end
+
+			def content_type
+				s3_object.head[:content_type]
+			end
+		end
+
+		class CacheObject < S3Object
+			include ClassLogging
+
+			def initialize(io, client, bucket, key)
+				@io = io
+				super(client, bucket, key)
+
+				@header = {}
+				@have_cache = false
+				@dirty = false
+
+				begin
+					head_length = @io.read(4)
+
+					if head_length and head_length.length == 4
+						head_length = head_length.unpack('L').first
+						@header = MessagePack.unpack(@io.read(head_length))
+						@have_cache = true
+
+						log.debug{"S3 object cache hit; bucket: '#{@bucket}' key: '#{@key}' [#{@io.path}]: header: #{@header}"}
+					else
+						log.debug{"S3 object cache miss; bucket: '#{@bucket}' key: '#{@key}' [#{@io.path}]"}
+					end
+				rescue => error
+					log.warn "cannot use cached S3 object; bucket: '#{@bucket}' key: '#{@key}' [#{@io.path}]: #{error}"
+					# not usable
+					io.seek 0
+					io.truncate 0
+				end
+
+				yield self
+
+				# save object as was used if no error happened and there were changes
+				write_cache if dirty?
+			end
+
+			def read(max_bytes = nil)
+				if @have_cache
+					data_location = @io.tell
+					begin
+						return @data = @io.read(max_bytes)
+					ensure
+						@io.seek(data_location, IO::SEEK_SET)
+					end
+				else
+					dirty! :read
+					return @data = super
+				end
+			end
+
+			def write(data, options = {})
+				out = super
+				@data = data
+				dirty! :write
+				out
+			end
+
+			def private_url
+				@header['private_url'] ||= (dirty! :private_url; super)
+			end
+
+			def public_url
+				@header['public_url'] ||= (dirty! :public_url; super)
+			end
+
+			def content_type
+				@header['content_type'] ||= (dirty! :content_type; super)
+			end
+
+			private
+
+			def write_cache
+				begin
+					log.debug{"S3 object is dirty, writing cache file; bucket: '#{@bucket}' key: '#{@key}' [#{@io.path}]; header: #{@header}"}
+
+					raise 'nil data!' unless @data
+					# rewrite
+					@io.seek(0, IO::SEEK_SET)
+					@io.truncate 0
+
+					header = MessagePack.pack(@header)
+					@io.write [header.length].pack('L') # header length
+					@io.write header
+					@io.write @data
+				rescue => error
+					log.warn "cannot store S3 object in cache: bucket: '#{@bucket}' key: '#{@key}' [#{@io.path}]: #{error}"
+				ensure
+					@dirty = false
+				end
+			end
+
+			def dirty!(reason = :unknown)
+				log.debug{"marking cache dirty for reason: #{reason}"}
+				@dirty = true
+			end
+
+			def dirty?
+				@dirty
+			end
+		end
+
 		extend Stats
 		def_stats(
 			:total_s3_store,
 			:total_s3_store_bytes,
 			:total_s3_source,
@@ -73,12 +259,12 @@
 			image_name = node.grab_values('image name').first
 			node.required_attributes('bucket', 'path')
 			node.valid_attribute_values('public_access', true, false, nil)
 
-			bucket, path_spec, public_access, cache_control, prefix, if_image_name_on =
-				*node.grab_attributes('bucket', 'path', 'public', 'cache-control', 'prefix', 'if-image-name-on')
+			bucket, path_spec, public_access, cache_control, prefix, cache_root, if_image_name_on =
+				*node.grab_attributes('bucket', 'path', 'public', 'cache-control', 'prefix', 'cache-root', 'if-image-name-on')
 			public_access = false if public_access.nil?
 			prefix = '' if prefix.nil?
 
 			self.new(
 				configuration.global,
@@ -86,71 +272,104 @@
 				InclusionMatcher.new(image_name, if_image_name_on),
 				bucket,
 				path_spec,
 				public_access,
 				cache_control,
-				prefix
+				prefix,
+				cache_root
 			)
 		end
 
-		def initialize(global, image_name, matcher, bucket, path_spec, public_access, cache_control, prefix)
+		def initialize(global, image_name, matcher, bucket, path_spec, public_access, cache_control, prefix, cache_root)
 			super global, image_name, matcher
 			@bucket = bucket
 			@path_spec = path_spec
 			@public_access = public_access
 			@cache_control = cache_control
 			@prefix = prefix
+
+			@cache_root = nil
+			begin
+				if cache_root
+					@cache_root = CacheRoot.new(cache_root)
+					log.info "using S3 object cache directory '#{cache_root}' for image '#{image_name}'"
+				else
+					log.info "S3 object cache not configured (no cache-root) for image '#{image_name}'"
+				end
+			rescue CacheRoot::CacheRootNotDirError => error
+				log.warn "not using S3 object cache for image '#{image_name}': #{error}"
+			end
+
 			local :bucket, @bucket
 		end
 
 		def client
			@global.s3 or raise S3NotConfiguredError
 		end
 
 		def url(object)
 			if @public_access
-				object.public_url.to_s
+				object.public_url
 			else
-				object.url_for(:read, expires: 60 * 60 * 24 * 365 * 20).to_s # expire in 20 years
+				object.private_url
 			end
 		end
 
 		def object(path)
 			begin
-				bucket = client.buckets[@bucket]
-				yield bucket.objects[@prefix + path]
+				key = @prefix + path
+				image = nil
+
+				if @cache_root
+					begin
+						@cache_root.open(@bucket, key) do |cache_file_io|
+							CacheObject.new(cache_file_io, client, @bucket, key) do |obj|
+								image = yield obj
+							end
+						end
+					rescue IOError => error
+						log.warn "cannot use S3 object cache '#{@cache_root.cache_file(@bucket, key)}': #{error}"
+						image = yield S3Object.new(client, @bucket, key)
+					end
+				else
+					image = yield S3Object.new(client, @bucket, key)
+				end
 			rescue AWS::S3::Errors::AccessDenied
 				raise S3AccessDenied.new(@bucket, path)
 			rescue AWS::S3::Errors::NoSuchBucket
 				raise S3NoSuchBucketError.new(@bucket)
 			rescue AWS::S3::Errors::NoSuchKey
 				raise S3NoSuchKeyError.new(@bucket, path)
 			end
+			image
 		end
+
+		S3SourceStoreBase.logger = Handler.logger_for(S3SourceStoreBase)
+		CacheObject.logger = S3SourceStoreBase.logger_for(CacheObject)
 	end
 
 	class S3Source < S3SourceStoreBase
 		def self.match(node)
 			node.name == 'source_s3'
 		end
 
 		def self.parse(configuration, node)
-			configuration.image_sources << super
+			configuration.sources << super
 		end
 
 		def realize(request_state)
 			put_sourced_named_image(request_state) do |image_name, rendered_path|
 				log.info "sourcing '#{image_name}' image from S3 '#{@bucket}' bucket under '#{rendered_path}' key"
 
 				object(rendered_path) do |object|
 					data = request_state.memory_limit.get do |limit|
-						object.read range: 0..(limit + 1)
+						object.read(limit + 1)
 					end
 					S3SourceStoreBase.stats.incr_total_s3_source
 					S3SourceStoreBase.stats.incr_total_s3_source_bytes(data.bytesize)
 
-					image = Image.new(data, object.head[:content_type])
+					image = Image.new(data, object.content_type)
 					image.source_url = url(object)
 					image
 				end
 			end
 		end
@@ -191,7 +410,6 @@
 			end
 		end
 		Handler::register_node_parser S3Store
 		StatsReporter << S3SourceStoreBase.stats
 	end
 end
-
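
A note on the cache layout, grounded in `CacheRoot#cache_file` above: the on-disk path is derived from the first 32 hex characters of a SHA-256 digest of "bucket/key", split as xx/yy/remainder, which fans entries out across up to 65,536 two-level subdirectories instead of one flat directory. A standalone sketch of the same derivation (the bucket and key below are made-up examples):

    require 'digest/sha2'

    # Same scheme as CacheRoot#cache_file: hex SHA-256 of "bucket/key",
    # truncated to 32 chars, split into xx/yy/remainder path segments.
    def cache_file(bucket, key)
      File.join(Digest::SHA2.new.update("#{bucket}/#{key}").to_s[0, 32].match(/(..)(..)(.*)/).captures)
    end

    puts cache_file('mybucket', 'images/1234.jpg')
    # prints a path of the form "ab/cd/0123456789abcdef0123456789ab"

Since the path depends only on bucket and key, entries survive process restarts; per the TODO in `CacheRoot#open`, concurrent access to the same entry is not yet locked.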
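
The cache file format itself, as written by `CacheObject#write_cache` above, is a 4-byte native-endian header length (`Array#pack('L')`), the MessagePack-encoded header hash, then the raw object body; native endianness means cache files are not portable across architectures. A minimal sketch of an inspection script under that reading — illustrative only, not part of the gem:

    require 'msgpack'

    # Dump a cache file written by CacheObject#write_cache:
    # [4-byte native-endian header length][MessagePack header][raw object body]
    path = ARGV[0] or abort "usage: #{$0} CACHE_FILE"

    File.open(path, 'rb') do |io|
      raw_length = io.read(4)
      abort 'not a cache file (truncated header)' unless raw_length and raw_length.bytesize == 4

      header = MessagePack.unpack(io.read(raw_length.unpack('L').first))
      body = io.read

      puts "header: #{header.inspect}"
      puts "body: #{body.bytesize} bytes (#{header['content_type'] || 'content type not cached'})"
    end

Only header fields that were actually accessed get cached (each accessor marks the object dirty on first use), so any subset of the keys 'private_url', 'public_url' and 'content_type' may be present.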