文件内容
main.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
OpenClaw Skill: Desktop Vision & Automation
领域: 电脑视觉感知与桌面自动化操作
版本: V1.0.0
作者: OpenClaw Team
功能: 12项核心功能,实现电脑视觉感知与全功能桌面自动化
"""
import os
import sys
import time
import json
import threading
import queue
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple, Union
from dataclasses import dataclass, asdict
from enum import Enum
# 导入真实依赖库
import mss
import mss.tools
import pyautogui
import cv2
import numpy as np
from PIL import Image, ImageGrab
import pytesseract
import win32gui
import win32con
import win32api
import win32process
from pynput import mouse, keyboard
from pynput.mouse import Controller as MouseController, Button
from pynput.keyboard import Controller as KeyboardController, Key
import psutil
# 配置
pyautogui.FAILSAFE = True
pyautogui.PAUSE = 0.1
class ResultCode(Enum):
SUCCESS = 0
ERROR = -1
INVALID_PARAM = -2
NOT_FOUND = -3
PERMISSION_DENIED = -4
@dataclass
class SkillResult:
code: int
message: str
data: Any = None
def to_dict(self):
return {
"code": self.code,
"message": self.message,
"data": self.data,
"timestamp": datetime.now().isoformat()
}
class DesktopVisionAutomation:
def __init__(self):
self.sct = mss.mss()
self.mouse_ctrl = MouseController()
self.keyboard_ctrl = KeyboardController()
self.recording = False
self.recorded_actions = []
self.task_queue = queue.Queue()
self.worker_thread = None
def _validate_params(self, params: Dict, required: List[str]) -> SkillResult:
"""参数校验"""
missing = [p for p in required if p not in params]
if missing:
return SkillResult(
code=ResultCode.INVALID_PARAM.value,
message=f"缺少必要参数: {', '.join(missing)}"
)
return None
# ==================== 功能1: 多模式屏幕截图 ====================
def screenshot_full(self, params: Dict) -> SkillResult:
"""全屏截图"""
try:
output_path = params.get("output_path", f"screenshot_full_{int(time.time())}.png")
format_type = params.get("format", "PNG").upper()
quality = params.get("quality", 95)
monitor = self.sct.monitors[0]
sct_img = self.sct.grab(monitor)
if format_type == "PNG":
mss.tools.to_png(sct_img.rgb, sct_img.size, output=output_path)
else:
img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
if format_type == "JPG":
img.save(output_path, "JPEG", quality=quality)
elif format_type == "BMP":
img.save(output_path, "BMP")
return SkillResult(
code=ResultCode.SUCCESS.value,
message="全屏截图成功",
data={
"file_path": os.path.abspath(output_path),
"width": sct_img.size[0],
"height": sct_img.size[1],
"format": format_type
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"全屏截图失败: {str(e)}"
)
def screenshot_region(self, params: Dict) -> SkillResult:
"""指定区域截图"""
err = self._validate_params(params, ["x", "y", "width", "height"])
if err:
return err
try:
x, y = params["x"], params["y"]
w, h = params["width"], params["height"]
output_path = params.get("output_path", f"screenshot_region_{int(time.time())}.png")
format_type = params.get("format", "PNG").upper()
quality = params.get("quality", 95)
monitor = {"top": y, "left": x, "width": w, "height": h}
sct_img = self.sct.grab(monitor)
if format_type == "PNG":
mss.tools.to_png(sct_img.rgb, sct_img.size, output=output_path)
else:
img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
if format_type == "JPG":
img.save(output_path, "JPEG", quality=quality)
elif format_type == "BMP":
img.save(output_path, "BMP")
return SkillResult(
code=ResultCode.SUCCESS.value,
message="区域截图成功",
data={
"file_path": os.path.abspath(output_path),
"region": {"x": x, "y": y, "width": w, "height": h},
"format": format_type
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"区域截图失败: {str(e)}"
)
def screenshot_window(self, params: Dict) -> SkillResult:
"""指定窗口截图"""
try:
window_title = params.get("window_title", "")
hwnd = params.get("hwnd")
output_path = params.get("output_path", f"screenshot_window_{int(time.time())}.png")
if not hwnd and window_title:
hwnd = win32gui.FindWindow(None, window_title)
if not hwnd:
return SkillResult(
code=ResultCode.NOT_FOUND.value,
message="未找到指定窗口"
)
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
rect = win32gui.GetWindowRect(hwnd)
x, y, x2, y2 = rect
w, h = x2 - x, y2 - y
monitor = {"top": y, "left": x, "width": w, "height": h}
sct_img = self.sct.grab(monitor)
mss.tools.to_png(sct_img.rgb, sct_img.size, output=output_path)
return SkillResult(
code=ResultCode.SUCCESS.value,
message="窗口截图成功",
data={
"file_path": os.path.abspath(output_path),
"hwnd": hwnd,
"window_rect": {"x": x, "y": y, "width": w, "height": h}
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"窗口截图失败: {str(e)}"
)
def screenshot_timed(self, params: Dict) -> SkillResult:
"""定时截图"""
err = self._validate_params(params, ["delay"])
if err:
return err
try:
delay = params["delay"]
count = params.get("count", 1)
interval = params.get("interval", 1)
output_dir = params.get("output_dir", "./timed_screenshots")
os.makedirs(output_dir, exist_ok=True)
saved_files = []
time.sleep(delay)
for i in range(count):
output_path = os.path.join(output_dir, f"timed_{i+1}_{int(time.time())}.png")
monitor = self.sct.monitors[0]
sct_img = self.sct.grab(monitor)
mss.tools.to_png(sct_img.rgb, sct_img.size, output=output_path)
saved_files.append(os.path.abspath(output_path))
if i < count - 1:
time.sleep(interval)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"定时截图完成,共{count}张",
data={"files": saved_files}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"定时截图失败: {str(e)}"
)
# ==================== 功能2: 高清屏幕录制 ====================
def screen_record(self, params: Dict) -> SkillResult:
"""屏幕录制"""
try:
mode = params.get("mode", "full") # full, region, window
fps = params.get("fps", 30)
duration = params.get("duration", 10)
output_path = params.get("output_path", f"recording_{int(time.time())}.mp4")
resolution = params.get("resolution", "1080p")
# 分辨率映射
res_map = {"1080p": (1920, 1080), "2K": (2560, 1440), "4K": (3840, 2160)}
width, height = res_map.get(resolution, (1920, 1080))
if mode == "region":
x, y = params.get("x", 0), params.get("y", 0)
w, h = params.get("width", width), params.get("height", height)
monitor = {"top": y, "left": x, "width": w, "height": h}
else:
monitor = self.sct.monitors[0]
w, h = monitor["width"], monitor["height"]
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
start_time = time.time()
frame_count = 0
while time.time() - start_time < duration:
sct_img = self.sct.grab(monitor)
frame = np.array(sct_img)
frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
out.write(frame)
frame_count += 1
time.sleep(1.0 / fps)
out.release()
return SkillResult(
code=ResultCode.SUCCESS.value,
message="屏幕录制完成",
data={
"file_path": os.path.abspath(output_path),
"duration": duration,
"fps": fps,
"frames": frame_count,
"resolution": f"{w}x{h}"
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"屏幕录制失败: {str(e)}"
)
# ==================== 功能3: 屏幕UI元素智能识别 ====================
def detect_ui_elements(self, params: Dict) -> SkillResult:
"""UI元素识别"""
try:
region = params.get("region")
threshold = params.get("threshold", 0.8)
if region:
monitor = {"top": region["y"], "left": region["x"],
"width": region["width"], "height": region["height"]}
else:
monitor = self.sct.monitors[0]
sct_img = self.sct.grab(monitor)
img = np.array(sct_img)
gray = cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY)
elements = []
# 按钮检测 (基于边缘和颜色)
edges = cv2.Canny(gray, 50, 150)
contours, _ = cv2.findContours(edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for cnt in contours:
x, y, w, h = cv2.boundingRect(cnt)
if 30 < w < 300 and 20 < h < 100:
aspect_ratio = w / h
if 1 < aspect_ratio < 8:
elements.append({
"type": "button",
"x": x + monitor["left"],
"y": y + monitor["top"],
"width": w,
"height": h,
"confidence": min(1.0, w * h / 10000)
})
# 输入框检测
for cnt in contours:
x, y, w, h = cv2.boundingRect(cnt)
if 100 < w < 800 and 25 < h < 60:
elements.append({
"type": "input",
"x": x + monitor["left"],
"y": y + monitor["top"],
"width": w,
"height": h,
"confidence": 0.85
})
# 去重和排序
seen = set()
unique_elements = []
for elem in elements:
key = (elem["x"] // 10, elem["y"] // 10, elem["type"])
if key not in seen:
seen.add(key)
unique_elements.append(elem)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"识别到{len(unique_elements)}个UI元素",
data={"elements": unique_elements[:50]}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"UI元素识别失败: {str(e)}"
)
# ==================== 功能4: 高精度桌面自动化操作 ====================
def mouse_move(self, params: Dict) -> SkillResult:
"""鼠标移动"""
err = self._validate_params(params, ["x", "y"])
if err:
return err
try:
x, y = params["x"], params["y"]
duration = params.get("duration", 0.2)
relative = params.get("relative", False)
if relative:
current = pyautogui.position()
x += current[0]
y += current[1]
pyautogui.moveTo(x, y, duration=duration)
actual_pos = pyautogui.position()
return SkillResult(
code=ResultCode.SUCCESS.value,
message="鼠标移动成功",
data={
"target": {"x": x, "y": y},
"actual": {"x": actual_pos[0], "y": actual_pos[1]},
"error": {"x": abs(actual_pos[0] - x), "y": abs(actual_pos[1] - y)}
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"鼠标移动失败: {str(e)}"
)
def mouse_click(self, params: Dict) -> SkillResult:
"""鼠标点击"""
try:
x = params.get("x")
y = params.get("y")
button = params.get("button", "left") # left, right, middle
clicks = params.get("clicks", 1)
interval = params.get("interval", 0.1)
if x is not None and y is not None:
pyautogui.click(x, y, clicks=clicks, interval=interval, button=button)
else:
pyautogui.click(clicks=clicks, interval=interval, button=button)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"鼠标{button}键点击{clicks}次成功",
data={"position": {"x": x, "y": y} if x else "current"}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"鼠标点击失败: {str(e)}"
)
def mouse_drag(self, params: Dict) -> SkillResult:
"""鼠标拖拽"""
err = self._validate_params(params, ["start_x", "start_y", "end_x", "end_y"])
if err:
return err
try:
sx, sy = params["start_x"], params["start_y"]
ex, ey = params["end_x"], params["end_y"]
duration = params.get("duration", 0.5)
pyautogui.moveTo(sx, sy)
pyautogui.dragTo(ex, ey, duration=duration, button="left")
return SkillResult(
code=ResultCode.SUCCESS.value,
message="鼠标拖拽成功",
data={
"start": {"x": sx, "y": sy},
"end": {"x": ex, "y": ey}
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"鼠标拖拽失败: {str(e)}"
)
def mouse_scroll(self, params: Dict) -> SkillResult:
"""鼠标滚轮"""
err = self._validate_params(params, ["clicks"])
if err:
return err
try:
clicks = params["clicks"]
pyautogui.scroll(clicks)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"滚轮滚动{clicks}次成功",
data={"clicks": clicks}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"滚轮滚动失败: {str(e)}"
)
def keyboard_type(self, params: Dict) -> SkillResult:
"""键盘输入文本"""
err = self._validate_params(params, ["text"])
if err:
return err
try:
text = params["text"]
interval = params.get("interval", 0.01)
pyautogui.typewrite(text, interval=interval)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"输入文本成功: {text}",
data={"text_length": len(text)}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"键盘输入失败: {str(e)}"
)
def keyboard_hotkey(self, params: Dict) -> SkillResult:
"""快捷键组合"""
err = self._validate_params(params, ["keys"])
if err:
return err
try:
keys = params["keys"]
pyautogui.hotkey(*keys)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"快捷键执行成功: {'+'.join(keys)}",
data={"keys": keys}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"快捷键执行失败: {str(e)}"
)
# ==================== 功能5: 屏幕OCR文字识别 ====================
def ocr_screen(self, params: Dict) -> SkillResult:
"""屏幕OCR识别"""
try:
region = params.get("region")
lang = params.get("lang", "chi_sim+eng")
output_format = params.get("output", "json")
if region:
monitor = {"top": region["y"], "left": region["x"],
"width": region["width"], "height": region["height"]}
sct_img = self.sct.grab(monitor)
else:
monitor = self.sct.monitors[0]
sct_img = self.sct.grab(monitor)
img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
# 获取详细OCR数据
ocr_data = pytesseract.image_to_data(img, lang=lang, output_type=pytesseract.Output.DICT)
results = []
for i in range(len(ocr_data["text"])):
if ocr_data["text"][i].strip():
results.append({
"text": ocr_data["text"][i],
"confidence": ocr_data["conf"][i] / 100.0,
"x": ocr_data["left"][i] + (monitor["left"] if region else 0),
"y": ocr_data["top"][i] + (monitor["top"] if region else 0),
"width": ocr_data["width"][i],
"height": ocr_data["height"][i]
})
full_text = " ".join([r["text"] for r in results])
# 导出
if output_format == "txt":
txt_path = f"ocr_result_{int(time.time())}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
f.write(full_text)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"OCR识别完成,共{len(results)}个文本块",
data={
"full_text": full_text,
"details": results,
"count": len(results)
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"OCR识别失败: {str(e)}"
)
# ==================== 功能6: 屏幕目标图像匹配 ====================
def image_match(self, params: Dict) -> SkillResult:
"""图像模板匹配"""
err = self._validate_params(params, ["template_path"])
if err:
return err
try:
template_path = params["template_path"]
threshold = params.get("threshold", 0.8)
max_matches = params.get("max_matches", 5)
if not os.path.exists(template_path):
return SkillResult(
code=ResultCode.NOT_FOUND.value,
message=f"模板图片不存在: {template_path}"
)
# 读取模板
template = cv2.imread(template_path, 0)
h, w = template.shape
# 截取屏幕
monitor = self.sct.monitors[0]
sct_img = self.sct.grab(monitor)
screen_gray = cv2.cvtColor(np.array(sct_img), cv2.COLOR_BGRA2GRAY)
# 模板匹配
result = cv2.matchTemplate(screen_gray, template, cv2.TM_CCOEFF_NORMED)
locations = np.where(result >= threshold)
matches = []
for pt in zip(*locations[::-1]):
matches.append({
"x": pt[0],
"y": pt[1],
"width": w,
"height": h,
"center_x": pt[0] + w // 2,
"center_y": pt[1] + h // 2,
"confidence": float(result[pt[1]][pt[0]])
})
if len(matches) >= max_matches:
break
# 非极大值抑制去重
matches.sort(key=lambda x: x["confidence"], reverse=True)
final_matches = []
for m in matches:
duplicate = False
for fm in final_matches:
if abs(m["x"] - fm["x"]) < 20 and abs(m["y"] - fm["y"]) < 20:
duplicate = True
break
if not duplicate:
final_matches.append(m)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"图像匹配完成,找到{len(final_matches)}个匹配",
data={"matches": final_matches}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"图像匹配失败: {str(e)}"
)
# ==================== 功能7: 系统窗口全功能管理 ====================
def get_windows(self, params: Dict) -> SkillResult:
"""获取所有窗口列表"""
try:
windows = []
def callback(hwnd, extra):
if win32gui.IsWindowVisible(hwnd):
title = win32gui.GetWindowText(hwnd)
if title:
rect = win32gui.GetWindowRect(hwnd)
windows.append({
"hwnd": hwnd,
"title": title,
"rect": {"x": rect[0], "y": rect[1], "width": rect[2]-rect[0], "height": rect[3]-rect[1]},
"is_minimized": bool(win32gui.IsIconic(hwnd)),
"is_foreground": hwnd == win32gui.GetForegroundWindow()
})
win32gui.EnumWindows(callback, None)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"获取到{len(windows)}个窗口",
data={"windows": windows}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"获取窗口列表失败: {str(e)}"
)
def window_activate(self, params: Dict) -> SkillResult:
"""激活窗口"""
try:
hwnd = params.get("hwnd")
title = params.get("title")
if not hwnd and title:
hwnd = win32gui.FindWindow(None, title)
if not hwnd:
return SkillResult(
code=ResultCode.NOT_FOUND.value,
message="未找到指定窗口"
)
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE)
win32gui.SetForegroundWindow(hwnd)
return SkillResult(
code=ResultCode.SUCCESS.value,
message="窗口激活成功",
data={"hwnd": hwnd}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"窗口激活失败: {str(e)}"
)
def window_move(self, params: Dict) -> SkillResult:
"""移动窗口"""
err = self._validate_params(params, ["hwnd", "x", "y"])
if err:
return err
try:
hwnd = params["hwnd"]
x, y = params["x"], params["y"]
width = params.get("width")
height = params.get("height")
rect = win32gui.GetWindowRect(hwnd)
if not width:
width = rect[2] - rect[0]
if not height:
height = rect[3] - rect[1]
win32gui.MoveWindow(hwnd, x, y, width, height, True)
return SkillResult(
code=ResultCode.SUCCESS.value,
message="窗口移动成功",
data={"position": {"x": x, "y": y, "width": width, "height": height}}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"窗口移动失败: {str(e)}"
)
def window_state(self, params: Dict) -> SkillResult:
"""窗口状态控制(最小化/最大化/关闭)"""
err = self._validate_params(params, ["hwnd", "action"])
if err:
return err
try:
hwnd = params["hwnd"]
action = params["action"] # minimize, maximize, restore, close, topmost
action_map = {
"minimize": win32con.SW_MINIMIZE,
"maximize": win32con.SW_MAXIMIZE,
"restore": win32con.SW_RESTORE,
"close": None
}
if action == "close":
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
elif action == "topmost":
win32gui.SetWindowPos(hwnd, win32con.HWND_TOPMOST, 0, 0, 0, 0,
win32con.SWP_NOMOVE | win32con.SWP_NOSIZE)
else:
win32gui.ShowWindow(hwnd, action_map.get(action, win32con.SW_RESTORE))
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"窗口{action}操作成功",
data={"hwnd": hwnd}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"窗口操作失败: {str(e)}"
)
# ==================== 功能8: 屏幕颜色拾取与分析 ====================
def get_pixel_color(self, params: Dict) -> SkillResult:
"""获取指定坐标颜色"""
err = self._validate_params(params, ["x", "y"])
if err:
return err
try:
x, y = params["x"], params["y"]
# 使用mss精确取色
monitor = {"top": y, "left": x, "width": 1, "height": 1}
sct_img = self.sct.grab(monitor)
b, g, r = sct_img.pixel(0, 0)
# RGB转HEX
hex_color = f"#{r:02x}{g:02x}{b:02x}"
# RGB转HSV
r_norm, g_norm, b_norm = r/255.0, g/255.0, b/255.0
cmax = max(r_norm, g_norm, b_norm)
cmin = min(r_norm, g_norm, b_norm)
diff = cmax - cmin
if diff == 0:
h = 0
elif cmax == r_norm:
h = (60 * ((g_norm - b_norm) / diff) + 360) % 360
elif cmax == g_norm:
h = (60 * ((b_norm - r_norm) / diff) + 120) % 360
else:
h = (60 * ((r_norm - g_norm) / diff) + 240) % 360
s = 0 if cmax == 0 else (diff / cmax) * 100
v = cmax * 100
return SkillResult(
code=ResultCode.SUCCESS.value,
message="颜色拾取成功",
data={
"position": {"x": x, "y": y},
"rgb": {"r": r, "g": g, "b": b},
"hex": hex_color,
"hsv": {"h": round(h), "s": round(s), "v": round(v)}
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"颜色拾取失败: {str(e)}"
)
# ==================== 功能9: 屏幕变化智能检测 ====================
def detect_screen_change(self, params: Dict) -> SkillResult:
"""检测屏幕变化"""
try:
region = params.get("region")
threshold = params.get("threshold", 0.05)
duration = params.get("duration", 5)
check_interval = params.get("interval", 0.5)
if region:
monitor = {"top": region["y"], "left": region["x"],
"width": region["width"], "height": region["height"]}
else:
monitor = self.sct.monitors[0]
# 基准图像
base_img = np.array(self.sct.grab(monitor))
base_gray = cv2.cvtColor(base_img, cv2.COLOR_BGRA2GRAY)
start_time = time.time()
changes = []
while time.time() - start_time < duration:
current_img = np.array(self.sct.grab(monitor))
current_gray = cv2.cvtColor(current_img, cv2.COLOR_BGRA2GRAY)
# 计算差异
diff = cv2.absdiff(base_gray, current_gray)
change_ratio = np.sum(diff > 30) / diff.size
if change_ratio > threshold:
changes.append({
"timestamp": time.time() - start_time,
"change_ratio": change_ratio,
"detected": True
})
# 更新基准
base_gray = current_gray.copy()
time.sleep(check_interval)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"屏幕变化检测完成,检测到{len(changes)}次变化",
data={"changes": changes, "total_changes": len(changes)}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"屏幕变化检测失败: {str(e)}"
)
# ==================== 功能10: 键鼠操作录制与回放 ====================
def start_recording(self, params: Dict) -> SkillResult:
"""开始录制键鼠操作"""
try:
self.recorded_actions = []
self.recording = True
self.start_time = time.time()
def on_move(x, y):
if self.recording:
self.recorded_actions.append({
"type": "mouse_move",
"x": x, "y": y,
"time": time.time() - self.start_time
})
def on_click(x, y, button, pressed):
if self.recording:
self.recorded_actions.append({
"type": "mouse_click",
"x": x, "y": y,
"button": str(button),
"pressed": pressed,
"time": time.time() - self.start_time
})
def on_scroll(x, y, dx, dy):
if self.recording:
self.recorded_actions.append({
"type": "mouse_scroll",
"x": x, "y": y,
"dx": dx, "dy": dy,
"time": time.time() - self.start_time
})
def on_press(key):
if self.recording:
try:
key_str = key.char
except:
key_str = str(key)
self.recorded_actions.append({
"type": "key_press",
"key": key_str,
"time": time.time() - self.start_time
})
self.mouse_listener = mouse.Listener(on_move=on_move, on_click=on_click, on_scroll=on_scroll)
self.keyboard_listener = keyboard.Listener(on_press=on_press)
self.mouse_listener.start()
self.keyboard_listener.start()
return SkillResult(
code=ResultCode.SUCCESS.value,
message="开始录制键鼠操作",
data={"status": "recording"}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"开始录制失败: {str(e)}"
)
def stop_recording(self, params: Dict) -> SkillResult:
"""停止录制"""
try:
self.recording = False
if hasattr(self, 'mouse_listener'):
self.mouse_listener.stop()
if hasattr(self, 'keyboard_listener'):
self.keyboard_listener.stop()
save_path = params.get("save_path", f"action_script_{int(time.time())}.json")
with open(save_path, "w", encoding="utf-8") as f:
json.dump(self.recorded_actions, f, indent=2, ensure_ascii=False)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"录制完成,共{len(self.recorded_actions)}个动作",
data={
"action_count": len(self.recorded_actions),
"save_path": os.path.abspath(save_path)
}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"停止录制失败: {str(e)}"
)
def playback_actions(self, params: Dict) -> SkillResult:
"""回放操作脚本"""
err = self._validate_params(params, ["script_path"])
if err:
return err
try:
script_path = params["script_path"]
speed = params.get("speed", 1.0)
loops = params.get("loops", 1)
with open(script_path, "r", encoding="utf-8") as f:
actions = json.load(f)
for loop in range(loops):
last_time = 0
for action in actions:
delay = (action["time"] - last_time) / speed
if delay > 0:
time.sleep(delay)
last_time = action["time"]
if action["type"] == "mouse_move":
pyautogui.moveTo(action["x"], action["y"], duration=0)
elif action["type"] == "mouse_click":
if action["pressed"]:
btn = "left" if "left" in action["button"] else "right"
pyautogui.click(action["x"], action["y"], button=btn)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"回放完成,共{loops}轮,{len(actions)}个动作",
data={"actions_count": len(actions), "loops": loops}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"回放失败: {str(e)}"
)
# ==================== 功能11: 屏幕内容智能搜索 ====================
def search_text_on_screen(self, params: Dict) -> SkillResult:
"""在屏幕上搜索文字"""
err = self._validate_params(params, ["search_text"])
if err:
return err
try:
search_text = params["search_text"].lower()
fuzzy = params.get("fuzzy", True)
ocr_result = self.ocr_screen({})
if ocr_result.code != 0:
return ocr_result
matches = []
for item in ocr_result.data["details"]:
text = item["text"].lower()
if fuzzy:
if search_text in text:
matches.append(item)
else:
if search_text == text:
matches.append(item)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"找到{len(matches)}个匹配项",
data={"matches": matches, "search_text": search_text}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"文字搜索失败: {str(e)}"
)
# ==================== 功能12: 批量自动化任务 ====================
def add_batch_task(self, params: Dict) -> SkillResult:
"""添加批量任务到队列"""
err = self._validate_params(params, ["tasks"])
if err:
return err
try:
tasks = params["tasks"]
for task in tasks:
self.task_queue.put(task)
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"已添加{len(tasks)}个任务到队列",
data={"queue_size": self.task_queue.qsize()}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"添加任务失败: {str(e)}"
)
def execute_batch_tasks(self, params: Dict) -> SkillResult:
"""执行批量任务队列"""
try:
results = []
count = 0
while not self.task_queue.empty():
task = self.task_queue.get()
func_name = task.get("function")
task_params = task.get("params", {})
if hasattr(self, func_name):
func = getattr(self, func_name)
result = func(task_params)
results.append({
"task": func_name,
"result": result.to_dict()
})
count += 1
return SkillResult(
code=ResultCode.SUCCESS.value,
message=f"批量执行完成,共{count}个任务",
data={"results": results}
)
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"批量执行失败: {str(e)}"
)
# OpenClaw Skill 标准入口
def execute(action: str, params: Dict) -> Dict:
"""
OpenClaw Skill 标准接口
Args:
action: 功能名称
params: 参数字典
Returns:
标准JSON结果
"""
skill = DesktopVisionAutomation()
action_map = {
# 功能1: 截图
"screenshot_full": skill.screenshot_full,
"screenshot_region": skill.screenshot_region,
"screenshot_window": skill.screenshot_window,
"screenshot_timed": skill.screenshot_timed,
# 功能2: 录屏
"screen_record": skill.screen_record,
# 功能3: UI识别
"detect_ui_elements": skill.detect_ui_elements,
# 功能4: 自动化操作
"mouse_move": skill.mouse_move,
"mouse_click": skill.mouse_click,
"mouse_drag": skill.mouse_drag,
"mouse_scroll": skill.mouse_scroll,
"keyboard_type": skill.keyboard_type,
"keyboard_hotkey": skill.keyboard_hotkey,
# 功能5: OCR
"ocr_screen": skill.ocr_screen,
# 功能6: 图像匹配
"image_match": skill.image_match,
# 功能7: 窗口管理
"get_windows": skill.get_windows,
"window_activate": skill.window_activate,
"window_move": skill.window_move,
"window_state": skill.window_state,
# 功能8: 颜色拾取
"get_pixel_color": skill.get_pixel_color,
# 功能9: 变化检测
"detect_screen_change": skill.detect_screen_change,
# 功能10: 录制回放
"start_recording": skill.start_recording,
"stop_recording": skill.stop_recording,
"playback_actions": skill.playback_actions,
# 功能11: 内容搜索
"search_text_on_screen": skill.search_text_on_screen,
# 功能12: 批量任务
"add_batch_task": skill.add_batch_task,
"execute_batch_tasks": skill.execute_batch_tasks
}
if action not in action_map:
return SkillResult(
code=ResultCode.INVALID_PARAM.value,
message=f"未知的操作: {action}"
).to_dict()
try:
result = action_map[action](params)
return result.to_dict()
except Exception as e:
return SkillResult(
code=ResultCode.ERROR.value,
message=f"执行异常: {str(e)}"
).to_dict()
if __name__ == "__main__":
# 测试入口
print("Desktop Vision & Automation Skill V1.0.0")
print("可用功能:")
print(" - screenshot_full: 全屏截图")
print(" - mouse_move: 鼠标移动")
print(" - keyboard_type: 键盘输入")
print(" - ocr_screen: 屏幕OCR识别")
print(" - get_windows: 获取窗口列表")