見出し画像

ブロックスデュオの対戦アプリケーション解説!~サンプルコード有

こんにちは!スマートスケープDE事業部の三井です。

第一回 Smart SCAPE CUPで使用したブロックスデュオの対戦アプリケーションについて、一部を紹介します。
Smart SCAPE CUPでは学生たちが対戦AIを開発し、実際にブロックスデュオというボードゲームで対戦するというイベントを行いました!
イベントレポート記事も是非ご覧ください。


プログラム全体の構成

まずアプリケーション全体の構成ですが、大きく以下の3つのモジュールからなります。

  1. ゲームを進行する、Game Master

  2. いわゆるゲームプレイヤーに相当する、Player Client

  3. ゲームの進行を表示する、Viewer

今回の記事では、1.のGame Masterについて紹介します。

```mermaid
flowchart BT;
    V[Viewer]
    GM[Game Master]
    P1[Player Client]
    P2[Player Client]

    GM-->V;
    P1-->GM;
    P2-->GM;
```

処理の流れ

Game Masterの処理の流れの概要は図のようになっています。

```mermaid
flowchart TB;
    S((開始))
    Init[ゲームの初期化]
    P_Init["(1)Player1/Player2の接続"]
    set_P1[手番PlayerをPlayer1に]
    L1_S[/LOOP:
          Player1/2が
          共にパスするまで\]
    L1_E[\LOOP/]
    C_Pass{手番Playerは
           パス済み?}
    PA[手番Playerから
       手を受信]
    parse["(2)プレイヤーの手を解析"]
    pass{パス?}
    place["(3)ブロックを配置"]
    switch[手番Playerを入れ替える]
    winner[配置したブロックの
           面積が多いほうが勝者]
    E((終了))

    S-->Init-->P_Init-->set_P1;
    set_P1-->L1_S-->C_Pass;
    C_Pass-->|No|PA-->parse-->pass;
    C_Pass-->|Yes|switch;
    pass-->|No|place-->switch;
    pass-->|Yes|switch;
    switch-->L1_E-->winner-->E;
```

(1)Player1/Player2の接続

今回のアプリケーションとして、Game MasterとPlayer Clientをそれぞれ
別プロセスとしたかったため、Game MasterとPlayer Clientはそれぞれwebsocketを介して通信します。

また、Game MasterとPlayer Clientを別々に起動するのではなく、
Game MasterからPlayer Clientを別プロセスとして起動するようにしました。
これにより、Game Masterを起動する際に2つのPlayer Clientを指定することで、自動でゲームが開始します。

class PlayerFactory:
    @staticmethod
    async def create(server: WebsocketServer, player_number: int, target: str, name: str, loop: asyncio.AbstractEventLoop):
        future: asyncio.Future[Player] = loop.create_future()

        def on_connect(socket: websockets.WebSocketServerProtocol):
            # websocketの接続時にPlayerオブジェクトを生成する
            player = Player(player_number, target, name, socket)

            print(f'player: {player_number} connected')
            future.set_result(player)

        server.set_callback(on_connect)
        # 別プロセスとしてPlayer Clientを実行
        loop.run_in_executor(None, PlayerFactory.start_client, target, server.server_url())

        try:
            # 接続待機
            player = await asyncio.wait_for(future, 20)
            await player.send_player_number()
            print(f'player {player_number} was created.')
            return player
        finally:
            server.clear_callback()

    @staticmethod
    def start_client(target: str, url: str):
        print(f'client_script={target}')
        args = [target, url]
        subprocess.run(args)

(2)プレイヤーの手を解析

Player ClientはGame Masterに手を送信するために、
{ブロック種別}{回転/反転}{x座標}{y座標} の4文字の文字列を送信します。

ブロック種別については、ブロックスの21種のピースにA~Uまでのアルファベットを対応させています。
また、パスする場合は、この文字をXにします。

回転/反転については、それぞれに対応した0~7の値で指定します。

$$
\begin{array}{c|cc}
\textbf{番号} & \textbf{回転} & \textbf{反転} \\ \hline
0 & なし & なし \\
1 & なし & あり \\
2 & 90°回転 & なし \\
3 & 90°回転 & あり \\
4 & 180°回転 & なし \\
5 & 180°回転 & あり \\
6 & 270°回転 & なし \\
7 & 270°回転 & あり\\
\end {array}
$$

配置位置については、x座標とy座標を16進数で指定します。この際に指定する座標は、ピースを2次元配列としてみた際の、左上の座標を指定します。ゲームの盤面の大きさは14マス×14マスなので、1~Eを使用します。

異常な文字列を受信した、もしくは5秒以内にPlayer Clientから文字列が受信できなかった場合は、そのPlayer Clientを反則負けとします。

(3)ブロックを配置

続いて、Player Clientから受信した文字列をもとに、ブロックを盤面に配置します。

まず盤面の状態の保持ですが、盤面は14マス×14マスなので、
同サイズの2次元配列として管理します。
各マスの状態は、空 / Player1のピース / Player2 のピースのいずれかの値を持ちます。

続いて、配置しようとしているピースを14×14の2次元配列上に配置します。
その後、ブロックに隣接する場所と角に該当する場所を同じ配列上にマークします。こうすることで、現在の盤面との比較をやりやすくします。

# L字型のブロックを5,7の位置に配置する例
# ピース: 1, 角: 2, 隣接: 3でプロット
array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 2, 3, 2, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 3, 1, 3, 3, 3, 2, 0, 0, 0, 0, 0],
       [0, 0, 0, 3, 1, 1, 1, 1, 3, 0, 0, 0, 0, 0],
       [0, 0, 0, 2, 3, 3, 3, 3, 2, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
       [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])

作成した2次元配列と盤面の2次元配列について、以下の条件を確認します。
条件をすべて満たした場合、ピースを盤面に配置します。

  1. ピースの座標の盤面がすべて空である

  2. ピースに隣接する場所の座標の盤面に、同一プレイヤーのピースが一つも存在しない

  3. ピースと角で接する場所の座標の盤面に、同一プレイヤーのピースが少なくとも一つ存在する

これらの条件判定の実装はこのようにしています。
(一部抜粋)

class Board:
    def __init__(self):
        self.__board = np.zeros((14, 14), dtype=np.int64)
        pass

    def now_board(self):
        """
        現状のboardの状態を返す
        :return:
        """
        return self.__board

    @property
    def shape_x(self) -> int:
        return self.__board.shape[1]

    @property
    def shape_y(self) -> int:
        return self.__board.shape[0]
    
    def try_place_block(self, player: Player, block: Block, position: Position):
        self.assert_range(block, position)
        padded_block = Board.PaddedBlock(self, block, position)
        if not self.can_place(player, padded_block):
            raise ValueError("invalid position: cannot place")
        self.place_block(player, padded_block)

    def assert_range(self, block, position):
        if position.x < 0 or position.x + block.shape_x > self.shape_x:
            raise ValueError("invalid position")
        if position.y < 0 or position.y + block.shape_y > self.shape_y:
            raise ValueError("invalid position")

    def can_place(self, player: Player, padded_block: PaddedBlock) -> bool:
        # 条件1
        if self.detect_collision(padded_block):
            return False
        # 条件2
        if self.detect_side_connection(player, padded_block):
            return False
        # 条件3
        return self.detect_corner_connection(player, padded_block)

    def detect_collision(self, padded_block: PaddedBlock) -> bool:
        collision_map = padded_block.block_map
        return self.__board.flatten().dot(collision_map.flatten()) > 0

    def detect_side_connection(self, player: Player, padded_block: PaddedBlock) -> bool:
        edge_map = padded_block.edge_map
        player_block_map = np.zeros(self.__board.shape, dtype=np.int64)
        player_block_map[self.__board == player.player_number] = 1
        return player_block_map.flatten().dot(edge_map.flatten()) > 0

    def detect_corner_connection(self, player: Player, padded_block: PaddedBlock) -> bool:
        corner_map = padded_block.corner_map
        player_block_map = np.zeros(self.__board.shape, dtype=np.int64)
        player_block_map[self.__board == player.player_number] = 1
        return player_block_map.flatten().dot(corner_map.flatten()) > 0

    def place_block(self, player: Player, padded_block: PaddedBlock):
        self.__board += padded_block.block_map * player.player_number

    class PaddedBlock:

        def __init__(self, board: Board, block: Block, position: Position):
            pad_top = position.y
            pad_bottom = board.shape_y - (position.y + block.shape_y)
            pad_left = position.x
            pad_right = board.shape_x - (position.x + block.shape_x)
            self.__map = np.pad(block.block_map, ((pad_top, pad_bottom), (pad_left, pad_right)))
            self.__decorate_corner(self.__map)
            self.__decorate_edge(self.__map)

        @staticmethod
        def __decorate_corner(map_):
            corner_lt = np.array([[0, 0], [0, 1]])
            corner_rt = np.array([[0, 0], [1, 0]])
            corner_lb = np.array([[0, 1], [0, 0]])
            corner_rb = np.array([[1, 0], [0, 0]])

            corner_windows = np.lib.stride_tricks.sliding_window_view(map_, [2, 2], writeable=True)
            for corner_windows_row in corner_windows:
                for corner_window in corner_windows_row:
                    if np.all(corner_window == corner_lt):
                        corner_window[0, 0] = 2
                    elif np.all(corner_window == corner_rt):
                        corner_window[0, 1] = 2
                    elif np.all(corner_window == corner_lb):
                        corner_window[1, 0] = 2
                    elif np.all(corner_window == corner_rb):
                        corner_window[1, 1] = 2

        @staticmethod
        def __decorate_edge(map_):
            edge_left = np.array([[0, 1]])
            edge_right = np.array([[1, 0]])

            vertical_windows = np.lib.stride_tricks.sliding_window_view(map_, [1, 2], writeable=True)
            for vertical_windows_row in vertical_windows:
                for vertical_window in vertical_windows_row:
                    if np.all(vertical_window == edge_left):
                        vertical_window[0, 0] = 3
                    elif np.all(vertical_window == edge_right):
                        vertical_window[0, 1] = 3

            edge_top = np.array([[0], [1]])
            edge_bottom = np.array([[1], [0]])

            vertical_windows = np.lib.stride_tricks.sliding_window_view(map_, [2, 1], writeable=True)
            for vertical_windows_row in vertical_windows:
                for vertical_window in vertical_windows_row:
                    if np.all(vertical_window == edge_top):
                        vertical_window[0, 0] = 3
                    elif np.all(vertical_window == edge_bottom):
                        vertical_window[1, 0] = 3

        @property
        def map(self):
            return self.__map

        @property
        def block_map(self):
            map_ = np.zeros(self.__map.shape, dtype=np.int64)
            map_[self.__map == 1] = 1
            return map_

        @property
        def edge_map(self):
            map_ = np.zeros(self.__map.shape, dtype=np.int64)
            map_[self.__map == 3] = 1
            return map_

        @property
        def corner_map(self):
            map_ = np.zeros(self.__map.shape, dtype=np.int64)
            map_[self.__map == 2] = 1
            return map_

配置できない手を送信した場合は、そのPlayer Clientを反則負けとします。

課題と感想

Game Masterの実装では、そこまで処理時間を気にしなくてよいのですが、もう少し処理時間を短くしたかったです。
制限時間に追われるPlayer Clientの判定ロジックに流用しようとすると、
ちょっと気になります。
あと、エラー処理はだいぶ雑に書いてしまったので反省です。

普段の業務でnumpyを使用しないので、良い勉強の機会になりました。
また触れる機会があれば、もう少し使いこなしたいところです。


We’re hiring!

スマートスケープでは一緒に働いていただける仲間も募集しています。
「こうなりたい」という思いを持ち、型にはまらず、自らの意思でキャリアを切り開ける仕組みが整っております!
ぜひお気軽に以下のフォームまたはメールアドレスにご連絡ください!
新卒採用エントリーページ

キャリア採用エントリー
キャリア採用で募集している職種についてnoteにまとめました。
以下のリンクよりご覧ください。
スマートスケープで募集中の職種を紹介します!

またご質問等あればお気軽に下記へ連絡ください。
ss-career-recruit@smart-group.co.jp


Products

スマートスケープ株式会社

公式YouTubeチャンネル

SS4M - AIを活用した3D類似形状検索ツール

無料体験版お申し込みはこちら
QUANTO - 調達/購買業務を効率化するクラウドサービス

無料プランお申し込みはこちら
SmartExchange - 3D CADデータを3D PDF/3D HTMLに自動変換するソフトウェア

realvirtual.io - バーチャルコミッショニングを実現!