Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ Supported `DB API paramstyle`_ is only ``PyFormat``.
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s
""", {"param": "a string"})
""",
{"param": "a string"})
print(cursor.fetchall())

If the ``%`` character is contained in your query, it must be escaped with ``%%`` as follows:
Expand All @@ -54,6 +55,43 @@ if ``%`` character is contained in your query, it must be escaped with ``%%`` li
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'

Use parameterized queries
~~~~~~~~~~~~~~~~~~~~~~~~~

If you want to use Athena's parameterized queries, you can do so by changing the ``paramstyle`` to ``qmark`` as follows.

.. code:: python

import pyathena
from pyathena import connect

pyathena.paramstyle = "qmark"
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = ?
""",
["'a string'"])
print(cursor.fetchall())

You can also specify the ``paramstyle`` for a single query by passing it as an argument to the ``execute`` method.

.. code:: python

from pyathena import connect

cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = ?
""",
["'a string'"],
paramstyle="qmark")
print(cursor.fetchall())

You can find more information about the `considerations and limitations of parameterized queries`_ in the official documentation.

Quickly re-run queries
----------------------

Expand Down Expand Up @@ -270,3 +308,4 @@ No need to specify credential information.
.. _`reuse the results of previous queries`: https://docs.aws.amazon.com/athena/latest/ug/reusing-query-results.html
.. _`Boto3 environment variables`: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#using-environment-variables
.. _`Boto3 credentials`: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html
.. _`considerations and limitations of parameterized queries`: https://docs.aws.amazon.com/athena/latest/ug/querying-with-prepared-statements.html
6 changes: 2 additions & 4 deletions pyathena/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,13 @@ def __hash__(self):


@overload
def connect(*args, cursor_class: None = ..., **kwargs) -> "Connection[Cursor]":
...
def connect(*args, cursor_class: None = ..., **kwargs) -> "Connection[Cursor]": ...


@overload
def connect(
*args, cursor_class: Type[ConnectionCursor], **kwargs
) -> "Connection[ConnectionCursor]":
...
) -> "Connection[ConnectionCursor]": ...


def connect(*args, **kwargs) -> "Connection[Any]":
Expand Down
6 changes: 4 additions & 2 deletions pyathena/arrow/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
from concurrent.futures import Future
from multiprocessing import cpu_count
from typing import Any, Dict, Optional, Tuple, Union, cast
from typing import Any, Dict, List, Optional, Tuple, Union, cast

from pyathena import ProgrammingError
from pyathena.arrow.converter import (
Expand Down Expand Up @@ -96,13 +96,14 @@ def _collect_result_set(
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
**kwargs,
) -> Tuple[str, "Future[Union[AthenaArrowResultSet, Any]]"]:
if self._unload:
Expand All @@ -125,6 +126,7 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
return (
query_id,
Expand Down
9 changes: 7 additions & 2 deletions pyathena/arrow/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,14 @@ def close(self) -> None:
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
**kwargs,
) -> ArrowCursor:
self._reset_state()
Expand All @@ -129,6 +130,7 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
query_execution = cast(AthenaQueryExecution, self._poll(self.query_id))
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
Expand All @@ -147,7 +149,10 @@ def execute(
return self

def executemany(
self, operation: str, seq_of_parameters: List[Optional[Dict[str, Any]]], **kwargs
self,
operation: str,
seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
**kwargs,
) -> None:
for parameters in seq_of_parameters:
self.execute(operation, parameters, **kwargs)
Expand Down
7 changes: 6 additions & 1 deletion pyathena/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ def execute(
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
**kwargs,
) -> Tuple[str, "Future[Union[AthenaResultSet, Any]]"]:
query_id = self._execute(
Expand All @@ -115,11 +116,15 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
return query_id, self._executor.submit(self._collect_result_set, query_id)

def executemany(
self, operation: str, seq_of_parameters: List[Optional[Dict[str, Any]]], **kwargs
self,
operation: str,
seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
**kwargs,
) -> None:
raise NotSupportedError

Expand Down
22 changes: 18 additions & 4 deletions pyathena/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

import pyathena
from pyathena.converter import Converter, DefaultTypeConverter
from pyathena.error import DatabaseError, OperationalError, ProgrammingError
from pyathena.formatter import Formatter
Expand Down Expand Up @@ -144,6 +145,7 @@ def _build_start_query_execution_request(
s3_staging_dir: Optional[str] = None,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
execution_parameters: Optional[List[str]] = None,
) -> Dict[str, Any]:
request: Dict[str, Any] = {
"QueryString": query,
Expand Down Expand Up @@ -177,6 +179,8 @@ def _build_start_query_execution_request(
else self._result_reuse_minutes,
}
request["ResultReuseConfiguration"] = {"ResultReuseByAgeConfiguration": reuse_conf}
if execution_parameters:
request["ExecutionParameters"] = execution_parameters
return request

def _build_start_calculation_execution_request(
Expand Down Expand Up @@ -546,15 +550,21 @@ def _find_previous_query_id(
def _execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
) -> str:
query = self._formatter.format(operation, parameters)
if pyathena.paramstyle == "qmark" or paramstyle == "qmark":
query = operation
execution_parameters = cast(Optional[List[str]], parameters)
else:
query = self._formatter.format(operation, cast(Optional[Dict[str, Any]], parameters))
execution_parameters = None
_logger.debug(query)

request = self._build_start_query_execution_request(
Expand All @@ -563,6 +573,7 @@ def _execute(
s3_staging_dir=s3_staging_dir,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
execution_parameters=execution_parameters,
)
query_id = self._find_previous_query_id(
query,
Expand Down Expand Up @@ -612,14 +623,17 @@ def _calculate(
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
**kwargs,
):
raise NotImplementedError # pragma: no cover

@abstractmethod
def executemany(
self, operation: str, seq_of_parameters: List[Optional[Dict[str, Any]]], **kwargs
self,
operation: str,
seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
**kwargs,
) -> None:
raise NotImplementedError # pragma: no cover

Expand Down
12 changes: 4 additions & 8 deletions pyathena/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ def __init__(
result_reuse_enable: bool = ...,
result_reuse_minutes: int = ...,
**kwargs,
) -> None:
...
) -> None: ...

@overload
def __init__(
Expand Down Expand Up @@ -121,8 +120,7 @@ def __init__(
result_reuse_enable: bool = ...,
result_reuse_minutes: int = ...,
**kwargs,
) -> None:
...
) -> None: ...

def __init__(
self,
Expand Down Expand Up @@ -329,12 +327,10 @@ def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

@overload
def cursor(self, cursor: None = ..., **kwargs) -> ConnectionCursor:
...
def cursor(self, cursor: None = ..., **kwargs) -> ConnectionCursor: ...

@overload
def cursor(self, cursor: Type[FunctionalCursor], **kwargs) -> FunctionalCursor:
...
def cursor(self, cursor: Type[FunctionalCursor], **kwargs) -> FunctionalCursor: ...

def cursor(
self, cursor: Optional[Type[FunctionalCursor]] = None, **kwargs
Expand Down
9 changes: 7 additions & 2 deletions pyathena/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,14 @@ def close(self) -> None:
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: int = 0,
cache_expiration_time: int = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
**kwargs,
) -> Cursor:
self._reset_state()
Expand All @@ -94,6 +95,7 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
query_execution = cast(AthenaQueryExecution, self._poll(self.query_id))
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
Expand All @@ -109,7 +111,10 @@ def execute(
return self

def executemany(
self, operation: str, seq_of_parameters: List[Optional[Dict[str, Any]]], **kwargs
self,
operation: str,
seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
**kwargs,
) -> None:
for parameters in seq_of_parameters:
self.execute(operation, parameters, **kwargs)
Expand Down
4 changes: 3 additions & 1 deletion pyathena/filesystem/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,9 @@ def touch(self, path: str, truncate: bool = True, **kwargs) -> Dict[str, Any]:
self.invalidate_cache(path)
return object_.to_dict()

def cp_file(self, path1: str, path2: str, recursive=False, maxdepth=None, on_error=None, **kwargs):
def cp_file(
self, path1: str, path2: str, recursive=False, maxdepth=None, on_error=None, **kwargs
):
# TODO: Delete the value that seems to be a typo, onerror=false.
# https://github.com/fsspec/filesystem_spec/commit/346a589fef9308550ffa3d0d510f2db67281bb05
# https://github.com/fsspec/filesystem_spec/blob/2024.10.0/fsspec/spec.py#L1185
Expand Down
6 changes: 3 additions & 3 deletions pyathena/filesystem/s3_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ def __init__(
# https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html#API_HeadObject_ResponseSyntax
# Amazon S3 returns this header for all objects except for
# S3 Standard storage class objects.
filtered[
_API_FIELD_TO_S3_OBJECT_PROPERTY["StorageClass"]
] = S3StorageClass.S3_STORAGE_CLASS_STANDARD
filtered[_API_FIELD_TO_S3_OBJECT_PROPERTY["StorageClass"]] = (
S3StorageClass.S3_STORAGE_CLASS_STANDARD
)
super().update(filtered)
if "Size" in init:
self.content_length = init["Size"]
Expand Down
4 changes: 3 additions & 1 deletion pyathena/pandas/async_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,14 @@ def _collect_result_set(
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
keep_default_na: bool = False,
na_values: Optional[Iterable[str]] = ("",),
quoting: int = 1,
Expand All @@ -140,6 +141,7 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
return (
query_id,
Expand Down
4 changes: 3 additions & 1 deletion pyathena/pandas/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,14 @@ def close(self) -> None:
def execute(
self,
operation: str,
parameters: Optional[Dict[str, Any]] = None,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
keep_default_na: bool = False,
na_values: Optional[Iterable[str]] = ("",),
quoting: int = 1,
Expand All @@ -154,6 +155,7 @@ def execute(
cache_expiration_time=cache_expiration_time,
result_reuse_enable=result_reuse_enable,
result_reuse_minutes=result_reuse_minutes,
paramstyle=paramstyle,
)
query_execution = cast(AthenaQueryExecution, self._poll(self.query_id))
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
Expand Down
Loading
Loading