232 lines
6.3 KiB
Python
232 lines
6.3 KiB
Python
from dataclasses import dataclass, field
|
|
from types import SimpleNamespace
|
|
from random import randint
|
|
|
|
from .socket import Socket
|
|
|
|
|
|
class CellState:
    """Integer codes stored in each cell of ``GameData.field``."""

    # The values are consecutive on purpose: a right click advances a cell
    # with ``cell += 1`` (CLOSED -> FLAGGED -> MARKED) and wraps back to
    # CLOSED once it reaches NOT_A_FIELD, which is never stored in the grid.
    OPEN, CLOSED, FLAGGED, MARKED, NOT_A_FIELD = range(5)
|
|
|
|
|
|
# Text shown to clients for each kind of rendered cell.
STR_CLOSED = ""
STR_BOMB = "💣"
STR_FLAGGED = "🚩"
STR_MARKED = "❓"
# Indexed by the number of neighbouring bombs (0..8); zero renders as a blank.
STR_NUMBERS = [" "] + [str(count) for count in range(1, 9)]
|
|
|
|
|
|
@dataclass
class GameData:
    """Mutable state of a single minesweeper board.

    The grid is stored column-major: ``field[x][y]`` with ``0 <= x < width``
    and ``0 <= y < height``. Each entry is one of the ``CellState`` codes.
    Bombs are not placed by ``generate``; they are placed later by
    ``generate_bombs`` so that a chosen cell can be kept bomb-free.

    ``win`` is only meaningful once ``game_over`` is set.
    """

    width: int
    height: int
    bombs: int
    game_over: bool = False
    win: bool = True
    bombs_in_field: list[tuple[int, int]] = field(default_factory=list)
    # NOTE: this attribute shadows ``dataclasses.field`` inside the class
    # body, so it must remain the last declaration that calls ``field(...)``.
    field: list[list[int]] = field(default_factory=list)

    def generate(self) -> "GameData":
        """Reset the board to a fresh, fully closed grid without bombs.

        The existing ``field`` and ``bombs_in_field`` lists are cleared and
        refilled in place, so references held elsewhere stay valid.

        Returns:
            ``self``, to allow ``GameData(...).generate()`` chaining.
        """
        self.game_over = False
        # A fresh board is not a won board (previously a no-op expression).
        self.win = False

        self.field.clear()
        self.bombs_in_field.clear()
        self.field.extend(
            [CellState.CLOSED] * self.height for _ in range(self.width)
        )
        return self

    def generate_bombs(self, ex: int, ey: int):
        """Place bombs on random free cells, never on ``(ex, ey)``.

        Bombs are added until ``bombs_in_field`` holds ``self.bombs`` entries
        or no free cell is left, whichever comes first. Capping at the number
        of free cells guarantees termination even when more bombs were
        requested than the board can hold.

        Args:
            ex: Column of the cell that must stay bomb-free.
            ey: Row of the cell that must stay bomb-free.
        """
        occupied = {(bx, by) for bx, by in self.bombs_in_field}
        free_cells = [
            (x, y)
            for x in range(self.width)
            for y in range(self.height)
            if not (x == ex and y == ey) and (x, y) not in occupied
        ]

        missing = self.bombs - len(self.bombs_in_field)
        while missing > 0 and free_cells:
            # Swap the picked cell to the end so removal is O(1).
            pick = randint(0, len(free_cells) - 1)
            free_cells[pick], free_cells[-1] = free_cells[-1], free_cells[pick]
            self.bombs_in_field.append(free_cells.pop())
            missing -= 1

    def is_bomb(self, x: int, y: int) -> bool:
        """Return True when a bomb has been placed at ``(x, y)``.

        Coordinates outside the board simply yield False.
        """
        return any(bx == x and by == y for bx, by in self.bombs_in_field)

    def get_number(self, x: int, y: int) -> int:
        """Return how many of the eight neighbours of ``(x, y)`` are bombs."""
        return sum(
            self.is_bomb(x + dx, y + dy)
            for dx in (-1, 0, 1)
            for dy in (-1, 0, 1)
            if dx or dy
        )

    def is_win(self) -> bool:
        """Return True when the current board state counts as a win.

        The game is won when every bomb-free cell is open, or when every bomb
        is flagged and no bomb-free cell carries a flag. Before any bombs
        have been placed the result is always False.
        """
        placed = len(self.bombs_in_field)
        if placed == 0:
            return False

        # Use the number of bombs actually placed; it can be lower than
        # ``self.bombs`` when the board could not hold all requested bombs.
        must_be_open = self.width * self.height - placed
        correct_flags = 0
        wrong_flags = 0
        open_cells = 0

        for x in range(self.width):
            for y in range(self.height):
                state = self.field[x][y]
                if state == CellState.FLAGGED:
                    if self.is_bomb(x, y):
                        correct_flags += 1
                    else:
                        wrong_flags += 1
                elif state == CellState.OPEN:
                    open_cells += 1

        if open_cells == must_be_open:
            return True
        # Requiring zero wrong flags prevents winning by flagging every cell.
        return correct_flags == placed and wrong_flags == 0
|
|
|
|
|
|
async def game_router(
|
|
server: Socket,
|
|
socket: Socket,
|
|
room: Socket,
|
|
nickname: str,
|
|
width=0,
|
|
height=0,
|
|
bombs=0,
|
|
):
|
|
async def emit_all_in_room(event: str, *args):
|
|
await socket.emit(event, *args)
|
|
await room.emit(event, *args)
|
|
|
|
async def update_field():
|
|
str_field = []
|
|
flags = 0
|
|
|
|
for x, row in enumerate(data.field):
|
|
str_row = []
|
|
for y, cell in enumerate(row):
|
|
if (
|
|
data.game_over
|
|
and (not data.win or cell != CellState.FLAGGED)
|
|
and data.is_bomb(x, y)
|
|
):
|
|
str_row.append(STR_BOMB)
|
|
else:
|
|
match cell:
|
|
case CellState.CLOSED:
|
|
str_row.append(STR_CLOSED)
|
|
case CellState.FLAGGED:
|
|
str_row.append(STR_FLAGGED)
|
|
flags += 1
|
|
case CellState.MARKED:
|
|
str_row.append(STR_MARKED)
|
|
case _:
|
|
str_row.append(STR_NUMBERS[data.get_number(x, y)])
|
|
|
|
str_field.append(str_row)
|
|
|
|
if data.game_over:
|
|
text = "Победа!" if data.win else "Поражение!"
|
|
else:
|
|
text = ""
|
|
await emit_all_in_room("update-field", str_field, flags, data.bombs, text)
|
|
|
|
is_host = width > 0 and height > 0 and bombs > 0
|
|
room_id = room.sid
|
|
data = room.room_data(lambda: GameData(width, height, bombs).generate())
|
|
|
|
@socket.on("chat")
|
|
async def chat(msg: str):
|
|
await room.emit("chat", nickname, msg)
|
|
|
|
@socket.on("disconnect")
|
|
async def disconnect(_reason):
|
|
if is_host:
|
|
await room.emit("reload-page")
|
|
else:
|
|
await room.emit("other-disconnect", socket.sid, nickname)
|
|
|
|
@socket.on("about-me")
|
|
async def about_me():
|
|
await room.emit("about-me", socket.sid, nickname + (" 👑" if is_host else ""))
|
|
|
|
@socket.on("cursor")
|
|
async def cursor(x: float, y: float):
|
|
await room.emit("cursor", socket.sid, nickname, x, y)
|
|
|
|
@socket.on("restart")
|
|
async def restart():
|
|
data.generate()
|
|
await update_field()
|
|
|
|
def openCells(x: int, y: int):
|
|
if (
|
|
x < 0
|
|
or y < 0
|
|
or x >= data.width
|
|
or y >= data.height
|
|
or data.field[x][y] == CellState.OPEN
|
|
):
|
|
return
|
|
|
|
data.field[x][y] = CellState.OPEN
|
|
|
|
if data.get_number(x, y) == 0:
|
|
for i in range(-1, 2):
|
|
for j in range(-1, 2):
|
|
openCells(x + i, y + j)
|
|
|
|
@socket.on("click")
|
|
async def click(left: bool, x: int, y: int):
|
|
if data.game_over or x < 0 or y < 0 or x >= data.width or y >= data.height:
|
|
return
|
|
|
|
cell = data.field[x][y]
|
|
|
|
if cell == CellState.OPEN:
|
|
return
|
|
|
|
if left:
|
|
if cell != CellState.CLOSED:
|
|
return
|
|
if data.is_bomb(x, y):
|
|
data.game_over = True
|
|
data.field[x][y] = CellState.OPEN
|
|
else:
|
|
if len(data.bombs_in_field) == 0:
|
|
data.generate_bombs(x, y)
|
|
openCells(x, y)
|
|
data.win = data.is_win()
|
|
if data.win:
|
|
data.game_over = True
|
|
|
|
else:
|
|
cell += 1
|
|
if cell == CellState.NOT_A_FIELD:
|
|
cell = CellState.CLOSED
|
|
data.field[x][y] = cell
|
|
if cell == CellState.FLAGGED:
|
|
data.win = data.is_win()
|
|
if data.win:
|
|
data.game_over = True
|
|
|
|
await update_field()
|
|
|
|
await socket.emit("room-join", room_id)
|
|
await room.emit("other-join", socket.sid, nickname)
|
|
await update_field()
|