# These pylint ignores were put in place to make the state of the code
# at the introduction of automated checks work.
# The code should be refactored to remove these issues.
# If an issue which is warned about without these ignores cannot
# or should not be fixed, the ignore should be placed at that specific location.
# pylint: disable=too-many-lines,too-many-locals,too-many-instance-attributes,too-many-arguments
# pylint: disable=fixme
# TODO: in addition to the problems described above, a few things should be fixable quickly.
# These are marked with TODO. Fix them and re-enable the TODO warning above.
"""
This module can be installed within GitLab
to handle events in its projects.
Its purpose is checking for the presence of metadata
describing the repository's content
and putting valid metadata into an elasticsearch index.
"""
import base64
import io
import json
import logging
import os
import pathlib
import re
import smtplib
import ssl
import sys
from configparser import ConfigParser
from copy import copy
from dataclasses import InitVar, dataclass, field
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from enum import Enum
from json.decoder import JSONDecodeError
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse
import gitlab # type: ignore
import jsonschema # type: ignore
import requests
import ruamel.yaml
from elasticsearch import ConflictError, Elasticsearch, NotFoundError
from gitlab import Gitlab, GitlabGetError
from collections import namedtuple
from gitlab.v4.objects import ( # type: ignore
Group,
Project,
ProjectCommit,
ProjectFile,
User,
)
from jinja2 import Environment, FileSystemLoader
from jsonschema import ValidationError, draft7_format_checker
from ruamel.yaml.parser import ParserError
from ruamel.yaml.scanner import ScannerError
# Message shown when a repository contains no recognised metadata file.
MISSING_METADATA_MSG = (
    "Metadata file is missing! Please, provide a metadata file! You may use "
    "either 'metadata.yml', 'metadata.yaml' or 'metadata.json'!"
)
# Template pointing users at the metadata documentation ({} is filled with the help URL).
HELP_MSG = "For more information about the expected metadata file see {}."
# Header line of the validation-failure notification.
MAIN_MSG = "The health check for your commit failed! The following errors were found:"
# Jinja2 template (relative to this module's directory) for HTML notification mails.
PATH_MAIL_TEMPLATE = "templates/template.html"
logger = logging.getLogger(__name__)
# Directory containing this module; schema files are resolved relative to it.
DIR = pathlib.Path(os.path.dirname(__file__))
# JSON schema used to validate repository metadata files.
SCHEMA_METADATA = (DIR / "schemas/gitlab_metadata.schema.json").read_text(encoding="utf-8")
# Maps file extensions to logical file types.
MAP_EXTENSION2FILE_TYPE = json.loads(
    (DIR / "schemas/extension2file_type.json").read_text(encoding="utf-8")
)
# JSON schema for documents stored in the elasticsearch index.
ES_SCHEMA_METADATA = (DIR / "schemas/es_metadata.schema.json").read_text(encoding="utf-8")
# Branch names considered for indexing, in descending priority.
BRANCH_PRIORITY = ["main", "master"]
# File names recognised as metadata files.
METADATA_FILENAMES = ["metadata.yaml", "metadata.yml", "metadata.json"]
# Name of the elasticsearch index holding metadata documents.
METADATA_INDEX = "metadata"
# Projects below this GitLab group must provide metadata.
GITLAB_MAIN_GROUP = "sharing"
class ConfigType(Enum):
    """
    Enum for choosing the desired configuration
    (which set of credentials/endpoints the service runs against).
    """

    PRODUCTION = 0
    TEST = 1
    DEBUG = 2
    STAGING = 3
    LOCAL = 4
@dataclass()
class Mail:
    """
    Sends notification e-mails about validation errors via an SMTP server.
    """

    username: str
    password: str
    address: str
    mail_from: str
    host: str
    port: int
    homepage: str

    @classmethod
    def from_dict(cls, config: Dict[str, Any]) -> Any:
        """
        Creates an instance of Mail from the given config.
        :param config: configuration to create the class from
        :return: the created Mail instance
        """
        keys = ("username", "password", "address", "mail_from", "host", "port", "homepage")
        return cls(*(config[key] for key in keys))

    def send_mail(self, mail_to: str, mail_subject: str, mail_html: str, mail_plain: str) -> None:
        """
        Sends an e-mail via an SMTP server (STARTTLS + login).
        :param mail_to: Mail recipient
        :param mail_subject: Mail subject
        :param mail_html: Mail body as HTML
        :param mail_plain: Mail body as plain text
        :return: None
        """
        msg = MIMEMultipart("alternative")
        msg["From"] = self.mail_from
        msg["To"] = mail_to
        msg["Subject"] = mail_subject
        # attach plain first so HTML-capable clients prefer the HTML part
        for body, subtype in ((mail_plain, "plain"), (mail_html, "html")):
            msg.attach(MIMEText(body, subtype))
        logger.info("To: <%s> Subject: <%s> Msg: <%s>", mail_to, mail_subject, mail_plain)
        context = ssl.create_default_context()
        with smtplib.SMTP(self.host, self.port) as smtp:
            smtp.starttls(context=context)
            smtp.login(self.username, self.password)
            smtp.send_message(msg)
@dataclass()
class ValidationResult:
    """
    Result type returned by the validation service.

    Note: previously ``errors`` and ``warnings`` were bare class-level
    annotations, so a fresh ``ValidationResult()`` had neither attribute and
    ``is_empty()`` raised AttributeError. Default factories make a fresh
    instance usable while keeping ``ValidationResult()`` + attribute
    assignment (the existing call pattern) working unchanged.
    """

    # fatal validation problems
    errors: List[str] = field(default_factory=list)
    # non-fatal findings
    warnings: List[str] = field(default_factory=list)

    def is_empty(self) -> bool:
        """Returns True when neither errors nor warnings were recorded."""
        return not (self.errors or self.warnings)
@dataclass()
class ValidationService:
    """
    Handles the validation of projects' metadata and collects errors and warnings.
    """

    # Endpoint of the external validation REST API.
    rest_url: str = "http://localhost:8080/api/validateMetaData"

    def custom_validation_result_decoder(self, resultDict) -> ValidationResult:
        """
        Builds a ValidationResult from a decoded JSON dictionary.

        :param resultDict: dictionary containing "errors" and "warnings" lists
        :return: the corresponding ValidationResult
        """
        # The original routed the dict through a throwaway namedtuple (and
        # named its self-parameter ``s`` while shadowing the builtin
        # ``tuple``); reading the two keys directly is equivalent.
        result = ValidationResult()
        result.errors = resultDict["errors"]
        result.warnings = resultDict["warnings"]
        return result
@dataclass
class ItemPath:
    """
    Represents a link to another Item: either relative to this path, in
    another project of the same GitLab instance, or (not yet supported)
    in another repository.
    """

    # directory path either relative to some other Item path, or absolute to root of object
    path: str
    # meta data file name
    metadata_file: str
    # gitlab project id
    project_id: int
    # git commit id
    commit: str
    # the cached gitlab project
    gitlab_project: Project
    # a gitlab instance
    gitlab_instance: Gitlab

    def __init__(
        self,
        path: str,
        metadata_file: str,
        commit: str,
        project_id: int,
        gitlab_instance: Gitlab,
    ):
        """constructor for ItemPath
        :param path: the path to a potential sub directory
        :param metadata_file: the name of the meta data file
        :param commit: the commit hash
        :param project_id: the gitlab project_id
        :param gitlab_instance: gitlab API client used to resolve projects
        :return: None
        """
        self.path = path
        self.metadata_file = metadata_file
        self.commit = commit
        self.project_id = project_id
        self.gitlab_instance = gitlab_instance
        # cache the project handle so repeated lookups do not hit the API again
        self.gitlab_project = self.gitlab_instance.projects.get(project_id)

    @classmethod
    def __split_path(cls, path: str) -> Tuple[str, str]:
        """Splits a full file path into (directory, file name)."""
        return os.path.dirname(path), os.path.basename(path)

    def create_children_itempath(self, path: str) -> "ItemPath":
        """
        Creates a new ItemPath for a collectionContent entry relative to self.
        :param path: the raw child path from the metadata file
        :return: the resolved child ItemPath
        :raises ValueError: for references to external git repositories (unsupported)
        """
        git_url, project_id, relative_path = parse_child_path(path)
        if git_url is not None:
            # TODO: references to external git repositories are not supported yet
            logger.warning("References to external repositories are not yet supported: %s", path)
            raise ValueError("not supported")
        if project_id < 0:
            # child lives in the same project and commit as self
            normalized_path = normalize_path(
                relative_path, self, self.path + "/" + self.metadata_file
            )
            child_path, child_filename = self.__split_path(normalized_path)
            child_item_path = ItemPath(
                child_path,
                child_filename,
                self.commit,
                self.project_id,
                self.gitlab_instance,
            )
            # a minor optimization: reuse the already-fetched project
            child_item_path.gitlab_project = self.gitlab_project
            return child_item_path
        # child lives in another project of the same GitLab instance:
        # resolve its indexing branch and take the branch's latest commit
        other_project: Project = get_project_for_id(project_id, self.gitlab_instance)
        branch = get_branch_to_index(other_project)
        last_commit = other_project.commits.list(ref_name=branch)[0]
        normalized_path = normalize_path(relative_path, self)
        child_path, child_filename = self.__split_path(normalized_path)
        result = ItemPath(
            child_path,
            child_filename,
            last_commit.id,
            project_id,
            self.gitlab_instance,
        )
        result.gitlab_project = other_project
        return result

    def get_full_path(self) -> str:
        """returns the full file path (directory + metadata file name)"""
        if self.path is None or self.path == "":
            return self.metadata_file
        return self.path + "/" + self.metadata_file

    def get_project_id(self) -> int:
        """returns the project id"""
        return self.project_id

    def doc_id(self) -> str:
        """constructs the elasticsearch document id for this item path"""
        max_id_length = 512  # this is the maximum id length in elastic search
        # a local reference
        prefix: str = "[" + str(self.project_id) + "]"
        rest_length = max_id_length - len(prefix)
        if rest_length >= len(self.path):
            doc_id = prefix + self.path
        else:
            # keep the tail of the path: it is the most distinguishing part
            doc_id = prefix + self.path[len(self.path) - rest_length :]
        return doc_id

    def get_path(self) -> str:
        """returns the path"""
        return self.path

    def __eq__(self, other: Any) -> bool:
        # note: gitlab_project/gitlab_instance are deliberately excluded
        return (
            isinstance(other, ItemPath)
            and self.path == other.path
            and self.metadata_file == other.metadata_file
            and self.commit == other.commit
            and self.project_id == other.project_id
        )
class Node:
    """
    A simple tree structure for representing a metadata collection.
    """

    item: "ItemPath"
    children: List["Node"]

    def __init__(self, item: "ItemPath"):
        self.item = item
        self.children = []

    def __eq__(self, other: Any) -> bool:
        # list equality already compares length and elements pairwise,
        # so the manual len()+index loop of the original is unnecessary
        return (
            isinstance(other, Node)
            and self.item == other.item
            and self.children == other.children
        )

    def get_all_paths(self) -> List["ItemPath"]:
        """
        Returns a list of all Nodes' paths reachable from this node (the tree having this node
        as the root flattened into a list, pre-order).
        :return: all Nodes' paths reachable from this node
        """
        all_paths = [self.item]
        for child in self.children:
            all_paths.extend(child.get_all_paths())
        return all_paths

    def get_child_paths(self) -> List["ItemPath"]:
        """
        Returns a list containing paths of all direct child nodes of this node.
        :return: the paths of all direct child nodes
        """
        return [child.item for child in self.children]

    def __str__(self) -> str:
        """
        returns a readable string representation (nearly as print_tree) on one line
        """
        # the original f-string was missing the closing "]"
        return f"Node[{self.item.project_id}, {self.item.path}, {self.str_children()}]"

    def str_children(self) -> str:
        """
        returns a readable string representation of the children on one line
        """
        return "(" + ",".join(map(str, self.children)) + ")"

    def print_tree(self) -> None:
        """
        Logs a visual representation of the tree having this node as the root.
        Siblings have the same indentation; a node's children are logged
        directly below with one extra indentation level.
        :return: None
        """
        self.print_tree_recursive(0)

    def print_tree_recursive(self, level: int) -> None:
        """
        Used internally to recursively generate a visual representation of the tree.
        :param level: distance from the root
        :return: None
        """
        indentation = " " * (4 * level)
        logger.info("%s", indentation + self.item.get_full_path())
        for child in self.children:
            child.print_tree_recursive(level + 1)
@dataclass
class ProjectInfo:  # pylint: disable=too-many-instance-attributes
    """
    Represents a project with erroneous metadata file(s); collects the
    display data needed by the notification mail templates.
    """

    author_link: str = field(init=False)
    branch: str = field(init=False)
    branch_url: str = field(init=False)
    commit_author: str = field(init=False)
    commit_id: str = field(init=False)
    commit_message: str = field(init=False)
    commit_url: str = field(init=False)
    repository: str = field(init=False)
    repository_url: str = field(init=False)
    urls: List[Tuple[str, str]] = field(init=False)
    user_avatar: str = field(init=False)
    event: InitVar[Dict[str, Any]] = field()
    branch_name: InitVar[str] = field()
    commit: InitVar[ProjectCommit] = field()

    def __post_init__(self, event: Dict[str, Any], branch_name: str, commit: ProjectCommit) -> None:
        """Derives all display fields from the raw event, branch and commit."""
        web_url = event["project"]["web_url"]
        parsed = urlparse(web_url)
        host = f"{parsed.scheme}://{parsed.netloc}/"
        segments = parsed.path[1:].split("/")
        # pair every path segment with the cumulative URL up to that segment
        cumulative_urls = []
        prefix = host
        for segment in segments:
            prefix = prefix + segment + "/"
            cumulative_urls.append(prefix)
        self.urls = list(zip(segments, cumulative_urls))
        self.branch = branch_name
        self.branch_url = web_url + "/-/commits/" + branch_name
        self.commit_id = commit.id
        self.commit_message = commit.title
        self.commit_author = commit.author_name
        self.user_avatar = event["user_avatar"]
        self.repository = event["project"]["name"]
        self.author_link = host + event["user_username"]
        self.commit_url = web_url + "/-/commit/" + self.commit_id
        self.repository_url = web_url
@dataclass
class ErrorMessage:
    """
    Collection of all invalid metadata files for a project
    """

    # Display data about the project/branch/commit the errors belong to.
    project_info: ProjectInfo
    # One entry per metadata file that failed validation.
    metadata_info: List["MetadataInfo"]
    footer_msg: str = ""
    help_msg: str = ""

    def create_html(self) -> str:
        """
        Returns the error message as HTML
        :return: The HTML
        """
        # footer_msg/help_msg are (re)assigned on every call; the Jinja2
        # template reads them from the rendered `error_msg` object.
        self.footer_msg = (
            "This email message was auto-generated by the <a "
            'href="https://sharing-codeability.uibk.ac.at/">CodeAbility Sharing '
            "Platform</a>.<br/>Please do not respond.<br/>If you have any "
            "questions feel free to contact the <a "
            'href="mailto:sharing-codeability@uibk.ac.at">support</a>. '
        )
        self.help_msg = (
            "More information about the expected metadata file can be found in "
            'the <a href="https://sharing-codeability.uibk.ac.at/sharing/codeability-'
            'sharing-platform/-/wikis/technical/MetaData-Documentation">'
            "metadata reference</a>. "
        )
        template_env = Environment(loader=FileSystemLoader(DIR))
        template = template_env.get_template(PATH_MAIL_TEMPLATE)
        return str(template.render(error_msg=self))  # cast to string to make mypy happy

    def create_plain(self) -> str:
        """
        Returns the error message as plain text
        :return: The plain text
        """
        title = "Sharing Platform repository health check failed!"
        project = f"Project: {self.project_info.repository} ({self.project_info.urls[-1][1]})"
        branch = f"Branch: {self.project_info.branch} ({self.project_info.branch_url})"
        commit = f"Commit: {self.project_info.commit_id} ({self.project_info.commit_url})"
        commit_msg = f"Commit Message: {self.project_info.commit_message}"
        commit_author = f"Commit Author: {self.project_info.commit_author}"
        body = "The health check for your commit failed! The following errors were found:\n"
        # list every offending metadata file with its (optional) URL and errors
        for metadata_info in self.metadata_info:
            file_url = ""
            if metadata_info.url:
                file_url = f" ({self.project_info.repository_url}{metadata_info.url})"
            body += f"\n{metadata_info.filename}{file_url}:"
            for error in metadata_info.errors:
                body += f"\n- {error}"
        help_msg = (
            "More information about the expected metadata file can be found in the "
            "metadata reference ("
            "https://sharing-codeability.uibk.ac.at/sharing/codeability-sharing"
            "-platform/-/wikis/technical/MetaData)."
        )
        footnote = (
            "This email message was auto-generated by the CodeAbility Sharing Platform ("
            "https://sharing-codeability.uibk.ac.at/). Please do not respond. If you have "
            "any questions feel free to contact the support ("
            "sharing-codeability@uibk.ac.at). "
        )
        return (
            f"{title}\n\n{project}\n{branch}\n{commit}\n{commit_msg}\n{commit_author}\n\n"
            f"{body}\n\n{help_msg}\n-----\n{footnote} "
        )
class EventHandler:
    """
    Class for handling GitLab system hook events.
    """

    gitlab_instance: Gitlab
    validation_service: ValidationService
    mail: Mail
    elasticsearch_instance: Elasticsearch
    git_event: Dict[str, Any]

    def __init__(
        self,
        gitlab_instance: Gitlab,
        mail: Mail,
        elasticsearch_instance: Elasticsearch,
        validation_service: ValidationService,
        git_event: Dict[str, Any],
    ) -> None:
        self.gitlab_instance = gitlab_instance
        self.mail = mail
        self.validation_service = validation_service
        self.elasticsearch_instance = elasticsearch_instance
        self.git_event = git_event

    def handle_event(self) -> None:
        """
        Calls the appropriate function to handle the GitLab system hook events `push`,
        `project_rename`, `project_transfer`, `project_destroy`, and `group_rename`.
        (https://docs.gitlab.com/ee/system_hooks/system_hooks.html)
        :return: None
        """
        if "event_name" in self.git_event:
            event_name = self.git_event["event_name"]
            logger.info("Received '%s'", event_name)
            if event_name == "push":
                # handle if in sharing on the branch which should be indexed
                self.handle_push_event()
            elif event_name == "project_rename":
                # handle if in sharing
                self.handle_project_rename_event()
            elif event_name == "project_transfer":
                # into sharing: index
                # out of sharing: delete index
                # inside sharing: update index
                # outside sharing: ignore
                self.handle_project_transfer_event()
            elif event_name == "project_destroy":
                # delete index if in sharing
                self.handle_project_destroy_event()
            elif event_name == "group_rename":
                # into sharing: index
                # out of sharing: delete index
                # inside sharing: update index
                # outside sharing: ignore
                self.handle_group_rename_event()
            elif event_name in ("user_add_to_team", "user_remove_from_team"):
                # team changed events:
                # the solution is to construct a new event that
                # mimics a push and call the handle_push_event
                self.handle_team_change_event()
            else:
                logger.info("Ignoring event %s", event_name)
        else:
            if "object_kind" in self.git_event:
                logger.info("Ignoring event of kind %s", self.git_event["object_kind"])
            else:
                logger.info("Ignoring unknown event %s", self.git_event)

    def handle_team_change_event(self) -> None:
        """
        Handles the user_add_to/remove_from_team event by synthesizing a
        push-like event for the indexing branch and re-using the push handler.
        :return: None
        """
        project_id = self.git_event["project_id"]
        project = self.gitlab_instance.projects.get(project_id)
        branch = get_branch_to_index(project)
        logger.info(
            "Handle Team Change Event %s for user %s in %s ",
            self.git_event["event_name"],
            self.git_event["user_username"],
            self.git_event["project_path_with_namespace"],
        )
        new_event = {
            "project_id": project_id,
            "project": {
                "path_with_namespace": self.git_event["project_path_with_namespace"],
            },
            "path": self.git_event["project_path"],
            "ref": "refs/heads/" + branch,
            "user_id": self.git_event["user_id"],
        }
        self.git_event = new_event
        self.handle_push_event()

    def handle_push_event(self) -> None:
        """
        Handles the 'push' event: validates and (re-)indexes the project when
        the push targets the branch selected for indexing; notifies the
        pushing user about validation errors.
        :return: None
        """
        metadata_mandatory = in_main_group(self.git_event["project"]["path_with_namespace"])
        logger.info(
            "Indexing %s is mandatory: %s",
            self.git_event["project"]["path_with_namespace"],
            metadata_mandatory,
        )
        # strip the "refs/heads/" prefix (11 characters)
        branch_name = self.git_event["ref"][11:]
        if branch_name not in BRANCH_PRIORITY:
            return
        project_id = self.git_event["project_id"]
        project = self.gitlab_instance.projects.get(project_id)
        try:
            branch_to_index = get_branch_to_index(project)
        except NoBranchToIndexError:
            logger.info(
                "Could not index project. No suitable branch exists. Deleting existing data."
            )
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            indexing.project_destroy(project_id)
            return
        if branch_name != branch_to_index:
            return
        analysed_commit = check_and_index_project(
            project_id,
            self.gitlab_instance,
            self.validation_service,
            self.mail,
            self.elasticsearch_instance,
            metadata_mandatory,
        )
        if analysed_commit is None:
            logger.info(
                "Stopped handling push event. Error occurred while trying to get the right commit."
            )
            return
        logger.info(
            "Validated push event. branch %s, commit %s.",
            analysed_commit.branch_name,
            analysed_commit.commit_hash,
        )
        if analysed_commit.errors and "user_id" in self.git_event:
            health_check = HealthCheck(self.gitlab_instance, self.validation_service, self.mail)
            health_check.send_validation_error_mail(self.git_event, analysed_commit)
        logger.info(
            "completed handling for push event: project_id: '%s', branch: '%s', commit: '%s'",
            project_id,
            analysed_commit.branch_name,
            analysed_commit.commit_hash,
        )

    def handle_project_rename_event(self) -> None:
        """
        Handles the 'project_rename' event by updating the indexed paths/URLs.
        :return: None
        """
        project_id = self.git_event["project_id"]
        path = self.git_event["path"]
        path_with_namespace = self.git_event["path_with_namespace"]
        logger.info(
            "project_id: '%s', path: '%s', path_with_namespace: '%s'",
            project_id,
            path,
            path_with_namespace,
        )
        if in_main_group(path_with_namespace):
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            web_url = indexing.web_url_project(project_id)
            indexing.project_rename(project_id, path, path_with_namespace, web_url)

    def handle_project_transfer_event(self) -> None:
        """
        Handles the 'project_transfer' event: drops the old index entries and
        re-indexes the project at its new location (if metadata is present).
        :return: None
        """
        project_id = self.git_event["project_id"]
        path_with_namespace = self.git_event["path_with_namespace"]
        old_path_with_namespace = self.git_event["old_path_with_namespace"]
        logger.info(
            "project_id: '%s', path_with_namespace: '%s', old_path_with_namespace: '%s'",
            project_id,
            path_with_namespace,
            old_path_with_namespace,
        )
        indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
        indexing.project_destroy(project_id)
        metadata_mandatory = in_main_group(path_with_namespace)
        # BUG FIX: validation_service was previously omitted from this call,
        # shifting all following arguments and raising a TypeError at runtime.
        check_and_index_project(
            project_id,
            self.gitlab_instance,
            self.validation_service,
            self.mail,
            self.elasticsearch_instance,
            metadata_mandatory,
        )

    def handle_project_destroy_event(self) -> None:
        """
        Handles the 'project_destroy' event by removing the project's index entries.
        :return: None
        """
        path_with_namespace = self.git_event["path_with_namespace"]
        if in_main_group(path_with_namespace):
            project_id = self.git_event["project_id"]
            indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
            indexing.project_destroy(project_id)

    def handle_group_rename_event(self) -> None:
        """
        Handles the 'group_rename' event by re-indexing every project of the group.
        :return: None
        """
        group_id = self.git_event["group_id"]
        path_with_namespace = self.git_event["full_path"]
        old_path_with_namespace = self.git_event["old_full_path"]
        logger.info("group_id: '%s', path_with_namespace: '%s'", group_id, path_with_namespace)
        if path_with_namespace == old_path_with_namespace:
            return
        # A group_rename event cannot move a group into another group,
        # only the group name and url change.
        # Thus, the only way for a group which was not in the main group previously
        # to be in the main group after a group_rename event
        # is when the group is renamed into the main group name.
        # This is only possible when the main group does not exist.
        indexing = Indexing(self.gitlab_instance, self.mail, self.elasticsearch_instance)
        metadata_mandatory = (path_with_namespace == GITLAB_MAIN_GROUP) or (
            path_with_namespace.startswith(GITLAB_MAIN_GROUP + "/")
        )
        projects = indexing.get_all_projects(group_id)
        for project in projects:
            logger.info("Adding project with id: %s", project.id)
            indexing.project_destroy(project.id)
            # BUG FIX: validation_service was previously omitted here as well.
            check_and_index_project(
                project.id,
                self.gitlab_instance,
                self.validation_service,
                self.mail,
                self.elasticsearch_instance,
                metadata_mandatory,
            )
def get_indexing_commit(project: Project) -> Tuple[str, str]:
    """
    Tries to obtain the branch name and commit of the project which should be indexed.
    :param project: the project which should be indexed
    :return: branch name, commit hash
    :raises NoBranchToIndexError: if no suitable branch exists
    """
    branch_name = get_branch_to_index(project)
    try:
        return branch_name, get_relevant_commit_hash(project, branch_name)
    except GitlabGetError as error:
        raise NoBranchToIndexError from error
def get_project_for_id(project_id: int, gitlab_instance: Gitlab) -> Project:
    """
    Fetches the GitLab project object for the given project id.
    :param project_id: the id of the GitLab project
    :param gitlab_instance: the GitLab instance
    :return: the project object
    """
    projects = gitlab_instance.projects
    return projects.get(project_id)
def get_commit_to_index(project_id: int, gitlab_instance: Gitlab) -> Tuple[Project, str, str]:
    """
    Tries to obtain the project, branch name and commit of the project which should be indexed.
    :param project_id: the id of the GitLab project
    :param gitlab_instance: the GitLab instance
    :return: project, branch name, commit hash
    :raises NoBranchToIndexError: if no suitable branch exists
    """
    project = get_project_for_id(project_id, gitlab_instance)
    branch_name, commit_hash = get_indexing_commit(project)
    return project, branch_name, commit_hash
@dataclass()
class AnalysedCommit:
    """
    Contains information about the commit which was analysed, including the result of the analysis.
    """

    # The GitLab project the commit belongs to.
    project: Project
    # Name of the branch that was analysed/indexed.
    branch_name: str
    # Hash of the analysed commit.
    commit_hash: str
    # Validation errors found in the commit's metadata (may be empty).
    errors: List["MetadataInfo"]
def check_and_index_project(
    project_id: int,
    gitlab_instance: Gitlab,
    validation_service: ValidationService,
    mail: Mail,
    elasticsearch_instance: Elasticsearch,
    metadata_mandatory: bool,
) -> Optional[AnalysedCommit]:
    """
    Validates and indexes the project with the given id.
    Does not send error notifications to the user, but returns information
    about the analysed commit, including a (potentially empty) list of
    validation errors.
    :param project_id: the id of the project to index
    :param gitlab_instance: the GitLab instance
    :param validation_service: service used to validate the metadata
    :param mail: Mail object for sending error messages
    :param elasticsearch_instance: the elasticsearch instance
    :param metadata_mandatory: whether missing metadata counts as a reportable error
    :return: information about the analysed commit or None if no suitable commit can be obtained
    """
    try:
        project, branch_name, commit_hash = get_commit_to_index(project_id, gitlab_instance)
    except NoBranchToIndexError:
        logger.warning("Could not index project with id %s.", project_id)
        return None
    health_check = HealthCheck(gitlab_instance, validation_service, mail)
    validation_errors, tree = health_check.validate_project(gitlab_instance, project, commit_hash)
    if tree is None:
        # Missing metadata is only reported when metadata is mandatory.
        if (
            any(error.errors == [MISSING_METADATA_MSG] for error in validation_errors)
            and not metadata_mandatory
        ):
            logger.info("project %s is not indexed. No metadata found", project_id)
            return None
    else:
        logger.info("parsed tree:")
        tree.print_tree()
        indexing = Indexing(gitlab_instance, mail, elasticsearch_instance)
        indexing.index_entire_repository(tree)
    return AnalysedCommit(project, branch_name, commit_hash, validation_errors)
def calculate_project_members(
    project: Project, gitlab_instance: Gitlab
) -> Tuple[List[str], List[str]]:
    """
    Returns a list of user emails allowed to read the project and a list of group names
    with read access to the project.
    :param project: the project whose members are collected
    :param gitlab_instance: the GitLab instance used to resolve users and groups
    """
    project_member_emails = [
        gitlab_instance.users.get(member.id).email
        for member in project.members.list(all=True)
    ]
    # resolve the groups the project belongs to via the raw REST endpoint
    groups = project.manager.gitlab.http_get(f"/projects/{project.get_id()}/groups")
    group_names = calculate_all_groups_members(groups, gitlab_instance)
    group_names.extend(
        shared_group.get("group_full_path") for shared_group in project.shared_with_groups
    )
    return project_member_emails, group_names
def calculate_all_groups_members(
    groups: List[gitlab.v4.objects.Group], gitlab_instance: Gitlab
) -> List[str]:
    """
    Collects, for every group in ``groups``, the group's own full path together
    with the full paths of all groups it is shared with
    (see :func:`calculate_group_members`).
    :param groups: group dicts as returned by the projects/<id>/groups endpoint
    :param gitlab_instance: the GitLab instance used to resolve group objects
    :return: the collected group full paths
    """
    result: List[str] = []
    for group in groups:
        gitlab_group = gitlab_instance.groups.get(group.get("id"))
        result.extend(calculate_group_members(gitlab_group))
    return result
def calculate_group_members(group: gitlab.v4.objects.Group) -> List[str]:
    """
    Returns the full path of ``group`` itself together with the full paths of
    all groups ``group`` is shared with.
    :param group: the GitLab group object
    :return: the group full paths
    """
    shared_paths = [shared_group["group_full_path"] for shared_group in group.shared_with_groups]
    return shared_paths + [group.full_path]
class NoBranchToIndexError(Exception):
    """
    Error raised when a project has no branch with a name
    appearing in BRANCH_PRIORITY, i.e. nothing suitable to index.
    """
def get_branch_to_index(project: Project) -> str:
    """
    Returns the name of the branch which should be indexed,
    following the priority given by BRANCH_PRIORITY.
    :param project: The project whose branches should be checked
    :return: The name of the branch to index
    :raises NoBranchToIndexError: when no branch with a name in BRANCH_PRIORITY exists
    """
    for candidate in BRANCH_PRIORITY:
        try:
            project.branches.get(candidate)
        except gitlab.exceptions.GitlabGetError:
            # branch does not exist; try the next candidate
            continue
        return candidate
    raise NoBranchToIndexError
def get_relevant_commit_hash(project: Project, branch_name: str) -> str:
    """
    Gets the hash of the latest commit on the given branch of the project.
    :param project: the project to get the hash for
    :param branch_name: name of the branch whose latest commit is wanted
    :return: the hash of the latest commit on that branch
    """
    branch = project.branches.get(branch_name)
    commit_info = branch.commit
    return commit_info["id"]  # type: ignore
def in_main_group(path_with_namespace: str) -> bool:
    """
    Checks if the root of the given namespace is in the main group.
    :param path_with_namespace: path to the repository to be checked
    :return: True, if in main_group
    """
    main_group_prefix = GITLAB_MAIN_GROUP + "/"
    return path_with_namespace.startswith(main_group_prefix)
def check_for_mandatory_fields_on_toplevel(
    file_content: Dict[str, Any],
) -> Iterator[jsonschema.ValidationError]:
    """
    Checks the file_content for entries that are mandatory on the top level.
    (Annotation fixed: the metadata values are arbitrary JSON values, not ints.)
    :param file_content: the parsed top-level metadata document
    :return: one ValidationError per missing mandatory attribute
    """
    required_on_toplevel = (
        "creator",
        "publisher",
        "license",
        "language",
    )
    for attr in required_on_toplevel:
        if attr not in file_content:
            yield jsonschema.ValidationError(
                f"attribute {attr} may not be empty for the top level element"
            )
def validate_collection(
    gitlab_instance: Gitlab,
    parent: ItemPath,
    collection_content: List[str],
    schema: Any,
    nesting: int,
    visited_parents: Optional[List[ItemPath]] = None,
) -> Tuple[List[str], List[MetadataInfo], List[Node]]:
    """
    Parses a metadata file's collectionContent.
    Returns a tuple consisting of a list of errors in the parent's collectionContent specification,
    a list of errors obtained recursively from the children, and a list of child nodes.
    :param gitlab_instance: the GitLab instance used to resolve referenced files
    :param parent: ItemPath to current meta data
    :param collection_content: the metadata file's collectionContent (list of paths)
    :param schema: a dictionary which can be used by jsonschema Draft 7
    :param nesting: nesting level of children (0 is top most)
    :param visited_parents: paths already seen on this branch (cycle detection);
        defaults to an empty list (the previous mutable default ``[]`` was an anti-pattern)
    :return: a tuple consisting of a list of errors in the parent's collectionContent
        specification, a list of errors obtained recursively from the children,
        and a list of child nodes
    """
    # copy defensively: the caller's list must not observe our append, and a
    # default list must never be shared between calls
    visited_parents = [] if visited_parents is None else list(visited_parents)
    if parent in visited_parents:
        # typo fixed in the message ("inifinite" -> "infinite")
        return [f"infinite recursion in {parent}"], [], []
    visited_parents.append(parent)
    path_errors, normalized_paths = normalize_collection_content_paths(collection_content, parent)
    child_nodes = []
    child_errors = []
    for path in normalized_paths:
        errors, tree = validate_metadata_file(
            gitlab_instance, path, schema, nesting, visited_parents
        )
        child_errors += errors
        if tree:
            child_nodes.append(tree)
    return path_errors, child_errors, child_nodes
def normalize_collection_content_paths(
    collection_content: List[str], parent: ItemPath
) -> Tuple[List[str], List[ItemPath]]:
    """
    Takes the "collectionContent" list from a metadata file, normalizes the paths,
    and warns about issues, such as duplicate paths.
    :param collection_content: the "collectionContent" list from a metadata file
    :param parent: normalized path of the metadata file from which "collectionContent" is taken
    :return: tuple consisting of a list of error messages and a list of normalized file paths
        without duplicates
    """
    errors: List[str] = []
    resolved: List[ItemPath] = []
    for raw_path in collection_content:
        try:
            child_path: ItemPath = parent.create_children_itempath(raw_path)
        except PathError as error:
            errors.append(str(error))
            continue
        exists, error_message = check_if_file_exists(child_path)
        if not exists:
            errors.append(
                f"collectionContent path {raw_path} ({child_path.path}): "
                f"no such file or directory: {error_message}"
            )
            continue
        resolved.append(child_path)
    duplicate_paths, paths_without_duplicates = deduplicate_paths(resolved)
    errors.extend(generate_duplicate_path_warning(path) for path in duplicate_paths)
    return errors, paths_without_duplicates
class PathError(ValueError):
    """
    Represents an error in a file path
    (raised by normalize_path, caught when collecting collectionContent errors).
    """
def normalize_path(child_path: str, parent: ItemPath, parent_path: Optional[str] = None) -> str:
    """
    Takes a potentially un-normalized child path and normalizes it, checking for potential errors.
    If the path can be normalized and is valid, the normalized path is returned as a str.
    Otherwise, a PathError is raised.
    :param child_path: potentially un-normalized path from the parent's "collectionContent"
    :param parent: the parent of this child
    :param parent_path: normalized path of a metadata file
    :return: the normalized path
    :raise PathError: when the path cannot be normalized or is invalid
    """
    if child_path is None:
        # no explicit path given: fall back to the referenced repository's own metadata file
        metadata_files = get_repository_metadata_files(parent.gitlab_project, parent.commit)
        if not metadata_files:
            logger.warning("Cannot find %s in %s", child_path, parent)
            # BUG FIX: previously execution fell through to metadata_files[0]
            # and crashed with an IndexError; raise the error type that the
            # collectionContent machinery already handles instead.
            raise PathError(f"no metadata file found in {parent}")
        if len(metadata_files) > 1:
            logger.warning("Multiple %s in %s found", child_path, parent)
        return metadata_files[0]
    if "../" in child_path:
        raise PathError(
            "collectionContent path '"
            + child_path
            + "' contains a reference to a parent directory ('../'). "
            "This is not allowed."
        )
    if parent_path:
        parent_directory = os.path.dirname(parent_path)
        if parent_directory == "/":
            parent_directory = ""
    else:
        parent_directory = ""
    if child_path.startswith("/"):
        # absolute path from repo root
        # remove leading slash since gitlab takes relative paths from the repo root
        child_path = child_path[1:]
    elif parent_directory:
        # convert relative to absolute path
        child_path = parent_directory + "/" + child_path
    child_directory = os.path.dirname(child_path)
    if parent_path is not None and (
        child_directory == parent_directory or not child_path.startswith(parent_directory)
    ):
        raise PathError(
            "collectionContent path '"
            + child_path
            + "' is not in a subdirectory of parent directory /"
            + parent_directory
        )
    # valid, normalized path
    return child_path
def parse_child_path(child_path: str) -> Tuple[str, int, str]:
    """
    Parses a child_path and returns the git url, the project id, and the relative path.
    Components that are absent are returned as None (url, path) or -1 (project id).
    """
    match = re.search(r"^(http[^[]+)?(\[(\d*)\])?(.+)?$", child_path)
    if match is None:
        raise ParserError("Cannot parse " + child_path)
    git_url, _, project_id_group, relative_path = match.groups()
    project_id = int(project_id_group) if project_id_group else -1
    return git_url, project_id, relative_path
def check_if_file_exists(path: ItemPath) -> Tuple[bool, str]:
    """
    Checks if a file specified by path exists.
    :param path: a normalized path
    :return: tuple of (exists, error message); the message is empty on success
    """
    project = path.gitlab_project
    if project is None:
        # TODO for references to an external project, we should invest more effort
        return True, ""
    full_path = path.get_full_path()
    if not full_path:
        # nothing concrete to check; treat as present
        return True, ""
    try:
        project.files.get(file_path=full_path, ref=path.commit)
    except GitlabGetError as err:
        logger.warning("Gitlab error: %s", err)
        return False, err.error_message
    return True, ""
def deduplicate_paths(paths: List[ItemPath]) -> Tuple[List[ItemPath], List[ItemPath]]:
    """
    Takes a list of paths and checks for duplicates.
    If duplicates are found, they are returned in the first element of the return tuple.
    The second element of the return tuple contains the de-duplicated list of paths.
    The order of the paths will be preserved. For duplicates, the first element will be kept.
    :param paths: a list of normalized paths, potentially containing duplicates.
    :return: tuple consisting of a list of duplicate paths and the de-duplicated list.
    """
    # O(n^2) membership tests are deliberate: ItemPath is unhashable
    # (custom __eq__ only) and ordering must be preserved.
    unique: List[ItemPath] = []
    duplicates: List[ItemPath] = []
    for path in paths:
        if path not in unique:
            unique.append(path)
        elif path not in duplicates:
            # report a path at most once, however often it repeats
            duplicates.append(path)
    return duplicates, unique
def generate_duplicate_path_warning(path: ItemPath) -> str:
    """
    Generates a message warning about a duplicate path in a metadata file's collectionContent.
    :param path: the path appearing more than once
    :return: the warning message
    """
    return f"collectionContent contains more than one reference to {path.path}"
def path_2_key(validation_error: ValidationError) -> str:
    """
    Returns the path to the key for which a validation error occurred.
    :param validation_error: Validation error
    :return: Path to validation error, formatted as "'a.b[0].c': " (empty for the root)
    """
    segments = list(validation_error.path)
    if not segments:
        return ""
    key_path = str(segments[0])
    for segment in segments[1:]:
        # integers are list indices, everything else is a mapping key
        key_path += f"[{segment}]" if isinstance(segment, int) else f".{segment}"
    return f"'{key_path}': "
def is_regular_file(mode: str) -> bool:
    """
    Returns true if the octal file mode string denotes a regular file.

    A regular file has the S_IFREG bit (0o100000) set and none of the other
    file-type bits (0o070000); masking with 0o170000 and comparing against
    0o100000 expresses both conditions in one test.
    :param mode: file mode as an octal string (e.g. "100644")
    :return: True, if regular file.
    """
    return int(mode, base=8) & 0o170000 == 0o100000
@dataclass()
class HealthCheck:
    """
    Validates a project's metadata and notifies the responsible user by mail
    when the validation finds problems.
    """

    gitlab: Gitlab
    mail: Mail
    validation_service: ValidationService

    def __init__(
        self, gitlab_instance: Gitlab, validation_service: ValidationService, mail: Mail
    ) -> None:
        self.gitlab = gitlab_instance
        self.mail = mail
        self.validation_service = validation_service

    def send_validation_error_mail(
        self, git_event: Dict[str, Any], analysed_commit: AnalysedCommit
    ) -> None:
        """
        Sends an email about validation errors to the user who triggered the event.

        Only call this method if there were errors; if the validation did not
        find any issues no email should be sent.

        :param git_event: the event which triggered the validation
        :param analysed_commit: information about the commit which was analysed
        """
        triggering_user = self.gitlab.users.get(git_event["user_id"])
        commit = analysed_commit.project.commits.get(analysed_commit.commit_hash)
        message = ErrorMessage(
            ProjectInfo(git_event, analysed_commit.branch_name, commit),
            analysed_commit.errors,
        )
        subject = f"{git_event['project']['path_with_namespace']}: Health check failed!"
        try:
            self.mail.send_mail(
                triggering_user.email, subject, message.create_html(), message.create_plain()
            )
        except smtplib.SMTPAuthenticationError as error:
            # mail delivery is best effort; a broken SMTP login must not
            # abort the event handling
            logger.error("Could not send mail: %s", error)

    def validate_project(
        self, gitlab_instance: Gitlab, project: Project, commit: str
    ) -> Tuple[List[MetadataInfo], Optional[Node]]:
        """
        Checks if the project's metadata is defined correctly.

        This requires exactly one top-level metadata file, recognised by naming
        convention. While parsing the metadata, errors are collected in a list.
        Valid metadata file names are stored as a tree, resembling either an
        atomic project or a collection.

        :param project: the gitlab project to check
        :param commit: the hash of the commit to analyze
        :return: List of validation errors, Tree of metadata
            (single node if repo is not a collection)
        """
        metadata_files = get_repository_metadata_files(project, commit)
        errors = check_for_single_metadata_files(metadata_files)
        if errors:
            # stop early: deeper validation requires exactly one
            # top-level metadata file
            return errors, None
        root_node = ItemPath("", metadata_files[0], commit, project.id, self.gitlab)
        return validate_metadata_file(
            gitlab_instance,
            self.validation_service,
            root_node,
            json.loads(SCHEMA_METADATA),
            0,
        )
@dataclass
class ProjectPermissions:
    """
    Data class for storing who has read access to a project.
    """

    # e-mail addresses of the individual users with read access
    email_addresses_with_access: List[str]
    # names of the GitLab groups whose members have read access
    groups_with_access: List[str]
def unify_keywords(file_contents: Dict[str, Any]) -> Dict[str, Any]:
    """
    Normalizes some keywords of the user-provided metadata in place.

    Currently the only normalization is converting the programming language
    names to titlecase. The (mutated) input dictionary is returned.

    :param file_contents: dictionary representing the user provided metadata
    :return: the input dictionary with normalized keywords
    """
    try:
        languages = file_contents["programmingLanguage"]
    except KeyError:
        # nothing to normalize
        return file_contents
    file_contents["programmingLanguage"] = [language.title() for language in languages]
    return file_contents
# pylint: disable=R0904
class Indexing:
    """
    Contains the functionality required to create and update entries in the metadata index.
    """

    # GitLab client used to look up projects, groups and users
    gitlab_instance: Gitlab
    # mail configuration, handed to HealthCheck for validation-error mails
    mail: Mail
    # Elasticsearch client that owns the metadata index
    elasticsearch: Elasticsearch
    # name (or alias) of the metadata index operated on
    index_name: str

    def __init__(
        self,
        gitlab_instance: Gitlab,
        mail: Mail,
        elasticsearch_instance: Elasticsearch,
        index_name: str = METADATA_INDEX,
    ):
        self.gitlab_instance = gitlab_instance
        self.mail = mail
        self.elasticsearch = elasticsearch_instance
        self.index_name = index_name

    def project_destroy(self, project_id: int) -> None:
        """
        Handles a project_destroy event. Deletes all elements in the index for the given project_id.

        :param project_id: ID of the project
        :return: None
        """
        # NOTE(review): this uses a "match" query while project_rename uses
        # "term" for the same field — presumably equivalent for integer ids,
        # but confirm and unify.
        delete_query = {"match": {"project.project_id": project_id}}
        try:
            self.elasticsearch.delete_by_query(index=self.index_name, query=delete_query)
        except (NotFoundError, ConflictError):
            # nothing to delete, or a concurrent modification — deliberately ignored
            pass

    def project_rename(
        self, project_id: int, path: str, path_with_namespace: str, url: str
    ) -> None:
        """
        Handles a project_rename event. Updates project_name, namespace, main_group and sub_group
        for all elements of the given project_id in the metadata index.

        :param project_id: ID of the project
        :param path: the path of the repository (project name)
        :param path_with_namespace: repository path with namespace
        :param url: GITLAB_URL to the repository
        :return: None
        """
        namespaces = path_with_namespace.split("/")
        # a sub group only exists for paths like main_group/sub_group/project
        sub_group = f"'{namespaces[1]}'" if len(namespaces) > 2 else "null"
        # NOTE(review): the values are interpolated into the painless script as
        # string literals — a path containing a single quote would break the
        # script. Consider passing them as script "params" instead; TODO
        # confirm GitLab path characters make this safe today.
        rename_script = {
            "source": f"ctx._source.project.project_name='{path}'; "
            f"ctx._source.project.namespace='{path_with_namespace}'; "
            f"ctx._source.project.main_group='{namespaces[0]}'; "
            f"ctx._source.project.sub_group={sub_group}; "
            f"ctx._source.project.url='{url}'; ",
            "lang": "painless",
        }
        rename_query = {"term": {"project.project_id": project_id}}
        self.elasticsearch.update_by_query(
            index=self.index_name, query=rename_query, script=rename_script
        )

    def web_url_project(self, project_id: int) -> str:
        """
        Returns the web url of a given project id after querying GitLab.

        :param project_id: ID of the project
        :return: The Url to the project
        """
        project = self.gitlab_instance.projects.get(project_id)
        return str(project.web_url)

    def index_entire_repository(self, metadata_tree: Node) -> None:
        """
        Indexes the files of an entire project.

        Existing index entries for the project are removed first, then all
        metadata below the tree root is (re-)indexed together with the
        project's read permissions.

        :param metadata_tree: a tree of valid metadata file paths
        :return: None
        """
        if metadata_tree is not None:
            logger.debug(
                "indexing entire repository starting at %s (%s)",
                metadata_tree.item.project_id,
                metadata_tree.item.path,
            )
            # drop stale entries before re-indexing
            self.project_destroy(metadata_tree.item.project_id)
        # NOTE(review): the None guards above/below suggest metadata_tree may
        # be None, yet this line dereferences it unconditionally — confirm
        # callers always pass a non-None tree (they appear to).
        user_emails, group_names = calculate_project_members(
            metadata_tree.item.gitlab_project, self.gitlab_instance
        )
        unique_user_emails = list(dict.fromkeys(user_emails))  # just to remove duplicates
        unique_group_names = list(dict.fromkeys(group_names))  # just to remove duplicates
        project_permissions = ProjectPermissions(unique_user_emails, unique_group_names)
        # metadata
        if metadata_tree is not None:
            self.index_all_metadata(project_permissions, metadata_tree)

    def get_all_projects(self, group_id: int) -> List[Project]:
        """
        Returns all projects of a group including projects from subgroups.

        :param group_id: ID of the group
        :return: List of all projects
        """
        group = self.gitlab_instance.groups.get(group_id)
        group_projects = group.projects.list(all=True, include_subgroups=True)
        # The call above returns instances of "GroupProject"
        # which are not properly usable (e.g. they do not have a repository_tree method).
        # Therefore, corresponding "Project" instances are loaded in this list comprehension.
        projects = [
            self.gitlab_instance.projects.get(group_project.id) for group_project in group_projects
        ]
        return projects

    def group_rename(self, group_id: int) -> None:
        """
        Handles a group_rename event. Renames all projects of the given group (including subgroups).

        :param group_id: ID of the group to rename
        :return: None
        """
        projects = self.get_all_projects(group_id)
        for project in projects:
            self.project_rename(
                project.id, project.path, project.path_with_namespace, project.web_url
            )

    @classmethod
    def _log_validation_errors(cls, validation_errors: List[MetadataInfo]) -> None:
        # Logs every validation error, prefixed by the file it occurred in.
        for validation_error in validation_errors:
            for error in validation_error.errors:
                logger.info(" %s: %s", validation_error.filename, error)

    def index_all_projects(
        self, get_relevant_projects: Callable[[], List[Project]], logger_prefix: str = ""
    ) -> None:
        """
        Indexes the files of all projects supplied by the callable.

        :param get_relevant_projects: method to generate relevant projects
        :param logger_prefix: just to add relevant info to the log output
        :return: None
        """
        projects = get_relevant_projects()
        for project_number, project in enumerate(projects):
            # NOTE(review): project_number starts at 0, so the log reads
            # "0 of N" for the first project (cosmetic only).
            if project.empty_repo:
                logger.debug(
                    "%s / repository %s (skipped - empty repository, %d of %d)",
                    logger_prefix,
                    project.id,
                    project_number,
                    len(projects),
                )
            else:
                logger.debug(
                    "%s / project %s %s (%d of %d)",
                    logger_prefix,
                    project.id,
                    project.name,
                    project_number,
                    len(projects),
                )
                try:
                    branch_name, commit = get_indexing_commit(project)
                except NoBranchToIndexError:
                    # nothing indexable on this project — skip it
                    continue
                # TODO(review): HealthCheck.__init__ takes
                # (gitlab_instance, validation_service, mail) but only two
                # arguments are passed here, which raises TypeError at
                # runtime. Clarify where the ValidationService should come
                # from and fix this call.
                validation_errors, project_root = HealthCheck(
                    self.gitlab_instance, self.mail
                ).validate_project(self.gitlab_instance, project, commit)
                if project_root:
                    self.index_entire_repository(project_root)
                else:
                    logger.info(
                        "there are meta data errors in %s branch %s",
                        project.id,
                        branch_name,
                    )
                    Indexing._log_validation_errors(validation_errors)

    def get_main_group_id(self) -> int:
        """
        Returns the group ID of the main group.

        :return: ID of the group
        """
        # NOTE(review): groups.get() is called without an id — verify that
        # python-gitlab resolves this to the intended main group here.
        return int(self.gitlab_instance.groups.get().id)

    def get_root_groups(self) -> list[Group]:
        """
        Returns all top-level (root) groups of the GitLab instance.

        :return: list of root groups
        """
        root_groups = self.gitlab_instance.groups.list(top_level_only=True)
        return root_groups  # type: ignore

    def get_all_gitlab_users(self) -> list[User]:
        """
        Returns a list of all users.

        :return: list of users.
        """
        return self.gitlab_instance.users.list(all=True)  # type: ignore

    def get_projects_of_user(self, user: User) -> list[Project]:
        """
        Returns the list of projects for this user.

        :param user: the GitLab user whose projects are listed
        :return: list of user projects
        """
        user_projects = user.projects.list(all=True, include_subgroups=True)
        # re-load as full "Project" objects (same reason as in get_all_projects)
        projects = [
            self.gitlab_instance.projects.get(user_project.id) for user_project in user_projects
        ]
        return projects

    def get_all_index_names(self) -> List[str]:
        """
        Returns the names of all indices in the elasticsearch instance.
        """
        return str(self.elasticsearch.cat.indices(h="index")).splitlines()

    def print_all_indexes(self) -> None:
        """
        Prints information of all elasticsearch indices on stdout.

        :return: None
        """
        indices = self.elasticsearch.cat.indices(v=True)
        if indices == "":
            print("No index exists!")
            return
        print(indices)

    def print_all_aliases(self) -> None:
        """
        Prints information of all elasticsearch aliases on stdout.

        :return: None
        """
        status_msgs = self.elasticsearch.cat.aliases(v=True)
        if status_msgs == "":
            print("No alias!")
            return
        print(status_msgs)

    def get_alias(self, alias: str) -> Any:
        """
        Tries to get the elasticsearch alias specified by the argument.

        :param alias: the name of an alias
        :return: the alias if it exists, None otherwise
        """
        try:
            return self.elasticsearch.indices.get_alias(index=alias)
        except NotFoundError:
            return None

    def add_alias(self, alias: str, indexes: List[str]) -> None:
        """
        Adds an alias for a list of elasticsearch indexes.

        Aborts (with an error log) if the alias already exists or if any of
        the given indexes does not exist.

        :param alias: The alias to be set
        :param indexes: The list of indexes to set the alias
        :return: None
        """
        if self.elasticsearch.indices.exists_alias(name=alias):
            logger.error("ERROR: The alias %s already exists!", alias)
            return
        for index_ in indexes:
            if not self.elasticsearch.indices.exists(index=index_):
                logger.error("ERROR: The index %s does not exist!", index_)
                return
        actions_ = []
        for index in indexes:
            action = {"add": {"index": index, "alias": alias, "is_write_index": True}}
            actions_.append(action)
        # the type hints seem to be wrong for this function
        self.elasticsearch.indices.update_aliases(actions=actions_)  # type: ignore

    def delete_indexes(self, indexes: List[str]) -> None:
        """
        Deletes a list of elasticsearch indexes.

        :param indexes: names of the indexes to delete
        :return: None
        """
        for index_ in indexes:
            self.elasticsearch.indices.delete(index=index_)

    def change_main_indexes(self, indexes_metadata: Tuple[str, str]) -> None:
        """
        Changes the main indexes for the metadata index by removing the alias from the
        old index and adding the alias to the new index.

        :param indexes_metadata: A pair (old, new) of metadata indexes
        :return: None
        """
        if not self.elasticsearch.indices.exists(index=indexes_metadata[0]):
            logger.error("ERROR: The old metadata index %s does not exist!", indexes_metadata[0])
            return
        if not self.elasticsearch.indices.exists(index=indexes_metadata[1]):
            logger.error("ERROR: The new metadata index %s does not exist!", indexes_metadata[1])
            return
        if not self.elasticsearch.indices.exists_alias(name=self.index_name):
            logger.error("ERROR: The metadata alias %s does not exist!", self.index_name)
            return
        # atomically swap the alias from the old index to the new one
        elasticsearch_actions = [
            {"remove": {"index": indexes_metadata[0], "alias": self.index_name}},
            {
                "add": {
                    "index": indexes_metadata[1],
                    "alias": self.index_name,
                    "is_write_index": True,
                }
            },
        ]
        self.elasticsearch.indices.update_aliases(actions=elasticsearch_actions)  # type: ignore

    def create_new_index(self) -> None:
        """
        Creates a new index for metadata information.
        Afterwards the entire main group (sharing-group) is indexed.

        :return: None
        """
        schema_metadata = json.loads(ES_SCHEMA_METADATA)
        if self.elasticsearch.indices.exists(index=self.index_name):
            logger.error("ERROR: Index %s already exists. Aborting!", self.index_name)
            return
        logger.debug("Creating indexes")
        res_create_metadata = self.elasticsearch.indices.create(
            index=self.index_name,
            mappings=schema_metadata["mappings"],
            settings=schema_metadata["settings"],
        )
        logger.debug("%s: %s", self.index_name, res_create_metadata)
        root_groups = self.get_root_groups()
        for index, root_group in enumerate(root_groups):
            logger.debug(
                "------------------------ indexing root group %s (%d of %d groups)",
                root_group.full_name,
                index,
                len(root_groups),
            )
            log_prefix = f"group {root_group.id} {root_group.name}"

            # the default argument binds the current group, avoiding the
            # late-binding-closure pitfall
            def project_finder(group: Group = root_group) -> List[Project]:
                return self.get_all_projects(group.id)

            self.index_all_projects(project_finder, log_prefix)
        users = self.get_all_gitlab_users()
        for index, user in enumerate(users):
            logger.debug(
                "------------------------ indexing user %s (%d of %d users)",
                user.name,
                index,
                len(users),
            )
            log_prefix = "user project"

            # default argument binds the current user (see note above)
            def user_project_finder(project_user: User = user) -> List[Project]:
                return self.get_projects_of_user(project_user)

            self.index_all_projects(user_project_finder, log_prefix)
def logger_setup(filepath: str) -> Dict[str, Any]:
    """
    Builds a dictConfig-style dictionary for configuring logging.

    The configuration writes INFO-level messages to a size-rotated file.

    :param filepath: path of the log file
    :return: a dictionary to configure a logger
    """
    standard_formatter = {
        "format": "%(asctime)s %(levelname)-8s [%(filename)-20s:%(lineno)4d]: %(message)s"
    }
    file_handler = {
        "level": "INFO",
        "filename": filepath,
        "class": "logging.handlers.RotatingFileHandler",
        "formatter": "standard",
        "maxBytes": 524288,
        "backupCount": 2,
    }
    root_logger = {"handlers": ["file_handler"], "level": "INFO", "propagate": True}
    return {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {"standard": standard_formatter},
        "handlers": {"file_handler": file_handler},
        "loggers": {"": root_logger},
    }
def read_gitlab_event() -> Optional[Dict[str, Any]]:
    """
    Reads the GitLab system hook event from stdin.

    :return: the parsed event dictionary, or None if the payload is valid
        JSON but not a dictionary.
    :raises json.JSONDecodeError: if stdin does not contain valid JSON
        (unchanged behaviour).
    """
    # Read the whole payload at once and parse it directly, instead of
    # accumulating lines and re-wrapping them in a StringIO.
    event = json.loads(sys.stdin.read())
    if not isinstance(event, dict):
        logger.error("Received event which is not a dictionary.")
        return None
    return event
def load_config(
    config_type: ConfigType = ConfigType.PRODUCTION,
) -> Tuple[Gitlab, Mail, Elasticsearch, ValidationService]:
    """
    Parses the configuration in filehooks/config.ini.

    This file only exists when deployed in the GitLab container
    and is a copy of one of the files in filehooks/conf/.
    Which file is used depends on the configuration when setting up GitLab.

    :param config_type: selects which configuration file to read
    :return: configured Gitlab, Mail, Elasticsearch and ValidationService
        instances
    """
    config = _load_config(config_type)
    gitlab_config = config["gitlab"]
    gitlab_instance = Gitlab(gitlab_config["url"], gitlab_config["token"])
    mail = Mail.from_dict(config["mail"])
    elasticsearch_instance = Elasticsearch(config["elasticsearch"]["url"])
    validation_service = ValidationService(config["validation"]["validation_url"])
    # Fix: the return annotation previously declared a 3-tuple although four
    # values are returned (the ValidationService was missing).
    return gitlab_instance, mail, elasticsearch_instance, validation_service
def _load_config(config_type):
    """
    Reads the configuration file matching the given config type and returns
    its contents as a nested dictionary (section -> {option: value}).
    """
    # PRODUCTION (and any unknown type) falls back to the default file.
    config_files = {
        ConfigType.STAGING: "conf/staging.ini",
        ConfigType.LOCAL: "conf/localConfig.ini",
        ConfigType.DEBUG: "conf/conf.debug.ini",
        ConfigType.TEST: "conf/conf.test.ini",
    }
    config_file = config_files.get(config_type, "config.ini")
    parser = ConfigParser()
    parser.read(DIR / config_file)
    return {section: dict(parser.items(section)) for section in parser.sections()}