# Source code for cls_luigi.cls_tasks

# -*- coding: utf-8 -*-
#
# Apache Software License 2.0
#
# Copyright (c) 2022-2023, Jan Bessai, Anne Meyer, Hadi Kutabi, Daniel Scholtyssek
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This file contains the CLSBaseTask implementation, which is used to extend the actual implementation of
luigi.Task, luigi.WrapperTask and luigi.ExternalTask with helper methods to make the usage of our
framework a bit easier.
"""

from cls_luigi.inhabitation_task import LuigiCombinator
from cls_luigi import INVISIBLE_PATH, HASH_PATH, RESULTS_PATH
import hashlib
import luigi
from pathlib import Path
from os.path import join, exists
from os import makedirs


class ClsBaseTask:
    """
    This class is used to implement some helper methods that all tasks in a CLS-Luigi pipeline should have.

    The key methods are:

    * :py:meth:`get_variant_filename` - This method creates a unique filename for (intermediate) results,
      which remains unique across the executed pipelines. For this purpose a hash value is created and
      persisted to disk, which can be looked up afterwards to trace the executed tasks back to this file.
    """

    # Class-wide flag: becomes True once the first instance has created the output
    # directories, so later instantiations in the same process skip that filesystem work.
    initialized = False

    def __init__(self) -> None:
        """
        Ensure the CLS-Luigi output directories exist and store the result/hash paths on the instance.

        The directories ``INVISIBLE_PATH``, ``HASH_PATH`` and ``RESULTS_PATH`` are created only by
        the first instance in this process (tracked via :py:attr:`ClsBaseTask.initialized`).
        """
        if not ClsBaseTask.initialized:
            # exist_ok=True replaces the former exists()+makedirs() pair, which raised
            # FileExistsError if another process created the directory in between.
            for directory in (INVISIBLE_PATH, HASH_PATH, RESULTS_PATH):
                makedirs(directory, exist_ok=True)
            ClsBaseTask.initialized = True
        self.result_path = RESULTS_PATH
        self.hash_path = HASH_PATH

    def __get_flatten_data(self, data: "luigi.target.FileSystemTarget | list | tuple | dict") -> list:
        """
        Flatten (possibly nested) task input into a flat list of file system targets.

        :param data: a single target, or a list/tuple/dict that may nest further containers of targets.
        :type data: luigi.target.FileSystemTarget | list | tuple | dict
        :return: every :py:class:`luigi.target.FileSystemTarget` found, in iteration order.
        :rtype: list
        """
        flattened_data = []
        if isinstance(data, luigi.target.FileSystemTarget):
            flattened_data.append(data)
        elif isinstance(data, (list, tuple)):
            for item in data:
                flattened_data.extend(self.__get_flatten_data(item))
        elif isinstance(data, dict):
            for value in data.values():
                flattened_data.extend(self.__get_flatten_data(value))
        # Any other value (non file-system targets, None, ...) is skipped on a best-effort basis.
        # NOTE(review): decide whether unexpected input types should raise instead.
        return flattened_data

    def get_variant_filename(self, name="") -> str:
        """
        Returns a variant filename based on the provided name.

        A human-readable label is built from the labels of this task's inputs (resolving hashed
        upstream filenames back to their stored labels), the task class name and ``name``. The md5
        digest of that label is used in the filename, and the label itself is written to
        ``<hash_path>/<digest>`` (if not already present), so the file can be traced back later.

        :param name: optional name for the variant, defaults to ``"<ClassName>_result"``
        :type name: str, optional
        :return: the variant filename, of the form ``<ClassName>_#<md5>#_<name>``.
        :rtype: str
        """
        class_name = self.__class__.__name__
        if name == "":
            name = class_name + "_result"

        # luigi's Task.input() re-evaluates requires() on each call, so query it once.
        task_input = self.input()

        if isinstance(task_input, luigi.target.FileSystemTarget):
            label = self.__helper_variant_label(Path(task_input.path))
        elif isinstance(task_input, (list, tuple, dict)):
            input_labels = [
                self.__helper_variant_label(Path(item.path))
                for item in self.__get_flatten_data(task_input)
            ]
            joined_labels = ", ".join(input_labels)
            label = "(" + joined_labels + ")" + " --> " + class_name if joined_labels else class_name
        else:
            label = class_name
        label = "(" + label + "_" + class_name + "-" + name + ")"

        # md5 only derives a stable file name here; it is not a security primitive.
        digest = hashlib.md5(label.encode(), usedforsecurity=False).hexdigest()
        path = Path(join(self.hash_path, digest))
        if not path.is_file():
            with path.open(mode="w") as hash_file:
                hash_file.write(label)
        # label is always a non-empty "(...)" string at this point, so the hashed form is returned.
        return class_name + "_" + "#" + digest + "#" + "_" + name

    def __helper_variant_label(self, input_file):
        """
        Resolve the label of one input file.

        If the file name has the form ``<prefix>#<32-char hash>#<suffix>`` and a matching file exists
        in ``self.hash_path``, that file's content (the stored label) is returned. Otherwise the
        plain file name is returned.

        :param input_file: path of the input file.
        :type input_file: pathlib.Path
        :return: the stored label or the file name.
        :rtype: str
        """
        input_filename = input_file.name
        parts = input_filename.split("#", maxsplit=2)
        if len(parts) != 3:
            return input_filename
        lookup_hash = parts[1]
        if len(lookup_hash) != 32:
            return input_filename
        hash_file = Path(join(self.hash_path, lookup_hash))
        if not hash_file.is_file():
            return input_filename
        try:
            with hash_file.open(mode="r") as f:
                return f.read()
        except UnicodeDecodeError:
            # Unreadable label file: fall back to the file name, as the original did.
            return input_filename

    def create_result_file(self, file_name):
        """
        Create a luigi local target inside the results directory with a variant filename.

        :param file_name: base name passed to :py:meth:`get_variant_filename`.
        :type file_name: str
        :return: the target at ``<result_path>/<variant filename>``.
        :rtype: luigi.LocalTarget
        """
        return luigi.LocalTarget(join(self.result_path, self.get_variant_filename(file_name)))
class ClsTask(luigi.Task, LuigiCombinator, ClsBaseTask):
    """
    Abstract base for regular CLS-Luigi tasks.

    Mixes :py:class:`luigi.Task`, :py:class:`LuigiCombinator` and :py:class:`ClsBaseTask`
    into a single class, so concrete tasks get luigi's task machinery together with the
    CLS-Luigi helper methods (e.g. variant file naming).
    """

    # Marks this class as abstract.
    # NOTE(review): presumably consulted by the CLS-Luigi combinator machinery — confirm.
    abstract = True

    def __init__(self, *task_args, **task_kwargs):
        # Run the regular initialisation chain along the MRO first.
        super(ClsTask, self).__init__(*task_args, **task_kwargs)
        # Explicitly set up the ClsBaseTask part (directories, result/hash paths),
        # in case the super() chain does not reach it.
        ClsBaseTask.__init__(self)
class ClsWrapperTask(luigi.WrapperTask, LuigiCombinator, ClsBaseTask):
    """
    Abstract base for CLS-Luigi wrapper tasks.

    Mixes :py:class:`luigi.WrapperTask`, :py:class:`LuigiCombinator` and :py:class:`ClsBaseTask`
    into a single class, so wrapper tasks also get the CLS-Luigi helper methods.
    """

    # Marks this class as abstract.
    # NOTE(review): presumably consulted by the CLS-Luigi combinator machinery — confirm.
    abstract = True

    def __init__(self, *task_args, **task_kwargs):
        # Run the regular initialisation chain along the MRO first.
        super(ClsWrapperTask, self).__init__(*task_args, **task_kwargs)
        # Explicitly set up the ClsBaseTask part (directories, result/hash paths),
        # in case the super() chain does not reach it.
        ClsBaseTask.__init__(self)
class ClsExternalTask(luigi.ExternalTask, LuigiCombinator, ClsBaseTask):
    """
    Abstract base for CLS-Luigi external tasks.

    Mixes :py:class:`luigi.ExternalTask`, :py:class:`LuigiCombinator` and :py:class:`ClsBaseTask`
    into a single class, so external tasks also get the CLS-Luigi helper methods.
    """

    # Marks this class as abstract.
    # NOTE(review): presumably consulted by the CLS-Luigi combinator machinery — confirm.
    abstract = True

    def __init__(self, *task_args, **task_kwargs):
        # Run the regular initialisation chain along the MRO first.
        super(ClsExternalTask, self).__init__(*task_args, **task_kwargs)
        # Explicitly set up the ClsBaseTask part (directories, result/hash paths),
        # in case the super() chain does not reach it.
        ClsBaseTask.__init__(self)