前言/概况

这是一个针对 Minecraft 服务器的轻量级守护代理/看门狗脚本(Python 3),用于:当没有玩家时让后端服务器处于关闭/休眠状态,玩家连接时自动启动后端并给出友好提示;后端启动后自动将玩家代理到真实服务器。脚本设计轻量、可配置,适合在 Linux + tmux终端 环境下配合自动关机脚本(例如skript插件)使用,从而在空载时最大化节省服务器资源。
notice:脚本在文章最后给出


设计初衷

Minecraft 服务器即便在空载(无人在线)时仍会占用较多内存与 CPU(JVM、G1 GC、插件等),对成本敏感或运行在资源受限设备(树莓派/香橙派/小型 VPS)的情况下尤为明显。为此,本脚本采用「延迟启动 + 透明代理」的思路:当有玩家尝试连接时,先判断后端是否可用;若不可用则发起后端重启流程并向玩家返回“服务器正在启动,请等待”类型的聊天/状态信息;当后端可用时,立即将玩家连接代理到后端,保证体验尽可能透明。


高层流程(直观说明)

  1. 脚本监听公网(或内部)LISTEN_PORT(通常为 25565)。

  2. 玩家连接 -> 解析握手包以判断客户端是 Status(查询服务器列表)还是 Login(实际进入服务器)。

  3. 检测后端 BACKEND_HOST:BACKEND_PORT 是否可连:

    • 可连:建立到后端的透明代理(并将已消费的握手原样写回后端),双向转发数据。\

    • 不可连:如果未达最大重试次数,则触发 tmux 重启后端的异步重试序列,并向玩家返回“服务器启动中,请等待”的聊天或状态包;若达上限则直接返回后端错误信息。

  4. 后端启动并可连通后,新的连接将直接被代理到后端。


关键实现点(基于脚本)

1. 透明代理 + 保留握手

脚本在 parse_handshake_and_get_nextstate 中解析握手,获得 nextState(1 = Status, 2 = Login)并将原始握手包完整保存。如果后端可连,脚本会先把保存的握手原样转发给后端,保证代理对后端透明(玩家/客户端不会感知中间层)。

2. Minecraft 协议处理(VarInt / 包打包)

脚本实现了 VarInt 编解码、字符串打包等最小需求的封装,并能构建两类回复包:

  • Login: Disconnect(玩家在登录阶段收到的聊天断开包),用于 Login 阶段发 "服务器启动中/后端错误"。

  • Status: Response(服务器列表信息),用于 Status 阶段给出启动提示或错误文本。

这样可以在玩家服务器列表和实际连接时都给出合适的友好信息。

3. 后端可用性检测与 tmux 控制

check_backend_available() 使用短超时的 TCP 连接尝试判断后端是否在线;若不可连,tmux_restart_server() 会通过 tmux kill-session + tmux new-session -d 在指定 SERVER_DIR 下用 START_COMMAND 启动后端(命令可高度自定义)。该方式的好处是:通过 tmux 的新会话能把服务器进程放在用户可进入的会话里,便于人工检查与日志查看。

4. 异步重启/重试序列(防止并发重启)

start_restart_sequence_async() 使用线程和 _state_lock 保证同一时间只会有一个重启序列在运行,并用 _retry_count 跟踪当前已尝试的重启次数。重试期间会周期性检测后端状态,直到可连或达到 MAX_RETRIES。这种设计避免重复重启浪费资源,也能在短时间多次重启以尝试恢复服务。

5. 双向转发与线程模型

对每个连接,脚本会在后端可连时创建两个转发线程(client->backend, backend->client),以及在不可连时直接返回合适的消息并关闭连接。整体线程模型非常轻量:每个连接只占用两个短寿命线程,主线程只负责 accept 并分发处理。


配置项说明(可调整项与推荐值)

  • BIND_HOST / LISTEN_PORT:监听地址与端口,通常 0.0.0.0:25565

  • BACKEND_HOST / BACKEND_PORT:后端真实 MC 服务地址(本机可为 127.0.0.1 / 不同端口)。这里的BACKEND_PORT 设置的端口记得要和里的一样server.properties里的一样

  • TMUX_SESSION / SERVER_DIR / START_COMMAND:tmux 会话名、服务器目录、启动命令(必须在 SERVER_DIR 下可执行)。强烈建议START_COMMAND 调整为你系统上的绝对 java 路径和实际 jar/参数,并保留足够的 Xms/Xmx 设置匹配你的机器。脚本默认给出了一串 Aikar 风格的 JVM flags,适用于 1.20.x 的高性能调优,但可按需删减。

  • STARTUP_MESSAGE / BACKEND_ERROR_MESSAGE:玩家看到的提示文本,可改为中文或更详尽的信息。

  • MAX_RETRIES / RETRY_INTERVAL:重启上限与重试间隔(秒)。默认 MAX_RETRIES = 5RETRY_INTERVAL = 300(5 分钟)是个较保守的设置;想更积极恢复可缩短间隔或增加重试次数。

  • BACKEND_CHECK_TIMEOUT / CLIENT_READ_TIMEOUT:socket 超时,用于快速判断后端连通性和避免僵尸连接。可根据网络延迟适当放宽。


优点与适用场景

  • 高配置灵活性:几乎所有重要参数都集中在脚本开头(端口、tmux、服务器目录、启动命令、重试策略、超时等),方便按需调整并适配不同机器(低端树莓派到高端 VPS)。

  • 极低的常驻开销:代理脚本本身只是一个小型 Python 网络服务(多线程 I/O),内存与 CPU 占用很低;在无人访问时,后端可以保持关闭/关机,从而把真正的资源占用降到最小。

  • 用户友好:在玩家点击服务器列表或尝试连接时,会得到明确的中文/自定义提示("服务器启动中,请等待"),避免误以为服务器挂了。

  • 透明代理:握手原样转发与双向转发保证客户端能无感连接到后端(一旦后端就绪)。

  • 兼容自动关机:与自动关机脚本组合时(例如:当没有玩家且后端长时间无效则执行 shutdown 或挂起),可以显著节省云/VPS/树莓派 的资源与费用。

  • 容易调试:使用 tmux 启动服务便于管理员随时 tmux attach -t <session> 查看控制台输出与日志。

  • 稳健的重启策略:带有重试计数与互斥的重启序列,避免重启风暴和重复启动。管理员可根据需要调整 MAX_RETRIESRETRY_INTERVAL


使用注意(必须/建议事项)

  1. 运行环境:Linux(已知兼容 Debian/Ubuntu/CentOS 等),需要安装 python3tmux。脚本通过 subprocess.run 调用 tmux,请确保运行该脚本的用户有权限执行 tmux、并能访问 SERVER_DIR

  2. 系统服务化建议:把脚本作为 systemd 服务或放在后端更长寿的管理进程下运行(或用 nohup / screen / tmux),以保证脚本意外退出能被重启(systemd 示例如下,示例未包含在脚本内,可按需创建)。

  3. START_COMMAND 与权限START_COMMAND 中若使用相对路径或自定义 JVM,请确认路径正确且可执行;建议使用绝对路径指向 java 可执行文件以避免环境变量问题。

  4. 端口与防火墙:确保 LISTEN_PORT(通常 25565)对外可达,同时 BACKEND_PORT(例如 25566)在本机/内部网络中正确映射并未被防火墙阻塞。

  5. 日志与监控:脚本会向 stdout 打印简单日志,建议用 systemd 的 journal 或重定向到文件做长期保存与分析。

  6. 与自动关机配合:可以通过skript插件实现检测无人时延时关机,示例脚本如下:

    options:
        idle-time: 10 minutes  # 无人在线等待时间
    on load:
        set {lastOnline} to now
    every 1 minutes:
        if number of players is 0:
            set {_idle} to difference between now and {lastOnline}
            if {_idle} >= {@idle-time}:
                broadcast "&c无人在线超过 10 分钟,服务器即将关闭..."
                execute console command "stop"
        else:
            set {lastOnline} to now

部署示例(简要)

  1. 将脚本保存为 mc_proxy_autorestart.py,放在服务器某目录并 chmod +x

  2. 在脚本开头修改配置项(特别是 SERVER_DIRSTART_COMMANDTMUX_SESSIONBACKEND_PORT)。

  3. 安装 tmuxpython3(一般服务器自带,可通过以下命令查询,若没安装,自行bing搜索方法)

    tmux -V
    python3 --version
  4. 后台运行脚本:

    nohup python3 mc_proxy_autorestart.py &
  5. 或建议使用 systemd(示例,仅供参考,创建 /etc/systemd/system/mc-watchdog.service):

    [Unit]
    Description=MC Proxy Autorestart Watchdog
    After=network.target
    
    [Service]
    Type=simple
    User=youruser
    WorkingDirectory=/path/to/script
    ExecStart=/usr/bin/python3 mc_proxy_autorestart.py
    Restart=on-failure
    
    [Install]
    WantedBy=multi-user.target

常见问题(FAQ)

  • Q:为什么玩家连上去后马上被断开?

    • A:脚本在后端不可用时会直接返回 STARTUP_MESSAGE(Login 阶段发送断开包),客户端会显示消息并断开。等后端就绪后再次连接即可。

  • Q:如何让脚本在重启后自动把玩家直接转发到后端而不是再次要求连接?

    • A:因为 Minecraft 协议和客户端行为限制,短时间内重新连接是更稳妥的做法。脚本的做法是先向玩家发送启动提示并断开,玩家或客户端会在服务器就绪后再次尝试连接(如有插件/客户端重连器可自动处理)。

  • Q:我不想使用 tmux,能改为 systemd 吗?

    • A:可以。tmux_restart_server() 的逻辑负责把命令在 SERVER_DIR 下以 tmux 会话方式启动。你可以把它替换为 subprocess.run(["systemctl", "start", "your-mc.service"]) 或直接用其他进程管理工具。tmux 的好处是便于手动 attach 观察控制台输出。

最终脚本

#!/usr/bin/env python3
import socket
import threading
import subprocess
import time
import json
from datetime import datetime
from typing import Tuple

# -------------------- 可配置区域 --------------------
BIND_HOST = "0.0.0.0"
LISTEN_PORT = 25565               # 监听端口(玩家连入)
BACKEND_HOST = "127.0.0.1"        # 后端地址
BACKEND_PORT = 25566              # 后端端口(被代理的mc服务器端口)

TMUX_SESSION = "weimin"           # tmux 会话名
SERVER_DIR = "/weimin"            # 服务器文件夹(tmux 新会话进入的目录)
START_COMMAND = "//jdk-21/bin/java -Xms2G -Xmx10G -XX:+UseG1GC -XX:+ParallelRefProcEnabled -XX:MaxGCPauseMillis=200 -XX:+UnlockExperimentalVMOptions -XX:+DisableExplicitGC -XX:+AlwaysPreTouch -XX:G1HeapWastePercent=5 -XX:G1MixedGCCountTarget=4 -XX:InitiatingHeapOccupancyPercent=15 -XX:G1MixedGCLiveThresholdPercent=90 -XX:G1RSetUpdatingPauseTimePercent=5 -XX:SurvivorRatio=32 -XX:+PerfDisableSharedMem -XX:MaxTenuringThreshold=1 -Dusing.aikars.flags=https://mcflags.emc.gs -Daikars.new.flags=true -XX:G1NewSizePercent=30 -XX:G1MaxNewSizePercent=40 -XX:G1HeapRegionSize=8M -XX:G1ReservePercent=20 -jar leaves-1.20.4.jar nogui"
# 发送给客户端的“正在启动”文本(将作为 JSON chat)
STARTUP_MESSAGE = "服务器启动中,请等待2min......"
# 超过最大重试次数后给客户端的错误提示
BACKEND_ERROR_MESSAGE = "服务器后端错误,请联系管理员!"
# Status (服务器列表) 要返回的 MOTD(原始,使用 & 作为颜色/样式标记)
STATUS_MOTD_RAW = "&b&l&m-----------&r&6&l [ XXX服务器 ] &b&l&m-----------\n&e&l生存纯净服务器  §eQ群:§2XXXXXXXXX"

MAX_RETRIES = 5                   # 最大重试次数(达到后不再重启)
RETRY_INTERVAL = 300              # 重试检测间隔(秒)

# 检测后端的 socket 超时(秒)
BACKEND_CHECK_TIMEOUT = 3.0

# 客户端读取初始数据(handshake)超时(秒)
CLIENT_READ_TIMEOUT = 5.0

# -------------------- 全局状态 --------------------
_state_lock = threading.Lock()
_retry_count = 0                  # 当前已重启次数
_restart_in_progress = False      # 是否有重启/重试的后台流程正在运行

# -------------------- 日志 --------------------
def log(msg: str):
    now = datetime.now().strftime("%Y/%m/%d %H:%M")
    print(f"[{now}] {msg}", flush=True)

# -------------------- VarInt 与 Minecraft 包辅助 --------------------
def encode_varint(value: int) -> bytes:
    out = bytearray()
    v = value & 0xFFFFFFFF
    while True:
        temp = v & 0x7F
        v >>= 7
        if v != 0:
            temp |= 0x80
        out.append(temp)
        if v == 0:
            break
    return bytes(out)

def decode_varint_from_bytes(data: bytes, offset: int = 0) -> Tuple[int, int]:
    num_read = 0
    result = 0
    while True:
        if offset + num_read >= len(data):
            raise ValueError("Not enough bytes for varint")
        read = data[offset + num_read]
        value = read & 0x7F
        result |= (value << (7 * num_read))
        num_read += 1
        if (read & 0x80) == 0:
            break
        if num_read > 5:
            raise ValueError("VarInt too big")
    return result, offset + num_read

def pack_mc_string(s: str) -> bytes:
    b = s.encode('utf-8')
    return encode_varint(len(b)) + b

def build_login_disconnect_packet(message: str) -> bytes:
    chat_json = json.dumps({"text": message}, ensure_ascii=False)
    packet_id = encode_varint(0x00)         # Login: Disconnect packet id
    payload = pack_mc_string(chat_json)
    packet = packet_id + payload
    full = encode_varint(len(packet)) + packet
    return full

def build_status_response_packet(message: str) -> bytes:
    resp = {"description": {"text": message}}
    s = json.dumps(resp, ensure_ascii=False)
    packet_id = encode_varint(0x00)  # Status: Response packet id
    payload = pack_mc_string(s)
    packet = packet_id + payload
    full = encode_varint(len(packet)) + packet
    return full
def convert_amp_to_section(s: str) -> str:
    # 将 &x 风格的颜色/样式代码转换成 Minecraft 的 section sign(§x)
    return s.replace('&', '\u00A7')
# -------------------- 读取辅助 --------------------
def read_exact(sock: socket.socket, n: int) -> bytes:
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise ConnectionError("EOF while reading exact bytes")
        buf.extend(chunk)
    return bytes(buf)

def read_varint_socket(sock: socket.socket, timeout: float = CLIENT_READ_TIMEOUT) -> int:
    sock.settimeout(timeout)
    num_read = 0
    result = 0
    while True:
        b = sock.recv(1)
        if not b:
            raise ConnectionError("EOF while reading varint")
        byte = b[0]
        value = byte & 0x7F
        result |= (value << (7 * num_read))
        num_read += 1
        if (byte & 0x80) == 0:
            break
        if num_read > 5:
            raise ValueError("VarInt too big")
    return result

# -------------------- 后端检测与 tmux 控制 --------------------
def check_backend_available() -> bool:
    try:
        with socket.create_connection((BACKEND_HOST, BACKEND_PORT), timeout=BACKEND_CHECK_TIMEOUT) as s:
            return True
    except ConnectionRefusedError:
        return False
    except Exception:
        return False

def tmux_restart_server():
    log(f"尝试重启 tmux 会话 '{TMUX_SESSION}'(kill -> new)")
    try:
        subprocess.run(["tmux", "kill-session", "-t", TMUX_SESSION], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except Exception as e:
        log(f"tmux kill-session 异常: {e}")
    full_cmd = f"cd {SERVER_DIR} && {START_COMMAND}"
    try:
        subprocess.run(["tmux", "new-session", "-d", "-s", TMUX_SESSION, "bash", "-lc", full_cmd], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        log("已执行 tmux 新建会话并启动命令。")
    except Exception as e:
        log(f"tmux new-session 异常: {e}")

# -------------------- 重启/重试控制流程 --------------------
def start_restart_sequence_async():
    global _restart_in_progress, _retry_count

    with _state_lock:
        if _restart_in_progress:
            log("重启序列已在进行中,跳过新序列启动。")
            return
        # 立即标记并把这次重启计入(保证第一次调用就算第1次)
        _restart_in_progress = True
        _retry_count += 1
        initial_try = _retry_count

    log(f"开始重启序列(立即执行第 {initial_try} 次重启)")

    def worker():
        global _restart_in_progress, _retry_count
        try:
            tmux_restart_server()
            while True:
                time.sleep(RETRY_INTERVAL)
                if check_backend_available():
                    log("后端检测:可连通。重试序列结束,重试计数清零。")
                    with _state_lock:
                        _retry_count = 0
                    break
                with _state_lock:
                    if _retry_count >= MAX_RETRIES:
                        log(f"重试次数已达上限({_retry_count} >= {MAX_RETRIES}),停止再重启。")
                        break
                    _retry_count += 1
                    current_try = _retry_count
                log(f"后端仍不可连,执行第 {current_try} 次重启(将在本次循环中执行)。")
                tmux_restart_server()
        finally:
            with _state_lock:
                _restart_in_progress = False
            log("重启序列线程结束。")

    t = threading.Thread(target=worker, daemon=True)
    t.start()

# -------------------- 数据转发 --------------------
def forward(src: socket.socket, dst: socket.socket):
    try:
        while True:
            data = src.recv(4096)
            if not data:
                break
            dst.sendall(data)
    except Exception:
        pass
    finally:
        try:
            dst.shutdown(socket.SHUT_WR)
        except Exception:
            pass

# -------------------- 客户端 handshake 解析(返回 nextState 与完整包 bytes) --------------------
def parse_handshake_and_get_nextstate(client_sock: socket.socket) -> Tuple[int, bytes]:
    """
    读取并解析 handshake 包,返回 (next_state, full_packet_bytes)
    full_packet_bytes = encode_varint(packet_len) + raw_packet_bytes
    若解析失败,返回 (2, b"")(假定 Login)
    """
    try:
        # 读取包长度 VarInt(整数)
        packet_len = read_varint_socket(client_sock, timeout=CLIENT_READ_TIMEOUT)
        # 读取整个包内容
        raw = read_exact(client_sock, packet_len)
        # 读取 packet id VarInt
        pkt_id, offset = decode_varint_from_bytes(raw, 0)
        if pkt_id != 0x00:
            # 不是 handshake
            return 2, (encode_varint(packet_len) + raw)
        # decode protocol version
        _, offset = decode_varint_from_bytes(raw, offset)
        # decode server address string
        addr_len, offset = decode_varint_from_bytes(raw, offset)
        offset += addr_len
        # skip port (2 bytes)
        offset += 2
        # decode nextState
        next_state, _ = decode_varint_from_bytes(raw, offset)
        full = encode_varint(packet_len) + raw
        return next_state, full
    except Exception:
        return 2, b""

def send_packet_and_close(sock: socket.socket, packet: bytes):
    try:
        sock.sendall(packet)
        try:
            sock.shutdown(socket.SHUT_WR)
        except Exception:
            pass
        time.sleep(0.05)
    except Exception:
        pass
    finally:
        try:
            sock.close()
        except Exception:
            pass

# -------------------- 客户端连接处理 --------------------
def handle_client(client_sock: socket.socket, addr):
    global _retry_count  # 修复 UnboundLocalError:函数中有对这个全局变量的写入/赋值
    client_sock.settimeout(CLIENT_READ_TIMEOUT)
    log(f"玩家连入:{addr[0]}:{addr[1]}")

    # 解析 handshake 并拿到原始握手包(如果解析失败,handshake_bytes 为空)
    next_state, handshake_bytes = parse_handshake_and_get_nextstate(client_sock)
    state_name = "Status" if next_state == 1 else "Login"
    log(f"解析客户端 handshake,nextState={next_state} ({state_name})")
    # 如果是 Status 请求,只返回自定义 MOTD(不触发重启)
    if next_state == 1:
        # 将 & 转为 §,然后构造 Status Response 包并发送
        motd = convert_amp_to_section(STATUS_MOTD_RAW)
        pkt = build_status_response_packet(motd)
        send_packet_and_close(client_sock, pkt)
        log(f"已发送 MOTD(Status)给 {addr},未触发重启。")
        return

    # quick backend check
    if check_backend_available():
        # 后端可连,清零重试计数(确保成功后计数清零)
        with _state_lock:
            if _retry_count != 0:
                log("检测到后端可连,清零重试计数。")
            _retry_count = 0

        # 建立到后端的连接并代理;若有 handshake_bytes,则先回写给后端,保证透明代理
        try:
            backend = socket.create_connection((BACKEND_HOST, BACKEND_PORT), timeout=BACKEND_CHECK_TIMEOUT)
        except Exception as e:
            log(f"尝试连接后端失败(应少见):{e}")
            send_pkt = build_login_disconnect_packet(BACKEND_ERROR_MESSAGE) if next_state == 2 else build_status_response_packet(BACKEND_ERROR_MESSAGE)
            send_packet_and_close(client_sock, send_pkt)
            return

        # 如果我们之前消费了 handshake,把原始握手包先写回给后端
        if handshake_bytes:
            try:
                backend.sendall(handshake_bytes)
            except Exception as e:
                log(f"回写 handshake 到后端失败:{e}")

        # 开始双向代理
        t1 = threading.Thread(target=forward, args=(client_sock, backend), daemon=True)
        t2 = threading.Thread(target=forward, args=(backend, client_sock), daemon=True)
        t1.start(); t2.start()
        log(f"开始代理 client {addr} <-> backend {BACKEND_HOST}:{BACKEND_PORT}")
        t1.join(); t2.join()
        try:
            client_sock.close()
            backend.close()
        except Exception:
            pass
        log(f"代理连接结束:{addr}")
        return

    # 若后端不可连
    with _state_lock:
        retry_count_snapshot = _retry_count

    if retry_count_snapshot >= MAX_RETRIES:
        # 已达到上限,不再重启,直接给客户端错误提示并断开(并且不要触发重启)
        log("后端不可连且重试次数已达上限 -> 直接告知客户端后端错误(不再重启)。")
        pkt = build_login_disconnect_packet(BACKEND_ERROR_MESSAGE) if next_state == 2 else build_status_response_packet(BACKEND_ERROR_MESSAGE)
        send_packet_and_close(client_sock, pkt)
        return

    # 启动重启/重试序列(如果尚未进行)——此函数会立即把重试计数 +1
    start_restart_sequence_async()

    # 给客户端发送启动中提示(按 nextState 选择包类型)
    pkt = build_login_disconnect_packet(STARTUP_MESSAGE) if next_state == 2 else build_status_response_packet(STARTUP_MESSAGE)
    send_packet_and_close(client_sock, pkt)
    log(f"已发送客户端启动提示文本并断开:{addr}")

# -------------------- 主监听循环 --------------------
def serve_forever():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((BIND_HOST, LISTEN_PORT))
    sock.listen(200)
    log(f"监听启动 -> {BIND_HOST}:{LISTEN_PORT},后端目标 {BACKEND_HOST}:{BACKEND_PORT}")
    try:
        while True:
            try:
                client, addr = sock.accept()
            except KeyboardInterrupt:
                break
            except Exception as e:
                log(f"accept 异常:{e}")
                continue
            t = threading.Thread(target=handle_client, args=(client, addr), daemon=True)
            t.start()
    finally:
        sock.close()

if __name__ == "__main__":
    log("mc_proxy_autorestart 启动")
    serve_forever()