9191 ServerVersion ,
9292 AnyEntityDict ,
9393 StreamType ,
94+ BackgroundOperationTask ,
9495 )
9596
9697VERSION_REGEX = re .compile (
@@ -1870,7 +1871,7 @@ def send_batch_operations(
18701871 project_name : str ,
18711872 operations : list [dict [str , Any ]],
18721873 can_fail : bool = False ,
1873- raise_on_fail : bool = True
1874+ raise_on_fail : bool = True ,
18741875 ) -> list [dict [str , Any ]]:
18751876 """Post multiple CRUD operations to server.
18761877
@@ -1904,17 +1905,98 @@ def send_batch_operations(
19041905 raise_on_fail ,
19051906 )
19061907
1907- def _send_batch_operations (
1908+ def send_background_batch_operations (
19081909 self ,
1909- uri : str ,
1910+ project_name : str ,
19101911 operations : list [dict [str , Any ]],
1911- can_fail : bool ,
1912- raise_on_fail : bool
1913- ) -> list [dict [str , Any ]]:
1914- if not operations :
1915- return []
1912+ * ,
1913+ can_fail : bool = False ,
1914+ wait : bool = False ,
1915+ raise_on_fail : bool = True ,
1916+ ) -> BackgroundOperationTask :
1917+ """Post multiple CRUD operations to server.
1918+
1919+ When multiple changes should be made on server side this is the best
1920+ way to go. It is possible to pass multiple operations to process on a
1921+ server side and do the changes in a transaction.
1922+
1923+ Compared to 'send_batch_operations' this function creates a task on
1924+ server which then can be periodically checked for a status and
1925+ receive it's result.
1926+
1927+ When used with 'wait' set to 'True' this method blocks until task is
1928+ finished. Which makes it work as 'send_batch_operations'
1929+ but safer for large operations batch as is not bound to
1930+ response timeout.
1931+
1932+ Args:
1933+ project_name (str): On which project should be operations
1934+ processed.
1935+ operations (list[dict[str, Any]]): Operations to be processed.
1936+ can_fail (Optional[bool]): Server will try to process all
1937+ operations even if one of them fails.
1938+ wait (bool): Wait for operations to end.
1939+ raise_on_fail (Optional[bool]): Raise exception if an operation
1940+ fails. You can handle failed operations on your own
1941+ when set to 'False'. Used when 'wait' is enabled.
1942+
1943+ Raises:
1944+ ValueError: Operations can't be converted to json string.
1945+ FailedOperations: When output does not contain server operations
1946+ or 'raise_on_fail' is enabled and any operation fails.
1947+
1948+ Returns:
1949+ BackgroundOperationTask: Background operation.
1950+
1951+ """
1952+ operations_body = self ._prepare_operations_body (operations )
1953+ response = self .post (
1954+ f"projects/{ project_name } /operations/background" ,
1955+ operations = operations_body ,
1956+ canFail = can_fail
1957+ )
1958+ response .raise_for_status ()
1959+ if not wait :
1960+ return response .data
19161961
1917- body_by_id = {}
1962+ task_id = response ["id" ]
1963+ time .sleep (0.1 )
1964+ while True :
1965+ op_status = self .get_background_operations_status (
1966+ project_name , task_id
1967+ )
1968+ if op_status ["status" ] == "completed" :
1969+ break
1970+ time .sleep (1 )
1971+
1972+ if raise_on_fail :
1973+ self ._validate_operations_result (
1974+ op_status ["result" ], operations_body
1975+ )
1976+ return op_status
1977+
1978+ def get_background_operations_status (
1979+ self , project_name : str , task_id : str
1980+ ) -> BackgroundOperationTask :
1981+ """Get status of background operations task.
1982+
1983+ Args:
1984+ project_name (str): Project name.
1985+ task_id (str): Backgorund operation task id.
1986+
1987+ Returns:
1988+ BackgroundOperationTask: Background operation.
1989+
1990+ """
1991+ response = self .get (
1992+ f"projects/{ project_name } /operations/background/{ task_id } "
1993+ )
1994+ response .raise_for_status ()
1995+ return response .data
1996+
1997+ def _prepare_operations_body (
1998+ self , operations : list [dict [str , Any ]]
1999+ ) -> list [dict [str , Any ]]:
19182000 operations_body = []
19192001 for operation in operations :
19202002 if not operation :
@@ -1936,42 +2018,68 @@ def _send_batch_operations(
19362018 )
19372019 ))
19382020
1939- body_by_id [op_id ] = body
19402021 operations_body .append (body )
2022+ return operations_body
19412023
2024+ def _send_batch_operations (
2025+ self ,
2026+ uri : str ,
2027+ operations : list [dict [str , Any ]],
2028+ can_fail : bool ,
2029+ raise_on_fail : bool
2030+ ) -> list [dict [str , Any ]]:
2031+ if not operations :
2032+ return []
2033+
2034+ operations_body = self ._prepare_operations_body (operations )
19422035 if not operations_body :
19432036 return []
19442037
1945- result = self .post (
2038+ response = self .post (
19462039 uri ,
19472040 operations = operations_body ,
19482041 canFail = can_fail
19492042 )
19502043
1951- op_results = result .get ("operations" )
2044+ op_results = response .get ("operations" )
19522045 if op_results is None :
1953- detail = result .get ("detail" )
2046+ detail = response .get ("detail" )
19542047 if detail :
19552048 raise FailedOperations (f"Operation failed. Detail: { detail } " )
19562049 raise FailedOperations (
1957- f"Operation failed. Content: { result .text } "
2050+ f"Operation failed. Content: { response .text } "
19582051 )
19592052
1960- if result .get ("success" ) or not raise_on_fail :
1961- return op_results
1962-
1963- for op_result in op_results :
1964- if not op_result ["success" ]:
1965- operation_id = op_result ["id" ]
1966- raise FailedOperations ((
1967- "Operation \" {}\" failed with data:\n {}\n Detail: {}."
1968- ).format (
1969- operation_id ,
1970- json .dumps (body_by_id [operation_id ], indent = 4 ),
1971- op_result ["detail" ],
1972- ))
2053+ if raise_on_fail :
2054+ self ._validate_operations_result (response .data , operations_body )
19732055 return op_results
19742056
2057+ def _validate_operations_result (
2058+ self ,
2059+ result : dict [str , Any ],
2060+ operations_body : list [dict [str , Any ]],
2061+ ) -> None :
2062+ if result .get ("success" ):
2063+ return None
2064+
2065+ print (result )
2066+ for op_result in result ["operations" ]:
2067+ if op_result ["success" ]:
2068+ continue
2069+
2070+ operation_id = op_result ["id" ]
2071+ operation = next (
2072+ op
2073+ for op in operations_body
2074+ if op ["id" ] == operation_id
2075+ )
2076+ detail = op_result ["detail" ]
2077+ raise FailedOperations (
2078+ f"Operation \" { operation_id } \" failed with data:"
2079+ f"\n { json .dumps (operation , indent = 4 )} "
2080+ f"\n Detail: { detail } ."
2081+ )
2082+
19752083 def _prepare_fields (
19762084 self , entity_type : str , fields : set [str ], own_attributes : bool = False
19772085 ):
0 commit comments