Source code for kgsaf_jdex.utils.conversion

#!/usr/bin/env python3

import json
from pathlib import Path

from rdflib import OWL, RDF, RDFS, BNode, Graph, Literal, Namespace
from rdflib.namespace import split_uri
from rdflib.term import URIRef

import kgsaf_jdex.utils.conventions.ids as idc
import kgsaf_jdex.utils.conventions.paths as pc
from kgsaf_jdex.utils.utility import verbose_print
from kgsaf_jdex.utils.conventions.builtins import BUILTIN_URIS


def rdf_list_to_python_list(graph: Graph, head: URIRef, depth: int, verbose: bool = True) -> list:
    """Convert an RDF list (rdf:first/rest/nil chain) into a Python list.

    Args:
        graph (Graph): RDFLib Graph to be parsed
        head (URIRef): List starting node
        depth (int): Recursion depth
        verbose (bool): Log printing. Defaults to True.

    Returns:
        list: Python list from RDF list
    """
    items = []
    indent = "\t" * depth
    while head and head != RDF.nil:
        first = next(graph.objects(head, RDF.first), None)
        verbose_print(f"{indent}List Element {first}", verbose)
        if first is not None:
            items.append(bnode_to_dict(graph, first, depth + 1, verbose))
        head = next(graph.objects(head, RDF.rest), None)
    return items
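
# Example (a minimal sketch; the Turtle snippet and the example.org namespace are
# illustrative only, not part of the dataset conventions):
#
#   g = Graph()
#   g.parse(data="""
#       @prefix owl: <http://www.w3.org/2002/07/owl#> .
#       @prefix ex: <http://example.org/> .
#       ex:C a owl:Class ; owl:unionOf ( ex:A ex:B ) .
#   """, format="turtle")
#   head = next(g.objects(URIRef("http://example.org/C"), OWL.unionOf))
#   rdf_list_to_python_list(g, head, depth=0, verbose=False)
#   # -> ['http://example.org/A', 'http://example.org/B']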


def bnode_to_dict(graph: Graph, node: URIRef | BNode | Literal, depth: int = 1, verbose: bool = True) -> dict | str:
    """Recursively convert an RDF node (especially blank nodes) into a JSON-serializable value.

    Args:
        graph (Graph): RDFLib Graph to be parsed
        node (URIRef | BNode | Literal): Starting node
        depth (int, optional): Recursion depth. Defaults to 1.
        verbose (bool): Log printing. Defaults to True.

    Returns:
        dict | str: Python dict from an RDF blank-node description, or the node's
            string form for URIs and literals
    """
    # URIs, literals, and any other non-blank node are rendered as plain strings.
    if not isinstance(node, BNode):
        return str(node)

    node_dict = {}
    indent = "\t" * depth

    verbose_print(f"{indent}Found BNode {node} Starting Recursive Evaluation", verbose)

    for _, p, o in graph.triples((node, None, None)):
        pred = str(p)

        verbose_print(f"{indent}Evaluating - {p} {o}", verbose)

        if pred in {
            str(OWL.unionOf),
            str(OWL.intersectionOf),
            str(OWL.oneOf),
            str(OWL.AllDisjointClasses),
            str(OWL.AllDisjointProperties),
        }:
            verbose_print(
                f"{indent}\tFound Collection {pred} Starting Recursive Evaluation",
                verbose,
            )
            node_dict[pred] = rdf_list_to_python_list(graph, o, depth + 1, verbose)
        else:
            node_dict.setdefault(pred, []).append(bnode_to_dict(graph, o, depth + 1, verbose))

    return node_dict
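
# Example (a minimal sketch; the ex: namespace is illustrative): for a restriction
# such as
#
#   ex:C rdfs:subClassOf [ a owl:Restriction ;
#                          owl:onProperty ex:p ;
#                          owl:someValuesFrom ex:D ] .
#
# bnode_to_dict(graph, bnode, verbose=False) yields a dict keyed by the full
# predicate URIs (abbreviated here for readability):
#
#   {'rdf:type': ['...owl#Restriction'],
#    'owl:onProperty': ['http://example.org/p'],
#    'owl:someValuesFrom': ['http://example.org/D']}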


class OWLConverter:
    """Converts a subset of OWL ontology axioms to a JSON serialization"""

    def __init__(
        self,
        path: str,
    ):
        """Initialize the converter with a dataset base path

        Args:
            path (str): Dataset location path
        """
        self.p_data = dict()
        self.base_path = Path(path).resolve().absolute()

    def preprocess(
        self,
        taxonomy: bool = True,
        class_assertions: bool = True,
        obj_prop_domain_range: bool = True,
        obj_prop_hierarchy: bool = True,
        verbose: bool = True,
    ):
        """Preprocess a subset of the dataset schema into Python data structures

        Args:
            taxonomy (bool, optional): Load and convert taxonomy axioms. Defaults to True.
            class_assertions (bool, optional): Load and convert class assertion axioms.
                Defaults to True.
            obj_prop_domain_range (bool, optional): Load and convert object property domain
                and range. Defaults to True.
            obj_prop_hierarchy (bool, optional): Load and convert object property hierarchy.
                Defaults to True.
            verbose (bool): Log printing. Defaults to True.
        """
        print(f"Processing Dataset at {self.base_path}")
        if taxonomy:
            print("Processing Taxonomy")
            self.p_data["taxonomy"] = (
                self.preprocess_taxonomy(verbose),
                self.base_path / pc.TAXONOMY,
            )
        if class_assertions:
            print("Processing Class Assertions")
            self.p_data["class_assertions"] = (
                self.preprocess_class_assertions(verbose),
                self.base_path / pc.CLASS_ASSERTIONS,
            )
        if obj_prop_hierarchy:
            print("Processing Object Property Hierarchy")
            self.p_data["obj_prop_hierarchy"] = (
                self.preprocess_obj_prop_hierarchy(verbose),
                self.base_path / pc.OBJ_PROP_HIERARCHY,
            )
        if obj_prop_domain_range:
            print("Processing Object Property Domain and Range")
            self.p_data["obj_prop_domain_range"] = (
                self.preprocess_obj_prop_domain_range(verbose),
                self.base_path / pc.OBJ_PROP_DOMAIN_RANGE,
            )

    def serialize(self):
        """Serialize loaded and converted data into JSON format"""
        for obj, path in self.p_data.values():
            with open(path, "w") as f:
                json.dump(obj, f, indent=4)

    def preprocess_taxonomy(self, verbose: bool) -> dict:
        """Process taxonomy data. The output dictionary is formatted as:

        ```
        uri_class : ['uri_sup_class_1', ..., 'uri_sup_class_n']
        ```

        If complex classes (restrictions or lists) are found, they are kept and
        recursively converted into a Python dictionary.

        Args:
            verbose (bool): Log printing.

        Returns:
            dict: Dictionary mapping classes to their super classes
        """
        onto = Graph()
        onto.parse(self.base_path / pc.RDF_TAXONOMY)
        classes = set(onto.subjects(RDF.type, OWL.Class))
        out_json = {}
        for c in classes:
            verbose_print(f"Processing main class {c}", verbose)
            sup_c = []
            for o in set(onto.objects(c, RDFS.subClassOf)) - BUILTIN_URIS:
                sup_c.append(bnode_to_dict(onto, o, verbose=verbose))
            if sup_c:
                out_json[c] = sup_c
        return out_json

    def preprocess_class_assertions(self, verbose: bool) -> dict:
        """Process class assertion data. The output dictionary is formatted as:

        ```
        uri_individual : ['uri_class_1', ..., 'uri_class_n']
        ```

        Args:
            verbose (bool): Log printing.

        Returns:
            dict: Dictionary mapping individuals to their types
        """
        onto = Graph()
        onto.parse(self.base_path / pc.RDF_CLASS_ASSERTIONS)
        individuals = set(onto.subjects(RDF.type, OWL.NamedIndividual))
        out_json = {}
        for ind in individuals:
            ind_cls = []
            for cls in set(onto.objects(ind, RDF.type)) - BUILTIN_URIS:
                if cls != OWL.NamedIndividual:
                    ind_cls.append(cls)
            if ind_cls:
                out_json[ind] = ind_cls
        return out_json

    def preprocess_obj_prop_domain_range(self, verbose: bool) -> dict:
        """Process object property domain and range. The output dictionary is formatted as:

        ```
        uri_obj_prop : {
            domain : ['uri_c_1', ..., 'uri_c_n'],
            range : ['uri_c_1', ..., 'uri_c_m']
        }
        ```

        If complex classes (restrictions or lists) are found, they are kept and
        recursively converted into a Python dictionary.

        Args:
            verbose (bool): Log printing.

        Returns:
            dict: Dictionary mapping object properties to their domain and range classes
        """
        onto = Graph()
        onto.parse(self.base_path / pc.RDF_OBJ_PROP)
        obj_props = set(onto.subjects(RDF.type, OWL.ObjectProperty))
        out_json = {}
        for prop in obj_props:
            prop_data = {}
            # Get domains; default to owl:Thing when none are declared
            domains = list(onto.objects(prop, RDFS.domain))
            prop_data["domain"] = (
                [bnode_to_dict(onto, d, verbose=verbose) for d in domains]
                if domains
                else [OWL.Thing]
            )
            # Get ranges; default to owl:Thing when none are declared
            ranges = list(onto.objects(prop, RDFS.range))
            prop_data["range"] = (
                [bnode_to_dict(onto, r, verbose=verbose) for r in ranges]
                if ranges
                else [OWL.Thing]
            )
            out_json[str(prop)] = prop_data
        return out_json

    def preprocess_obj_prop_hierarchy(self, verbose: bool) -> dict:
        """Process object property hierarchy. The output dictionary is formatted as:

        ```
        uri_obj_prop : ['sup_uri_obj_prop_1', ..., 'sup_uri_obj_prop_n']
        ```

        If complex expressions (restrictions or lists) are found, they are kept and
        recursively converted into a Python dictionary.

        Args:
            verbose (bool): Log printing.

        Returns:
            dict: Dictionary mapping object properties to their super properties
        """
        onto = Graph()
        onto.parse(self.base_path / pc.RDF_OBJ_PROP)
        out_json = {}
        for r in onto.subjects(RDF.type, OWL.ObjectProperty):
            val = []
            for sup_r in set(onto.objects(r, RDFS.subPropertyOf)) - BUILTIN_URIS:
                val.append(bnode_to_dict(onto, sup_r, verbose=verbose))
            if val:
                out_json[r] = val
        return out_json
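
# Example usage (a minimal sketch; "data/my_dataset" is a hypothetical dataset
# directory that must contain the schema files referenced by the pc.* path
# constants):
#
#   converter = OWLConverter("data/my_dataset")
#   converter.preprocess(verbose=False)  # build the in-memory JSON structures
#   converter.serialize()                # write them under the dataset directory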


class TSVConverter:
    """Converts RDF triple files into TSV format."""

    def __init__(
        self,
        path: str,
    ):
        """Initialize the TSV converter.

        Args:
            path (str): Dataset base directory.
        """
        self.p_data = dict()
        self.base_path = Path(path).resolve().absolute()

    def convert(
        self,
        triples: bool = True,
        splits: bool = True,
    ):
        """Convert RDF triple files into TSV files.

        Prepares TSV representations for serialization.

        Args:
            triples (bool, optional): Convert full ABox triples. Defaults to True.
            splits (bool, optional): Convert train/valid/test splits. Defaults to True.
        """
        if triples:
            self.p_data["triples"] = (
                self.preprocess_triples(self.base_path / "abox/obj_prop_assertions.nt"),
                self.base_path / "abox/obj_prop_assertions.tsv",
            )
        if splits:
            self.p_data["train"] = (
                self.preprocess_triples(self.base_path / pc.RDF_TRAIN),
                self.base_path / pc.TRAIN,
            )
            self.p_data["test"] = (
                self.preprocess_triples(self.base_path / pc.RDF_TEST),
                self.base_path / pc.TEST,
            )
            self.p_data["valid"] = (
                self.preprocess_triples(self.base_path / pc.RDF_VALID),
                self.base_path / pc.VALID,
            )

    def serialize(self):
        """Write converted TSV data to disk."""
        for key, (obj, path) in self.p_data.items():
            with open(path, "w") as f:
                if key in ["triples", "train", "valid", "test"]:
                    f.write(obj)

    def preprocess_triples(self, path):
        """Convert an RDF triple file into a TSV string.

        Args:
            path (Path): Path to an RDF triple file.

        Returns:
            str: TSV-formatted string of triples (s, p, o).
        """
        triples = Graph()
        triples.parse(path)
        out_str = ""
        for s, p, o in triples:
            out_str += f"{s}\t{p}\t{o}\n"
        return out_str
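
# Example usage (a minimal sketch; assumes the N-Triples files referenced above,
# including the pc.RDF_TRAIN/RDF_TEST/RDF_VALID splits, exist under the dataset
# directory):
#
#   tsv = TSVConverter("data/my_dataset")
#   tsv.convert(triples=True, splits=True)
#   tsv.serialize()  # writes obj_prop_assertions.tsv plus the train/valid/test TSVs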


class IDMapper:
    """Maps ontology URIs to integer identifiers."""

    def __init__(
        self,
        path: str,
    ):
        """Initialize the mapper with a dataset base path

        Args:
            path (str): Dataset location path
        """
        self.p_data = dict()
        self.base_path = Path(path).resolve().absolute()
        self.onto = Graph()
        self.onto.parse(self.base_path / pc.ONTOLOGY)
        self.ind_onto = Graph()
        self.ind_onto.parse(self.base_path / pc.INDIVIDUALS)
        self.out_data = dict()

    def map_to_id(self):
        """Assign unique integer IDs to ontology elements.

        IDs are assigned deterministically after sorting URIs.

        Generates mappings for:
            - Classes
            - Object properties
            - Individuals
        """
        classes = set(self.onto.subjects(RDF.type, OWL.Class)) - BUILTIN_URIS
        classes = {c for c in classes if not isinstance(c, BNode)}
        properties = set(self.onto.subjects(RDF.type, OWL.ObjectProperty)) - BUILTIN_URIS
        individuals = set(self.ind_onto.subjects(RDF.type, OWL.NamedIndividual)) - BUILTIN_URIS

        classes = sorted(classes)
        properties = sorted(properties)
        individuals = sorted(individuals)

        print("Classes", len(classes))
        print("Properties", len(properties))
        print("Individuals", len(individuals))

        self.out_data["c"] = (
            {str(c): i for i, c in enumerate(classes)},
            self.base_path / pc.CLASS_MAPPINGS,
        )
        self.out_data["i"] = (
            {str(c): i for i, c in enumerate(individuals)},
            self.base_path / pc.INDIVIDUAL_MAPPINGS,
        )
        self.out_data["p"] = (
            {str(c): i for i, c in enumerate(properties)},
            self.base_path / pc.OBJ_PROP_MAPPINGS,
        )

    def serialize(self):
        """Write generated ID mappings to JSON files.

        Writes class, individual, and property mappings to disk.
        """
        (self.base_path / pc.MAPPINGS).mkdir(exist_ok=True, parents=True)
        for mapping, path in self.out_data.values():
            with open(path, "w") as f:
                json.dump(mapping, f, indent=4)
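
# Example usage (a minimal sketch; "data/my_dataset" is a hypothetical path):
#
#   mapper = IDMapper("data/my_dataset")
#   mapper.map_to_id()  # deterministic integer IDs from sorted URIs
#   mapper.serialize()  # writes the class/individual/property mapping JSON files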