Дурак по сети на Python: часть 2 – обнаружение

Надеюсь, вы уже ознакомились с частью 1.

Мем про TCP и UDP

В этой части мы начнем реализовывать сетевые взаимодействия. Обычно в статьях по сетевому программированию нам предлагают использовать клиент-серверную модель по протоколу TCP. Тут кроется пара неудобств. Во-первых, разделение код на клиентский и серверный. Во-вторых, необходимость клиентам узнавать адрес сервера. В масштабах локальной WiFi сети для небольших игрушек – лишняя трата времени и неудобства. Почему бы нам не позволить клиентам самим находить друг друга? Это не так сложно.

Начнем с того, что у каждой машины в сети есть свой IP адрес из четырех чисел 0-255. В локальной сети обычно (но не всегда) адреса имеют вид 192.168.1.X, где X – разный для разных устройств в сети.

Подключенные устройства в вашем роутере.
Подключенные устройства в вашем роутере.

Один из вариантов, который я нашел в сети, предлагал нам просканировать диапазон адресов 192.168.1.1 — 192.168.1.254 и попытаться подключиться к каждому из них. Это вариант меня не устроил, потому что такой брут-форс выполняется долго, да и вообще метод топорный. В моем методе не придется узнавать даже свой IP.

Будем работать по протоколу UPD, обмениваясь датаграммами (короткими сообщениями). Это простой протокол. UDP отличается от TCP тем, что не требует устанавливать соединение, однако в UDP нет гарантий доставки сообщений (получатель не отправляет отправителю подтверждение получения данных), как следствие не гарантирован порядок получения сообщений.

Метафора UDP против TCP

Отправитель просто отправляет данные в сеть либо конкретной машине или на всю подсеть (broadcast), и будь, что будет. Кто-то может принять эти данные, либо они вообще могут потеряться. Чтобы различать разные прикладные приложения, используют номер порта (число до 65535). Потенциальный получатель просто начинает слушать свой порт, вдруг кто-то на него отправит данные.

Казалось бы, протокол UDP ненадежен, однако, UPD работает быстрее, чем TCP, так как не тратится время на подтверждения при обмене. UPD подходит неплохо для игр, стримминга, телефонии и тому подобного. А еще он отлично подойдет для наших целей обнаружения.

Я знаю отличную шутку про UDP, но боюсь, она до вас не дойдет!

С просторов Интернета…

Как только клиент игры запустится, он начнет переодически отправлять широковещательные UDP пакеты в сеть (с пометкой discovery), авось кто услышит. Но и сам начинает сразу после отправки слушать, не пришел ли ему ответ (5 секунд). Затем снова оправляет запрос.

В тоже время какой-то другой клиент сети, который уже ищет соперника, получает от него выше-указанное сообщение discovery и отвечает просьбой прекратить сканирование (stop_scan), после чего останавливает сканирование сети. Клиент получивший stop_scan проверяет, его ли идентификатор в нем указан. Если да, то он также останавливает сканирование.

Оба клиента теперь знают адреса друг друга и готовы начать обмениваться пакетами напрямую между собой уже в рамках игровой сессии. Задача обнаружения выполнена.

Мой протокол

Класс сети

Начнем писать код с класса сети Networking (по ссылке полный код класса). Он абстрагирует создание и настройку UDP сокета, обмен данными через него (кодирование и декодирование данных в JSON).

Импортирует стандартный модуль socket. Создание сокета:

import socket
...
class Networking:
    ...
    @classmethod
    def get_socket(cls, broadcast=False, timeout=TIME_OUT):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        # чтобы на одной машине можно было слушать тотже порт
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if broadcast:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(timeout)
        return sock

Отправку данных совершить очень просто. Кодируем данные в JSON, потом в байты, потом посылаем через сокет на указанный адрес и порт:

    def send_json(self, j, to):
        data = bytes(json.dumps(j), 'utf-8')
        return self._socket.sendto(data, (to, self.port_no))

Широковещательная отправка отличается только тем, что получатель будет to="<broadcast>":

  def send_json_broadcast(self, j):
        return self.send_json(j, "<broadcast>")

С прием все несколько хитрее. Сначала надо сделать bind (привязаться к порту для получения на него сообщений). Теперь сетевой стэк будет знать, что именно наша программа должна получать данные, пришедшие на компьютер именно на этот порт.

    def bind(self, to=""):
        """
        Привязаться к порту, то есть начать слушать с него сообщения
        После bind можно вызывать recv_json
        :param to: интерфейс ("" - любой)
        """
        self._socket.bind((to, self.port_no))

Затем мы вызываем на сокете recvfrom. Если есть данные, то декодируем JSON, а если не дождались (у меня стоит тайм-аут 1 секунду), то возвращаем None.

  def recv_json(self):
        try:
            # получить датаграмму и адрес из сокета
            data, addr = self._socket.recvfrom(self.BUFFER_SIZE)
            # декодируем в юникод и загружаем из JSON
            return json.loads(data.decode('utf-8', errors='ignore'), encoding='utf-8'), addr
        except json.JSONDecodeError:
            logging.error(f'JSONDecodeError!')
        except socket.timeout:
            pass  # ничего не пришло
        return None, None

Учтите! На наш сокет могут приходить также сообщения от чужих программ или от наших же клиентов игры, но в другом состоянии. А еще broadcast пакеты приходят также и обратно себе на клиент. Их надо фильтровать. Поэтому добавим метод, который несколько раз в течение определенного времени (5 секунд, допустим) будет получать из сокета данные и передавать их на проверку внешней функции predicate, которая вернет False, если это чужие данные и True, если данные подходят для текущего состояния игры. Сам метод recv_json_until вернет данные и адрес, с которого они пришли.

    def recv_json_until(self, predicate, timeout):
        t0 = time.monotonic()
        while time.monotonic() < t0 + timeout:
            data, addr = self.recv_json()
            if predicate(data):
                return data, addr
        return None, None

Discovery Protocol

Мы готовы реализовать протокол по обнаружению других клиентов, ждущих начала игры.

import random
import network
import logging

class DiscoveryProtocol:
    A_DISCOVERY = 'discovery'
    A_STOP_SCAN = 'stop_scan'

    def __init__(self, pid, port_no):
        assert pid
        self._my_pid = pid
        self._network = network.Networking(port_no, broadcast=True)
        self._network.bind()

Здесь pid (player ID) – уникальный идентификатор, чтобы отличаться от других игроков. Он создается случайно при запуске игры pid = random.getrandbits(64). Я не стал использовать IP адрес, потому что на одной машине может быть несколько запущенных клиентов (например, во время отладки). Думаю, большинство читателей первый раз будут пробовать запускать два клиента на одной машине, а не на разных.

Также мы создаем в конструкторе класс Networking, настраиваем его на широковещательную отправку и говорим ему также слушать порт.

Формат отправки сообщений будет в виде словаря с ключом action (тип действия). Например:

{
    "action": "discovery",
    "sender": 1234
}

Метод для посылки таких сообщений:

    def _send_action(self, action, data=None):
        data = data or {}
        self._network.send_json_broadcast({'action': action, 'sender': self._my_pid, **data})

Сам процесс сканирования: в бесконечном цикле рассылаем сообщение discovery, и сразу переходим в режим приема. 5 секунд ждем подходящее сообщение от других клиентов. Если оно пришло, то обрабатываем событие и прекращаем сканирование, выходя из цикла. При этом на сообщение discovery мы обязаны ответить stop_scan, чтобы удаленные клиент понял, что он нас нашел и тоже вышел из процесса сканирования.

    def run(self):
        while True:
            logging.info('Scanning...')
            # рассылаем всем сообщение A_DISCOVERY
            self._send_action(self.A_DISCOVERY)

            # ждем приемлемого ответа не более 5 секунд, игнорируя таймауты и неревалентные пакеты
            data, addr = self._network.recv_json_until(self._is_message_for_me, timeout=5.0)

            # если пришло что-то наше
            if data:
                action, sender = data['action'], data['sender']
                # кто-то нам отправил A_DISCOVERY
                if action == self.A_DISCOVERY:
                    # отсылаем ему сообщение остановить сканирование A_STOP_SCAN, указав его PID
                    self._send_action(self.A_STOP_SCAN, {'to_pid': sender})
                elif action == self.A_STOP_SCAN:
                    # если получили сообщение остановить сканирование, нужно выяснить нам ли оно предназначено
                    if data['to_pid'] != self._my_pid:
                        continue  # это не нам; игнорировать!
                return addr, sender

Как понять, что сообщение нужное? В словаре должен быть ключ "action", который принимает значения «discovery» или «stop_scan«, а еще требуем, чтобы pid отправителя был не наш (фильтруем свои же сообщения). Остальные сообщения игнорируются.

 def _is_message_for_me(self, d):
        return d and d.get('action') in [self.A_DISCOVERY, self.A_STOP_SCAN] and d.get('sender') != self._my_pid

Код для тестирования алгоритма обнаружения:

if __name__ == '__main__':
    print('Testing the discovery protocol.')
    pid = random.getrandbits(64)
    print('pid =', pid)
    info = DiscoveryProtocol(pid, 37020).run()
    print("success: ", info)

Полный код класса здесь discovery_protocol.py.

Запустите один клиент. Он будет висеть в состоянии сканирования сети. А теперь запустите второй клиент. Они сразу найдут друг друга:

Testing the discovery protocol.
pid = 8100514396826939414
success:  (('192.168.1.99', 37020), 5614644081426404292)

Примечание. Этот метод обнаружения будет работать, вероятно, только в пределах вашей локальной сети (одного роутера), потому что любой адекватный роутер на стороне провайдера будет резать широковещательные пакеты. Представляете, какой бы спам начался, если бы была возможность рассылать пакеты сразу всем устройствам, подключенным к Интернет в мире?

На этом все! В следующей части мы реализуем сам сетевой геймплей между клиентами, которые нашли друг друга по этому протоколу.

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈 

Дурак по сети на Python: часть 1

Дурак - иллюстрация

Давайте попробуем разработать сетевую игру на Python, чтобы можно было играть по локальной сети. Думаю, начать надо именно с логики игры, а потом добавить уже сетевое взаимодействие. Я выбрал в качестве игры – карточную игру «Дурак», чтобы, во-первых, не создавать очередные надоевшие крестики-нолики, а, во-вторых, чтобы добавить элемент синхронных взаимодействий вместо исключительно пошаговых (активный игрок может подкидывать карты, пока другой отбивается).

Начнем с определения некоторых констант в файле durak.py:

import random

# масти
SPADES = '♠'
HEARTS = '♥'
DIAMS = '♦'
CLUBS = '♣'

# достоинтсва карт
NOMINALS = ['6', '7', '8', '9', '10', 'J', 'Q', 'K', 'A']

# поиск индекса по достоинству
NAME_TO_VALUE = {n: i for i, n in enumerate(NOMINALS)}

# карт в руке при раздаче
CARDS_IN_HAND_MAX = 6

N_PLAYERS = 2

# эталонная колода (каждая масть по каждому номиналу) - 36 карт
DECK = [(nom, suit) for nom in NOMINALS for suit in [SPADES, HEARTS, DIAMS, CLUBS]]

Посмотрим нашу эталонную колоду:

print(DECK)
[('6', '♠'), ('6', '♥'), ('6', '♦'), ('6', '♣'), ('7', '♠'), ('7', '♥'), ('7', '♦'), ('7', '♣'), ('8', '♠'), ('8', '♥'), ('8', '♦'), ('8', '♣'), ('9', '♠'), ('9', '♥'), ('9', '♦'), ('9', '♣'), ('10', '♠'), ('10', '♥'), ('10', '♦'), ('10', '♣'), ('J', '♠'), ('J', '♥'), ('J', '♦'), ('J', '♣'), ('Q', '♠'), ('Q', '♥'), ('Q', '♦'), ('Q', '♣'), ('K', '♠'), ('K', '♥'), ('K', '♦'), ('K', '♣'), ('A', '♠'), ('A', '♥'), ('A', '♦'), ('A', '♣')]

Мы не будем ее менять, просто при создании игры будем копировать этот список в колоду текущей игры. Каждая карта в колоде или в руке игрока – это кортеж из строки-достоинства и строки-масти.

Создадим класс игрока. Его свойства: список карт на руке и индекс игрока в массиве игроков (0 – для первого и 1 – для второго). Индекс нужен, чтобы определять текущего ходящего игрока. Игрок может брать недостающее число карт из колоды или просто добавлять себе в руку список карт, когда вынужден взять неотбитый им стол.

class Player:
    def __init__(self, index, cards):
        self.index = index
        self.cards = list(map(tuple, cards))  # убедимся, что будет список кортежей

    def take_cards_from_deck(self, deck: list):
        """
        Взять недостающее количество карт из колоды
        Колода уменьшится
        :param deck: список карт колоды 
        """
        lack = max(0, CARDS_IN_HAND_MAX - len(self.cards))
        n = min(len(deck), lack)
        self.add_cards(deck[:n])
        del deck[:n]
        return self

    def sort_hand(self):
        """
        Сортирует карты по достоинству и масти
        """
        self.cards.sort(key=lambda c: (NAME_TO_VALUE[c[0]], c[1]))
        return self

    def add_cards(self, cards):
        self.cards += list(cards)
        self.sort_hand()
        return self

    # всякие вспомогательные функции:
    
    def __repr__(self):
        return f"Player{self.cards!r}"

    def take_card(self, card):
        self.cards.remove(card)

    @property
    def n_cards(self):
        return len(self.cards)

    def __getitem__(self, item):
        return self.cards[item]

Приступим же к классу Durak – основному классу игровой логики:

class Durak:
    def __init__(self, rng: random.Random = None):
        self.rng = rng or random.Random()  # генератор случайных чисел

        self.deck = list(DECK)  # копируем колоду
        self.rng.shuffle(self.deck)  # мешаем карты в копии колоды

        # создаем игроков и раздаем им по 6 карт из перемешанной колоды
        self.players = [Player(i, []).take_cards_from_deck(self.deck)
                        for i in range(N_PLAYERS)]

        # козырь - карта сверху
        self.trump = self.deck[0][1]
        # кладем козырь под низ вращая список по кругу на 1 назад
        self.deck = rotate(self.deck, -1)

        # игровое поле: ключ - атакующая карта, значения - защищающаяся или None
        self.field = {}  

        self.attacker_index = 0  # индекс атакующего игрока
        self.winner = None  # индекс победителя

Генератор случайных чисел можно указать из-вне, это нужно для отладки, чтобы каждый раз воспроизводилась одна и та же раздача, если ГСЧ не задать, то он будет создан на месте и игра будет случайна.

При инициализации, как и в реальной игре, мы берем колоду, перемешиваем ее, раздаем по 6 карт игрокам, берем козырь сверху, запоминаем его и кладем под низ. Кстати, вот функция rotate , которая сдвигает циклично список на n позиций влево (n < 0) или вправо (n > 0):

def rotate(l, n):
    return l[n:] + l[:n]

Я не стал выбирать первого игрока по наличию младшего козыря, потому что обычно это нужно только в первый кон, а дальше ходят под дурака. И то, как договорятся. Просто назначаем первым игрока с индексом 0.

Игровое поле здесь – это словарь, где ключ – атакующая карта, а значение – отбивающая карта (если игрок отбился) или None (если он пока еще не отбился от конкретно этой атакующей карты).

Для получения списков карт на поле вводим такие свойства:

    @property
    def attacking_cards(self):
        """
        Список атакующих карт
        """
        return list(filter(bool, self.field.keys()))

    @property
    def defending_cards(self):
        """
        Список отбивающих карт (фильртруем None)
        """
        return list(filter(bool, self.field.values()))

    @property
    def any_unbeaten_card(self):
        """
        Есть ли неотбитые карты
        """
        return any(c is None for c in self.defending_cards)

А эти свойства помогают определить, кто текущий игрок, а кто его соперник:

    @property
    def current_player(self):
        return self.players[self.attacker_index]

    @property
    def opponent_player(self):
        return self.players[(self.attacker_index + 1) % N_PLAYERS]

Рассмотрим теперь методы атаки и защиты:

    def attack(self, card):
        assert not self.winner  # игра не должна быть окончена!

        # можно ли добавить эту карту на поле? (по масти или достоинству)
        if not self.can_add_to_field(card):
            return False
        cur, opp = self.current_player, self.opponent_player
        cur.take_card(card)  # уберем карту из руки атакующего
        self.field[card] = None  # карта добавлена на поле, пока не бита
        return True

Ходить можно с любой карты, если игровое поле пусто. Но подбрасывать можно только, если карта соответствует по достоинству или масти – этой проверкой заведует метод can_add_to_field:

    def can_add_to_field(self, card):
        if not self.field:  
            # на пустое поле можно ходить любой картой
            return True

        # среди всех атакующих и отбивающих карт ищем совпадения по достоинствам
        for attack_card, defend_card in self.field.items():
            if self.card_match(attack_card, card) or self.card_match(defend_card, card):
                return True
        return False

    def card_match(self, card1, card2):
        if card1 is None or card2 is None:
            return False
        n1, _ = card1
        n2, _ = card2
        return n1 == n2   # равны ли достоинства карт?

Переходим к защите:

    def defend(self, attacking_card, defending_card):
        """
        Защита
        :param attacking_card: какую карту отбиваем 
        :param defending_card: какой картой защищаемя
        :return: bool - успех или нет
        """
        assert not self.winner  # игра не должна быть окончена!

        if self.field[attacking_card] is not None:
            # если эта карта уже отбита - уходим
            return False
        if self.can_beat(attacking_card, defending_card):
            # еслии можем побить, то кладем ее на поле 
            self.field[attacking_card] = defending_card
            # и изымаем из руки защищающегося
            self.opponent_player.take_card(defending_card)
            return True
        return False

Метод, который определяет бьет ли первая карта вторую выглядит так. Обратите внимание, что предварительно надо преобразовать название достоинства карты в числовую характеристику – индекс в массиве достоинств по возрастанию (индекс шестерки – 0, семерки – 1, а у туза – 8).

    def can_beat(self, card1, card2):
        """
        Бьет ли card1 карту card2
        """
        nom1, suit1 = card1
        nom2, suit2 = card2

        # преобразуем строку-достоинство в численные характеристики
        nom1 = NAME_TO_VALUE[nom1]
        nom2 = NAME_TO_VALUE[nom2]

        if suit2 == self.trump:
            # если козырь, то бьет любой не козырь или козырь младше
            return suit1 != self.trump or nom2 > nom1
        elif suit1 == suit2:
            # иначе должны совпадать масти и номинал второй карты старше первой
            return nom2 > nom1
        else:
            return False

Метод завершающий ход finish_turn возвращает результат хода. В зависимости от ситуации на столе могут быть такие варианты. 1) Отбиты все карты. Тогда ход переходит к игроку, который защищался. Оба добирают из колоды недостающее число карт. 2) Не отбил что-то, тогда право хода не меняется, атакующий добирает карты, а защищающийся собирает со стола все карты к себе в руку. 3) Игра завешена, так как карт в колоде больше нет, и один из соперников тоже избавился от всех карт. Тот, кто остался с картами на руках в конце игры – ДУРАК 😉

    # константы результатов хода
    NORMAL = 'normal'
    TOOK_CARDS = 'took_cards'
    GAME_OVER = 'game_over'
    
    @property
    def attack_succeed(self):
        return any(def_card is None for def_card in self.field.values())

    def finish_turn(self):
        assert not self.winner

        took_cards = False
        if self.attack_succeed:
            # забрать все карты, если игрок не отбился в момент завершения хода
            self._take_all_field()
            took_cards = True
        else:
            # бито! очищаем поле (отдельного списка для бито нет, просто удаляем карты)
            self.field = {}

        # очередность взятия карт из колоды определяется индексом атакующего (можно сдвигать на 1, или нет)
        for p in rotate(self.players, self.attacker_index): 
            p.take_cards_from_deck(self.deck)

        # колода опустела?
        if not self.deck:
            for p in self.players:
                if not p.cards:  # если у кого-то кончились карты, он победил!
                    self.winner = p.index
                    return self.GAME_OVER

        if took_cards:
            return self.TOOK_CARDS
        else:
            # переход хода, если не отбился
            self.attacker_index = self.opponent_player.index
            return self.NORMAL

    def _take_all_field(self):
        """
        Соперник берет все катры со стола себе.  
        """
        cards = self.attacking_cards + self.defending_cards
        self.opponent_player.add_cards(cards)
        self.field = {}

Вот и вся логика. Один атакует attack, другой отбивается defend. В любой момент может быть вызван finish_turn, чтобы завершить ход. Смотрим на результат хода, и если игра окончена, то в поле winner будет индекс игрока-победителя.

Теперь реализуем локальную игру в консоли, как будто бы оба играют за одним компьютером. Функции по отрисовке состояния игры в консоль собраны в файле render.py. Не буду их разбирать подробно, так как они не так важны, а в будущем мы прикрутим графическую оболочку и консольные функции потеряют актуальность.

Сам же игровой интерфейс реализован в файле local_game.py:

from render import ConsoleRenderer
from durak import Durak
import random

def local_game():
    # rng = random.Random(42)  # игра с фиксированным рандомом (для отладки)
    rng = random.Random()  # случайная игра

    g = Durak(rng=rng)
    renderer = ConsoleRenderer()

    renderer.help()

    while not g.winner:
        renderer.render_game(g, my_index=0)

        renderer.sep()
        choice = input('Ваш выбор: ')
        # разбиваем на части: команда - пробел - номер карты
        parts = choice.lower().split(' ')
        if not parts:
            break

        command = parts[0]

        try:
            if command == 'f':
                r = g.finish_turn()
                print(f'Ход окончен: {r}')
            elif command == 'a':
                index = int(parts[1]) - 1
                card = g.current_player[index]
                if not g.attack(card):
                    print('Вы не можете ходить с этой карты!')
            elif command == 'd':
                index = int(parts[1]) - 1
                new_card = g.opponent_player[index]

                # варианты защиты выбранной картой
                variants = g.defend_variants(new_card)

                if len(variants) == 1:
                    def_index = variants[0]
                else:
                    def_index = int(input(f'Какую позицию отбить {new_card}? ')) - 1

                old_card = list(g.field.keys())[def_index]
                if not g.defend(old_card, new_card):
                    print('Не можете так отбиться')
            elif command == 'q':
                print('QUIT!')
                break
        except IndexError:
            print('Неправильный выбор карты')
        except ValueError:
            print('Введите число через пробел после команды')

        if g.winner:
            print(f'Игра окончена, победитель игрок: #{g.winner + 1}')
            break

if __name__ == '__main__':
    local_game()

Команды (a #номер карты – атака, d #номер карты – защита, просто f – завершить ход, q – выход). Номера карт задаются с 1 (там будет нумерация возле карт).

Локальную версию игры можно пощупать в браузере через replit.

Пример игры:

Козырь – [♦], 24 карт в колоде осталось.
1: 1. [7♥], 2. [10♠], 3. [J♥], 4. [K♥], 5. [A♥], 6. [A♦] <-- ходит (это я) 
2: 1. [6♠], 2. [7♠], 3. [8♣], 4. [8♦], 5. [9♦], 6. [K♣]
--------------------------------------------------------------------------------
Ваш выбор: a 1
--------------------------------------------------------------------------------
Козырь – [♦], 24 карт в колоде осталось.
1: 1. [10♠], 2. [J♥], 3. [K♥], 4. [A♥], 5. [A♦] <-- ходит (это я) 
2: 1. [6♠], 2. [7♠], 3. [8♣], 4. [8♦], 5. [9♦], 6. [K♣]

1. Ходит: [7♥] - отбиться: [  ]
--------------------------------------------------------------------------------
Ваш выбор: d 5
--------------------------------------------------------------------------------
Козырь – [♦], 24 карт в колоде осталось.
1: 1. [10♠], 2. [J♥], 3. [K♥], 4. [A♥], 5. [A♦] <-- ходит (это я) 
2: 1. [6♠], 2. [7♠], 3. [8♣], 4. [8♦], 5. [K♣]

1. Ходит: [7♥] - отбиться: [9♦]
--------------------------------------------------------------------------------
Ваш выбор: f
Ход окончен: normal
--------------------------------------------------------------------------------
Козырь – [♦], 22 карт в колоде осталось.
1: 1. [10♠], 2. [J♥], 3. [K♥], 4. [K♦], 5. [A♥], 6. [A♦] (это я) 
2: 1. [6♠], 2. [7♠], 3. [7♦], 4. [8♣], 5. [8♦], 6. [K♣] <-- ходит
--------------------------------------------------------------------------------
Ваш выбор: 

Весь код будет доступен в репозитории. О сетевой подсистеме игры я расскажу в следующих частях очень скоро!

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈 

Python: все про del

КДПВ

Инструкция del (от англ. delete), как можно понять из названия, нужна чтобы что-то удалять, а именно имена переменных, атрибуты объектов, элементы списков и ключи словарей.

1. Удаление элемента из списка по индексу:

>>> x = [1, 2, 3, 4, 5]
>>> del x[2]
>>> x
[1, 2, 4, 5]

Также можно удалять по срезам. Пример: удаление первых двух элементов:

>>> x = [1, 2, 3, 4, 5]
>>> del x[:2]
>>> x
[3, 4, 5]

Удаление последних n элементов: del x[n:].

Удаление элементов с четными индексами: del x[::2], нечетными: del x[1::2].

Удаление произвольного среза: del x[i:j:k].

Не путайте del x[2] и x.remove(2). Первый удаляет по индексу (нумерация с 0), а второй по значению, то есть находит в списке первую двойку и удаляет ее.

2. Удаление ключа из словаря. Просто:

>>> d = {"foo": 5, "bar": 8}
>>> del d["foo"]
>>> d
{'bar': 8}

А вот строки, байты и сеты del не поддерживают.

3. Удаление атрибута объекта.

class Foo:
    def __init__(self):
        self.var = 10

f = Foo()
del f.var
print(f.var)  # ошибка! 

Примечание: можно через del удалить метод у самого класса del Foo.method, но нельзя удалить метод у экземпляра класса del Foo().methodAttributeError.

4. Что значит удалить имя переменной? Это просто значит, что надо отвязать имя от объекта (при этом если на объект никто более не ссылается, то он будет освобожден сборщиком мусора), а само имя станет свободно. При попытке доступа к этому имени после удаления будет NameError, пока ему снова не будет что-то присвоено.

>>> a = 5
>>> del a
>>> a
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
NameError: name 'a' is not defined

Здесь кроется один нюанс. Если переменная была внутри функции помечена, как global, то после ее удаления глобальная переменная никуда не денется, а имя освободится лишь в зоне видимости функции. Причем если мы снова присвоим ей значение, то она опять окажется глобальной, т.е. del не очищает информацию о global!

g = 100
def f():
    global g
    g = 200
    del g  # g останется вне функции
    g = 300  # та же самая глобальная g

f()
print(g) # 300

Чтобы реально удалить глобальную переменную, можно сделать так: del globals()['g'].

В пунктах 1, 2, 3 в качестве имен могут фигурировать выражения и ссылки, так как операции идут над содержимым объектов, а в пункте 4 должно быть строго формальное имя удаляемого объекта.

>>> x = [1, 2, 3]
>>> y = x
>>> del y  # удаляет именно y, но x остается

Еще одна особенность del – она может удалить несколько вещей за раз, если передать в нее кортеж или список объектов на удаление.

x, y, z = 10, 20, [1, 2, 3]
del x, y, z[2]

Пусть задан список из 5 элементов:

x = [1, 2, 3, 4, 5]
del x[2], x[4]

Казалось бы, что будут удалены 2-й и 4-й элементы списка, но это не так! Удаления происходят один за одним, и сначала будет удален 2-й элемент, размер списка уменьшится на 1, а потом будет попытка удалить 4-й элемент, но она будет неудачна – вылетит исключение IndexError, потому что элемента с индексом 4 больше нет, а сам список будет равен [1, 2, 4, 5]. Так что будьте внимательны в подобных ситуациях!

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈 

Нечеткое сравнение текстов на Python с FuzzyWuzzy

Недавно мы обсуждали расчет расстояния Левеншейна, настало время испытать его применение на практике. Библиотека FuzzyWuzzy содержит набор функций для нечеткого поиска строк, дедупликации (удаления копий), корректировки ошибок. Она позволяет стать поиску умнее, помогая преодолеть влияние человеческого фактор.

Начнем с установки:

pip install fuzzywuzzy python-Levenshtein

Модуль python-Levenshtein можно устанавливать по желанию: работать будет и без него, но с ним гораздо быстрее (в разы). Поэтому советую его установить, он мелкий, порядка 50 Кб.

Похожесть двух строк

Задача: есть две строки, требуется вычислить степень их похожести числом от 0 до 100. В FuzzyWuzzy для этого есть несколько функций, отличающихся подходом к сравнению и вниманием к деталям. Не забудем импортировать:

from fuzzywuzzy import fuzz

Функция fuzz.ratio – простое посимвольное сравнение. Рейтинг 100 только если строки полностью равны, любое различие уменьшает рейтинг, будь то знаки препинания, регистр букв, порядок слов и так далее:

>>> fuzz.ratio("я люблю спать", "я люблю спать")
100
>>> fuzz.ratio("я люблю спать", "Я люблю cпать!")
81
>>> fuzz.ratio("я люблю спать", "я люблю есть")
88

Обратите внимание, что рейтинг второго примера ниже, чем у третьего, хотя по смыслу должно быть наоборот.

Следующая функция fuzz.token_sort_ratio решает эту проблему. Теперь акцент именно на сами слова, игнорируя регистр букв, порядок слов и даже знаки препинания по краям строки.

>>> fuzz.token_sort_ratio("я люблю спать", "я люблю есть")
56
>>> fuzz.token_sort_ratio("я люблю спать", "Я люблю спать!")
100
>>> fuzz.token_sort_ratio("я люблю спать", "спать люблю я...")
100

>>> fuzz.token_sort_ratio("Мал да удал", "удал да МАЛ")
100
>>> fuzz.token_sort_ratio("Мал да удал", "Да Мал Удал")
100

Однако, смысл пословицы немного изменился, а рейтинг остался на уровне полного совпадения.

Функция fuzz.token_set_ratio пошла еще дальше: она игнорирует повторяющиеся слова, учитывает только уникальные.

>>> fuzz.token_set_ratio("я люблю спать", "люблю я спать, спать, спать...")
100
>>> fuzz.token_set_ratio("я люблю спать", "люблю я спать, спать и спать...")
100
>>> fuzz.token_set_ratio("я люблю спать", "но надо работать")
28

# повторы в token_sort_ratio роняют рейтинг! 
>>> fuzz.token_sort_ratio("я люблю спать", "люблю я спать, спать и спать.")
65

# но вот это странно:
>>> fuzz.token_set_ratio("я люблю спать", "люблю я спать, но надо работать")
100
>>> fuzz.token_set_ratio("я люблю спать", "люблю я спать, люблю я есть")
100

Последние два примера вернули 100, хотя добавлены новые слова, и это странно. Тут следует вспомнить о fuzz.partial_ratio, которая ведет себя также. А именно, проверяет вхождение одной строки в другую. Лишние слова игнорируются, главное – оценить, чтобы ядро было одно и тоже.

>>> fuzz.partial_ratio("одно я знаю точно", "одно я знаю")
100
>>> fuzz.partial_ratio("одно я знаю точно", "одно я знаю!")
92
>>> fuzz.partial_ratio("одно я знаю точно", "я знаю")
100

Еще еще более навороченный метод fuzz.WRatio, который работает ближе к человеческой логике, комбинируя несколько методов в один алгоритм в определенными весами (отсюда и название WRatio = Weighted Ratio).

>>> fuzz.WRatio("я люблю спать", "люблю Я СПАТЬ!")
95
>>> fuzz.WRatio("я люблю спать", "люблю Я СПАТЬ и есть")
86
>>> fuzz.WRatio("я люблю спать", "!!СПАТЬ ЛЮБЛЮ Я!!")
95

Нечеткий поиск

Задача: найти в списке строк одну или несколько наиболее похожих на поисковый запрос.

Импортируем подмодуль и применим process.extract или process.extractOne:

from fuzzywuzzy import process

strings = ['привет', 'здравствуйте', 'приветствую', 'хай', 'здорова', 'ку-ку']
process.extract("Прив", strings, limit=3)
# [('привет', 90), ('приветствую', 90), ('здравствуйте', 45)]

process.extractOne("Прив", strings)
# ('привет', 90)

Удаление дубликатов

Очень полезная функция для обработки данных. Представьте, что вам досталась 1С база номенклатуры запчастей, там полный бардак, и вам нужно поудалять лишние повторяющиеся позиции товара, но где-то пробелы лишние, где-то буква перепутана и тому подобное. Тут пригодится process.dedupe.

dedupe(contains_dupes, threshold=70, scorer=token_set_ratio)

Первый аргумент – исходный список, второй – порог исключения (70 по умолчанию), третий – алгоритм сравнения (token_set_ratio по умолчанию).

Пример:

arr = ['Гайка на 4', 'гайка 4 ГОСТ-1828 оцинкованная', 'Болты на 10', 'гайка 4 ГОСТ-1828 оцинкованная ...', 'БОЛТ']

print(list(process.dedupe(arr)))
# ['гайка 4 ГОСТ-1828 оцинкованная ...', 'Болты на 10', 'БОЛТ']

FuzzyWuzzy можно применять совместно с Pandas. Например так (без особых подробностей):

def get_ratio(row):
    name = row['Last/Business Name']
    return fuzz.token_sort_ratio(name, "Alaska Sea Pilot PAC Fund")

df[df.apply(get_ratio, axis=1) > 70]

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈 

Расстояние Левенштейна на Python

Как понять насколько близки две строки? Как поисковая система все равно находит то, что надо, даже если вы совершили пару опечаток в запросе? В этом вопросе нам поможет расстояние по Левенштейну или редакционное расстояние. Почему редакционное? Потому что мы считаем, сколько действий по редактированию одной строки нужно совершить, чтобы получить другую строку. Действия бывают следующими: вставка символа, удаление символа и замена одного символа другим.

Одинаковые строки имеют нулевое расстояние: не нужно ничего редактировать, первая и так равна второй. «Привет» и «Привт» имеют расстояние 1 (пропущена одна буква, остальное не изменилось). «Привет» и «привты» имеют расстояние 3 (одна замена «П» на «п», удаление буквы «е» и вставка «ы» на конце). Мы будем считать

Я не буду сюда копировать теорию и тем более доказательства, это вы можете изучить в Вики.

Давайте попробуем реализовать этот алгоритм в лоб по формуле:

Рекурсивная формула

Функция m – возвращает единицу, если символы не равны, иначе 0. Вот такой код получится:

def my_dist(a, b):
    def recursive(i, j):
        if i == 0 or j == 0:
            # если одна из строк пустая, то расстояние до другой строки - ее длина
            # т.е. n вставок
            return max(i, j)
        elif a[i - 1] == b[j - 1]:
            # если оба последних символов одинаковые, то съедаем их оба, не меняя расстояние
            return recursive(i - 1, j - 1)
        else:
            # иначе выбираем минимальный вариант из трех
            return 1 + min(
                recursive(i, j - 1),  # удаление
                recursive(i - 1, j),   # вставка
                recursive(i - 1, j - 1)  # замена
            )
    return recursive(len(a), len(b))

Вспомогательная функция, чтобы протестировать производительность:

from timeit import timeit

def test_lev_dist(f: callable, a, b, n=1):
    tm = timeit("f(a, b)", globals={
        'f': f, 'a': a, 'b': b
    }, number=n)
    r = f(a, b)
    print(f'a = {a!r} and b = {b!r} and {f.__name__} = {r} and time = {tm:.4f}')

test_lev_dist(my_dist, "hello world", "bye world!")
# a = 'hello world' and b = 'bye world!' and my_dist = 6 and time = 1.3829

Как можете видеть, первый вариант кода работает экстремально медленно, потому что много раз вычисляет функцию при одних и тех же входных параметрах. Давайте узнаем, сколько раз вызывается внутренняя функция. Для этого добавим декоратор, который считает число вызовов:

def count_calls(f):
    @wraps(f)
    def wrapped(*args, **kwargs):
        wrapped.n_calls += 1
        return f(*args, **kwargs)
    wrapped.n_calls = 0
    return wrapped

def my_dist(a, b):
    @count_calls
    def recursive(i, j):
        ...  # прочий код пропущен
    r = recursive(len(a), len(b))
    print('calls =', recursive.n_calls)
    return r

my_dist("hello world", "bye world!")
# calls =  3317804

Более 3 миллионов вызовов! И большинство из них с одинаковыми параметрами. А так как они повторяются, то можно их закешировать (при помощи lru_cache). Здесь размер кэша примерно равен числу различных комбинаций входных параметров.

from functools import lru_cache

def my_dist_cached(a, b):
    @count_calls
    @lru_cache(maxsize=len(a) * len(b))
    def recursive(i, j):
        if i == 0 or j == 0:
            return max(i, j)
        elif a[i - 1] == b[j - 1]:
            return recursive(i - 1, j - 1)
        else:
            return 1 + min(
                recursive(i, j - 1), 
                recursive(i - 1, j), 
                recursive(i - 1, j - 1)
            )

    r = recursive(len(a), len(b))
    print('calls = ', recursive.n_calls)
    return r

my_dist_cached("hello world", "bye world!")
# calls = 212

Количество вызовов сократилось до 212, а скорость работы увеличилась на порядки. Выкинем count_calls и проведем замеры времени для 10000 повторных вызовов.

def my_dist_cached(a, b):
    @lru_cache(maxsize=len(a) * len(b))
    def recursive(i, j):
        if i == 0 or j == 0:
            return max(i, j)
        elif a[i - 1] == b[j - 1]:
            return recursive(i - 1, j - 1)
        else:
            return 1 + min(
                recursive(i, j - 1),
                recursive(i - 1, j),
                recursive(i - 1, j - 1)
            )
    return recursive(len(a), len(b))

test_lev_dist(my_dist_cached, "hello world", "bye world!", n=10000)
# a = 'hello world' and b = 'bye world!' and my_dist_cached = 6 and time = 0.9740

Производительность улучшилась радикально (в прошлый раз мы прогоняли один вызов функции, а теперь 10000 раз, и то выходит быстрее). Однако, пока что объем требуемой памяти растет как O(len(a) * len(b)). Иными словами, для двух мегабайтных строк потребуются гигабайты памяти. Фактически в кэше будет хранится почти все матрица редактирований, а она нам не нужна целиком. Наша цель – правый нижний элемент.

Матрица редактирований recursive(i, j)
Матрица редактирований recursive(i, j)

Для его поиска можно обойтись лишь парой рядов: текущим и предыдущим. А остальные ряды не хранить в памяти. Так мы дойдем до конца таблицы, и нижний правый угол и будет искомым значением.

Вот пример реализации:

def distance(a, b):
    n, m = len(a), len(b)
    if n > m:
        # убедимся что n <= m, чтобы использовать минимум памяти O(min(n, m))
        a, b = b, a
        n, m = m, n

    current_row = range(n + 1)  # 0 ряд - просто восходящая последовательность (одни вставки)
    for i in range(1, m + 1):
        previous_row, current_row = current_row, [i] + [0] * n
        for j in range(1, n + 1):
            add, delete, change = previous_row[j] + 1, current_row[j - 1] + 1, previous_row[j - 1]
            if a[j - 1] != b[i - 1]:
                change += 1
            current_row[j] = min(add, delete, change)

    return current_row[n]

Объяснение. Сначала, чтобы использовать еще меньше памяти, мы можем поменять местами наши строки, чтобы длина рядов была минимальна. Это существенно экономит память, если одна из строк длинная, а другая короткая.

Затем мы понимаем, что нулевой ряд или столбец матрицы – просто восходящая последовательность. Потому что, чтобы из пустой строки получить любую не пустую, нужно ровно то число вставок, сколько символов в не пустой строке. И наоборот: n удалений из строки длины n приведут неизбежно к пустой строке.

Нам достаточно пары рядов.
Тут на картинке не ряды, а столбцы, но смысла это не меняет.

Потом мы шагаем по рядам матрицы, помня только текущий ряд и предыдущий, мы заполняем неизвестные клетки текущего ряда. Соседние клетки отвечают за вставку одного символа, удаление или замену (если символы неравны). Из трех возможных изменений мы выбираем то, чья стоимость наименьшая.

Эта версия еще быстрее, чем кэшированная:

 test_lev_dist(distance, "hello world", "bye world!", n=10000)
# a = 'hello world' and b = 'bye world!' and distance = 6 and time = 0.7374

Сложность этого алгоритма растет как произведение длин строк: O(n*m). Это еще не самый эффективный алгоритм. Для дальнейшего ускорения нужно воспользоваться древовидной структурой данных. Также неплохо бы учесть то, что на известном словаре можно заранее вычислить расстояния между словами.

Наконец-то, когда мы разобрались с принципом работы алгоритма, вспомним, что все велосипеды уже написаны до нас, да еще и на языке Си. Воспользуемся библиотечными функциями, установив пакет:

 pip install python-Levenshtein
import Levenshtein

test_lev_dist(Levenshtein.distance, "hello world", "bye world!", n=10000)
# a = 'hello world' and b = 'bye world!' and distance = 6 and time = 0.0032

Специально для канала @pyway. Подписывайтесь на мой канал в Телеграм @pyway 👈