Source code for filehooks

#  These pylint ignores were put in place so that the code, as it stood
#  when automated checks were introduced, would pass those checks.
#  The code should be refactored to remove these issues.
#  If an issue which is warned about without these ignores cannot
#  or should not be fixed, the ignore should be placed at that specific location.
# pylint: disable=too-many-lines,too-many-locals,too-many-instance-attributes,too-many-arguments

# pylint: disable=fixme
# TODO: in addition to the problems described above, a few things should be fixable quickly.
#  These are marked with TODO. Fix them and re-enable the TODO warning above.

"""
This module can be installed within GitLab
to handle events in its projects.
Its purpose is to check for the presence of metadata
describing the repository's content
and to put valid metadata into an elasticsearch index.
"""

import base64
import io
import json
import logging
import os
import pathlib
import re
import smtplib
import ssl
import sys
from collections import namedtuple
from configparser import ConfigParser
from copy import copy
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from enum import Enum
from json.decoder import JSONDecodeError
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse

import gitlab  # type: ignore
import jsonschema  # type: ignore
import requests
import ruamel.yaml
from elasticsearch import ConflictError, Elasticsearch, NotFoundError
from gitlab import Gitlab, GitlabGetError
from gitlab.v4.objects import (  # type: ignore
    Group,
    Project,
    ProjectCommit,
    ProjectFile,
    User,
)
from jinja2 import Environment, FileSystemLoader
from jsonschema import ValidationError, draft7_format_checker
from ruamel.yaml.parser import ParserError
from ruamel.yaml.scanner import ScannerError

MISSING_METADATA_MSG = (
    "Metadata file is missing! Please, provide a metadata file! You may use "
    "either 'metadata.yml', 'metadata.yaml' or 'metadata.json'!"
)
HELP_MSG = "For more information about the expected metadata file see {}."
MAIN_MSG = "The health check for your commit failed! The following errors were found:"
PATH_MAIL_TEMPLATE = "templates/template.html"

logger = logging.getLogger(__name__)

DIR = pathlib.Path(os.path.dirname(__file__))
SCHEMA_METADATA = (DIR / "schemas/gitlab_metadata.schema.json").read_text(encoding="utf-8")
MAP_EXTENSION2FILE_TYPE = json.loads(
    (DIR / "schemas/extension2file_type.json").read_text(encoding="utf-8")
)
ES_SCHEMA_METADATA = (DIR / "schemas/es_metadata.schema.json").read_text(encoding="utf-8")

BRANCH_PRIORITY = ["main", "master"]
METADATA_FILENAMES = ["metadata.yaml", "metadata.yml", "metadata.json"]
METADATA_INDEX = "metadata"
GITLAB_MAIN_GROUP = "sharing"


class ConfigType(Enum):
    """
    Enum for choosing the desired configuration
    """

    PRODUCTION = 0
    TEST = 1
    DEBUG = 2
    STAGING = 3
    LOCAL = 4


@dataclass()
class Mail:
    """
    Class for sending e-mails about validation errors.
    """

    username: str
    password: str
    address: str
    mail_from: str
    host: str
    port: int
    homepage: str

    @classmethod
    def from_dict(cls, config: Dict[str, Any]) -> Any:
        """
        Creates an instance of Mail from the given config.

        :param config: configuration to create the class from
        :return: the created Mail instance
        """
        return cls(
            config["username"],
            config["password"],
            config["address"],
            config["mail_from"],
            config["host"],
            config["port"],
            config["homepage"],
        )

    def send_mail(
        self, mail_to: str, mail_subject: str, mail_html: str, mail_plain: str
    ) -> None:
        """
        Sends an e-mail via an SMTP server

        :param mail_to: Mail recipient
        :param mail_subject: Mail subject
        :param mail_html: Mail body as HTML
        :param mail_plain: Mail body as plain text
        :return: None
        """
        msg = MIMEMultipart("alternative")
        msg["From"] = self.mail_from
        msg["To"] = mail_to
        msg["Subject"] = mail_subject
        part1 = MIMEText(mail_plain, "plain")
        part2 = MIMEText(mail_html, "html")
        msg.attach(part1)
        msg.attach(part2)
        logger.info("To: <%s> Subject: <%s> Msg: <%s>", mail_to, mail_subject, mail_plain)
        context = ssl.create_default_context()
        with smtplib.SMTP(self.host, self.port) as smtp:
            smtp.starttls(context=context)
            smtp.login(self.username, self.password)
            smtp.send_message(msg)
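
# A minimal usage sketch (all configuration values below are hypothetical;
# real values come from the application's configuration):
#
#     mail = Mail.from_dict({
#         "username": "bot", "password": "secret", "address": "bot@example.org",
#         "mail_from": "noreply@example.org", "host": "smtp.example.org",
#         "port": 587, "homepage": "https://example.org",
#     })
#     mail.send_mail("user@example.org", "Health check failed!", "<p>...</p>", "...")
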
class ValidationResult:
    """
    type that is returned by the validation service
    """

    errors: List[str]
    warnings: List[str]

    def is_empty(self) -> bool:
        """Returns True if neither errors nor warnings were collected."""
        return not (self.errors or self.warnings)


@dataclass()
class ValidationService:
    """
    Handles the validation of projects' metadata and collects errors and warnings
    """

    rest_url: str = "http://localhost:8080/api/validateMetaData"

    def custom_validation_result_decoder(self, result_dict: Dict[str, Any]) -> ValidationResult:
        """Builds a ValidationResult from the JSON object returned by the service."""
        result_tuple = namedtuple("ValidationResult", result_dict.keys())(*result_dict.values())
        validation_result = ValidationResult()
        validation_result.errors = result_tuple.errors
        validation_result.warnings = result_tuple.warnings
        return validation_result

    def validate_metadata(self, metadata: dict, top_level: bool) -> ValidationResult:
        """Posts the metadata to the validation REST service and decodes its response."""
        # TODO: top_level is currently unused; the service call does not distinguish levels
        response = requests.post(self.rest_url, json=metadata)
        logger.debug("validation service response %s: %s", response.status_code, response.text)
        return json.loads(response.text, object_hook=self.custom_validation_result_decoder)
@dataclass
class ItemPath:
    """
    Represents a link to another Item (either relative to this path,
    to another project, or even to another repository)
    """

    path: str
    """directory path either relative to some other Item path, or absolute to root of object"""
    metadata_file: str
    """meta data file name"""
    project_id: int
    """gitlab project id"""
    commit: str
    """git commit id"""
    gitlab_project: Project
    """the cached gitlab project"""
    gitlab_instance: Gitlab
    """a gitlab Instance"""

    def __init__(
        self,
        path: str,
        metadata_file: str,
        commit: str,
        project_id: int,
        gitlab_instance: Gitlab,
    ):
        """constructor for ItemPath

        :param path: the path to a potential sub directory
        :param metadata_file: the name of the meta data file
        :param commit: the commit hash
        :param project_id: the gitlab project_id
        :param gitlab_instance: the gitlab instance used to resolve the project
        :return: None
        """
        self.path = path
        self.metadata_file = metadata_file
        self.commit = commit
        self.project_id = project_id
        self.gitlab_instance = gitlab_instance
        self.gitlab_project = self.gitlab_instance.projects.get(project_id)

    @classmethod
    def __split_path(cls, path: str) -> Tuple[str, str]:
        return os.path.dirname(path), os.path.basename(path)

    def create_children_itempath(self, path: str) -> "ItemPath":
        """
        creates a new itempath relative to self

        :param path: a child path as given in a collectionContent entry
        """
        git_url, project_id, relative_path = parse_child_path(path)
        if git_url is None and project_id < 0:
            # reference within the same project
            normalized_path = normalize_path(
                relative_path, self, self.path + "/" + self.metadata_file
            )
            child_path, child_filename = self.__split_path(normalized_path)
            child_item_path = ItemPath(
                child_path,
                child_filename,
                self.commit,
                self.project_id,
                self.gitlab_instance,
            )
            # a minor optimization
            child_item_path.gitlab_project = self.gitlab_project
            return child_item_path
        if git_url is None:
            # reference to another project on the same GitLab instance
            other_project: Project = get_project_for_id(project_id, self.gitlab_instance)
            branch = get_branch_to_index(other_project)
            last_commit = other_project.commits.list(ref_name=branch)[0]
            normalized_path = normalize_path(relative_path, self)
            child_path, child_filename = self.__split_path(normalized_path)
            result = ItemPath(
                child_path,
                child_filename,
                last_commit.id,
                project_id,
                self.gitlab_instance,
            )
            result.gitlab_project = other_project
            return result
        # TODO references to external repositories (git_url set) are not yet supported
        raise ValueError("not supported")

    def get_full_path(self) -> str:
        """returns the full file path"""
        if self.path is None or self.path == "":
            return self.metadata_file
        return self.path + "/" + self.metadata_file

    def get_project_id(self) -> int:
        """returns the project id"""
        return self.project_id

    def doc_id(self) -> str:
        """constructs the doc_id for this item path"""
        max_id_length = 512  # this is the maximum id length in elastic search
        # a local reference
        prefix: str = "[" + str(self.project_id) + "]"
        rest_length = max_id_length - len(prefix)
        if rest_length >= len(self.path):
            doc_id = prefix + self.path
        else:
            doc_id = prefix + self.path[len(self.path) - rest_length :]
        return doc_id
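
    # For illustration (hypothetical values): project id 42 with path
    # "exercises/loops" yields the document id "[42]exercises/loops".
    # Paths exceeding the elasticsearch id limit are truncated from the left,
    # keeping the most specific trailing path segments.
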
    def get_metadata_file(self) -> str:
        """returns the base name of the metadata_file (relative to path)"""
        return self.metadata_file

    def get_path(self) -> str:
        """returns the path"""
        return self.path

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, ItemPath)
            and self.path == other.path
            and self.metadata_file == other.metadata_file
            and self.commit == other.commit
            and self.project_id == other.project_id
        )
class Node:
    """
    A simple tree structure for representing a metadata collection
    """

    item: ItemPath
    children: List[Any]  # recursive type annotations are not supported by mypy yet

    def __init__(self, item: ItemPath):
        self.item = item
        self.children = []

    def __eq__(self, other: Any) -> bool:
        return (
            isinstance(other, Node)
            and self.item == other.item
            and len(self.children) == len(other.children)
            and all(self.children[i] == other.children[i] for i in range(len(self.children)))
        )

    def get_all_paths(self) -> List[ItemPath]:
        """
        Returns a list of all Nodes' paths reachable from this node
        (the tree having this node as the root flattened into a list).

        :return: all Nodes' paths reachable from this node
        """
        all_paths = [self.item]
        for child in self.children:
            all_paths.extend(child.get_all_paths())
        return all_paths

    def get_child_paths(self) -> List[ItemPath]:
        """
        Returns a list containing paths of all direct child nodes of this node.

        :return: the paths of all direct child nodes
        """
        return [child.item for child in self.children]

    def __str__(self) -> str:
        """
        returns a readable string representation (nearly as print_tree) on one line
        """
        return f"Node[{self.item.project_id}, {self.item.path}, {self.str_children()}]"

    def str_children(self) -> str:
        """
        returns a readable string representation of the children
        (nearly as print_tree) on one line
        """
        representation = "(" + ",".join(map(str, self.children)) + ")"
        return representation

    def print_tree(self) -> None:
        """
        Logs a visual representation of the tree having this node as the root.
        Siblings have the same indentation.
        A node's children are the nodes logged directly below
        whose indentation level is one more than the parent node's.

        :return: None
        """
        self.print_tree_recursive(0)

    def print_tree_recursive(self, level: int) -> None:
        """
        Used internally to recursively generate a visual representation of the tree.

        :param level: distance from the root
        :return: None
        """
        indentation = " " * (4 * level)
        logger.info("%s", indentation + self.item.get_full_path())
        for child in self.children:
            child.print_tree_recursive(level + 1)
@dataclass
class ProjectInfo:  # pylint: disable=too-many-instance-attributes
    """
    Represents a project with erroneous metadata file(s)
    """

    author_link: str = field(init=False)
    branch: str = field(init=False)
    branch_url: str = field(init=False)
    commit_author: str = field(init=False)
    commit_id: str = field(init=False)
    commit_message: str = field(init=False)
    commit_url: str = field(init=False)
    repository: str = field(init=False)
    repository_url: str = field(init=False)
    urls: List[Tuple[str, str]] = field(init=False)
    user_avatar: str = field(init=False)
    event: InitVar[Dict[str, Any]] = field()
    branch_name: InitVar[str] = field()
    commit: InitVar[ProjectCommit] = field()

    def __post_init__(
        self, event: Dict[str, Any], branch_name: str, commit: ProjectCommit
    ) -> None:
        web_url = event["project"]["web_url"]
        url_parse = urlparse(web_url)
        parts = list(map(str, url_parse.path[1:].split("/")))
        host = f"{url_parse.scheme}://{url_parse.netloc}/"
        pref = host
        parts_url = []
        for part in parts:
            pref += part + "/"
            parts_url.append(pref)
        self.urls = list(zip(parts, parts_url))
        self.branch = branch_name
        self.branch_url = web_url + "/-/commits/" + branch_name
        self.commit_id = commit.id
        self.commit_message = commit.title
        self.commit_author = commit.author_name
        self.user_avatar = event["user_avatar"]
        self.repository = event["project"]["name"]
        self.author_link = host + event["user_username"]
        self.commit_url = web_url + "/-/commit/" + self.commit_id
        self.repository_url = web_url
@dataclass
class MetadataInfo:
    """
    Represents a metadata file with errors
    """

    url: Optional[str]
    filename: Optional[str]
    errors: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

    def __eq__(self, other: Any) -> bool:
        return bool(
            self.url == other.url
            and self.filename == other.filename
            and self.errors == other.errors
            and self.warnings == other.warnings
        )


@dataclass
class ErrorMessage:
    """
    Collection of all invalid metadata files for a project
    """

    project_info: ProjectInfo
    metadata_info: List[MetadataInfo]
    footer_msg: str = ""
    help_msg: str = ""
    def create_html(self) -> str:
        """
        Returns the error message as HTML

        :return: The HTML
        """
        self.footer_msg = (
            "This email message was auto-generated by the <a "
            'href="https://sharing-codeability.uibk.ac.at/">CodeAbility Sharing '
            "Platform</a>.<br/>Please do not respond.<br/>If you have any "
            "questions feel free to contact the <a "
            'href="mailto:sharing-codeability@uibk.ac.at">support</a>. '
        )
        self.help_msg = (
            "More information about the expected metadata file can be found in "
            'the <a href="https://sharing-codeability.uibk.ac.at/sharing/codeability-'
            'sharing-platform/-/wikis/technical/MetaData-Documentation">'
            "metadata reference</a>. "
        )
        template_env = Environment(loader=FileSystemLoader(DIR))
        template = template_env.get_template(PATH_MAIL_TEMPLATE)
        return str(template.render(error_msg=self))  # cast to string to make mypy happy

    def create_plain(self) -> str:
        """
        Returns the error message as plain text

        :return: The plain text
        """
        title = "Sharing Platform repository health check failed!"
        project = f"Project: {self.project_info.repository} ({self.project_info.urls[-1][1]})"
        branch = f"Branch: {self.project_info.branch} ({self.project_info.branch_url})"
        commit = f"Commit: {self.project_info.commit_id} ({self.project_info.commit_url})"
        commit_msg = f"Commit Message: {self.project_info.commit_message}"
        commit_author = f"Commit Author: {self.project_info.commit_author}"
        body = MAIN_MSG + "\n"
        for metadata_info in self.metadata_info:
            file_url = ""
            if metadata_info.url:
                file_url = f" ({self.project_info.repository_url}{metadata_info.url})"
            body += f"\n{metadata_info.filename}{file_url}:"
            for error in metadata_info.errors:
                body += f"\n- {error}"
        help_msg = (
            "More information about the expected metadata file can be found in the "
            "metadata reference ("
            "https://sharing-codeability.uibk.ac.at/sharing/codeability-sharing"
            "-platform/-/wikis/technical/MetaData)."
        )
        footnote = (
            "This email message was auto-generated by the CodeAbility Sharing Platform ("
            "https://sharing-codeability.uibk.ac.at/). Please do not respond. If you have "
            "any questions feel free to contact the support ("
            "sharing-codeability@uibk.ac.at). "
        )
        return (
            f"{title}\n\n{project}\n{branch}\n{commit}\n{commit_msg}\n{commit_author}\n\n"
            f"{body}\n\n{help_msg}\n-----\n{footnote} "
        )
class EventHandler:
    """
    Class for handling GitLab events
    """

    gitlab_instance: Gitlab
    validation_service: ValidationService
    mail: Mail
    elasticsearch_instance: Elasticsearch
    git_event: Dict[str, Any]

    def __init__(
        self,
        gitlab_instance: Gitlab,
        mail: Mail,
        elasticsearch_instance: Elasticsearch,
        validation_service: ValidationService,
        git_event: Dict[str, Any],
    ) -> None:
        self.gitlab_instance = gitlab_instance
        self.mail = mail
        self.validation_service = validation_service
        self.elasticsearch_instance = elasticsearch_instance
        self.git_event = git_event

    def handle_event(self) -> None:
        """
        Calls the appropriate function to handle the GitLab system hook events
        `push`, `project_rename`, `project_transfer`, `project_destroy`, and `group_rename`.
        (https://docs.gitlab.com/ee/system_hooks/system_hooks.html)

        :return: None
        """
        if "event_name" in self.git_event:
            event_name = self.git_event["event_name"]
            logger.info("Received '%s'", event_name)
            if event_name == "push":
                # handle if in sharing on the branch which should be indexed
                self.handle_push_event()
            elif event_name == "project_rename":
                # handle if in sharing
                self.handle_project_rename_event()
            elif event_name == "project_transfer":
                # into sharing: index
                # out of sharing: delete index
                # inside sharing: update index
                # outside sharing: ignore
                self.handle_project_transfer_event()
            elif event_name == "project_destroy":
                # delete index if in sharing
                self.handle_project_destroy_event()
            elif event_name == "group_rename":
                # into sharing: index
                # out of sharing: delete index
                # inside sharing: update index
                # outside sharing: ignore
                self.handle_group_rename_event()
            elif event_name in ("user_add_to_team", "user_remove_from_team"):
                # team changed events:
                # the solution is to construct a new event that
                # mimics a push and call the handle_push_event
                self.handle_team_change_event()
            else:
                logger.info("Ignoring event %s", event_name)
        else:
            if "object_kind" in self.git_event:
                logger.info("Ignoring event of kind %s", self.git_event["object_kind"])
            else:
                logger.info("Ignoring unknown event %s", self.git_event)

    def handle_team_change_event(self) -> None:
        """
        Handles the user_add_to/remove_from_team event.

        :return: None
        """
        project_id = self.git_event["project_id"]
        project = self.gitlab_instance.projects.get(project_id)
        branch = get_branch_to_index(project)
        logger.info(
            "Handle Team Change Event %s for user %s in %s ",
            self.git_event["event_name"],
            self.git_event["user_username"],
            self.git_event["project_path_with_namespace"],
        )
        new_event = {
            "project_id": project_id,
            "project": {
                "path_with_namespace": self.git_event["project_path_with_namespace"],
            },
            "path": self.git_event["project_path"],
            "ref": "refs/heads/" + branch,
            "user_id": self.git_event["user_id"],
        }
        self.git_event = new_event
        self.handle_push_event()

    def handle_push_event(self) -> None:
        """
        Handles the 'push' event.

        :return: None
        """
        metadata_mandatory = in_main_group(self.git_event["project"]["path_with_namespace"])
        logger.info(
            "Indexing %s is mandatory: %s",
            self.git_event["project"]["path_with_namespace"],
            metadata_mandatory,
        )
        branch_name = self.git_event["ref"][len("refs/heads/") :]
        if branch_name not in BRANCH_PRIORITY:
            return
        project_id = self.git_event["project_id"]
        project = self.gitlab_instance.projects.get(project_id)
        try:
            branch_to_index = get_branch_to_index(project)
        except NoBranchToIndexError:
            logger.info(
                "Could not index project. No suitable branch exists. Deleting existing data."
            )
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            indexing.project_destroy(project_id)
            return
        if branch_name != branch_to_index:
            return
        analysed_commit = check_and_index_project(
            project_id,
            self.gitlab_instance,
            self.validation_service,
            self.mail,
            self.elasticsearch_instance,
            metadata_mandatory,
        )
        if analysed_commit is None:
            logger.info(
                "Stopped handling push event. Error occurred while trying to get the right commit."
            )
            return
        logger.info(
            "Validated push event. branch %s, commit %s.",
            analysed_commit.branch_name,
            analysed_commit.commit_hash,
        )
        if analysed_commit.errors and "user_id" in self.git_event:
            health_check = HealthCheck(self.gitlab_instance, self.validation_service, self.mail)
            health_check.send_validation_error_mail(self.git_event, analysed_commit)
        logger.info(
            "completed handling for push event: project_id: '%s', branch: '%s', commit: '%s'",
            project_id,
            analysed_commit.branch_name,
            analysed_commit.commit_hash,
        )

    def handle_project_rename_event(self) -> None:
        """
        Handles the 'project_rename' event.

        :return: None
        """
        project_id = self.git_event["project_id"]
        path = self.git_event["path"]
        path_with_namespace = self.git_event["path_with_namespace"]
        logger.info(
            "project_id: '%s', path: '%s', path_with_namespace: '%s'",
            project_id,
            path,
            path_with_namespace,
        )
        if in_main_group(path_with_namespace):
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            web_url = indexing.web_url_project(project_id)
            indexing.project_rename(project_id, path, path_with_namespace, web_url)

    def handle_project_transfer_event(self) -> None:
        """
        Handles the 'project_transfer' event.

        :return: None
        """
        project_id = self.git_event["project_id"]
        path_with_namespace = self.git_event["path_with_namespace"]
        old_path_with_namespace = self.git_event["old_path_with_namespace"]
        logger.info(
            "project_id: '%s', path_with_namespace: '%s', old_path_with_namespace: '%s'",
            project_id,
            path_with_namespace,
            old_path_with_namespace,
        )
        indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
        indexing.project_destroy(project_id)
        metadata_mandatory = in_main_group(path_with_namespace)
        check_and_index_project(
            project_id,
            self.gitlab_instance,
            self.validation_service,
            self.mail,
            self.elasticsearch_instance,
            metadata_mandatory,
        )

    def handle_project_destroy_event(self) -> None:
        """
        Handles the 'project_destroy' event.

        :return: None
        """
        path_with_namespace = self.git_event["path_with_namespace"]
        if in_main_group(path_with_namespace):
            project_id = self.git_event["project_id"]
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            indexing.project_destroy(project_id)

    def handle_group_rename_event(self) -> None:
        """
        Handles the 'group_rename' event.

        :return: None
        """
        group_id = self.git_event["group_id"]
        path_with_namespace = self.git_event["full_path"]
        old_path_with_namespace = self.git_event["old_full_path"]
        logger.info("group_id: '%s', path_with_namespace: '%s'", group_id, path_with_namespace)
        if path_with_namespace == old_path_with_namespace:
            return
        # A group_rename event cannot move a group into another group,
        # only the group name and url change.
        # Thus, the only way for a group which was not in the main group previously
        # to be in the main group after a group_rename event
        # is when the group is renamed into the main group name.
        # This is only possible when the main group does not exist.
        indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
        metadata_mandatory = (path_with_namespace == GITLAB_MAIN_GROUP) or (
            path_with_namespace.startswith(GITLAB_MAIN_GROUP + "/")
        )
        projects = indexing.get_all_projects(group_id)
        for project in projects:
            logger.info("Adding project with id: %s", project.id)
            indexing.project_destroy(project.id)
            check_and_index_project(
                project.id,
                self.gitlab_instance,
                self.validation_service,
                self.mail,
                self.elasticsearch_instance,
                metadata_mandatory,
            )
def get_indexing_commit(project: Project) -> Tuple[str, str]:
    """
    Tries to obtain the branch name and commit of the project which should be indexed.

    :param project: the project which should be indexed
    :return: branch name, commit hash
    :raises NoBranchToIndexError: if no suitable branch exists
    """
    branch_to_index = get_branch_to_index(project)
    try:
        commit_hash = get_relevant_commit_hash(project, branch_to_index)
    except GitlabGetError as error:
        raise NoBranchToIndexError from error
    return branch_to_index, commit_hash


def get_project_for_id(project_id: int, gitlab_instance: Gitlab) -> Project:
    """
    Tries to obtain the project from git.

    :param project_id: the id of the GitLab project
    :param gitlab_instance: the GitLab instance
    """
    return gitlab_instance.projects.get(project_id)


def get_commit_to_index(project_id: int, gitlab_instance: Gitlab) -> Tuple[Project, str, str]:
    """
    Tries to obtain the project, branch name and commit of the project which should be indexed.
    Raises a NoBranchToIndexError if no suitable branch is found.

    :param project_id: the id of the GitLab project
    :param gitlab_instance: the GitLab instance
    :return: project, branch name, commit hash
    :raises NoBranchToIndexError: if no suitable branch exists
    """
    project = get_project_for_id(project_id, gitlab_instance)
    branch_to_index, commit_hash = get_indexing_commit(project)
    return project, branch_to_index, commit_hash


@dataclass()
class AnalysedCommit:
    """
    Contains information about the commit which was analysed,
    including the result of the analysis.
    """

    project: Project
    branch_name: str
    commit_hash: str
    errors: List[MetadataInfo]
def check_and_index_project(
    project_id: int,
    gitlab_instance: Gitlab,
    validation_service: ValidationService,
    mail: Mail,
    elasticsearch_instance: Elasticsearch,
    metadata_mandatory: bool,
) -> Optional[AnalysedCommit]:
    """
    Validates and indexes the project with the given id.
    Does not send error notifications to the user, but returns information about
    the analysed commit, including a (potentially empty) list of validation errors.

    :param project_id: the id of the project to index
    :param gitlab_instance: the GitLab instance
    :param validation_service: the service used to validate metadata files
    :param mail: Mail object for sending error messages
    :param elasticsearch_instance: the elasticsearch instance
    :param metadata_mandatory: whether missing metadata should be treated as an error
    :return: information about the analysed commit
        or None if no suitable commit can be obtained
    """
    try:
        project, branch_name, commit_hash = get_commit_to_index(project_id, gitlab_instance)
    except NoBranchToIndexError:
        logger.warning("Could not index project with id %s.", project_id)
        return None
    health_check = HealthCheck(gitlab_instance, validation_service, mail)
    validation_errors, tree = health_check.validate_project(gitlab_instance, project, commit_hash)
    if tree is None:
        if (
            any(error.errors == [MISSING_METADATA_MSG] for error in validation_errors)
            and not metadata_mandatory
        ):
            logger.info("project %s is not indexed. No metadata found", project_id)
            return None
    if tree:  # debugging
        logger.info("parsed tree:")  # debugging
        tree.print_tree()  # debugging
    if tree:
        indexing = Indexing(gitlab_instance, mail, elasticsearch_instance)
        indexing.index_entire_repository(tree)
    return AnalysedCommit(project, branch_name, commit_hash, validation_errors)
def calculate_project_members(
    project: Project, gitlab_instance: Gitlab
) -> Tuple[List[str], List[str]]:
    """
    Returns a list of user emails allowed to read the project
    and a list of group names with read access to the project.
    """
    project_member_emails = []
    for member in project.members.list(all=True):
        user = gitlab_instance.users.get(member.id)
        project_member_emails.append(user.email)
    path = f"/projects/{project.get_id()}/groups"
    groups = project.manager.gitlab.http_get(path)
    group_names = calculate_all_groups_members(groups, gitlab_instance)
    for project_group in project.shared_with_groups:
        group_names.append(project_group.get("group_full_path"))
    return project_member_emails, group_names


def calculate_all_groups_members(
    groups: List[gitlab.v4.objects.Group], gitlab_instance: Gitlab
) -> List[str]:
    # TODO: add docstring, especially to explain who is a group member
    #  as seen by GitLab and this function
    # pylint: disable=missing-function-docstring
    result = []
    for group in groups:
        gitlab_group = gitlab_instance.groups.get(group.get("id"))
        result.extend(calculate_group_members(gitlab_group))
    return result


def calculate_group_members(group: gitlab.v4.objects.Group) -> List[str]:
    # TODO: add docstring or inline into calculate_all_groups_members
    # pylint: disable=missing-function-docstring
    return [shared_group["group_full_path"] for shared_group in group.shared_with_groups] + [
        group.full_path
    ]


class NoBranchToIndexError(Exception):
    """
    Error raised when a project has no branch with a name appearing in BRANCH_PRIORITY.
    """
def get_branch_to_index(project: Project) -> str:
    """
    Returns the name of the branch which should be indexed,
    following the priority given by BRANCH_PRIORITY.
    Raises NoBranchToIndexError
    if no branch with a name appearing in BRANCH_PRIORITY exists.

    :param project: The project whose branches should be checked
    :return: The name of the branch to index
    :raises NoBranchToIndexError: when no branch with a name in BRANCH_PRIORITY exists
    """
    for index_branch in BRANCH_PRIORITY:
        try:
            project.branches.get(index_branch)
            return index_branch
        except gitlab.exceptions.GitlabGetError:
            pass
    raise NoBranchToIndexError


def get_relevant_commit_hash(project: Project, branch_name: str) -> str:
    """
    Gets the hash of the latest commit on the branch to index for the given project.

    :param project: the project to get the hash for
    :param branch_name: name of the branch of which the latest commit should be used
    :return: the hash of the latest commit on the branch to index.
    """
    branch = project.branches.get(branch_name)
    return branch.commit["id"]  # type: ignore


def in_main_group(path_with_namespace: str) -> bool:
    """
    Checks if the root of the given namespace is in the main group

    :param path_with_namespace: path to the repository to be checked
    :return: True, if in main_group
    """
    return path_with_namespace.startswith(GITLAB_MAIN_GROUP + "/")
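
# For illustration: with GITLAB_MAIN_GROUP = "sharing",
#   in_main_group("sharing/algorithms/sorting")  -> True
#   in_main_group("sharingX/algorithms")         -> False
# (the prefix must be a complete path segment, hence the trailing "/")
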
def get_repository_metadata_files(project: Project, commit: str) -> List[str]:
    """
    Returns all metadata files in the repository's root.

    :param project: the project instance
    :param commit: the hash of the commit to analyze
    :return: List of metadata files
    """
    files = project.repository_tree(as_list=False, ref=commit)
    found_files = []
    for file in files:
        if not is_regular_file(file["mode"]):
            continue
        file_name = file["name"]
        if file_name.lower() not in METADATA_FILENAMES:
            continue
        found_files.append(file_name)
    return found_files


def check_for_single_metadata_files(metadata_files: List[str]) -> List[MetadataInfo]:
    """
    Checks if there is exactly one metadata file.

    :param metadata_files: List of metadata files
    :return: List of errors
    """
    errors = []
    metadata_info = MetadataInfo(None, None)
    num_metadata_files = len(metadata_files)
    if num_metadata_files == 0:
        # the exact wording of MISSING_METADATA_MSG matters:
        # check_and_index_project compares against it to detect this case
        metadata_info.errors.append(MISSING_METADATA_MSG)
        metadata_info.filename = "Metadata file is missing!"
        errors.append(metadata_info)
    elif num_metadata_files > 1:
        error_msg = (
            "Multiple metadata files were found! Please, choose one "
            "of the currently provided files: " + ", ".join(metadata_files)
        )
        metadata_info.errors.append(error_msg)
        metadata_info.filename = "Multiple metadata files found!"
        errors.append(metadata_info)
    return errors
def validate_metadata_file(
    gitlab_instance: Gitlab,
    validation_service: ValidationService,
    metadata_file_path: ItemPath,
    schema: Any,
    nesting: int,
    visited_parents: Optional[List[ItemPath]] = None,
) -> Tuple[List[MetadataInfo], Optional[Node]]:
    """
    Validates a single metadata file.
    A file is valid iff it can be parsed in the format specified by its extension
    (JSON or YAML) and the content conforms to the given schema.

    :param gitlab_instance: the GitLab instance
    :param validation_service: the external validation service to consult
    :param metadata_file_path: the file to validate
    :param schema: a dictionary which can be used by jsonschema Draft 7
    :param nesting: nesting level of children (0 is top most)
    :param visited_parents: already visited collection parents, used to detect cycles
    :return: Information about the metadata file,
        including a potentially empty list of errors.
    """
    # TODO check for external urls
    if visited_parents is None:  # avoid a shared mutable default argument
        visited_parents = []
    file = metadata_file_path.gitlab_project.files.get(
        file_path=metadata_file_path.get_full_path(), ref=metadata_file_path.commit
    )
    raw_file_contents = base64.b64decode(file.content)
    extension = pathlib.Path(metadata_file_path.get_full_path()).suffix
    url = f"/-/blob/{file.commit_id}/{file.file_path}"
    metadata_info = MetadataInfo(url, metadata_file_path.get_full_path())
    try:
        # tries to parse YAML/JSON
        file_contents = parse_metadata_file(raw_file_contents, extension)
    except ValueError as value_error:
        # Raised upon invalid YAML/JSON
        metadata_info.errors.append(str(value_error))
        return [metadata_info], None
    # validates against our custom metadata schema
    local_validator = jsonschema.Draft7Validator(schema, format_checker=draft7_format_checker)
    local_validation_errors = list(local_validator.iter_errors(file_contents))
    validation_errors2 = validation_service.validate_metadata(file_contents, nesting == 0)
    if not validation_errors2.is_empty():
        for error in validation_errors2.errors:
            metadata_info.errors.append(error)
        for warning in validation_errors2.warnings:
            metadata_info.warnings.append(warning)
    if not local_validation_errors and nesting == 0:
        local_validation_errors = list(check_for_mandatory_fields_on_toplevel(file_contents))
    if local_validation_errors:
        for validation_error in local_validation_errors:
            error_msg = path_2_key(validation_error) + validation_error.message
            metadata_info.errors.append(error_msg)
        return [metadata_info], None
    # If this point is reached, the metadata file is valid.
    # Further errors could be in children if this is a collection.
    node = Node(metadata_file_path)
    collection_content = file_contents.get("collectionContent")
    if collection_content is not None:
        parent_errors, child_metadata_info, child_nodes = validate_collection(
            gitlab_instance,
            validation_service,
            node.item,
            collection_content,
            schema,
            nesting + 1,
            visited_parents,
        )
        node.children = child_nodes
        if parent_errors:
            metadata_info.errors = parent_errors
            return [metadata_info] + child_metadata_info, node
        return child_metadata_info, node
    return [metadata_info], node


def check_for_mandatory_fields_on_toplevel(
    file_content: Dict[str, Any],
) -> Iterator[jsonschema.ValidationError]:
    """checks the file_content for entries that are mandatory on the top level"""
    list_attributes_required_on_toplevel = [
        "creator",
        "publisher",
        "license",
        "language",
    ]
    for attr in list_attributes_required_on_toplevel:
        if attr not in file_content:
            yield jsonschema.ValidationError(
                f"attribute {attr} may not be empty for the top level element"
            )


def validate_collection(
    gitlab_instance: Gitlab,
    validation_service: ValidationService,
    parent: ItemPath,
    collection_content: List[str],
    schema: Any,
    nesting: int,
    visited_parents: Optional[List[ItemPath]] = None,
) -> Tuple[List[str], List[MetadataInfo], List[Node]]:
    """
    Parses a metadata file's collectionContent.
    Returns a tuple consisting of a list of errors
    in the parent's collectionContent specification,
    a list of errors obtained recursively from the children,
    and a list of child nodes.

    :param gitlab_instance: the GitLab instance
    :param validation_service: the external validation service to consult
    :param parent: ItemPath to current meta data
    :param collection_content: the metadata file's collectionContent (list of paths)
    :param schema: a dictionary which can be used by jsonschema Draft 7
    :param nesting: nesting level of children (0 is top most)
    :param visited_parents: already visited collection parents, used to detect cycles
    :return: a tuple consisting of a list of errors
        in the parent's collectionContent specification,
        a list of errors obtained recursively from the children,
        and a list of child nodes
    """
    if visited_parents is None:  # avoid a shared mutable default argument
        visited_parents = []
    if parent in visited_parents:
        return [f"infinite recursion in {parent}"], [], []
    visited_parents = copy(visited_parents)
    visited_parents.append(parent)
    path_errors, normalized_paths = normalize_collection_content_paths(collection_content, parent)
    child_nodes = []
    child_errors = []
    for path in normalized_paths:
        errors, tree = validate_metadata_file(
            gitlab_instance, validation_service, path, schema, nesting, visited_parents
        )
        child_errors += errors
        if tree:
            child_nodes.append(tree)
    return path_errors, child_errors, child_nodes
def normalize_collection_content_paths(
    collection_content: List[str], parent: ItemPath
) -> Tuple[List[str], List[ItemPath]]:
    """
    Takes the "collectionContent" list from a metadata file, normalizes the paths,
    and warns about issues, such as duplicate paths.

    :param collection_content: the "collectionContent" list from a metadata file
    :param parent: normalized path of the metadata file from which "collectionContent" is taken
    :return: tuple consisting of a list of error messages
        and a list of normalized file paths without duplicates
    """
    normalized_paths = []
    errors = []
    for path in collection_content:
        try:
            child_path: ItemPath = parent.create_children_itempath(path)
        except PathError as error:
            errors.append(str(error))
            continue
        exists, error_message = check_if_file_exists(child_path)
        if exists:
            normalized_paths.append(child_path)
        else:
            errors.append(
                f"collectionContent path {path} ({child_path.path}): "
                + f"no such file or directory: {error_message}"
            )
    duplicate_paths, paths_without_duplicates = deduplicate_paths(normalized_paths)
    duplicate_warnings = [generate_duplicate_path_warning(path) for path in duplicate_paths]
    errors += duplicate_warnings
    return errors, paths_without_duplicates


class PathError(ValueError):
    """
    Represents an error in a file path.
    """


def normalize_path(child_path: str, parent: ItemPath, parent_path: Optional[str] = None) -> str:
    """
    Takes a potentially un-normalized child path and normalizes it,
    checking for potential errors.
    If the path can be normalized and is valid, the normalized path is returned as a str.
    Otherwise, a PathError is raised.

    :param child_path: potentially un-normalized path from the parent's "collectionContent"
    :param parent: the parent of this child
    :param parent_path: normalized path of a metadata file
    :return: the normalized path
    :raise PathError: when the path cannot be normalized or is invalid
    """
    if child_path is None:
        metadata_files = get_repository_metadata_files(parent.gitlab_project, parent.commit)
        if len(metadata_files) == 0:
            logger.warning("Cannot find %s in %s", child_path, parent)
            # raise instead of crashing with an IndexError below
            raise PathError(f"no metadata file found in '{parent.get_full_path()}'")
        if len(metadata_files) > 1:
            logger.warning("Multiple %s in %s found", child_path, parent)
        return metadata_files[0]
    if "../" in child_path:
        raise PathError(
            "collectionContent path '" + child_path
            + "' contains a reference to a parent directory ('../'). "
            "This is not allowed."
        )
    if parent_path:
        parent_directory = os.path.dirname(parent_path)
        if parent_directory == "/":
            parent_directory = ""
    else:
        parent_directory = ""
    if child_path.startswith("/"):
        # absolute path from repo root
        # remove leading slash since gitlab takes relative paths from the repo root
        child_path = child_path[1:]
    elif parent_directory:
        # convert relative to absolute path
        child_path = parent_directory + "/" + child_path
    child_directory = os.path.dirname(child_path)
    if parent_path is not None and (
        child_directory == parent_directory or not child_path.startswith(parent_directory)
    ):
        raise PathError(
            "collectionContent path '"
            + child_path
            + "' is not in a subdirectory of parent directory /"
            + parent_directory
        )
    # valid, normalized path
    return child_path
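
# For illustration (hypothetical inputs): with the parent metadata file at
# "exercises/metadata.yml",
#   normalize_path("loops/metadata.yml", parent, "exercises/metadata.yml")
# yields "exercises/loops/metadata.yml", whereas "/other/metadata.yml" raises
# a PathError because it does not point into a subdirectory of "exercises",
# and any path containing "../" is rejected immediately.
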
def parse_child_path(child_path: str) -> Tuple[str, int, str]:
    """
    parses a child_path and returns the git url, the project id, and the relative path
    """
    pattern = re.compile("^(http[^[]+)?(\\[(\\d*)\\])?(.+)?$")
    match = pattern.search(child_path)
    if match is None:
        raise ParserError("Cannot parse " + child_path)
    git_url = match.group(1)
    if match.group(3):
        project_id = int(match.group(3))
    else:
        project_id = -1
    path = match.group(4)
    return git_url, project_id, path
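
# For illustration (hypothetical inputs):
#   parse_child_path("sub/metadata.yml")       -> (None, -1, "sub/metadata.yml")
#   parse_child_path("[42]/sub/metadata.yml")  -> (None, 42, "/sub/metadata.yml")
#   parse_child_path("https://git.example.org/[42]/sub")
#       -> ("https://git.example.org/", 42, "/sub")
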
def check_if_file_exists(path: ItemPath) -> Tuple[bool, str]:
    """
    Checks if a file specified by path exists.

    :param path: a normalized path
    :return: a tuple (exists, error message); the message is empty if the file exists
    """
    try:
        project = path.gitlab_project
        if project is not None:
            if not path.get_full_path():
                return True, ""
            project.files.get(file_path=path.get_full_path(), ref=path.commit)
            return True, ""
        # else:
        # TODO for references to an external project, we should invest more effort
        return True, ""
    except GitlabGetError as err:
        logger.warning("Gitlab error: %s", err)
        return False, err.error_message


def deduplicate_paths(paths: List[ItemPath]) -> Tuple[List[ItemPath], List[ItemPath]]:
    """
    Takes a list of paths and checks for duplicates.
    If duplicates are found, they are returned in the first element of the return tuple.
    The second element of the return tuple contains the de-duplicated list of paths.
    The order of the paths will be preserved. For duplicates, the first element will be kept.

    :param paths: a list of normalized paths, potentially containing duplicates.
    :return: tuple consisting of a list of duplicate paths and the de-duplicated list.
    """
    paths_without_duplicates = []
    duplicate_paths = []
    # O(n^2) implementation to preserve ordering
    for path in paths:
        if path in paths_without_duplicates:
            # do not generate more than one warning if a path appears more than twice
            if path not in duplicate_paths:
                duplicate_paths.append(path)
        else:
            paths_without_duplicates.append(path)
    return duplicate_paths, paths_without_duplicates
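
# For illustration (hypothetical ItemPath objects a and b):
#   deduplicate_paths([a, b, a, a])  -> ([a], [a, b])
# i.e. one entry per duplicated path, and the de-duplicated list keeps the
# first occurrence in the original order.
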
def generate_duplicate_path_warning(path: ItemPath) -> str:
    """
    Generates a message warning about a duplicate path in a metadata file's collectionContent.

    :param path: the path appearing more than once
    :return: a message warning about a duplicate path in a metadata file's collectionContent
    """
    return "collectionContent contains more than one reference to " + path.path


def parse_metadata_file(file_contents: bytes, extension: str) -> Dict[str, Any]:
    """
    Given the (already base64-decoded) content of a file and the extension of the file,
    this function attempts to parse the content
    according to the format specified by the extension.
    If the extension is '.json' it tries to parse it as a json file,
    otherwise the data is treated as YAML.

    :param file_contents: decoded content of the file to parse
    :param extension: the file extension, including a leading dot.
        One of {'.json', '.yaml', '.yml'}
    :return: If successful, a dictionary with the parsed key-value pairs
    :raises ValueError: if the file contents cannot be parsed as a dictionary
    """
    if not file_contents:
        raise ValueError("Provided file is empty!")
    if extension == ".json":
        try:
            parsed_metadata = json.loads(file_contents)
            if not isinstance(parsed_metadata, dict):
                raise ValueError("The metadata file must be a dictionary!")
            return parsed_metadata
        except JSONDecodeError as json_error:
            raise ValueError("Provided file is not a valid JSON-file!") from json_error
    else:
        try:
            yaml = ruamel.yaml.YAML(typ="safe")
            parsed_metadata = yaml.load(file_contents)
            if not isinstance(parsed_metadata, dict):
                raise ValueError("The metadata file must be a dictionary!")
            return parsed_metadata
        except (ParserError, ScannerError) as error:
            raise ValueError("Provided file is not a valid YAML-file!") from error
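
# For illustration:
#   parse_metadata_file(b'{"title": "Sorting"}', ".json")  -> {"title": "Sorting"}
#   parse_metadata_file(b"title: Sorting", ".yaml")        -> {"title": "Sorting"}
#   parse_metadata_file(b"[1, 2]", ".json")  -> raises ValueError (not a dictionary)
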
def path_2_key(validation_error: ValidationError) -> str:
    """
    Returns the path to the key for which a validation error occurred.

    :param validation_error: Validation error
    :return: Path to validation error
    """
    path = list(validation_error.path)
    if not path:
        return ""
    key_path = str(path[0])
    for segment in path[1:]:
        if isinstance(segment, int):
            key_path += f"[{segment}]"
        else:
            key_path += f".{segment}"
    return f"'{key_path}': "
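
# For illustration: a ValidationError whose .path is ["creator", 0, "name"]
# is rendered as "'creator[0].name': ", which then prefixes the error message.
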
def is_regular_file(mode: str) -> bool:
    """
    Returns true if the file mode corresponds to a regular file and false otherwise.

    :param mode: file mode
    :return: True, if regular file.
    """
    file_mode = int(mode, base=8)
    return ((file_mode & 0o100000) != 0) and ((file_mode & 0o070000) == 0)
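
# For illustration: GitLab reports tree entry modes as octal strings, e.g.
#   is_regular_file("100644")  -> True   (regular file)
#   is_regular_file("040000")  -> False  (directory)
#   is_regular_file("120000")  -> False  (symbolic link)
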
@dataclass()
class HealthCheck:
    """
    Handles validation of a project's metadata
    and warns users when problems with the metadata are found.
    """

    gitlab: Gitlab
    mail: Mail
    validation_service: ValidationService

    def __init__(
        self, gitlab_instance: Gitlab, validation_service: ValidationService, mail: Mail
    ) -> None:
        self.gitlab = gitlab_instance
        self.mail = mail
        self.validation_service = validation_service

    def send_validation_error_mail(
        self, git_event: Dict[str, Any], analysed_commit: AnalysedCommit
    ) -> None:
        """
        Sends an email warning about errors to the user who triggered the validation.
        Only call this method if there were errors;
        if the validation did not find any issues, no email should be sent.

        :param git_event: the event which triggered the validation
        :param analysed_commit: information about the commit which was analysed
        """
        user = self.gitlab.users.get(git_event["user_id"])
        commit = analysed_commit.project.commits.get(analysed_commit.commit_hash)
        subject = f"{git_event['project']['path_with_namespace']}: Health check failed!"
        project_info = ProjectInfo(git_event, analysed_commit.branch_name, commit)
        error_msg = ErrorMessage(project_info, analysed_commit.errors)
        mail_html = error_msg.create_html()
        mail_plain = error_msg.create_plain()
        try:
            self.mail.send_mail(user.email, subject, mail_html, mail_plain)
        except smtplib.SMTPAuthenticationError as error:
            logger.error("Could not send mail: %s", error)

    def validate_project(
        self, gitlab_instance: Gitlab, project: Project, commit: str
    ) -> Tuple[List[MetadataInfo], Optional[Node]]:
        """
        Checks if the project's metadata is defined correctly.
        This requires exactly one top-level metadata file, recognised by naming convention.
        While parsing the metadata, errors are collected in a list.
        Valid metadata file names are stored as a tree,
        resembling either an atomic project or a collection.

        :param project: the gitlab project to check
        :param commit: the hash of the commit to analyze
        :return: List of validation errors,
            Tree of metadata (single node if repo is not a collection)
        """
        metadata_files = get_repository_metadata_files(project, commit)
        errors = check_for_single_metadata_files(metadata_files)
        if errors:
            # only validate further if there is exactly one top-level metadata file
            return errors, None
        # This code is only reached if there is exactly one top-level metadata file
        metadata_file = metadata_files[0]
        schema = json.loads(SCHEMA_METADATA)
        root_node = ItemPath("", metadata_file, commit, project.id, self.gitlab)
        return validate_metadata_file(
            gitlab_instance,
            self.validation_service,
            root_node,
            schema,
            0,
        )
@dataclass
class ProjectPermissions:
    """
    Data class for storing who has read access to a project.
    """

    email_addresses_with_access: List[str]
    groups_with_access: List[str]


def unify_keywords(file_contents: Dict[str, Any]) -> Dict[str, Any]:
    """
    Given a dictionary representing the user provided metadata,
    this function normalizes some keywords.
    Currently the only thing it does is converting the programming language names to titlecase.

    :param file_contents: dictionary representing the user provided metadata
    :return: the input dictionary with normalized keywords
    """
    if "programmingLanguage" in file_contents:
        file_contents["programmingLanguage"] = [
            language.title() for language in file_contents["programmingLanguage"]
        ]
    return file_contents
class MetadataFileIndexer:
    """
    File indexer used to handle indexing of a metadata file
    (metadata.json, metadata.yml, or metadata.yaml) in elasticsearch

    :param elasticsearch: the elasticsearch interface
    :param index: the name of the index
    :param item: path to the item
    :param project_permissions: (read) permissions on this project
    :param parent_metadata: the meta data of the parent object (from which values are inherited)
    :param parent_id: the id of the parent
    :param child_ids: a collection of child ids
    """

    elasticsearch: Elasticsearch
    index: str
    project: Project
    item: ItemPath
    project_permissions: ProjectPermissions
    commit: str
    file_path: str
    parent_metadata: Dict[str, Any]
    parent_id: Optional[str]
    child_ids: List[str]

    def __init__(
        self,
        elasticsearch: Elasticsearch,
        index: str,
        item: ItemPath,
        project_permissions: ProjectPermissions,
        parent_metadata: Dict[str, Any],
        parent_id: Optional[str],
        child_ids: List[str],
    ) -> None:
        self.elasticsearch = elasticsearch
        self.index = index
        self.project = item.gitlab_project
        self.item = item
        self.project_permissions = project_permissions
        self.commit = item.commit
        self.file_path = item.get_full_path()
        self.parent_metadata = parent_metadata
        self.parent_id = parent_id
        self.child_ids = child_ids

    def get_file(self) -> ProjectFile:
        """
        Returns a single file of a commit.

        :return: the file
        """
        return self.project.files.get(file_path=self.item.get_full_path(), ref=self.commit)

    def update_doc(self) -> None:
        """
        Updates an existing document in elasticsearch
        or creates a new one if no document with this id exists.

        :return: None
        """
        try:
            doc = self.create_json_doc()
            doc_id = self.item.doc_id()
            self.elasticsearch.index(index=self.index, document=doc, id=doc_id)
        # TODO: catch more specific exception
        # pylint: disable=broad-except
        except Exception as exception:
            logger.warning("Cannot index %s.\n%s", self.file_path, exception)

    def create_json_doc(self) -> Dict[str, Any]:
        """
        Returns an indexable json document for elasticsearch.

        :return: The document which should be added/updated to/in elasticsearch
        """
        # only called for valid metadata files, so no validation here
        file = self.get_file()
        doc_json = {
            "project": self.project_info_metadata(),
            "file": {
                "filename": str(pathlib.Path(self.file_path).name),
                "path": self.file_path,
                "commit_id": file.last_commit_id,
                "indexing_date": datetime.now(),
                "last_activity_at": datetime.now(),
                "parentId": self.parent_id,
                "children": self.child_ids,
            },
            "metadata": self.get_user_provided_metadata(),
        }
        return doc_json

    def project_info(self) -> Dict[str, Any]:
        """
        Returns information about the project.

        :return: The project information
        """
        namespaces = self.project.path_with_namespace.split("/")
        sub_group = namespaces[1] if len(namespaces) > 2 else None
        return {
            "project_id": self.project.id,
            "project_name": self.project.path,
            "namespace": self.project.path_with_namespace,
            "main_group": namespaces[0],
            "sub_group": sub_group,
            "url": self.project.web_url,
        }

    def get_user_provided_metadata(self) -> Dict[str, Any]:
        """
        Prepares the user provided part of the metadata
        which is to be stored in the metadata index.
        Also handles inheritance.

        :return: the user provided metadata which should be put in the index for the current file
        """
        file = self.get_file()
        extension = pathlib.Path(file.file_name).suffix
        raw_file_contents = self.get_file_contents(file)
        parsed_file_contents = parse_metadata_file(raw_file_contents, extension)
        # collectionContent should not be in the index, as it potentially contains references
        # to invalid metadata. To parse a collection's structure from the index
        # the fields "parentId" and "children" should be used.
        if parsed_file_contents.get("collectionContent") is not None:
            del parsed_file_contents["collectionContent"]
        normalized_file_contents = unify_keywords(parsed_file_contents)
        atomic_attributes_to_inherit = [
            "audience",
            "difficulty",
            "educationalAlignment",
            "educationalFramework",
            "image",
            "license",
            "interactivityType",
            "status",
            "typicalAgeRange",
            "valid",
        ]
        list_attributes_to_inherit = [
            "assesses",
            "creator",
            "publisher",
            "language",
            "contributor",
            "educationalLevel",
            "educationalUse",
            "isBasedOn",
            "programmingLanguage",
            "requires",
            "subject",
            "teaches",
        ]
        # inherit string or integer data
        for attr in atomic_attributes_to_inherit:
            if attr not in normalized_file_contents and attr in self.parent_metadata:
                normalized_file_contents[attr] = self.parent_metadata[attr]
        # inherit list data
        for attr in list_attributes_to_inherit:
            if (
                attr not in normalized_file_contents or not normalized_file_contents[attr]
            ) and attr in self.parent_metadata:
                normalized_file_contents[attr] = self.parent_metadata[attr]
        return normalized_file_contents
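
    # For illustration (hypothetical metadata): if the parent defines
    # license: "MIT" and programmingLanguage: ["Java"], then a child file
    # without a license inherits "MIT" (atomic attribute), and a child whose
    # programmingLanguage is missing or an empty list inherits ["Java"]
    # (list attributes are inherited even when present but empty).
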
    def project_info_metadata(self) -> Dict[str, Any]:
        """
        Returns a dictionary containing the data which should be put
        as the value of the 'project' key in the metadata index

        :return: dictionary containing the project metadata
        """
        return {
            **self.project_info(),
            "visibility": self.project.visibility,
            "users": self.project_permissions.email_addresses_with_access,
            "groups": self.project_permissions.groups_with_access,
            "archived": self.project.archived,
            "star_count": self.project.star_count,
            "open_issues_count": self.project.open_issues_count,
            "forks_count": self.project.forks_count,
            "last_activity_at": self.project.last_activity_at,
            "description": self.project.description,
        }

    @staticmethod
    def get_file_contents(file: ProjectFile) -> bytes:
        """
        Returns the file contents.

        :param file: file to extract the file content
        :return: The file content
        """
        content = file.content
        return base64.b64decode(content)

# pylint: disable=R0904
class Indexing:
    """
    Contains the functionality required to create and update entries in the metadata index.
    """

    gitlab_instance: Gitlab
    mail: Mail
    elasticsearch: Elasticsearch
    index_name: str
    validation_service: ValidationService

    def __init__(
        self,
        gitlab_instance: Gitlab,
        mail: Mail,
        elasticsearch_instance: Elasticsearch,
        index_name: str = METADATA_INDEX,
        validation_service: Optional[ValidationService] = None,
    ):
        self.gitlab_instance = gitlab_instance
        self.mail = mail
        self.elasticsearch = elasticsearch_instance
        self.index_name = index_name
        # index_all_projects constructs a HealthCheck, which needs a
        # ValidationService; fall back to one with the default rest_url
        # when the caller does not inject one
        self.validation_service = validation_service or ValidationService()
[docs] def index_all_metadata( self, project_permissions: ProjectPermissions, metadata_tree: Node, ) -> None: """ Index the metadata of the project. :param project_permissions: contains information about who may access the project :param metadata_tree: a tree of valid metadata file paths :return: None """ self.index_metadata_node(project_permissions, metadata_tree, {}, None)
[docs] def index_metadata_node( self, project_permissions: ProjectPermissions, metadata_tree: Node, parent_metadata: Dict[str, Any], parent_id: Optional[str], ) -> None: """ Recursive function for indexing the metadata tree of a project. :param project_permissions: contains information about who may access the project :param metadata_tree: tree of valid metadata file paths :param parent_metadata: the parent's metadata read from the file, used for inheritance :param parent_id: the id of the parent metadata :return: None """ doc_id = metadata_tree.item.doc_id() logger.debug("Indexing metadata for %s in %s", doc_id, metadata_tree.item.get_full_path()) branch_ids = [child_path.doc_id() for child_path in metadata_tree.get_child_paths()] metadata_indexer = MetadataFileIndexer( self.elasticsearch, self.index_name, metadata_tree.item, project_permissions, parent_metadata, parent_id, branch_ids, ) metadata_indexer.update_doc() parent_metadata = metadata_indexer.get_user_provided_metadata() for tree_branch in metadata_tree.children: if tree_branch.item.project_id == metadata_tree.item.project_id: # same project self.index_metadata_node(project_permissions, tree_branch, parent_metadata, doc_id)
[docs] def project_destroy(self, project_id: int) -> None: """ Handles a project_destroy event. Deletes all elements in the index for the given project_id. :param project_id: ID of the project :return: None """ delete_query = {"match": {"project.project_id": project_id}} try: self.elasticsearch.delete_by_query(index=self.index_name, query=delete_query) except (NotFoundError, ConflictError): pass
[docs] def project_rename( self, project_id: int, path: str, path_with_namespace: str, url: str ) -> None: """ Handles a project_rename event. Updates project_name, namespace, main_group and sub_group for all elements of the given project_id in the metadata index :param project_id: ID of the project :param path: the path of the repository (project name) :param path_with_namespace: repository path with namespace :param url: GITLAB_URL to the repository :return: None """ namespaces = path_with_namespace.split("/") sub_group = f"'{namespaces[1]}'" if len(namespaces) > 2 else "null" rename_script = { "source": f"ctx._source.project.project_name='{path}'; " f"ctx._source.project.namespace='{path_with_namespace}'; " f"ctx._source.project.main_group='{namespaces[0]}'; " f"ctx._source.project.sub_group={sub_group}; " f"ctx._source.project.url='{url}'; ", "lang": "painless", } rename_query = {"term": {"project.project_id": project_id}} self.elasticsearch.update_by_query( index=self.index_name, query=rename_query, script=rename_script )
[docs] def web_url_project(self, project_id: int) -> str: """ Returns the web url of a given project id after querying GitLab. :param project_id: ID of the project :return: The Url to the project """ project = self.gitlab_instance.projects.get(project_id) return str(project.web_url)
[docs] def index_entire_repository(self, metadata_tree: Node) -> None: """ Indexes the files of an entire project :param metadata_tree: a tree of valid metadata file paths :return: None """ if metadata_tree is not None: logger.debug( "indexing entire repository starting at %s (%s)", metadata_tree.item.project_id, metadata_tree.item.path, ) self.project_destroy(metadata_tree.item.project_id) user_emails, group_names = calculate_project_members( metadata_tree.item.gitlab_project, self.gitlab_instance ) unique_user_emails = list(dict.fromkeys(user_emails)) # just to remove duplicates unique_group_names = list(dict.fromkeys(group_names)) # just to remove duplicates project_permissions = ProjectPermissions(unique_user_emails, unique_group_names) # metadata if metadata_tree is not None: self.index_all_metadata(project_permissions, metadata_tree)
[docs] def get_all_projects(self, group_id: int) -> List[Project]: """ Returns all projects of a group including projects from subgroups. :param group_id: ID of the group :return: List of all projects """ group = self.gitlab_instance.groups.get(group_id) group_projects = group.projects.list(all=True, include_subgroups=True) # The call above returns instances of "GroupProject" # which are not properly usable (e.g. they do not have a repository_tree method). # Therefore, corresponding "Project" instances are loaded in this list comprehension. projects = [ self.gitlab_instance.projects.get(group_project.id) for group_project in group_projects ] return projects
[docs] def group_rename(self, group_id: int) -> None: """ Handles a group_rename event. Renames all projects of the given group (including subgroups). :param group_id: ID of the group to rename :return: None """ projects = self.get_all_projects(group_id) for project in projects: self.project_rename( project.id, project.path, project.path_with_namespace, project.web_url )
    @classmethod
    def _log_validation_errors(cls, validation_errors: List[MetadataInfo]) -> None:
        for validation_error in validation_errors:
            for error in validation_error.errors:
                logger.info(" %s: %s", validation_error.filename, error)
    def index_all_projects(
        self, get_relevant_projects: Callable[[], List[Project]], logger_prefix: str = ""
    ) -> None:
        """
        Indexes the files of an entire group.

        :param get_relevant_projects: callable producing the relevant projects
        :param logger_prefix: prefix added to log messages for context
        :return: None
        """
        projects = get_relevant_projects()
        for project_number, project in enumerate(projects, start=1):
            if project.empty_repo:
                logger.debug(
                    "%s / repository %s (skipped - empty repository, %d of %d)",
                    logger_prefix,
                    project.id,
                    project_number,
                    len(projects),
                )
                continue
            logger.debug(
                "%s / project %s %s (%d of %d)",
                logger_prefix,
                project.id,
                project.name,
                project_number,
                len(projects),
            )
            try:
                branch_name, commit = get_indexing_commit(project)
            except NoBranchToIndexError:
                continue
            validation_errors, project_root = HealthCheck(
                self.gitlab_instance, self.mail
            ).validate_project(self.gitlab_instance, project, commit)
            if project_root:
                self.index_entire_repository(project_root)
            else:
                logger.info(
                    "there are metadata errors in %s branch %s",
                    project.id,
                    branch_name,
                )
                Indexing._log_validation_errors(validation_errors)
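    # Usage sketch (names assumed): index_all_projects takes a zero-argument
    # callable, so the same loop serves group projects and user projects alike.
    # "indexing" stands in for a constructed instance of this class.
    #
    #     indexing.index_all_projects(
    #         lambda: indexing.get_all_projects(group_id=42), "group 42"
    #     )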
    def get_main_group_id(self) -> int:
        """
        Returns the group ID of the main group.

        :return: ID of the group
        """
        return int(self.gitlab_instance.groups.get(GITLAB_MAIN_GROUP).id)
    def get_root_groups(self) -> list[Group]:
        """
        Returns all root (top-level) groups.

        :return: list of root groups
        """
        root_groups = self.gitlab_instance.groups.list(top_level_only=True)
        return root_groups  # type: ignore
    def get_all_gitlab_users(self) -> list[User]:
        """
        Returns a list of all users.

        :return: list of users
        """
        return self.gitlab_instance.users.list(all=True)  # type: ignore
    def get_projects_of_user(self, user: User) -> list[Project]:
        """
        Returns the list of projects for the given user.

        :param user: the user whose projects are loaded
        :return: list of user projects
        """
        user_projects = user.projects.list(all=True, include_subgroups=True)
        projects = [
            self.gitlab_instance.projects.get(user_project.id)
            for user_project in user_projects
        ]
        return projects
    def get_all_index_names(self) -> List[str]:
        """
        Returns the names of all indices in the elasticsearch instance.
        """
        return str(self.elasticsearch.cat.indices(h="index")).splitlines()
    def print_all_indexes(self) -> None:
        """
        Prints information about all elasticsearch indices on stdout.

        :return: None
        """
        indices = self.elasticsearch.cat.indices(v=True)
        if indices == "":
            print("No index exists!")
            return
        print(indices)
    def print_all_aliases(self) -> None:
        """
        Prints information about all elasticsearch aliases on stdout.

        :return: None
        """
        status_msgs = self.elasticsearch.cat.aliases(v=True)
        if status_msgs == "":
            print("No alias!")
            return
        print(status_msgs)
    def get_alias(self, alias: str) -> Any:
        """
        Tries to get the elasticsearch alias specified by the argument.

        :param alias: the name of an alias
        :return: the alias if it exists, None otherwise
        """
        try:
            return self.elasticsearch.indices.get_alias(index=alias)
        except NotFoundError:
            return None
    def add_alias(self, alias: str, indexes: List[str]) -> None:
        """
        Adds an alias for a list of elasticsearch indexes.

        :param alias: the alias to be set
        :param indexes: the list of indexes to set the alias for
        :return: None
        """
        if self.elasticsearch.indices.exists_alias(name=alias):
            logger.error("ERROR: The alias %s already exists!", alias)
            return
        for index_ in indexes:
            if not self.elasticsearch.indices.exists(index=index_):
                logger.error("ERROR: The index %s does not exist!", index_)
                return
        actions_ = [
            {"add": {"index": index_, "alias": alias, "is_write_index": True}}
            for index_ in indexes
        ]
        # the type hints seem to be wrong for this function
        self.elasticsearch.indices.update_aliases(actions=actions_)  # type: ignore
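    # Hypothetical usage, assuming an instance named "indexing" and an index
    # that already exists; the index name below is a placeholder:
    #
    #     indexing.add_alias("metadata", ["metadata-2024-01"])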
    def delete_indexes(self, indexes: List[str]) -> None:
        """
        Deletes a list of elasticsearch indexes.

        :param indexes: the list of indexes to delete
        :return: None
        """
        for index_ in indexes:
            self.elasticsearch.indices.delete(index=index_)
    def change_main_indexes(self, indexes_metadata: Tuple[str, str]) -> None:
        """
        Changes the main indexes for the metadata index by removing the alias
        from the old index and adding the alias to the new index.

        :param indexes_metadata: a pair (old, new) of metadata indexes
        :return: None
        """
        if not self.elasticsearch.indices.exists(index=indexes_metadata[0]):
            logger.error("ERROR: The old metadata index %s does not exist!", indexes_metadata[0])
            return
        if not self.elasticsearch.indices.exists(index=indexes_metadata[1]):
            logger.error("ERROR: The new metadata index %s does not exist!", indexes_metadata[1])
            return
        if not self.elasticsearch.indices.exists_alias(name=self.index_name):
            logger.error("ERROR: The metadata alias %s does not exist!", self.index_name)
            return
        elasticsearch_actions = [
            {"remove": {"index": indexes_metadata[0], "alias": self.index_name}},
            {
                "add": {
                    "index": indexes_metadata[1],
                    "alias": self.index_name,
                    "is_write_index": True,
                }
            },
        ]
        self.elasticsearch.indices.update_aliases(actions=elasticsearch_actions)  # type: ignore
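    # Because both actions go into a single update_aliases call, Elasticsearch
    # applies the remove and the add atomically: searches against the alias
    # never hit a window where it points at no index. Hypothetical usage,
    # assuming an instance named "indexing" and placeholder index names:
    #
    #     indexing.change_main_indexes(("metadata-v1", "metadata-v2"))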
    def create_new_index(self) -> None:
        """
        Creates a new index for metadata information.
        Afterwards all root groups and all user projects are indexed.

        :return: None
        """
        schema_metadata = json.loads(ES_SCHEMA_METADATA)
        if self.elasticsearch.indices.exists(index=self.index_name):
            logger.error("ERROR: Index %s already exists. Aborting!", self.index_name)
            return
        logger.debug("Creating indexes")
        res_create_metadata = self.elasticsearch.indices.create(
            index=self.index_name,
            mappings=schema_metadata["mappings"],
            settings=schema_metadata["settings"],
        )
        logger.debug("%s: %s", self.index_name, res_create_metadata)
        root_groups = self.get_root_groups()
        for index, root_group in enumerate(root_groups, start=1):
            logger.debug(
                "------------------------ indexing root group %s (%d of %d groups)",
                root_group.full_name,
                index,
                len(root_groups),
            )
            log_prefix = f"group {root_group.id} {root_group.name}"

            # bind the current group via a default argument so that every
            # project_finder does not end up referring to the last loop value
            def project_finder(group: Group = root_group) -> List[Project]:
                return self.get_all_projects(group.id)

            self.index_all_projects(project_finder, log_prefix)
        users = self.get_all_gitlab_users()
        for index, user in enumerate(users, start=1):
            logger.debug(
                "------------------------ indexing user %s (%d of %d users)",
                user.name,
                index,
                len(users),
            )
            log_prefix = "user project"

            def user_project_finder(project_user: User = user) -> List[Project]:
                return self.get_projects_of_user(project_user)

            self.index_all_projects(user_project_finder, log_prefix)
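    # A possible re-indexing workflow built from the methods above (sketch only;
    # it assumes index_name is used as the index name on create and as the
    # alias name on swap, and all names are placeholders):
    #
    #     indexing.index_name = "metadata-v2"
    #     indexing.create_new_index()       # builds and fills metadata-v2
    #     indexing.index_name = "metadata"  # the alias searches go through
    #     indexing.change_main_indexes(("metadata-v1", "metadata-v2"))
    #     indexing.delete_indexes(["metadata-v1"])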
def logger_setup(filepath: str) -> Dict[str, Any]:
    """
    Returns a dictionary which can be used to configure a logger.

    :param filepath: path of the log file
    :return: a dictionary to configure a logger
    """
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "standard": {
                "format": "%(asctime)s %(levelname)-8s [%(filename)-20s:%(lineno)4d]: %(message)s"
            },
        },
        "handlers": {
            "file_handler": {
                "level": "INFO",
                "filename": filepath,
                "class": "logging.handlers.RotatingFileHandler",
                "formatter": "standard",
                "maxBytes": 524288,
                "backupCount": 2,
            }
        },
        "loggers": {
            "": {"handlers": ["file_handler"], "level": "INFO", "propagate": True},
        },
    }
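# Usage example: the returned dictionary is in the format consumed by the
# standard library's logging.config.dictConfig. The log path is a placeholder.
#
#     import logging.config
#     logging.config.dictConfig(logger_setup("/var/log/filehooks/hooks.log"))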
def read_gitlab_event() -> Optional[Dict[str, Any]]:
    """
    Reads the GitLab system hook event from stdin.

    :return: the event
    """
    event_content = sys.stdin.read()
    event = json.loads(event_content)
    if not isinstance(event, dict):
        logger.error("Received event which is not a dictionary.")
        return None
    return event
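# Illustration: GitLab delivers the system hook event as JSON on stdin. A
# minimal sketch of feeding such an event to this reader, e.g. in a test;
# the payload shown is only a subset of a real event's fields:
#
#     import io, sys
#     sys.stdin = io.StringIO('{"event_name": "project_destroy", "project_id": 42}')
#     event = read_gitlab_event()
#     assert event["event_name"] == "project_destroy"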
def load_config(
    config_type: ConfigType = ConfigType.PRODUCTION,
) -> Tuple[Gitlab, Mail, Elasticsearch, ValidationService]:
    """
    Parses the configuration in filehooks/config.ini.
    This file only exists when deployed in the GitLab container
    and is a copy of one of the files in filehooks/conf/.
    Which file is used depends on the configuration when setting up GitLab.

    :param config_type: which configuration to load
    :return: the configured Gitlab, Mail, Elasticsearch and ValidationService instances
    """
    config = _load_config(config_type)
    gitlab_config = config["gitlab"]
    gitlab_instance = Gitlab(gitlab_config["url"], gitlab_config["token"])
    mail = Mail.from_dict(config["mail"])
    elasticsearch_config = config["elasticsearch"]
    elasticsearch_instance = Elasticsearch(elasticsearch_config["url"])
    validation_service = ValidationService(config["validation"]["validation_url"])
    return gitlab_instance, mail, elasticsearch_instance, validation_service
def _load_config(config_type):
    config_files = {
        ConfigType.PRODUCTION: "config.ini",
        ConfigType.STAGING: "conf/staging.ini",
        ConfigType.LOCAL: "conf/localConfig.ini",
        ConfigType.DEBUG: "conf/conf.debug.ini",
        ConfigType.TEST: "conf/conf.test.ini",
    }
    config_file = config_files.get(config_type, "config.ini")
    config_parser = ConfigParser()
    config_parser.read(DIR / config_file)
    config = {s: dict(config_parser.items(s)) for s in config_parser.sections()}
    return config
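# For illustration, a hedged sketch of the sections _load_config is expected to
# yield, derived only from the keys read in load_config above; all values are
# placeholders, and a [mail] section with the Mail fields is read as well:
#
#     from configparser import ConfigParser
#
#     sample = """
#     [gitlab]
#     url = https://gitlab.example.org
#     token = <private-token>
#
#     [elasticsearch]
#     url = https://elasticsearch.example.org:9200
#
#     [validation]
#     validation_url = https://validator.example.org
#     """
#     parser = ConfigParser()
#     parser.read_string(sample)
#     config = {s: dict(parser.items(s)) for s in parser.sections()}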