diff --git a/gsy_framework/read_user_profile.py b/gsy_framework/read_user_profile.py index 645ea8b9..11db3549 100644 --- a/gsy_framework/read_user_profile.py +++ b/gsy_framework/read_user_profile.py @@ -29,6 +29,7 @@ from gsy_framework.constants_limits import ( DATE_TIME_FORMAT, DATE_TIME_FORMAT_SECONDS, + PROFILE_EXPANSION_DAYS, TIME_FORMAT, TIME_ZONE, GlobalConfig, @@ -40,6 +41,7 @@ generate_market_slot_list, return_ordered_dict, convert_kWh_to_W, + is_number, ) logger = logging.getLogger(__name__) @@ -71,427 +73,437 @@ class LiveProfileTypes(Enum): MEASUREMENT = 2 -def _str_to_datetime(time_string, time_format) -> DateTime: - """ - Converts time_string into a pendulum (DateTime) object that either takes the global start date - or the provided one, dependent on the time_format - :return: DateTime - """ - time = from_format(time_string, time_format, tz=TIME_ZONE) - if time_format in [DATE_TIME_FORMAT, DATE_TIME_FORMAT_SECONDS, DATE_TIME_FORMAT_SPACED]: - return time - if time_format == TIME_FORMAT: - return GlobalConfig.start_date.add( - hours=time.hour, minutes=time.minute, seconds=time.second +class UserProfileReader: + + def __init__(self, read_full_profile: bool = False): + self._profile_length_days = ( + PROFILE_EXPANSION_DAYS if not read_full_profile else GlobalConfig.sim_duration.days ) - raise GSyReadProfileException("Provided time_format invalid.") - - -def default_profile_dict(val=None, current_timestamp=None) -> Dict[DateTime, int]: - """ - Returns dictionary with default values for all market slots. - :param val: Default value - """ - if val is None: - val = 0 - return { - time_slot: val - for time_slot in generate_market_slot_list(start_timestamp=current_timestamp) - } - - -def is_number(number): - """return if number is float""" - try: - float(number) - return True - except ValueError: - return False - - -def _remove_header(profile_dict: Dict) -> Dict: - """ - Checks profile for header entries and removes these - Header entries have values that are not representations of numbers - """ - out_dict = {} - for time_stamp, value in profile_dict.items(): - if is_number(value): - out_dict[time_stamp] = value - return out_dict - - -def _eval_single_format(time_dict, time_format): - # pylint: disable=expression-not-assigned - try: - [from_format(str(ti), time_format) for ti in time_dict.keys()] - return time_format - except ValueError: - return None - - -def _eval_time_format(time_dict: Dict) -> str: - """ - Evaluates which time format the provided dictionary has, also checks if the time-format is - consistent for each time_slot - :return: TIME_FORMAT or DATE_TIME_FORMAT or DATE_TIME_FORMAT_SECONDS - """ - - for time_format in [ - TIME_FORMAT, - DATE_TIME_FORMAT, - DATE_TIME_FORMAT_SECONDS, - DATE_TIME_FORMAT_SPACED, - ]: - if _eval_single_format(time_dict, time_format): + + def _str_to_datetime(self, time_string, time_format) -> DateTime: + """ + Converts time_string into a pendulum (DateTime) object that either takes the global + start date or the provided one, dependent on the time_format + :return: DateTime + """ + time = from_format(time_string, time_format, tz=TIME_ZONE) + if time_format in [DATE_TIME_FORMAT, DATE_TIME_FORMAT_SECONDS, DATE_TIME_FORMAT_SPACED]: + return time + if time_format == TIME_FORMAT: + return GlobalConfig.start_date.add( + hours=time.hour, minutes=time.minute, seconds=time.second + ) + raise GSyReadProfileException("Provided time_format invalid.") + + def default_profile_dict(self, val=None, current_timestamp=None) -> Dict[DateTime, int]: + """ + Returns dictionary with default values for all market slots. + :param val: Default value + """ + if val is None: + val = 0 + return { + time_slot: val + for time_slot in generate_market_slot_list( + start_timestamp=current_timestamp, profile_length_days=self._profile_length_days + ) + } + + def _remove_header(self, profile_dict: Dict) -> Dict: + """ + Checks profile for header entries and removes these + Header entries have values that are not representations of numbers + """ + out_dict = {} + for time_stamp, value in profile_dict.items(): + if is_number(value): + out_dict[time_stamp] = value + return out_dict + + def _eval_single_format(self, time_dict, time_format): + # pylint: disable=expression-not-assigned + try: + [from_format(str(ti), time_format) for ti in time_dict.keys()] return time_format + except ValueError: + return None + + def _eval_time_format(self, time_dict: Dict) -> str: + """ + Evaluates which time format the provided dictionary has, also checks if the time-format is + consistent for each time_slot + :return: TIME_FORMAT or DATE_TIME_FORMAT or DATE_TIME_FORMAT_SECONDS + """ + + for time_format in [ + TIME_FORMAT, + DATE_TIME_FORMAT, + DATE_TIME_FORMAT_SECONDS, + DATE_TIME_FORMAT_SPACED, + ]: + if self._eval_single_format(time_dict, time_format): + return time_format - raise GSyReadProfileException( - f"Format of time-stamp is not one of ('{TIME_FORMAT}', " - f"'{DATE_TIME_FORMAT}', '{DATE_TIME_FORMAT_SECONDS}')" - ) - - -def _read_csv(path: str) -> Dict: - """ - Read a 2-column csv profile file. First column is the time, second column - is the value (power, energy, rate, ...) - :param path: path to csv file - :return: Dict[DateTime, value] - """ - profile_data = {} - with open(path, encoding="utf-8") as csv_file: - csv_rows = csv.reader(csv_file) - for row in csv_rows: - if len(row) == 0: - raise GSyReadProfileException( - f"There must not be an empty line in the profile file {path}" - ) - if len(row) != 2: - row = row[0].split(";") - try: - profile_data[row[0]] = float(row[1]) - except ValueError: - pass - time_format = _eval_time_format(profile_data) - return dict( - (_str_to_datetime(time_str, time_format), value) - for time_str, value in profile_data.items() - ) - - -def _interpolate_profile_values_to_slot(profile_data_W, slot_length): - # pylint: disable=invalid-name - input_time_list = list(profile_data_W.keys()) - - # add one market slot in order to have another data point for integration - additional_time_stamp = input_time_list[-1] + slot_length - profile_data_W[additional_time_stamp] = profile_data_W[input_time_list[-1]] - input_time_list.append(additional_time_stamp) - - input_power_list_W = [float(dp) for dp in profile_data_W.values()] - - time0 = from_timestamp(0) - input_time_seconds_list = [(ti - time0).in_seconds() for ti in input_time_list] - - slot_time_list = list( - range(input_time_seconds_list[0], input_time_seconds_list[-1], slot_length.in_seconds()) - ) - - second_power_list_W = [ - input_power_list_W[index - 1] - for index, seconds in enumerate(input_time_seconds_list) - for _ in range(seconds - input_time_seconds_list[index - 1]) - ] - - avg_power_kW = [] - for index, _ in enumerate(slot_time_list): - first_index = index * slot_length.in_seconds() - if first_index <= len(second_power_list_W): - avg_power_kW.append(second_power_list_W[first_index] / 1000.0) - - return avg_power_kW, slot_time_list - - -def _calculate_energy_from_power_profile( - profile_data_W: Dict[DateTime, float], slot_length: duration -) -> Dict[DateTime, float]: - # pylint: disable=invalid-name - """ - Calculates energy from power profile. Does not use numpy, calculates avg power for each - market slot and based on that calculates energy. - :param profile_data_W: Power profile in W - :param slot_length: slot length duration - :return: a mapping from time to energy values in kWh - """ - - avg_power_kW, slot_time_list = _interpolate_profile_values_to_slot(profile_data_W, slot_length) - slot_energy_kWh = list(map(lambda x: convert_kW_to_kWh(x, slot_length), avg_power_kW)) - - return { - from_timestamp(slot_time_list[ii]): energy for ii, energy in enumerate(slot_energy_kWh) - } - - -def _fill_gaps_in_profile(input_profile: Dict = None, out_profile: Dict = None) -> Dict: - """ - Fills time steps, where no value is provided, with the value value of the - last available time step. - :param input_profile: Dict[Datetime: float, int, tuple] - :param out_profile: dict with the same format as the input_profile that can be used - to provide a default zero-value profile dict with the expected timestamps - that need to be populated in the profile - :return: continuous profile Dict[Datetime: float, int, tuple] - """ - - try: - if isinstance(list(input_profile.values())[0], tuple): - current_val = (0, 0) - else: - current_val = 0 - except IndexError: - logger.warning( - "Failed to parse input profile, skipping profile gap filling. Profile: %s", - input_profile, + raise GSyReadProfileException( + f"Format of time-stamp is not one of ('{TIME_FORMAT}', " + f"'{DATE_TIME_FORMAT}', '{DATE_TIME_FORMAT_SECONDS}')" ) - return input_profile - for time in out_profile.keys(): - if time not in input_profile: - temp_val = get_from_profile_same_weekday_and_time( - input_profile, time, ignore_not_found=True - ) - if temp_val is not None: - current_val = temp_val - else: - current_val = input_profile[time] - out_profile[time] = current_val - - return out_profile - - -def _read_from_different_sources_todict( - input_profile: Any, current_timestamp: DateTime = None -) -> Dict[DateTime, float]: - """ - Reads arbitrary profile. - Handles csv, dict and string input. - :param input_profile:Can be either a csv file path, - or a dict with hourly data (Dict[int, float]) - or a dict with arbitrary time data (Dict[str, float]) - or a string containing a serialized dict of the aforementioned structure - :return: - """ - - if os.path.isfile(str(input_profile)): - # input is csv file - profile = _read_csv(input_profile) - - elif isinstance(input_profile, (dict, str)): - # input is profile - - if isinstance(input_profile, str): - # input in JSON formatting - profile = ast.literal_eval(input_profile.encode("utf-8").decode("utf-8-sig")) - # Remove filename entry to support d3a-web profiles - profile.pop("filename", None) - profile = _remove_header(profile) - time_format = _eval_time_format(profile) - profile = {_str_to_datetime(key, time_format): val for key, val in profile.items()} - elif isinstance(list(input_profile.keys())[0], DateTime): - return input_profile + def _read_csv(self, path: str) -> Dict: + """ + Read a 2-column csv profile file. First column is the time, second column + is the value (power, energy, rate, ...) + :param path: path to csv file + :return: Dict[DateTime, value] + """ + profile_data = {} + with open(path, encoding="utf-8") as csv_file: + csv_rows = csv.reader(csv_file) + for row in csv_rows: + if len(row) == 0: + raise GSyReadProfileException( + f"There must not be an empty line in the profile file {path}" + ) + if len(row) != 2: + row = row[0].split(";") + try: + profile_data[row[0]] = float(row[1]) + except ValueError: + pass + time_format = self._eval_time_format(profile_data) + return dict( + (self._str_to_datetime(time_str, time_format), value) + for time_str, value in profile_data.items() + ) - elif isinstance(list(input_profile.keys())[0], str): - # input is dict with string keys that are properly formatted time stamps - input_profile = _remove_header(input_profile) - # Remove filename from profile - input_profile.pop("filename", None) - time_format = _eval_time_format(input_profile) - profile = { - _str_to_datetime(key, time_format): val for key, val in input_profile.items() - } + def _interpolate_profile_values_to_slot(self, profile_data_W, slot_length): + # pylint: disable=invalid-name + input_time_list = list(profile_data_W.keys()) - elif isinstance(list(input_profile.keys())[0], (float, int)): - # input is hourly profile + # add one market slot in order to have another data point for integration + additional_time_stamp = input_time_list[-1] + slot_length + profile_data_W[additional_time_stamp] = profile_data_W[input_time_list[-1]] + input_time_list.append(additional_time_stamp) - profile = dict( - (today(tz=TIME_ZONE).add(hours=hour), val) for hour, val in input_profile.items() - ) + input_power_list_W = [float(dp) for dp in profile_data_W.values()] - else: - raise GSyReadProfileException( - f"Unsupported input type of {str(list(input_profile.keys())[0])} " - f"(type: {type(list(input_profile.keys())[0])})" + time0 = from_timestamp(0) + input_time_seconds_list = [(ti - time0).in_seconds() for ti in input_time_list] + + slot_time_list = list( + range( + input_time_seconds_list[0], input_time_seconds_list[-1], slot_length.in_seconds() ) + ) - elif isinstance(input_profile, (float, int, tuple)): - # input is single value - profile = default_profile_dict(val=input_profile, current_timestamp=current_timestamp) - - else: - raise GSyReadProfileException(f"Unsupported input type: {str(input_profile)}") - - return profile - - -def _hour_time_str(hour: float, minute: float) -> str: - return f"{hour:02}:{minute:02}" - - -def _copy_profile_to_multiple_days( - in_profile: Dict, current_timestamp: Optional[datetime] = None -) -> Dict: - daytime_dict = dict( - (_hour_time_str(time.hour, time.minute), time) for time in in_profile.keys() - ) - out_profile = {} - for slot_time in generate_market_slot_list(start_timestamp=current_timestamp): - if slot_time not in out_profile: - time_key = _hour_time_str(slot_time.hour, slot_time.minute) - if time_key in daytime_dict: - out_profile[slot_time] = in_profile[daytime_dict[time_key]] - return out_profile - - -@return_ordered_dict -def read_arbitrary_profile( - profile_type: InputProfileTypes, input_profile, current_timestamp: DateTime = None -) -> Dict[DateTime, float]: - """ - Reads arbitrary profile. - Handles csv, dict and string input. - Fills gaps in the profile. - :param profile_type: Can be either rate or power - :param input_profile: Can be either a csv file path, - or a dict with hourly data (Dict[int, float]) - or a dict with arbitrary time data (Dict[str, float]) - or a string containing a serialized dict of the aforementioned structure - :param current_timestamp: - :return: a mapping from time to profile values - """ - if input_profile in [{}, None]: - return {} - profile = _read_from_different_sources_todict( - input_profile, current_timestamp=current_timestamp - ) - profile_time_list = list(profile.keys()) - profile_duration = profile_time_list[-1] - profile_time_list[0] - if ( - GlobalConfig.sim_duration > duration(days=1) >= profile_duration - ) or GlobalConfig.is_canary_network(): - profile = _copy_profile_to_multiple_days(profile, current_timestamp=current_timestamp) - - if not profile: - return {} - try: - if profile_type is InputProfileTypes.ENERGY_KWH: - profile = { - ts: convert_kWh_to_W(energy, GlobalConfig.slot_length) - for ts, energy in profile.items() - } - if profile_type is InputProfileTypes.CARBON_RATIO_G_KWH: - return profile - zero_value_slot_profile = default_profile_dict(current_timestamp=current_timestamp) - filled_profile = _fill_gaps_in_profile(profile, zero_value_slot_profile) - if profile_type in [ - InputProfileTypes.POWER_W, - InputProfileTypes.REBASE_W, - InputProfileTypes.ENERGY_KWH, - ]: - return _calculate_energy_from_power_profile(filled_profile, GlobalConfig.slot_length) - return filled_profile - except IndexError as exc: - logger.error("Profile failed: %s", profile) - raise GSyReadProfileException( - f"Filling gaps and converting profile failed: {input_profile}" - ) from exc + second_power_list_W = [ + input_power_list_W[index - 1] + for index, seconds in enumerate(input_time_seconds_list) + for _ in range(seconds - input_time_seconds_list[index - 1]) + ] + + avg_power_kW = [] + for index, _ in enumerate(slot_time_list): + first_index = index * slot_length.in_seconds() + if first_index <= len(second_power_list_W): + avg_power_kW.append(second_power_list_W[first_index] / 1000.0) + + return avg_power_kW, slot_time_list + + def _calculate_energy_from_power_profile( + self, profile_data_W: Dict[DateTime, float], slot_length: duration + ) -> Dict[DateTime, float]: + # pylint: disable=invalid-name + """ + Calculates energy from power profile. Does not use numpy, calculates avg power for each + market slot and based on that calculates energy. + :param profile_data_W: Power profile in W + :param slot_length: slot length duration + :return: a mapping from time to energy values in kWh + """ + + avg_power_kW, slot_time_list = self._interpolate_profile_values_to_slot( + profile_data_W, slot_length + ) + slot_energy_kWh = list(map(lambda x: convert_kW_to_kWh(x, slot_length), avg_power_kW)) + + return { + from_timestamp(slot_time_list[ii]): energy for ii, energy in enumerate(slot_energy_kWh) + } + + def _fill_gaps_in_profile(self, input_profile: Dict = None, out_profile: Dict = None) -> Dict: + """ + Fills time steps, where no value is provided, with the value of the last available time + step. + :param input_profile: Dict[Datetime: float, int, tuple] + :param out_profile: dict with the same format as the input_profile that can be used + to provide a default zero-value profile dict with the expected + timestamps that need to be populated in the profile + :return: continuous profile Dict[Datetime: float, int, tuple] + """ + + try: + if isinstance(list(input_profile.values())[0], tuple): + current_val = (0, 0) + else: + current_val = 0 + except IndexError: + logger.warning( + "Failed to parse input profile, skipping profile gap filling. Profile: %s", + input_profile, + ) + return input_profile + for time in out_profile.keys(): + if time not in input_profile: + temp_val = get_from_profile_same_weekday_and_time( + input_profile, time, ignore_not_found=True + ) + if temp_val is not None: + current_val = temp_val + else: + current_val = input_profile[time] + out_profile[time] = current_val + + return out_profile + + def _read_from_different_sources_todict( + self, + input_profile: Any, + current_timestamp: DateTime = None, + ) -> Dict[DateTime, float]: + """ + Reads arbitrary profile. + Handles csv, dict and string input. + :param input_profile:Can be either a csv file path, + or a dict with hourly data (Dict[int, float]) + or a dict with arbitrary time data (Dict[str, float]) + or a string containing a serialized dict of the aforementioned structure + :return: + """ + + if os.path.isfile(str(input_profile)): + # input is csv file + profile = self._read_csv(input_profile) + + elif isinstance(input_profile, (dict, str)): + # input is profile + + if isinstance(input_profile, str): + # input in JSON formatting + profile = ast.literal_eval(input_profile.encode("utf-8").decode("utf-8-sig")) + # Remove filename entry to support d3a-web profiles + profile.pop("filename", None) + profile = self._remove_header(profile) + time_format = self._eval_time_format(profile) + profile = { + self._str_to_datetime(key, time_format): val for key, val in profile.items() + } + elif isinstance(list(input_profile.keys())[0], DateTime): + return input_profile + + elif isinstance(list(input_profile.keys())[0], str): + # input is dict with string keys that are properly formatted time stamps + input_profile = self._remove_header(input_profile) + # Remove filename from profile + input_profile.pop("filename", None) + time_format = self._eval_time_format(input_profile) + profile = { + self._str_to_datetime(key, time_format): val + for key, val in input_profile.items() + } + + elif isinstance(list(input_profile.keys())[0], (float, int)): + # input is hourly profile + profile = dict( + (today(tz=TIME_ZONE).add(hours=hour), val) + for hour, val in input_profile.items() + ) -def _generate_slot_based_zero_values_dict_from_profile(profile, slot_length_mins=15): - profile_time_list = list(profile.keys()) - end_time = profile_time_list[-1] - start_time = profile_time_list[0] + else: + raise GSyReadProfileException( + f"Unsupported input type of {str(list(input_profile.keys())[0])} " + f"(type: {type(list(input_profile.keys())[0])})" + ) - slot_length_seconds = slot_length_mins * 60 + elif isinstance(input_profile, (float, int, tuple)): + # input is single value + profile = self.default_profile_dict( + val=input_profile, current_timestamp=current_timestamp + ) - offset_seconds = start_time.timestamp() % slot_length_seconds - start_datetime = from_timestamp(start_time.timestamp() - offset_seconds) + else: + raise GSyReadProfileException(f"Unsupported input type: {str(input_profile)}") - profile_duration = end_time - start_datetime - return { - start_datetime + duration(minutes=slot_length_mins * i): 0.0 - for i in range((int(profile_duration.total_minutes()) // slot_length_mins) + 1) - } + return profile + def _hour_time_str(self, hour: float, minute: float) -> str: + return f"{hour:02}:{minute:02}" -def read_profile_without_config(input_profile: Dict, slot_length_mins=15) -> Dict[DateTime, float]: - """Read profile without using a configuration.""" - profile = _read_from_different_sources_todict(input_profile) - if profile is not None: - slot_based_profile = _generate_slot_based_zero_values_dict_from_profile( - profile, slot_length_mins + def _copy_profile_to_multiple_days( + self, in_profile: Dict, current_timestamp: Optional[datetime] = None + ) -> Dict: + daytime_dict = dict( + (self._hour_time_str(time.hour, time.minute), time) for time in in_profile.keys() ) - filled_profile = _fill_gaps_in_profile(profile, slot_based_profile) - profile_values, slots = _interpolate_profile_values_to_slot( - filled_profile, duration(minutes=slot_length_mins) + out_profile = {} + for slot_time in generate_market_slot_list(start_timestamp=current_timestamp): + if slot_time not in out_profile: + time_key = self._hour_time_str(slot_time.hour, slot_time.minute) + if time_key in daytime_dict: + out_profile[slot_time] = in_profile[daytime_dict[time_key]] + return out_profile + + @return_ordered_dict + def read_arbitrary_profile( + self, + profile_type: InputProfileTypes, + input_profile, + current_timestamp: DateTime = None, + ) -> Dict[DateTime, float]: + """ + Reads arbitrary profile. + Handles csv, dict and string input. + Fills gaps in the profile. + :param profile_type: Can be either rate or power + :param input_profile: Can be either a csv file path, + or a dict with hourly data (Dict[int, float]) + or a dict with arbitrary time data (Dict[str, float]) + or a string containing a serialized dict of the aforementioned structure + :param current_timestamp: + :return: a mapping from time to profile values + """ + if input_profile in [{}, None]: + return {} + profile = self._read_from_different_sources_todict( + input_profile, current_timestamp=current_timestamp ) - return {from_timestamp(slots[ii]): energy for ii, energy in enumerate(profile_values)} + profile_time_list = list(profile.keys()) + profile_duration = profile_time_list[-1] - profile_time_list[0] + if ( + GlobalConfig.sim_duration > duration(days=1) >= profile_duration + ) or GlobalConfig.is_canary_network(): + profile = self._copy_profile_to_multiple_days( + profile, current_timestamp=current_timestamp + ) - raise GSyReadProfileException( - "Profile file cannot be read successfully. Please reconfigure the file path." - ) + if not profile: + return {} + try: + if profile_type is InputProfileTypes.ENERGY_KWH: + profile = { + ts: convert_kWh_to_W(energy, GlobalConfig.slot_length) + for ts, energy in profile.items() + } + if profile_type is InputProfileTypes.CARBON_RATIO_G_KWH: + return profile + zero_value_slot_profile = self.default_profile_dict( + current_timestamp=current_timestamp + ) + filled_profile = self._fill_gaps_in_profile(profile, zero_value_slot_profile) + if profile_type in [ + InputProfileTypes.POWER_W, + InputProfileTypes.REBASE_W, + InputProfileTypes.ENERGY_KWH, + ]: + return self._calculate_energy_from_power_profile( + filled_profile, GlobalConfig.slot_length + ) + return filled_profile + except IndexError as exc: + logger.error("Profile failed: %s", profile) + raise GSyReadProfileException( + f"Filling gaps and converting profile failed: {input_profile}" + ) from exc + def _generate_slot_based_zero_values_dict_from_profile(self, profile, slot_length_mins=15): + profile_time_list = list(profile.keys()) + end_time = profile_time_list[-1] + start_time = profile_time_list[0] -def _generate_time_slots( - slot_length: timedelta, sim_duration: timedelta, start_date: datetime -) -> Generator: - return ( - start_date + timedelta(seconds=slot_length.total_seconds() * time_diff_count) - for time_diff_count in range( - int(sim_duration.total_seconds() / slot_length.total_seconds()) - ) - ) - - -def _get_from_profile(profile: Dict[datetime, float], key: datetime) -> float: - try: - return profile[key] - except KeyError as ex: - raise GSyProfileException(f"The input profile does not contain the key {key}") from ex - - -def resample_hourly_energy_profile( - input_profile: Dict[DateTime, float], - slot_length: timedelta, - sim_duration: timedelta, - start_date: datetime, -) -> Dict[DateTime, float]: - """Resample hourly energy profile in order to fit to the set slot_length.""" - slot_length_minutes = slot_length.total_seconds() / 60 - if slot_length_minutes < 60: - if 60 % slot_length_minutes != 0: - raise GSyProfileException("slot_length is not an exact division of 60 minutes") - scaling_factor = 60 / slot_length_minutes - out_dict = {} - for time_slot in _generate_time_slots(slot_length, sim_duration, start_date): - hour_time_slot = datetime( - time_slot.year, - time_slot.month, - time_slot.day, - time_slot.hour, - tzinfo=time_slot.tzinfo, - ) - out_dict[time_slot] = _get_from_profile(input_profile, hour_time_slot) / scaling_factor - return out_dict - if slot_length_minutes > 60: - if slot_length_minutes % 60 != 0: - raise GSyProfileException("slot_length is not multiple of 1 hour") - number_of_aggregated_slots = int(slot_length_minutes / 60) + slot_length_seconds = slot_length_mins * 60 + + offset_seconds = start_time.timestamp() % slot_length_seconds + start_datetime = from_timestamp(start_time.timestamp() - offset_seconds) + + profile_duration = end_time - start_datetime return { - time_slot: sum( - _get_from_profile(input_profile, time_slot.add(minutes=slot_length_minutes * nn)) - for nn in range(number_of_aggregated_slots) - ) - for time_slot in _generate_time_slots(slot_length, sim_duration, start_date) + start_datetime + duration(minutes=slot_length_mins * i): 0.0 + for i in range((int(profile_duration.total_minutes()) // slot_length_mins) + 1) } - return input_profile + + def read_profile_without_config( + self, input_profile: Dict, slot_length_mins=15 + ) -> Dict[DateTime, float]: + """Read profile without using a configuration.""" + profile = self._read_from_different_sources_todict(input_profile) + if profile is not None: + slot_based_profile = self._generate_slot_based_zero_values_dict_from_profile( + profile, slot_length_mins + ) + filled_profile = self._fill_gaps_in_profile(profile, slot_based_profile) + profile_values, slots = self._interpolate_profile_values_to_slot( + filled_profile, duration(minutes=slot_length_mins) + ) + return {from_timestamp(slots[ii]): energy for ii, energy in enumerate(profile_values)} + + raise GSyReadProfileException( + "Profile file cannot be read successfully. Please reconfigure the file path." + ) + + def _generate_time_slots( + self, slot_length: timedelta, sim_duration: timedelta, start_date: datetime + ) -> Generator: + return ( + start_date + timedelta(seconds=slot_length.total_seconds() * time_diff_count) + for time_diff_count in range( + int(sim_duration.total_seconds() / slot_length.total_seconds()) + ) + ) + + def _get_from_profile(self, profile: Dict[datetime, float], key: datetime) -> float: + try: + return profile[key] + except KeyError as ex: + raise GSyProfileException(f"The input profile does not contain the key {key}") from ex + + def resample_hourly_energy_profile( + self, + input_profile: Dict[DateTime, float], + slot_length: timedelta, + sim_duration: timedelta, + start_date: datetime, + ) -> Dict[DateTime, float]: + """Resample hourly energy profile in order to fit to the set slot_length.""" + slot_length_minutes = slot_length.total_seconds() / 60 + if slot_length_minutes < 60: + if 60 % slot_length_minutes != 0: + raise GSyProfileException("slot_length is not an exact division of 60 minutes") + scaling_factor = 60 / slot_length_minutes + out_dict = {} + for time_slot in self._generate_time_slots(slot_length, sim_duration, start_date): + hour_time_slot = datetime( + time_slot.year, + time_slot.month, + time_slot.day, + time_slot.hour, + tzinfo=time_slot.tzinfo, + ) + out_dict[time_slot] = ( + self._get_from_profile(input_profile, hour_time_slot) / scaling_factor + ) + return out_dict + if slot_length_minutes > 60: + if slot_length_minutes % 60 != 0: + raise GSyProfileException("slot_length is not multiple of 1 hour") + number_of_aggregated_slots = int(slot_length_minutes / 60) + return { + time_slot: sum( + self._get_from_profile( + input_profile, time_slot.add(minutes=slot_length_minutes * nn) + ) + for nn in range(number_of_aggregated_slots) + ) + for time_slot in self._generate_time_slots(slot_length, sim_duration, start_date) + } + return input_profile diff --git a/gsy_framework/solar_api_clients/api_client_base.py b/gsy_framework/solar_api_clients/api_client_base.py index d6d60a48..6266e8f9 100644 --- a/gsy_framework/solar_api_clients/api_client_base.py +++ b/gsy_framework/solar_api_clients/api_client_base.py @@ -4,7 +4,7 @@ from typing import Dict, Any from pendulum import DateTime -from gsy_framework.read_user_profile import resample_hourly_energy_profile +from gsy_framework.read_user_profile import UserProfileReader @dataclass @@ -17,6 +17,7 @@ class PvApiParameters: azimuth: orientation of the PV (cardinal direction) tilt: tilt of the PV panel WRT to the vertical axis (e.g. inclination of the roof) """ + latitude: float longitude: float capacity_kW: float @@ -35,8 +36,12 @@ def _set_use_historical_data(self, _input_datetime: DateTime) -> None: self._use_historical_data = True def get_solar_energy_profile( - self, request_parameters: PvApiParameters, start_date: DateTime, end_date: DateTime, - slot_length: timedelta) -> Dict[DateTime, float]: + self, + request_parameters: PvApiParameters, + start_date: DateTime, + end_date: DateTime, + slot_length: timedelta, + ) -> Dict[DateTime, float]: """Request energy profile from external solar API. return: Dictionary of raw data including a time series of energy production with @@ -46,11 +51,14 @@ def get_solar_energy_profile( raw_data = self._request_raw_solar_energy_data( request_parameters, self._get_corresponding_historical_time_stamp(start_date), - self._get_corresponding_historical_time_stamp(end_date)) + self._get_corresponding_historical_time_stamp(end_date), + ) energy_profile = self._create_time_series_from_solar_profile( - raw_data, start_date.year, end_date.year) - resampled_profile = resample_hourly_energy_profile( - energy_profile, slot_length, end_date - start_date, start_date) + raw_data, start_date.year, end_date.year + ) + resampled_profile = UserProfileReader().resample_hourly_energy_profile( + energy_profile, slot_length, end_date - start_date, start_date + ) return resampled_profile @staticmethod @@ -64,17 +72,18 @@ def _get_corresponding_historical_time_stamp(input_datetime: DateTime) -> DateTi """ @abc.abstractmethod - def _request_raw_solar_energy_data(self, request_parameters: PvApiParameters, - start_date: DateTime, end_date: DateTime) -> Any: + def _request_raw_solar_energy_data( + self, request_parameters: PvApiParameters, start_date: DateTime, end_date: DateTime + ) -> Any: """ Perform the actual request to the API and return raw data in the API specific format. """ @staticmethod @abc.abstractmethod - def _create_time_series_from_solar_profile(request_data: Dict, - out_start_year: int, - out_end_year: int) -> Dict[DateTime, float]: + def _create_time_series_from_solar_profile( + request_data: Dict, out_start_year: int, out_end_year: int + ) -> Dict[DateTime, float]: """ Convert the API specific data into a time series dict that can be digested by gsy-web. """ diff --git a/gsy_framework/utils.py b/gsy_framework/utils.py index 3c4e7e80..8be3880e 100644 --- a/gsy_framework/utils.py +++ b/gsy_framework/utils.py @@ -109,16 +109,19 @@ def generate_market_slot_list_from_config( ] -def generate_market_slot_list(start_timestamp=None): +def generate_market_slot_list(start_timestamp=None, profile_length_days=None): """ Creates a list with datetimes that correspond to market slots of the simulation. No input arguments, required input is only handled by a preconfigured GlobalConfig @return: List with market slot datetimes """ + if not profile_length_days: + profile_length_days = PROFILE_EXPANSION_DAYS + time_span = ( GlobalConfig.slot_length if GlobalConfig.is_canary_network() - else min(GlobalConfig.sim_duration, duration(days=PROFILE_EXPANSION_DAYS)) + else min(GlobalConfig.sim_duration, duration(days=profile_length_days)) ) time_span += duration(hours=ConstSettings.FutureMarketSettings.FUTURE_MARKET_DURATION_HOURS) market_slot_list = generate_market_slot_list_from_config( @@ -693,3 +696,12 @@ def convert_string_to_boolean(input_object: Union[str, bool]) -> bool: if "false" in input_object.lower(): return False return True + + +def is_number(number): + """Return True if number is float""" + try: + float(number) + return True + except ValueError: + return False diff --git a/tests/static/Solar_Curve_W_cloudy.csv b/tests/static/Solar_Curve_W_cloudy.csv new file mode 100644 index 00000000..f55d7976 --- /dev/null +++ b/tests/static/Solar_Curve_W_cloudy.csv @@ -0,0 +1,97 @@ +INTERVAL;POWER(W) +00:00;0 +00:15;0 +00:30;0 +00:45;0 +01:00;0 +01:15;0 +01:30;0 +01:45;0 +02:00;0 +02:15;0 +02:30;0 +02:45;0 +03:00;0 +03:15;0 +03:30;0 +03:45;0 +04:00;0 +04:15;0 +04:30;0 +04:45;0 +05:00;0 +05:15;12.5 +05:30;12.5 +05:45;12.5 +06:00;12.5 +06:15;12.5 +06:30;12.5 +06:45;12.5 +07:00;12.5 +07:15;37.5 +07:30;37.5 +07:45;37.5 +08:00;37.5 +08:15;37.5 +08:30;37.5 +08:45;37.5 +09:00;37.5 +09:15;50 +09:30;50 +09:45;50 +10:00;50 +10:15;50 +10:30;50 +10:45;50 +11:00;50 +11:15;50 +11:30;50 +11:45;50 +12:00;50 +12:15;62.5 +12:30;62.5 +12:45;62.5 +13:00;62.5 +13:15;50 +13:30;50 +13:45;50 +14:00;50 +14:15;50 +14:30;50 +14:45;50 +15:00;50 +15:15;37.5 +15:30;37.5 +15:45;37.5 +16:00;37.5 +16:15;25 +16:30;25 +16:45;25 +17:00;25 +17:15;12.5 +17:30;12.5 +17:45;12.5 +18:00;12.5 +18:15;12.5 +18:30;12.5 +18:45;12.5 +19:00;12.5 +19:15;0 +19:30;0 +19:45;0 +20:00;0 +20:15;0 +20:30;0 +20:45;0 +21:00;0 +21:15;0 +21:30;0 +21:45;0 +22:00;0 +22:15;0 +22:30;0 +22:45;0 +23:00;0 +23:15;0 +23:30;0 +23:45;0 diff --git a/tests/test_read_user_profile.py b/tests/test_read_user_profile.py index 0d79d95b..827631d0 100644 --- a/tests/test_read_user_profile.py +++ b/tests/test_read_user_profile.py @@ -15,15 +15,19 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ + +import pathlib import pytest from pendulum import datetime, duration, today -from gsy_framework.constants_limits import TIME_ZONE, GlobalConfig +from gsy_framework.constants_limits import ( + GlobalConfig, + PROFILE_EXPANSION_DAYS, + ConstSettings, + TIME_ZONE, +) from gsy_framework.enums import ConfigurationType -from gsy_framework.read_user_profile import ( - InputProfileTypes, _fill_gaps_in_profile, _generate_slot_based_zero_values_dict_from_profile, - _interpolate_profile_values_to_slot, read_arbitrary_profile, read_profile_without_config, - resample_hourly_energy_profile) +from gsy_framework.read_user_profile import InputProfileTypes, UserProfileReader from gsy_framework.unit_test_utils import assert_dicts_identical from gsy_framework.utils import convert_str_to_pendulum_in_dict @@ -37,16 +41,27 @@ def _setup(value): nonlocal original_value original_value = GlobalConfig.CONFIG_TYPE GlobalConfig.CONFIG_TYPE = ( - ConfigurationType.CANARY_NETWORK.value - if value else ConfigurationType.SIMULATION.value) + ConfigurationType.CANARY_NETWORK.value if value else ConfigurationType.SIMULATION.value + ) yield _setup GlobalConfig.CONFIG_TYPE = original_value +# pylint: disable=protected-access class TestReadUserProfile: """Test reading the user profiles.""" + def setup_method(self): + # pylint: disable=attribute-defined-outside-init + self._profile = UserProfileReader() + self.original_config_type = GlobalConfig.CONFIG_TYPE + self.original_sim_duration = GlobalConfig.sim_duration + + def teardown_method(self): + GlobalConfig.CONFIG_TYPE = self.original_config_type + GlobalConfig.sim_duration = self.original_sim_duration + @staticmethod def _validate_timedeltas_are_followed(profile): timestamps_list = list(profile.keys()) @@ -60,24 +75,27 @@ def _validate_timedeltas_are_followed(profile): def test_generate_slot_based_zero_values_dict_from_profile(self): profile_dict = { datetime(2021, 2, 12, 0, 12, 10): 1234.0, - datetime(2021, 2, 14, 23, 55, 10): 1234.0 + datetime(2021, 2, 14, 23, 55, 10): 1234.0, } - zero_val_dict = _generate_slot_based_zero_values_dict_from_profile(profile_dict) + zero_val_dict = self._profile._generate_slot_based_zero_values_dict_from_profile( + profile_dict + ) assert all(v == 0.0 for v in zero_val_dict.values()) self._validate_timedeltas_are_followed(zero_val_dict) def test_fill_gaps_in_profile(self): profile_dict = { datetime(2021, 2, 12, 0, 0, 0): 1234.0, - datetime(2021, 2, 14, 23, 55, 10): 1234.0 + datetime(2021, 2, 14, 23, 55, 10): 1234.0, } - zero_val_dict = _generate_slot_based_zero_values_dict_from_profile(profile_dict) - filled_profile = _fill_gaps_in_profile(profile_dict, zero_val_dict) + zero_val_dict = self._profile._generate_slot_based_zero_values_dict_from_profile( + profile_dict + ) + filled_profile = self._profile._fill_gaps_in_profile(profile_dict, zero_val_dict) assert all(v == 1234.0 for v in filled_profile.values()) self._validate_timedeltas_are_followed(filled_profile) - @staticmethod - def test_interpolate_profile_values_to_slot(): + def test_interpolate_profile_values_to_slot(self): profile_dict = { datetime(2021, 2, 12, 0, 0, 0): 150.0, datetime(2021, 2, 12, 0, 5, 0): 100.0, @@ -85,11 +103,12 @@ def test_interpolate_profile_values_to_slot(): datetime(2021, 2, 12, 0, 15, 0): 500.0, datetime(2021, 2, 12, 0, 20, 0): 700.0, datetime(2021, 2, 12, 0, 25, 0): 600.0, - datetime(2021, 2, 12, 0, 30, 0): 100.0 + datetime(2021, 2, 12, 0, 30, 0): 100.0, } - interp_profile, slot_times = _interpolate_profile_values_to_slot( - profile_dict, duration(minutes=15)) + interp_profile, slot_times = self._profile._interpolate_profile_values_to_slot( + profile_dict, duration(minutes=15) + ) assert len(interp_profile) == 3 assert slot_times[0] == datetime(2021, 2, 12, 0, 0, 0).timestamp() assert slot_times[1] == datetime(2021, 2, 12, 0, 15, 0).timestamp() @@ -98,8 +117,7 @@ def test_interpolate_profile_values_to_slot(): assert interp_profile[1] == 0.5 assert interp_profile[2] == 0.1 - @staticmethod - def test_read_profile_for_player(): + def test_read_profile_for_player(self): profile_dict = { datetime(2021, 2, 12, 0, 0, 0): 150.0, datetime(2021, 2, 12, 0, 5, 0): 100.0, @@ -107,52 +125,59 @@ def test_read_profile_for_player(): datetime(2021, 2, 12, 0, 15, 0): 500.0, datetime(2021, 2, 12, 0, 20, 0): 700.0, datetime(2021, 2, 12, 0, 25, 0): 600.0, - datetime(2021, 2, 12, 0, 30, 0): 100.0 + datetime(2021, 2, 12, 0, 30, 0): 100.0, } - return_dict = read_profile_without_config(profile_dict) + return_dict = self._profile.read_profile_without_config(profile_dict) assert len(return_dict.keys()) == 3 - assert_dicts_identical(return_dict, { - datetime(2021, 2, 12, 0, 0, 0): 0.15, - datetime(2021, 2, 12, 0, 15, 0): 0.5, - datetime(2021, 2, 12, 0, 30, 0): 0.1 - }) + assert_dicts_identical( + return_dict, + { + datetime(2021, 2, 12, 0, 0, 0): 0.15, + datetime(2021, 2, 12, 0, 15, 0): 0.5, + datetime(2021, 2, 12, 0, 30, 0): 0.1, + }, + ) - @staticmethod def test_read_arbitrary_profile_returns_correct_profile_in_canary_network( - set_is_canary_network): + self, set_is_canary_network + ): set_is_canary_network(True) expected_last_time_slot = today(tz=TIME_ZONE) - mmr = read_arbitrary_profile(InputProfileTypes.IDENTITY, 30) + mmr = self._profile.read_arbitrary_profile(InputProfileTypes.IDENTITY, 30) assert list(mmr.keys())[-1] == expected_last_time_slot - @staticmethod - def test_read_arbitrary_profile_returns_correct_profile(set_is_canary_network): + def test_read_arbitrary_profile_returns_correct_profile(self, set_is_canary_network): set_is_canary_network(False) market_maker_rate = 30 GlobalConfig.FUTURE_MARKET_DURATION_HOURS = 0 GlobalConfig.sim_duration = duration(hours=3) - mmr = read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) + mmr = self._profile.read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) assert (list(mmr.keys())[-1] - today(tz=TIME_ZONE)).days == 0 GlobalConfig.sim_duration = duration(hours=36) - mmr = read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) + mmr = self._profile.read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) assert (list(mmr.keys())[-1] - today(tz=TIME_ZONE)).days == 1 GlobalConfig.FUTURE_MARKET_DURATION_HOURS = 24 GlobalConfig.sim_duration = duration(hours=1) - mmr = read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) + mmr = self._profile.read_arbitrary_profile(InputProfileTypes.IDENTITY, market_maker_rate) time_diff = list(mmr.keys())[-1] - today(tz=TIME_ZONE) assert time_diff.minutes == 45 - @staticmethod - def test_resample_energy_profile_performs_correctly_for_lower_resolutions(): - input_profile = {"2021-01-25T00:00": 0.1, "2021-01-25T01:00": 0.1, "2021-01-25T02:00": 0.1, - "2021-01-25T03:00": 0.1, "2021-01-25T04:00": 0.1} - result_profile = resample_hourly_energy_profile( + def test_resample_energy_profile_performs_correctly_for_lower_resolutions(self): + input_profile = { + "2021-01-25T00:00": 0.1, + "2021-01-25T01:00": 0.1, + "2021-01-25T02:00": 0.1, + "2021-01-25T03:00": 0.1, + "2021-01-25T04:00": 0.1, + } + result_profile = self._profile.resample_hourly_energy_profile( convert_str_to_pendulum_in_dict(input_profile), duration(minutes=15), duration(hours=4), - datetime(2021, 1, 25, 0, 0)) + datetime(2021, 1, 25, 0, 0), + ) assert len(result_profile) == 16 first_time_stamp = next(iter(result_profile)) last_time_stamp = next(reversed(result_profile)) @@ -160,40 +185,94 @@ def test_resample_energy_profile_performs_correctly_for_lower_resolutions(): assert last_time_stamp == datetime(2021, 1, 25, 3, 45) assert all(value == 0.025 for value in result_profile.values()) - @staticmethod - def test_resample_energy_profile_performs_correctly_for_higher_resolutions(): - input_profile = {"2021-01-25T00:00": 0.1, "2021-01-25T01:00": 0.1, "2021-01-25T02:00": 0.1, - "2021-01-25T03:00": 0.1, "2021-01-25T04:00": 0.1, "2021-01-25T05:00": 0.1, - "2021-01-25T06:00": 0.1} - result_profile = resample_hourly_energy_profile( + def test_resample_energy_profile_performs_correctly_for_higher_resolutions(self): + input_profile = { + "2021-01-25T00:00": 0.1, + "2021-01-25T01:00": 0.1, + "2021-01-25T02:00": 0.1, + "2021-01-25T03:00": 0.1, + "2021-01-25T04:00": 0.1, + "2021-01-25T05:00": 0.1, + "2021-01-25T06:00": 0.1, + } + result_profile = self._profile.resample_hourly_energy_profile( convert_str_to_pendulum_in_dict(input_profile), duration(hours=2), duration(hours=6), - datetime(2021, 1, 25, 0, 0)) - assert result_profile == {datetime(2021, 1, 25, 0, 0, 0): 0.2, - datetime(2021, 1, 25, 2, 0, 0): 0.2, - datetime(2021, 1, 25, 4, 0, 0): 0.2} + datetime(2021, 1, 25, 0, 0), + ) + assert result_profile == { + datetime(2021, 1, 25, 0, 0, 0): 0.2, + datetime(2021, 1, 25, 2, 0, 0): 0.2, + datetime(2021, 1, 25, 4, 0, 0): 0.2, + } - @staticmethod - def test_resample_energy_profile_performs_correctly_for_equal_resolutions(): - input_profile = {"2021-01-25T00:00": 0.1, "2021-01-25T01:00": 0.1, "2021-01-25T02:00": 0.1, - "2021-01-25T03:00": 0.1, "2021-01-25T04:00": 0.1} + def test_resample_energy_profile_performs_correctly_for_equal_resolutions(self): + input_profile = { + "2021-01-25T00:00": 0.1, + "2021-01-25T01:00": 0.1, + "2021-01-25T02:00": 0.1, + "2021-01-25T03:00": 0.1, + "2021-01-25T04:00": 0.1, + } input_profile = convert_str_to_pendulum_in_dict(input_profile) - result_profile = resample_hourly_energy_profile(input_profile, - duration(minutes=60), - duration(hours=4), - datetime(2021, 1, 25, 0, 0)) + result_profile = self._profile.resample_hourly_energy_profile( + input_profile, duration(minutes=60), duration(hours=4), datetime(2021, 1, 25, 0, 0) + ) assert result_profile == input_profile - @staticmethod - def test_read_arbitrary_profile_returns_early_for_empty_profiles(): + def test_read_arbitrary_profile_returns_early_for_empty_profiles(self): original_slot_length = GlobalConfig.slot_length original_sim_duration = GlobalConfig.sim_duration GlobalConfig.slot_length = duration(hours=1) GlobalConfig.sim_duration = duration(hours=4) - assert read_arbitrary_profile(InputProfileTypes.POWER_W, {}) == {} - assert read_arbitrary_profile(InputProfileTypes.POWER_W, None) == {} - assert len(read_arbitrary_profile(InputProfileTypes.POWER_W, 0)) == 4 - assert set(read_arbitrary_profile(InputProfileTypes.POWER_W, 0).values()) == {0} + assert self._profile.read_arbitrary_profile(InputProfileTypes.POWER_W, {}) == {} + assert self._profile.read_arbitrary_profile(InputProfileTypes.POWER_W, None) == {} + assert len(self._profile.read_arbitrary_profile(InputProfileTypes.POWER_W, 0)) == 4 + assert set( + self._profile.read_arbitrary_profile(InputProfileTypes.POWER_W, 0).values() + ) == {0} GlobalConfig.slot_length = original_slot_length GlobalConfig.sim_duration = original_sim_duration + + @staticmethod + def test_copy_profile_to_multiple_days_correctly_expands_for_non_CNs(): + # GlobalConfig.CONFIG_TYPE = ConfigurationType.CANARY_NETWORK.value + GlobalConfig.sim_duration = duration(days=10) + profile_path = pathlib.Path("tests/static/Solar_Curve_W_cloudy.csv") + in_profile = UserProfileReader()._read_from_different_sources_todict(profile_path) + out_profile = UserProfileReader()._copy_profile_to_multiple_days(in_profile) + daytime_dict = dict( + (UserProfileReader()._hour_time_str(time.hour, time.minute), time) + for time in in_profile.keys() + ) + assert ( + len(in_profile) * PROFILE_EXPANSION_DAYS + + ConstSettings.FutureMarketSettings.FUTURE_MARKET_DURATION_HOURS * 4 + ) == len(out_profile) + for time, out_value in out_profile.items(): + assert ( + out_value + == in_profile[ + daytime_dict[UserProfileReader()._hour_time_str(time.hour, time.minute)] + ] + ) + + @staticmethod + def test_copy_profile_to_multiple_days_correctly_expands_for_CNs(): + GlobalConfig.CONFIG_TYPE = ConfigurationType.CANARY_NETWORK.value + profile_path = pathlib.Path("tests/static/Solar_Curve_W_cloudy.csv") + in_profile = UserProfileReader()._read_from_different_sources_todict(profile_path) + out_profile = UserProfileReader()._copy_profile_to_multiple_days(in_profile) + daytime_dict = dict( + (UserProfileReader()._hour_time_str(time.hour, time.minute), time) + for time in in_profile.keys() + ) + assert len(out_profile) == 1 + for time, out_value in out_profile.items(): + assert ( + out_value + == in_profile[ + daytime_dict[UserProfileReader()._hour_time_str(time.hour, time.minute)] + ] + )