Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add source repository #3

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions vocab_tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
"owl": "http://www.w3.org/2002/07/owl#",
"skos": "http://www.w3.org/2004/02/skos/core#",
"obo": "http://purl.obolibrary.org/obo/",
"dcterm": "http://purl.org/dc/terms/",
"schema": "http://schema.org/",
"geosciml": "http://resource.geosciml.org/classifier/cgi/lithology",
}

Expand All @@ -38,6 +40,9 @@ def rdfT(term):
def rdfsT(term):
    """Build the RDF Schema URI reference for ``term``.

    The ``rdfs`` entry of ``NS`` is prefixed to ``term`` and the result is
    wrapped in an ``rdflib.URIRef``.
    """
    rdfs_base = NS["rdfs"]
    return rdflib.URIRef(f"{rdfs_base}{term}")

def dctermT(term):
    """Build the Dublin Core Terms URI reference for ``term``.

    The ``dcterm`` entry of ``NS`` is prefixed to ``term`` and the result is
    wrapped in an ``rdflib.URIRef``.
    """
    # Must use the "dcterm" namespace: with NS['rdfs'] this helper produced
    # rdfs:<term>, so lookups such as dctermT("source") never matched the
    # Dublin Core predicate they were meant to find.
    return rdflib.URIRef(f"{NS['dcterm']}{term}")


def find_concept_in_concept_list(
uri: str, concept_list: typing.List["VocabularyConcept"]
Expand All @@ -58,11 +63,12 @@ class VocabularyConcept:
narrower: typing.List[str]
vocabulary: str
history: typing.List[str] = dataclasses.field(default_factory=list)
sources: typing.List[str] = dataclasses.field(default_factory=list)
notes: typing.List[str] = dataclasses.field(default_factory=list)
scopenote: typing.List[str] = dataclasses.field(default_factory=list)
#scopenote: typing.List[str] = dataclasses.field(default_factory=list)
related: typing.List[str] = dataclasses.field(default_factory=list)
example: typing.List[str] = dataclasses.field(default_factory=list)
changenote: typing.List[str] = dataclasses.field(default_factory=list)
#changenote: typing.List[str] = dataclasses.field(default_factory=list)

def get_label(self):
tag = self.name
Expand Down Expand Up @@ -138,6 +144,7 @@ class Vocabulary:
description: str
extends: typing.Optional[str] = None
history: typing.List[str] = dataclasses.field(default_factory=list)
sourceRepository: typing.Optional[str] = None


class VocabularyStore:
Expand Down Expand Up @@ -313,7 +320,7 @@ def bind(self, prefix: str, uri: str, override: bool = True):
self._g.namespace_manager.bind(prefix, uri, override=override)

def query(self, q, **bindings):
L.debug(q)
# L.debug(f"query: {q}")
sparql = rdflib.plugins.sparql.prepareQuery(VocabularyStore._PFX + q)
return self._g.query(sparql, initBindings=bindings)

Expand All @@ -322,12 +329,13 @@ def vocabulary(self, uri: str) -> Vocabulary:

Raises KeyError if the vocabulary is not in the graph.
"""
q = """SELECT ?vocabulary ?label ?definition ?extends ?history
WHERE {
q = """SELECT ?vocabulary ?label ?definition ?extends ?repository
WHERE {
?vocabulary rdf:type skos:ConceptScheme .
?vocabulary skos:prefLabel ?label .
OPTIONAL {?vocabulary skos:definition ?definition .} .
OPTIONAL {?vocabulary skos:inScheme ?extends .} .
OPTIONAL {?vocabulary schema:codeRepository ?repository .} .
}"""
qh = """SELECT ?history
WHERE {
Expand All @@ -336,13 +344,16 @@ def vocabulary(self, uri: str) -> Vocabulary:
qres = self.query(q, vocabulary=rdflib.URIRef(uri))
for res in qres:
_ext = res[3]
_repo = res[4]
L.debug(f"sourceRepository: {_repo}")
qhres = self.query(qh, vocabulary=res[0])
_hist = [h[0] for h in qhres]
return Vocabulary(
uri=str(res[0]),
label=str(res[1]),
description=str(res[2]),
extends=str(_ext) if _ext is not None else None,
sourceRepository=str(_repo) if _repo is not None else None,
history=_hist,
)
raise KeyError(f"Vocabulary '{uri}' not found.")
Expand Down Expand Up @@ -388,12 +399,16 @@ def concept(self, term: str):
name = ab[-1]
labels = self.objects(term, skosT("prefLabel"))
labels += self.objects(term, skosT("altLabel"))
labels += self.objects(term, rdfT("label"))
#labels += self.objects(term, rdfsT("label")) # these are by convention the same as skos:prefLabel
tmp = self.objects(term, skosT("definition"))
definition = "\n".join(tmp)
broader = self.objects(term, skosT("broader"))
narrower = self.narrower(term)
notes = self.objects(term, skosT("note"))
notes += self.objects(term, skosT("editorialNote"))
notes += self.objects(term, skosT("scopeNote"))
notes += self.objects(term, skosT("changeNote"))
notes += self.objects(term, rdfsT("comment"))
vocabulary = self.objects(term, skosT("inScheme"))
if len(vocabulary) > 0:
vocabulary = vocabulary[0]
Expand All @@ -405,6 +420,7 @@ def concept(self, term: str):
else:
vocabulary = None
history = self.objects(term, skosT("historyNote"))
sources = self.objects(term, dctermT("source"))
return VocabularyConcept(
uri=str(term),
name=name,
Expand All @@ -415,29 +431,45 @@ def concept(self, term: str):
vocabulary=vocabulary,
history=history,
notes=notes,
scopenote=self.objects(term, skosT("changeNote")),
sources=sources,
#scopenote=self.objects(term, skosT("scopeNote")),
related=self.objects(term, skosT("related")),
example=self.objects(term, skosT("example")),
changenote=self.objects(term, skosT("changeNote")),
#changenote=self.objects(term, skosT("changeNote")),
)

def top_concept(self) -> VocabularyConcept:
"""Get the root concept of the specified vocabulary.

This is the concept that is skos:topConceptOf and has no skos:broader
def top_concept(self):
"""Get the root concept(s) in the specified vocabulary.
-> typing.List["VocabularyConcept"]
This is the concept that is skos:topConceptOf the vocabulary.
The top concept in an extension vocabulary is a concept from the parent
vocabulary, and likely has skos:broader concepts in that parent vocabulary
"""
q = """SELECT DISTINCT ?subject
WHERE {
?subject rdf:type skos:Concept .
?subject skos:topConceptOf ?vocabulary .
?subject ?predicate ?foo .
FILTER(?predicate != skos:broader) .
}"""
# remove FILTER(?predicate != skos:broader) .

qres = self.query(q)
uri = self._one_res(qres)
#L.debug(f"top concept uri: {uri}")
L.debug(f"number of top concepts: {len(uri)}")
if len(uri) < 1:
raise ValueError("No topConcept found")
return self.concept(uri[0])

conceptList = []
for acon in uri:
L.debug(f"self.concept(acon) uri: {self.concept(acon).uri}")
conceptList.append(self.concept(acon))

L.debug(f"len(conceptList): {len(conceptList)}")
# return self.concept(uri[0])
# modify to account for vocabs with >1 top concept.
#return [self.concept(arow) for arow in uri]
return conceptList

def concepts(
self, v: typing.Optional[str] = None, abbreviate: bool = False
Expand Down
4 changes: 2 additions & 2 deletions vocab_tools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def getDefaultVocabulary(vs:vocab_tools.VocabularyStore, abbreviate:bool=False)
@click.group()
def main():
logging.basicConfig(
level="INFO", format=FORMAT, datefmt="[%X]"
level="DEBUG", format=FORMAT, datefmt="[%X]"
)

@main.command()
Expand Down Expand Up @@ -132,7 +132,7 @@ def _convert_to_ui_format(entry: dict) -> dict:
dataset.load(source)
vocabularies = dataset.vocabularies()
base_vocabulary = dataset.base_vocabulary()
top_concept = dataset.top_concept()
top_concept = dataset.top_concept() # note this returns a list, or throws error
concept, vocabulary = dataset.getVocabRoot(None)
L.info("Using vocabulary %s", base_vocabulary)
L.debug("Using %s as root concept", top_concept)
Expand Down
63 changes: 44 additions & 19 deletions vocab_tools/tomarkdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ def describe_concept(
f"{'#' * level} {concept.get_label()}",
"[]{#" + concept.md_link_label() + "}",
"",
f"The concept `{concept.get_label()}` <br/> ",
f"with URI `{concept.uri}` <br/> ",
f"is defined in vocabulary `{concept.vocabulary}`",
#f"The concept `{concept.get_label()}` <br/> ",
f"URI `{concept.uri}` <br/> ",
f"defined in vocabulary `{concept.vocabulary}`",
"",
]
if is_top_concept:
res.append(f"This is the top concept of the vocabulary.")
res.append(f"This is a top concept of the vocabulary.")
else:
res.append(f"Path from the top concept: <br/>")
res.append(f"Path from the top concept: ")
path = []
for uri, _ in store.walk_broader(concept.uri):
path.append(uri)
Expand All @@ -63,7 +63,7 @@ def describe_concept(
res.append(f"{'` -> `'.join(labels)}")
res += ("", "")
if len(concept.narrower) > 0:
res.append("Immediately narrower concepts:\n")
res.append("Immediately narrower concepts: ")
narrowers = []
for c in concept.narrower:
n = vocab_tools.find_concept_in_concept_list(c, concept_list)
Expand All @@ -72,39 +72,49 @@ def describe_concept(
res.append(", ".join([n.md_link(fixed_width=True) for n in narrowers]))
res += (
"",
"**Definition:**",
"",
"**Definition: **",
concept.definition.replace("\n", " <br/> "),
"",
)
if len(concept.notes) > 1:
res += (
"**Notes:**",
"",
"**Notes: **",
"\n\n".join([n.replace("\n", " <br/> ") for n in concept.notes]),
"",
)
if len(concept.label) > 1:
res += (
"**Alternate labels:**",
"",
"**Alternate labels: **",
", ".join([f"`{lb}`" for lb in concept.label[1:]]),
"",
)
if len(concept.history) > 0:
res += (
"**History:**",
"",
"**History: **",
f" <br/> ".join(concept.history),
"",
)
if len(concept.sources) > 0:
res += (
"**Sources: **",
f" <br/> ".join(concept.sources),
"",
)
if len(concept.example) > 0:
res += (
"**Example: **",
f" <br/> ".join(concept.example),
"",
)
return res


def describe_vocabulary(
store: vocab_tools.VocabularyStore, vocab_uri: str
) -> list[str]:
V: vocab_tools.Vocabulary = store.vocabulary(vocab_uri)
# res will be a list of strings for each line in the vocabulary
# description, returned as output from this function.
res = []
title = V.label
# Markdown frontmatter
Expand All @@ -129,15 +139,24 @@ def describe_vocabulary(
for uri, depth in store.walk_vocab_tree(vocab_uri):
voc = store.vocabulary(uri)
res.append(f"{' '*depth}- `{voc.label}` [`{voc.uri}`]({voc.uri})")

# display any history notes recording origin and updates to vocabulary
res += (
"",
"**History:**",
"",
" <br /> ".join(V.history),
"",
)
for history in store._get_objects(vocab_uri, vocab_tools.skosT("historyNote")):
res.append(f"* {history}")
# this seems redundant, comment out
# for history in store._get_objects(vocab_uri, vocab_tools.skosT("historyNote")):
# res.append(f"* {history}")

if V.sourceRepository is not None:
res.append(f"**Source Repository:** {V.sourceRepository}<br />")


# display the hierarchy of concepts in this vocabulary
res += (
"",
"**Concept Hierarchy:**",
Expand All @@ -149,7 +168,10 @@ def describe_vocabulary(
all_concepts = [store.concept(uri) for uri in concept_uris]
top_concepts = []
try:
top_concepts = [store.top_concept(), ]
#top_concepts = [store.top_concept(), ]
top_concepts = store.top_concept()

L.debug(f"count Top concepts: {len(top_concepts)}")
except ValueError as e:
L.warning("No top level concept found.")
# Since there's no top concept available, find the concepts
Expand All @@ -166,6 +188,8 @@ def describe_vocabulary(
if len(broaders) < 3:
top_concepts.append(concept)
for top_concept in top_concepts:

L.debug(f"Top concept.uri: {top_concept.uri}")
for concept, level in concept_tree(top_concept.uri, all_concepts, level=depth):
label = f"{' '*level}- [{concept.get_label()}](#{concept.md_link_label()})"
res.append(label)
Expand All @@ -174,10 +198,11 @@ def describe_vocabulary(
res += describe_concept(store, top_concept, level=2, is_top_concept=True, concept_list=all_concepts)
#res += top_concept.markdown(level=2, concept_list=all_concepts)
res.append("")
for top_concept in top_concepts:
#for top_concept in top_concepts:
for uri, level in store.walk_narrower(top_concept.uri, level=3):
L.debug(f"walk narrower, uri: {uri}, level: {level}")
concept = vocab_tools.find_concept_in_concept_list(uri, all_concepts)
#res += concept.markdown(level=level, concept_list=all_concepts)
res += describe_concept(store, concept, level=2, concept_list=all_concepts)
res += describe_concept(store, concept, level=level, concept_list=all_concepts)
res.append("")
return res