FX/仮想通貨トレードの収益性を高めたいなら、Pythonスクリプトがおすすめ。トレードを自動化できるだけではなく、スワップポイント一覧や取引履歴を表示できるようになります。
またPythonはMQL4/MQL5よりも表現力が高く、ソースコードもウェブ上にたくさん存在するため、LLMで優秀なソースコードを出力しやすいです。
この記事では海外FX・仮想通貨・バイナリーオプション向けの無料ソースコードを紹介していきます。
海外FXの無料ソースコード
Stop Grid Trader(グリッド+ストラドル+ピラミッディングbot)
Stop Grid Traderは、グリッドトレード・ストラドル戦略・ピラミッディングトレードなどを行うPythonスクリプトです。(ソースコード引用元)
ソースコードはMQL4/MQL5ではなくPythonで書かれており、運用にはPythonの運用環境が必要です。(VS codeなど)
メインのトレードロジックはピラミッディングトレードなので、ボラティリティが激しいほど利益率が高くなります。基本的にはアメリカの大型経済指標の直前で運用しましょう。

"""
Stop-Grid Trader – version 3.1
------------------------------------------------------------
Mini-GUI bot for MetaTrader 5 that trades a symmetric
stop-grid around the current mid-price.
Key workflow (unchanged trading logic, polished GUI):
1. Select a running MT5 terminal.
2. Enter parameters, then press **Start**:
• Symbol name (incl. suffix)
• Price-digits to round to
• Base lot (even, ≥ 0.02)
• Orders per side
• Grid multiplier (= spread × factor)
• Loop count (#restarts after a full close)
3. The bot places alternating BUY_STOP / SELL_STOP orders
at ±multiplier·spread intervals, outermost stops include
a TP one grid-step farther out.
4. Every 0.02-lot fill is partially closed at +1 grid step
(0.01 lot). The remainder:
– SL is moved to break-even (both grid & BE-REV).
– A 0.02-lot reverse STOP is placed at the BE price.
– If the reverse position’s 0.01 lot remainder finds
price already beyond the initial mid-price, it is
closed instantly; otherwise TP = initial mid-price.
5. Once the current mid-price touches the outer TP level,
all pending orders are removed, every position is closed,
and (optionally) the grid restarts up to *loop count*.
"""
import tkinter as tk
from tkinter import ttk, messagebox
import threading, time, sys
import MetaTrader5 as mt5
try:
import psutil
except ImportError:
psutil = None
# ── default GUI values ──────────────────────────────────────────
DEF_SYMBOL = "XAUUSD"
DEF_DIGITS = 2
DEF_LOT = 0.02
DEF_ORDERS_SIDE = 10
DEF_MULTIPLIER = 2.0
DEF_LOOP = 0 # 0 ⇒ run once
# ── constants (rarely changed) ─────────────────────────────────
DEVIATION = 100
MAGIC_NUMBER = 0
GRID_TAG = "basic grid"
CHECK_INTERVAL = 1.0
# ───────────────────────────────────────────────────────────────
# ═════════════════════════ GUI HELPERS ═════════════════════════
def _discover_terminals() -> list[str]:
paths = []
if psutil:
for p in psutil.process_iter(attrs=["name", "exe"]):
if "terminal64.exe" in (p.info.get("name") or "").lower():
exe = p.info.get("exe") or ""
if exe and exe not in paths:
paths.append(exe)
return paths
def choose_terminal() -> str | None:
"""Modal: pick an MT5 terminal."""
root = tk.Tk(); root.withdraw()
win = tk.Toplevel(root); win.title("Select MT5 terminal"); win.grab_set()
cols = ("exe", "login", "server", "balance", "currency", "name")
tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
tree.heading(c, text=c); tree.column(c, width=w, anchor="w")
for exe in _discover_terminals():
mt5.initialize(path=exe)
acc, _ = mt5.account_info(), mt5.terminal_info()
if acc:
tree.insert(
"", tk.END,
values=(
exe, acc.login, acc.server,
f"{acc.balance:.2f}", acc.currency, acc.name
)
)
mt5.shutdown()
tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)
sel: dict[str, str | None] = {"path": None}
def _use() -> None:
if tree.selection():
sel["path"] = tree.item(tree.selection()[0], "values")[0]
win.destroy()
ttk.Button(win, text="Use", command=_use)\
.grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")
if tree.get_children():
tree.selection_set(tree.get_children()[0])
win.wait_window(); root.destroy()
return sel["path"]
class ParamDialog(tk.Toplevel):
"""Gather user parameters; returns None if canceled."""
def __init__(self, parent: tk.Tk):
super().__init__(parent)
self.title("Grid parameters"); self.grab_set()
self.res: tuple | None = None
rows = (
("Symbol", DEF_SYMBOL),
("Price digits", str(DEF_DIGITS)),
("Base lot", f"{DEF_LOT:.2f}"),
("Orders / side", str(DEF_ORDERS_SIDE)),
("Grid multiplier", str(DEF_MULTIPLIER)),
("Loop count", str(DEF_LOOP)),
)
self.vars: list[tk.StringVar] = []
for r, (label, default) in enumerate(rows):
ttk.Label(self, text=label).grid(row=r, column=0, sticky="w", padx=6, pady=4)
var = tk.StringVar(value=default); self.vars.append(var)
ttk.Entry(self, textvariable=var, width=15)\
.grid(row=r, column=1, sticky="w", padx=6, pady=4)
ttk.Button(self, text="Start", command=self._ok)\
.grid(row=len(rows), column=1, pady=8, sticky="e")
def _ok(self) -> None:
try:
sym = self.vars[0].get().strip()
digs = int(self.vars[1].get())
lot = float(self.vars[2].get())
nside = int(self.vars[3].get())
mult = float(self.vars[4].get())
loops = int(self.vars[5].get())
if digs < 0: raise ValueError("digits ≥ 0")
if lot < 0.02 or int(round(lot * 100)) % 2:
raise ValueError("lot must be even ×0.01 and ≥0.02")
if nside < 1: raise ValueError("orders / side ≥ 1")
if mult <= 0: raise ValueError("multiplier > 0")
if loops < 0: raise ValueError("loop count ≥ 0")
except Exception as err:
messagebox.showerror("Invalid input", str(err), parent=self)
return
self.res = (sym, digs, lot, nside, mult, loops)
self.destroy()
# ═════════════════════════ TRADER CLASS ════════════════════════
class StopGridTrader:
def __init__(
self,
terminal_path: str,
symbol: str,
digits: int,
base_lot: float,
orders_side: int,
multiplier: float,
loop_count: int
):
self.path = terminal_path
self.symbol = symbol
self.digits = digits
self.lot = base_lot
self.side = orders_side
self.mult = multiplier
self.loopN = loop_count
self.done = 0 # loops completed
# runtime
self.mid: float | None = None
self.step_pts: int | None = None
self.tp_high = self.tp_low = None
self.running = False
# status window
self.root = tk.Tk(); self.root.title("Stop-Grid Trader")
self.status = tk.StringVar(value="Initializing…")
ttk.Label(self.root, textvariable=self.status)\
.grid(padx=12, pady=10)
ttk.Button(self.root, text="Abort", command=self._abort)\
.grid(pady=(0, 10))
# ── MetaTrader 5 init ────────────────────────────────────
def _mt5_init(self) -> None:
if not mt5.initialize(path=self.path):
c, m = mt5.last_error(); raise RuntimeError(f"MT5 init: {c} {m}")
if not mt5.symbol_select(self.symbol, True):
raise RuntimeError(f"Cannot select symbol {self.symbol}")
# ── helpers ──────────────────────────────────────────────
def _norm_vol(self, vol: float) -> float:
info = mt5.symbol_info(self.symbol); step = info.volume_step or 0.01
return round(max(info.volume_min, min(vol, info.volume_max)) / step) * step
# place pending order
def _pend(
self, ord_type: int, price: float,
sl: float, tp: float = 0.0,
vol: float | None = None,
tag: str = GRID_TAG
) -> None:
if vol is None: vol = self.lot
mt5.order_send({
"action": mt5.TRADE_ACTION_PENDING,
"symbol": self.symbol,
"volume": self._norm_vol(vol),
"type": ord_type,
"price": price,
"sl": sl,
"tp": tp,
"deviation": DEVIATION,
"magic": MAGIC_NUMBER,
"comment": tag,
"type_time": mt5.ORDER_TIME_GTC,
})
# ── grid creation ────────────────────────────────────────
def _build_grid(self) -> None:
tick = mt5.symbol_info_tick(self.symbol); info = mt5.symbol_info(self.symbol)
self.mid = round((tick.bid + tick.ask) / 2, self.digits)
raw_spd_pts = int(round((tick.ask - tick.bid) / info.point))
self.step_pts = int(raw_spd_pts * self.mult)
pt = info.point
self.tp_high = self.tp_low = None
for i in range(1, self.side + 1):
buy = self.mid + i * self.step_pts * pt
sell = self.mid - i * self.step_pts * pt
if i == self.side: # outer layer
self.tp_high = buy + self.step_pts * pt
self.tp_low = sell - self.step_pts * pt
tp_b, tp_s = self.tp_high, self.tp_low
else:
tp_b = tp_s = 0.0
self._pend(mt5.ORDER_TYPE_BUY_STOP , buy , self.mid, tp=tp_b)
self._pend(mt5.ORDER_TYPE_SELL_STOP, sell, self.mid, tp=tp_s)
self.status.set(f"Grid ready (loop {self.done}/{self.loopN})")
# place reverse stop at break-even
def _place_be_rev(self, pos) -> None:
info = mt5.symbol_info(self.symbol); pt = info.point
be = round(pos.price_open, self.digits)
if pos.type == mt5.POSITION_TYPE_BUY:
otype, sl = mt5.ORDER_TYPE_SELL_STOP, be + self.step_pts * pt
else:
otype, sl = mt5.ORDER_TYPE_BUY_STOP, be - self.step_pts * pt
self._pend(otype, be, round(sl, self.digits), vol=self.lot, tag="BE-REV")
# after partial TP
def _handle_partial(self, pos) -> None:
tick = mt5.symbol_info_tick(self.symbol); bid, ask = tick.bid, tick.ask
half = self.lot / 2
be_price = round(pos.price_open, self.digits)
if pos.comment.startswith("BE-REV"):
beyond = (
pos.type == mt5.POSITION_TYPE_BUY and bid >= self.mid or
pos.type == mt5.POSITION_TYPE_SELL and ask <= self.mid
)
if beyond:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": pos.ticket, "volume": half,
"type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": bid if pos.type == mt5.POSITION_TYPE_BUY else ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "mid-instant TP"
})
return
mt5.order_send({
"action": mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
"position": pos.ticket,
"sl": be_price, # BE SL
"tp": round(self.mid, self.digits),
"deviation": DEVIATION
})
else:
mt5.order_send({
"action": mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
"position": pos.ticket,
"sl": be_price,
"tp": 0.0,
"deviation": DEVIATION
})
self._place_be_rev(pos)
# ── monitoring loop ──────────────────────────────────────
def _monitor(self) -> None:
info = mt5.symbol_info(self.symbol); pt = info.point
half = self.lot / 2
while self.running:
time.sleep(CHECK_INTERVAL)
tick = mt5.symbol_info_tick(self.symbol)
mid_now = (tick.bid + tick.ask) / 2
# global exit condition
if (self.tp_high and mid_now >= self.tp_high) or \
(self.tp_low and mid_now <= self.tp_low):
self._full_close()
continue
# partial-TP check
for pos in mt5.positions_get(symbol=self.symbol) or []:
trg = (
pos.price_open + self.step_pts * pt
if pos.type == mt5.POSITION_TYPE_BUY
else pos.price_open - self.step_pts * pt
)
hit = (
pos.type == mt5.POSITION_TYPE_BUY and tick.bid >= trg or
pos.type == mt5.POSITION_TYPE_SELL and tick.ask <= trg
)
if hit and abs(pos.volume - self.lot) < 1e-6:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": pos.ticket, "volume": half,
"type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": tick.bid if pos.type == mt5.POSITION_TYPE_BUY else tick.ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "partial TP"
})
time.sleep(0.3)
self._handle_partial(pos)
# ── cancel orders & close positions ──────────────────────
def _full_close(self) -> None:
# cancel pending
for o in mt5.orders_get(symbol=self.symbol) or []:
if hasattr(mt5, "order_delete"):
mt5.order_delete(o.ticket)
else:
mt5.order_send({
"action": mt5.TRADE_ACTION_REMOVE,
"order": o.ticket,
"symbol": o.symbol
})
# close all positions
tick = mt5.symbol_info_tick(self.symbol)
for p in mt5.positions_get(symbol=self.symbol) or []:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": p.ticket, "volume": p.volume,
"type": mt5.ORDER_TYPE_SELL if p.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": tick.bid if p.type == mt5.POSITION_TYPE_BUY else tick.ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "grid exit"
})
self.done += 1
if self.done <= self.loopN:
self.status.set(f"Loop {self.done} finished – restarting…")
time.sleep(2)
self._build_grid()
else:
self.status.set("All loops done – exit")
self.running = False
self.root.after(800, self.root.quit)
# ── GUI: abort button ────────────────────────────────────
def _abort(self) -> None:
if messagebox.askyesno("Abort", "Stop trading and exit?", parent=self.root):
self.running = False
self._full_close()
# ── run bot ──────────────────────────────────────────────
def run(self) -> None:
self._mt5_init()
self._build_grid()
self.running = True
threading.Thread(target=self._monitor, daemon=True).start()
self.root.mainloop()
mt5.shutdown()
# ══════════════════════════ MAIN ══════════════════════════════
def main() -> None:
term = choose_terminal()
if not term:
sys.exit("No MT5 terminal selected – exiting.")
root = tk.Tk(); root.withdraw()
pd = ParamDialog(root); pd.wait_window(); root.destroy()
if pd.res is None:
sys.exit("Parameters dialog canceled – exiting.")
sym, digs, lot, nside, mult, loops = pd.res
StopGridTrader(
terminal_path = term,
symbol = sym,
digits = digs,
base_lot = lot,
orders_side = nside,
multiplier = mult,
loop_count = loops
).run()
if __name__ == "__main__":
main()Stop Grid Trader(予約注文が半減すると、再エントリー)
こちらも同じくStop Grid Traderのソースコードですが、予約注文が半減するとストップ注文を再設置するようになっています。(ソースコード引用元)

"""
Stop-Grid Trader – version 3.1
------------------------------------------------------------
Mini-GUI bot for MetaTrader 5 that trades a symmetric
stop-grid around the current mid-price.
Key workflow (unchanged trading logic, polished GUI):
1. Select a running MT5 terminal.
2. Enter parameters, then press **Start**:
• Symbol name (incl. suffix)
• Price-digits to round to
• Base lot (even, ≥ 0.02)
• Orders per side
• Grid multiplier (= spread × factor)
• Loop count (#restarts after a full close)
3. The bot places alternating BUY_STOP / SELL_STOP orders
at ±multiplier·spread intervals, outermost stops include
a TP one grid-step farther out.
4. Every 0.02-lot fill is partially closed at +1 grid step
(0.01 lot). The remainder:
– SL is moved to break-even (both grid & BE-REV).
– A 0.02-lot reverse STOP is placed at the BE price.
– If the reverse position’s 0.01 lot remainder finds
price already beyond the initial mid-price, it is
closed instantly; otherwise TP = initial mid-price.
5. Once the current mid-price touches the outer TP level,
all pending orders are removed, every position is closed,
and (optionally) the grid restarts up to *loop count*.
"""
import tkinter as tk
from tkinter import ttk, messagebox
import threading, time, sys
import MetaTrader5 as mt5
try:
import psutil
except ImportError:
psutil = None
# ── default GUI values ──────────────────────────────────────────
DEF_SYMBOL = "XAUUSD"
DEF_DIGITS = 2
DEF_LOT = 0.02
DEF_ORDERS_SIDE = 10
DEF_MULTIPLIER = 2.0
DEF_LOOP = 0 # 0 ⇒ run once
# ── constants (rarely changed) ─────────────────────────────────
DEVIATION = 100
MAGIC_NUMBER = 0
GRID_TAG = "basic grid"
CHECK_INTERVAL = 1.0
# ───────────────────────────────────────────────────────────────
# ═════════════════════════ GUI HELPERS ═════════════════════════
def _discover_terminals() -> list[str]:
paths = []
if psutil:
for p in psutil.process_iter(attrs=["name", "exe"]):
if "terminal64.exe" in (p.info.get("name") or "").lower():
exe = p.info.get("exe") or ""
if exe and exe not in paths:
paths.append(exe)
return paths
def choose_terminal() -> str | None:
"""Modal: pick an MT5 terminal."""
root = tk.Tk(); root.withdraw()
win = tk.Toplevel(root); win.title("Select MT5 terminal"); win.grab_set()
cols = ("exe", "login", "server", "balance", "currency", "name")
tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
tree.heading(c, text=c); tree.column(c, width=w, anchor="w")
for exe in _discover_terminals():
mt5.initialize(path=exe)
acc, _ = mt5.account_info(), mt5.terminal_info()
if acc:
tree.insert(
"", tk.END,
values=(
exe, acc.login, acc.server,
f"{acc.balance:.2f}", acc.currency, acc.name
)
)
mt5.shutdown()
tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)
sel: dict[str, str | None] = {"path": None}
def _use() -> None:
if tree.selection():
sel["path"] = tree.item(tree.selection()[0], "values")[0]
win.destroy()
ttk.Button(win, text="Use", command=_use)\
.grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")
if tree.get_children():
tree.selection_set(tree.get_children()[0])
win.wait_window(); root.destroy()
return sel["path"]
class ParamDialog(tk.Toplevel):
"""Gather user parameters; returns None if canceled."""
def __init__(self, parent: tk.Tk):
super().__init__(parent)
self.title("Grid parameters"); self.grab_set()
self.res: tuple | None = None
rows = (
("Symbol", DEF_SYMBOL),
("Price digits", str(DEF_DIGITS)),
("Base lot", f"{DEF_LOT:.2f}"),
("Orders / side", str(DEF_ORDERS_SIDE)),
("Grid multiplier", str(DEF_MULTIPLIER)),
("Loop count", str(DEF_LOOP)),
)
self.vars: list[tk.StringVar] = []
for r, (label, default) in enumerate(rows):
ttk.Label(self, text=label).grid(row=r, column=0, sticky="w", padx=6, pady=4)
var = tk.StringVar(value=default); self.vars.append(var)
ttk.Entry(self, textvariable=var, width=15)\
.grid(row=r, column=1, sticky="w", padx=6, pady=4)
ttk.Button(self, text="Start", command=self._ok)\
.grid(row=len(rows), column=1, pady=8, sticky="e")
def _ok(self) -> None:
try:
sym = self.vars[0].get().strip()
digs = int(self.vars[1].get())
lot = float(self.vars[2].get())
nside = int(self.vars[3].get())
mult = float(self.vars[4].get())
loops = int(self.vars[5].get())
if digs < 0: raise ValueError("digits ≥ 0")
if lot < 0.02 or int(round(lot * 100)) % 2:
raise ValueError("lot must be even ×0.01 and ≥0.02")
if nside < 1: raise ValueError("orders / side ≥ 1")
if mult <= 0: raise ValueError("multiplier > 0")
if loops < 0: raise ValueError("loop count ≥ 0")
except Exception as err:
messagebox.showerror("Invalid input", str(err), parent=self)
return
self.res = (sym, digs, lot, nside, mult, loops)
self.destroy()
# ═════════════════════════ TRADER CLASS ════════════════════════
class StopGridTrader:
def __init__(
self,
terminal_path: str,
symbol: str,
digits: int,
base_lot: float,
orders_side: int,
multiplier: float,
loop_count: int
):
self.path = terminal_path
self.symbol = symbol
self.digits = digits
self.lot = base_lot
self.side = orders_side
self.mult = multiplier
self.loopN = loop_count
self.done = 0 # loops completed
# runtime
self.mid: float | None = None
self.step_pts: int | None = None
self.tp_high = self.tp_low = None
self.running = False
# status window
self.root = tk.Tk(); self.root.title("Stop-Grid Trader")
self.status = tk.StringVar(value="Initializing…")
ttk.Label(self.root, textvariable=self.status)\
.grid(padx=12, pady=10)
ttk.Button(self.root, text="Abort", command=self._abort)\
.grid(pady=(0, 10))
# ── MetaTrader 5 init ────────────────────────────────────
def _mt5_init(self) -> None:
if not mt5.initialize(path=self.path):
c, m = mt5.last_error(); raise RuntimeError(f"MT5 init: {c} {m}")
if not mt5.symbol_select(self.symbol, True):
raise RuntimeError(f"Cannot select symbol {self.symbol}")
# ── helpers ──────────────────────────────────────────────
def _norm_vol(self, vol: float) -> float:
info = mt5.symbol_info(self.symbol); step = info.volume_step or 0.01
return round(max(info.volume_min, min(vol, info.volume_max)) / step) * step
# place pending order
def _pend(
self, ord_type: int, price: float,
sl: float, tp: float = 0.0,
vol: float | None = None,
tag: str = GRID_TAG
) -> None:
if vol is None: vol = self.lot
mt5.order_send({
"action": mt5.TRADE_ACTION_PENDING,
"symbol": self.symbol,
"volume": self._norm_vol(vol),
"type": ord_type,
"price": price,
"sl": sl,
"tp": tp,
"deviation": DEVIATION,
"magic": MAGIC_NUMBER,
"comment": tag,
"type_time": mt5.ORDER_TIME_GTC,
})
# ── grid creation ────────────────────────────────────────
def _build_grid(self) -> None:
tick = mt5.symbol_info_tick(self.symbol); info = mt5.symbol_info(self.symbol)
self.mid = round((tick.bid + tick.ask) / 2, self.digits)
raw_spd_pts = int(round((tick.ask - tick.bid) / info.point))
self.step_pts = int(raw_spd_pts * self.mult)
pt = info.point
self.tp_high = self.tp_low = None
for i in range(1, self.side + 1):
buy = self.mid + i * self.step_pts * pt
sell = self.mid - i * self.step_pts * pt
if i == self.side: # outer layer
self.tp_high = buy + self.step_pts * pt
self.tp_low = sell - self.step_pts * pt
tp_b, tp_s = self.tp_high, self.tp_low
else:
tp_b = tp_s = 0.0
self._pend(mt5.ORDER_TYPE_BUY_STOP , buy , self.mid, tp=tp_b)
self._pend(mt5.ORDER_TYPE_SELL_STOP, sell, self.mid, tp=tp_s)
self.status.set(f"Grid ready (loop {self.done}/{self.loopN})")
# place reverse stop at break-even
def _place_be_rev(self, pos) -> None:
info = mt5.symbol_info(self.symbol); pt = info.point
be = round(pos.price_open, self.digits)
if pos.type == mt5.POSITION_TYPE_BUY:
otype, sl = mt5.ORDER_TYPE_SELL_STOP, be + self.step_pts * pt
else:
otype, sl = mt5.ORDER_TYPE_BUY_STOP, be - self.step_pts * pt
self._pend(otype, be, round(sl, self.digits), vol=self.lot, tag="BE-REV")
# after partial TP
def _handle_partial(self, pos) -> None:
tick = mt5.symbol_info_tick(self.symbol); bid, ask = tick.bid, tick.ask
half = self.lot / 2
be_price = round(pos.price_open, self.digits)
if pos.comment.startswith("BE-REV"):
beyond = (
pos.type == mt5.POSITION_TYPE_BUY and bid >= self.mid or
pos.type == mt5.POSITION_TYPE_SELL and ask <= self.mid
)
if beyond:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": pos.ticket, "volume": half,
"type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": bid if pos.type == mt5.POSITION_TYPE_BUY else ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "mid-instant TP"
})
return
mt5.order_send({
"action": mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
"position": pos.ticket,
"sl": be_price, # BE SL
"tp": round(self.mid, self.digits),
"deviation": DEVIATION
})
else:
mt5.order_send({
"action": mt5.TRADE_ACTION_SLTP, "symbol": self.symbol,
"position": pos.ticket,
"sl": be_price,
"tp": 0.0,
"deviation": DEVIATION
})
self._place_be_rev(pos)
# ── monitoring loop ──────────────────────────────────────
def _monitor(self) -> None:
info = mt5.symbol_info(self.symbol); pt = info.point
half = self.lot / 2
while self.running:
time.sleep(CHECK_INTERVAL)
tick = mt5.symbol_info_tick(self.symbol)
mid_now = (tick.bid + tick.ask) / 2
# ===== Measures against range-bound markets (CODE ADDED HERE) =====
if not mt5.positions_total():
pendings = [o for o in (mt5.orders_get(
symbol=self.symbol) or []) if o.comment.startswith(GRID_TAG)]
executed = self.side * 2 - len(pendings)
if executed >= self.side // 2 and pendings:
self.status.set("Half grid consumed - restarting...")
for o in pendings:
if hasattr(mt5, "order_delete"):
mt5.order_delete(o.ticket)
else:
mt5.order_send({
"action": mt5.TRADE_ACTION_REMOVE,
"order": o.ticket,
"symbol": o.symbol
})
time.sleep(1)
self._build_grid()
continue
# ====================================================================
# global exit condition
if (self.tp_high and mid_now >= self.tp_high) or \
(self.tp_low and mid_now <= self.tp_low):
self._full_close()
continue
# partial-TP check
for pos in mt5.positions_get(symbol=self.symbol) or []:
trg = (
pos.price_open + self.step_pts * pt
if pos.type == mt5.POSITION_TYPE_BUY
else pos.price_open - self.step_pts * pt
)
hit = (
pos.type == mt5.POSITION_TYPE_BUY and tick.bid >= trg or
pos.type == mt5.POSITION_TYPE_SELL and tick.ask <= trg
)
if hit and abs(pos.volume - self.lot) < 1e-6:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": pos.ticket, "volume": half,
"type": mt5.ORDER_TYPE_SELL if pos.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": tick.bid if pos.type == mt5.POSITION_TYPE_BUY else tick.ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "partial TP"
})
time.sleep(0.3)
self._handle_partial(pos)
# ── cancel orders & close positions ──────────────────────
def _full_close(self) -> None:
# cancel pending
for o in mt5.orders_get(symbol=self.symbol) or []:
if hasattr(mt5, "order_delete"):
mt5.order_delete(o.ticket)
else:
mt5.order_send({
"action": mt5.TRADE_ACTION_REMOVE,
"order": o.ticket,
"symbol": o.symbol
})
# close all positions
tick = mt5.symbol_info_tick(self.symbol)
for p in mt5.positions_get(symbol=self.symbol) or []:
mt5.order_send({
"action": mt5.TRADE_ACTION_DEAL, "symbol": self.symbol,
"position": p.ticket, "volume": p.volume,
"type": mt5.ORDER_TYPE_SELL if p.type == mt5.POSITION_TYPE_BUY
else mt5.ORDER_TYPE_BUY,
"price": tick.bid if p.type == mt5.POSITION_TYPE_BUY else tick.ask,
"deviation": DEVIATION, "magic": MAGIC_NUMBER,
"comment": "grid exit"
})
self.done += 1
if self.done <= self.loopN:
self.status.set(f"Loop {self.done} finished – restarting…")
time.sleep(2)
self._build_grid()
else:
self.status.set("All loops done – exit")
self.running = False
self.root.after(800, self.root.quit)
# ── GUI: abort button ────────────────────────────────────
def _abort(self) -> None:
if messagebox.askyesno("Abort", "Stop trading and exit?", parent=self.root):
self.running = False
self._full_close()
# ── run bot ──────────────────────────────────────────────
def run(self) -> None:
self._mt5_init()
self._build_grid()
self.running = True
threading.Thread(target=self._monitor, daemon=True).start()
self.root.mainloop()
mt5.shutdown()
# ══════════════════════════ MAIN ══════════════════════════════
def main() -> None:
term = choose_terminal()
if not term:
sys.exit("No MT5 terminal selected – exiting.")
root = tk.Tk(); root.withdraw()
pd = ParamDialog(root); pd.wait_window(); root.destroy()
if pd.res is None:
sys.exit("Parameters dialog canceled – exiting.")
sym, digs, lot, nside, mult, loops = pd.res
StopGridTrader(
terminal_path = term,
symbol = sym,
digits = digs,
base_lot = lot,
orders_side = nside,
multiplier = mult,
loop_count = loops
).run()
if __name__ == "__main__":
main()スワップ計測ツール(うみver)
以下のソースコードでは、MT5のスワップポイントを一覧表示できます。スワップアービトラージで通貨ペアを探すときに重宝するでしょう。
import MetaTrader5 as mt5
import time
from datetime import datetime
import sys
import tkinter as tk
from tkinter import ttk, messagebox
import threading
import traceback
# 実行中のMT5プロセスを検出するためにpsutilライブラリを使用します
try:
import psutil
except ImportError:
psutil = None
# tksheetライブラリをインポート
# 事前に `pip install tksheet` を実行してください
try:
import tksheet
except ImportError:
messagebox.showerror(
"ライブラリ不足",
"tksheetライブラリが見つかりません。\nコマンドプロンプトで `pip install tksheet` を実行してインストールしてください。"
)
sys.exit(1)
# --- MT5ターミナル選択部分は変更なし ---
def _discover_terminals() -> list[str]:
paths = []
if psutil:
for p in psutil.process_iter(attrs=["name", "exe"]):
if "terminal64.exe" in (p.info.get("name") or "").lower():
exe_path = p.info.get("exe")
if exe_path and exe_path not in paths:
paths.append(exe_path)
return paths
def choose_terminal() -> str | None:
if not psutil:
messagebox.showerror(
"ライブラリ不足",
"psutilライブラリが見つかりません。\nコマンドプロンプトで `pip install psutil` を実行してインストールしてください。"
)
return None
root = tk.Tk()
root.withdraw()
win = tk.Toplevel(root)
win.title("接続するMT5ターミナルを選択")
win.grab_set()
message_label = None
def on_close():
win.destroy()
root.destroy()
win.protocol("WM_DELETE_WINDOW", on_close)
def populate_tree():
nonlocal message_label
if message_label and message_label.winfo_exists():
message_label.destroy()
message_label = None
for i in tree.get_children():
tree.delete(i)
found_terminals = _discover_terminals()
if not found_terminals:
tree.grid_remove()
message_label = ttk.Label(win, text="実行中のMT5ターミナルが見つかりません。\nMT5を起動後、「再読込」を押してください。")
message_label.grid(row=0, column=0, columnspan=3, padx=10, pady=10)
use_button.config(state="disabled")
return
tree.grid()
is_data_found = False
for exe in found_terminals:
try:
if mt5.initialize(path=exe):
acc = mt5.account_info()
if acc:
tree.insert("", tk.END, values=(exe, acc.login, acc.server, f"{acc.balance:.2f} {acc.currency}", acc.name))
is_data_found = True
mt5.shutdown()
except Exception as e:
print(f"ターミナルへの接続エラー {exe}: {e}")
if not is_data_found:
tree.grid_remove()
message_label = ttk.Label(win, text="MT5アカウント情報が取得できませんでした。\nログイン状態を確認してください。")
message_label.grid(row=0, column=0, columnspan=3, padx=10, pady=10)
use_button.config(state="disabled")
else:
if tree.get_children():
tree.selection_set(tree.get_children()[0])
use_button.config(state="normal")
cols = ("exe", "login", "server", "balance", "name")
tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
col_widths = {"exe": 340, "login": 80, "server": 170, "balance": 120, "name": 150}
for c in cols:
tree.heading(c, text=c.capitalize())
tree.column(c, width=col_widths[c], anchor="w", stretch=tk.FALSE)
tree.grid(row=0, column=0, columnspan=3, padx=6, pady=6)
selected_path: dict[str, str | None] = {"path": None}
def _use_selection() -> None:
if tree.selection():
selected_path["path"] = tree.item(tree.selection()[0], "values")[0]
win.destroy()
button_frame = ttk.Frame(win)
button_frame.grid(row=1, column=0, columnspan=3, pady=(0, 6), padx=6, sticky="e")
ttk.Button(button_frame, text="再読込", command=populate_tree).pack(side="left", padx=(0, 5))
use_button = ttk.Button(button_frame, text="このターミナルを使用", command=_use_selection)
use_button.pack(side="left")
populate_tree()
win.wait_window()
root.destroy()
return selected_path["path"]
class SwapViewerApp:
def __init__(self, terminal_path: str):
self.path = terminal_path
self.root = tk.Tk()
self.root.title("MT5 スワップチェッカー")
self.root.geometry("1450x600")
self.timer_id = None
self.current_data = []
self.last_sort_col = "swap_both_1d_jpy"
self.last_sort_reverse = True
if not self._connect_mt5():
self.root.destroy()
return
self._setup_ui()
self.update_data()
self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
def _connect_mt5(self) -> bool:
if not mt5.initialize(path=self.path):
messagebox.showerror("MT5接続エラー", f"MT5の初期化に失敗しました: {mt5.last_error()}")
return False
self.account_info = mt5.account_info()
if not self.account_info:
messagebox.showerror("MT5接続エラー", f"アカウント情報の取得に失敗しました: {mt5.last_error()}")
mt5.shutdown()
return False
if self.account_info.currency != "JPY":
messagebox.showwarning("口座通貨の確認", f"このツールの円換算機能は、口座通貨がJPYの場合に最適化されています。\n現在の口座通貨: {self.account_info.currency}")
return True
def _setup_ui(self):
main_frame = ttk.Frame(self.root, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True)
top_frame = ttk.Frame(main_frame)
top_frame.pack(fill=tk.X, pady=(0, 5))
conn_text = f"接続先: {self.account_info.server} ({self.account_info.login})"
ttk.Label(top_frame, text=conn_text, font=("Meiryo", 10)).pack(side=tk.LEFT)
self.last_updated_var = tk.StringVar(value="最終更新: ----/--/-- --:--:--")
ttk.Label(top_frame, textvariable=self.last_updated_var).pack(side=tk.RIGHT)
sheet_frame = ttk.Frame(main_frame)
sheet_frame.pack(fill=tk.BOTH, expand=True)
self.columns = {
"symbol": ("通貨ペア", 110, "w"),
"swap_long_pts": ("買いスワップ(Pts/%)", 150, "e"), "swap_short_pts": ("売りスワップ(Pts/%)", 150, "e"),
"swap_long_jpy": ("1Lot損益(買/円)", 140, "e"), "swap_short_jpy": ("1Lot損益(売/円)", 140, "e"),
"swap_both_1d_jpy": ("1日両建て損益(円)", 160, "e"), "rollover_day": ("3倍デー適用曜日", 130, "center"),
"swap_long_3d": ("3倍デー(買/円)", 140, "e"), "swap_short_3d": ("3倍デー(売/円)", 140, "e"),
"swap_both_3d_jpy": ("3倍デー両建て損益(円)", 180, "e"),
}
self.sheet = tksheet.Sheet(sheet_frame,
headers=[h for h, w, a in self.columns.values()],
show_toolbar=False,
show_top_left=False)
self.sheet.pack(fill=tk.BOTH, expand=True)
self.sheet.enable_bindings("single_select", "drag_select", "ctrl_select", "arrowkeys", "right_click_popup_menu", "rc_select")
for i, (col_id, (header, width, anchor)) in enumerate(self.columns.items()):
align = "w" if anchor == "w" else "e" if anchor == "e" else "center"
self.sheet.column_width(column=i, width=width)
self.sheet.align_columns(columns=i, align=align)
# ヘッダークリックのイベント登録を削除
# --- 下部コントロールパネル ---
bottom_frame = ttk.Frame(main_frame)
bottom_frame.pack(fill=tk.X, pady=(5, 0))
# 更新ボタン
self.update_button = ttk.Button(bottom_frame, text="手動更新", command=self.update_data)
self.update_button.pack(side=tk.LEFT, padx=(0, 10))
# 手動ソート機能のUIを追加
sort_frame = ttk.Frame(bottom_frame)
sort_frame.pack(side=tk.LEFT)
ttk.Label(sort_frame, text="並べ替え:").pack(side=tk.LEFT, padx=(0, 5))
# 表示名と内部IDの対応辞書を作成
self.col_display_to_id = {v[0]: k for k, v in self.columns.items()}
# ソート列選択のプルダウンメニュー
self.sort_combo_var = tk.StringVar()
self.sort_combo = ttk.Combobox(sort_frame, textvariable=self.sort_combo_var,
values=[v[0] for v in self.columns.values()],
state="readonly", width=25)
self.sort_combo.set(self.columns[self.last_sort_col][0]) # 初期値を設定
self.sort_combo.pack(side=tk.LEFT, padx=(0, 5))
self.sort_combo.bind("<<ComboboxSelected>>", self._on_sort_combo_select)
# 降順/昇順 切り替えボタン
self.sort_order_button = ttk.Button(sort_frame, text="降順", command=self._on_sort_order_toggle, width=6)
self.sort_order_button.pack(side=tk.LEFT)
# ステータス表示
self.status_var = tk.StringVar(value="準備完了")
ttk.Label(bottom_frame, textvariable=self.status_var).pack(side=tk.LEFT, padx=20)
# プルダウンメニューが選択された時の処理
def _on_sort_combo_select(self, event=None):
selected_display_name = self.sort_combo_var.get()
new_sort_col = self.col_display_to_id[selected_display_name]
self._sort_column(new_sort_col, keep_direction=True) # 並び順は維持したまま列を切り替え
# 降順/昇順ボタンが押された時の処理
def _on_sort_order_toggle(self, event=None):
# 昇順/降順を反転させる
self.last_sort_reverse = not self.last_sort_reverse
# 現在選択されている列で再ソートを実行
self._sort_column(self.last_sort_col, keep_direction=True)
def _fetch_data_thread(self):
try:
all_symbols = mt5.symbols_get()
if not all_symbols:
self.root.after(0, self._update_gui_with_error, "エラー: 通貨ペアリストの取得に失敗。")
return
visible_symbols = [s for s in all_symbols if s.visible]
if not visible_symbols:
self.root.after(0, self._update_gui_with_error, "エラー: MT5の「気配値表示」に通貨ペアを追加してください。")
return
all_data = []
weekdays = ["日", "月", "火", "水", "木", "金", "土"]
for info in visible_symbols:
tick = mt5.symbol_info_tick(info.name)
if not tick or tick.bid == 0 or tick.ask == 0:
continue
swap_long_jpy = 0.0
swap_short_jpy = 0.0
if info.swap_mode == mt5.SYMBOL_SWAP_MODE_POINTS:
tick_value = info.trade_tick_value
swap_long_jpy = info.swap_long * tick_value
swap_short_jpy = info.swap_short * tick_value
elif info.swap_mode in [3, 5]:
swap_long_profit_ccy = 1.0 * info.trade_contract_size * tick.ask * (info.swap_long / 100) / 360
swap_short_profit_ccy = 1.0 * info.trade_contract_size * tick.bid * (info.swap_short / 100) / 360
if info.currency_profit != self.account_info.currency:
conversion_pair = info.currency_profit + self.account_info.currency
conversion_tick = mt5.symbol_info_tick(conversion_pair)
if conversion_tick and conversion_tick.bid > 0:
swap_long_jpy = swap_long_profit_ccy * conversion_tick.bid
swap_short_jpy = swap_short_profit_ccy * conversion_tick.bid
else:
continue
else:
swap_long_jpy = swap_long_profit_ccy
swap_short_jpy = swap_short_profit_ccy
else:
continue
all_data.append({
"symbol": info.name,
"swap_long_pts": info.swap_long, "swap_short_pts": info.swap_short,
"swap_long_jpy": swap_long_jpy, "swap_short_jpy": swap_short_jpy,
"swap_both_1d_jpy": swap_long_jpy + swap_short_jpy,
"rollover_day": weekdays[info.swap_rollover3days],
"swap_long_3d": swap_long_jpy * 3, "swap_short_3d": swap_short_jpy * 3,
"swap_both_3d_jpy": (swap_long_jpy + swap_short_jpy) * 3,
"swap_mode": info.swap_mode
})
self.root.after(0, self._update_gui, all_data)
except Exception:
traceback.print_exc()
self.root.after(0, self._update_gui_with_error, "エラーが発生。詳細はコンソールを確認。")
def _format_value(self, value, format_spec, is_jpy=False):
unit = " 円" if is_jpy else ""
if value == 0: return f"{value:{format_spec}}{unit}"
return f"{value:+{format_spec}}{unit}"
def _format_swap_points(self, value, mode):
if mode in [3, 5]:
return f"{value:+.3f} %"
return f"{value:+.3f}"
def _update_gui(self, data_list):
self.current_data = data_list
self._sort_column(self.last_sort_col, keep_direction=True)
self.last_updated_var.set(f"最終更新: {datetime.now().strftime('%Y/%m/%d %H:%M:%S')}")
self.status_var.set(f"データ取得完了。{len(data_list)}件の通貨ペアを表示中。")
self.update_button.config(state="normal")
def _redraw_view(self):
if not self.current_data:
self.sheet.set_sheet_data(data=[[]])
return
formatted_data = []
for data in self.current_data:
row = [
data["symbol"],
self._format_swap_points(data["swap_long_pts"], data["swap_mode"]),
self._format_swap_points(data["swap_short_pts"], data["swap_mode"]),
self._format_value(data["swap_long_jpy"], ",.2f", True),
self._format_value(data["swap_short_jpy"], ",.2f", True),
self._format_value(data["swap_both_1d_jpy"], ",.2f", True),
data["rollover_day"],
self._format_value(data["swap_long_3d"], ",.2f", True),
self._format_value(data["swap_short_3d"], ",.2f", True),
self._format_value(data["swap_both_3d_jpy"], ",.2f", True)
]
formatted_data.append(row)
self.sheet.set_sheet_data(data=formatted_data, redraw=False)
color_target_cols = [
"swap_long_pts", "swap_short_pts", "swap_long_jpy", "swap_short_jpy",
"swap_both_1d_jpy", "swap_long_3d", "swap_short_3d", "swap_both_3d_jpy"
]
col_id_to_index = {col_id: i for i, col_id in enumerate(self.columns.keys())}
for r, data_row in enumerate(self.current_data):
for col_id in color_target_cols:
c = col_id_to_index[col_id]
value = data_row[col_id]
color = ""
if value > 0:
color = "blue"
elif value < 0:
color = "red"
if color:
self.sheet.highlight_cells(row=r, column=c, fg=color)
self.sheet.redraw()
def _sort_column(self, col, keep_direction=False):
if not self.current_data:
return
if not keep_direction:
# この関数が直接呼ばれることはなくなったが念のため残す
if self.last_sort_col == col:
self.last_sort_reverse = not self.last_sort_reverse
else:
self.last_sort_reverse = True
self.last_sort_col = col
is_numeric = col not in ["symbol", "rollover_day"]
key_func = (lambda d: d.get(col, 0)) if is_numeric else (lambda d: str(d.get(col, '')))
self.current_data.sort(key=key_func, reverse=self.last_sort_reverse)
# ソートボタンのテキストを現在の状態に合わせて更新
self.sort_order_button.config(text="降順" if self.last_sort_reverse else "昇順")
self._redraw_view()
def _update_gui_with_error(self, message):
self.sheet.set_sheet_data(data=[[]])
self.status_var.set(message)
self.update_button.config(state="normal")
def update_data(self):
self.status_var.set("データを取得中...")
self.update_button.config(state="disabled")
thread = threading.Thread(target=self._fetch_data_thread, daemon=True)
thread.start()
if self.timer_id: self.root.after_cancel(self.timer_id)
self.timer_id = self.root.after(3600000, self.update_data)
def _on_closing(self):
if messagebox.askokcancel("終了確認", "アプリケーションを終了しますか?"):
if self.timer_id: self.root.after_cancel(self.timer_id)
mt5.shutdown()
self.root.destroy()
print("MT5との接続をシャットダウンしました。")
def run(self):
self.root.mainloop()
def main():
terminal_path = choose_terminal()
if not terminal_path:
print("MT5ターミナルが選択されませんでした。プログラムを終了します。")
sys.exit(0)
app = SwapViewerApp(terminal_path)
app.run()
if __name__ == "__main__":
main()トレード履歴表示ツール(うみver)
以下のソースコードでは、MT5の取引履歴を時間単位で細かく参照できます。(ソースコード引用元)
bot/EAを24時間運用し、利益率の高い時間帯を見つけるのに便利です。
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import atexit
import sys
from datetime import datetime, timedelta
import threading
import queue
import math
from collections import defaultdict
import MetaTrader5 as mt5
import pytz
# tkcalendarが利用可能かチェック
try:
from tkcalendar import DateEntry
except ImportError:
messagebox.showerror(
"ライブラリ不足",
"tkcalendarライブラリが見つかりません。\nコマンドプロンプトで `pip install tkcalendar` を実行してインストールしてください。"
)
sys.exit(1)
# Excelエクスポート機能に必要なライブラリをチェック
try:
import pandas as pd
except ImportError:
messagebox.showerror(
"ライブラリ不足",
"Excelエクスポート機能に必要なライブラリが見つかりません。\nコマンドプロンプトで `pip install pandas openpyxl` を実行してください。"
)
sys.exit(1)
try:
import psutil
except ImportError:
psutil = None
# ═════════════════════════ GUI HELPERS (変更なし) ═════════════════════════
def _discover_terminals() -> list[str]:
"""実行中のMT5ターミナル(terminal64.exe)のパスを検出する。"""
paths = []
if psutil:
for p in psutil.process_iter(attrs=["name", "exe"]):
if "terminal64.exe" in (p.info.get("name") or "").lower():
exe = p.info.get("exe") or ""
if exe and exe not in paths:
paths.append(exe)
return paths
def choose_terminal() -> str | None:
"""モーダルウィンドウでMT5ターミナルを選択させる。"""
root = tk.Tk()
root.withdraw()
win = tk.Toplevel(root)
win.title("Select MT5 terminal")
win.grab_set()
cols = ("exe", "login", "server", "balance", "currency", "name")
tree = ttk.Treeview(win, columns=cols, show="headings", height=8)
for c, w in zip(cols, (340, 80, 170, 100, 70, 150)):
tree.heading(c, text=c)
tree.column(c, width=w, anchor="w", stretch=tk.FALSE) # stretchを追加
for exe in _discover_terminals():
try:
if mt5.initialize(path=exe):
acc = mt5.account_info()
term = mt5.terminal_info()
if acc and term:
tree.insert("", tk.END, values=(exe, acc.login, acc.server, f"{acc.balance:.2f}", acc.currency, acc.name))
mt5.shutdown()
except Exception as e:
print(f"Error connecting to {exe}: {e}")
tree.grid(row=0, column=0, columnspan=2, padx=6, pady=6)
sel: dict[str, str | None] = {"path": None}
def _use() -> None:
if tree.selection():
sel["path"] = tree.item(tree.selection()[0], "values")[0]
win.destroy()
ttk.Button(win, text="Use", command=_use).grid(row=1, column=1, pady=(0, 6), padx=6, sticky="e")
if tree.get_children():
tree.selection_set(tree.get_children()[0])
win.wait_window()
root.destroy()
return sel["path"]
# ═══════════════════ RESIZE MANAGER CLASS (変更なし) ══════════════════════
class ResizeManager:
def __init__(self, root, tree_widget, tree_parent_frame):
self.root = root
self.tree = tree_widget
self.tree_parent = tree_parent_frame
self.timer_id = None
self.placeholder = ttk.Frame(self.tree_parent)
self.placeholder.grid(row=0, column=0, sticky="nsew")
self.placeholder.grid_remove()
self.last_width = self.root.winfo_width()
self.last_height = self.root.winfo_height()
self.root.bind('<Configure>', self._on_resize)
def _on_resize(self, event):
if event.widget != self.root or \
(self.last_width == event.width and self.last_height == event.height):
return
self.last_width = event.width
self.last_height = event.height
if not self.tree.get_children():
return
if self.timer_id:
self.root.after_cancel(self.timer_id)
if self.tree.winfo_viewable():
self.tree.grid_remove()
self.placeholder.grid()
self.timer_id = self.root.after(250, self._redraw_tree)
def _redraw_tree(self):
if self.placeholder.winfo_viewable():
self.placeholder.grid_remove()
self.tree.grid()
self.timer_id = None
# ═════════════════════════ HISTORY VIEWER CLASS ═════════════════════════
class HistoryViewer:
def __init__(self, terminal_path: str):
self.path = terminal_path
self.root = tk.Tk()
self.root.title("履歴検索")
self.root.protocol("WM_DELETE_WINDOW", self._on_closing)
self.timezone = pytz.utc
self.currency = ""
self.data_queue = queue.Queue()
self.all_trades = []
self.current_page = 1
self.items_per_page = 100
self.total_pages = 0
self.start_date_enabled_var = tk.BooleanVar(value=True)
self.end_date_enabled_var = tk.BooleanVar(value=True)
self.sort_desc_var = tk.BooleanVar(value=True)
self.trade_type_filter_var = tk.StringVar(value="全部")
self.summary_labels = {}
if not self._connect_mt5():
self.root.after(100, self.root.destroy)
return
account_info = mt5.account_info()
if account_info:
self.currency = account_info.currency
self._setup_ui()
self._populate_symbols_from_history()
self._reset_conditions()
atexit.register(self._shutdown_mt5)
def _connect_mt5(self) -> bool:
if not mt5.initialize(path=self.path):
err_code, err_msg = mt5.last_error()
messagebox.showerror("MT5 Connection Error", f"MT5の初期化に失敗しました: {err_msg} ({err_code})")
return False
terminal_info = mt5.terminal_info()
if terminal_info and hasattr(terminal_info, 'time_zone'):
try:
self.timezone = pytz.timezone(terminal_info.time_zone)
except pytz.exceptions.UnknownTimeZoneError:
self.timezone = pytz.utc
return True
def _shutdown_mt5(self):
mt5.shutdown()
def _format_currency(self, value: float) -> str:
is_jpy = self.currency == 'JPY'
digits = 0 if is_jpy else 2
return f"{value:+,.{digits}f}" if value != 0 else f"{value:,.{digits}f}"
def _format_pips(self, value: float) -> str:
return f"{value:+.1f}" if value != 0 else f"{value:.1f}"
def _setup_ui(self):
main_frame = ttk.Frame(self.root, padding=10)
main_frame.grid(row=0, column=0, sticky="nsew")
self.root.grid_rowconfigure(0, weight=1); self.root.grid_columnconfigure(0, weight=1)
search_frame = ttk.Labelframe(main_frame, text="検索条件", padding=10)
search_frame.grid(row=0, column=0, sticky="ew", pady=5)
search_frame.grid_columnconfigure(3, weight=1)
# --- Row 0: 期間、ページネーション、リセットボタン ---
ttk.Label(search_frame, text="期間:").grid(row=0, column=0, padx=(0, 5), pady=5, sticky="w")
time_frame = ttk.Frame(search_frame)
time_frame.grid(row=0, column=1, sticky="w", pady=5)
self.start_date_var = tk.StringVar(); self.start_hour_var = tk.StringVar(); self.start_min_var = tk.StringVar()
self.end_date_var = tk.StringVar(); self.end_hour_var = tk.StringVar(); self.end_min_var = tk.StringVar()
self.start_date_check = ttk.Checkbutton(time_frame, variable=self.start_date_enabled_var, command=self._toggle_date_widgets)
self.start_date_check.pack(side="left")
self.start_date_entry = DateEntry(time_frame, textvariable=self.start_date_var, date_pattern='y/mm/dd', width=10)
self.start_date_entry.pack(side="left")
self.start_hour_spin = ttk.Spinbox(time_frame, from_=0, to=23, textvariable=self.start_hour_var, width=3, format="%02.0f")
self.start_hour_spin.pack(side="left", padx=(5,0))
ttk.Label(time_frame, text=":").pack(side="left")
self.start_min_spin = ttk.Spinbox(time_frame, from_=0, to=59, textvariable=self.start_min_var, width=3, format="%02.0f")
self.start_min_spin.pack(side="left")
ttk.Label(time_frame, text=" ~ ").pack(side="left", padx=5)
self.end_date_check = ttk.Checkbutton(time_frame, variable=self.end_date_enabled_var, command=self._toggle_date_widgets)
self.end_date_check.pack(side="left")
self.end_date_entry = DateEntry(time_frame, textvariable=self.end_date_var, date_pattern='y/mm/dd', width=10)
self.end_date_entry.pack(side="left")
self.end_hour_spin = ttk.Spinbox(time_frame, from_=0, to=23, textvariable=self.end_hour_var, width=3, format="%02.0f")
self.end_hour_spin.pack(side="left", padx=(5,0))
ttk.Label(time_frame, text=":").pack(side="left")
self.end_min_spin = ttk.Spinbox(time_frame, from_=0, to=59, textvariable=self.end_min_var, width=3, format="%02.0f")
self.end_min_spin.pack(side="left")
pagination_frame = ttk.Frame(search_frame)
pagination_frame.grid(row=0, column=2, sticky="w", padx=(20, 0))
self.prev_button = ttk.Button(pagination_frame, text="前へ(P)", command=self._prev_page, state="disabled")
self.prev_button.pack(side="left")
self.page_info_var = tk.StringVar()
self.page_combo = ttk.Combobox(pagination_frame, textvariable=self.page_info_var, values=['25件', '50件', '100件', '500件'], state="readonly", width=18, justify='center')
self.page_combo.set("100件")
self.items_per_page = 100
self.page_combo.bind("<<ComboboxSelected>>", self._on_items_per_page_changed)
self.page_combo.pack(side="left", padx=5)
self.next_button = ttk.Button(pagination_frame, text="次へ(N)", command=self._next_page, state="disabled")
self.next_button.pack(side="left")
reset_button = ttk.Button(search_frame, text="条件リセット", command=self._reset_conditions)
reset_button.grid(row=0, column=3, sticky="e", padx=(10, 5))
# --- Row 1: 通貨、売買区分、ソート順、エクスポートボタン --- ★ レイアウト変更
# ★ 変更: columnspanを4から3に変更し、エクスポートボタンのスペースを確保
row1_frame = ttk.Frame(search_frame)
row1_frame.grid(row=1, column=0, columnspan=3, sticky="w", pady=5)
ttk.Label(row1_frame, text="通貨:").pack(side="left", pady=(5,0))
self.symbol_var = tk.StringVar(value="(全通貨)")
self.symbol_combo = ttk.Combobox(row1_frame, textvariable=self.symbol_var, width=15, state="readonly")
self.symbol_combo.pack(side="left", padx=(5, 0), pady=(5,0))
trade_type_frame = ttk.Labelframe(row1_frame, text="売買区分")
trade_type_frame.pack(side="left", padx=(20, 10), pady=(5,0))
ttk.Radiobutton(trade_type_frame, text="全部", variable=self.trade_type_filter_var, value="全部").pack(side="left", padx=(5,0))
ttk.Radiobutton(trade_type_frame, text="売", variable=self.trade_type_filter_var, value="売").pack(side="left", padx=5)
ttk.Radiobutton(trade_type_frame, text="買", variable=self.trade_type_filter_var, value="買").pack(side="left", padx=(0,5))
sort_check = ttk.Checkbutton(row1_frame, text="新しい履歴から表示", variable=self.sort_desc_var, command=self._resort_and_display)
sort_check.pack(side="left", padx=(0, 0), pady=(5,0))
# ★ 追加: Excelエクスポートボタンを検索条件エリアに移動
self.export_button = ttk.Button(search_frame, text="Excelへエクスポート", command=self._export_to_excel, state="disabled")
self.export_button.grid(row=1, column=3, sticky="e", padx=(10, 5), pady=5)
# --- 検索ボタン ---
self.search_button = ttk.Button(search_frame, text="検索(S)", command=self.start_fetch_thread)
self.search_button.grid(row=0, column=4, rowspan=2, padx=10, ipady=10, sticky="ns")
# --- Treeview ---
history_frame = ttk.Frame(main_frame)
history_frame.grid(row=1, column=0, sticky="nsew", pady=10)
main_frame.grid_rowconfigure(1, weight=1); main_frame.grid_columnconfigure(0, weight=1)
columns = ("exec_time", "close_time", "type", "volume", "symbol", "open_price", "close_price", "gross_profit", "pips_profit", "net_profit", "swap", "comment", "entry")
self.tree = ttk.Treeview(history_frame, columns=columns, show="headings")
col_map = { "exec_time": ("約定時間", 140), "close_time": ("決済時間", 140), "type": ("取引", 40), "volume": ("数量", 60), "symbol": ("通貨ペア", 80), "open_price": ("約定価格", 90), "close_price": ("決済価格", 90), "gross_profit": ("売買損益", 90), "pips_profit": ("pips損益", 80), "net_profit": ("決済損益", 90), "swap": ("スワップ", 70), "comment": ("コメント", 120), "entry": ("エントリー", 60) }
for col, (text, width) in col_map.items():
self.tree.heading(col, text=text, anchor="center"); self.tree.column(col, width=width, anchor="e", stretch=tk.FALSE)
self.tree.column("comment", anchor="w", stretch=tk.FALSE)
vsb, hsb = ttk.Scrollbar(history_frame, orient="vertical", command=self.tree.yview), ttk.Scrollbar(history_frame, orient="horizontal", command=self.tree.xview)
self.tree.configure(yscrollcommand=vsb.set, xscrollcommand=hsb.set)
self.tree.grid(row=0, column=0, sticky="nsew"); vsb.grid(row=0, column=1, sticky="ns"); hsb.grid(row=1, column=0, sticky="ew")
history_frame.grid_rowconfigure(0, weight=1); history_frame.grid_columnconfigure(0, weight=1)
self.tree.tag_configure("positive", foreground="blue"); self.tree.tag_configure("negative", foreground="red")
# ★ 変更: 合計欄のフレームのみに修正
summary_frame = ttk.Frame(main_frame, padding=5)
summary_frame.grid(row=2, column=0, sticky="ew")
self.summary_vars = {}
summary_items = ["ポジション数合計", "Lot数合計", "売買損益合計", "pips損益合計", "決済損益合計", "手数料合計", "スワップ合計"]
for i, label_text in enumerate(summary_items):
row, col = divmod(i, 4)
ttk.Label(summary_frame, text=f"{label_text}:").grid(row=row, column=col*2, sticky="w", padx=(5,0), pady=2)
var = tk.StringVar(value="0")
self.summary_vars[label_text] = var
value_label = tk.Label(summary_frame, textvariable=var, background="white", relief="solid", borderwidth=1, anchor="center", padx=4, font=("メイリオ", 10))
value_label.grid(row=row, column=col*2 + 1, sticky="ew", padx=5, pady=2)
self.summary_labels[label_text] = value_label
summary_frame.grid_columnconfigure(col*2 + 1, weight=1)
self.resize_manager = ResizeManager(self.root, self.tree, history_frame)
def _reset_conditions(self):
now = datetime.now()
thirty_days_ago = now - timedelta(days=30)
self.start_date_var.set(thirty_days_ago.strftime("%Y/%m/%d")); self.start_hour_var.set("00"); self.start_min_var.set("00")
self.end_date_var.set(now.strftime("%Y/%m/%d")); self.end_hour_var.set(now.strftime("%H")); self.end_min_var.set(now.strftime("%M"))
self.start_date_enabled_var.set(True); self.end_date_enabled_var.set(True)
self._toggle_date_widgets()
self.symbol_var.set("(全通貨)")
self.sort_desc_var.set(True)
self.trade_type_filter_var.set("全部")
self.page_combo.set("100件")
self.items_per_page = 100
def _toggle_date_widgets(self):
start_state = "normal" if self.start_date_enabled_var.get() else "disabled"
for widget in [self.start_date_entry, self.start_hour_spin, self.start_min_spin]: widget.config(state=start_state)
end_state = "normal" if self.end_date_enabled_var.get() else "disabled"
for widget in [self.end_date_entry, self.end_hour_spin, self.end_min_spin]: widget.config(state=end_state)
def _populate_symbols_from_history(self):
try:
from_date = datetime(2000, 1, 1, tzinfo=pytz.utc)
to_date = datetime.now(pytz.utc) + timedelta(days=1)
deals = mt5.history_deals_get(from_date, to_date)
if deals:
history_symbols = sorted(list(set(d.symbol for d in deals if d.symbol)))
self.symbol_combo['values'] = ["(全通貨)"] + history_symbols
else:
self.symbol_combo['values'] = ["(全通貨)"]
except Exception as e:
print(f"履歴からの通貨ペア取得エラー: {e}"); self.symbol_combo['values'] = ["(全通貨)"]
self.symbol_combo.current(0)
def start_fetch_thread(self):
self.search_button.config(state="disabled"); self.prev_button.config(state="disabled")
self.next_button.config(state="disabled"); self.page_combo.config(state="disabled")
self.export_button.config(state="disabled") # ★ 追加: 検索開始時にエクスポートボタンを無効化
self.page_info_var.set("検索中...")
self.tree.delete(*self.tree.get_children())
for key, var in self.summary_vars.items():
var.set("0"); self.summary_labels[key].config(foreground="black")
self.all_trades = []
self.progress_win = tk.Toplevel(self.root)
self.progress_win.title(""); self.progress_win.geometry("200x50")
self.progress_win.transient(self.root); self.progress_win.grab_set()
self.progress_win.protocol("WM_DELETE_WINDOW", lambda: None)
ttk.Label(self.progress_win, text="処理中...").pack(expand=True)
threading.Thread(target=self._fetch_history_thread, daemon=True).start()
self.root.after(100, self.process_queue)
def process_queue(self):
try:
while True:
message = self.data_queue.get_nowait()
if isinstance(message, tuple):
msg_type, payload = message
if msg_type == "error":
self.progress_win.destroy(); self.search_button.config(state="normal")
self.page_combo.config(state="readonly"); self.page_info_var.set("---")
self.export_button.config(state="disabled") # ★ 追加
err_code, err_msg = payload
messagebox.showerror("エラー", f"履歴の取得に失敗しました: {err_msg} ({err_code})")
return
else: self._initialize_display(payload)
elif message == "done":
self.progress_win.destroy(); self.search_button.config(state="normal")
self.page_combo.config(state="readonly")
if not self.all_trades:
self._update_page_info_text()
self.export_button.config(state="disabled") # ★ 追加
messagebox.showinfo("情報", "指定された期間の取引履歴はありません。")
return
except queue.Empty: self.root.after(100, self.process_queue)
def _fetch_history_thread(self):
try:
from_utc = datetime(2000, 1, 1, tzinfo=pytz.utc)
if self.start_date_enabled_var.get():
from_local = datetime.strptime(f"{self.start_date_var.get()} {self.start_hour_var.get()}:{self.start_min_var.get()}", "%Y/%m/%d %H:%M")
from_utc = self.timezone.localize(from_local).astimezone(pytz.utc)
to_utc = datetime.now(pytz.utc)
if self.end_date_enabled_var.get():
to_local = datetime.strptime(f"{self.end_date_var.get()} {self.end_hour_var.get()}:{self.end_min_var.get()}", "%Y/%m/%d %H:%M")
to_utc = self.timezone.localize(to_local).astimezone(pytz.utc)
symbol_filter = self.symbol_var.get()
if symbol_filter == "(全通貨)": symbol_filter = None
trade_type_filter = self.trade_type_filter_var.get()
except ValueError:
self.data_queue.put(("error", (0, "日付や時間の形式が正しくありません。"))); return
deals = mt5.history_deals_get(from_utc, to_utc, group="*")
if deals is None: self.data_queue.put(("error", mt5.last_error())); return
if not deals: self.data_queue.put("done"); return
if symbol_filter: deals = [d for d in deals if d.symbol == symbol_filter]
symbol_info_cache = {s: mt5.symbol_info(s) for s in {d.symbol for d in deals if d.symbol} if mt5.symbol_info(s)}
positions = defaultdict(lambda: {'in': [], 'out': []})
for deal in deals:
if deal.entry == mt5.DEAL_ENTRY_IN: positions[deal.position_id]['in'].append(deal)
elif deal.entry == mt5.DEAL_ENTRY_OUT: positions[deal.position_id]['out'].append(deal)
all_trades, totals = [], {"pos": 0, "lots": 0.0, "gross": 0.0, "pips": 0.0, "net": 0.0, "comm": 0.0, "swap": 0.0}
for pos_id, pos_deals in positions.items():
if not pos_deals['in'] or not pos_deals['out']: continue
in_deals = sorted(pos_deals['in'], key=lambda d: d.time_msc)
out_deals = sorted(pos_deals['out'], key=lambda d: d.time_msc)
for out_deal in out_deals:
in_deal = in_deals[0] if in_deals else None
if not in_deal: continue
trade_type_str = "買" if in_deal.type == mt5.DEAL_TYPE_BUY else "売"
if trade_type_filter != "全部" and trade_type_filter != trade_type_str:
continue
info = symbol_info_cache.get(in_deal.symbol)
if not info: continue
pip_size = info.point * 10 if info.digits in (3, 5) else info.point
if pip_size == 0:
pips = 0.0
else:
price_diff = (out_deal.price - in_deal.price) if in_deal.type == mt5.DEAL_TYPE_BUY else (in_deal.price - out_deal.price)
pips = price_diff / pip_size
net_profit = out_deal.profit + out_deal.commission + out_deal.swap
row_tag = "positive" if net_profit > 0 else ("negative" if net_profit < 0 else "")
all_trades.append({
"close_time": out_deal.time, "tag": row_tag,
"data": (
datetime.fromtimestamp(in_deal.time, tz=pytz.utc).astimezone(self.timezone).strftime('%Y.%m.%d %H:%M:%S'),
datetime.fromtimestamp(out_deal.time, tz=pytz.utc).astimezone(self.timezone).strftime('%Y.%m.%d %H:%M:%S'),
trade_type_str, f"{out_deal.volume:.2f}", out_deal.symbol,
f"{in_deal.price:.{info.digits}f}", f"{out_deal.price:.{info.digits}f}",
self._format_currency(out_deal.profit), self._format_pips(pips),
self._format_currency(net_profit), self._format_currency(out_deal.swap),
out_deal.comment, "OUT"
)})
totals['pos']+=1; totals['lots']+=out_deal.volume; totals['gross']+=out_deal.profit; totals['pips']+=pips;
totals['net']+=net_profit; totals['comm']+=out_deal.commission; totals['swap']+=out_deal.swap
all_trades.sort(key=lambda x: x['close_time'], reverse=self.sort_desc_var.get())
self.data_queue.put(("data", (all_trades, totals)))
self.data_queue.put("done")
def _initialize_display(self, data):
all_trades, totals = data
self.all_trades = all_trades
self._on_items_per_page_changed()
self.summary_vars["ポジション数合計"].set(f"{totals['pos']:,}")
self.summary_labels["ポジション数合計"].config(foreground="black")
self.summary_vars["Lot数合計"].set(f"{totals['lots']:.2f}")
self.summary_labels["Lot数合計"].config(foreground="black")
financial_keys = {"売買損益合計": totals['gross'], "pips損益合計": totals['pips'], "決済損益合計": totals['net'], "手数料合計": totals['comm'], "スワップ合計": totals['swap']}
for key, value in financial_keys.items():
formatter = self._format_pips if 'pips' in key else self._format_currency
self.summary_vars[key].set(formatter(value))
self.summary_labels[key].config(foreground="blue" if value > 0 else "red" if value < 0 else "black")
self.current_page = 1
self.total_pages = math.ceil(len(self.all_trades) / self.items_per_page) if self.items_per_page > 0 else 0
self._display_page()
def _display_page(self):
self.tree.delete(*self.tree.get_children())
start_index = (self.current_page - 1) * self.items_per_page
end_index = start_index + self.items_per_page
for trade in self.all_trades[start_index:end_index]:
self.tree.insert("", "end", values=trade['data'], tags=(trade['tag'],))
self._update_page_info_text()
self.prev_button.config(state="normal" if self.current_page > 1 else "disabled")
self.next_button.config(state="normal" if self.current_page < self.total_pages else "disabled")
# ★ 追加: 履歴の有無に応じてエクスポートボタンの状態を更新
has_data = len(self.all_trades) > 0
self.export_button.config(state="normal" if has_data else "disabled")
def _update_page_info_text(self):
total_items = len(self.all_trades)
if total_items == 0:
self.page_info_var.set("0件 (0/0)")
return
start_item = (self.current_page - 1) * self.items_per_page + 1
end_item = min(self.current_page * self.items_per_page, total_items)
self.page_info_var.set(f"{start_item}~{end_item}件 ({self.current_page}/{self.total_pages})")
def _on_items_per_page_changed(self, event=None):
try:
selected_value = self.page_combo.get()
if "件" in selected_value:
self.items_per_page = int(selected_value.replace('件', ''))
else:
self.items_per_page = 100
except (ValueError, AttributeError):
self.items_per_page = 100
if self.all_trades:
self.current_page = 1
self.total_pages = math.ceil(len(self.all_trades) / self.items_per_page) if self.items_per_page > 0 else 0
self._display_page()
def _resort_and_display(self):
if not self.all_trades: return
self.all_trades.sort(key=lambda x: x['close_time'], reverse=self.sort_desc_var.get())
self.current_page = 1; self._display_page()
def _prev_page(self):
if self.current_page > 1: self.current_page -= 1; self._display_page()
def _next_page(self):
if self.current_page < self.total_pages: self.current_page += 1; self._display_page()
# ★ 変更なし: Excelエクスポート機能のロジック自体は変更ありません
def _export_to_excel(self):
"""Treeviewに表示されているデータをExcelファイルに出力する。"""
if not self.tree.get_children():
messagebox.showinfo("情報", "エクスポートするデータがありません。")
return
filepath = filedialog.asksaveasfilename(
defaultextension=".xlsx",
filetypes=[("Excel ファイル", "*.xlsx"), ("すべてのファイル", "*.*")],
title="Excelファイルとして保存",
initialfile=f"取引履歴_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
)
if not filepath:
return
try:
self.root.config(cursor="watch")
self.root.update_idletasks()
columns = self.tree['columns']
headers = [self.tree.heading(col, 'text') for col in columns]
data = []
# ★ 変更: all_trades (検索結果全体) をエクスポート対象にする
for trade in self.all_trades:
data.append(trade['data'])
df = pd.DataFrame(data, columns=headers)
df.to_excel(filepath, index=False, engine='openpyxl')
messagebox.showinfo("成功", f"データを正常にエクスポートしました。\nファイル: {filepath}")
except Exception as e:
messagebox.showerror("エクスポートエラー", f"ファイルのエクスポート中にエラーが発生しました。\n\n詳細: {e}")
finally:
self.root.config(cursor="")
def _on_closing(self):
if messagebox.askokcancel("終了", "アプリケーションを終了しますか?"): self.root.destroy()
def run(self): self.root.mainloop()
def main():
term_path = choose_terminal()
if not term_path:
messagebox.showinfo("終了", "MT5ターミナルが選択されなかったため、アプリケーションを終了します。")
sys.exit("MT5ターミナルが選択されませんでした。")
app = HistoryViewer(terminal_path=term_path)
if app.root.winfo_exists():
app.run()
if __name__ == "__main__":
main()仮想通貨の無料ソースコード
Python MMbot(マーケットメイク)
以下のソースコードではマーケットメイク戦略を行います。トレードロジックは以下の記事を参考にしています。
マーケットメイク戦略とは、ロング・ショートの指値注文(メイク注文)を提示し続け、他のトレーダーと売買することでスプレッドを稼ぐ手法のこと。
FX・株式市場・CEXだと競合が強すぎて利益が出せませんが、新興DEXであれば短期間で荒稼ぎすることは可能です。
import asyncio
import time
import logging
from typing import Dict
# ロギングの設定
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class MarketMakerBot:
"""
ブログ記事「Adventures in Spot Market Making」に基づいた
段階的なロジックを持つマーケットメイキングボット。
"""
def __init__(self, api_key: str, api_secret: str, symbol: str, config: Dict):
"""
ボットの初期化
:param api_key: 取引所のAPIキー
:param api_secret: 取引所のAPIシークレット
:param symbol: 取引ペア (例: 'BTC/USDT')
:param config: 戦略パラメータを含む辞書
"""
self.api_key = api_key
self.api_secret = api_secret
self.symbol = symbol
self.config = config
# 状態を保持する変数
self.inventory = {'base': 0.0, 'quote': 0.0}
self.last_mid_price = 0.0
logging.info("マーケットメイキングボットを初期化しました。")
logging.info(f"取引ペア: {self.symbol}")
logging.info(f"設定: {self.config}")
# --- Step 1: 取引所API連携(ダミー実装) ---
# 以下のメソッドを、お使いの取引所のAPIに合わせて実装してください。
# 例として、ccxtライブラリなどを使うと便利です。
async def _get_bbo(self) -> Dict[str, float]:
"""
最良買気配(Best Bid)と最良売気配(Best Ask)を取得する。
# TODO: ここに取引所固有のAPI呼び出しを実装
"""
# ダミーデータ(テスト用)
# 実際のAPIでは市場価格が返されます
bid = 99.95
ask = 100.05
logging.debug(f"BBO取得: Bid={bid}, Ask={ask}")
return {'bid': bid, 'ask': ask}
async def _get_inventory(self) -> Dict[str, float]:
"""
現在の資産残高(ベース通貨とクオート通貨)を取得する。
# TODO: ここに取引所固有のAPI呼び出しを実装
"""
# ダミーデータ(テスト用)
# 実際にはAPIからウォレット残高を取得します
self.inventory = {'base': 5.0, 'quote': 500.0}
logging.debug(f"在庫取得: {self.inventory}")
return self.inventory
async def _place_order(self, side: str, price: float, size: float):
"""
新規注文を出す。
# TODO: ここに取引所固有のAPI呼び出しを実装
"""
logging.info(f"【注文実行】 Side: {side}, Price: {price:.4f}, Size: {size:.6f}")
# print(f"PLACING ORDER: {side} {size} {self.symbol} at {price}")
async def _cancel_all_orders(self):
"""
この取引ペアの未約定注文を全てキャンセルする。
# TODO: ここに取引所固有のAPI呼び出しを実装
"""
logging.info("全ての未約定注文をキャンセルします。")
# print("CANCELLING ALL ORDERS")
# --- 戦略ロジック ---
def _calculate_quotes(self, mid_price: float) -> Dict[str, float]:
"""
現在の戦略に基づいて、最終的な買い(bid)と売り(ask)の価格とサイズを計算する。
"""
# --- Step 2: ナイーブな戦略(基本のスプレッド)---
# 仲値から固定スプレッドをあけて価格を決定
# BPS (Basis Points): 1 BPS = 0.01% = 0.0001
spread_bps = self.config['default_width_bps']
bid_price = mid_price * (1 - spread_bps / 2 / 10000)
ask_price = mid_price * (1 + spread_bps / 2 / 10000)
# --- Step 3 & 4: 在庫に基づいた調整 ---
if self.config['use_inventory_management']:
# 在庫スキューを計算
base_value = self.inventory['base'] * mid_price
quote_value = self.inventory['quote']
total_value = base_value + quote_value
if total_value == 0:
inventory_skew = 0
else:
# 在庫の偏り (-1: 全てquote, +1: 全てbase)
inventory_skew = (base_value - quote_value) / total_value
# --- Step 4: 価格のスキュー ---
# 在庫が多すぎる資産を売る方向に価格を偏らせる
# inventory_skew > 0 (baseが多い) -> 売りたい -> 価格を全体的に下げる
# inventory_skew < 0 (quoteが多い) -> 買いたい -> 価格を全体的に上げる
dampening = self.config['dampening_factor']
offset_bps = -1 * dampening * inventory_skew * spread_bps
bid_price = mid_price * (1 - (spread_bps / 2 - offset_bps) / 10000)
ask_price = mid_price * (1 + (spread_bps / 2 + offset_bps) / 10000)
logging.info(f"在庫スキュー: {inventory_skew:.2f}, 価格オフセット: {offset_bps:.2f} BPS")
# --- Step 3: サイズの調整 ---
# 各資産の一定割合を注文サイズとする
order_size_pct = self.config['order_size_pct']
bid_size = (self.inventory['quote'] * order_size_pct) / bid_price
ask_size = self.inventory['base'] * order_size_pct
else:
# ナイーブ戦略の場合、サイズは固定
bid_size = self.config['fixed_order_size']
ask_size = self.config['fixed_order_size']
return {
'bid_price': bid_price,
'bid_size': bid_size,
'ask_price': ask_price,
'ask_size': ask_size,
}
async def run(self):
"""
ボットのメインループ
"""
logging.info("ボットのメインループを開始します...")
while True:
try:
# 1. 最新の市場情報と在庫情報を取得
bbo = await self._get_bbo()
await self._get_inventory()
if not bbo or 'bid' not in bbo or 'ask' not in bbo:
logging.warning("有効なBBOを取得できませんでした。スキップします。")
await asyncio.sleep(self.config['loop_interval_sec'])
continue
mid_price = (bbo['bid'] + bbo['ask']) / 2
# 2. 仲値が大きく動いたかチェック (リクオートのトリガー)
price_change_bps = abs(mid_price - self.last_mid_price) / self.last_mid_price * 10000 if self.last_mid_price > 0 else float('inf')
if price_change_bps > self.config['relist_threshold_bps']:
logging.info(f"価格が{price_change_bps:.2f} BPS変動しました。リクオートします。")
logging.info(f"旧仲値: {self.last_mid_price:.4f}, 新仲値: {mid_price:.4f}")
# 3. 既存の注文をキャンセル
await self._cancel_all_orders()
# 4. 新しいクオートを計算
quotes = self._calculate_quotes(mid_price)
# 5. 新しい注文を出す
if quotes['bid_size'] > 0:
await self._place_order('buy', quotes['bid_price'], quotes['bid_size'])
if quotes['ask_size'] > 0:
await self._place_order('sell', quotes['ask_price'], quotes['ask_size'])
# 6. 最後の仲値を更新
self.last_mid_price = mid_price
else:
logging.debug("価格変動は閾値以下です。注文は維持します。")
except Exception as e:
logging.error(f"ループ中にエラーが発生しました: {e}")
await asyncio.sleep(self.config['loop_interval_sec'])
if __name__ == '__main__':
# --- 重要: このボットを実際の資金で運用する前には ---
# 1. 取引所のテストネット(Testnet)で十分にテストしてください。
# 2. API連携部分(# TODO:)を正しく実装してください。
# 3. パラメータを市場の状況に合わせて慎重に調整してください。
# 4. 小額から始めて、ボットの挙動を監視してください。
# ----------------------------------------------------
# --- ボットの設定 ---
bot_config = {
# --- 基本戦略パラメータ ---
'default_width_bps': 75, # 仲値からの基本スプレッド (75 BPS = 0.75%)
'relist_threshold_bps': 10, # このBPS以上価格が動いたらリクオートする (10 BPS = 0.1%)
'loop_interval_sec': 5, # メインループの実行間隔(秒)
# --- Step 2: ナイーブ戦略用 ---
'fixed_order_size': 0.01, # use_inventory_management=False の場合の固定注文サイズ
# --- Step 3 & 4: 在庫管理戦略用 ---
'use_inventory_management': True, # Trueにすると在庫管理とスキューが有効になる
'order_size_pct': 0.1, # 注文に使う在庫の割合 (10%)
'dampening_factor': 0.3, # 在庫スキューが価格に与える影響の減衰係数
}
# ボットのインスタンスを作成
# api_keyとapi_secretは環境変数などから安全に読み込むことを推奨します
mm_bot = MarketMakerBot(
api_key="YOUR_API_KEY",
api_secret="YOUR_API_SECRET",
symbol="ASTER/USDT", # 例: 取引したいペア
config=bot_config
)
# ボットを実行
try:
asyncio.run(mm_bot.run())
except KeyboardInterrupt:
logging.info("ボットの実行が手動で停止されました。")0コストお削りbot(Bybit/Hyperliquid)
以下のソースコードは、Bybit/Hyperliquidで鞘取りトレード(ベーシストレーディング)を行います。トレードロジックは、以下のnoteを参考にしています。

ベーシストレーディングとは、同一資産の現物・先物で価格が乖離したときに両建てをすることで、乖離縮小後に利益を得る手法のこと。鞘取りトレードとも呼ばれます。
import asyncio
import os
import time
from dotenv import load_dotenv
# Bybit
from pybit.unified_trading import WebSocket
# Hyperliquid
from hyperliquid.info import Info
from hyperliquid.exchange import Exchange
from hyperliquid.utils import constants
# .envファイルから環境変数を読み込む
load_dotenv()
# --- 設定項目 ---
# Bybitの設定
BYBIT_API_KEY = os.getenv("BYBIT_API_KEY")
BYBIT_API_SECRET = os.getenv("BYBIT_API_SECRET")
BYBIT_SYMBOL = "ETHUSDC" # Bybitの取引ペア (USDC建て)
BYBIT_CATEGORY = "spot" # スポット取引
# Hyperliquidの設定
HYPERLIQUID_PRIVATE_KEY = os.getenv("HYPERLIQUID_PRIVATE_KEY")
HYPERLIQUID_ASSET = "ETH" # Hyperliquidの資産名
# メインネットを使う場合は constants.MAINNET_API_URL に変更
HYPERLIQUID_API_URL = constants.TESTNET_API_URL
# 取引戦略の設定
TRADE_SIZE_USD = 10 # 1回あたりの取引サイズ(USD相当)
OPEN_THRESHOLD_BPS = 20.0 # ポジションを持つための利益率の閾値 (bps)
CLOSE_THRESHOLD_BPS = 0.0 # ポジションを閉じるための利益率の閾値 (bps)
# --- グローバル変数 ---
# 最新の価格情報を格納する辞書
latest_prices = {
"bybit_bid": 0,
"bybit_ask": 0,
"hl_bid": 0,
"hl_ask": 0,
}
# ポジションを保有しているかどうかの状態を管理
in_position = False
# --- Hyperliquidのセットアップ ---
info = Info(HYPERLIQUID_API_URL, skip_ws=True)
# 自分のウォレットアドレスを取得
account = Exchange.account_from_private_key(HYPERLIQUID_PRIVATE_KEY)
exchange = Exchange(account, HYPERLIQUID_API_URL)
# Hyperliquidの資産情報を取得(例: ETHの小数点以下の桁数など)
meta = info.meta()
asset_info = next(a for a in meta["universe"] if a["name"] == HYPERLIQUID_ASSET)
asset_decimals = asset_info["szDecimals"]
# --- メインロジック ---
async def check_and_trade():
"""価格差をチェックして、条件を満たせば取引を実行する"""
global in_position
# 価格情報がまだ揃っていない場合は何もしない
if 0 in latest_prices.values():
return
hl_bid = latest_prices["hl_bid"]
bybit_ask = latest_prices["bybit_ask"]
bybit_bid = latest_prices["bybit_bid"]
hl_ask = latest_prices["hl_ask"]
# --- 1. ポジションを開く条件の計算 (Hyperliquid Short, Bybit Long) ---
# open_pnl = (HyperliquidのBid価格 - BybitのAsk価格) / 平均価格
if hl_bid > 0 and bybit_ask > 0:
avg_open_price = (hl_bid + bybit_ask) / 2
open_pnl_bps = ((hl_bid - bybit_ask) / avg_open_price) * 10000
else:
open_pnl_bps = -9999
# --- 2. ポジションを閉じる条件の計算 (Hyperliquid Long, Bybit Short) ---
# close_pnl = (BybitのBid価格 - HyperliquidのAsk価格) / 平均価格
if bybit_bid > 0 and hl_ask > 0:
avg_close_price = (bybit_bid + hl_ask) / 2
close_pnl_bps = ((bybit_bid - hl_ask) / avg_close_price) * 10000
else:
close_pnl_bps = -9999
print(
f"Status: {'In Position' if in_position else 'Idle'} | "
f"Open PnL: {open_pnl_bps:.2f} bps | "
f"Close PnL: {close_pnl_bps:.2f} bps"
)
# --- 3. 取引実行 ---
if not in_position and open_pnl_bps >= OPEN_THRESHOLD_BPS:
print("\n--- [OPEN] 条件を満たしました。ポジションを開きます ---")
# 数量を計算
trade_size_asset = TRADE_SIZE_USD / avg_open_price
# Hyperliquidで売り(ショート)
print(f"Hyperliquid: {trade_size_asset:.{asset_decimals}f} {HYPERLIQUID_ASSET}を売り")
# is_buy=Falseで売り, reduce_only=Falseで新規ポジション
order_hl = exchange.order(HYPERLIQUID_ASSET, is_buy=False, sz=trade_size_asset,
limit_px=hl_bid, order_type={"market": {}}, reduce_only=False)
print(f" -> HL Order: {order_hl}")
# Bybitで買い(ロング) - ここでは擬似的にログ出力のみ
# 実際のBybitの注文APIをここに実装します
print(f"Bybit: {trade_size_asset:.{asset_decimals}f} {BYBIT_SYMBOL}を買い")
# bybit_client.place_order(...)
# 両方の注文が成功したと仮定して状態を変更
in_position = True
print("--- ポジションを開きました ---\n")
elif in_position and close_pnl_bps >= CLOSE_THRESHOLD_BPS:
print("\n--- [CLOSE] 条件を満たしました。ポジションを閉じます ---")
# 数量を計算
trade_size_asset = TRADE_SIZE_USD / avg_close_price
# Hyperliquidで買い(ショートポジションをクローズ)
print(f"Hyperliquid: {trade_size_asset:.{asset_decimals}f} {HYPERLIQUID_ASSET}を買い")
# is_buy=Trueで買い, reduce_only=Trueでポジションを閉じる注文
order_hl = exchange.order(HYPERLIQUID_ASSET, is_buy=True, sz=trade_size_asset,
limit_px=hl_ask, order_type={"market": {}}, reduce_only=True)
print(f" -> HL Order: {order_hl}")
# Bybitで売り(ロングポジションをクローズ)
# 実際のBybitの注文APIをここに実装します
print(f"Bybit: {trade_size_asset:.{asset_decimals}f} {BYBIT_SYMBOL}を売り")
# bybit_client.place_order(...)
# 両方の注文が成功したと仮定して状態を変更
in_position = False
print("--- ポジションを閉じました ---\n")
# --- WebSocketデータハンドラ ---
def handle_bybit_message(message):
"""Bybit WebSocketからメッセージを受け取り、価格を更新"""
try:
data = message["data"]
# orderbook.1トピックは最初の1件が最良価格
if data["b"] and data["a"]:
latest_prices["bybit_bid"] = float(data["b"][0][0])
latest_prices["bybit_ask"] = float(data["a"][0][0])
except Exception as e:
print(f"Bybit WebSocket message error: {e}")
async def subscribe_to_hyperliquid():
"""Hyperliquid WebSocketに接続し、価格を更新"""
async def handle_hl_l2_book(event):
try:
levels = event["data"]["levels"]
if levels[0]: # Bid side
latest_prices["hl_bid"] = float(levels[0][0]["px"])
if levels[1]: # Ask side
latest_prices["hl_ask"] = float(levels[1][0]["px"])
except Exception as e:
print(f"Hyperliquid WebSocket message error: {e}")
info.subscribe({"type": "l2Book", "coin": HYPERLIQUID_ASSET}, handle_hl_l2_book)
# --- メイン実行関数 ---
async def main():
# 1. Bybit WebSocketのセットアップと接続
bybit_ws = WebSocket(
testnet=True, # メインネットの場合は False
channel_type=BYBIT_CATEGORY,
api_key=BYBIT_API_KEY,
api_secret=BYBIT_API_SECRET,
)
bybit_ws.subscribe_orderbook(symbol=BYBIT_SYMBOL, depth=1, callback=handle_bybit_message)
print("Bybit WebSocketに接続中...")
# Bybitは別スレッドで実行されるので、ここでは待機しない
# 2. Hyperliquid WebSocketのセットアップと接続
print("Hyperliquid WebSocketに接続中...")
await subscribe_to_hyperliquid()
# 3. メインループ: 1秒ごとに価格をチェックして取引を試みる
while True:
await check_and_trade()
await asyncio.sleep(1)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nBotを終了します。")バイナリーオプションの無料ソースコード
バイナリー業者向けのレイテンシー取引ツール(うみver)
以下のソースコードは、バイナリ業者でレイテンシー取引を行います。Chromeブラウザをプログラムで動かし、MT5との価格が乖離したときに、ブラウザ内のバイナリー業者の注文ボタンをクリックしてエントリーします。
ただレイテンシーアービトラージはブローカー側による口座凍結・利益没収・出金拒否などのリスクが高く、難易度が高いです。初心者にはおすすめしません。
import json
import asyncio
import re
import requests
import websockets
import pychrome
import time
from collections import deque
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import MetaTrader5 as mt5
import psutil
import sys
import pyautogui # ▼▼▼ 追加: pyautogui をインポート ▼▼▼
# --- ▼ ユーザー設定 ▼ ---
YOUR_HASH = "" # 自身のハッシュ値に書き換えてください
TRADE_AMOUNT = "500"
TIMEFRAME_ID = "15" # 30秒取引:"15", 1分取引:"16", ...
TRADE_MODE = "demo"
MT5_SYMBOL = "USDJPY"
# ▼▼▼ 追加・変更箇所: マウスクリック用の座標設定 ▼▼▼
# 【超重要】これらの座標は**仮の値**です。
# 実際の取引プラットフォームの「PUT」と「CALL」ボタンの正確な中心座標に設定してください。
# 設定を誤るとクリックが全く機能しません。
#
# ★ 座標特定の手順 ★
# 1. 取引プラットフォームのページをChromeで開く。
# 2. Chromeの**ズームレベルを必ず100%**に設定。
# 3. ブラウザの**ウィンドウサイズを固定**し、常にその位置に配置する。
# 4. マウスカーソルを「PUT」または「CALL」ボタンの中心に手動で合わせる。
# 5. Pythonのインタラクティブシェルや簡単なスクリプトで `import pyautogui; pyautogui.position()` を実行し、
# その時点のマウスカーソルのX, Y座標を取得する。
# 例:
# >>> import pyautogui
# >>> pyautogui.position()
# Point(x=900, y=500)
#
# ★ その他重要な注意点 (pyautogui 使用時) ★
# ・プログラム実行中は**ブラウザウィンドウを最小化せず、フォアグラウンドに表示**してください。
# ・プログラム実行中に**マウスやキーボードを操作しないでください**。誤動作の原因になります。
# ・これはOSレベルでの物理クリックであり、画面に表示されているものをクリックします。
PUT_BUTTON_COORD_X = 1742 # 例: PUTボタンのX座標 (画面左上を(0,0)とするピクセル値)
PUT_BUTTON_COORD_Y = 575 # 例: PUTボタンのY座標
CALL_BUTTON_COORD_X = 1732 # 例: CALLボタンのX座標
CALL_BUTTON_COORD_Y = 359 # 例: CALLボンのY座標
# --- ▼ エントリーロジック設定 ▼ ---
# 価格差の移動平均を計算するために、何回分のデータを保持するか
AVERAGE_PERIOD = 50
# 平均の価格差から、何pips乖離したらエントリーするか (例: 0.5pips = 0.005)
ENTRY_THRESHOLD_PIPS = 0.005
# 一度エントリーしてから、次にエントリーするまでの待機時間(秒)
ENTRY_COOLDOWN_SECONDS = 15
# 注文失敗時に再試行する最大回数
MAX_ORDER_RETRY_ATTEMPTS = 3
# 注文再試行までの待機時間(秒)
ORDER_RETRY_DELAY_SECONDS = 5
# --- ▲ エントリーロジック設定 ▲ ---
# --- 以下は基本的に編集不要 ---
# グローバル変数
diff_history = deque(maxlen=AVERAGE_PERIOD)
last_entry_time = 0
def log_message(message):
print(f"[INFO] {message}")
def find_mt5_instances():
instances = []
for proc in psutil.process_iter(['name', 'exe']):
try:
if proc.info['name'] in ['terminal64.exe', 'terminal.exe']:
if proc.info['exe'] and proc.info['exe'] not in instances:
instances.append(proc.info['exe'])
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
return instances
def select_mt5_instance(instances):
if not instances:
log_message("MT5: 起動しているMT5が見つかりませんでした。")
return None
if len(instances) == 1:
log_message(f"MT5: 起動中のMT5を1つ検出しました: {instances[0]}")
return instances[0]
print("-" * 50)
log_message("複数のMT5が起動しています。接続するMT5を選択してください:")
for i, path in enumerate(instances):
print(f" [{i + 1}] {path}")
while True:
try:
choice = int(input(f"番号を入力してください (1-{len(instances)}): ")) - 1
if 0 <= choice < len(instances): return instances[choice]
else: print("無効な番号です。")
except ValueError: print("数値を入力してください。")
except (KeyboardInterrupt, EOFError): return None
# --- ブラウザへの接続処理 ---
# SeleniumとPyChromeはWebSocketによる価格データ取得のために引き続き使用します。
# クリックにはpyautoguiを使用するため、pychromeでのInput.dispatchMouseEventは不要になります。
try:
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
driver = webdriver.Chrome(options=chrome_options)
log_message("Selenium: 既存のChromeセッションに接続しました (port 9222)")
except Exception as e:
log_message(f"Selenium: 接続に失敗しました: {e}"); exit()
try:
browser = pychrome.Browser(url="http://127.0.0.1:9222")
tabs = browser.list_tab()
tab = tabs[0] if tabs else None
if tab: tab.start(); log_message("PyChrome: 接続に成功しました (port 9222)")
else: log_message("PyChrome: 利用可能なタブが見つかりません")
except Exception as e:
log_message(f"PyChrome: 接続に失敗しました: {e}"); tab = None
async def setup_websocket_connection():
# この関数は、pyautoguiによるクリックとは直接関係ありませんが、
# 価格データ取得のために既存のWebSocket接続設定を維持します。
if not tab:
log_message("BO: PyChrome タブが利用できないためWebSocket設定をスキップします。")
return False
js_websocket_setup = f"""
(async function() {{
if (typeof window.boWebSocket !== 'undefined' && window.boWebSocket.readyState !== WebSocket.CLOSED) {{
console.log("Closing existing BO WebSocket before re-setup.");
window.boWebSocket.close();
await new Promise(r => setTimeout(r, 100));
}}
window.boWebSocket = new WebSocket("wss://bo.zentrader.com:27017/");
window.boWebSocket.onclose = (event) => {{
console.warn("BO WebSocket closed. Code:", event.code, "Reason:", event.reason);
}};
window.boWebSocket.onerror = (error) => {{
console.error("BO WebSocket error:", error);
}};
let timeout = 10000;
let interval = 500;
let elapsed = 0;
while (window.boWebSocket.readyState !== WebSocket.OPEN && elapsed < timeout) {{
await new Promise(r => setTimeout(r, interval));
elapsed += interval;
}}
if (window.boWebSocket.readyState === WebSocket.OPEN) {{
console.log("BO WebSocket opened and ready for initial commands.");
const now = Math.floor(Date.now() / 1000), tE = now, tB = now - 500;
window.boWebSocket.send(JSON.stringify({{
command: "connect",
bo: "{TRADE_MODE}",
hash: "{YOUR_HASH}",
platform: "mt4",
source: "site"
}}));
console.log("connect sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{ command: "get_user_data" }}));
console.log("get_user_data sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{ command: "hook_time", enable: "true" }}));
console.log("hook_time sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{ command: "get_cfg_trade", language: "ja" }}));
console.log("get_cfg_trade sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{ command: "get_user_settings" }}));
console.log("get_user_settings sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{
command: "hook_timeframes",
option_kind: "1",
tool_id: "4",
timeframe_id: "{TIMEFRAME_ID}",
bo: "{TRADE_MODE}",
enable: "true",
interval: "1000",
source: "site"
}}));
console.log("hook_timeframes sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{
command: "get_quotes_history",
tool_id: "4",
time_begin: tB.toString(),
time_end: tE.toString(),
time_size: "S1",
source: "site",
request_id: "4_500"
}}));
console.log("get_quotes_history sent", {{
time_begin: new Date(tB * 1000).toISOString(),
time_end: new Date(tE * 1000).toISOString(),
duration_sec: tE - tB
}});
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{ command: "hook_user_status", enable: "true" }}));
console.log("hook_user_status sent");
await new Promise(r => setTimeout(r, 300));
window.boWebSocket.send(JSON.stringify({{
command: "hook_options",
source: "site",
enable: "true",
interval: "100"
}}));
console.log("hook_options sent");
return true;
}} else {{
console.error("BO WebSocket failed to open within timeout. readyState:", window.boWebSocket.readyState);
return false;
}}
}})();
"""
try:
result_eval = tab.Runtime.evaluate(expression=js_websocket_setup, awaitPromise=True)
js_return_value = result_eval.get('result', {}).get('value')
if js_return_value is True:
log_message("BO: WebSocket接続スクリプトの実行とOPEN状態への待機に成功しました。")
return True
else:
log_message("BO: WebSocket接続スクリプトの実行に失敗、またはOPEN状態になりませんでした。")
if js_return_value is False:
pass
else:
log_message(f"BO: JavaScriptからの予期せぬ戻り値: {js_return_value}")
return False
except Exception as e:
log_message(f"BO: WebSocket接続スクリプトの実行中にエラー: {e}")
return False
def send_bo_order(direction, price_open):
"""バイナリーオプションの注文をPC画面上のマウスクリックで送信する関数"""
target_x = 0
target_y = 0
button_name = ""
if direction == "PUT":
target_x = PUT_BUTTON_COORD_X
target_y = PUT_BUTTON_COORD_Y
button_name = "PUT"
elif direction == "CALL":
target_x = CALL_BUTTON_COORD_X
target_y = CALL_BUTTON_COORD_Y
button_name = "CALL"
else:
log_message(f"BO: 不明な注文方向: {direction}")
return False
log_message(f"BO: {button_name} ボタンを物理座標 ({target_x}, {target_y}) でクリックします。現在の価格表示(参考): {price_open}")
log_message(f"【超重要】ブラウザのウィンドウ位置、ズームレベル100%を確認し、実行中はPCを操作しないでください。")
try:
# pyautoguiで指定された座標をクリック
pyautogui.click(x=target_x, y=target_y)
log_message(f"★★★ BO: {button_name} ボタンの物理クリックイベント送信成功 (座標: {target_x}, {target_y}) ★★★")
return True
except pyautogui.FailSafeException:
log_message(f"BO: エラー - マウスが画面隅に移動したため、pyautoguiがFail-Safeをトリガーしました。")
log_message(f"BO: Fail-Safeは緊急停止機能です。プログラムを終了します。")
sys.exit(1) # プログラムを強制終了
except Exception as e:
log_message(f"BO: エラー - {button_name} ボタンの物理クリック中にエラーが発生しました: {e}")
return False
def get_price_from_mt5():
tick = mt5.symbol_info_tick(MT5_SYMBOL)
return tick.ask if tick else None
async def price_monitoring_and_trading_logic():
"""ブラウザ価格を監視し、乖離エントリーロジックを実行する"""
global last_entry_time
port = 9222
try:
response = requests.get(f'http://localhost:{port}/json')
ws_debugger_url = next(item['webSocketDebuggerUrl'] for item in response.json() if 'webSocketDebuggerUrl' in item and item['type'] == 'page')
except Exception as e:
log_message(f"CDP: DevTools WebSocket URL 取得に失敗: {e}");
log_message("CDP: プログラムを終了します。ブラウザが起動していないか、デバッグポートが利用できません。")
sys.exit(1) # 強制終了
log_message("CDP: DevTools WebSocket に接続試行中...")
try:
websocket_cdp = await websockets.connect(ws_debugger_url)
await websocket_cdp.send(json.dumps({"id": 1, "method": "Network.enable"}))
log_message("CDP: DevTools WebSocket に接続成功。価格差の監視を開始...")
except Exception as e:
log_message(f"CDP: DevTools WebSocket への接続に失敗しました: {e}")
log_message("CDP: プログラムを終了します。")
sys.exit(1) # 強制終了
latest_browser_price = 0.0
while True:
try:
message = await websocket_cdp.recv() # CDP WebSocketを使用
data = json.loads(message)
if data.get("method") != "Network.webSocketFrameReceived": continue
# WebSocketフレームのペイロードから価格情報を抽出
payload = data["params"]["response"].get("payloadData", "")
if not payload.startswith("U,1,1,USD/JPY"): continue
match = re.search(r"USD/JPY\|([\d.]+)\|", payload)
if not match: continue
browser_price = float(match.group(1))
if latest_browser_price == browser_price: continue
latest_browser_price = browser_price
mt5_ask = get_price_from_mt5()
if not mt5_ask: continue
current_diff = mt5_ask - browser_price
diff_history.append(current_diff)
if len(diff_history) < AVERAGE_PERIOD:
log_message(f"[データ収集中 {len(diff_history)}/{AVERAGE_PERIOD}] MT5: {mt5_ask} | Browser: {browser_price} | 差: {current_diff:+.5f}")
continue
average_diff = sum(diff_history) / len(diff_history)
log_message(f"[監視中] MT5: {mt5_ask} | Browser: {browser_price} | 現在差: {current_diff:+.5f} | 平均差: {average_diff:+.5f}")
# --- ▼▼▼ エントリー判定 ▼▼▼ ---
if time.time() - last_entry_time < ENTRY_COOLDOWN_SECONDS:
log_message(f"【エントリー対象外】クールダウン中 ({ENTRY_COOLDOWN_SECONDS - (time.time() - last_entry_time):.1f}秒残り) ✕")
continue
order_triggered = False
order_direction = None
order_price = None
if current_diff > average_diff + ENTRY_THRESHOLD_PIPS:
log_message(f"【PUT条件成立】現在差({current_diff:+.5f}) > 平均差({average_diff:+.5f}) + 閾値({ENTRY_THRESHOLD_PIPS}) ○")
order_direction = "PUT"
order_price = browser_price
order_triggered = True
elif current_diff < average_diff - ENTRY_THRESHOLD_PIPS:
log_message(f"【CALL条件成立】現在差({current_diff:+.5f}) < 平均差({average_diff:+.5f}) - 閾値({ENTRY_THRESHOLD_PIPS}) ○")
order_direction = "CALL"
order_price = browser_price
order_triggered = True
else:
log_message(f"【エントリー対象外】条件不成立 (現在差: {current_diff:+.5f}, 平均差: {average_diff:+.5f}, 閾値: {ENTRY_THRESHOLD_PIPS}) ✕")
# 注文がトリガーされた場合の再試行ロジック
if order_triggered:
order_successful = False
for attempt in range(MAX_ORDER_RETRY_ATTEMPTS):
log_message(f"BO: 注文試行中... ({attempt + 1}/{MAX_ORDER_RETRY_ATTEMPTS}回目)")
# pyautoguiによる物理クリックを試行
order_status = send_bo_order(order_direction, order_price)
if order_status is True: # クリックイベント送信が成功
order_successful = True
break # 再試行ループを抜ける
else: # クリックイベント送信が失敗した場合 (FailSafeなどで中断された場合も含む)
if attempt < MAX_ORDER_RETRY_ATTEMPTS - 1:
log_message(f"BO: 注文クリック失敗。{ORDER_RETRY_DELAY_SECONDS}秒後に再試行します。")
await asyncio.sleep(ORDER_RETRY_DELAY_SECONDS)
else:
log_message(f"BO: 注文クリック失敗。最大試行回数 ({MAX_ORDER_RETRY_ATTEMPTS}) に達しました。")
if order_successful:
log_message(f"BO: 注文クリック処理が正常に完了しました。クールダウンを開始します。")
last_entry_time = time.time() # 注文成功時のみ最終エントリー時間を更新
else:
log_message(f"BO: 注文の再試行がすべて失敗しました。次のティックで処理を継続します。")
except websockets.exceptions.ConnectionClosed as e:
log_message(f"CDP: DevTools WebSocket接続が閉じられました: {e}。これは致命的な問題です。")
log_message("CDP: プログラムを終了します。ブラウザのデバッグ接続が失われました。")
break
except Exception as e:
log_message(f"CDP: WebSocket 受信または処理中にエラー: {e}");
await asyncio.sleep(1)
continue
async def main():
mt5_path = select_mt5_instance(find_mt5_instances())
if not mt5_path: log_message("プログラムを終了します。"); return
if not mt5.initialize(path=mt5_path):
log_message(f"MT5: initialize() failed, error code = {mt5.last_error()}"); return
log_message(f"MT5: 接続成功 (Path: {mt5_path}, Version: {mt5.version()})")
log_message("BO: 初期WebSocket接続設定を開始します。")
if not await setup_websocket_connection():
log_message("BO: WebSocketの初期接続設定に失敗しました。プログラムを終了します。"); return
log_message("BO: 初期WebSocket接続が正常に確立されました。")
await price_monitoring_and_trading_logic()
if __name__ == "__main__":
# pyautogui のFail-Safe機能を有効にする (デフォルトで有効ですが念のため)
# マウスを画面の四隅のいずれかに移動させると、pyautoguiの制御が停止します。
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.1 # 各pyautogui呼び出し間の0.1秒の一時停止を設定
try:
asyncio.run(main())
except KeyboardInterrupt:
log_message("プログラムが中断されました。(KeyboardInterrupt)")
finally:
if mt5.terminal_info():
mt5.shutdown(); log_message("MT5: 接続をシャットダウンしました。")バイナリー業者向けのレイテンシー取引ツール(天国草ver)
以下のソースコードは、うみさんのレイテンシー取引ツールの元となったソースコードです。
# ---------------------------------------------------------------------
# ENTRY TIMER & ODDS FETCH (SELENIUM)
# ---------------------------------------------------------------------
def init_driver_with_iframe(self):
"""Attach to an existing Chrome (debug port 9230) and switch to iframe containing odds."""
opts = Options()
opts.debugger_address = "localhost:9230" # assumes user started Chrome with --remote-debugging-port=9230
try:
self.driver = webdriver.Chrome(options=opts)
print("✅ Connected to Chrome debugging session")
self.find_and_switch_to_odds_iframe()
except Exception as exc:
print(f"❌ Could not connect to Chrome: {exc}")
self.driver = None
def find_and_switch_to_odds_iframe(self):
"""Loop through iframes until BUY_ODDS_CLASS element found."""
if self.driver is None:
return
while True:
try:
self.driver.switch_to.default_content()
iframes = self.driver.find_elements(By.TAG_NAME, "iframe")
for idx, iframe in enumerate(iframes):
try:
self.driver.switch_to.default_content()
self.driver.switch_to.frame(iframe)
if self.driver.find_elements(By.CLASS_NAME, BUY_ODDS_CLASS):
#print(f"✅ Odds found in iframe[{idx}]")
return
except Exception:
continue
time.sleep(0)
except Exception as exc:
print(f"frame scan error: {exc}")
time.sleep(0)
def get_latest_buy_odds(self):
if not self.driver:
return None
try:
elements = self.driver.find_elements(By.CLASS_NAME, BUY_ODDS_CLASS)
for el in elements:
txt = el.text.strip()
if txt:
return float(txt)
#print("⚠️ BUYオッズ取得できず → iframe再検索")
self.find_and_switch_to_odds_iframe()
except Exception:
self.find_and_switch_to_odds_iframe()
return None
def get_latest_sell_odds(self):
if not self.driver:
return None
try:
el = self.driver.find_element(By.XPATH, SELL_XPATH)
txt = el.text.strip()
return float(txt) if txt else None
except Exception:
self.find_and_switch_to_odds_iframe()
return None
def start_entry_timer(self):
"""Starts 40‑second cycle timer logic one minute in the future."""
if self.timer_active:
return
now = datetime.now()
self.entry_start_time = (now + timedelta(minutes=1)).replace(second=0, microsecond=0)
self.timer_active = True
self.label_overlay(f"Entry timer will start at {self.entry_start_time.strftime('%H:%M:%S')}")
threading.Thread(target=self._entry_timer_loop, daemon=True).start()
def _entry_timer_loop(self):
"""Runs forever updating allow flags based on time and odds."""
while self.timer_active:
time.sleep(0)
now = datetime.now()
if self.entry_start_time is None:
continue
delta = (now - self.entry_start_time).total_seconds()
if delta < 0:
self.allow_buy_click = self.allow_sell_click = False
self.label_overlay("⏳ Waiting for entry start …")
continue
# inside cycles 37is best
time_in_cycle = delta % CYCLE_SECONDS
if 37 <= time_in_cycle <= 40: # active 4‑second window each cycle
buy_odds = self.get_latest_buy_odds()
sell_odds = self.get_latest_sell_odds()
self.allow_buy_click = buy_odds is not None and buy_odds >= ODDS_THRESHOLD
self.allow_sell_click = sell_odds is not None and sell_odds >= ODDS_THRESHOLD
status = (
f"[{now.strftime('%H:%M:%S')}] Window 37‑40s BUY: {buy_odds} {'✅' if self.allow_buy_click else '❌'} | "
f"SELL: {sell_odds} {'✅' if self.allow_sell_click else '❌'}"
)
self.label_overlay(status)
else:
self.allow_buy_click = self.allow_sell_click = False
self.label_overlay(f"Wait cycle … {int(time_in_cycle)}s")バイナリー業者向けのレイテンシー取引ツール(天国草ver2)
# Selenium ドライバ設定(port 9222)
chrome_options = Options()
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
driver = webdriver.Chrome(options=chrome_options)
# PyChrome 設定(port 9222 使用)
try:
browser = pychrome.Browser(url="http://127.0.0.1:9222")
tabs = browser.list_tab()
if tabs:
tab = tabs[0]
tab.start()
log_queue.put("PyChrome: 接続に成功しました (port 9222)")
else:
log_queue.put("PyChrome: 利用可能なタブが見つかりません")
tab = None
except Exception as e:
log_queue.put(f"PyChrome: 接続に失敗しました: {e}")
tab = None
# WebSocket イベントハンドラ
def on_ws_created(**kwargs):
log_queue.put(f"WS created: {kwargs}")
def on_ws_frame_sent(**kwargs):
response = kwargs.get("response", {})
payload = response.get("payloadData", "")
if "livechat" in payload and "disconnect_timeout" in payload:
log_queue.put("WS: livechat の disconnect_timeout を送信")
def on_ws_frame_received(**kwargs):
pass
# PyChrome 初期化
if tab:
try:
tab.Network.enable()
tab.Network.webSocketCreated = on_ws_created
tab.Network.webSocketFrameSent = on_ws_frame_sent
tab.Network.webSocketFrameReceived = on_ws_frame_received
log_queue.put("PyChrome: Network 監視を開始")
except Exception as e:
log_queue.put(f"PyChrome: Network 設定に失敗しました: {e}")
# BO 注文送信関数(CALL / PUT)
def send_bo_order(direction, price_open):
if not tab:
log_queue.put("BO: PyChrome 未接続のため送信をスキップ")
return
try:
# CALL(BUY)と PUT(SELL)で構造は同一(direction のみ差し替え)
if direction == "CALL":
order_data = {
"command": "open_option",
"plugin": "site",
"sum": "5000",
"tool_id": "4",
"direction": direction,
"price_open": str(price_open),
"timeframe_id": "15",
"option_kind": "1"
}
else:
order_data = {
"command": "open_option",
"plugin": "site",
"sum": "5000",
"tool_id": "4",
"direction": direction,
"price_open": str(price_open),
"timeframe_id": "15",
"option_kind": "1"
}
# JavaScript 経由で WebSocket 送信
js_code = f"""
if (typeof window.boWebSocket !== 'undefined' && window.boWebSocket.readyState === WebSocket.OPEN) {{
var orderData = {json.dumps(order_data)};
window.boWebSocket.send(JSON.stringify(orderData));
console.log('BO order sent:', orderData);
true;
}} else {{
console.log('BO WebSocket connection is not available');
false;
}}
"""
result = tab.Runtime.evaluate(expression=js_code)
if result.get('result', {}).get('value'):
log_queue.put(f"BO: 注文送信成功 - {direction} @ {price_open}")
log_queue.put(f"BO: 注文詳細 - {order_data}")
else:
log_queue.put("BO: 注文送信失敗(WebSocket 未接続)")
except Exception as e:
log_queue.put(f"BO: 注文送信エラー: {e}")
# WebSocket 接続設定(BO 用): 履歴データ取得 + hook_user_status / hook_options
js_websocket_setup = """
try {
if (typeof window.boWebSocket !== 'undefined') {
window.boWebSocket.close();
}
window.boWebSocket = new WebSocket("wss://bo.zentrader.com:27017/");
window.boWebSocket.onopen = () => {
const now = Math.floor(Date.now() / 1000);
const timeEnd = now;
const timeBegin = now - 500;
// 1) connect
window.boWebSocket.send(JSON.stringify({
command: "connect",
bo: "demo",
hash: "ここにログイン情報いれてください。",
platform: "mt4",
source: "site"
}));
console.log("connect sent");
// 2) get_user_data
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({ command: "get_user_data" }));
console.log("get_user_data sent");
}, 300);
// 3) hook_time
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({ command: "hook_time", enable: "true" }));
console.log("hook_time sent");
}, 600);
// 4) get_cfg_trade
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({ command: "get_cfg_trade", language: "ja" }));
console.log("get_cfg_trade sent");
}, 900);
// 5) get_user_settings
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({ command: "get_user_settings" }));
console.log("get_user_settings sent");
}, 1200);
// 6) hook_timeframes
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({
command: "hook_timeframes",
option_kind: "1",
tool_id: "4",
timeframe_id: "15",
bo: "demo",
enable: "true",
interval: "1000",
source: "site"
}));
console.log("hook_timeframes sent");
}, 1500);
// 7) get_quotes_history
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({
command: "get_quotes_history",
tool_id: "4",
time_begin: timeBegin.toString(),
time_end: timeEnd.toString(),
time_size: "S1",
source: "site",
request_id: "4_500"
}));
console.log("get_quotes_history sent", {
time_begin: new Date(timeBegin * 1000).toISOString(),
time_end: new Date(timeEnd * 1000).toISOString(),
duration_sec: timeEnd - timeBegin
});
}, 1800);
// 8) hook_user_status
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({ command: "hook_user_status", enable: "true" }));
console.log("hook_user_status sent");
}, 2100);
// 9) hook_options
setTimeout(() => {
window.boWebSocket.send(JSON.stringify({
command: "hook_options",
source: "site",
enable: "true",
interval: "100"
}));
console.log("hook_options sent");
}, 2400);
};
window.boWebSocket.onmessage = (event) => {
console.log("BO WS message:", event.data);
};
window.boWebSocket.onerror = (error) => {
console.log("BO WS error:", error);
};
window.boWebSocket.onclose = (event) => {
console.log("BO WS closed:", event.code, event.reason);
};
true;
} catch (error) {
console.log("BO WS setup error:", error);
false;
}
"""
# WebSocket でブラウザ価格を監視(port 9222)
def start_cdp_price_listener():
async def listen_price_from_cdp():
port = 9222
try:
response = requests.get(f'http://localhost:{port}/json')
targets = response.json()
ws_debugger_url = next(item['webSocketDebuggerUrl'] for item in targets if 'webSocketDebuggerUrl' in item)
except Exception as e:
log_queue.put(f"CDP: DevTools WebSocket URL 取得に失敗: {e}")
return
async with websockets.connect(ws_debugger_url) as websocket:
await websocket.send(json.dumps({"id": 1, "method": "Network.enable"}))
log_queue.put("CDP: DevTools WebSocket に接続。価格監視を開始 (port 9222)")
while True:
try:
message = await websocket.recv()
data = json.loads(message)
if data.get("method") == "Network.webSocketFrameReceived":
payload = data["params"]["response"].get("payloadData", "")
if payload.startswith("U,1,1,USDJPY.FXCM"):
match = re.search(r"USDJPY\\.FXCM\\|([\\d.]+)\\|", payload)
if match:
latest_price["ask"] = float(match.group(1))
except Exception as e:
log_queue.put(f"CDP: WebSocket 受信エラー: {e}")
break
asyncio.run(listen_price_from_cdp())LLMでソースコードを改造しよう
ここで紹介したソースコードが自分に合わない場合、LLMで改造しましょう。ソースコードをコピペして、指示を出すことで、プログラムの知識がなくてもコーディングできます。(この手法はバイブコーディングと呼ばれる)
利用するLLMはGoogle Geminiがおすすめ。Googleアカウントを持っていれば無料で利用できます。APIの利用料金は従量課金制ですが、チャット形式なら料金は発生しません。
LLMは入力が雑だと出力も雑になってしまいます。そのため「稼げるbotを作って」と雑に指示を出すよりも、既存のソースコードを改造してもらうほうがいいです。
使用するプログラム言語はPythonにしましょう。Pythonは世界的に利用者が多く、ウェブ上に情報がたくさんあり、表現力にも優れているため、バイブコーディングでも良質なソースコードを出力してくれます。
逆にMQL4/MQL5はマイナーな言語で、ウェブ上でも情報が少ないため、LLMでソースコードを作ってもらってもコンパイルエラーが発生しやすくなります。





