This commit is contained in:
Fasterino
2025-10-20 21:08:52 +03:00
commit 4aaa436079
20 changed files with 6824 additions and 0 deletions

19
back/__init__.py Normal file
View File

@@ -0,0 +1,19 @@
from fastapi import APIRouter
from socketio import AsyncServer
from .socket import Socket
from .game import game_router
from .create_room import create_room_router
def get_backend() -> tuple[APIRouter, AsyncServer]:
router = APIRouter(prefix="/api")
sio = AsyncServer(async_mode="asgi")
server = Socket(sio)
@sio.on("connect") # type: ignore
async def sio_connect(sid, _):
socket = server.to(sid)
await create_room_router(server, socket)
return router, sio

108
back/create_room.py Normal file
View File

@@ -0,0 +1,108 @@
from random import randint
from .socket import Socket
from .game import game_router
async def create_room_router(server: Socket, socket: Socket):
async def check_nickname(nickname: str) -> str | None:
nickname = nickname.strip()
if len(nickname) < 4:
await socket.emit("room-error", "Никнейм должен быть больше 4 букв!")
return
return nickname
async def check_width(width: str) -> int | None:
width = width.strip()
if not width.isnumeric():
await socket.emit("room-error", "Ширина должна быть числом!")
return
width_i = int(width)
if width_i < 5 or width_i > 25:
await socket.emit("room-error", "Ширина может быть числом от 5 до 25!")
return
return width_i
async def check_height(height: str) -> int | None:
height = height.strip()
if not height.isnumeric():
await socket.emit("room-error", "Высота должна быть числом!")
return
height_i = int(height)
if height_i < 5 or height_i > 15:
await socket.emit("room-error", "Высота может быть числом от 5 до 15!")
return
return height_i
async def check_bombs(bombs: str, width: int, height: int) -> int | None:
bombs = bombs.strip()
if not bombs.isnumeric():
await socket.emit("room-error", "Кол-во бомб должно быть числом!")
return
bombs_i = int(bombs)
bombs_max = (width * height * 2) // 5
if bombs_i < 5 or bombs_i > bombs_max:
await socket.emit(
"room-error", f"Кол-во бомб может быть от 5 до {bombs_max}!"
)
return
return bombs_i
async def check_room(room_id: str) -> str | None:
room_id = room_id.strip()
if not room_id.isnumeric() or len(room_id) != 6:
await socket.emit("room-error", "Код должен быть из 6 цифр!")
return
if not server.room_exists(room_id):
await socket.emit("room-error", "Такой комнаты не существует!")
return
return room_id
@socket.on("create-room")
async def create_room(_nickname: str, _width: str, _height: str, _bombs: str):
nickname = await check_nickname(_nickname)
if nickname is None:
return
width = await check_width(_width)
if width is None:
return
height = await check_height(_height)
if height is None:
return
bombs = await check_bombs(_bombs, width, height)
if bombs is None:
return
room_id = str(randint(100000, 999999))
room = await socket.join(room_id)
await game_router(server, socket, room, nickname, width, height, bombs)
@socket.on("join-room")
async def join_room(_nickname: str, _room_id: str):
nickname = await check_nickname(_nickname)
if nickname is None:
return
room_id = await check_room(_room_id)
if room_id is None:
return
room = await socket.join(room_id)
await game_router(server, socket, room, nickname)

231
back/game.py Normal file
View File

@@ -0,0 +1,231 @@
from dataclasses import dataclass, field
from types import SimpleNamespace
from random import randint
from .socket import Socket
class CellState:
OPEN = 0
CLOSED = 1
FLAGGED = 2
MARKED = 3
NOT_A_FIELD = 4
STR_CLOSED = ""
STR_BOMB = "💣"
STR_FLAGGED = "🚩"
STR_MARKED = ""
STR_NUMBERS = [" ", *(str(i) for i in range(1, 9))]
@dataclass
class GameData:
width: int
height: int
bombs: int
game_over: bool = False
win: bool = True
bombs_in_field: list[tuple[int, int]] = field(default_factory=list)
field: list[list[int]] = field(default_factory=list)
def generate(self) -> "GameData":
self.game_over = False
self.win
self.field.clear()
self.bombs_in_field.clear()
for _ in range(self.width):
self.field.append([CellState.CLOSED for _ in range(self.height)])
return self
def generate_bombs(self, ex: int, ey: int):
while len(self.bombs_in_field) < self.bombs:
x = randint(0, self.width - 1)
y = randint(0, self.height - 1)
if x == ex and y == ey:
continue
skip = False
for x1, y1 in self.bombs_in_field:
if x1 == x and y1 == y:
skip = True
break
if skip:
continue
self.bombs_in_field.append((x, y))
def is_bomb(self, x: int, y: int) -> bool:
for x1, y1 in self.bombs_in_field:
if x1 == x and y1 == y:
return True
return False
def get_number(self, x: int, y: int) -> int:
bombs = 0
for i in range(-1, 2):
for j in range(-1, 2):
if i == 0 and j == 0:
continue
if self.is_bomb(x + i, y + j):
bombs += 1
return bombs
def is_win(self) -> bool:
if len(self.bombs_in_field) == 0:
return False
must_be_open = self.width * self.height - self.bombs
flaged_cells = 0
open_cells = 0
for x in range(self.width):
for y in range(self.height):
if self.field[x][y] == CellState.FLAGGED and self.is_bomb(x, y):
flaged_cells += 1
if self.field[x][y] == CellState.OPEN:
open_cells += 1
if open_cells == must_be_open or self.bombs == flaged_cells:
return True
return False
async def game_router(
server: Socket,
socket: Socket,
room: Socket,
nickname: str,
width=0,
height=0,
bombs=0,
):
async def emit_all_in_room(event: str, *args):
await socket.emit(event, *args)
await room.emit(event, *args)
async def update_field():
str_field = []
flags = 0
for x, row in enumerate(data.field):
str_row = []
for y, cell in enumerate(row):
if (
data.game_over
and (not data.win or cell != CellState.FLAGGED)
and data.is_bomb(x, y)
):
str_row.append(STR_BOMB)
else:
match cell:
case CellState.CLOSED:
str_row.append(STR_CLOSED)
case CellState.FLAGGED:
str_row.append(STR_FLAGGED)
flags += 1
case CellState.MARKED:
str_row.append(STR_MARKED)
case _:
str_row.append(STR_NUMBERS[data.get_number(x, y)])
str_field.append(str_row)
if data.game_over:
text = "Победа!" if data.win else "Поражение!"
else:
text = ""
await emit_all_in_room("update-field", str_field, flags, data.bombs, text)
is_host = width > 0 and height > 0 and bombs > 0
room_id = room.sid
data = room.room_data(lambda: GameData(width, height, bombs).generate())
@socket.on("chat")
async def chat(msg: str):
await room.emit("chat", nickname, msg)
@socket.on("disconnect")
async def disconnect(_reason):
if is_host:
await room.emit("reload-page")
else:
await room.emit("other-disconnect", socket.sid, nickname)
@socket.on("about-me")
async def about_me():
await room.emit("about-me", socket.sid, nickname + (" 👑" if is_host else ""))
@socket.on("cursor")
async def cursor(x: float, y: float):
await room.emit("cursor", socket.sid, nickname, x, y)
@socket.on("restart")
async def restart():
data.generate()
await update_field()
def openCells(x: int, y: int):
if (
x < 0
or y < 0
or x >= data.width
or y >= data.height
or data.field[x][y] == CellState.OPEN
):
return
data.field[x][y] = CellState.OPEN
if data.get_number(x, y) == 0:
for i in range(-1, 2):
for j in range(-1, 2):
openCells(x + i, y + j)
@socket.on("click")
async def click(left: bool, x: int, y: int):
if data.game_over or x < 0 or y < 0 or x >= data.width or y >= data.height:
return
cell = data.field[x][y]
if cell == CellState.OPEN:
return
if left:
if cell != CellState.CLOSED:
return
if data.is_bomb(x, y):
data.game_over = True
data.field[x][y] = CellState.OPEN
else:
if len(data.bombs_in_field) == 0:
data.generate_bombs(x, y)
openCells(x, y)
data.win = data.is_win()
if data.win:
data.game_over = True
else:
cell += 1
if cell == CellState.NOT_A_FIELD:
cell = CellState.CLOSED
data.field[x][y] = cell
if cell == CellState.FLAGGED:
data.win = data.is_win()
if data.win:
data.game_over = True
await update_field()
await socket.emit("room-join", room_id)
await room.emit("other-join", socket.sid, nickname)
await update_field()

123
back/socket.py Normal file
View File

@@ -0,0 +1,123 @@
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar
from socketio import AsyncServer
T = TypeVar("T")
@dataclass
class SocketMemory:
callbacks: dict[str, dict[str, Callable]] = field(default_factory=dict)
room_data: dict[str, tuple[str, Any]] = field(default_factory=dict)
class Socket:
sio: AsyncServer
from_sid: str | None
sid: str | None
mem: SocketMemory
def __init__(self, sio: AsyncServer, mem: SocketMemory | None = None) -> None:
self.sio = sio
self.from_sid = None
self.sid = None
if mem is None:
self.mem = SocketMemory()
self.on("disconnect")(lambda: None)
else:
self.mem = mem
def __str__(self) -> str:
if self.sid is None:
return "Server Socket"
elif self.from_sid is None:
return "Client Socket {id: " + self.sid + "}"
else:
return "Room Socket {id: " + self.sid + ", room_id: " + self.from_sid + "}"
def clear(self, sid: str) -> None:
for event in self.mem.callbacks:
if sid in self.mem.callbacks[event]:
self.mem.callbacks[event].pop(sid)
for room in tuple(self.mem.room_data.keys()):
owner = self.mem.room_data[room][0]
if owner == sid:
self.mem.room_data.pop(room)
def to(self, sid: str) -> "Socket":
socket = Socket(self.sio, self.mem)
socket.from_sid = self.sid
socket.sid = sid
return socket
def on(self, event: str):
def wrapper(f: Callable):
if event not in self.mem.callbacks:
self.mem.callbacks[event] = {}
@self.sio.on(event) # type: ignore
async def handler(sid: str, *args):
if sid in self.mem.callbacks[event]:
await self.mem.callbacks[event][sid](*args)
if event == "disconnect":
self.clear(sid)
if self.sid is not None:
async def wrapper_inner(*args):
try:
await f(*args)
except Exception as e:
print("Error:", e)
self.mem.callbacks[event][self.sid] = wrapper_inner
return f
return wrapper
async def emit(self, event: str, *args):
if self.from_sid is None:
await self.sio.emit(event, args, to=self.sid)
else:
await self.sio.emit(event, args, room=self.sid, skip_sid=self.from_sid)
async def join(self, room: str) -> "Socket":
await self.sio.enter_room(self.sid, room)
return self.to(room)
async def leave(self, room: str):
await self.sio.leave_room(self.sid, room)
def room_exists(self, room: str):
rooms = self.sio.manager.rooms.get("/", {})
return room in rooms and len(rooms[room]) > 0
def room_data(self, f: Callable[[], T]) -> T:
if self.from_sid is None or self.sid is None:
raise Exception(f"{self} cannot use room data")
if self.sid in self.mem.room_data:
data = self.mem.room_data[self.sid][1]
return data
else:
data = f()
self.mem.room_data[self.sid] = self.from_sid, data
return data
async def connect(sio: AsyncServer, sid: str):
server = Socket(sio)
socket = server.to(sid)
print(f"Client #{sid} connected")
await socket.emit("msg", "Test", 123, [1, True, None])
@socket.on("msg")
async def test(s: str, n: int, l: list):
print(s, n * 10, l[0])