Метка: recvfrom

Дурак по сети на Python: часть 3 – игра

В части 2 я привел способ обнаружения соперника в локальной сети. Как только двое игроков нашли друг друга, они могут начать игру, обмениваясь UDP пакетами напрямую, т.е. посылая их непосредственно по IP адресу и нужному порту.

Да, номера портов вы выбираете сами. Старайтесь выбирать порты так, чтобы они не перекрывались с существующими сервисами (список популярных портов). Порты 0-1023 системные, их вы не займете просто так без особых прав. Берите любые порты из диапазона 1024-49151, лучше ближе к концу, так меньше вероятности конфликтов.

Если для протокола обнаружения нам нужен был только один порт 37020, то для обмена сообщениями между клиентами я возьму 2 порта: 37020 и 37021. Почему два? Потому что мы с вами отлаживаем код обычно на одной машине. А двум программам слушать один и тот же порт на одной машине не получится, даже переиспользуя порт. Сообщения просто будут приходить только на один из клиентов во всех случаях, а второй не получит ничего. Следовательно делатем так: один клиент слушает 37020 и отправляет сообщения на 37021, а другой – наоборот слушает 37021 и отправляет сообщения на 37020. Такая схема будет работать и на одной машине, и на разных машинах.

Примечание. Эту проблему, вроде бы, можно решить путем мультикаст-групп, но это выходит за рамки статьи: нам гораздо проще просто взять два разных порта.

Потоки данных между игроками

Итак, костяк нашей игры будет лежать в модуле main.py:

from render import ConsoleRenderer
from net_game import DurakNetGame
from discovery_protocol import DiscoveryProtocol
from util import rand_id

PORT_NO = 37020
PORT_NO_AUX = 37021

def main():
    my_pid = rand_id()  # создадим себе ID случайно

    discovery = DiscoveryProtocol(my_pid, port_no=PORT_NO)
    print('Сканирую локальную сеть...')
    (remote_addr, _port), remote_pid = discovery.run()
    del discovery

    renderer = ConsoleRenderer()
    game = DurakNetGame(renderer, my_pid, remote_pid, remote_addr, [PORT_NO, PORT_NO_AUX])
    game.start()


if __name__ == '__main__':
    main()

Класс DiscoveryProtocol был описан в части 2, а класс ConsoleRenderer аж в первой части. Мы же сейчас обратимся к классу DurakNetGame, код которого я поместил в файл net_game.py.

Класс требует ID локального игрока, а также только что найденные ID удаленного игрока и его IP адрес в сети. Еще ему нужна пара номеров портов для коммуникации.

Так как у каждого есть свой ID и это просто числа, то мы выбираем того, кто ходит первый просто сравнивая эти числа: у кого меньше, тот и первый, а другой про себя решит точно также, что он второй. Аналогично идет выбор портов. Если я первый (мой ID меньше твоего), то слушаю, допустим, порт 37020. Другой клиент также совершенно однозначно решит, слушать порт 37021, сравнив свой ID с моим и поняв, что он больше. Вот такой элементарный алгоритм консенсуса. Так инициализируется класс сетевой игры:

class DurakNetGame:
    def __init__(self, renderer: GameRenderer, my_id, remote_id, remote_addr, ports):
        self._renderer = renderer

        self._game = DurakSerialized()  # геймплей

        self._my_id = int(my_id)
        self._remote_id = int(remote_id)
        self._remote_addr = remote_addr

        # проверка на адекватность ID
        assert self._my_id != 0 and self._remote_id != 0 and self._my_id != self._remote_id

        # кто ходит первый выбираем просто сравнивая ID (они же случайные)!
        me_first = self._my_id < self._remote_id
        # мой индекс 0 если я первый, и 1 иначе. у соперника наоборот
        self._my_index = 0 if me_first else 1

        # две сетевых примочки на разны портах
        network1 = Networking(port_no=ports[0])
        network2 = Networking(port_no=ports[1])

        # кто слушает какой порт выбираем также на базе сравнения ID как чисел
        self._receiver = network1 if me_first else network2
        self._receiver.bind("")

        self._sender = network2 if me_first else network1

Запуск игры (метод start) инициализирует новую игру (если я первый ходящий), и отсылает ее другому игроку, чтобы синхронизировать их состояния.

    def _new_game(self):
        self._game = DurakSerialized()        # игрок с индексом 0 создает игру!
        self._send_game_state()        # и отсылает ее сопернику
        self._renderer.render_game(self._game, self._my_index)

Сериализация состояния игры происходит в классе DurakSerialized – наследнике класса Durak. Она очень простая, почти без ухищрений: просто собираем в словарь разные аспекты игры. Код сериализации находится в файле serialization.py. Пример состояния игры после сериализации:

{'trump': '♦', 'attacker_index': 0, 'deck': [('J', '♠'), ('10', '♦'), ('8', '♣'), ('10', '♠'), ('A', '♦'), ('6', '♣'), ('7', '♣'), ('6', '♠'), ('Q', '♥'), ('J', '♣'), ('K', '♣'), ('6', '♦'), ('9', '♣'), ('6', '♥'), ('7', '♦'), ('9', '♠'), ('A', '♣'), ('7', '♥'), ('10', '♣'), ('A', '♠'), ('J', '♦'), ('8', '♥'), ('A', '♥'), ('7', '♠')], 'winner': None, 'field': [(('8', '♠'), None), (('8', '♦'), ('Q', '♦'))], 'players': [{'index': 0, 'cards': [('9', '♥'), ('J', '♥'), ('Q', '♠'), ('Q', '♣')]}, {'index': 1, 'cards': [('9', '♦'), ('10', '♥'), ('K', '♠'), ('K', '♥'), ('K', '♦')]}]}

Возвращаясь к запуску сетевой игры, он подразумевает запуск двух вещей: потока чтения и цикла-обработчика ввода пользователя.

    def start(self):
        print(f'Мой ID #{self._my_id}, мой индекс {self._my_index}')
        print(f'Удаленный адрес {self._remote_addr}')
        self._renderer.help()

        if self._my_index == 0:
            # игрок с индексом 0 создает игру!
            self._new_game()

        self._receiver.run_reader_thread(self._on_remote_message)
        self._game_loop()

Первая часть – отдельный поток, который слушает порт на прием. На этот порт приходят сообщения от клиента соперника, содержащие закодированное состояние игры, если оно изменится в результате действий и ходов соперника. Ведь игра в дурака – синхронная: пока я отбиваюсь, ты можешь подкидывать мне еще карт того же достоинства, что уже лежат на столе. В networking.py определен метод запуска потока чтения, который вызывает стороннюю функцию-обработчик callback.

 def run_reader_thread(self, callback):
        """
        Запускает отдельный поток, чтобы получать данные из сокета
        :param callback: функция, которая вызывается, если получены данные
        """
        def reader_job():
            while True:
                data, _ = self.recv_json()
                if data:
                    callback(data)
        # daemon=True, чтобы не зависал, если выйдет основной поток
        threading.Thread(target=reader_job, daemon=True).start()

Обработка сообщений от соперника идет в DurakNetGame и включают в себя обновление состояния игры локально и его отрисовку в терминале. Соперник еще может прислать сообщение, что он покидает игру, тогда мы тоже выйдем из программы.

    def _on_remote_message(self, data):
        action = data['action']
        if action == 'state':
            self._game = DurakSerialized(data['state'])  # обновить остояние
            print('Пришел ход от соперника!')
            self._renderer.render_game(self._game, self._my_index)
        elif action == 'quit':
            print('Соперник вышел!')
            exit(0)

Вторая ключевая вещь игры – это бесконечный цикл, который читает пользовательский ввод с клавиатуры и обрабатывает его. Если я походил, отбился или еще что-то сделал, тогда я обновляю состояние игры у себя и отсылаю его копию другому игроку, чтобы тот отобразил изменения у себя на экране.

    def _game_loop(self):
        while True:
            q = input('Ваш ход (q = выход)? ')

            parts = q.lower().split(' ')
            if not parts:
                continue

            command = parts[0]

            good_move = False  # флаг, удачный ли был ход после ввода команды
            g = self._game
            try:
                if command == 'f':
                    good_move = self._handle_finish()
                elif command == 'a' and g.attacker_index == self._my_index:
                    good_move = self._handle_attack(parts)
                elif command == 'd' and g.attacker_index != self._my_index:
                    good_move = self._handle_defence(parts)
                elif command == 'q':
                    print('Вы вышли из игры!')
                    self._send_quit()
                    break
                else:
                    print('Неизвестная команда.')
            except IndexError:
                print('ОШИБКА! Неверный выбор карты')
            except ValueError:
                print('Введите число через пробел после команды')

            if good_move:
                # если ход удачный, пошлем копию состояния игры 
                self._send_game_state()
                self._renderer.render_game(g, self._my_index)

                # проверка на окончание игры (если есть победитель, в g.winner - этог индекс)
                if g.winner is not None:
                    outcome = 'Вы победили!' if g.winner == self._my_index else 'Вы проиграли!'
                    print(f'Игра окончена! {outcome}')
                    break

Как и в случае локальной игры из первой части, мы читает строку с клавиатуры, где первый символ команда, а второй через пробел – опциональный аргумент. Команды будут такие:

  • f – завершить ход (сказав «Бито!» или забрав карты, зависит от того, кто вызвал команду)
  • a 2 – атакующий ход (аргумент – номер карты, с которой ходить или подбросить)
  • d 1 – защитный ход (аргумент – номер карты, которой отбиться). Если выбор карты для покрытия неоднозначен, то спросим пользователя о том, какую именно карту следует отбить.
  • q – выход из игры, уведомив соперника, чтобы тот тоже завершил цикл и вышел из программы.
    def _game_loop(self):
        while True:
            q = input('Ваш ход (q = выход)? ')

            parts = q.lower().split(' ')
            if not parts:
                continue

            command = parts[0]

            good_move = False  # флаг, удачный ли был ход после ввода команды
            g = self._game
            try:
                if command == 'f':
                    good_move = self._handle_finish()
                elif command == 'a' and g.attacker_index == self._my_index:
                    good_move = self._handle_attack(parts)
                elif command == 'd' and g.attacker_index != self._my_index:
                    good_move = self._handle_defence(parts)
                elif command == 'q':
                    print('Вы вышли из игры!')
                    self._send_quit()
                    break
                else:
                    print('Неизвестная команда.')
            except IndexError:
                print('ОШИБКА! Неверный выбор карты')
            except ValueError:
                print('Введите число через пробел после команды')

            if good_move:
                # если ход удачный, пошлем копию состояния игры
                self._send_game_state()
                self._renderer.render_game(g, self._my_index)

                # проверка на окончание игры (если есть победитель, в g.winner - этог индекс)
                if g.winner is not None:
                    outcome = 'Вы победили!' if g.winner == self._my_index else 'Вы проиграли!'
                    print(f'Игра окончена! {outcome}')
                    break

Если в результате ввода состоялся удачный ход (по правилам игры, не было исключений), то состояние игры локально обновиться. А мы будем обязаны уведомить об этом другую сторону. Код отправки сообщений:

    def _send_game_state(self):
        self._sender.send_json({
            'action': 'state',
            'state': self._game.serialized()
        }, self._remote_addr)

    def _send_quit(self):
        self._sender.send_json({
            'action': 'quit'
        }, self._remote_addr)

Если в игре будет обнаружен победитель, то оба игрока выходят из цикла. Переходим к обработчикам каждой команды.

Атака. Атаковать, как понятно, можно только в свой ход. Возвращает True, если атака успешна, иными словами, если игрок сходил с одной из правильных карт (нельзя подкинуть десятку, если на столе только, например, семерки, валеты и дамы).

    def _handle_attack(self, parts):
        g = self._game
        index = int(parts[1]) - 1
        card = g.current_player.cards[index]
        if not g.attack(card):
            print('ОШИБКА! Вы не можете ходить этой картой!')
            return False
        else:
            return True

Защита возможна только в ход соперника. Защищающийся выбирает карту, игра ищет варианты, которыми он может отбиться именно этой картой. Если вариантов несколько, будет спрошено, что он конкретно имеет в виду. Возвращает True, если успешно отбился одной картой, но это еще не значит, что ход завершен. Вот код метода:

    def _handle_defence(self, parts):
        g = self._game
        index = int(parts[1]) - 1
        new_card = g.opponent_player.cards[index]
        if g.field:
            variants = g.defend_variants(new_card)

            print(f'variants {variants} - {new_card}')
            if len(variants) == 1:
                def_index = variants[0]
            elif len(variants) >= 2:
                max_pos = len(g.field)
                def_index = int(input(f'Какую позицию отбить {new_card} (1-{max_pos})? ')) - 1
            else:
                print('Вам придется взять карты!')
                return False

            old_card = list(g.field.keys())[def_index]
            if not g.defend(old_card, new_card):
                print('ОШИБКА! Нельзя так отбиться!')
            else:
                return True
        else:
            print('Пока нечего отбивать!')
            return False

Завершение хода. Если защищающийся игрок не отбил все карты, то завершение хода вынудит его забрать со стола все карты себе в руку. Ход не переходит. Если же он успешно отбился, то завершение хода – это «Бито!», ход перейдет, а все карты со стола уйдут из игры в стопку «бито».

    def _handle_finish(self, my_turn):
        g = self._game
        if g.field:
            if my_turn and g.any_unbeaten_cards:
                print('Не можете вынудить соперника взять карты!')
                return False
            elif not my_turn and not g.any_unbeaten_cards:
                print('Только атакующий может сказать "Бито!"')
                return False
            else:
                r = g.finish_turn()
                if r == g.GAME_OVER:
                    r_str = 'игра окончена!'
                elif r == g.TOOK_CARDS:
                    r_str = 'взяли карты.'
                else:
                    r_str = 'бито.'
                print(f'Ход завершен: {r_str}')
                return True
        else:
            print('Пока ход не сделал, чтобы его завершить!')
            return False

Вот и весь код игры! Теперь можно запустить пару клиентов игры на одном или разных компьютерах и играть в дурака!

Напомню, что весь код в репозитории на Github. Ставьте звездочки, мне будет очень приятно.

Надеюсь, вам понравилось. Может быть прикрутим к игре графический интерфейс и нормальное управление?

Предыдущие части:

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈 

Дурак по сети на Python: часть 2 – обнаружение

Надеюсь, вы уже ознакомились с частью 1.

Мем про TCP и UDP

В этой части мы начнем реализовывать сетевые взаимодействия. Обычно в статьях по сетевому программированию нам предлагают использовать клиент-серверную модель по протоколу TCP. Тут кроется пара неудобств. Во-первых, разделение код на клиентский и серверный. Во-вторых, необходимость клиентам узнавать адрес сервера. В масштабах локальной WiFi сети для небольших игрушек – лишняя трата времени и неудобства. Почему бы нам не позволить клиентам самим находить друг друга? Это не так сложно.

Начнем с того, что у каждой машины в сети есть свой IP адрес из четырех чисел 0-255. В локальной сети обычно (но не всегда) адреса имеют вид 192.168.1.X, где X – разный для разных устройств в сети.

Подключенные устройства в вашем роутере.
Подключенные устройства в вашем роутере.

Один из вариантов, который я нашел в сети, предлагал нам просканировать диапазон адресов 192.168.1.1 — 192.168.1.254 и попытаться подключиться к каждому из них. Это вариант меня не устроил, потому что такой брут-форс выполняется долго, да и вообще метод топорный. В моем методе не придется узнавать даже свой IP.

Будем работать по протоколу UPD, обмениваясь датаграммами (короткими сообщениями). Это простой протокол. UDP отличается от TCP тем, что не требует устанавливать соединение, однако в UDP нет гарантий доставки сообщений (получатель не отправляет отправителю подтверждение получения данных), как следствие не гарантирован порядок получения сообщений.

Метафора UDP против TCP

Отправитель просто отправляет данные в сеть либо конкретной машине или на всю подсеть (broadcast), и будь, что будет. Кто-то может принять эти данные, либо они вообще могут потеряться. Чтобы различать разные прикладные приложения, используют номер порта (число до 65535). Потенциальный получатель просто начинает слушать свой порт, вдруг кто-то на него отправит данные.

Казалось бы, протокол UDP ненадежен, однако, UPD работает быстрее, чем TCP, так как не тратится время на подтверждения при обмене. UPD подходит неплохо для игр, стримминга, телефонии и тому подобного. А еще он отлично подойдет для наших целей обнаружения.

Я знаю отличную шутку про UDP, но боюсь, она до вас не дойдет!

С просторов Интернета…

Как только клиент игры запустится, он начнет переодически отправлять широковещательные UDP пакеты в сеть (с пометкой discovery), авось кто услышит. Но и сам начинает сразу после отправки слушать, не пришел ли ему ответ (5 секунд). Затем снова оправляет запрос.

В тоже время какой-то другой клиент сети, который уже ищет соперника, получает от него выше-указанное сообщение discovery и отвечает просьбой прекратить сканирование (stop_scan), после чего останавливает сканирование сети. Клиент получивший stop_scan проверяет, его ли идентификатор в нем указан. Если да, то он также останавливает сканирование.

Оба клиента теперь знают адреса друг друга и готовы начать обмениваться пакетами напрямую между собой уже в рамках игровой сессии. Задача обнаружения выполнена.

Мой протокол

Класс сети

Начнем писать код с класса сети Networking (по ссылке полный код класса). Он абстрагирует создание и настройку UDP сокета, обмен данными через него (кодирование и декодирование данных в JSON).

Импортирует стандартный модуль socket. Создание сокета:

import socket
...
class Networking:
    ...
    @classmethod
    def get_socket(cls, broadcast=False, timeout=TIME_OUT):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        # чтобы на одной машине можно было слушать тотже порт
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if broadcast:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(timeout)
        return sock

Отправку данных совершить очень просто. Кодируем данные в JSON, потом в байты, потом посылаем через сокет на указанный адрес и порт:

    def send_json(self, j, to):
        data = bytes(json.dumps(j), 'utf-8')
        return self._socket.sendto(data, (to, self.port_no))

Широковещательная отправка отличается только тем, что получатель будет to="<broadcast>":

  def send_json_broadcast(self, j):
        return self.send_json(j, "<broadcast>")

С прием все несколько хитрее. Сначала надо сделать bind (привязаться к порту для получения на него сообщений). Теперь сетевой стэк будет знать, что именно наша программа должна получать данные, пришедшие на компьютер именно на этот порт.

    def bind(self, to=""):
        """
        Привязаться к порту, то есть начать слушать с него сообщения
        После bind можно вызывать recv_json
        :param to: интерфейс ("" - любой)
        """
        self._socket.bind((to, self.port_no))

Затем мы вызываем на сокете recvfrom. Если есть данные, то декодируем JSON, а если не дождались (у меня стоит тайм-аут 1 секунду), то возвращаем None.

  def recv_json(self):
        try:
            # получить датаграмму и адрес из сокета
            data, addr = self._socket.recvfrom(self.BUFFER_SIZE)
            # декодируем в юникод и загружаем из JSON
            return json.loads(data.decode('utf-8', errors='ignore'), encoding='utf-8'), addr
        except json.JSONDecodeError:
            logging.error(f'JSONDecodeError!')
        except socket.timeout:
            pass  # ничего не пришло
        return None, None

Учтите! На наш сокет могут приходить также сообщения от чужих программ или от наших же клиентов игры, но в другом состоянии. А еще broadcast пакеты приходят также и обратно себе на клиент. Их надо фильтровать. Поэтому добавим метод, который несколько раз в течение определенного времени (5 секунд, допустим) будет получать из сокета данные и передавать их на проверку внешней функции predicate, которая вернет False, если это чужие данные и True, если данные подходят для текущего состояния игры. Сам метод recv_json_until вернет данные и адрес, с которого они пришли.

    def recv_json_until(self, predicate, timeout):
        t0 = time.monotonic()
        while time.monotonic() < t0 + timeout:
            data, addr = self.recv_json()
            if predicate(data):
                return data, addr
        return None, None

Discovery Protocol

Мы готовы реализовать протокол по обнаружению других клиентов, ждущих начала игры.

import random
import network
import logging

class DiscoveryProtocol:
    A_DISCOVERY = 'discovery'
    A_STOP_SCAN = 'stop_scan'

    def __init__(self, pid, port_no):
        assert pid
        self._my_pid = pid
        self._network = network.Networking(port_no, broadcast=True)
        self._network.bind()

Здесь pid (player ID) – уникальный идентификатор, чтобы отличаться от других игроков. Он создается случайно при запуске игры pid = random.getrandbits(64). Я не стал использовать IP адрес, потому что на одной машине может быть несколько запущенных клиентов (например, во время отладки). Думаю, большинство читателей первый раз будут пробовать запускать два клиента на одной машине, а не на разных.

Также мы создаем в конструкторе класс Networking, настраиваем его на широковещательную отправку и говорим ему также слушать порт.

Формат отправки сообщений будет в виде словаря с ключом action (тип действия). Например:

{
    "action": "discovery",
    "sender": 1234
}

Метод для посылки таких сообщений:

    def _send_action(self, action, data=None):
        data = data or {}
        self._network.send_json_broadcast({'action': action, 'sender': self._my_pid, **data})

Сам процесс сканирования: в бесконечном цикле рассылаем сообщение discovery, и сразу переходим в режим приема. 5 секунд ждем подходящее сообщение от других клиентов. Если оно пришло, то обрабатываем событие и прекращаем сканирование, выходя из цикла. При этом на сообщение discovery мы обязаны ответить stop_scan, чтобы удаленные клиент понял, что он нас нашел и тоже вышел из процесса сканирования.

    def run(self):
        while True:
            logging.info('Scanning...')
            # рассылаем всем сообщение A_DISCOVERY
            self._send_action(self.A_DISCOVERY)

            # ждем приемлемого ответа не более 5 секунд, игнорируя таймауты и неревалентные пакеты
            data, addr = self._network.recv_json_until(self._is_message_for_me, timeout=5.0)

            # если пришло что-то наше
            if data:
                action, sender = data['action'], data['sender']
                # кто-то нам отправил A_DISCOVERY
                if action == self.A_DISCOVERY:
                    # отсылаем ему сообщение остановить сканирование A_STOP_SCAN, указав его PID
                    self._send_action(self.A_STOP_SCAN, {'to_pid': sender})
                elif action == self.A_STOP_SCAN:
                    # если получили сообщение остановить сканирование, нужно выяснить нам ли оно предназначено
                    if data['to_pid'] != self._my_pid:
                        continue  # это не нам; игнорировать!
                return addr, sender

Как понять, что сообщение нужное? В словаре должен быть ключ "action", который принимает значения «discovery» или «stop_scan«, а еще требуем, чтобы pid отправителя был не наш (фильтруем свои же сообщения). Остальные сообщения игнорируются.

 def _is_message_for_me(self, d):
        return d and d.get('action') in [self.A_DISCOVERY, self.A_STOP_SCAN] and d.get('sender') != self._my_pid

Код для тестирования алгоритма обнаружения:

if __name__ == '__main__':
    print('Testing the discovery protocol.')
    pid = random.getrandbits(64)
    print('pid =', pid)
    info = DiscoveryProtocol(pid, 37020).run()
    print("success: ", info)

Полный код класса здесь discovery_protocol.py.

Запустите один клиент. Он будет висеть в состоянии сканирования сети. А теперь запустите второй клиент. Они сразу найдут друг друга:

Testing the discovery protocol.
pid = 8100514396826939414
success:  (('192.168.1.99', 37020), 5614644081426404292)

Примечание. Этот метод обнаружения будет работать, вероятно, только в пределах вашей локальной сети (одного роутера), потому что любой адекватный роутер на стороне провайдера будет резать широковещательные пакеты. Представляете, какой бы спам начался, если бы была возможность рассылать пакеты сразу всем устройствам, подключенным к Интернет в мире?

На этом все! В следующей части мы реализуем сам сетевой геймплей между клиентами, которые нашли друг друга по этому протоколу.

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈