11"""Collect and write time series data to InfluxDB Cloud or InfluxDB OSS."""
22
33# coding: utf-8
4+ # TODO Remove after this program no longer supports Python 3.8.*
5+ from __future__ import annotations
6+
47import logging
58import os
69import warnings
@@ -297,7 +300,7 @@ def __init__(self,
297300You can use native asynchronous version of the client:
298301- https://influxdb-client.readthedocs.io/en/stable/usage.html#how-to-use-asyncio
299302 """
300- # TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API
303+ # TODO above message has link to Influxdb2 API __NOT__ Influxdb3 API !!! - illustrates different API
301304 warnings .warn (message , DeprecationWarning )
302305
303306 def write (self , bucket : str , org : str = None ,
@@ -420,14 +423,15 @@ def _create_batching_pipeline(self) -> tuple[Subject[Any], rx.abc.DisposableBase
420423 # Create batch (concatenation line protocols by \n)
421424 ops .map (lambda group : group .pipe ( # type: ignore
422425 ops .to_iterable (),
423- ops .map (lambda xs : _BatchItem (key = group .key , data = _body_reduce (xs ), size = len (xs ))))), # type: ignore
426+ ops .map (lambda xs : _BatchItem (key = group .key , data = _body_reduce (xs ), size = len (xs ))))),
427+ # type: ignore
424428 ops .merge_all ())),
425429 # Write data into InfluxDB (possibility to retry if its fail)
426430 ops .filter (lambda batch : batch .size > 0 ),
427431 ops .map (mapper = lambda batch : self ._to_response (data = batch , delay = self ._jitter_delay ())),
428432 ops .merge_all ()) \
429433 .subscribe (self ._on_next , self ._on_error , self ._on_complete )
430-
434+
431435 return subject , disposable
432436
433437 def flush (self ):
@@ -453,7 +457,7 @@ def close(self):
453457 """Flush data and dispose a batching buffer."""
454458 if self ._subject is None :
455459 return # Already closed
456-
460+
457461 self ._subject .on_completed ()
458462 self ._subject .dispose ()
459463 self ._subject = None
0 commit comments