XM 500ドル入金ボーナスの詳細はこちら ▶︎

海外FX・仮想通貨・バイナリーオプションの無料ソースコード置き場

  • URLをコピーしました!

FX/仮想通貨トレードの収益性を高めたいなら、Pythonスクリプトがおすすめ。トレードを自動化できるだけではなく、スワップポイント一覧や取引履歴を表示できるようになります。

またPythonはMQL4/MQL5よりも表現力が高く、ソースコードもウェブ上にたくさん存在するため、LLMで優秀なソースコードを出力しやすいです。

この記事では海外FX・仮想通貨・バイナリーオプション向けの無料ソースコードを紹介していきます。

筆者の個人的な備忘録としてまとめているものです。収益を目的としたものではありません。ソースコードの開発者が利益を得られるよう、可能な限り引用元リンクも併記しておきます。

目次

海外FXの無料ソースコード

Stop Grid Trader(グリッド+ストラドル+ピラミッディングbot)

Stop Grid Traderは、グリッドトレード・ストラドル戦略・ピラミッディングトレードなどを行うPythonスクリプトです。(ソースコード引用元

ソースコードはMQL4/MQL5ではなくPythonで書かれており、運用にはPythonの運用環境が必要です。(VS codeなど)

メインのトレードロジックはピラミッディングトレードなので、ボラティリティが激しいほど利益率が高くなります。基本的にはアメリカの大型経済指標の直前で運用しましょう。

"""
Stop-Grid Trader  – version 3.1
------------------------------------------------------------
Mini-GUI bot for MetaTrader 5 that trades a symmetric
stop-grid around the current mid-price.

Key workflow (unchanged trading logic, polished GUI):
  1.  Select a running MT5 terminal.
  2.  Enter parameters, then press **Start**:
        • Symbol name (incl. suffix)
        • Price-digits to round to
        • Base lot (even, ≥ 0.02)
        • Orders per side
        • Grid multiplier  (= spread × factor)
        • Loop count (#restarts after a full close)
  3.  The bot places alternating BUY_STOP / SELL_STOP orders
      at ±multiplier·spread intervals, outermost stops include
      a TP one grid-step farther out.
  4.  Every 0.02-lot fill is partially closed at +1 grid step
      (0.01 lot).  The remainder:
          – SL is moved to break-even (both grid & BE-REV).
          – A 0.02-lot reverse STOP is placed at the BE price.
          – If the reverse position’s 0.01 lot remainder finds
            price already beyond the initial mid-price, it is
            closed instantly; otherwise TP = initial mid-price.
  5.  Once the current mid-price touches the outer TP level,
      all pending orders are removed, every position is closed,
      and (optionally) the grid restarts up to *loop count*.
"""

import tkinter as tk
from tkinter import ttk, messagebox
import threading, time, sys
import MetaTrader5 as mt5
try:
    import psutil
except ImportError:
    psutil = None

# ── default GUI values ──────────────────────────────────────────
DEF_SYMBOL        = "XAUUSD"
DEF_DIGITS        = 2
DEF_LOT           = 0.02
DEF_ORDERS_SIDE   = 10
DEF_MULTIPLIER    = 2.0
DEF_LOOP          = 0              # 0 ⇒ run once
# ── constants (rarely changed) ─────────────────────────────────
DEVIATION         = 100
MAGIC_NUMBER      = 0
GRID_TAG          = "basic grid"
CHECK_INTERVAL    = 1.0
# ───────────────────────────────────────────────────────────────


# ═════════════════════════ GUI HELPERS ═════════════════════════
def _discover_terminals() -> list[str]:
    paths = []
    if psutil:
        for p in psutil.process_iter(attrs=["name", "exe"]):
            if "terminal64.exe" in (p.info.get("name") or "").lower():
                exe = p.info.get("exe") or ""
                if exe and exe not in paths:
                    paths.append(exe)
    return paths


def choose_terminal() -> str | None:
    """Modal: pick an MT5 terminal."""
    root = tk.Tk(); root.withdraw()
    win = tk.Toplevel(root); win.title("Select MT5 terminal"); win.grab_set()

    cols = ("exe", "login", "server", "balance", "currency", "name")
    tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
    for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
        tree.heading(c, text=c); tree.column(c, width=w, anchor="w")

    for exe in _discover_terminals():
        mt5.initialize(path=exe)
        acc, _ = mt5.account_info(), mt5.terminal_info()
        if acc:
            tree.insert(
                "", tk.END,
                values=(
                    exe, acc.login, acc.server,
                    f"{acc.balance:.2f}", acc.currency, acc.name
                )
            )
        mt5.shutdown()

    tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)

    sel: dict[str, str | None] = {"path": None}

    def _use() -> None:
        if tree.selection():
            sel["path"] = tree.item(tree.selection()[0], "values")[0]
            win.destroy()

    ttk.Button(win, text="Use", command=_use)\
        .grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")

    if tree.get_children():
        tree.selection_set(tree.get_children()[0])

    win.wait_window(); root.destroy()
    return sel["path"]


class ParamDialog(tk.Toplevel):
    """Gather user parameters; returns None if canceled."""

    def __init__(self, parent: tk.Tk):
        super().__init__(parent)
        self.title("Grid parameters"); self.grab_set()
        self.res: tuple | None = None

        rows = (
            ("Symbol",          DEF_SYMBOL),
            ("Price digits",    str(DEF_DIGITS)),
            ("Base lot",        f"{DEF_LOT:.2f}"),
            ("Orders / side",   str(DEF_ORDERS_SIDE)),
            ("Grid multiplier", str(DEF_MULTIPLIER)),
            ("Loop count",      str(DEF_LOOP)),
        )
        self.vars: list[tk.StringVar] = []
        for r, (label, default) in enumerate(rows):
            ttk.Label(self, text=label).grid(row=r, column=0, sticky="w", padx=6, pady=4)
            var = tk.StringVar(value=default); self.vars.append(var)
            ttk.Entry(self, textvariable=var, width=15)\
                .grid(row=r, column=1, sticky="w", padx=6, pady=4)

        ttk.Button(self, text="Start", command=self._ok)\
            .grid(row=len(rows), column=1, pady=8, sticky="e")

    def _ok(self) -> None:
        try:
            sym   = self.vars[0].get().strip()
            digs  = int(self.vars[1].get())
            lot   = float(self.vars[2].get())
            nside = int(self.vars[3].get())
            mult  = float(self.vars[4].get())
            loops = int(self.vars[5].get())

            if digs < 0:               raise ValueError("digits ≥ 0")
            if lot < 0.02 or int(round(lot * 100)) % 2:
                raise ValueError("lot must be even ×0.01 and ≥0.02")
            if nside < 1:              raise ValueError("orders / side ≥ 1")
            if mult <= 0:              raise ValueError("multiplier > 0")
            if loops < 0:              raise ValueError("loop count ≥ 0")
        except Exception as err:
            messagebox.showerror("Invalid input", str(err), parent=self)
            return

        self.res = (sym, digs, lot, nside, mult, loops)
        self.destroy()


# ═════════════════════════ TRADER CLASS ════════════════════════
class StopGridTrader:

    def __init__(
        self,
        terminal_path: str,
        symbol: str,
        digits: int,
        base_lot: float,
        orders_side: int,
        multiplier: float,
        loop_count: int
    ):
        self.path   = terminal_path
        self.symbol = symbol
        self.digits = digits
        self.lot    = base_lot
        self.side   = orders_side
        self.mult   = multiplier
        self.loopN  = loop_count
        self.done   = 0  # loops completed

        # runtime
        self.mid: float | None = None
        self.step_pts: int | None = None
        self.tp_high = self.tp_low = None
        self.running = False

        # status window
        self.root = tk.Tk(); self.root.title("Stop-Grid Trader")
        self.status = tk.StringVar(value="Initializing…")
        ttk.Label(self.root, textvariable=self.status)\
            .grid(padx=12, pady=10)
        ttk.Button(self.root, text="Abort", command=self._abort)\
            .grid(pady=(0, 10))

    # ── MetaTrader 5 init ────────────────────────────────────
    def _mt5_init(self) -> None:
        if not mt5.initialize(path=self.path):
            c, m = mt5.last_error(); raise RuntimeError(f"MT5 init: {c} {m}")
        if not mt5.symbol_select(self.symbol, True):
            raise RuntimeError(f"Cannot select symbol {self.symbol}")

    # ── helpers ──────────────────────────────────────────────
    def _norm_vol(self, vol: float) -> float:
        info = mt5.symbol_info(self.symbol); step = info.volume_step or 0.01
        return round(max(info.volume_min, min(vol, info.volume_max)) / step) * step

    # place pending order
    def _pend(
        self, ord_type: int, price: float,
        sl: float, tp: float = 0.0,
        vol: float | None = None,
        tag: str = GRID_TAG
    ) -> None:
        if vol is None: vol = self.lot
        mt5.order_send({
            "action": mt5.TRADE_ACTION_PENDING,
            "symbol": self.symbol,
            "volume": self._norm_vol(vol),
            "type":   ord_type,
            "price":  price,
            "sl":     sl,
            "tp":     tp,
            "deviation": DEVIATION,
            "magic":  MAGIC_NUMBER,
            "comment": tag,
            "type_time": mt5.ORDER_TIME_GTC,
        })

    # ── grid creation ────────────────────────────────────────
    def _build_grid(self) -> None:
        tick = mt5.symbol_info_tick(self.symbol); info = mt5.symbol_info(self.symbol)
        self.mid      = round((tick.bid + tick.ask) / 2, self.digits)
        raw_spd_pts   = int(round((tick.ask - tick.bid) / info.point))
        self.step_pts = int(raw_spd_pts * self.mult)
        pt            = info.point

        self.tp_high = self.tp_low = None
        for i in range(1, self.side + 1):
            buy  = self.mid + i * self.step_pts * pt
            sell = self.mid - i * self.step_pts * pt
            if i == self.side:                       # outer layer
                self.tp_high = buy  + self.step_pts * pt
                self.tp_low  = sell - self.step_pts * pt
                tp_b, tp_s   = self.tp_high, self.tp_low
            else:
                tp_b = tp_s = 0.0
            self._pend(mt5.ORDER_TYPE_BUY_STOP , buy , self.mid, tp=tp_b)
            self._pend(mt5.ORDER_TYPE_SELL_STOP, sell, self.mid, tp=tp_s)

        self.status.set(f"Grid ready   (loop {self.done}/{self.loopN})")

    # place reverse stop at break-even
    def _place_be_rev(self, pos) -> None:
        info = mt5.symbol_info(self.symbol); pt = info.point
        be   = round(pos.price_open, self.digits)
        if pos.type == mt5.POSITION_TYPE_BUY:
            otype, sl = mt5.ORDER_TYPE_SELL_STOP, be + self.step_pts * pt
        else:
            otype, sl = mt5.ORDER_TYPE_BUY_STOP,  be - self.step_pts * pt
        self._pend(otype, be, round(sl, self.digits), vol=self.lot, tag="BE-REV")

    # after partial TP
    def _handle_partial(self, pos) -> None:
        tick = mt5.symbol_info_tick(self.symbol); bid, ask = tick.bid, tick.ask
        half = self.lot / 2
        be_price = round(pos.price_open, self.digits)

        if pos.comment.startswith("BE-REV"):
            beyond = (
                pos.type == mt5.POSITION_TYPE_BUY  and bid >= self.mid or
                pos.type == mt5.POSITION_TYPE_SELL and ask <= self.mid
            )
            if beyond:
                mt5.order_send({
                    "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                    "position": pos.ticket, "volume": half,
                    "type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
                           else mt5.ORDER_TYPE_BUY,
                    "price": bid if pos.type == mt5.POSITION_TYPE_BUY else ask,
                    "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                    "comment": "mid-instant TP"
                })
                return
            mt5.order_send({
                "action":  mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
                "position": pos.ticket,
                "sl": be_price,                           # BE SL
                "tp": round(self.mid, self.digits),
                "deviation": DEVIATION
            })
        else:
            mt5.order_send({
                "action":  mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
                "position": pos.ticket,
                "sl": be_price,
                "tp": 0.0,
                "deviation": DEVIATION
            })
            self._place_be_rev(pos)

    # ── monitoring loop ──────────────────────────────────────
    def _monitor(self) -> None:
        info = mt5.symbol_info(self.symbol); pt = info.point
        half = self.lot / 2

        while self.running:
            time.sleep(CHECK_INTERVAL)
            tick = mt5.symbol_info_tick(self.symbol)
            mid_now = (tick.bid + tick.ask) / 2

            # global exit condition
            if (self.tp_high and mid_now >= self.tp_high) or \
               (self.tp_low  and mid_now <= self.tp_low):
                self._full_close()
                continue

            # partial-TP check
            for pos in mt5.positions_get(symbol=self.symbol) or []:
                trg = (
                    pos.price_open + self.step_pts * pt
                    if pos.type == mt5.POSITION_TYPE_BUY
                    else pos.price_open - self.step_pts * pt
                )
                hit = (
                    pos.type == mt5.POSITION_TYPE_BUY  and tick.bid >= trg or
                    pos.type == mt5.POSITION_TYPE_SELL and tick.ask <= trg
                )
                if hit and abs(pos.volume - self.lot) < 1e-6:
                    mt5.order_send({
                        "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                        "position": pos.ticket, "volume": half,
                        "type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
                               else mt5.ORDER_TYPE_BUY,
                        "price": tick.bid if pos.type == mt5.POSITION_TYPE_BUY else tick.ask,
                        "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                        "comment": "partial TP"
                    })
                    time.sleep(0.3)
                    self._handle_partial(pos)

    # ── cancel orders & close positions ──────────────────────
    def _full_close(self) -> None:
        # cancel pending
        for o in mt5.orders_get(symbol=self.symbol) or []:
            if hasattr(mt5, "order_delete"):
                mt5.order_delete(o.ticket)
            else:
                mt5.order_send({
                    "action": mt5.TRADE_ACTION_REMOVE,
                    "order":  o.ticket,
                    "symbol": o.symbol
                })

        # close all positions
        tick = mt5.symbol_info_tick(self.symbol)
        for p in mt5.positions_get(symbol=self.symbol) or []:
            mt5.order_send({
                "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                "position": p.ticket, "volume": p.volume,
                "type": mt5.ORDER_TYPE_SELL if p.type == mt5.POSITION_TYPE_BUY
                       else mt5.ORDER_TYPE_BUY,
                "price": tick.bid if p.type == mt5.POSITION_TYPE_BUY else tick.ask,
                "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                "comment": "grid exit"
            })

        self.done += 1
        if self.done <= self.loopN:
            self.status.set(f"Loop {self.done} finished – restarting…")
            time.sleep(2)
            self._build_grid()
        else:
            self.status.set("All loops done – exit")
            self.running = False
            self.root.after(800, self.root.quit)

    # ── GUI: abort button ────────────────────────────────────
    def _abort(self) -> None:
        if messagebox.askyesno("Abort", "Stop trading and exit?", parent=self.root):
            self.running = False
            self._full_close()

    # ── run bot ──────────────────────────────────────────────
    def run(self) -> None:
        self._mt5_init()
        self._build_grid()
        self.running = True
        threading.Thread(target=self._monitor, daemon=True).start()
        self.root.mainloop()
        mt5.shutdown()


# ══════════════════════════ MAIN ══════════════════════════════
def main() -> None:
    term = choose_terminal()
    if not term:
        sys.exit("No MT5 terminal selected – exiting.")

    root = tk.Tk(); root.withdraw()
    pd = ParamDialog(root); pd.wait_window(); root.destroy()
    if pd.res is None:
        sys.exit("Parameters dialog canceled – exiting.")

    sym, digs, lot, nside, mult, loops = pd.res
    StopGridTrader(
        terminal_path = term,
        symbol        = sym,
        digits        = digs,
        base_lot      = lot,
        orders_side   = nside,
        multiplier    = mult,
        loop_count    = loops
    ).run()


if __name__ == "__main__":
    main()

Stop Grid Trader(予約注文が半減すると、再エントリー)

こちらも同じくStop Grid Traderのソースコードですが、予約注文が半減するとストップ注文を再設置するようになっています。(ソースコード引用元

"""
Stop-Grid Trader  – version 3.1
------------------------------------------------------------
Mini-GUI bot for MetaTrader 5 that trades a symmetric
stop-grid around the current mid-price.

Key workflow (unchanged trading logic, polished GUI):
  1.  Select a running MT5 terminal.
  2.  Enter parameters, then press **Start**:
        • Symbol name (incl. suffix)
        • Price-digits to round to
        • Base lot (even, ≥ 0.02)
        • Orders per side
        • Grid multiplier  (= spread × factor)
        • Loop count (#restarts after a full close)
  3.  The bot places alternating BUY_STOP / SELL_STOP orders
      at ±multiplier·spread intervals, outermost stops include
      a TP one grid-step farther out.
  4.  Every 0.02-lot fill is partially closed at +1 grid step
      (0.01 lot).  The remainder:
          – SL is moved to break-even (both grid & BE-REV).
          – A 0.02-lot reverse STOP is placed at the BE price.
          – If the reverse position’s 0.01 lot remainder finds
            price already beyond the initial mid-price, it is
            closed instantly; otherwise TP = initial mid-price.
  5.  Once the current mid-price touches the outer TP level,
      all pending orders are removed, every position is closed,
      and (optionally) the grid restarts up to *loop count*.
"""

import tkinter as tk
from tkinter import ttk, messagebox
import threading, time, sys
import MetaTrader5 as mt5
try:
    import psutil
except ImportError:
    psutil = None

# ── default GUI values ──────────────────────────────────────────
DEF_SYMBOL        = "XAUUSD"
DEF_DIGITS        = 2
DEF_LOT           = 0.02
DEF_ORDERS_SIDE   = 10
DEF_MULTIPLIER    = 2.0
DEF_LOOP          = 0              # 0 ⇒ run once
# ── constants (rarely changed) ─────────────────────────────────
DEVIATION         = 100
MAGIC_NUMBER      = 0
GRID_TAG          = "basic grid"
CHECK_INTERVAL    = 1.0
# ───────────────────────────────────────────────────────────────


# ═════════════════════════ GUI HELPERS ═════════════════════════
def _discover_terminals() -> list[str]:
    paths = []
    if psutil:
        for p in psutil.process_iter(attrs=["name", "exe"]):
            if "terminal64.exe" in (p.info.get("name") or "").lower():
                exe = p.info.get("exe") or ""
                if exe and exe not in paths:
                    paths.append(exe)
    return paths


def choose_terminal() -> str | None:
    """Modal: pick an MT5 terminal."""
    root = tk.Tk(); root.withdraw()
    win = tk.Toplevel(root); win.title("Select MT5 terminal"); win.grab_set()

    cols = ("exe", "login", "server", "balance", "currency", "name")
    tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
    for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
        tree.heading(c, text=c); tree.column(c, width=w, anchor="w")

    for exe in _discover_terminals():
        mt5.initialize(path=exe)
        acc, _ = mt5.account_info(), mt5.terminal_info()
        if acc:
            tree.insert(
                "", tk.END,
                values=(
                    exe, acc.login, acc.server,
                    f"{acc.balance:.2f}", acc.currency, acc.name
                )
            )
        mt5.shutdown()

    tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)

    sel: dict[str, str | None] = {"path": None}

    def _use() -> None:
        if tree.selection():
            sel["path"] = tree.item(tree.selection()[0], "values")[0]
            win.destroy()

    ttk.Button(win, text="Use", command=_use)\
        .grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")

    if tree.get_children():
        tree.selection_set(tree.get_children()[0])

    win.wait_window(); root.destroy()
    return sel["path"]


class ParamDialog(tk.Toplevel):
    """Gather user parameters; returns None if canceled."""

    def __init__(self, parent: tk.Tk):
        super().__init__(parent)
        self.title("Grid parameters"); self.grab_set()
        self.res: tuple | None = None

        rows = (
            ("Symbol",          DEF_SYMBOL),
            ("Price digits",    str(DEF_DIGITS)),
            ("Base lot",        f"{DEF_LOT:.2f}"),
            ("Orders / side",   str(DEF_ORDERS_SIDE)),
            ("Grid multiplier", str(DEF_MULTIPLIER)),
            ("Loop count",      str(DEF_LOOP)),
        )
        self.vars: list[tk.StringVar] = []
        for r, (label, default) in enumerate(rows):
            ttk.Label(self, text=label).grid(row=r, column=0, sticky="w", padx=6, pady=4)
            var = tk.StringVar(value=default); self.vars.append(var)
            ttk.Entry(self, textvariable=var, width=15)\
                .grid(row=r, column=1, sticky="w", padx=6, pady=4)

        ttk.Button(self, text="Start", command=self._ok)\
            .grid(row=len(rows), column=1, pady=8, sticky="e")

    def _ok(self) -> None:
        try:
            sym   = self.vars[0].get().strip()
            digs  = int(self.vars[1].get())
            lot   = float(self.vars[2].get())
            nside = int(self.vars[3].get())
            mult  = float(self.vars[4].get())
            loops = int(self.vars[5].get())

            if digs < 0:               raise ValueError("digits ≥ 0")
            if lot < 0.02 or int(round(lot * 100)) % 2:
                raise ValueError("lot must be even ×0.01 and ≥0.02")
            if nside < 1:              raise ValueError("orders / side ≥ 1")
            if mult <= 0:              raise ValueError("multiplier > 0")
            if loops < 0:              raise ValueError("loop count ≥ 0")
        except Exception as err:
            messagebox.showerror("Invalid input", str(err), parent=self)
            return

        self.res = (sym, digs, lot, nside, mult, loops)
        self.destroy()


# ═════════════════════════ TRADER CLASS ════════════════════════
class StopGridTrader:

    def __init__(
        self,
        terminal_path: str,
        symbol: str,
        digits: int,
        base_lot: float,
        orders_side: int,
        multiplier: float,
        loop_count: int
    ):
        self.path   = terminal_path
        self.symbol = symbol
        self.digits = digits
        self.lot    = base_lot
        self.side   = orders_side
        self.mult   = multiplier
        self.loopN  = loop_count
        self.done   = 0  # loops completed

        # runtime
        self.mid: float | None = None
        self.step_pts: int | None = None
        self.tp_high = self.tp_low = None
        self.running = False

        # status window
        self.root = tk.Tk(); self.root.title("Stop-Grid Trader")
        self.status = tk.StringVar(value="Initializing…")
        ttk.Label(self.root, textvariable=self.status)\
            .grid(padx=12, pady=10)
        ttk.Button(self.root, text="Abort", command=self._abort)\
            .grid(pady=(0, 10))

    # ── MetaTrader 5 init ────────────────────────────────────
    def _mt5_init(self) -> None:
        if not mt5.initialize(path=self.path):
            c, m = mt5.last_error(); raise RuntimeError(f"MT5 init: {c} {m}")
        if not mt5.symbol_select(self.symbol, True):
            raise RuntimeError(f"Cannot select symbol {self.symbol}")

    # ── helpers ──────────────────────────────────────────────
    def _norm_vol(self, vol: float) -> float:
        info = mt5.symbol_info(self.symbol); step = info.volume_step or 0.01
        return round(max(info.volume_min, min(vol, info.volume_max)) / step) * step

    # place pending order
    def _pend(
        self, ord_type: int, price: float,
        sl: float, tp: float = 0.0,
        vol: float | None = None,
        tag: str = GRID_TAG
    ) -> None:
        if vol is None: vol = self.lot
        mt5.order_send({
            "action": mt5.TRADE_ACTION_PENDING,
            "symbol": self.symbol,
            "volume": self._norm_vol(vol),
            "type":   ord_type,
            "price":  price,
            "sl":     sl,
            "tp":     tp,
            "deviation": DEVIATION,
            "magic":  MAGIC_NUMBER,
            "comment": tag,
            "type_time": mt5.ORDER_TIME_GTC,
        })

    # ── grid creation ────────────────────────────────────────
    def _build_grid(self) -> None:
        tick = mt5.symbol_info_tick(self.symbol); info = mt5.symbol_info(self.symbol)
        self.mid      = round((tick.bid + tick.ask) / 2, self.digits)
        raw_spd_pts   = int(round((tick.ask - tick.bid) / info.point))
        self.step_pts = int(raw_spd_pts * self.mult)
        pt            = info.point

        self.tp_high = self.tp_low = None
        for i in range(1, self.side + 1):
            buy  = self.mid + i * self.step_pts * pt
            sell = self.mid - i * self.step_pts * pt
            if i == self.side:                       # outer layer
                self.tp_high = buy  + self.step_pts * pt
                self.tp_low  = sell - self.step_pts * pt
                tp_b, tp_s   = self.tp_high, self.tp_low
            else:
                tp_b = tp_s = 0.0
            self._pend(mt5.ORDER_TYPE_BUY_STOP , buy , self.mid, tp=tp_b)
            self._pend(mt5.ORDER_TYPE_SELL_STOP, sell, self.mid, tp=tp_s)

        self.status.set(f"Grid ready   (loop {self.done}/{self.loopN})")

    # place reverse stop at break-even
    def _place_be_rev(self, pos) -> None:
        info = mt5.symbol_info(self.symbol); pt = info.point
        be   = round(pos.price_open, self.digits)
        if pos.type == mt5.POSITION_TYPE_BUY:
            otype, sl = mt5.ORDER_TYPE_SELL_STOP, be + self.step_pts * pt
        else:
            otype, sl = mt5.ORDER_TYPE_BUY_STOP,  be - self.step_pts * pt
        self._pend(otype, be, round(sl, self.digits), vol=self.lot, tag="BE-REV")

    # after partial TP
    def _handle_partial(self, pos) -> None:
        tick = mt5.symbol_info_tick(self.symbol); bid, ask = tick.bid, tick.ask
        half = self.lot / 2
        be_price = round(pos.price_open, self.digits)

        if pos.comment.startswith("BE-REV"):
            beyond = (
                pos.type == mt5.POSITION_TYPE_BUY  and bid >= self.mid or
                pos.type == mt5.POSITION_TYPE_SELL and ask <= self.mid
            )
            if beyond:
                mt5.order_send({
                    "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                    "position": pos.ticket, "volume": half,
                    "type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
                           else mt5.ORDER_TYPE_BUY,
                    "price": bid if pos.type == mt5.POSITION_TYPE_BUY else ask,
                    "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                    "comment": "mid-instant TP"
                })
                return
            mt5.order_send({
                "action":  mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
                "position": pos.ticket,
                "sl": be_price,                           # BE SL
                "tp": round(self.mid, self.digits),
                "deviation": DEVIATION
            })
        else:
            mt5.order_send({
                "action":  mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
                "position": pos.ticket,
                "sl": be_price,
                "tp": 0.0,
                "deviation": DEVIATION
            })
            self._place_be_rev(pos)

    # ── monitoring loop ──────────────────────────────────────
    def _monitor(self) -> None:
        info = mt5.symbol_info(self.symbol); pt = info.point
        half = self.lot / 2

        while self.running:
            time.sleep(CHECK_INTERVAL)
            tick = mt5.symbol_info_tick(self.symbol)
            mid_now = (tick.bid + tick.ask) / 2

            # ===== Measures against range-bound markets (CODE ADDED HERE) =====
            if not mt5.positions_total():
                pendings = [o for o in (mt5.orders_get(
                    symbol=self.symbol) or []) if o.comment.startswith(GRID_TAG)]
                executed = self.side * 2 - len(pendings)
                if executed >= self.side // 2 and pendings:
                    self.status.set("Half grid consumed - restarting...")
                    for o in pendings:
                        if hasattr(mt5, "order_delete"):
                            mt5.order_delete(o.ticket)
                        else:
                            mt5.order_send({
                                "action": mt5.TRADE_ACTION_REMOVE,
                                "order": o.ticket,
                                "symbol": o.symbol
                            })
                    time.sleep(1)
                    self._build_grid()
                    continue
            # ====================================================================

            # global exit condition
            if (self.tp_high and mid_now >= self.tp_high) or \
               (self.tp_low  and mid_now <= self.tp_low):
                self._full_close()
                continue

            # partial-TP check
            for pos in mt5.positions_get(symbol=self.symbol) or []:
                trg = (
                    pos.price_open + self.step_pts * pt
                    if pos.type == mt5.POSITION_TYPE_BUY
                    else pos.price_open - self.step_pts * pt
                )
                hit = (
                    pos.type == mt5.POSITION_TYPE_BUY  and tick.bid >= trg or
                    pos.type == mt5.POSITION_TYPE_SELL and tick.ask <= trg
                )
                if hit and abs(pos.volume - self.lot) < 1e-6:
                    mt5.order_send({
                        "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                        "position": pos.ticket, "volume": half,
                        "type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
                               else mt5.ORDER_TYPE_BUY,
                        "price": tick.bid if pos.type == mt5.POSITION_TYPE_BUY else tick.ask,
                        "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                        "comment": "partial TP"
                    })
                    time.sleep(0.3)
                    self._handle_partial(pos)

    # ── cancel orders & close positions ──────────────────────
    def _full_close(self) -> None:
        # cancel pending
        for o in mt5.orders_get(symbol=self.symbol) or []:
            if hasattr(mt5, "order_delete"):
                mt5.order_delete(o.ticket)
            else:
                mt5.order_send({
                    "action": mt5.TRADE_ACTION_REMOVE,
                    "order":  o.ticket,
                    "symbol": o.symbol
                })

        # close all positions
        tick = mt5.symbol_info_tick(self.symbol)
        for p in mt5.positions_get(symbol=self.symbol) or []:
            mt5.order_send({
                "action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
                "position": p.ticket, "volume": p.volume,
                "type": mt5.ORDER_TYPE_SELL if p.type == mt5.POSITION_TYPE_BUY
                       else mt5.ORDER_TYPE_BUY,
                "price": tick.bid if p.type == mt5.POSITION_TYPE_BUY else tick.ask,
                "deviation": DEVIATION, "magic": MAGIC_NUMBER,
                "comment": "grid exit"
            })

        self.done += 1
        if self.done <= self.loopN:
            self.status.set(f"Loop {self.done} finished – restarting…")
            time.sleep(2)
            self._build_grid()
        else:
            self.status.set("All loops done – exit")
            self.running = False
            self.root.after(800, self.root.quit)

    # ── GUI: abort button ────────────────────────────────────
    def _abort(self) -> None:
        if messagebox.askyesno("Abort", "Stop trading and exit?", parent=self.root):
            self.running = False
            self._full_close()

    # ── run bot ──────────────────────────────────────────────
    def run(self) -> None:
        self._mt5_init()
        self._build_grid()
        self.running = True
        threading.Thread(target=self._monitor, daemon=True).start()
        self.root.mainloop()
        mt5.shutdown()


# ══════════════════════════ MAIN ══════════════════════════════
def main() -> None:
    term = choose_terminal()
    if not term:
        sys.exit("No MT5 terminal selected – exiting.")

    root = tk.Tk(); root.withdraw()
    pd = ParamDialog(root); pd.wait_window(); root.destroy()
    if pd.res is None:
        sys.exit("Parameters dialog canceled – exiting.")

    sym, digs, lot, nside, mult, loops = pd.res
    StopGridTrader(
        terminal_path = term,
        symbol        = sym,
        digits        = digs,
        base_lot      = lot,
        orders_side   = nside,
        multiplier    = mult,
        loop_count    = loops
    ).run()


if __name__ == "__main__":
    main()

スワップ計測ツール(うみver)

以下のソースコードでは、MT5のスワップポイントを一覧表示できます。スワップアービトラージで通貨ペアを探すときに重宝するでしょう。

import MetaTrader5 as mt5
import time
from datetime import datetime
import sys
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import traceback

# 実行中のMT5プロセスを検出するためにpsutilライブラリを使用します
try:
    import psutil
except ImportError:
    psutil = None

# tksheetライブラリをインポート
# 事前に `pip install tksheet` を実行してください
try:
    import tksheet
except ImportError:
    messagebox.showerror(
        "ライブラリ不足",
        "tksheetライブラリが見つかりません。\nコマンドプロンプトで `pip install tksheet` を実行してインストールしてください。"
    )
    sys.exit(1)


# --- MT5ターミナル選択部分は変更なし ---
def _discover_terminals() -> list[str]:
    paths = []
    if psutil:
        for p in psutil.process_iter(attrs=["name", "exe"]):
            if "terminal64.exe" in (p.info.get("name") or "").lower():
                exe_path = p.info.get("exe")
                if exe_path and exe_path not in paths:
                    paths.append(exe_path)
    return paths

def choose_terminal() -> str | None:
    if not psutil:
         messagebox.showerror(
            "ライブラリ不足",
            "psutilライブラリが見つかりません。\nコマンドプロンプトで `pip install psutil` を実行してインストールしてください。"
        )
         return None
    root = tk.Tk()
    root.withdraw()
    win = tk.Toplevel(root)
    win.title("接続するMT5ターミナルを選択")
    win.grab_set()
    message_label = None
    def on_close():
        win.destroy()
        root.destroy()
    win.protocol("WM_DELETE_WINDOW", on_close)
    def populate_tree():
        nonlocal message_label
        if message_label and message_label.winfo_exists():
            message_label.destroy()
            message_label = None
        for i in tree.get_children():
            tree.delete(i)
        found_terminals = _discover_terminals()
        if not found_terminals:
            tree.grid_remove()
            message_label = ttk.Label(win, text="実行中のMT5ターミナルが見つかりません。\nMT5を起動後、「再読込」を押してください。")
            message_label.grid(row=0, column=0, columnspan=3, padx=10, pady=10)
            use_button.config(state="disabled")
            return
        tree.grid()
        is_data_found = False
        for exe in found_terminals:
            try:
                if mt5.initialize(path=exe):
                    acc = mt5.account_info()
                    if acc:
                        tree.insert("", tk.END, values=(exe, acc.login, acc.server, f"{acc.balance:.2f} {acc.currency}", acc.name))
                        is_data_found = True
                    mt5.shutdown()
            except Exception as e:
                print(f"ターミナルへの接続エラー {exe}: {e}")
        if not is_data_found:
            tree.grid_remove()
            message_label = ttk.Label(win, text="MT5アカウント情報が取得できませんでした。\nログイン状態を確認してください。")
            message_label.grid(row=0, column=0, columnspan=3, padx=10, pady=10)
            use_button.config(state="disabled")
        else:
            if tree.get_children():
                tree.selection_set(tree.get_children()[0])
            use_button.config(state="normal")
    cols = ("exe", "login", "server", "balance", "name")
    tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
    col_widths = {"exe": 340, "login": 80, "server": 170, "balance": 120, "name": 150}
    for c in cols:
        tree.heading(c, text=c.capitalize())
        tree.column(c, width=col_widths[c], anchor="w", stretch=tk.FALSE)
    tree.grid(row=0, column=0, columnspan=3, padx=6, pady=6)
    selected_path: dict[str, str | None] = {"path": None}
    def _use_selection() -> None:
        if tree.selection():
            selected_path["path"] = tree.item(tree.selection()[0], "values")[0]
        win.destroy()
    button_frame = ttk.Frame(win)
    button_frame.grid(row=1, column=0, columnspan=3, pady=(0, 6), padx=6, sticky="e")
    ttk.Button(button_frame, text="再読込", command=populate_tree).pack(side="left", padx=(0, 5))
    use_button = ttk.Button(button_frame, text="このターミナルを使用", command=_use_selection)
    use_button.pack(side="left")
    populate_tree()
    win.wait_window()
    root.destroy()
    return selected_path["path"]


class SwapViewerApp:
    def __init__(self, terminal_path: str):
        self.path = terminal_path
        self.root = tk.Tk()
        self.root.title("MT5 スワップチェッカー")
        self.root.geometry("1450x600")

        self.timer_id = None
        self.current_data = []
        self.last_sort_col = "swap_both_1d_jpy"
        self.last_sort_reverse = True

        if not self._connect_mt5():
            self.root.destroy()
            return
        
        self._setup_ui()
        self.update_data()
        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)

    def _connect_mt5(self) -> bool:
        if not mt5.initialize(path=self.path):
            messagebox.showerror("MT5接続エラー", f"MT5の初期化に失敗しました: {mt5.last_error()}")
            return False
        self.account_info = mt5.account_info()
        if not self.account_info:
            messagebox.showerror("MT5接続エラー", f"アカウント情報の取得に失敗しました: {mt5.last_error()}")
            mt5.shutdown()
            return False
        if self.account_info.currency != "JPY":
            messagebox.showwarning("口座通貨の確認", f"このツールの円換算機能は、口座通貨がJPYの場合に最適化されています。\n現在の口座通貨: {self.account_info.currency}")
        return True

    def _setup_ui(self):
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.pack(fill=tk.BOTH, expand=True)
        top_frame = ttk.Frame(main_frame)
        top_frame.pack(fill=tk.X, pady=(0, 5))
        conn_text = f"接続先: {self.account_info.server} ({self.account_info.login})"
        ttk.Label(top_frame, text=conn_text, font=("Meiryo", 10)).pack(side=tk.LEFT)
        self.last_updated_var = tk.StringVar(value="最終更新: ----/--/-- --:--:--")
        ttk.Label(top_frame, textvariable=self.last_updated_var).pack(side=tk.RIGHT)
        
        sheet_frame = ttk.Frame(main_frame) 
        sheet_frame.pack(fill=tk.BOTH, expand=True)
        
        self.columns = {
            "symbol":           ("通貨ペア", 110, "w"),
            "swap_long_pts":    ("買いスワップ(Pts/%)", 150, "e"), "swap_short_pts": ("売りスワップ(Pts/%)", 150, "e"),
            "swap_long_jpy":    ("1Lot損益(買/円)", 140, "e"), "swap_short_jpy": ("1Lot損益(売/円)", 140, "e"),
            "swap_both_1d_jpy": ("1日両建て損益(円)", 160, "e"), "rollover_day": ("3倍デー適用曜日", 130, "center"),
            "swap_long_3d":     ("3倍デー(買/円)", 140, "e"), "swap_short_3d":     ("3倍デー(売/円)", 140, "e"),
            "swap_both_3d_jpy": ("3倍デー両建て損益(円)", 180, "e"),
        }
        
        self.sheet = tksheet.Sheet(sheet_frame,
                                   headers=[h for h, w, a in self.columns.values()],
                                   show_toolbar=False,
                                   show_top_left=False)
        self.sheet.pack(fill=tk.BOTH, expand=True)
        self.sheet.enable_bindings("single_select", "drag_select", "ctrl_select", "arrowkeys", "right_click_popup_menu", "rc_select")
        
        for i, (col_id, (header, width, anchor)) in enumerate(self.columns.items()):
            align = "w" if anchor == "w" else "e" if anchor == "e" else "center"
            self.sheet.column_width(column=i, width=width)
            self.sheet.align_columns(columns=i, align=align)

        
        # ヘッダークリックのイベント登録を削除
        
        # --- 下部コントロールパネル ---
        bottom_frame = ttk.Frame(main_frame)
        bottom_frame.pack(fill=tk.X, pady=(5, 0))
        
        # 更新ボタン
        self.update_button = ttk.Button(bottom_frame, text="手動更新", command=self.update_data)
        self.update_button.pack(side=tk.LEFT, padx=(0, 10))

        
        # 手動ソート機能のUIを追加
        sort_frame = ttk.Frame(bottom_frame)
        sort_frame.pack(side=tk.LEFT)
        
        ttk.Label(sort_frame, text="並べ替え:").pack(side=tk.LEFT, padx=(0, 5))

        # 表示名と内部IDの対応辞書を作成
        self.col_display_to_id = {v[0]: k for k, v in self.columns.items()}
        
        # ソート列選択のプルダウンメニュー
        self.sort_combo_var = tk.StringVar()
        self.sort_combo = ttk.Combobox(sort_frame, textvariable=self.sort_combo_var,
                                       values=[v[0] for v in self.columns.values()],
                                       state="readonly", width=25)
        self.sort_combo.set(self.columns[self.last_sort_col][0]) # 初期値を設定
        self.sort_combo.pack(side=tk.LEFT, padx=(0, 5))
        self.sort_combo.bind("<<ComboboxSelected>>", self._on_sort_combo_select)
        
        # 降順/昇順 切り替えボタン
        self.sort_order_button = ttk.Button(sort_frame, text="降順", command=self._on_sort_order_toggle, width=6)
        self.sort_order_button.pack(side=tk.LEFT)
        
        # ステータス表示
        self.status_var = tk.StringVar(value="準備完了")
        ttk.Label(bottom_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=20)
    
    
    # プルダウンメニューが選択された時の処理
    def _on_sort_combo_select(self, event=None):
        selected_display_name = self.sort_combo_var.get()
        new_sort_col = self.col_display_to_id[selected_display_name]
        self._sort_column(new_sort_col, keep_direction=True) # 並び順は維持したまま列を切り替え

    
    # 降順/昇順ボタンが押された時の処理
    def _on_sort_order_toggle(self, event=None):
        # 昇順/降順を反転させる
        self.last_sort_reverse = not self.last_sort_reverse
        # 現在選択されている列で再ソートを実行
        self._sort_column(self.last_sort_col, keep_direction=True)
        
    def _fetch_data_thread(self):
        
        try:
            all_symbols = mt5.symbols_get()
            if not all_symbols:
                self.root.after(0, self._update_gui_with_error, "エラー: 通貨ペアリストの取得に失敗。")
                return
            
            visible_symbols = [s for s in all_symbols if s.visible]
            if not visible_symbols:
                self.root.after(0, self._update_gui_with_error, "エラー: MT5の「気配値表示」に通貨ペアを追加してください。")
                return

            all_data = []
            weekdays = ["日", "月", "火", "水", "木", "金", "土"]

            for info in visible_symbols:
                tick = mt5.symbol_info_tick(info.name)
                if not tick or tick.bid == 0 or tick.ask == 0:
                    continue

                swap_long_jpy = 0.0
                swap_short_jpy = 0.0

                if info.swap_mode == mt5.SYMBOL_SWAP_MODE_POINTS:
                    tick_value = info.trade_tick_value
                    swap_long_jpy = info.swap_long * tick_value
                    swap_short_jpy = info.swap_short * tick_value
                elif info.swap_mode in [3, 5]:
                    swap_long_profit_ccy = 1.0 * info.trade_contract_size * tick.ask * (info.swap_long / 100) / 360
                    swap_short_profit_ccy = 1.0 * info.trade_contract_size * tick.bid * (info.swap_short / 100) / 360
                    if info.currency_profit != self.account_info.currency:
                        conversion_pair = info.currency_profit + self.account_info.currency
                        conversion_tick = mt5.symbol_info_tick(conversion_pair)
                        if conversion_tick and conversion_tick.bid > 0:
                            swap_long_jpy = swap_long_profit_ccy * conversion_tick.bid
                            swap_short_jpy = swap_short_profit_ccy * conversion_tick.bid
                        else:
                            continue
                    else:
                        swap_long_jpy = swap_long_profit_ccy
                        swap_short_jpy = swap_short_profit_ccy
                else:
                    continue
                
                all_data.append({
                    "symbol": info.name,
                    "swap_long_pts": info.swap_long, "swap_short_pts": info.swap_short,
                    "swap_long_jpy": swap_long_jpy, "swap_short_jpy": swap_short_jpy,
                    "swap_both_1d_jpy": swap_long_jpy + swap_short_jpy,
                    "rollover_day": weekdays[info.swap_rollover3days],
                    "swap_long_3d": swap_long_jpy * 3, "swap_short_3d": swap_short_jpy * 3,
                    "swap_both_3d_jpy": (swap_long_jpy + swap_short_jpy) * 3,
                    "swap_mode": info.swap_mode
                })
            
            self.root.after(0, self._update_gui, all_data)
        except Exception:
            traceback.print_exc()
            self.root.after(0, self._update_gui_with_error, "エラーが発生。詳細はコンソールを確認。")

    def _format_value(self, value, format_spec, is_jpy=False):
        unit = " 円" if is_jpy else ""
        if value == 0: return f"{value:{format_spec}}{unit}"
        return f"{value:+{format_spec}}{unit}"
    
    def _format_swap_points(self, value, mode):
        if mode in [3, 5]:
            return f"{value:+.3f} %"
        return f"{value:+.3f}"

    def _update_gui(self, data_list):
        self.current_data = data_list
        self._sort_column(self.last_sort_col, keep_direction=True)
        self.last_updated_var.set(f"最終更新: {datetime.now().strftime('%Y/%m/%d %H:%M:%S')}")
        self.status_var.set(f"データ取得完了。{len(data_list)}件の通貨ペアを表示中。")
        self.update_button.config(state="normal")

    def _redraw_view(self):
        
        if not self.current_data:
            self.sheet.set_sheet_data(data=[[]])
            return

        formatted_data = []
        for data in self.current_data:
            row = [
                data["symbol"],
                self._format_swap_points(data["swap_long_pts"], data["swap_mode"]),
                self._format_swap_points(data["swap_short_pts"], data["swap_mode"]),
                self._format_value(data["swap_long_jpy"], ",.2f", True),
                self._format_value(data["swap_short_jpy"], ",.2f", True),
                self._format_value(data["swap_both_1d_jpy"], ",.2f", True),
                data["rollover_day"],
                self._format_value(data["swap_long_3d"], ",.2f", True),
                self._format_value(data["swap_short_3d"], ",.2f", True),
                self._format_value(data["swap_both_3d_jpy"], ",.2f", True)
            ]
            formatted_data.append(row)

        self.sheet.set_sheet_data(data=formatted_data, redraw=False)

        color_target_cols = [
            "swap_long_pts", "swap_short_pts", "swap_long_jpy", "swap_short_jpy",
            "swap_both_1d_jpy", "swap_long_3d", "swap_short_3d", "swap_both_3d_jpy"
        ]
        
        col_id_to_index = {col_id: i for i, col_id in enumerate(self.columns.keys())}

        for r, data_row in enumerate(self.current_data):
            for col_id in color_target_cols:
                c = col_id_to_index[col_id]
                value = data_row[col_id]
                color = ""
                if value > 0:
                    color = "blue"
                elif value < 0:
                    color = "red"
                
                if color:
                    self.sheet.highlight_cells(row=r, column=c, fg=color)

        self.sheet.redraw()


    def _sort_column(self, col, keep_direction=False):
        if not self.current_data:
            return
        
        if not keep_direction:
            # この関数が直接呼ばれることはなくなったが念のため残す
            if self.last_sort_col == col:
                self.last_sort_reverse = not self.last_sort_reverse
            else:
                self.last_sort_reverse = True
        
        self.last_sort_col = col
        is_numeric = col not in ["symbol", "rollover_day"]
        
        key_func = (lambda d: d.get(col, 0)) if is_numeric else (lambda d: str(d.get(col, '')))
        self.current_data.sort(key=key_func, reverse=self.last_sort_reverse)
        
        # ソートボタンのテキストを現在の状態に合わせて更新
        self.sort_order_button.config(text="降順" if self.last_sort_reverse else "昇順")
        
        self._redraw_view()

    def _update_gui_with_error(self, message):
        self.sheet.set_sheet_data(data=[[]])
        self.status_var.set(message)
        self.update_button.config(state="normal")

    def update_data(self):
        self.status_var.set("データを取得中...")
        self.update_button.config(state="disabled")
        thread = threading.Thread(target=self._fetch_data_thread, daemon=True)
        thread.start()
        if self.timer_id: self.root.after_cancel(self.timer_id)
        self.timer_id = self.root.after(3600000, self.update_data)

    def _on_closing(self):
        if messagebox.askokcancel("終了確認", "アプリケーションを終了しますか?"):
            if self.timer_id: self.root.after_cancel(self.timer_id)
            mt5.shutdown()
            self.root.destroy()
            print("MT5との接続をシャットダウンしました。")

    def run(self):
        self.root.mainloop()

def main():
    terminal_path = choose_terminal()
    if not terminal_path:
        print("MT5ターミナルが選択されませんでした。プログラムを終了します。")
        sys.exit(0)
    app = SwapViewerApp(terminal_path)
    app.run()

if __name__ == "__main__":
    main()

トレード履歴表示ツール(うみver)

以下のソースコードでは、MT5の取引履歴を時間単位で細かく参照できます。(ソースコード引用元

bot/EAを24時間運用し、利益率の高い時間帯を見つけるのに便利です。

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import atexit
import sys
from datetime import datetime, timedelta
import threading
import queue
import math
from collections import defaultdict

import MetaTrader5 as mt5
import pytz

# tkcalendarが利用可能かチェック
try:
    from tkcalendar import DateEntry
except ImportError:
    messagebox.showerror(
        "ライブラリ不足",
        "tkcalendarライブラリが見つかりません。\nコマンドプロンプトで `pip install tkcalendar` を実行してインストールしてください。"
    )
    sys.exit(1)

# Excelエクスポート機能に必要なライブラリをチェック
try:
    import pandas as pd
except ImportError:
    messagebox.showerror(
        "ライブラリ不足",
        "Excelエクスポート機能に必要なライブラリが見つかりません。\nコマンドプロンプトで `pip install pandas openpyxl` を実行してください。"
    )
    sys.exit(1)

try:
    import psutil
except ImportError:
    psutil = None

# ═════════════════════════ GUI HELPERS (変更なし) ═════════════════════════
def _discover_terminals() -> list[str]:
    """実行中のMT5ターミナル(terminal64.exe)のパスを検出する。"""
    paths = []
    if psutil:
        for p in psutil.process_iter(attrs=["name", "exe"]):
            if "terminal64.exe" in (p.info.get("name") or "").lower():
                exe = p.info.get("exe") or ""
                if exe and exe not in paths:
                    paths.append(exe)
    return paths


def choose_terminal() -> str | None:
    """モーダルウィンドウでMT5ターミナルを選択させる。"""
    root = tk.Tk()
    root.withdraw()
    win = tk.Toplevel(root)
    win.title("Select MT5 terminal")
    win.grab_set()
    cols = ("exe", "login", "server", "balance", "currency", "name")
    tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
    for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
        tree.heading(c, text=c)
        tree.column(c, width=w, anchor="w", stretch=tk.FALSE) # stretchを追加
    for exe in _discover_terminals():
        try:
            if mt5.initialize(path=exe):
                acc = mt5.account_info()
                term = mt5.terminal_info()
                if acc and term:
                    tree.insert("", tk.END, values=(exe, acc.login, acc.server, f"{acc.balance:.2f}", acc.currency, acc.name))
                mt5.shutdown()
        except Exception as e:
            print(f"Error connecting to {exe}: {e}")
    tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)
    sel: dict[str, str | None] = {"path": None}
    def _use() -> None:
        if tree.selection():
            sel["path"] = tree.item(tree.selection()[0], "values")[0]
            win.destroy()
    ttk.Button(win, text="Use", command=_use).grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")
    if tree.get_children():
        tree.selection_set(tree.get_children()[0])
    win.wait_window()
    root.destroy()
    return sel["path"]

# ═══════════════════ RESIZE MANAGER CLASS (変更なし) ══════════════════════
class ResizeManager:
    def __init__(self, root, tree_widget, tree_parent_frame):
        self.root = root
        self.tree = tree_widget
        self.tree_parent = tree_parent_frame
        self.timer_id = None
        self.placeholder = ttk.Frame(self.tree_parent)
        self.placeholder.grid(row=0, column=0, sticky="nsew")
        self.placeholder.grid_remove()
        self.last_width = self.root.winfo_width()
        self.last_height = self.root.winfo_height()
        self.root.bind('<Configure>', self._on_resize)

    def _on_resize(self, event):
        if event.widget != self.root or \
           (self.last_width == event.width and self.last_height == event.height):
            return
        self.last_width = event.width
        self.last_height = event.height
        if not self.tree.get_children():
            return
        if self.timer_id:
            self.root.after_cancel(self.timer_id)
        if self.tree.winfo_viewable():
            self.tree.grid_remove()
            self.placeholder.grid()
        self.timer_id = self.root.after(250, self._redraw_tree)

    def _redraw_tree(self):
        if self.placeholder.winfo_viewable():
            self.placeholder.grid_remove()
            self.tree.grid()
        self.timer_id = None

# ═════════════════════════ HISTORY VIEWER CLASS ═════════════════════════
class HistoryViewer:
    def __init__(self, terminal_path: str):
        self.path = terminal_path
        self.root = tk.Tk()
        self.root.title("履歴検索")
        self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
        self.timezone = pytz.utc
        self.currency = ""
        self.data_queue = queue.Queue()
        self.all_trades = []
        self.current_page = 1
        self.items_per_page = 100
        self.total_pages = 0
        self.start_date_enabled_var = tk.BooleanVar(value=True)
        self.end_date_enabled_var = tk.BooleanVar(value=True)
        self.sort_desc_var = tk.BooleanVar(value=True)
        self.trade_type_filter_var = tk.StringVar(value="全部")
        self.summary_labels = {}

        if not self._connect_mt5():
            self.root.after(100, self.root.destroy)
            return
        account_info = mt5.account_info()
        if account_info:
            self.currency = account_info.currency
        self._setup_ui()
        self._populate_symbols_from_history()
        self._reset_conditions()
        atexit.register(self._shutdown_mt5)

    def _connect_mt5(self) -> bool:
        if not mt5.initialize(path=self.path):
            err_code, err_msg = mt5.last_error()
            messagebox.showerror("MT5 Connection Error", f"MT5の初期化に失敗しました: {err_msg} ({err_code})")
            return False
        terminal_info = mt5.terminal_info()
        if terminal_info and hasattr(terminal_info, 'time_zone'):
            try:
                self.timezone = pytz.timezone(terminal_info.time_zone)
            except pytz.exceptions.UnknownTimeZoneError:
                self.timezone = pytz.utc
        return True

    def _shutdown_mt5(self):
        mt5.shutdown()

    def _format_currency(self, value: float) -> str:
        is_jpy = self.currency == 'JPY'
        digits = 0 if is_jpy else 2
        return f"{value:+,.{digits}f}" if value != 0 else f"{value:,.{digits}f}"

    def _format_pips(self, value: float) -> str:
        return f"{value:+.1f}" if value != 0 else f"{value:.1f}"

    def _setup_ui(self):
        main_frame = ttk.Frame(self.root, padding=10)
        main_frame.grid(row=0, column=0, sticky="nsew")
        self.root.grid_rowconfigure(0, weight=1); self.root.grid_columnconfigure(0, weight=1)

        search_frame = ttk.Labelframe(main_frame, text="検索条件", padding=10)
        search_frame.grid(row=0, column=0, sticky="ew", pady=5)
        search_frame.grid_columnconfigure(3, weight=1)

        # --- Row 0: 期間、ページネーション、リセットボタン ---
        ttk.Label(search_frame, text="期間:").grid(row=0, column=0, padx=(0, 5), pady=5, sticky="w")
        time_frame = ttk.Frame(search_frame)
        time_frame.grid(row=0, column=1, sticky="w", pady=5)
        self.start_date_var = tk.StringVar(); self.start_hour_var = tk.StringVar(); self.start_min_var = tk.StringVar()
        self.end_date_var = tk.StringVar(); self.end_hour_var = tk.StringVar(); self.end_min_var = tk.StringVar()
        self.start_date_check = ttk.Checkbutton(time_frame, variable=self.start_date_enabled_var, command=self._toggle_date_widgets)
        self.start_date_check.pack(side="left")
        self.start_date_entry = DateEntry(time_frame, textvariable=self.start_date_var, date_pattern='y/mm/dd', width=10)
        self.start_date_entry.pack(side="left")
        self.start_hour_spin = ttk.Spinbox(time_frame, from_=0, to=23, textvariable=self.start_hour_var, width=3, format="%02.0f")
        self.start_hour_spin.pack(side="left", padx=(5,0))
        ttk.Label(time_frame, text=":").pack(side="left")
        self.start_min_spin = ttk.Spinbox(time_frame, from_=0, to=59, textvariable=self.start_min_var, width=3, format="%02.0f")
        self.start_min_spin.pack(side="left")
        ttk.Label(time_frame, text=" ~ ").pack(side="left", padx=5)
        self.end_date_check = ttk.Checkbutton(time_frame, variable=self.end_date_enabled_var, command=self._toggle_date_widgets)
        self.end_date_check.pack(side="left")
        self.end_date_entry = DateEntry(time_frame, textvariable=self.end_date_var, date_pattern='y/mm/dd', width=10)
        self.end_date_entry.pack(side="left")
        self.end_hour_spin = ttk.Spinbox(time_frame, from_=0, to=23, textvariable=self.end_hour_var, width=3, format="%02.0f")
        self.end_hour_spin.pack(side="left", padx=(5,0))
        ttk.Label(time_frame, text=":").pack(side="left")
        self.end_min_spin = ttk.Spinbox(time_frame, from_=0, to=59, textvariable=self.end_min_var, width=3, format="%02.0f")
        self.end_min_spin.pack(side="left")
        pagination_frame = ttk.Frame(search_frame)
        pagination_frame.grid(row=0, column=2, sticky="w", padx=(20, 0))
        self.prev_button = ttk.Button(pagination_frame, text="前へ(P)", command=self._prev_page, state="disabled")
        self.prev_button.pack(side="left")
        self.page_info_var = tk.StringVar()
        self.page_combo = ttk.Combobox(pagination_frame, textvariable=self.page_info_var, values=['25件', '50件', '100件', '500件'], state="readonly", width=18, justify='center')
        self.page_combo.set("100件")
        self.items_per_page = 100
        self.page_combo.bind("<<ComboboxSelected>>", self._on_items_per_page_changed)
        self.page_combo.pack(side="left", padx=5)
        self.next_button = ttk.Button(pagination_frame, text="次へ(N)", command=self._next_page, state="disabled")
        self.next_button.pack(side="left")
        reset_button = ttk.Button(search_frame, text="条件リセット", command=self._reset_conditions)
        reset_button.grid(row=0, column=3, sticky="e", padx=(10, 5))

        # --- Row 1: 通貨、売買区分、ソート順、エクスポートボタン --- ★ レイアウト変更
        # ★ 変更: columnspanを4から3に変更し、エクスポートボタンのスペースを確保
        row1_frame = ttk.Frame(search_frame)
        row1_frame.grid(row=1, column=0, columnspan=3, sticky="w", pady=5)
        
        ttk.Label(row1_frame, text="通貨:").pack(side="left", pady=(5,0))
        self.symbol_var = tk.StringVar(value="(全通貨)")
        self.symbol_combo = ttk.Combobox(row1_frame, textvariable=self.symbol_var, width=15, state="readonly")
        self.symbol_combo.pack(side="left", padx=(5, 0), pady=(5,0))

        trade_type_frame = ttk.Labelframe(row1_frame, text="売買区分")
        trade_type_frame.pack(side="left", padx=(20, 10), pady=(5,0))
        ttk.Radiobutton(trade_type_frame, text="全部", variable=self.trade_type_filter_var, value="全部").pack(side="left", padx=(5,0))
        ttk.Radiobutton(trade_type_frame, text="売", variable=self.trade_type_filter_var, value="売").pack(side="left", padx=5)
        ttk.Radiobutton(trade_type_frame, text="買", variable=self.trade_type_filter_var, value="買").pack(side="left", padx=(0,5))
        
        sort_check = ttk.Checkbutton(row1_frame, text="新しい履歴から表示", variable=self.sort_desc_var, command=self._resort_and_display)
        sort_check.pack(side="left", padx=(0, 0), pady=(5,0))
        
        # ★ 追加: Excelエクスポートボタンを検索条件エリアに移動
        self.export_button = ttk.Button(search_frame, text="Excelへエクスポート", command=self._export_to_excel, state="disabled")
        self.export_button.grid(row=1, column=3, sticky="e", padx=(10, 5), pady=5)

        # --- 検索ボタン ---
        self.search_button = ttk.Button(search_frame, text="検索(S)", command=self.start_fetch_thread)
        self.search_button.grid(row=0, column=4, rowspan=2, padx=10, ipady=10, sticky="ns")

        # --- Treeview ---
        history_frame = ttk.Frame(main_frame)
        history_frame.grid(row=1, column=0, sticky="nsew", pady=10)
        main_frame.grid_rowconfigure(1, weight=1); main_frame.grid_columnconfigure(0, weight=1)
        columns = ("exec_time", "close_time", "type", "volume", "symbol", "open_price", "close_price", "gross_profit", "pips_profit", "net_profit", "swap", "comment", "entry")
        self.tree = ttk.Treeview(history_frame, columns=columns, show="headings")
        col_map = { "exec_time": ("約定時間", 140), "close_time": ("決済時間", 140), "type": ("取引", 40), "volume": ("数量", 60), "symbol": ("通貨ペア", 80), "open_price": ("約定価格", 90), "close_price": ("決済価格", 90), "gross_profit": ("売買損益", 90), "pips_profit": ("pips損益", 80), "net_profit": ("決済損益", 90), "swap": ("スワップ", 70), "comment": ("コメント", 120), "entry": ("エントリー", 60) }
        for col, (text, width) in col_map.items():
            self.tree.heading(col, text=text, anchor="center"); self.tree.column(col, width=width, anchor="e", stretch=tk.FALSE)
        self.tree.column("comment", anchor="w", stretch=tk.FALSE)
        vsb, hsb = ttk.Scrollbar(history_frame, orient="vertical", command=self.tree.yview), ttk.Scrollbar(history_frame, orient="horizontal", command=self.tree.xview)
        self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
        self.tree.grid(row=0, column=0, sticky="nsew"); vsb.grid(row=0, column=1, sticky="ns"); hsb.grid(row=1, column=0, sticky="ew")
        history_frame.grid_rowconfigure(0, weight=1); history_frame.grid_columnconfigure(0, weight=1)
        self.tree.tag_configure("positive", foreground="blue"); self.tree.tag_configure("negative", foreground="red")
        
        # ★ 変更: 合計欄のフレームのみに修正
        summary_frame = ttk.Frame(main_frame, padding=5)
        summary_frame.grid(row=2, column=0, sticky="ew")
        
        self.summary_vars = {}
        summary_items = ["ポジション数合計", "Lot数合計", "売買損益合計", "pips損益合計", "決済損益合計", "手数料合計", "スワップ合計"]
        for i, label_text in enumerate(summary_items):
            row, col = divmod(i, 4)
            ttk.Label(summary_frame, text=f"{label_text}:").grid(row=row, column=col*2, sticky="w", padx=(5,0), pady=2)
            var = tk.StringVar(value="0")
            self.summary_vars[label_text] = var
            value_label = tk.Label(summary_frame, textvariable=var, background="white", relief="solid", borderwidth=1, anchor="center", padx=4, font=("メイリオ", 10))
            value_label.grid(row=row, column=col*2 + 1, sticky="ew", padx=5, pady=2)
            self.summary_labels[label_text] = value_label
            summary_frame.grid_columnconfigure(col*2 + 1, weight=1)
        
        self.resize_manager = ResizeManager(self.root, self.tree, history_frame)

    def _reset_conditions(self):
        now = datetime.now()
        thirty_days_ago = now - timedelta(days=30)
        self.start_date_var.set(thirty_days_ago.strftime("%Y/%m/%d")); self.start_hour_var.set("00"); self.start_min_var.set("00")
        self.end_date_var.set(now.strftime("%Y/%m/%d")); self.end_hour_var.set(now.strftime("%H")); self.end_min_var.set(now.strftime("%M"))
        self.start_date_enabled_var.set(True); self.end_date_enabled_var.set(True)
        self._toggle_date_widgets()
        self.symbol_var.set("(全通貨)")
        self.sort_desc_var.set(True)
        self.trade_type_filter_var.set("全部")
        self.page_combo.set("100件")
        self.items_per_page = 100

    def _toggle_date_widgets(self):
        start_state = "normal" if self.start_date_enabled_var.get() else "disabled"
        for widget in [self.start_date_entry, self.start_hour_spin, self.start_min_spin]: widget.config(state=start_state)
        end_state = "normal" if self.end_date_enabled_var.get() else "disabled"
        for widget in [self.end_date_entry, self.end_hour_spin, self.end_min_spin]: widget.config(state=end_state)

    def _populate_symbols_from_history(self):
        try:
            from_date = datetime(2000, 1, 1, tzinfo=pytz.utc)
            to_date = datetime.now(pytz.utc) + timedelta(days=1)
            deals = mt5.history_deals_get(from_date, to_date)
            if deals:
                history_symbols = sorted(list(set(d.symbol for d in deals if d.symbol)))
                self.symbol_combo['values'] = ["(全通貨)"] + history_symbols
            else:
                self.symbol_combo['values'] = ["(全通貨)"]
        except Exception as e:
            print(f"履歴からの通貨ペア取得エラー: {e}"); self.symbol_combo['values'] = ["(全通貨)"]
        self.symbol_combo.current(0)
    
    def start_fetch_thread(self):
        self.search_button.config(state="disabled"); self.prev_button.config(state="disabled")
        self.next_button.config(state="disabled"); self.page_combo.config(state="disabled")
        self.export_button.config(state="disabled") # ★ 追加: 検索開始時にエクスポートボタンを無効化
        self.page_info_var.set("検索中...")
        self.tree.delete(*self.tree.get_children())
        for key, var in self.summary_vars.items():
            var.set("0"); self.summary_labels[key].config(foreground="black")
        self.all_trades = []
        self.progress_win = tk.Toplevel(self.root)
        self.progress_win.title(""); self.progress_win.geometry("200x50")
        self.progress_win.transient(self.root); self.progress_win.grab_set()
        self.progress_win.protocol("WM_DELETE_WINDOW", lambda: None)
        ttk.Label(self.progress_win, text="処理中...").pack(expand=True)
        threading.Thread(target=self._fetch_history_thread, daemon=True).start()
        self.root.after(100, self.process_queue)

    def process_queue(self):
        try:
            while True:
                message = self.data_queue.get_nowait()
                if isinstance(message, tuple):
                    msg_type, payload = message
                    if msg_type == "error":
                        self.progress_win.destroy(); self.search_button.config(state="normal")
                        self.page_combo.config(state="readonly"); self.page_info_var.set("---")
                        self.export_button.config(state="disabled") # ★ 追加
                        err_code, err_msg = payload
                        messagebox.showerror("エラー", f"履歴の取得に失敗しました: {err_msg} ({err_code})")
                        return
                    else: self._initialize_display(payload)
                elif message == "done":
                    self.progress_win.destroy(); self.search_button.config(state="normal")
                    self.page_combo.config(state="readonly")
                    if not self.all_trades:
                         self._update_page_info_text()
                         self.export_button.config(state="disabled") # ★ 追加
                         messagebox.showinfo("情報", "指定された期間の取引履歴はありません。")
                    return
        except queue.Empty: self.root.after(100, self.process_queue)

    def _fetch_history_thread(self):
        try:
            from_utc = datetime(2000, 1, 1, tzinfo=pytz.utc)
            if self.start_date_enabled_var.get():
                from_local = datetime.strptime(f"{self.start_date_var.get()} {self.start_hour_var.get()}:{self.start_min_var.get()}", "%Y/%m/%d %H:%M")
                from_utc = self.timezone.localize(from_local).astimezone(pytz.utc)
            
            to_utc = datetime.now(pytz.utc)
            if self.end_date_enabled_var.get():
                to_local = datetime.strptime(f"{self.end_date_var.get()} {self.end_hour_var.get()}:{self.end_min_var.get()}", "%Y/%m/%d %H:%M")
                to_utc = self.timezone.localize(to_local).astimezone(pytz.utc)

            symbol_filter = self.symbol_var.get()
            if symbol_filter == "(全通貨)": symbol_filter = None
            
            trade_type_filter = self.trade_type_filter_var.get()

        except ValueError:
            self.data_queue.put(("error", (0, "日付や時間の形式が正しくありません。"))); return

        deals = mt5.history_deals_get(from_utc, to_utc, group="*")
        if deals is None: self.data_queue.put(("error", mt5.last_error())); return
        if not deals: self.data_queue.put("done"); return

        if symbol_filter: deals = [d for d in deals if d.symbol == symbol_filter]

        symbol_info_cache = {s: mt5.symbol_info(s) for s in {d.symbol for d in deals if d.symbol} if mt5.symbol_info(s)}
        positions = defaultdict(lambda: {'in': [], 'out': []})
        for deal in deals:
            if deal.entry == mt5.DEAL_ENTRY_IN: positions[deal.position_id]['in'].append(deal)
            elif deal.entry == mt5.DEAL_ENTRY_OUT: positions[deal.position_id]['out'].append(deal)
        
        all_trades, totals = [], {"pos": 0, "lots": 0.0, "gross": 0.0, "pips": 0.0, "net": 0.0, "comm": 0.0, "swap": 0.0}
        
        for pos_id, pos_deals in positions.items():
            if not pos_deals['in'] or not pos_deals['out']: continue
            in_deals = sorted(pos_deals['in'], key=lambda d: d.time_msc)
            out_deals = sorted(pos_deals['out'], key=lambda d: d.time_msc)
            for out_deal in out_deals:
                in_deal = in_deals[0] if in_deals else None
                if not in_deal: continue
                
                trade_type_str = "買" if in_deal.type == mt5.DEAL_TYPE_BUY else "売"
                if trade_type_filter != "全部" and trade_type_filter != trade_type_str:
                    continue 
                
                info = symbol_info_cache.get(in_deal.symbol)
                if not info: continue
                
                pip_size = info.point * 10 if info.digits in (3, 5) else info.point
                if pip_size == 0:
                    pips = 0.0
                else:
                    price_diff = (out_deal.price - in_deal.price) if in_deal.type == mt5.DEAL_TYPE_BUY else (in_deal.price - out_deal.price)
                    pips = price_diff / pip_size
                
                net_profit = out_deal.profit + out_deal.commission + out_deal.swap
                row_tag = "positive" if net_profit > 0 else ("negative" if net_profit < 0 else "")

                all_trades.append({
                    "close_time": out_deal.time, "tag": row_tag,
                    "data": (
                        datetime.fromtimestamp(in_deal.time, tz=pytz.utc).astimezone(self.timezone).strftime('%Y.%m.%d %H:%M:%S'),
                        datetime.fromtimestamp(out_deal.time, tz=pytz.utc).astimezone(self.timezone).strftime('%Y.%m.%d %H:%M:%S'),
                        trade_type_str, f"{out_deal.volume:.2f}", out_deal.symbol,
                        f"{in_deal.price:.{info.digits}f}", f"{out_deal.price:.{info.digits}f}",
                        self._format_currency(out_deal.profit), self._format_pips(pips),
                        self._format_currency(net_profit), self._format_currency(out_deal.swap),
                        out_deal.comment, "OUT"
                    )})
                totals['pos']+=1; totals['lots']+=out_deal.volume; totals['gross']+=out_deal.profit; totals['pips']+=pips;
                totals['net']+=net_profit; totals['comm']+=out_deal.commission; totals['swap']+=out_deal.swap
        
        all_trades.sort(key=lambda x: x['close_time'], reverse=self.sort_desc_var.get())
        
        self.data_queue.put(("data", (all_trades, totals)))
        self.data_queue.put("done")

    def _initialize_display(self, data):
        all_trades, totals = data
        self.all_trades = all_trades
        self._on_items_per_page_changed() 
        self.summary_vars["ポジション数合計"].set(f"{totals['pos']:,}")
        self.summary_labels["ポジション数合計"].config(foreground="black")
        self.summary_vars["Lot数合計"].set(f"{totals['lots']:.2f}")
        self.summary_labels["Lot数合計"].config(foreground="black")
        financial_keys = {"売買損益合計": totals['gross'], "pips損益合計": totals['pips'], "決済損益合計": totals['net'], "手数料合計": totals['comm'], "スワップ合計": totals['swap']}
        for key, value in financial_keys.items():
            formatter = self._format_pips if 'pips' in key else self._format_currency
            self.summary_vars[key].set(formatter(value))
            self.summary_labels[key].config(foreground="blue" if value > 0 else "red" if value < 0 else "black")
        self.current_page = 1
        self.total_pages = math.ceil(len(self.all_trades) / self.items_per_page) if self.items_per_page > 0 else 0
        self._display_page()

    def _display_page(self):
        self.tree.delete(*self.tree.get_children())
        start_index = (self.current_page - 1) * self.items_per_page
        end_index = start_index + self.items_per_page
        for trade in self.all_trades[start_index:end_index]:
            self.tree.insert("", "end", values=trade['data'], tags=(trade['tag'],))
        self._update_page_info_text()
        self.prev_button.config(state="normal" if self.current_page > 1 else "disabled")
        self.next_button.config(state="normal" if self.current_page < self.total_pages else "disabled")
        # ★ 追加: 履歴の有無に応じてエクスポートボタンの状態を更新
        has_data = len(self.all_trades) > 0
        self.export_button.config(state="normal" if has_data else "disabled")
        
    def _update_page_info_text(self):
        total_items = len(self.all_trades)
        if total_items == 0: 
            self.page_info_var.set("0件 (0/0)")
            return
        start_item = (self.current_page - 1) * self.items_per_page + 1
        end_item = min(self.current_page * self.items_per_page, total_items)
        self.page_info_var.set(f"{start_item}~{end_item}件 ({self.current_page}/{self.total_pages})")

    def _on_items_per_page_changed(self, event=None):
        try: 
            selected_value = self.page_combo.get()
            if "件" in selected_value:
                self.items_per_page = int(selected_value.replace('件', ''))
            else:
                self.items_per_page = 100
        except (ValueError, AttributeError): 
            self.items_per_page = 100
        
        if self.all_trades:
            self.current_page = 1
            self.total_pages = math.ceil(len(self.all_trades) / self.items_per_page) if self.items_per_page > 0 else 0
            self._display_page()

    def _resort_and_display(self):
        if not self.all_trades: return
        self.all_trades.sort(key=lambda x: x['close_time'], reverse=self.sort_desc_var.get())
        self.current_page = 1; self._display_page()

    def _prev_page(self):
        if self.current_page > 1: self.current_page -= 1; self._display_page()

    def _next_page(self):
        if self.current_page < self.total_pages: self.current_page += 1; self._display_page()

    # ★ 変更なし: Excelエクスポート機能のロジック自体は変更ありません
    def _export_to_excel(self):
        """Treeviewに表示されているデータをExcelファイルに出力する。"""
        if not self.tree.get_children():
            messagebox.showinfo("情報", "エクスポートするデータがありません。")
            return

        filepath = filedialog.asksaveasfilename(
            defaultextension=".xlsx",
            filetypes=[("Excel ファイル", "*.xlsx"), ("すべてのファイル", "*.*")],
            title="Excelファイルとして保存",
            initialfile=f"取引履歴_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
        )
        
        if not filepath:
            return

        try:
            self.root.config(cursor="watch")
            self.root.update_idletasks()
            
            columns = self.tree['columns']
            headers = [self.tree.heading(col, 'text') for col in columns]

            data = []
            # ★ 変更: all_trades (検索結果全体) をエクスポート対象にする
            for trade in self.all_trades:
                data.append(trade['data'])

            df = pd.DataFrame(data, columns=headers)
            df.to_excel(filepath, index=False, engine='openpyxl')
            messagebox.showinfo("成功", f"データを正常にエクスポートしました。\nファイル: {filepath}")
        except Exception as e:
            messagebox.showerror("エクスポートエラー", f"ファイルのエクスポート中にエラーが発生しました。\n\n詳細: {e}")
        finally:
            self.root.config(cursor="")


    def _on_closing(self):
        if messagebox.askokcancel("終了", "アプリケーションを終了しますか?"): self.root.destroy()
        
    def run(self): self.root.mainloop()

def main():
    term_path = choose_terminal()
    if not term_path: 
        messagebox.showinfo("終了", "MT5ターミナルが選択されなかったため、アプリケーションを終了します。")
        sys.exit("MT5ターミナルが選択されませんでした。")
    app = HistoryViewer(terminal_path=term_path)
    if app.root.winfo_exists():
        app.run()

if __name__ == "__main__":
    main()

仮想通貨の無料ソースコード

Python MMbot(マーケットメイク)

以下のソースコードではマーケットメイク戦略を行います。トレードロジックは以下の記事を参考にしています。

マーケットメイク戦略とは、ロング・ショートの指値注文(メイク注文)を提示し続け、他のトレーダーと売買することでスプレッドを稼ぐ手法のこと。

FX・株式市場・CEXだと競合が強すぎて利益が出せませんが、新興DEXであれば短期間で荒稼ぎすることは可能です。

import asyncio
import time
import logging
from typing import Dict

# ロギングの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class MarketMakerBot:
    """
    ブログ記事「Adventures in Spot Market Making」に基づいた
    段階的なロジックを持つマーケットメイキングボット。
    """

    def __init__(self, api_key: str, api_secret: str, symbol: str, config: Dict):
        """
        ボットの初期化
        :param api_key: 取引所のAPIキー
        :param api_secret: 取引所のAPIシークレット
        :param symbol: 取引ペア (例: 'BTC/USDT')
        :param config: 戦略パラメータを含む辞書
        """
        self.api_key = api_key
        self.api_secret = api_secret
        self.symbol = symbol
        self.config = config

        # 状態を保持する変数
        self.inventory = {'base': 0.0, 'quote': 0.0}
        self.last_mid_price = 0.0
        
        logging.info("マーケットメイキングボットを初期化しました。")
        logging.info(f"取引ペア: {self.symbol}")
        logging.info(f"設定: {self.config}")

    # --- Step 1: 取引所API連携(ダミー実装) ---
    # 以下のメソッドを、お使いの取引所のAPIに合わせて実装してください。
    # 例として、ccxtライブラリなどを使うと便利です。

    async def _get_bbo(self) -> Dict[str, float]:
        """
        最良買気配(Best Bid)と最良売気配(Best Ask)を取得する。
        # TODO: ここに取引所固有のAPI呼び出しを実装
        """
        # ダミーデータ(テスト用)
        # 実際のAPIでは市場価格が返されます
        bid = 99.95
        ask = 100.05
        logging.debug(f"BBO取得: Bid={bid}, Ask={ask}")
        return {'bid': bid, 'ask': ask}

    async def _get_inventory(self) -> Dict[str, float]:
        """
        現在の資産残高(ベース通貨とクオート通貨)を取得する。
        # TODO: ここに取引所固有のAPI呼び出しを実装
        """
        # ダミーデータ(テスト用)
        # 実際にはAPIからウォレット残高を取得します
        self.inventory = {'base': 5.0, 'quote': 500.0}
        logging.debug(f"在庫取得: {self.inventory}")
        return self.inventory

    async def _place_order(self, side: str, price: float, size: float):
        """
        新規注文を出す。
        # TODO: ここに取引所固有のAPI呼び出しを実装
        """
        logging.info(f"【注文実行】 Side: {side}, Price: {price:.4f}, Size: {size:.6f}")
        # print(f"PLACING ORDER: {side} {size} {self.symbol} at {price}")
        
    async def _cancel_all_orders(self):
        """
        この取引ペアの未約定注文を全てキャンセルする。
        # TODO: ここに取引所固有のAPI呼び出しを実装
        """
        logging.info("全ての未約定注文をキャンセルします。")
        # print("CANCELLING ALL ORDERS")

    # --- 戦略ロジック ---

    def _calculate_quotes(self, mid_price: float) -> Dict[str, float]:
        """
        現在の戦略に基づいて、最終的な買い(bid)と売り(ask)の価格とサイズを計算する。
        """
        
        # --- Step 2: ナイーブな戦略(基本のスプレッド)---
        # 仲値から固定スプレッドをあけて価格を決定
        # BPS (Basis Points): 1 BPS = 0.01% = 0.0001
        spread_bps = self.config['default_width_bps']
        
        bid_price = mid_price * (1 - spread_bps / 2 / 10000)
        ask_price = mid_price * (1 + spread_bps / 2 / 10000)
        
        # --- Step 3 & 4: 在庫に基づいた調整 ---
        if self.config['use_inventory_management']:
            # 在庫スキューを計算
            base_value = self.inventory['base'] * mid_price
            quote_value = self.inventory['quote']
            total_value = base_value + quote_value
            
            if total_value == 0:
                inventory_skew = 0
            else:
                # 在庫の偏り (-1: 全てquote, +1: 全てbase)
                inventory_skew = (base_value - quote_value) / total_value

            # --- Step 4: 価格のスキュー ---
            # 在庫が多すぎる資産を売る方向に価格を偏らせる
            # inventory_skew > 0 (baseが多い) -> 売りたい -> 価格を全体的に下げる
            # inventory_skew < 0 (quoteが多い) -> 買いたい -> 価格を全体的に上げる
            dampening = self.config['dampening_factor']
            offset_bps = -1 * dampening * inventory_skew * spread_bps
            
            bid_price = mid_price * (1 - (spread_bps / 2 - offset_bps) / 10000)
            ask_price = mid_price * (1 + (spread_bps / 2 + offset_bps) / 10000)

            logging.info(f"在庫スキュー: {inventory_skew:.2f}, 価格オフセット: {offset_bps:.2f} BPS")
            
            # --- Step 3: サイズの調整 ---
            # 各資産の一定割合を注文サイズとする
            order_size_pct = self.config['order_size_pct']
            bid_size = (self.inventory['quote'] * order_size_pct) / bid_price
            ask_size = self.inventory['base'] * order_size_pct
        else:
            # ナイーブ戦略の場合、サイズは固定
            bid_size = self.config['fixed_order_size']
            ask_size = self.config['fixed_order_size']

        return {
            'bid_price': bid_price,
            'bid_size': bid_size,
            'ask_price': ask_price,
            'ask_size': ask_size,
        }

    async def run(self):
        """
        ボットのメインループ
        """
        logging.info("ボットのメインループを開始します...")
        
        while True:
            try:
                # 1. 最新の市場情報と在庫情報を取得
                bbo = await self._get_bbo()
                await self._get_inventory()

                if not bbo or 'bid' not in bbo or 'ask' not in bbo:
                    logging.warning("有効なBBOを取得できませんでした。スキップします。")
                    await asyncio.sleep(self.config['loop_interval_sec'])
                    continue

                mid_price = (bbo['bid'] + bbo['ask']) / 2

                # 2. 仲値が大きく動いたかチェック (リクオートのトリガー)
                price_change_bps = abs(mid_price - self.last_mid_price) / self.last_mid_price * 10000 if self.last_mid_price > 0 else float('inf')
                
                if price_change_bps > self.config['relist_threshold_bps']:
                    logging.info(f"価格が{price_change_bps:.2f} BPS変動しました。リクオートします。")
                    logging.info(f"旧仲値: {self.last_mid_price:.4f}, 新仲値: {mid_price:.4f}")
                    
                    # 3. 既存の注文をキャンセル
                    await self._cancel_all_orders()

                    # 4. 新しいクオートを計算
                    quotes = self._calculate_quotes(mid_price)

                    # 5. 新しい注文を出す
                    if quotes['bid_size'] > 0:
                        await self._place_order('buy', quotes['bid_price'], quotes['bid_size'])
                    if quotes['ask_size'] > 0:
                        await self._place_order('sell', quotes['ask_price'], quotes['ask_size'])

                    # 6. 最後の仲値を更新
                    self.last_mid_price = mid_price
                else:
                    logging.debug("価格変動は閾値以下です。注文は維持します。")

            except Exception as e:
                logging.error(f"ループ中にエラーが発生しました: {e}")
            
            await asyncio.sleep(self.config['loop_interval_sec'])


if __name__ == '__main__':
    # --- 重要: このボットを実際の資金で運用する前には ---
    # 1. 取引所のテストネット(Testnet)で十分にテストしてください。
    # 2. API連携部分(# TODO:)を正しく実装してください。
    # 3. パラメータを市場の状況に合わせて慎重に調整してください。
    # 4. 小額から始めて、ボットの挙動を監視してください。
    # ----------------------------------------------------

    # --- ボットの設定 ---
    bot_config = {
        # --- 基本戦略パラメータ ---
        'default_width_bps': 75,      # 仲値からの基本スプレッド (75 BPS = 0.75%)
        'relist_threshold_bps': 10,   # このBPS以上価格が動いたらリクオートする (10 BPS = 0.1%)
        'loop_interval_sec': 5,       # メインループの実行間隔(秒)

        # --- Step 2: ナイーブ戦略用 ---
        'fixed_order_size': 0.01,     # use_inventory_management=False の場合の固定注文サイズ

        # --- Step 3 & 4: 在庫管理戦略用 ---
        'use_inventory_management': True, # Trueにすると在庫管理とスキューが有効になる
        'order_size_pct': 0.1,        # 注文に使う在庫の割合 (10%)
        'dampening_factor': 0.3,      # 在庫スキューが価格に与える影響の減衰係数
    }

    # ボットのインスタンスを作成
    # api_keyとapi_secretは環境変数などから安全に読み込むことを推奨します
    mm_bot = MarketMakerBot(
        api_key="YOUR_API_KEY",
        api_secret="YOUR_API_SECRET",
        symbol="ASTER/USDT",  # 例: 取引したいペア
        config=bot_config
    )

    # ボットを実行
    try:
        asyncio.run(mm_bot.run())
    except KeyboardInterrupt:
        logging.info("ボットの実行が手動で停止されました。")

0コストお削りbot(Bybit/Hyperliquid)

以下のソースコードは、Bybit/Hyperliquidで鞘取りトレード(ベーシストレーディング)を行います。トレードロジックは、以下のnoteを参考にしています。

ベーシストレーディングとは、同一資産の現物・先物で価格が乖離したときに両建てをすることで、乖離縮小後に利益を得る手法のこと。鞘取りトレードとも呼ばれます。

import asyncio
import os
import time
from dotenv import load_dotenv

# Bybit
from pybit.unified_trading import WebSocket

# Hyperliquid
from hyperliquid.info import Info
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants

# .envファイルから環境変数を読み込む
load_dotenv()

# --- 設定項目 ---
# Bybitの設定
BYBIT_API_KEY = os.getenv("BYBIT_API_KEY")
BYBIT_API_SECRET = os.getenv("BYBIT_API_SECRET")
BYBIT_SYMBOL = "ETHUSDC"  # Bybitの取引ペア (USDC建て)
BYBIT_CATEGORY = "spot"   # スポット取引

# Hyperliquidの設定
HYPERLIQUID_PRIVATE_KEY = os.getenv("HYPERLIQUID_PRIVATE_KEY")
HYPERLIQUID_ASSET = "ETH" # Hyperliquidの資産名
# メインネットを使う場合は constants.MAINNET_API_URL に変更
HYPERLIQUID_API_URL = constants.TESTNET_API_URL

# 取引戦略の設定
TRADE_SIZE_USD = 10  # 1回あたりの取引サイズ(USD相当)
OPEN_THRESHOLD_BPS = 20.0  # ポジションを持つための利益率の閾値 (bps)
CLOSE_THRESHOLD_BPS = 0.0   # ポジションを閉じるための利益率の閾値 (bps)

# --- グローバル変数 ---
# 最新の価格情報を格納する辞書
latest_prices = {
    "bybit_bid": 0,
    "bybit_ask": 0,
    "hl_bid": 0,
    "hl_ask": 0,
}
# ポジションを保有しているかどうかの状態を管理
in_position = False

# --- Hyperliquidのセットアップ ---
info = Info(HYPERLIQUID_API_URL, skip_ws=True)
# 自分のウォレットアドレスを取得
account = Exchange.account_from_private_key(HYPERLIQUID_PRIVATE_KEY)
exchange = Exchange(account, HYPERLIQUID_API_URL)
# Hyperliquidの資産情報を取得(例: ETHの小数点以下の桁数など)
meta = info.meta()
asset_info = next(a for a in meta["universe"] if a["name"] == HYPERLIQUID_ASSET)
asset_decimals = asset_info["szDecimals"]

# --- メインロジック ---
async def check_and_trade():
    """価格差をチェックして、条件を満たせば取引を実行する"""
    global in_position

    # 価格情報がまだ揃っていない場合は何もしない
    if 0 in latest_prices.values():
        return

    hl_bid = latest_prices["hl_bid"]
    bybit_ask = latest_prices["bybit_ask"]
    bybit_bid = latest_prices["bybit_bid"]
    hl_ask = latest_prices["hl_ask"]

    # --- 1. ポジションを開く条件の計算 (Hyperliquid Short, Bybit Long) ---
    # open_pnl = (HyperliquidのBid価格 - BybitのAsk価格) / 平均価格
    if hl_bid > 0 and bybit_ask > 0:
        avg_open_price = (hl_bid + bybit_ask) / 2
        open_pnl_bps = ((hl_bid - bybit_ask) / avg_open_price) * 10000
    else:
        open_pnl_bps = -9999

    # --- 2. ポジションを閉じる条件の計算 (Hyperliquid Long, Bybit Short) ---
    # close_pnl = (BybitのBid価格 - HyperliquidのAsk価格) / 平均価格
    if bybit_bid > 0 and hl_ask > 0:
        avg_close_price = (bybit_bid + hl_ask) / 2
        close_pnl_bps = ((bybit_bid - hl_ask) / avg_close_price) * 10000
    else:
        close_pnl_bps = -9999

    print(
        f"Status: {'In Position' if in_position else 'Idle'} | "
        f"Open PnL: {open_pnl_bps:.2f} bps | "
        f"Close PnL: {close_pnl_bps:.2f} bps"
    )

    # --- 3. 取引実行 ---
    if not in_position and open_pnl_bps >= OPEN_THRESHOLD_BPS:
        print("\n--- [OPEN] 条件を満たしました。ポジションを開きます ---")
        # 数量を計算
        trade_size_asset = TRADE_SIZE_USD / avg_open_price
        
        # Hyperliquidで売り(ショート)
        print(f"Hyperliquid: {trade_size_asset:.{asset_decimals}f} {HYPERLIQUID_ASSET}を売り")
        # is_buy=Falseで売り, reduce_only=Falseで新規ポジション
        order_hl = exchange.order(HYPERLIQUID_ASSET, is_buy=False, sz=trade_size_asset,
                                  limit_px=hl_bid, order_type={"market": {}}, reduce_only=False)
        print(f"  -> HL Order: {order_hl}")

        # Bybitで買い(ロング) - ここでは擬似的にログ出力のみ
        # 実際のBybitの注文APIをここに実装します
        print(f"Bybit: {trade_size_asset:.{asset_decimals}f} {BYBIT_SYMBOL}を買い")
        # bybit_client.place_order(...)

        # 両方の注文が成功したと仮定して状態を変更
        in_position = True
        print("--- ポジションを開きました ---\n")

    elif in_position and close_pnl_bps >= CLOSE_THRESHOLD_BPS:
        print("\n--- [CLOSE] 条件を満たしました。ポジションを閉じます ---")
        # 数量を計算
        trade_size_asset = TRADE_SIZE_USD / avg_close_price

        # Hyperliquidで買い(ショートポジションをクローズ)
        print(f"Hyperliquid: {trade_size_asset:.{asset_decimals}f} {HYPERLIQUID_ASSET}を買い")
        # is_buy=Trueで買い, reduce_only=Trueでポジションを閉じる注文
        order_hl = exchange.order(HYPERLIQUID_ASSET, is_buy=True, sz=trade_size_asset,
                                  limit_px=hl_ask, order_type={"market": {}}, reduce_only=True)
        print(f"  -> HL Order: {order_hl}")

        # Bybitで売り(ロングポジションをクローズ)
        # 実際のBybitの注文APIをここに実装します
        print(f"Bybit: {trade_size_asset:.{asset_decimals}f} {BYBIT_SYMBOL}を売り")
        # bybit_client.place_order(...)

        # 両方の注文が成功したと仮定して状態を変更
        in_position = False
        print("--- ポジションを閉じました ---\n")


# --- WebSocketデータハンドラ ---
def handle_bybit_message(message):
    """Bybit WebSocketからメッセージを受け取り、価格を更新"""
    try:
        data = message["data"]
        # orderbook.1トピックは最初の1件が最良価格
        if data["b"] and data["a"]:
            latest_prices["bybit_bid"] = float(data["b"][0][0])
            latest_prices["bybit_ask"] = float(data["a"][0][0])
    except Exception as e:
        print(f"Bybit WebSocket message error: {e}")

async def subscribe_to_hyperliquid():
    """Hyperliquid WebSocketに接続し、価格を更新"""
    async def handle_hl_l2_book(event):
        try:
            levels = event["data"]["levels"]
            if levels[0]:  # Bid side
                latest_prices["hl_bid"] = float(levels[0][0]["px"])
            if levels[1]:  # Ask side
                latest_prices["hl_ask"] = float(levels[1][0]["px"])
        except Exception as e:
            print(f"Hyperliquid WebSocket message error: {e}")

    info.subscribe({"type": "l2Book", "coin": HYPERLIQUID_ASSET}, handle_hl_l2_book)

# --- メイン実行関数 ---
async def main():
    # 1. Bybit WebSocketのセットアップと接続
    bybit_ws = WebSocket(
        testnet=True,  # メインネットの場合は False
        channel_type=BYBIT_CATEGORY,
        api_key=BYBIT_API_KEY,
        api_secret=BYBIT_API_SECRET,
    )
    bybit_ws.subscribe_orderbook(symbol=BYBIT_SYMBOL, depth=1, callback=handle_bybit_message)
    print("Bybit WebSocketに接続中...")
    # Bybitは別スレッドで実行されるので、ここでは待機しない

    # 2. Hyperliquid WebSocketのセットアップと接続
    print("Hyperliquid WebSocketに接続中...")
    await subscribe_to_hyperliquid()

    # 3. メインループ: 1秒ごとに価格をチェックして取引を試みる
    while True:
        await check_and_trade()
        await asyncio.sleep(1)


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBotを終了します。")

バイナリーオプションの無料ソースコード

バイナリー業者向けのレイテンシー取引ツール(うみver)

以下のソースコードは、バイナリ業者でレイテンシー取引を行います。Chromeブラウザをプログラムで動かし、MT5との価格が乖離したときに、ブラウザ内のバイナリー業者の注文ボタンをクリックしてエントリーします。

ただレイテンシーアービトラージはブローカー側による口座凍結・利益没収・出金拒否などのリスクが高く、難易度が高いです。初心者にはおすすめしません。

import json
import asyncio
import re
import requests
import websockets
import pychrome
import time
from collections import deque
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import MetaTrader5 as mt5
import psutil
import sys
import pyautogui # ▼▼▼ 追加: pyautogui をインポート ▼▼▼

# --- ▼ ユーザー設定 ▼ ---
YOUR_HASH = ""  # 自身のハッシュ値に書き換えてください
TRADE_AMOUNT = "500"
TIMEFRAME_ID = "15" # 30秒取引:"15", 1分取引:"16", ...
TRADE_MODE = "demo"
MT5_SYMBOL = "USDJPY"

# ▼▼▼ 追加・変更箇所: マウスクリック用の座標設定 ▼▼▼
# 【超重要】これらの座標は**仮の値**です。
# 実際の取引プラットフォームの「PUT」と「CALL」ボタンの正確な中心座標に設定してください。
# 設定を誤るとクリックが全く機能しません。
#
# ★ 座標特定の手順 ★
# 1. 取引プラットフォームのページをChromeで開く。
# 2. Chromeの**ズームレベルを必ず100%**に設定。
# 3. ブラウザの**ウィンドウサイズを固定**し、常にその位置に配置する。
# 4. マウスカーソルを「PUT」または「CALL」ボタンの中心に手動で合わせる。
# 5. Pythonのインタラクティブシェルや簡単なスクリプトで `import pyautogui; pyautogui.position()` を実行し、
#    その時点のマウスカーソルのX, Y座標を取得する。
#    例:
#    >>> import pyautogui
#    >>> pyautogui.position()
#    Point(x=900, y=500)
#
# ★ その他重要な注意点 (pyautogui 使用時) ★
# ・プログラム実行中は**ブラウザウィンドウを最小化せず、フォアグラウンドに表示**してください。
# ・プログラム実行中に**マウスやキーボードを操作しないでください**。誤動作の原因になります。
# ・これはOSレベルでの物理クリックであり、画面に表示されているものをクリックします。
PUT_BUTTON_COORD_X = 1742  # 例: PUTボタンのX座標 (画面左上を(0,0)とするピクセル値)
PUT_BUTTON_COORD_Y = 575  # 例: PUTボタンのY座標
CALL_BUTTON_COORD_X = 1732  # 例: CALLボタンのX座標
CALL_BUTTON_COORD_Y = 359  # 例: CALLボンのY座標

# --- ▼ エントリーロジック設定 ▼ ---
# 価格差の移動平均を計算するために、何回分のデータを保持するか
AVERAGE_PERIOD = 50
# 平均の価格差から、何pips乖離したらエントリーするか (例: 0.5pips = 0.005)
ENTRY_THRESHOLD_PIPS = 0.005
# 一度エントリーしてから、次にエントリーするまでの待機時間(秒)
ENTRY_COOLDOWN_SECONDS = 15
# 注文失敗時に再試行する最大回数
MAX_ORDER_RETRY_ATTEMPTS = 3
# 注文再試行までの待機時間(秒)
ORDER_RETRY_DELAY_SECONDS = 5
# --- ▲ エントリーロジック設定 ▲ ---


# --- 以下は基本的に編集不要 ---

# グローバル変数
diff_history = deque(maxlen=AVERAGE_PERIOD)
last_entry_time = 0

def log_message(message):
    print(f"[INFO] {message}")

def find_mt5_instances():
    instances = []
    for proc in psutil.process_iter(['name', 'exe']):
        try:
            if proc.info['name'] in ['terminal64.exe', 'terminal.exe']:
                if proc.info['exe'] and proc.info['exe'] not in instances:
                    instances.append(proc.info['exe'])
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    return instances

def select_mt5_instance(instances):
    if not instances:
        log_message("MT5: 起動しているMT5が見つかりませんでした。")
        return None
    if len(instances) == 1:
        log_message(f"MT5: 起動中のMT5を1つ検出しました: {instances[0]}")
        return instances[0]
    print("-" * 50)
    log_message("複数のMT5が起動しています。接続するMT5を選択してください:")
    for i, path in enumerate(instances):
        print(f"  [{i + 1}] {path}")
    while True:
        try:
            choice = int(input(f"番号を入力してください (1-{len(instances)}): ")) - 1
            if 0 <= choice < len(instances): return instances[choice]
            else: print("無効な番号です。")
        except ValueError: print("数値を入力してください。")
        except (KeyboardInterrupt, EOFError): return None

# --- ブラウザへの接続処理 ---
# SeleniumとPyChromeはWebSocketによる価格データ取得のために引き続き使用します。
# クリックにはpyautoguiを使用するため、pychromeでのInput.dispatchMouseEventは不要になります。
try:
    chrome_options = Options()
    chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
    driver = webdriver.Chrome(options=chrome_options)
    log_message("Selenium: 既存のChromeセッションに接続しました (port 9222)")
except Exception as e:
    log_message(f"Selenium: 接続に失敗しました: {e}"); exit()
try:
    browser = pychrome.Browser(url="http://127.0.0.1:9222")
    tabs = browser.list_tab()
    tab = tabs[0] if tabs else None
    if tab: tab.start(); log_message("PyChrome: 接続に成功しました (port 9222)")
    else: log_message("PyChrome: 利用可能なタブが見つかりません")
except Exception as e:
    log_message(f"PyChrome: 接続に失敗しました: {e}"); tab = None


async def setup_websocket_connection():
    # この関数は、pyautoguiによるクリックとは直接関係ありませんが、
    # 価格データ取得のために既存のWebSocket接続設定を維持します。
    if not tab:
        log_message("BO: PyChrome タブが利用できないためWebSocket設定をスキップします。")
        return False

    js_websocket_setup = f"""
    (async function() {{
        if (typeof window.boWebSocket !== 'undefined' && window.boWebSocket.readyState !== WebSocket.CLOSED) {{
            console.log("Closing existing BO WebSocket before re-setup.");
            window.boWebSocket.close();
            await new Promise(r => setTimeout(r, 100));
        }}

        window.boWebSocket = new WebSocket("wss://bo.zentrader.com:27017/");

        window.boWebSocket.onclose = (event) => {{
            console.warn("BO WebSocket closed. Code:", event.code, "Reason:", event.reason);
        }};
        window.boWebSocket.onerror = (error) => {{
            console.error("BO WebSocket error:", error);
        }};

        let timeout = 10000;
        let interval = 500;
        let elapsed = 0;

        while (window.boWebSocket.readyState !== WebSocket.OPEN && elapsed < timeout) {{
            await new Promise(r => setTimeout(r, interval));
            elapsed += interval;
        }}

        if (window.boWebSocket.readyState === WebSocket.OPEN) {{
            console.log("BO WebSocket opened and ready for initial commands.");
            const now = Math.floor(Date.now() / 1000), tE = now, tB = now - 500;

            window.boWebSocket.send(JSON.stringify({{
                command: "connect",
                bo: "{TRADE_MODE}",
                hash: "{YOUR_HASH}",
                platform: "mt4",
                source: "site"
            }}));
            console.log("connect sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{ command: "get_user_data" }}));
            console.log("get_user_data sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{ command: "hook_time", enable: "true" }}));
            console.log("hook_time sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{ command: "get_cfg_trade", language: "ja" }}));
            console.log("get_cfg_trade sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{ command: "get_user_settings" }}));
            console.log("get_user_settings sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{
                command: "hook_timeframes",
                option_kind: "1",
                tool_id: "4",
                timeframe_id: "{TIMEFRAME_ID}",
                bo: "{TRADE_MODE}",
                enable: "true",
                interval: "1000",
                source: "site"
            }}));
            console.log("hook_timeframes sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{
                command: "get_quotes_history",
                tool_id: "4",
                time_begin: tB.toString(),
                time_end: tE.toString(),
                time_size: "S1",
                source: "site",
                request_id: "4_500"
            }}));
            console.log("get_quotes_history sent", {{
                time_begin: new Date(tB * 1000).toISOString(),
                time_end: new Date(tE * 1000).toISOString(),
                duration_sec: tE - tB
            }});

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{ command: "hook_user_status", enable: "true" }}));
            console.log("hook_user_status sent");

            await new Promise(r => setTimeout(r, 300));
            window.boWebSocket.send(JSON.stringify({{
                command: "hook_options",
                source: "site",
                enable: "true",
                interval: "100"
            }}));
            console.log("hook_options sent");

            return true;
        }} else {{
            console.error("BO WebSocket failed to open within timeout. readyState:", window.boWebSocket.readyState);
            return false;
        }}
    }})();
    """
    try:
        result_eval = tab.Runtime.evaluate(expression=js_websocket_setup, awaitPromise=True)

        js_return_value = result_eval.get('result', {}).get('value')

        if js_return_value is True:
            log_message("BO: WebSocket接続スクリプトの実行とOPEN状態への待機に成功しました。")
            return True
        else:
            log_message("BO: WebSocket接続スクリプトの実行に失敗、またはOPEN状態になりませんでした。")
            if js_return_value is False:
                pass
            else:
                log_message(f"BO: JavaScriptからの予期せぬ戻り値: {js_return_value}")
            return False
    except Exception as e:
        log_message(f"BO: WebSocket接続スクリプトの実行中にエラー: {e}")
        return False


def send_bo_order(direction, price_open):
    """バイナリーオプションの注文をPC画面上のマウスクリックで送信する関数"""
    target_x = 0
    target_y = 0
    button_name = ""

    if direction == "PUT":
        target_x = PUT_BUTTON_COORD_X
        target_y = PUT_BUTTON_COORD_Y
        button_name = "PUT"
    elif direction == "CALL":
        target_x = CALL_BUTTON_COORD_X
        target_y = CALL_BUTTON_COORD_Y
        button_name = "CALL"
    else:
        log_message(f"BO: 不明な注文方向: {direction}")
        return False

    log_message(f"BO: {button_name} ボタンを物理座標 ({target_x}, {target_y}) でクリックします。現在の価格表示(参考): {price_open}")
    log_message(f"【超重要】ブラウザのウィンドウ位置、ズームレベル100%を確認し、実行中はPCを操作しないでください。")

    try:
        # pyautoguiで指定された座標をクリック
        pyautogui.click(x=target_x, y=target_y)
        log_message(f"★★★ BO: {button_name} ボタンの物理クリックイベント送信成功 (座標: {target_x}, {target_y}) ★★★")
        return True

    except pyautogui.FailSafeException:
        log_message(f"BO: エラー - マウスが画面隅に移動したため、pyautoguiがFail-Safeをトリガーしました。")
        log_message(f"BO: Fail-Safeは緊急停止機能です。プログラムを終了します。")
        sys.exit(1) # プログラムを強制終了
    except Exception as e:
        log_message(f"BO: エラー - {button_name} ボタンの物理クリック中にエラーが発生しました: {e}")
        return False



def get_price_from_mt5():
    tick = mt5.symbol_info_tick(MT5_SYMBOL)
    return tick.ask if tick else None

async def price_monitoring_and_trading_logic():
    """ブラウザ価格を監視し、乖離エントリーロジックを実行する"""
    global last_entry_time
    port = 9222
    try:
        response = requests.get(f'http://localhost:{port}/json')
        ws_debugger_url = next(item['webSocketDebuggerUrl'] for item in response.json() if 'webSocketDebuggerUrl' in item and item['type'] == 'page')
    except Exception as e:
        log_message(f"CDP: DevTools WebSocket URL 取得に失敗: {e}");
        log_message("CDP: プログラムを終了します。ブラウザが起動していないか、デバッグポートが利用できません。")
        sys.exit(1) # 強制終了

    log_message("CDP: DevTools WebSocket に接続試行中...")
    try:
        websocket_cdp = await websockets.connect(ws_debugger_url)
        await websocket_cdp.send(json.dumps({"id": 1, "method": "Network.enable"}))
        log_message("CDP: DevTools WebSocket に接続成功。価格差の監視を開始...")
    except Exception as e:
        log_message(f"CDP: DevTools WebSocket への接続に失敗しました: {e}")
        log_message("CDP: プログラムを終了します。")
        sys.exit(1) # 強制終了


    latest_browser_price = 0.0
    while True:
        try:
            message = await websocket_cdp.recv() # CDP WebSocketを使用
            data = json.loads(message)
            if data.get("method") != "Network.webSocketFrameReceived": continue

            # WebSocketフレームのペイロードから価格情報を抽出
            payload = data["params"]["response"].get("payloadData", "")
            if not payload.startswith("U,1,1,USD/JPY"): continue

            match = re.search(r"USD/JPY\|([\d.]+)\|", payload)
            if not match: continue

            browser_price = float(match.group(1))
            if latest_browser_price == browser_price: continue
            latest_browser_price = browser_price

            mt5_ask = get_price_from_mt5()
            if not mt5_ask: continue

            current_diff = mt5_ask - browser_price
            diff_history.append(current_diff)

            if len(diff_history) < AVERAGE_PERIOD:
                log_message(f"[データ収集中 {len(diff_history)}/{AVERAGE_PERIOD}] MT5: {mt5_ask} | Browser: {browser_price} | 差: {current_diff:+.5f}")
                continue

            average_diff = sum(diff_history) / len(diff_history)
            log_message(f"[監視中] MT5: {mt5_ask} | Browser: {browser_price} | 現在差: {current_diff:+.5f} | 平均差: {average_diff:+.5f}")

            # --- ▼▼▼ エントリー判定 ▼▼▼ ---

            if time.time() - last_entry_time < ENTRY_COOLDOWN_SECONDS:
                log_message(f"【エントリー対象外】クールダウン中 ({ENTRY_COOLDOWN_SECONDS - (time.time() - last_entry_time):.1f}秒残り) ✕")
                continue

            order_triggered = False
            order_direction = None
            order_price = None

            if current_diff > average_diff + ENTRY_THRESHOLD_PIPS:
                log_message(f"【PUT条件成立】現在差({current_diff:+.5f}) > 平均差({average_diff:+.5f}) + 閾値({ENTRY_THRESHOLD_PIPS}) ○")
                order_direction = "PUT"
                order_price = browser_price
                order_triggered = True
            elif current_diff < average_diff - ENTRY_THRESHOLD_PIPS:
                log_message(f"【CALL条件成立】現在差({current_diff:+.5f}) < 平均差({average_diff:+.5f}) - 閾値({ENTRY_THRESHOLD_PIPS}) ○")
                order_direction = "CALL"
                order_price = browser_price
                order_triggered = True
            else:
                log_message(f"【エントリー対象外】条件不成立 (現在差: {current_diff:+.5f}, 平均差: {average_diff:+.5f}, 閾値: {ENTRY_THRESHOLD_PIPS}) ✕")

            # 注文がトリガーされた場合の再試行ロジック
            if order_triggered:
                order_successful = False
                for attempt in range(MAX_ORDER_RETRY_ATTEMPTS):
                    log_message(f"BO: 注文試行中... ({attempt + 1}/{MAX_ORDER_RETRY_ATTEMPTS}回目)")
                    # pyautoguiによる物理クリックを試行
                    order_status = send_bo_order(order_direction, order_price)

                    if order_status is True: # クリックイベント送信が成功
                        order_successful = True
                        break # 再試行ループを抜ける
                    else: # クリックイベント送信が失敗した場合 (FailSafeなどで中断された場合も含む)
                        if attempt < MAX_ORDER_RETRY_ATTEMPTS - 1:
                            log_message(f"BO: 注文クリック失敗。{ORDER_RETRY_DELAY_SECONDS}秒後に再試行します。")
                            await asyncio.sleep(ORDER_RETRY_DELAY_SECONDS)
                        else:
                            log_message(f"BO: 注文クリック失敗。最大試行回数 ({MAX_ORDER_RETRY_ATTEMPTS}) に達しました。")

                if order_successful:
                    log_message(f"BO: 注文クリック処理が正常に完了しました。クールダウンを開始します。")
                    last_entry_time = time.time() # 注文成功時のみ最終エントリー時間を更新
                else:
                    log_message(f"BO: 注文の再試行がすべて失敗しました。次のティックで処理を継続します。")


        except websockets.exceptions.ConnectionClosed as e:
            log_message(f"CDP: DevTools WebSocket接続が閉じられました: {e}。これは致命的な問題です。")
            log_message("CDP: プログラムを終了します。ブラウザのデバッグ接続が失われました。")
            break
        except Exception as e:
            log_message(f"CDP: WebSocket 受信または処理中にエラー: {e}");
            await asyncio.sleep(1)
            continue

async def main():
    mt5_path = select_mt5_instance(find_mt5_instances())
    if not mt5_path: log_message("プログラムを終了します。"); return
    if not mt5.initialize(path=mt5_path):
        log_message(f"MT5: initialize() failed, error code = {mt5.last_error()}"); return
    log_message(f"MT5: 接続成功 (Path: {mt5_path}, Version: {mt5.version()})")

    log_message("BO: 初期WebSocket接続設定を開始します。")
    if not await setup_websocket_connection():
        log_message("BO: WebSocketの初期接続設定に失敗しました。プログラムを終了します。"); return
    log_message("BO: 初期WebSocket接続が正常に確立されました。")

    await price_monitoring_and_trading_logic()

if __name__ == "__main__":
    # pyautogui のFail-Safe機能を有効にする (デフォルトで有効ですが念のため)
    # マウスを画面の四隅のいずれかに移動させると、pyautoguiの制御が停止します。
    pyautogui.FAILSAFE = True
    pyautogui.PAUSE = 0.1 # 各pyautogui呼び出し間の0.1秒の一時停止を設定

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        log_message("プログラムが中断されました。(KeyboardInterrupt)")
    finally:
        if mt5.terminal_info():
            mt5.shutdown(); log_message("MT5: 接続をシャットダウンしました。")

バイナリー業者向けのレイテンシー取引ツール(天国草ver)

以下のソースコードは、うみさんのレイテンシー取引ツールの元となったソースコードです。

    # ---------------------------------------------------------------------
    #                     ENTRY TIMER & ODDS FETCH (SELENIUM)
    # ---------------------------------------------------------------------

    def init_driver_with_iframe(self):
        """Attach to an existing Chrome (debug port 9230) and switch to iframe containing odds."""
        opts = Options()
        opts.debugger_address = "localhost:9230"  # assumes user started Chrome with --remote-debugging-port=9230
        try:
            self.driver = webdriver.Chrome(options=opts)
            print("✅ Connected to Chrome debugging session")
            self.find_and_switch_to_odds_iframe()
        except Exception as exc:
            print(f"❌ Could not connect to Chrome: {exc}")
            self.driver = None

    def find_and_switch_to_odds_iframe(self):
        """Loop through iframes until BUY_ODDS_CLASS element found."""
        if self.driver is None:
            return
        while True:
            try:
                self.driver.switch_to.default_content()
                iframes = self.driver.find_elements(By.TAG_NAME, "iframe")
                for idx, iframe in enumerate(iframes):
                    try:
                        self.driver.switch_to.default_content()
                        self.driver.switch_to.frame(iframe)
                        if self.driver.find_elements(By.CLASS_NAME, BUY_ODDS_CLASS):
                            #print(f"✅ Odds found in iframe[{idx}]")
                            return
                    except Exception:
                        continue
                time.sleep(0)
            except Exception as exc:
                print(f"frame scan error: {exc}")
                time.sleep(0)

    def get_latest_buy_odds(self):
        if not self.driver:
            return None
        try:
            elements = self.driver.find_elements(By.CLASS_NAME, BUY_ODDS_CLASS)
            for el in elements:
                txt = el.text.strip()
                if txt:
                    return float(txt)
            #print("⚠️ BUYオッズ取得できず → iframe再検索")
            self.find_and_switch_to_odds_iframe()
        except Exception:
            self.find_and_switch_to_odds_iframe()
        return None


    def get_latest_sell_odds(self):
        if not self.driver:
            return None
        try:
            el = self.driver.find_element(By.XPATH, SELL_XPATH)
            txt = el.text.strip()
            return float(txt) if txt else None
        except Exception:
            self.find_and_switch_to_odds_iframe()
            return None

    def start_entry_timer(self):
        """Starts 40‑second cycle timer logic one minute in the future."""
        if self.timer_active:
            return
        now = datetime.now()
        self.entry_start_time = (now + timedelta(minutes=1)).replace(second=0, microsecond=0)
        self.timer_active = True
        self.label_overlay(f"Entry timer will start at {self.entry_start_time.strftime('%H:%M:%S')}")
        threading.Thread(target=self._entry_timer_loop, daemon=True).start()

    def _entry_timer_loop(self):
        """Runs forever updating allow flags based on time and odds."""
        while self.timer_active:
            time.sleep(0)
            now = datetime.now()
            if self.entry_start_time is None:
                continue
            delta = (now - self.entry_start_time).total_seconds()
            if delta < 0:
                self.allow_buy_click = self.allow_sell_click = False
                self.label_overlay("⏳ Waiting for entry start …")
                continue

            # inside cycles 37is best
            time_in_cycle = delta % CYCLE_SECONDS
            if 37 <= time_in_cycle <= 40:  # active 4‑second window each cycle
                buy_odds = self.get_latest_buy_odds()
                sell_odds = self.get_latest_sell_odds()
                self.allow_buy_click = buy_odds is not None and buy_odds >= ODDS_THRESHOLD
                self.allow_sell_click = sell_odds is not None and sell_odds >= ODDS_THRESHOLD
                status = (
                    f"[{now.strftime('%H:%M:%S')}] Window 37‑40s  BUY: {buy_odds} {'✅' if self.allow_buy_click else '❌'} | "
                    f"SELL: {sell_odds} {'✅' if self.allow_sell_click else '❌'}"
                )
                self.label_overlay(status)
            else:
                self.allow_buy_click = self.allow_sell_click = False
                self.label_overlay(f"Wait cycle … {int(time_in_cycle)}s")

バイナリー業者向けのレイテンシー取引ツール(天国草ver2)

# Selenium ドライバ設定(port 9222)
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
driver = webdriver.Chrome(options=chrome_options)

# PyChrome 設定(port 9222 使用)
try:
    browser = pychrome.Browser(url="http://127.0.0.1:9222")
    tabs = browser.list_tab()
    if tabs:
        tab = tabs[0]
        tab.start()
        log_queue.put("PyChrome: 接続に成功しました (port 9222)")
    else:
        log_queue.put("PyChrome: 利用可能なタブが見つかりません")
        tab = None
except Exception as e:
    log_queue.put(f"PyChrome: 接続に失敗しました: {e}")
    tab = None

# WebSocket イベントハンドラ
def on_ws_created(**kwargs):
    log_queue.put(f"WS created: {kwargs}")

def on_ws_frame_sent(**kwargs):
    response = kwargs.get("response", {})
    payload = response.get("payloadData", "")
    if "livechat" in payload and "disconnect_timeout" in payload:
        log_queue.put("WS: livechat の disconnect_timeout を送信")

def on_ws_frame_received(**kwargs):
    pass

# PyChrome 初期化
if tab:
    try:
        tab.Network.enable()
        tab.Network.webSocketCreated = on_ws_created
        tab.Network.webSocketFrameSent = on_ws_frame_sent
        tab.Network.webSocketFrameReceived = on_ws_frame_received
        log_queue.put("PyChrome: Network 監視を開始")
    except Exception as e:
        log_queue.put(f"PyChrome: Network 設定に失敗しました: {e}")

# BO 注文送信関数(CALL / PUT)
def send_bo_order(direction, price_open):
    if not tab:
        log_queue.put("BO: PyChrome 未接続のため送信をスキップ")
        return

    try:
        # CALL(BUY)と PUT(SELL)で構造は同一(direction のみ差し替え)
        if direction == "CALL":
            order_data = {
                "command": "open_option",
                "plugin": "site",
                "sum": "5000",
                "tool_id": "4",
                "direction": direction,
                "price_open": str(price_open),
                "timeframe_id": "15",
                "option_kind": "1"
            }
        else:
            order_data = {
                "command": "open_option",
                "plugin": "site",
                "sum": "5000",
                "tool_id": "4",
                "direction": direction,
                "price_open": str(price_open),
                "timeframe_id": "15",
                "option_kind": "1"
            }

        # JavaScript 経由で WebSocket 送信
        js_code = f"""
        if (typeof window.boWebSocket !== 'undefined' && window.boWebSocket.readyState === WebSocket.OPEN) {{
            var orderData = {json.dumps(order_data)};
            window.boWebSocket.send(JSON.stringify(orderData));
            console.log('BO order sent:', orderData);
            true;
        }} else {{
            console.log('BO WebSocket connection is not available');
            false;
        }}
        """

        result = tab.Runtime.evaluate(expression=js_code)
        if result.get('result', {}).get('value'):
            log_queue.put(f"BO: 注文送信成功 - {direction} @ {price_open}")
            log_queue.put(f"BO: 注文詳細 - {order_data}")
        else:
            log_queue.put("BO: 注文送信失敗(WebSocket 未接続)")

    except Exception as e:
        log_queue.put(f"BO: 注文送信エラー: {e}")

# WebSocket 接続設定(BO 用): 履歴データ取得 + hook_user_status / hook_options
js_websocket_setup = """
try {
    if (typeof window.boWebSocket !== 'undefined') {
        window.boWebSocket.close();
    }

    window.boWebSocket = new WebSocket("wss://bo.zentrader.com:27017/");

    window.boWebSocket.onopen = () => {
        const now = Math.floor(Date.now() / 1000);
        const timeEnd = now;
        const timeBegin = now - 500;

        // 1) connect
        window.boWebSocket.send(JSON.stringify({
            command: "connect",
            bo: "demo",
            hash: "ここにログイン情報いれてください。",
            platform: "mt4",
            source: "site"
        }));
        console.log("connect sent");

        // 2) get_user_data
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({ command: "get_user_data" }));
            console.log("get_user_data sent");
        }, 300);

        // 3) hook_time
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({ command: "hook_time", enable: "true" }));
            console.log("hook_time sent");
        }, 600);

        // 4) get_cfg_trade
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({ command: "get_cfg_trade", language: "ja" }));
            console.log("get_cfg_trade sent");
        }, 900);

        // 5) get_user_settings
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({ command: "get_user_settings" }));
            console.log("get_user_settings sent");
        }, 1200);

        // 6) hook_timeframes
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({
                command: "hook_timeframes",
                option_kind: "1",
                tool_id: "4",
                timeframe_id: "15",
                bo: "demo",
                enable: "true",
                interval: "1000",
                source: "site"
            }));
            console.log("hook_timeframes sent");
        }, 1500);

        // 7) get_quotes_history
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({
                command: "get_quotes_history",
                tool_id: "4",
                time_begin: timeBegin.toString(),
                time_end: timeEnd.toString(),
                time_size: "S1",
                source: "site",
                request_id: "4_500"
            }));
            console.log("get_quotes_history sent", {
                time_begin: new Date(timeBegin * 1000).toISOString(),
                time_end: new Date(timeEnd * 1000).toISOString(),
                duration_sec: timeEnd - timeBegin
            });
        }, 1800);

        // 8) hook_user_status
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({ command: "hook_user_status", enable: "true" }));
            console.log("hook_user_status sent");
        }, 2100);

        // 9) hook_options
        setTimeout(() => {
            window.boWebSocket.send(JSON.stringify({
                command: "hook_options",
                source: "site",
                enable: "true",
                interval: "100"
            }));
            console.log("hook_options sent");
        }, 2400);
    };

    window.boWebSocket.onmessage = (event) => {
        console.log("BO WS message:", event.data);
    };

    window.boWebSocket.onerror = (error) => {
        console.log("BO WS error:", error);
    };

    window.boWebSocket.onclose = (event) => {
        console.log("BO WS closed:", event.code, event.reason);
    };

    true;
} catch (error) {
    console.log("BO WS setup error:", error);
    false;
}
"""

# WebSocket でブラウザ価格を監視(port 9222)
def start_cdp_price_listener():
    async def listen_price_from_cdp():
        port = 9222
        try:
            response = requests.get(f'http://localhost:{port}/json')
            targets = response.json()
            ws_debugger_url = next(item['webSocketDebuggerUrl'] for item in targets if 'webSocketDebuggerUrl' in item)
        except Exception as e:
            log_queue.put(f"CDP: DevTools WebSocket URL 取得に失敗: {e}")
            return

        async with websockets.connect(ws_debugger_url) as websocket:
            await websocket.send(json.dumps({"id": 1, "method": "Network.enable"}))
            log_queue.put("CDP: DevTools WebSocket に接続。価格監視を開始 (port 9222)")

            while True:
                try:
                    message = await websocket.recv()
                    data = json.loads(message)

                    if data.get("method") == "Network.webSocketFrameReceived":
                        payload = data["params"]["response"].get("payloadData", "")
                        if payload.startswith("U,1,1,USDJPY.FXCM"):
                            match = re.search(r"USDJPY\\.FXCM\\|([\\d.]+)\\|", payload)
                            if match:
                                latest_price["ask"] = float(match.group(1))
                except Exception as e:
                    log_queue.put(f"CDP: WebSocket 受信エラー: {e}")
                    break

    asyncio.run(listen_price_from_cdp())

LLMでソースコードを改造しよう

ここで紹介したソースコードが自分に合わない場合、LLMで改造しましょう。ソースコードをコピペして、指示を出すことで、プログラムの知識がなくてもコーディングできます。(この手法はバイブコーディングと呼ばれる)

利用するLLMはGoogle Geminiがおすすめ。Googleアカウントを持っていれば無料で利用できます。APIの利用料金は従量課金制ですが、チャット形式なら料金は発生しません。

LLMは入力が雑だと出力も雑になってしまいます。そのため「稼げるbotを作って」と雑に指示を出すよりも、既存のソースコードを改造してもらうほうがいいです。

使用するプログラム言語はPythonにしましょう。Pythonは世界的に利用者が多く、ウェブ上に情報がたくさんあり、表現力にも優れているため、バイブコーディングでも良質なソースコードを出力してくれます。

逆にMQL4/MQL5はマイナーな言語で、ウェブ上でも情報が少ないため、LLMでソースコードを作ってもらってもコンパイルエラーが発生しやすくなります。

役に立った記事はSNSシェア!
  • URLをコピーしました!
  • URLをコピーしました!

海外FXの稼ぎ方

海外FXの効率的な稼ぎ方はこちら。

  1. XM スタンダード口座を開設し、3種ボーナスを獲得する
  2. 海外FXの稼ぎ方を体得する
  3. 取引コストの低いTradeview cTrader口座/ILC口座に乗り換える

海外FX初心者の運用口座には、XM スタンダード口座がおすすめ。口座の最大レバレッジが1000倍で、ゴールドの銘柄レバレッジも1000倍なため、ゴールドのハイリスクトレードが可能となります。

またXMは3種類のボーナスを提供しており、10万円の入金額を17万円ほどに増やせます。ボーナスは損失カバー機能があるので、ハイリスクなトレードに使いましょう。

海外FXの稼ぎ方は、億トレーダー(Xアカウント)のコピートレード、無料bot「Stop Grid Trader」による自動売買などがおすすめ。

裁量トレードがメインなら、運用口座はTradeview cTrader口座がおすすめ。高性能FXプラットフォーム「cTrader」に対応しており、リミット注文・ストップ注文を設置しやすくなります。

botによる自動売買をするなら、運用口座はTradeview ILC口座がおすすめ。スプレッドが非常に狭く、取引手数料も1ロット往復5ドルと最安値クラスなので、取引回数の多いトレードロジックで利益を出しやすくなります。

この記事を書いた人

海外FXの情報を備忘録としてまとめています。
運用は自己責任でお願いします。
Twitterで「海外FXの有益情報bot」も運用してます。

目次