文件预览

inbound.js

查看 Evolver 技能包中的文件内容。

文件内容

src/proxy/sync/inbound.js

'use strict';

const { PROXY_PROTOCOL_VERSION } = require('../mailbox/store');

const DEFAULT_POLL_INTERVAL_ACTIVE = 10_000;
const DEFAULT_POLL_INTERVAL_IDLE = 60_000;

class InboundSync {
  constructor({ store, hubUrl, getHeaders, logger }) {
    this.store = store;
    this.hubUrl = hubUrl;
    this.logger = logger || console;
    this.getHeaders = getHeaders;
  }

  async pull(channel = 'evomap-hub', limit = 50) {
    const cursorKey = `${channel}:inbound_cursor`;
    const cursor = this.store.getCursor(cursorKey);

    const endpoint = `${this.hubUrl}/a2a/mailbox/inbound`;

    try {
      const res = await fetch(endpoint, {
        method: 'POST',
        headers: this.getHeaders(),
        body: JSON.stringify({ proxy_protocol_version: PROXY_PROTOCOL_VERSION, cursor, limit }),
        signal: AbortSignal.timeout(35_000),
      });

      if (!res.ok) {
        const errText = await res.text().catch(() => 'unknown');
        throw new Error(`Hub returned ${res.status}: ${errText}`);
      }

      const data = await res.json();
      const messages = data.messages || [];

      if (messages.length > 0) {
        this.store.writeInboundBatch(
          messages.map(m => ({
            id: m.id,
            type: m.type,
            payload: m.payload,
            channel,
            priority: m.priority || 'normal',
            refId: m.ref_id,
            expiresAt: m.expires_at,
          }))
        );
      }

      if (data.next_cursor) {
        this.store.setCursor(cursorKey, data.next_cursor);
      }

      return { received: messages.length, cursor: data.next_cursor || cursor };
    } catch (err) {
      this.logger.error(`[inbound] pull failed: ${err.message}`);
      return { received: 0, error: err.message };
    }
  }

  async ackDelivered(channel = 'evomap-hub') {
    const delivered = this.store.list({
      type: '%',
      direction: 'inbound',
      status: 'delivered',
      limit: 100,
    }).filter(m => m.channel === channel);

    if (delivered.length === 0) return { acked: 0 };

    const endpoint = `${this.hubUrl}/a2a/mailbox/ack`;

    try {
      await fetch(endpoint, {
        method: 'POST',
        headers: this.getHeaders(),
        body: JSON.stringify({ message_ids: delivered.map(m => m.id) }),
        signal: AbortSignal.timeout(10_000),
      });
      return { acked: delivered.length };
    } catch (err) {
      this.logger.error(`[inbound] ack failed: ${err.message}`);
      return { acked: 0, error: err.message };
    }
  }
}

module.exports = { InboundSync, DEFAULT_POLL_INTERVAL_ACTIVE, DEFAULT_POLL_INTERVAL_IDLE };