diff --git a/python/socha/api/networking/utils.py b/python/socha/api/networking/utils.py index 84d3323..6e462d1 100644 --- a/python/socha/api/networking/utils.py +++ b/python/socha/api/networking/utils.py @@ -1,122 +1,115 @@ -from ast import List import logging +from functools import partial +from collections import defaultdict +from typing import List, Callable, Union from socha import _socha -from socha._socha import Field, FieldType, Move, TeamEnum, CubeCoordinates, GameState +from socha._socha import Field, FieldType, Move, TeamEnum, CubeCoordinates, GameState, CubeDirection from socha.api.protocol.protocol import Acceleration, Actions, Advance, Push, Turn, Board, Data, Water, Sandbank, Island, Passenger, Goal from python.socha.api.protocol.protocol import Room - -def _convert_field(field) -> _socha.Field: - if isinstance(field, Water): - return Field(FieldType.Water, None) - elif isinstance(field, Sandbank): - return Field(FieldType.Sandbank, None) - elif isinstance(field, Island): - return Field(FieldType.Island, None) - elif isinstance(field, Passenger): - return Field(FieldType.Passenger, _socha.Passenger(direction_from_string(field.direction), field.passenger)) - elif isinstance(field, Goal): - return Field(FieldType.Goal, None) - - -def _convert_field_array(field_array) -> List[_socha.Field]: - return [_convert_field(field) for field in field_array.field] - - -def _convert_segment(segment) -> _socha.Segment: - con_fields = [_convert_field_array(field_array) - for field_array in segment.field_array] +# Global Dictionaries for Mapping +field_type_mapping = { + Water: FieldType.Water, + Sandbank: FieldType.Sandbank, + Island: FieldType.Island, + Passenger: FieldType.Passenger, + Goal: FieldType.Goal +} + +cube_direction_mapping = { + "RIGHT": CubeDirection.Right, + "DOWN_RIGHT": CubeDirection.DownRight, + "DOWN_LEFT": CubeDirection.DownLeft, + "LEFT": CubeDirection.Left, + "UP_LEFT": CubeDirection.UpLeft, + "UP_RIGHT": CubeDirection.UpRight +} + +reverse_cube_direction_mapping = {v: k for k, v in cube_direction_mapping.items()} + +def handle_conversion_error(err_type, value): + logging.error(f"Conversion error for {err_type}: {value}") + raise ValueError(f"Invalid {err_type}: {value}") + +# Conversion Functions +def convert_field(field) -> _socha.Field: + field_type = type(field) + if field_type in field_type_mapping: + if field_type is Passenger: + return Field(field_type_mapping[field_type], _socha.Passenger(direction_from_string(field.direction), field.passenger)) + return Field(field_type_mapping[field_type], None) + handle_conversion_error("field", field) + +def convert_field_array(field_array) -> List[_socha.Field]: + return [convert_field(field) for field in field_array.field] + +def convert_segment(segment) -> _socha.Segment: + con_fields = [convert_field_array(field_array) for field_array in segment.field_array] con_center = _socha.CubeCoordinates(q=segment.center.q, r=segment.center.r) return _socha.Segment(direction=direction_from_string(segment.direction), center=con_center, fields=con_fields) - -def _convert_board(protocol_board: Board) -> _socha.Board: - con_segments = [_convert_segment(segment) - for segment in protocol_board.segment] - return _socha.Board( - segments=con_segments, - next_direction=direction_from_string(protocol_board.next_direction) - ) - +def convert_board(protocol_board: Board) -> _socha.Board: + con_segments = [convert_segment(segment) for segment in protocol_board.segment] + return _socha.Board(segments=con_segments, next_direction=direction_from_string(protocol_board.next_direction)) def direction_from_string(cube_direction: str) -> _socha.CubeDirection: - if cube_direction == "RIGHT": - return _socha.CubeDirection.Right - elif cube_direction == "DOWN_RIGHT": - return _socha.CubeDirection.DownRight - elif cube_direction == "DOWN_LEFT": - return _socha.CubeDirection.DownLeft - elif cube_direction == "LEFT": - return _socha.CubeDirection.Left - elif cube_direction == "UP_LEFT": - return _socha.CubeDirection.UpLeft - elif cube_direction == "UP_RIGHT": - return _socha.CubeDirection.UpRight - raise ValueError("Invalid cube direction") - + return cube_direction_mapping.get(cube_direction) or handle_conversion_error("cube direction", cube_direction) def direction_to_string(cube_direction: _socha.CubeDirection) -> str: - if cube_direction == _socha.CubeDirection.Right: - return "RIGHT" - elif cube_direction == _socha.CubeDirection.DownRight: - return "DOWN_RIGHT" - elif cube_direction == _socha.CubeDirection.DownLeft: - return "DOWN_LEFT" - elif cube_direction == _socha.CubeDirection.Left: - return "LEFT" - elif cube_direction == _socha.CubeDirection.UpLeft: - return "UP_LEFT" - elif cube_direction == _socha.CubeDirection.UpRight: - return "UP_RIGHT" - raise ValueError("Invalid cube direction") - + return reverse_cube_direction_mapping.get(cube_direction) or handle_conversion_error("cube direction", cube_direction) + +# Action Handling +def handle_action(action, action_type: Callable, field: str): + return action_type(**{field: getattr(action, field)}) + +action_handlers = { + Acceleration: partial(handle_action, action_type=_socha.Accelerate, field='acc'), + Advance: partial(handle_action, action_type=_socha.Advance, field='distance'), + Push: partial(handle_action, action_type=_socha.Push, field='direction'), + Turn: partial(handle_action, action_type=_socha.Turn, field='direction') +} + +def protocol_to_socha_action(action): + action_handler = action_handlers.get(type(action)) + if action_handler: + return action_handler(action) + logging.error(f"Unknown action type: {type(action)}") + raise ValueError(f"Unsupported action type: {type(action)}") def handle_move(move_response): actions = move_response.actions - protocol_actions = [Acceleration(acc=a.acc) if isinstance(a, _socha.Accelerate) - else Advance(distance=a.distance) if isinstance(a, _socha.Advance) - else Push(direction=direction_to_string(a.direction)) if isinstance(a, _socha.Push) - else Turn(direction=direction_to_string(a.direction)) for a in actions] + protocol_actions = [protocol_to_socha_action(a) for a in actions] return Data(class_value="move", actions=Actions(actions=protocol_actions)) - def _merge_advances(actions): - new_actions = [] - for i in range(len(actions) - 1): - if isinstance(actions[i], _socha.Advance) and isinstance(actions[i + 1], _socha.Advance): - actions[i + 1].distance += actions[i].distance - actions[i] = None - new_actions = [a for a in actions if a is not None] - return new_actions - + merged_actions = [] + for action in actions: + if merged_actions and isinstance(action, _socha.Advance) and isinstance(merged_actions[-1], _socha.Advance): + merged_actions[-1].distance += action.distance + else: + merged_actions.append(action) + return merged_actions def if_last_game_state(message: Room, last_game_state): try: - last_game_state.board = _convert_board( - message.data.class_binding.board) + last_game_state.board = convert_board(message.data.class_binding.board) actions = message.data.class_binding.last_move.actions.actions - new_actions = _merge_advances([_socha.Accelerate(acc=a.acc) if isinstance(a, Acceleration) - else _socha.Advance(distance=a.distance) if isinstance(a, Advance) - else _socha.Push(direction=direction_from_string(a.direction)) if isinstance(a, Push) - else _socha.Turn(direction=direction_from_string(a.direction)) for a in actions]) - + new_actions = _merge_advances([protocol_to_socha_action(a) for a in actions]) last_move = Move(actions=new_actions) return last_game_state.perform_move(last_move) except Exception as e: - logging.warning( - f"An error occurred: {e}.") + logging.warning(f"An error occurred: {e}.") return if_not_last_game_state(message) - def if_not_last_game_state(message: Room) -> GameState: ships = [ _socha.Ship(position=CubeCoordinates(q=ship.position.q, r=ship.position.r), team=TeamEnum.One if ship.team == "ONE" else TeamEnum.Two) for ship in message.data.class_binding.ship ] - return GameState(board=_convert_board(message.data.class_binding.board), + return GameState(board=convert_board(message.data.class_binding.board), turn=message.data.class_binding.turn, current_ship=ships[0], other_ship=ships[1],