Python library for httpx-based SPARQL Query and Update Operations according to the SPARQL 1.2 Protocol.
WARNING: This project is in an early stage of development and should be used with caution.
- Async Interface:
asynciosupport withaquery()andAsyncContextManagerAPI. - Query Response Streaming: Streaming iterators for large result sets available with
query_stream()andaquery_stream() - Synchronous Concurrency Wrapper: Support for concurrent execution of multiple queries from synchronous code with
queries() - RDFLib Integration: Direct conversion to RDFLib SPARQL result representations and support for
rdflib.Graphtargets - Context Managers: Synchronous and asynchronous context managers for lexical resource management
- Client Sharing: Support for sharing and re-using
httpxclients for HTTP connection pooling
sparqlx is a PEP 621-compliant package and available on PyPI.
To run a query against an endpoint, instantiate a SPARQLWrapper object and call its query method:
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)
result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")The default response formats are JSON for SELECT and ASK queries and Turtle for CONSTRUCT and DESCRIBE queries.
SPARQLWrapper.query features a response_format parameter that takes
"json","xml","csv","tsv"forSELECTandASKqueries"turtle","xml","ntriples","json-ld"forCONSTRUCTandDESCRIBEqueries- any other string; the supplied value will be passed as MIME Type to the
Acceptheader.
If the convert parameter is set to True, SPARQLWrapper.query returns
- a
listof Python dictionaries with dict-values cast to RDFLib objects forSELECTqueries - a Python
boolforASKqueries - an
rdflib.Graphinstance forCONSTRUCTandDESCRIBEqueries.
Note that only JSON is supported as a response format for convert=True on SELECT and ASK query results.
The return type for calls to SPARQLWrapper.query with convert=True is a union type list[sparqlx.types.SPARQLResultBinding] | bool | rdflib.Graph - the actual runtime return type depends on the query type (SELECT, ASK, CONSTRUCT or DESCRIBE).
Since the query type is not known at static time, type checkers are not able to narrow the union type without explicit annotation.
sparqlx defines typing.overloads for the simple str/sparqlx.types.SPARQLQueryType subclasses
sparqlx.SelectQuery,sparqlx.AskQery,sparqlx.ConstructQueryandsparqlx.DescribeQuery
which can be used to inform static checkers about the type of query and ergo allow static analysis to narrow the return type.
from typing import reveal_type
from sparqlx import ConstructQuery, SPARQLWrapper
sparqlwrapper = SPARQLWrapper(sparql_endpoint=wikidata_sparql_endpoint)
query = "construct {?s ?p ?p} where {?s ?p ?o} limit 10"
result_1 = sparqlwrapper.query(query, convert=True)
result_2 = sparqlwrapper.query(ConstructQuery(query), convert=True)
reveal_type(result_1) # list[_SPARQLBinding] | bool | rdflib.Graph
reveal_type(result_2) # rdflib.GraphNote that fully typing
SPARQLWrapper.queriesis currently not possible since Python does not support variadic type mappings.
By default, SPARQLWrapper creates and manages httpx.Client instances internally.
An httpx.Client can also be supplied by user code; this provides a configuration interface and allows for HTTP connection pooling.
Note that if an
httpx.Clientis supplied toSPARQLWrapper, user code is responsible for managing (closing) the client.
import httpx
from sparqlx import SPARQLWrapper
client = httpx.Client(timeout=10.0)
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)
result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")
print(client.is_closed) # False
client.close()
print(client.is_closed) # TrueIt is also possible to configure SPARQLWrapper-managed clients by passing a dict holding httpx.Client kwargs to the client_config parameter:
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
client_config={"timeout": 10.0},
)
result: httpx.Response = sparql_wrapper.query("select * where {?s ?p ?o} limit 10")In that case, SPARQLWrapper will internally create and manage httpx.Client instances (the default behavior if no client is provided), but will instantiate clients based on the supplied client_config kwargs.
SPARQLWrapper.aquery is an asynchronous version of SPARQLWrapper.query.
import asyncio
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)
async def run_queries(*queries: str) -> list[httpx.Response]:
return await asyncio.gather(*[sparql_wrapper.aquery(query) for query in queries])
results: list[httpx.Response] = asyncio.run(
run_queries(*["select * where {?s ?p ?o} limit 10" for _ in range(10)])
)For client sharing or configuration of internal client instances, pass an httpx.AsyncClient instance to aclient or kwargs to aclient_config respectively (see SPARQLWrapper.query).
SPARQLWrapper.queries is a synchronous wrapper around asynchronous code and allows to run multiple queries concurrently from synchronous code.
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql"
)
results: Iterator[httpx.Response] = sparql_wrapper.queries(
*["select * where {?s ?p ?o} limit 100" for _ in range(10)]
)Note that since SPARQLWrapper.queries runs async code under the hood, httpx client sharing or configuration requires setting aclient or aclient_config in the respective SPARQLWrapper.
Also, SPARQLWrapper.queries creates an event loop and therefore cannot be called from asynchronous code.
If an httpx.AsyncClient is supplied, the client will be closed after the first call to SPARQLWrapper.queries.
User code that wants to run multiple calls to queries can still exert control over the client by using aclient_config. For finer control over concurrent query execution, use the async interface.
HTTP Responses can be streamed using the SPARQLWrapper.query_stream and SPARQLWrapper.aquery_stream Iterators.
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)
stream: Iterator[bytes] = sparql_wrapper.query_stream(
"select * where {?s ?p ?o} limit 10000"
)
astream: AsyncIterator = sparql_wrapper.aquery_stream(
"select * where {?s ?p ?o} limit 10000"
)The streaming method and chunk size (for chunked responses) can be controlled with the streaming_method and chunk_size parameters respectively.
SPARQLWrapper also implements the context manager protocol. This can be useful in two ways:
- Managed Client: Unless an httpx client is passed,
SPARQLWrappercreates and manages clients internally. In that case, the context manager uses a single client per context and enables connection pooling within the context. - Supplied Client: If an httpx client is passed,
SPARQLWrapperwill use that client instance and calling code is responsible for client management. In that case, the context manager will manage the supplied client.
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql",
)
with sparql_wrapper as context_wrapper:
result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")import httpx
from sparqlx import SPARQLWrapper
client = httpx.Client()
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://query.wikidata.org/bigdata/namespace/wdq/sparql", client=client
)
with sparql_wrapper as context_wrapper:
result: httpx.Response = context_wrapper.query("select * where {?s ?p ?o} limit 10")
print(client.is_closed) # False
print(client.is_closed) # Truesparqlx supports Update Operations according to the SPARQL 1.2 Protocol.
The following methods implement SPARQL Update:
SPARQLWrapper.updateSPARQLWrapper.aupdateSPARQLWrapper.updates
Given an initially empty Triplestore with SPARQL and SPARQL Update endpoints, one could e.g. insert data like so:
import httpx
from sparqlx import SPARQLWrapper
sparql_wrapper = SPARQLWrapper(
sparql_endpoint="https://triplestore/query",
update_endpoint="https://triplestore/update",
aclient_config = {
"auth": httpx.BasicAuth(username="admin", password="supersecret123")
}
)
with sparql_wrapper as wrapper:
store_empty: bool = not wrapper.query(
"ask where {{?s ?p ?o} union {graph ?g {?s ?p ?o}}}", convert=True
)
assert store_empty, "Expected store to be empty."
wrapper.updates(
"insert data {<urn:s> <urn:p> <urn:o>}",
"insert data {graph <urn:ng1> {<urn:s> <urn:p> <urn:o>}}",
"insert data {graph <urn:ng2> {<urn:s> <urn:p> <urn:o>}}",
)
result = wrapper.query(
"select ?g ?s ?p ?o where { {?s ?p ?o} union { graph ?g {?s ?p ?o} }}",
convert=True,
)This will run the specified update operations asynchronously with an internally managed event loop; the query then returns the following Python conversion:
[
{
"g": rdflib.term.URIRef("urn:ng2"),
"s": rdflib.term.URIRef("urn:s"),
"p": rdflib.term.URIRef("urn:p"),
"o": rdflib.term.URIRef("urn:o"),
},
{
"g": rdflib.term.URIRef("urn:ng1"),
"s": rdflib.term.URIRef("urn:s"),
"p": rdflib.term.URIRef("urn:p"),
"o": rdflib.term.URIRef("urn:o"),
},
{
"g": None,
"s": rdflib.term.URIRef("urn:s"),
"p": rdflib.term.URIRef("urn:p"),
"o": rdflib.term.URIRef("urn:o"),
},
]