Module styloexport.corpus

Functions

def bail_on_errors(wrapped: Callable, instance: Any, args: list, kwargs: dict) ‑> Any
Expand source code
@wrapt.decorator
def bail_on_errors(wrapped: Callable, instance: Any, args: list, kwargs: dict) -> Any:
    """Skip the wrapped call entirely when the instance has recorded errors."""
    if not instance.errors:
        return wrapped(*args, **kwargs)
    return None

Decorator to stop processing an article which contains errors.

Classes

class Corpus (edition: str,
edition_configuration: dict,
domain: str,
id: str,
slug: str,
title: str,
download_dir: pathlib.Path,
allowed_editions: List[str],
allowed_base_urls: List[str],
supported_images_extensions: List[str])
Expand source code
class Corpus:
    """A Stylo corpus: a named collection of articles on a Stylo instance.

    Validation problems are accumulated in ``self.errors`` instead of being
    raised; any method decorated with ``@bail_on_errors`` then becomes a
    no-op. Corpus data is fetched from the Stylo API and exported as a zip
    archive containing per-article exports plus optional corpus-wide
    ("-full") Pandoc renderings.
    """

    def __init__(
        self,
        edition: str,
        edition_configuration: dict,
        domain: str,
        id: str,
        slug: str,
        title: str,
        download_dir: Path,
        allowed_editions: List[str],
        allowed_base_urls: List[str],
        supported_images_extensions: List[str],
    ) -> None:
        """Store configuration, create the download directory and validate.

        Args:
            edition: Stylo edition name; must be in ``allowed_editions``.
            edition_configuration: per-edition settings (e.g. ``images_path``).
            domain: Stylo instance domain; must match a domain derived from
                ``allowed_base_urls``.
            id: corpus identifier on the Stylo instance.
            slug: URL-safe corpus name used to build paths and file names.
            title: human-readable name; when empty it is replaced by the
                API-provided name in ``get_stylo_data``.
            download_dir: root download directory; a dedicated
                ``<edition>/<domain-slug>/corpus/<slug>-<id>`` sub-directory
                is created below it.
            allowed_editions: supported Stylo editions.
            allowed_base_urls: base URLs of supported Stylo instances.
            supported_images_extensions: image file extensions to handle.
        """
        self.edition = edition
        self.edition_configuration = edition_configuration
        self.domain = domain
        self.domain_slug = slugify(domain)
        self.id = id
        self.slug = slug
        self.name = title
        self.root_download_dir = download_dir
        self.download_dir = (
            download_dir / edition / self.domain_slug / "corpus" / f"{slug}-{id}"
        )
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # Human-readable error messages; a non-empty list short-circuits
        # every method decorated with @bail_on_errors.
        self.errors: List[str] = []
        # Raw corpus payload from the API; fetch_all may set it to None and
        # the values are nested structures, hence Optional[dict].
        self.metadata: Optional[dict] = {}
        self.allowed_base_urls = allowed_base_urls
        self.allowed_editions = allowed_editions
        self._check_edition(allowed_editions)
        self._check_instance(allowed_base_urls)
        self.supported_images_extensions = supported_images_extensions
        self.articles: Optional[List[Article]] = []
        logger.debug(f"Creating corpus `{self.id}`")

    @property
    def filenames(self) -> dict:
        """Slug-based file names for the corpus-wide bib/md/yaml files."""
        return {
            "bib": f"{self.slug}.bib",
            "md": f"{self.slug}.md",
            "yaml": f"{self.slug}.yaml",
        }

    def _check_edition(self, allowed_editions: List[str]) -> None:
        """Record an error when the requested edition is not supported."""
        if self.edition not in allowed_editions:
            self.errors.append("Stylo edition not supported.")

    def _check_instance(self, allowed_base_urls: List[str]) -> None:
        """Record an error when the domain matches no allowed base URL."""
        if self.domain not in [get_domain_from_url(url) for url in allowed_base_urls]:
            self.errors.append("Stylo instance not supported.")

    @bail_on_errors
    def get_stylo_data(self) -> Optional[dict]:
        """Fetch the corpus payload from the Stylo API.

        Populates ``self.name`` (when empty) and ``self.articles``. Returns
        the raw corpus data, or None when the HTTP request fails or the API
        response reports errors (messages are appended to ``self.errors``).
        """
        # Map each allowed domain back to its full base URL to resolve ours.
        urls = dict((get_domain_from_url(url), url) for url in self.allowed_base_urls)
        url = urls[self.domain]
        styloapi = StyloAPI(url)

        try:
            response = styloapi.corpus(self.id)
        except httpx.HTTPStatusError as e:
            self.errors.append(e.args[0])
            return None

        data: dict = response.json()
        logger.debug(f"Fetched corpus with data `{data}`")

        # GraphQL-style payload: errors arrive in-band in the JSON body.
        if "errors" in data:
            for error in data["errors"]:
                self.errors.append(error["message"])
            logger.debug(f"Fetched corpus with errors `{data['errors']}`")
            return None

        corpus_data: dict = data["data"]["corpus"][0]

        # Keep the caller-provided title, falling back to the API name.
        self.name = self.name or corpus_data["name"]
        self.articles = [
            Article(
                self.edition,
                self.edition_configuration,
                self.domain,
                art["article"]["_id"],
                slugify(art["article"]["title"]),
                art["article"]["title"],
                self.root_download_dir,
                self.allowed_editions,
                self.allowed_base_urls,
                self.supported_images_extensions,
            )
            for art in corpus_data["articles"]
        ]
        return corpus_data

    def fetch_all(self, templates_folder: Path, force: bool = False) -> None:
        """Fetch the corpus metadata, then every article's content.

        Args:
            templates_folder: folder containing the export templates,
                forwarded to each article.
            force: when True, forwarded to articles to force re-fetching.
        """
        logger.debug(f"Fetching corpus `{self.id}`")
        self.metadata = self.get_stylo_data()
        logger.debug(f"Corpus data `{self.metadata}`")
        if self.metadata is not None and self.articles:
            for article in self.articles:
                article.fetch_all(templates_folder, force=force)

    def export(
        self,
        formats: List[str],
        style_name: str,
        templates_folder: Path,
        with_toc: bool = False,
        with_ascii: bool = False,
        with_link_citations: bool = False,
        with_nocite: bool = False,
    ) -> Path:
        """Generate the corpus archive and return the path of the zip file.

        The file name encodes the sorted formats and the boolean options so
        each combination gets a distinct file. HTTP failures during
        generation are recorded in ``self.errors`` and the (possibly
        incomplete) path is still returned.
        """
        zip_file_path = self.download_dir / (
            f"{self.slug}-{self.id}-{'-'.join(sorted(formats))}-"
            f"toc-{int(with_toc)}-ascii-{int(with_ascii)}-"
            f"link-citations-{int(with_link_citations)}-"
            f"nocite-{int(with_nocite)}.zip"
        )
        # Ensure there is no on-disk cache for the generated zip file.
        zip_file_path.unlink(missing_ok=True)

        try:
            self.generate_archive(
                zip_file_path,
                formats,
                style_name,
                templates_folder,
                with_toc,
                with_ascii,
                with_link_citations,
                with_nocite,
            )
        except httpx.HTTPStatusError as exc:
            # NOTE(review): httpx.HTTPStatusError exposes `request`/`response`,
            # not `url`/`detail` — these hasattr checks look aimed at a custom
            # subclass; confirm which exception type is actually raised here.
            error = f"Erreur pendant la génération de « {self.name} »"
            if hasattr(exc, "url"):
                error += f" ({exc.url})"
            if hasattr(exc, "detail"):
                error += f" : {exc.detail}"
            self.errors.append(error)

        return zip_file_path

    def _write_to_archive(
        self, archive: ZipFile, file_name: Path, relative_to: Optional[Path] = None
    ) -> None:
        """A wrapper to generate correct relative paths within the archive."""
        arcname = file_name.relative_to(relative_to or self.download_dir.parent)
        # Skip duplicates: ZipFile.write would happily append a second copy.
        if str(arcname) not in archive.namelist():
            archive.write(file_name, arcname=arcname)

    def add_metadata(self, zip_file_path: Path) -> None:
        """Write ``corpus.yaml`` from the metadata and append it to the zip."""
        yaml_file_path = self.download_dir.parent / "corpus.yaml"
        yaml_content = yaml.safe_dump_all(
            [{}, self.metadata, {}], default_flow_style=False
        )
        # The empty sentinel documents force safe_dump_all to emit `---`
        # separators around the metadata; their serialized form (`{}\n`,
        # 3 chars each) is then sliced off so only the framed metadata
        # document remains.
        yaml_content = yaml_content[3:-3]
        yaml_file_path.write_text(yaml_content)
        with ZipFile(zip_file_path, mode="a") as archive:
            self._write_to_archive(archive, yaml_file_path)

    def generate_archive(
        self,
        zip_file_path: Path,
        formats: List[str],
        style_name: str,
        templates_folder: Path,
        with_toc: bool,
        with_ascii: bool,
        with_link_citations: bool,
        with_nocite: bool,
    ) -> None:
        """Fill the zip with per-article exports plus corpus-wide renders.

        No-op when metadata or articles are missing. Corpus-wide renders
        are produced only for "-full" formats (html-full, tex-full,
        pdf-full, docx-full): every article's markdown and bibliography are
        concatenated and rendered through the Pandoc API.
        """
        if self.metadata is None or not self.articles:
            return

        # Each article appends its own exports to the shared zip first.
        for article in self.articles:
            article.generate_archive(
                zip_file_path,
                formats,
                style_name,
                templates_folder,
                with_toc,
                with_ascii,
                with_link_citations,
                with_nocite,
            )

        logger.debug(
            f"Archive generated for corpus `{self.id}` with formats `{formats}`"
        )

        self.add_metadata(zip_file_path)

        # Everything below only matters for corpus-wide ("-full") formats.
        full_formats = [format for format in formats if "full" in format]

        if not full_formats:
            return

        csl_file_path = get_env_var("SE_STYLES_DIR") / f"{style_name}.csl"

        # Concatenations of every article's bibliography and markdown.
        bib_full = ""
        md_full = ""

        images_config_path = self.edition_configuration.get("images_path", "images")
        # Side zip of all article images, later handed to the pdf/docx renders.
        zip_images_path = self.download_dir / "images.zip"
        with ZipFile(zip_images_path, mode="a") as archive:
            for article in self.articles:
                md_content = article.md_original_file_content
                # Rewrite image URLs to corpus-relative paths so the
                # concatenated markdown resolves against the archived images.
                for image in article.images:
                    md_content = md_content.replace(
                        # Useful in case of image URL redirection.
                        image.get("original_url", image["url"]),
                        f"{article.slug}-{article.id}/{images_config_path}/{image['name']}",
                    )
                for image_path in article.image_paths:
                    arcname = image_path.relative_to(
                        self.download_dir.parent.parent / "articles"
                    )
                    if str(arcname) not in archive.namelist():
                        archive.write(image_path, arcname=arcname)
                        logger.debug(f"Archived image `{image_path}`")
                bib_full += article.bib_original_file_content + "\n\n"
                md_full += md_content + "\n\n"

        bib_file_path = self.download_dir / self.filenames["bib"]
        bib_file_path.write_text(bib_full)
        md_file_path = self.download_dir / self.filenames["md"]
        md_file_path.write_text(md_full)
        # For now, use corpus yaml instead of the aggregation of articles’ ones.
        yaml_file_path = self.download_dir.parent / "corpus.yaml"

        # NOTE(review): each PandocAPI call below returns a response object
        # whose `.content` is the rendered file — presumably an HTTP call to
        # a Pandoc service; confirm.
        pandocapi = PandocAPI(md_file_path, yaml_file_path, bib_file_path)

        with ZipFile(zip_file_path, mode="a") as archive:
            # We want images for all kinds of exports.
            for article in self.articles:
                if not article.images:
                    continue
                # Copy article images into the corpus tree so archive entries
                # get corpus-relative paths.
                images_path = article.download_dir / images_config_path
                images_path_corpus = (
                    self.download_dir
                    / f"{article.slug}-{article.id}"
                    / images_config_path
                )
                images_path_corpus.mkdir(parents=True, exist_ok=True)
                shutil.copytree(images_path, images_path_corpus, dirs_exist_ok=True)
                for image in article.images:
                    image_file_path = images_path_corpus / image["name"]
                    self._write_to_archive(archive, image_file_path)

            if "html-full" in formats:
                export_file_name = f"{self.slug}.html"
                export_file_path = self.download_dir / export_file_name
                template_file_path = templates_folder / "templateHtml5.html5"
                response = pandocapi.html(
                    export_file_name,
                    template_file_path,
                    csl_file_path,
                    with_toc,
                    with_ascii,
                    with_link_citations,
                    with_nocite,
                    style_name,
                )
                export_file_path.write_bytes(response.content)
                self._write_to_archive(archive, export_file_path)
            if "tex-full" in formats:
                export_file_name = f"{self.slug}.tex"
                export_file_path = self.download_dir / export_file_name
                template_file_path = templates_folder / "templateLaTeX.latex"
                response = pandocapi.tex(
                    export_file_name,
                    template_file_path,
                    csl_file_path,
                    with_toc,
                    with_link_citations,
                    with_nocite,
                    style_name,
                )
                export_file_path.write_bytes(response.content)
                self._write_to_archive(archive, export_file_path)
            if "pdf-full" in formats:
                export_file_name = f"{self.slug}.pdf"
                export_file_path = self.download_dir / export_file_name
                # Same side zip assembled above; images are embedded in the PDF.
                images_file_path = self.download_dir / "images.zip"
                template_file_path = templates_folder / "templateLaTeX.latex"
                response = pandocapi.pdf(
                    export_file_name,
                    images_file_path,
                    template_file_path,
                    csl_file_path,
                    with_toc,
                    with_link_citations,
                    with_nocite,
                    style_name,
                )
                export_file_path.write_bytes(response.content)
                self._write_to_archive(archive, export_file_path)
            if "docx-full" in formats:
                export_file_name = f"{self.slug}.docx"
                export_file_path = self.download_dir / export_file_name
                images_file_path = self.download_dir / "images.zip"
                response = pandocapi.docx(
                    export_file_name,
                    images_file_path,
                    csl_file_path,
                    with_toc,
                    with_link_citations,
                    with_nocite,
                    style_name,
                )
                export_file_path.write_bytes(response.content)
                self._write_to_archive(archive, export_file_path)

Instance variables

prop filenames : dict
Expand source code
@property
def filenames(self) -> dict:
    """Map each corpus file kind (bib/md/yaml) to its slug-based name."""
    return {extension: f"{self.slug}.{extension}" for extension in ("bib", "md", "yaml")}

Methods

def add_metadata(self, zip_file_path: pathlib.Path) ‑> None
Expand source code
def add_metadata(self, zip_file_path: Path) -> None:
    """Write ``corpus.yaml`` from the metadata and append it to the zip."""
    yaml_file_path = self.download_dir.parent / "corpus.yaml"
    # Wrapping the metadata between two empty documents makes safe_dump_all
    # emit `---` separators around it; the serialized sentinels (`{}\n`,
    # three characters on each side) are then sliced away so only the
    # framed metadata document remains.
    documents = [{}, self.metadata, {}]
    dumped = yaml.safe_dump_all(documents, default_flow_style=False)
    yaml_file_path.write_text(dumped[3:-3])
    with ZipFile(zip_file_path, mode="a") as archive:
        self._write_to_archive(archive, yaml_file_path)
def export(self,
formats: List[ForwardRef('str')],
style_name: str,
templates_folder: pathlib.Path,
with_toc: bool = False,
with_ascii: bool = False,
with_link_citations: bool = False,
with_nocite: bool = False) ‑> pathlib.Path
Expand source code
def export(
    self,
    formats: List["str"],
    style_name: str,
    templates_folder: Path,
    with_toc: bool = False,
    with_ascii: bool = False,
    with_link_citations: bool = False,
    with_nocite: bool = False,
) -> Path:
    """Generate the corpus archive and return the path of the zip file.

    The file name encodes the sorted formats and the boolean options so
    each combination gets a distinct file; any pre-existing file is
    removed first. HTTP failures during generation are recorded in
    ``self.errors`` and the (possibly incomplete) path is still returned.
    """
    options = (
        f"toc-{int(with_toc)}-ascii-{int(with_ascii)}-"
        f"link-citations-{int(with_link_citations)}-"
        f"nocite-{int(with_nocite)}"
    )
    zip_name = f"{self.slug}-{self.id}-{'-'.join(sorted(formats))}-{options}.zip"
    zip_file_path = self.download_dir / zip_name
    # Always start from scratch: never reuse a previously generated zip.
    zip_file_path.unlink(missing_ok=True)

    try:
        self.generate_archive(
            zip_file_path,
            formats,
            style_name,
            templates_folder,
            with_toc,
            with_ascii,
            with_link_citations,
            with_nocite,
        )
    except httpx.HTTPStatusError as exc:
        parts = [f"Erreur pendant la génération de « {self.name} »"]
        if hasattr(exc, "url"):
            parts.append(f" ({exc.url})")
        if hasattr(exc, "detail"):
            parts.append(f" : {exc.detail}")
        self.errors.append("".join(parts))

    return zip_file_path
def fetch_all(self, templates_folder: pathlib.Path, force: bool = False) ‑> None
Expand source code
def fetch_all(self, templates_folder: Path, force: bool = False) -> None:
    """Fetch the corpus metadata, then every article's content."""
    logger.debug(f"Fetching corpus `{self.id}`")
    self.metadata = self.get_stylo_data()
    logger.debug(f"Corpus data `{self.metadata}`")
    # Nothing more to do when fetching failed or the corpus has no articles.
    if self.metadata is None or not self.articles:
        return
    for article in self.articles:
        article.fetch_all(templates_folder, force=force)
def generate_archive(self,
zip_file_path: pathlib.Path,
formats: List[ForwardRef('str')],
style_name: str,
templates_folder: pathlib.Path,
with_toc: bool,
with_ascii: bool,
with_link_citations: bool,
with_nocite: bool) ‑> None
Expand source code
def generate_archive(
    self,
    zip_file_path: Path,
    formats: List[str],
    style_name: str,
    templates_folder: Path,
    with_toc: bool,
    with_ascii: bool,
    with_link_citations: bool,
    with_nocite: bool,
) -> None:
    """Fill the zip with per-article exports plus corpus-wide renders.

    No-op when metadata or articles are missing. Corpus-wide renders are
    produced only for "-full" formats (html-full, tex-full, pdf-full,
    docx-full): every article's markdown and bibliography are concatenated
    and rendered through the Pandoc API.
    """
    if self.metadata is None or not self.articles:
        return

    # Each article appends its own exports to the shared zip first.
    for article in self.articles:
        article.generate_archive(
            zip_file_path,
            formats,
            style_name,
            templates_folder,
            with_toc,
            with_ascii,
            with_link_citations,
            with_nocite,
        )

    logger.debug(
        f"Archive generated for corpus `{self.id}` with formats `{formats}`"
    )

    self.add_metadata(zip_file_path)

    # Everything below only matters for corpus-wide ("-full") formats.
    full_formats = [format for format in formats if "full" in format]

    if not full_formats:
        return

    csl_file_path = get_env_var("SE_STYLES_DIR") / f"{style_name}.csl"

    # Concatenations of every article's bibliography and markdown.
    bib_full = ""
    md_full = ""

    images_config_path = self.edition_configuration.get("images_path", "images")
    # Side zip of all article images, later handed to the pdf/docx renders.
    zip_images_path = self.download_dir / "images.zip"
    with ZipFile(zip_images_path, mode="a") as archive:
        for article in self.articles:
            md_content = article.md_original_file_content
            # Rewrite image URLs to corpus-relative paths so the
            # concatenated markdown resolves against the archived images.
            for image in article.images:
                md_content = md_content.replace(
                    # Useful in case of image URL redirection.
                    image.get("original_url", image["url"]),
                    f"{article.slug}-{article.id}/{images_config_path}/{image['name']}",
                )
            for image_path in article.image_paths:
                arcname = image_path.relative_to(
                    self.download_dir.parent.parent / "articles"
                )
                if str(arcname) not in archive.namelist():
                    archive.write(image_path, arcname=arcname)
                    logger.debug(f"Archived image `{image_path}`")
            bib_full += article.bib_original_file_content + "\n\n"
            md_full += md_content + "\n\n"

    bib_file_path = self.download_dir / self.filenames["bib"]
    bib_file_path.write_text(bib_full)
    md_file_path = self.download_dir / self.filenames["md"]
    md_file_path.write_text(md_full)
    # For now, use corpus yaml instead of the aggregation of articles’ ones.
    yaml_file_path = self.download_dir.parent / "corpus.yaml"

    # NOTE(review): each PandocAPI call below returns a response object whose
    # `.content` is the rendered file — presumably an HTTP call to a Pandoc
    # service; confirm.
    pandocapi = PandocAPI(md_file_path, yaml_file_path, bib_file_path)

    with ZipFile(zip_file_path, mode="a") as archive:
        # We want images for all kinds of exports.
        for article in self.articles:
            if not article.images:
                continue
            # Copy article images into the corpus tree so archive entries
            # get corpus-relative paths.
            images_path = article.download_dir / images_config_path
            images_path_corpus = (
                self.download_dir
                / f"{article.slug}-{article.id}"
                / images_config_path
            )
            images_path_corpus.mkdir(parents=True, exist_ok=True)
            shutil.copytree(images_path, images_path_corpus, dirs_exist_ok=True)
            for image in article.images:
                image_file_path = images_path_corpus / image["name"]
                self._write_to_archive(archive, image_file_path)

        if "html-full" in formats:
            export_file_name = f"{self.slug}.html"
            export_file_path = self.download_dir / export_file_name
            template_file_path = templates_folder / "templateHtml5.html5"
            response = pandocapi.html(
                export_file_name,
                template_file_path,
                csl_file_path,
                with_toc,
                with_ascii,
                with_link_citations,
                with_nocite,
                style_name,
            )
            export_file_path.write_bytes(response.content)
            self._write_to_archive(archive, export_file_path)
        if "tex-full" in formats:
            export_file_name = f"{self.slug}.tex"
            export_file_path = self.download_dir / export_file_name
            template_file_path = templates_folder / "templateLaTeX.latex"
            response = pandocapi.tex(
                export_file_name,
                template_file_path,
                csl_file_path,
                with_toc,
                with_link_citations,
                with_nocite,
                style_name,
            )
            export_file_path.write_bytes(response.content)
            self._write_to_archive(archive, export_file_path)
        if "pdf-full" in formats:
            export_file_name = f"{self.slug}.pdf"
            export_file_path = self.download_dir / export_file_name
            # Same side zip assembled above; images are embedded in the PDF.
            images_file_path = self.download_dir / "images.zip"
            template_file_path = templates_folder / "templateLaTeX.latex"
            response = pandocapi.pdf(
                export_file_name,
                images_file_path,
                template_file_path,
                csl_file_path,
                with_toc,
                with_link_citations,
                with_nocite,
                style_name,
            )
            export_file_path.write_bytes(response.content)
            self._write_to_archive(archive, export_file_path)
        if "docx-full" in formats:
            export_file_name = f"{self.slug}.docx"
            export_file_path = self.download_dir / export_file_name
            images_file_path = self.download_dir / "images.zip"
            response = pandocapi.docx(
                export_file_name,
                images_file_path,
                csl_file_path,
                with_toc,
                with_link_citations,
                with_nocite,
                style_name,
            )
            export_file_path.write_bytes(response.content)
            self._write_to_archive(archive, export_file_path)
def get_stylo_data(self) ‑> dict | None
Expand source code
@bail_on_errors
def get_stylo_data(self) -> Optional[dict]:
    """Fetch the corpus payload from the Stylo API.

    Populates ``self.name`` (when empty) and ``self.articles``. Returns
    the raw corpus data, or None when the HTTP request fails or the API
    response reports errors (messages are appended to ``self.errors``).
    """
    # Resolve our domain back to its full base URL.
    base_url_by_domain = {
        get_domain_from_url(url): url for url in self.allowed_base_urls
    }
    styloapi = StyloAPI(base_url_by_domain[self.domain])

    try:
        response = styloapi.corpus(self.id)
    except httpx.HTTPStatusError as e:
        self.errors.append(e.args[0])
        return None

    data: dict = response.json()
    logger.debug(f"Fetched corpus with data `{data}`")

    # GraphQL-style payload: errors arrive in-band in the JSON body.
    if "errors" in data:
        self.errors.extend(error["message"] for error in data["errors"])
        logger.debug(f"Fetched corpus with errors `{data['errors']}`")
        return None

    corpus_data: dict = data["data"]["corpus"][0]

    # Keep the caller-provided title, falling back to the API name.
    self.name = self.name or corpus_data["name"]
    articles = []
    for art in corpus_data["articles"]:
        details = art["article"]
        articles.append(
            Article(
                self.edition,
                self.edition_configuration,
                self.domain,
                details["_id"],
                slugify(details["title"]),
                details["title"],
                self.root_download_dir,
                self.allowed_editions,
                self.allowed_base_urls,
                self.supported_images_extensions,
            )
        )
    self.articles = articles
    return corpus_data