文件预览

manager.js

查看 Evolver 技能包中的文件内容。

文件内容

src/proxy/lifecycle/manager.js

'use strict';

const { PROXY_PROTOCOL_VERSION } = require('../mailbox/store');
const crypto = require('crypto');

const DEFAULT_HEARTBEAT_INTERVAL = 360_000;
const HELLO_TIMEOUT = 15_000;
const HEARTBEAT_TIMEOUT = 10_000;

class LifecycleManager {
  constructor({ hubUrl, store, logger, getTaskMeta } = {}) {
    this.hubUrl = (hubUrl || process.env.A2A_HUB_URL || '').replace(/\/+$/, '');
    this.store = store;
    this.logger = logger || console;
    this.getTaskMeta = getTaskMeta || null;
    this._heartbeatTimer = null;
    this._running = false;
    this._startedAt = null;
    this._consecutiveFailures = 0;
  }

  get nodeId() {
    return this.store.getState('node_id');
  }

  get nodeSecret() {
    return this.store.getState('node_secret') || process.env.A2A_NODE_SECRET || null;
  }

  _buildHeaders() {
    const headers = { 'Content-Type': 'application/json' };
    const secret = this.nodeSecret;
    if (secret) headers['x-node-secret'] = secret;
    return headers;
  }

  async hello() {
    if (!this.hubUrl) return { ok: false, error: 'no_hub_url' };

    const endpoint = `${this.hubUrl}/a2a/hello`;
    const nodeId = this.store.getState('node_id') || `node_${crypto.randomBytes(6).toString('hex')}`;

    const body = {
      protocol: 'gep-a2a',
      protocol_version: '1.0.0',
      message_type: 'hello',
      sender_id: nodeId,
      node_id: nodeId,
      capabilities: {},
      timestamp: new Date().toISOString(),
    };

    try {
      const res = await fetch(endpoint, {
        method: 'POST',
        headers: this._buildHeaders(),
        body: JSON.stringify(body),
        signal: AbortSignal.timeout(HELLO_TIMEOUT),
      });
      const data = await res.json();

      const secret = data?.payload?.node_secret || data?.node_secret || null;
      if (secret && /^[a-f0-9]{64}$/i.test(secret)) {
        this.store.setState('node_secret', secret);
      }

      this.store.setState('node_id', nodeId);
      this.logger.log(`[lifecycle] hello OK, node_id=${nodeId}`);
      return { ok: true, nodeId, response: data };
    } catch (err) {
      this.logger.error(`[lifecycle] hello failed: ${err.message}`);
      return { ok: false, error: err.message };
    }
  }

  async heartbeat() {
    if (!this.hubUrl) return { ok: false, error: 'no_hub_url' };

    const nodeId = this.nodeId;
    if (!nodeId) {
      const helloResult = await this.hello();
      if (!helloResult.ok) return helloResult;
    }

    const endpoint = `${this.hubUrl}/a2a/heartbeat`;
    const body = {
      node_id: this.nodeId,
      sender_id: this.nodeId,
      version: '1.0.0',
      uptime_ms: this._startedAt ? Date.now() - this._startedAt : 0,
      timestamp: new Date().toISOString(),
      meta: {
        proxy_version: PROXY_PROTOCOL_VERSION,
        proxy_protocol_version: PROXY_PROTOCOL_VERSION,
        outbound_pending: this.store.countPending({ direction: 'outbound' }),
        inbound_pending: this.store.countPending({ direction: 'inbound' }),
        ...(typeof this.getTaskMeta === 'function' ? this.getTaskMeta() : {}),
      },
    };

    try {
      const res = await fetch(endpoint, {
        method: 'POST',
        headers: this._buildHeaders(),
        body: JSON.stringify(body),
        signal: AbortSignal.timeout(HEARTBEAT_TIMEOUT),
      });
      const data = await res.json();

      this._consecutiveFailures = 0;
      this.store.setState('last_heartbeat_at', new Date().toISOString());

      if (data?.status === 'unknown_node') {
        this.logger.warn('[lifecycle] Node unknown, re-registering...');
        await this.hello();
      }

      if (Array.isArray(data?.events) && data.events.length > 0) {
        this.store.writeInboundBatch(
          data.events.map(e => ({
            type: e.type || 'hub_event',
            payload: e,
            channel: 'evomap-hub',
          }))
        );
      }

      if (data?.min_proxy_version && this._shouldUpgrade(data.min_proxy_version)) {
        this.store.writeInbound({
          type: 'system',
          payload: {
            action: 'proxy_upgrade_required',
            min_version: data.min_proxy_version,
            current_version: PROXY_PROTOCOL_VERSION,
            upgrade_url: data.upgrade_url || null,
            message: data.upgrade_message || 'Proxy version is below the minimum required by Hub.',
          },
          channel: 'evomap-hub',
          priority: 'high',
        });
        this.logger.warn(`[lifecycle] Hub requires proxy >= ${data.min_proxy_version}, current: ${PROXY_PROTOCOL_VERSION}`);
      }

      return { ok: true, response: data };
    } catch (err) {
      this._consecutiveFailures++;
      this.logger.error(`[lifecycle] heartbeat failed (${this._consecutiveFailures}): ${err.message}`);
      return { ok: false, error: err.message };
    }
  }

  startHeartbeatLoop(intervalMs) {
    if (this._running) return;
    this._running = true;
    this._startedAt = Date.now();

    const interval = Math.max(30_000, intervalMs || DEFAULT_HEARTBEAT_INTERVAL);

    const tick = async () => {
      if (!this._running) return;
      await this.heartbeat();
      if (this._running) {
        const backoff = this._consecutiveFailures > 0
          ? Math.min(interval * Math.pow(2, this._consecutiveFailures), 30 * 60_000)
          : interval;
        this._heartbeatTimer = setTimeout(tick, backoff);
        if (this._heartbeatTimer.unref) this._heartbeatTimer.unref();
      }
    };

    tick();
  }

  stopHeartbeatLoop() {
    this._running = false;
    if (this._heartbeatTimer) {
      clearTimeout(this._heartbeatTimer);
      this._heartbeatTimer = null;
    }
  }

  _shouldUpgrade(minVersion) {
    const parse = (v) => String(v || '0.0.0').split('.').map(Number);
    const min = parse(minVersion);
    const cur = parse(PROXY_PROTOCOL_VERSION);
    for (let i = 0; i < 3; i++) {
      if ((cur[i] || 0) < (min[i] || 0)) return true;
      if ((cur[i] || 0) > (min[i] || 0)) return false;
    }
    return false;
  }
}

module.exports = { LifecycleManager, DEFAULT_HEARTBEAT_INTERVAL };