"""
Image Process
=============
This plugin process images according to their class attribute.
"""
import codecs
import collections
import copy
import functools
import html
import logging
import os.path
import posixpath
import re
import shutil
import subprocess
import sys
from PIL import Image, ImageFilter, UnidentifiedImageError
from bs4 import BeautifulSoup
import six
from six.moves.urllib_parse import unquote, urlparse
from six.moves.urllib_request import pathname2url, url2pathname
from pelican import __version__ as pelican_version
from pelican import signals

log = logging.getLogger(__name__)

IMAGE_PROCESS_REGEX = re.compile("image-process-[-a-zA-Z0-9_]+")

PELICAN_MAJOR_VERSION = int(pelican_version.split(".")[0])

Path = collections.namedtuple("Path", ["base_url", "source", "base_path", "filename"])


# A lot of inspiration from pyexiftool (https://github.com/smarnach/pyexiftool)
class ExifTool(object):
    errors = "strict"
    sentinel = b"{ready}"
    block_size = 4096

    _instance = None

    @staticmethod
    def start_exiftool():
        if shutil.which("exiftool") is None:
            log.warning(
                "[image_process] EXIF tags will not be copied because "
                "the exiftool program could not be found. "
                "Please install exiftool and make sure it is in your path."
            )
        else:
            ExifTool._instance = ExifTool()

    @staticmethod
    def copy_tags(src, dst):
        if ExifTool._instance is not None:
            ExifTool._instance._copy_tags(src, dst)

    @staticmethod
    def stop_exiftool():
        ExifTool._instance = None
    def __init__(self):
        self.encoding = sys.getfilesystemencoding()
        if self.encoding != "mbcs":
            try:
                codecs.lookup_error("surrogateescape")
            except LookupError:
                pass
            else:
                # Prefer surrogateescape so undecodable file names do not
                # abort tag copying.
                ExifTool.errors = "surrogateescape"

        with open(os.devnull, "w") as devnull:
            self.process = subprocess.Popen(
                [
                    "exiftool",
                    "-stay_open",
                    "True",
                    "-@",
                    "-",
                    "-common_args",
                    "-G",
                    "-n",
                ],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=devnull,
            )
    def __del__(self):
        if self.process is not None:
            self.process.terminate()

    def _copy_tags(self, src, dst):
        params = (
            b"-TagsFromFile",
            src.encode(self.encoding, ExifTool.errors),
            b'"-all:all>all:all"',
            dst.encode(self.encoding, ExifTool.errors),
        )
        self._send_command(params)

        params = (b"-delete_original!", dst.encode(self.encoding, ExifTool.errors))
        self._send_command(params)

    def _send_command(self, params):
        self.process.stdin.write(b"\n".join(params + (b"-j\n", b"-execute\n")))
        self.process.stdin.flush()

        output = b""
        fd = self.process.stdout.fileno()
        while not output.strip().endswith(ExifTool.sentinel):
            output += os.read(fd, ExifTool.block_size)

        exiftool_result = output.strip()[: -len(ExifTool.sentinel)]
        log.debug(
            "[image_process] exiftool result: {}".format(
                exiftool_result.decode("utf-8")
            )
        )
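
# Typical lifecycle of the singleton above, as driven by
# harvest_images_in_fragment() below (a summary of this module's own flow,
# not an additional API):
#
#   ExifTool.start_exiftool()      # spawn exiftool once, if it is installed
#   ExifTool.copy_tags(src, dst)   # called after each derivative is written
#   ExifTool.stop_exiftool()       # drop the singleton; __del__ terminates it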


def convert_box(image, top, left, right, bottom):
    """Convert box coordinate strings to numeric values.

    top, left, right and bottom must be strings specifying
    either a number or a percentage.
    """
    bbox = image.getbbox()
    img_width = bbox[2] - bbox[0]
    img_height = bbox[3] - bbox[1]

    if top[-1] == "%":
        top = img_height * float(top[:-1]) / 100.0
    else:
        top = float(top)
    if left[-1] == "%":
        left = img_width * float(left[:-1]) / 100.0
    else:
        left = float(left)
    if right[-1] == "%":
        right = img_width * float(right[:-1]) / 100.0
    else:
        right = float(right)
    if bottom[-1] == "%":
        bottom = img_height * float(bottom[:-1]) / 100.0
    else:
        bottom = float(bottom)

    return (top, left, right, bottom)
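
# Worked example (illustrative): for an image whose content bounding box is
# 200 px wide and 100 px high, convert_box(img, "10%", "25", "50%", "100")
# returns (10.0, 25.0, 100.0, 100.0) -- percentages are taken of the height
# for top/bottom and of the width for left/right, while plain numbers are
# passed through as floats.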


def crop(i, left, top, right, bottom):
    """Crop image i to the box (left, top)-(right, bottom).

    left, top, right, bottom must be strings specifying
    either a number or a percentage.
    """
    top, left, right, bottom = convert_box(i, top, left, right, bottom)
    return i.crop((int(left), int(top), int(right), int(bottom)))


def resize(i, w, h):
    """Resize the image to the dimension specified.

    w, h (width, height) must be strings specifying either a number
    or a percentage.
    """
    _, _, w, h = convert_box(i, "0", "0", w, h)

    if i.mode == "P":
        i = i.convert("RGBA")
    elif i.mode == "1":
        i = i.convert("L")

    return i.resize((int(w), int(h)), Image.LANCZOS)


def scale(i, w, h, upscale, inside):
    """Resize the image to the dimension specified, keeping the aspect
    ratio.

    w, h (width, height) must be strings specifying either a number
    or a percentage, or "None" to ignore this constraint.

    If upscale is True, upscaling is allowed.

    If inside is True, the resulting image will not be larger than the
    dimensions specified, else it will not be smaller.
    """
    bbox = i.getbbox()
    iw = bbox[2] - bbox[0]
    ih = bbox[3] - bbox[1]

    if w == "None":
        w = 1.0
    elif w[-1] == "%":
        w = float(w[:-1]) / 100.0
    else:
        w = float(w) / iw

    if h == "None":
        h = 1.0
    elif h[-1] == "%":
        h = float(h[:-1]) / 100.0
    else:
        h = float(h) / ih

    if inside:
        scale = min(w, h)
    else:
        scale = max(w, h)

    if upscale in [0, "0", "False", False]:
        scale = min(scale, 1.0)

    if i.mode == "P":
        i = i.convert("RGBA")
    elif i.mode == "1":
        i = i.convert("L")

    return i.resize((int(scale * iw), int(scale * ih)), Image.LANCZOS)


def rotate(i, degrees):
    if i.mode == "P":
        i = i.convert("RGBA")
    elif i.mode == "1":
        i = i.convert("L")

    # rotate does not support the LANCZOS filter (Pillow 2.7.0).
    return i.rotate(int(degrees), Image.BICUBIC, True)


def apply_filter(i, f):
    if i.mode == "P":
        i = i.convert("RGBA")
    elif i.mode == "1":
        i = i.convert("L")

    return i.filter(f)


basic_ops = {
    "crop": crop,
    "flip_horizontal": lambda i: i.transpose(Image.FLIP_LEFT_RIGHT),
    "flip_vertical": lambda i: i.transpose(Image.FLIP_TOP_BOTTOM),
    "grayscale": lambda i: i.convert("L"),
    "resize": resize,
    "rotate": rotate,
    "scale_in": functools.partial(scale, inside=True),
    "scale_out": functools.partial(scale, inside=False),
    "blur": functools.partial(apply_filter, f=ImageFilter.BLUR),
    "contour": functools.partial(apply_filter, f=ImageFilter.CONTOUR),
    "detail": functools.partial(apply_filter, f=ImageFilter.DETAIL),
    "edge_enhance": functools.partial(apply_filter, f=ImageFilter.EDGE_ENHANCE),
    "edge_enhance_more": functools.partial(
        apply_filter, f=ImageFilter.EDGE_ENHANCE_MORE
    ),
    "emboss": functools.partial(apply_filter, f=ImageFilter.EMBOSS),
    "find_edges": functools.partial(apply_filter, f=ImageFilter.FIND_EDGES),
    "smooth": functools.partial(apply_filter, f=ImageFilter.SMOOTH),
    "smooth_more": functools.partial(apply_filter, f=ImageFilter.SMOOTH_MORE),
    "sharpen": functools.partial(apply_filter, f=ImageFilter.SHARPEN),
}
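
# Illustrative derivative definitions built from the operations above. The
# names "thumb" and "article-image" are made-up examples, not anything this
# plugin requires. String ops are split on spaces by process_image(), so each
# entry is an operation name followed by its arguments; callables are applied
# to the PIL image directly:
#
#   IMAGE_PROCESS = {
#       "thumb": ["crop 0 0 50% 50%", "scale_out 150 150 True", "crop 0 0 150 150"],
#       "article-image": {"type": "image", "ops": ["scale_in 300 300 True"]},
#   }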


def harvest_images(path, context):
    log.debug("[image_process] harvesting %r", path)

    # Set default value for 'IMAGE_PROCESS_DIR'.
    if "IMAGE_PROCESS_DIR" not in context:
        context["IMAGE_PROCESS_DIR"] = "derivatives"

    # Set default value for 'IMAGE_PROCESS_ENCODING'.
    if "IMAGE_PROCESS_ENCODING" not in context:
        context["IMAGE_PROCESS_ENCODING"] = "utf-8"

    # Set default value for 'IMAGE_PROCESS_COPY_EXIF_TAGS'.
    if "IMAGE_PROCESS_COPY_EXIF_TAGS" not in context:
        context["IMAGE_PROCESS_COPY_EXIF_TAGS"] = False

    with open(path, "r+", encoding=context["IMAGE_PROCESS_ENCODING"]) as f:
        res = harvest_images_in_fragment(f, context)
        f.seek(0)
        f.truncate()
        f.write(res)


def harvest_feed_images(path, context, feed):
    # Set default value for 'IMAGE_PROCESS_DIR'.
    if "IMAGE_PROCESS_DIR" not in context:
        context["IMAGE_PROCESS_DIR"] = "derivatives"

    # Set default value for 'IMAGE_PROCESS_ENCODING'.
    if "IMAGE_PROCESS_ENCODING" not in context:
        context["IMAGE_PROCESS_ENCODING"] = "utf-8"

    # Set default value for 'IMAGE_PROCESS_COPY_EXIF_TAGS'.
    if "IMAGE_PROCESS_COPY_EXIF_TAGS" not in context:
        context["IMAGE_PROCESS_COPY_EXIF_TAGS"] = False

    with open(path, "r+", encoding=context["IMAGE_PROCESS_ENCODING"]) as f:
        soup = BeautifulSoup(f, "xml")
        for content in soup.find_all("content"):
            if content["type"] != "html":
                continue
            doc = html.unescape(content.string)
            res = harvest_images_in_fragment(doc, context)
            content.string = res
        f.seek(0)
        f.truncate()
        f.write(str(soup))


def harvest_images_in_fragment(fragment, settings):
    parser = settings.get("IMAGE_PROCESS_PARSER", "html.parser")
    soup = BeautifulSoup(fragment, parser)

    copy_exif_tags = settings.get("IMAGE_PROCESS_COPY_EXIF_TAGS", False)
    if copy_exif_tags:
        ExifTool.start_exiftool()

    for img in soup.find_all("img", class_=IMAGE_PROCESS_REGEX):
        for c in img["class"]:
            if c.startswith("image-process-"):
                derivative = c[14:]
                break
        else:
            continue

        try:
            d = settings["IMAGE_PROCESS"][derivative]
        except KeyError:
            raise RuntimeError("Derivative %s undefined." % derivative)

        if isinstance(d, list):
            # Single source image specification.
            process_img_tag(img, settings, derivative)
        elif not isinstance(d, dict):
            raise RuntimeError(
                "Derivative %s definition not handled "
                "(must be list or dict)" % derivative
            )
        elif "type" not in d:
            raise RuntimeError('"type" is mandatory for %s.' % derivative)
        elif d["type"] == "image":
            # Single source image specification.
            process_img_tag(img, settings, derivative)
        elif d["type"] == "responsive-image" and "srcset" not in img.attrs:
            # srcset image specification.
            build_srcset(img, settings, derivative)
        elif d["type"] == "picture":
            # Multiple source (picture) specification.
            group = img.find_parent()
            if group.name == "div":
                convert_div_to_picture_tag(soup, img, group, settings, derivative)
            elif group.name == "picture":
                process_picture(soup, img, group, settings, derivative)

    ExifTool.stop_exiftool()
    return str(soup)
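
# Markup that the harvester above reacts to (illustrative; "article-image"
# stands for any derivative defined in IMAGE_PROCESS):
#
#   <img class="image-process-article-image" src="/images/pelican.jpg"/>
#
# The "image-process-" prefix is stripped to get the derivative name, the
# source image is processed, and the tag's src (plus srcset/sizes or an
# enclosing <picture>, depending on the derivative type) is rewritten to
# point at the generated file.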


def compute_paths(img, settings, derivative):
    process_dir = settings["IMAGE_PROCESS_DIR"]
    img_src = urlparse(img["src"])
    img_src_path = url2pathname(img_src.path.lstrip("/"))
    img_src_dirname, filename = os.path.split(img_src_path)
    derivative_path = os.path.join(process_dir, derivative)
    # urljoin truncates leading ../ elements
    base_url = posixpath.join(
        posixpath.dirname(img["src"]), pathname2url(derivative_path)
    )

    if PELICAN_MAJOR_VERSION < 4:
        file_paths = settings["filenames"]
    else:
        file_paths = settings["static_content"]

    for f, contobj in file_paths.items():
        save_as = contobj.get_url_setting("save_as")
        # save_as can be set to empty string, which would match everything
        if save_as and img_src_path.endswith(save_as):
            source = contobj.source_path
            base_path = os.path.join(
                contobj.settings["OUTPUT_PATH"],
                os.path.dirname(contobj.get_url_setting("save_as")),
                process_dir,
                derivative,
            )
            break
    else:
        if "SITEURL" in settings:
            site_url = urlparse(settings["SITEURL"])
            site_url_path = url2pathname(site_url.path[1:])
        else:
            # if SITEURL is undefined, don't break!
            site_url_path = None

        if site_url_path:
            src_path = img_src_path.partition(site_url_path)[2].lstrip("/")
        else:
            src_path = img_src_path.lstrip("/")

        source = os.path.join(settings["PATH"], src_path)
        base_path = os.path.join(
            settings["OUTPUT_PATH"], os.path.dirname(src_path), derivative_path
        )

    return Path(base_url, source, base_path, filename)


def process_img_tag(img, settings, derivative):
    path = compute_paths(img, settings, derivative)
    if not is_img_identifiable(path.source):
        log.warning(
            "[image_process] Skipping image %s that could not be identified by Pillow",
            path.source,
        )
        return

    process = settings["IMAGE_PROCESS"][derivative]

    img["src"] = posixpath.join(path.base_url, path.filename)
    destination = os.path.join(path.base_path, path.filename)

    if not isinstance(process, list):
        process = process["ops"]

    process_image((path.source, destination, process), settings)


def is_img_identifiable(img_filepath):
    try:
        Image.open(img_filepath)
        return True
    except (FileNotFoundError, UnidentifiedImageError):
        return False


def build_srcset(img, settings, derivative):
    path = compute_paths(img, settings, derivative)
    if not is_img_identifiable(path.source):
        log.warning(
            "[image_process] Skipping image %s that could not be identified by Pillow",
            path.source,
        )
        return

    process = settings["IMAGE_PROCESS"][derivative]

    default = process["default"]
    if isinstance(default, six.string_types):
        breakpoints = {i for i, _ in process["srcset"]}
        if default not in breakpoints:
            log.error(
                '[image_process] srcset "%s" does not define default "%s"',
                derivative,
                default,
            )
        default_name = default
    elif isinstance(default, list):
        default_name = "default"

    destination = os.path.join(path.base_path, default_name, path.filename)
    process_image((path.source, destination, default), settings)
    img["src"] = posixpath.join(path.base_url, default_name, path.filename)

    if "sizes" in process:
        img["sizes"] = process["sizes"]

    srcset = []
    for src in process["srcset"]:
        file_path = posixpath.join(path.base_url, src[0], path.filename)
        srcset.append("%s %s" % (file_path, src[0]))
        destination = os.path.join(path.base_path, src[0], path.filename)
        process_image((path.source, destination, src[1]), settings)

    if len(srcset) > 0:
        img["srcset"] = ", ".join(srcset)


def convert_div_to_picture_tag(soup, img, group, settings, derivative):
    """
    Convert a div containing multiple images to a picture.
    """
    process_dir = settings["IMAGE_PROCESS_DIR"]

    # Compile sources URL. Special source "default" uses the main
    # image URL. Other sources use the img with classes
    # [source['name'], 'image-process']. We also remove the img from
    # the DOM.
    sources = copy.deepcopy(settings["IMAGE_PROCESS"][derivative]["sources"])
    for s in sources:
        if s["name"] == "default":
            s["url"] = img["src"]
        else:
            candidates = group.find_all("img", class_=s["name"])
            for candidate in candidates:
                if "image-process" in candidate["class"]:
                    s["url"] = candidate["src"]
                    candidate.decompose()
                    break

        url_path, s["filename"] = os.path.split(s["url"])
        s["base_url"] = os.path.join(url_path, process_dir, derivative)
        s["base_path"] = os.path.join(settings["OUTPUT_PATH"], s["base_url"][1:])

    # If default is not None, change default img source to the image
    # derivative referenced.
    default = settings["IMAGE_PROCESS"][derivative]["default"]
    if default is not None:
        default_source_name = default[0]
        default_source = None
        for s in sources:
            if s["name"] == default_source_name:
                default_source = s
                break

        if default_source is None:
            raise RuntimeError(
                'No source matching "%s", referenced in default setting.'
                % default_source_name
            )

        if isinstance(default[1], six.string_types):
            default_item_name = default[1]
        elif isinstance(default[1], list):
            default_item_name = "default"

        source = os.path.join(settings["PATH"], default_source["url"][1:])
        destination = os.path.join(
            s["base_path"],
            default_source_name,
            default_item_name,
            default_source["filename"],
        )
        process_image((source, destination, default[1]), settings)

        # Change img src to url of default processed image.
        img["src"] = os.path.join(
            s["base_url"],
            default_source_name,
            default_item_name,
            default_source["filename"],
        )

    # Create picture tag.
    picture_tag = soup.new_tag("picture")
    for s in sources:
        # Create new <source>
        source_attrs = {k: s[k] for k in s if k in ["media", "sizes"]}
        source_tag = soup.new_tag("source", **source_attrs)
        srcset = []
        for src in s["srcset"]:
            srcset.append(
                "%s %s"
                % (
                    os.path.join(s["base_url"], s["name"], src[0], s["filename"]),
                    src[0],
                )
            )
            source = os.path.join(settings["PATH"], s["url"][1:])
            destination = os.path.join(s["base_path"], s["name"], src[0], s["filename"])
            process_image((source, destination, src[1]), settings)
        if len(srcset) > 0:
            source_tag["srcset"] = ", ".join(srcset)
        picture_tag.append(source_tag)

    # Wrap img with <picture>
    img.wrap(picture_tag)


def process_picture(soup, img, group, settings, derivative):
    """
    Convert a simplified picture to a full HTML picture:

        <picture>
            <source class="source-1" src="image1.jpg"></source>
            <source class="source-2" src="image2.jpg"></source>
            <img class="image-process-picture" src="image3.jpg"></img>
        </picture>

    to

        <picture>
            <source srcset="...image1.jpg..." media="..." sizes="..."></source>
            <source srcset="...image2.jpg..."></source>
            <source srcset="...image3.jpg..." media="..." sizes="..."></source>
            <img src=".../image3.jpg"></img>
        </picture>
    """
    process_dir = settings["IMAGE_PROCESS_DIR"]
    process = settings["IMAGE_PROCESS"][derivative]

    # Compile sources URL. Special source "default" uses the main
    # image URL. Other sources use the <source> with classes
    # source['name']. We also remove the <source>s from the DOM.
    sources = copy.deepcopy(process["sources"])
    for s in sources:
        if s["name"] == "default":
            s["url"] = img["src"]
            source_attrs = {k: s[k] for k in s if k in ["media", "sizes"]}
            s["element"] = soup.new_tag("source", **source_attrs)
        else:
            s["element"] = group.find("source", class_=s["name"]).extract()
            s["url"] = s["element"]["src"]
            del s["element"]["src"]
            del s["element"]["class"]

        url_path, s["filename"] = os.path.split(s["url"])
        s["base_url"] = posixpath.join(url_path, process_dir, derivative)
        s["base_path"] = os.path.join(settings["OUTPUT_PATH"], s["base_url"][1:])

    # If default is not None, change default img source to the image
    # derivative referenced.
    default = process["default"]
    if default is not None:
        default_source_name = default[0]
        default_source = None
        for s in sources:
            if s["name"] == default_source_name:
                default_source = s
                break

        if default_source is None:
            raise RuntimeError(
                'No source matching "%s", referenced in default setting.'
                % default_source_name
            )

        if isinstance(default[1], six.string_types):
            default_item_name = default[1]
        elif isinstance(default[1], list):
            default_item_name = "default"

        source = os.path.join(settings["PATH"], default_source["url"][1:])
        destination = os.path.join(
            s["base_path"],
            default_source_name,
            default_item_name,
            default_source["filename"],
        )
        process_image((source, destination, default[1]), settings)

        # Change img src to url of default processed image.
        img["src"] = posixpath.join(
            s["base_url"],
            default_source_name,
            default_item_name,
            default_source["filename"],
        )

    # Generate srcsets and put back <source>s in <picture>.
    for s in sources:
        srcset = []
        for src in s["srcset"]:
            srcset.append(
                "%s %s"
                % (
                    posixpath.join(s["base_url"], s["name"], src[0], s["filename"]),
                    src[0],
                )
            )
            source = os.path.join(settings["PATH"], s["url"][1:])
            destination = os.path.join(s["base_path"], s["name"], src[0], s["filename"])
            process_image((source, destination, src[1]), settings)

        if len(srcset) > 0:
            # Append source elements to the picture in the same order
            # as they are found in
            # settings['IMAGE_PROCESS'][derivative]['sources'].
            s["element"]["srcset"] = ", ".join(srcset)
            img.insert_before(s["element"])


def process_image(image, settings):
    # Set default value for 'IMAGE_PROCESS_FORCE'.
    if "IMAGE_PROCESS_FORCE" not in settings:
        settings["IMAGE_PROCESS_FORCE"] = False

    # remove URL encoding to get to physical filenames
    image = list(image)
    image[0] = unquote(image[0])
    image[1] = unquote(image[1])
    # image[2] is the transformation

    log.debug("[image_process] {} -> {}".format(image[0], image[1]))

    os.makedirs(os.path.dirname(image[1]), exist_ok=True)

    # If original image is older than existing derivative, skip
    # processing to save time, unless user explicitly forced
    # image generation.
    if (
        settings["IMAGE_PROCESS_FORCE"]
        or not os.path.exists(image[1])
        or os.path.getmtime(image[0]) > os.path.getmtime(image[1])
    ):
        i = Image.open(image[0])

        for step in image[2]:
            if hasattr(step, "__call__"):
                i = step(i)
            else:
                elems = step.split(" ")
                i = basic_ops[elems[0]](i, *(elems[1:]))

        # `save_all=True` will allow saving multi-page (aka animated) GIF's
        # however, turning it on seems to break PNG support, and doesn't seem
        # to work on GIF's either...
        i.save(image[1], progressive=True)

        ExifTool.copy_tags(image[0], image[1])
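
# Shape of the `image` argument above (illustrative values): a
# (source, destination, transformations) triple, e.g.
#
#   process_image(
#       ("content/images/cat.jpg",
#        "output/images/derivatives/thumb/cat.jpg",
#        ["scale_in 150 150 True"]),
#       settings,
#   )
#
# where each transformation is either a "name arg1 arg2 ..." string keyed
# into basic_ops, or a callable taking and returning a PIL Image.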


def register():
    signals.content_written.connect(harvest_images)
    signals.feed_written.connect(harvest_feed_images)