文件预览

transfer_s3.mjs

查看 Aios Transfer File 技能包中的文件内容。

文件内容

scripts/transfer_s3.mjs

#!/usr/bin/env node

import { copyFile, mkdir, writeFile } from "node:fs/promises";
import { createReadStream, createWriteStream } from "node:fs";
import path from "node:path";
import { Readable } from "node:stream";
import { pipeline } from "node:stream/promises";

const TIMESTAMP_PREFIX_RE = /^\d+_/;

function fail(message, code = 1) {
  console.error(message);
  process.exit(code);
}

function readRequiredEnv(name) {
  const value = process.env[name]?.trim();
  if (!value) {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

function readOptionalEnv(name, fallback) {
  const value = process.env[name] ?? fallback;
  if (!value || value.trim().length === 0) {
    return undefined;
  }
  return value.trim();
}

function readBooleanEnv(name, fallback) {
  const value = process.env[name];
  if (!value || value.trim().length === 0) {
    return fallback;
  }

  const normalized = value.trim().toLowerCase();
  if (normalized === "true" || normalized === "1" || normalized === "yes") {
    return true;
  }
  if (normalized === "false" || normalized === "0" || normalized === "no") {
    return false;
  }

  throw new Error(`${name} must be one of true,false,1,0,yes,no`);
}

function ensureOptionalUrl(name, value) {
  if (!value) {
    return undefined;
  }

  try {
    const url = new URL(value);
    if (!url.protocol || !url.hostname) {
      throw new Error("missing protocol or hostname");
    }
  } catch (error) {
    const reason = error instanceof Error ? error.message : String(error);
    throw new Error(`${name} must be a valid URL: ${reason}`);
  }

  return value;
}

function parseFileInputUri(rawUri) {
  const trimmed = rawUri.split(" ", 1)[0].trim();
  const match = /^file_input:\/\/([^/]+)\/(.+)$/.exec(trimmed);
  if (!match) {
    throw new Error(`Malformed file input URI: ${rawUri}`);
  }

  const [, bucket, key] = match;
  if (!bucket || !key) {
    throw new Error(`Malformed file input URI: ${rawUri}`);
  }

  return { sourceUri: trimmed, bucket, key };
}

function makeTimestampedName(name, { replaceSpaces }) {
  const normalized = (replaceSpaces ? name.replaceAll(" ", "_") : name).replace(TIMESTAMP_PREFIX_RE, "");
  return `${Math.floor(Date.now() / 1000)}_${normalized}`;
}

function resolveOutboxEnvName() {
  if (readOptionalEnv("AIOS_S3_AGENT_OUTBOX_BUCKET")) {
    return "AIOS_S3_AGENT_OUTBOX_BUCKET";
  }
  throw new Error("Missing required environment variable: AIOS_S3_AGENT_OUTBOX_BUCKET");
}

async function loadAwsSdk() {
  try {
    return await import("@aws-sdk/client-s3");
  } catch {
    throw new Error("Missing dependency: @aws-sdk/client-s3. Run npm install in this skill directory.");
  }
}

async function createS3Client() {
  const sdk = await loadAwsSdk();
  const endpoint = ensureOptionalUrl("AIOS_S3_ENDPOINT", readOptionalEnv("AIOS_S3_ENDPOINT"));
  const region = readOptionalEnv("AIOS_S3_REGION", "us-east-1") ?? "us-east-1";
  const accessKeyId = readRequiredEnv("AIOS_S3_ACCESS_KEY_ID");
  const secretAccessKey = readRequiredEnv("AIOS_S3_SECRET_ACCESS_KEY");
  const forcePathStyle = readBooleanEnv("AIOS_S3_FORCE_PATH_STYLE", true);

  return {
    sdk,
    client: new sdk.S3Client({
      endpoint,
      region,
      forcePathStyle,
      credentials: {
        accessKeyId,
        secretAccessKey
      }
    })
  };
}

async function ensureDirectory(dirPath) {
  await mkdir(dirPath, { recursive: true });
  return dirPath;
}

async function writeBodyToFile(body, destination) {
  if (!body) {
    throw new Error("S3 object body is empty");
  }

  if (typeof body.transformToWebStream === "function") {
    await pipeline(Readable.fromWeb(body.transformToWebStream()), createWriteStream(destination));
    return;
  }

  if (typeof body.transformToByteArray === "function") {
    await writeFile(destination, Buffer.from(await body.transformToByteArray()));
    return;
  }

  if (body instanceof Readable) {
    await pipeline(body, createWriteStream(destination));
    return;
  }

  if (body instanceof Uint8Array) {
    await writeFile(destination, body);
    return;
  }

  throw new Error("Unsupported S3 object body type");
}

async function ensureBucket(clientBundle, bucket) {
  const { sdk, client } = clientBundle;
  try {
    await client.send(new sdk.HeadBucketCommand({ Bucket: bucket }));
    return;
  } catch {}

  const region = readOptionalEnv("AIOS_S3_REGION", "us-east-1") ?? "us-east-1";
  try {
    await client.send(new sdk.CreateBucketCommand(
      region === "us-east-1"
        ? { Bucket: bucket }
        : {
            Bucket: bucket,
            CreateBucketConfiguration: { LocationConstraint: region }
          }
    ));
  } catch (error) {
    if (region !== "us-east-1") {
      await client.send(new sdk.CreateBucketCommand({ Bucket: bucket }));
      return;
    }
    throw error;
  }
}

async function downloadUri(options) {
  const workspace = path.resolve(options.workspace);
  const inputDir = await ensureDirectory(path.join(workspace, options.inputDirName));
  const { sourceUri, bucket, key } = parseFileInputUri(options.uri);
  const basename = path.basename(key);
  if (!basename) {
    throw new Error(`Malformed file input URI: ${options.uri}`);
  }

  const localName = makeTimestampedName(basename, { replaceSpaces: false });
  const localPath = path.join(inputDir, localName);
  const { sdk, client } = await createS3Client();
  const response = await client.send(new sdk.GetObjectCommand({
    Bucket: bucket,
    Key: key
  }));

  await writeBodyToFile(response.Body, localPath);

  console.log(JSON.stringify({
    bucket,
    key,
    sourceUri,
    localPath,
    localName
  }));
}

async function uploadFile(options) {
  const workspace = path.resolve(options.workspace);
  const outputDir = await ensureDirectory(path.join(workspace, options.outputDirName));
  const sourcePath = path.resolve(options.source);
  const outboxEnvName = resolveOutboxEnvName();
  const bucket = readRequiredEnv(outboxEnvName);
  const stagedName = makeTimestampedName(path.basename(sourcePath), { replaceSpaces: true });
  const stagedPath = path.join(outputDir, stagedName);

  await copyFile(sourcePath, stagedPath);

  const clientBundle = await createS3Client();
  await ensureBucket(clientBundle, bucket);
  await clientBundle.client.send(new clientBundle.sdk.PutObjectCommand({
    Bucket: bucket,
    Key: stagedName,
    Body: createReadStream(stagedPath)
  }));

  const result = {
    bucket,
    key: stagedName,
    sourcePath,
    stagedPath,
    uri: `file_output://${bucket}/${stagedName}`
  };
  console.log(options.uriOnly ? result.uri : JSON.stringify(result));
}

function parseArgs(argv) {
  const args = {
    command: undefined,
    workspace: ".",
    inputDirName: "file_input",
    outputDirName: "file_output",
    uri: undefined,
    source: undefined,
    uriOnly: false
  };

  for (let index = 0; index < argv.length; index += 1) {
    const token = argv[index];
    if (token === "-h" || token === "--help") {
      printHelp();
      process.exit(0);
    }
    if (!args.command) {
      args.command = token;
      continue;
    }
    if (token === "--workspace") {
      args.workspace = argv[++index] ?? fail("Missing value for --workspace");
      continue;
    }
    if (token === "--input-dir-name") {
      args.inputDirName = argv[++index] ?? fail("Missing value for --input-dir-name");
      continue;
    }
    if (token === "--output-dir-name") {
      args.outputDirName = argv[++index] ?? fail("Missing value for --output-dir-name");
      continue;
    }
    if (token === "--uri") {
      args.uri = argv[++index] ?? fail("Missing value for --uri");
      continue;
    }
    if (token === "--source") {
      args.source = argv[++index] ?? fail("Missing value for --source");
      continue;
    }
    if (token === "--uri-only") {
      args.uriOnly = true;
      continue;
    }

    fail(`Unknown argument: ${token}`);
  }

  return args;
}

function printHelp() {
  console.log(`Usage:
  node scripts/transfer_s3.mjs download-uri --uri "file_input://bucket/path/to/file.bin" [--workspace .] [--input-dir-name file_input]
  node scripts/transfer_s3.mjs upload-file --source "/abs/path/to/file.bin" [--workspace .] [--output-dir-name file_output] [--uri-only]

Commands:
  download-uri   Download a file_input:// URI into the workspace file_input directory.
  upload-file    Copy a local file into file_output and upload it to the outbox bucket root.
`);
}

async function main() {
  const args = parseArgs(process.argv.slice(2));
  if (args.command === "download-uri") {
    if (!args.uri) {
      fail("download-uri requires --uri");
    }
    await downloadUri(args);
    return;
  }
  if (args.command === "upload-file") {
    if (!args.source) {
      fail("upload-file requires --source");
    }
    await uploadFile(args);
    return;
  }

  printHelp();
  process.exit(args.command ? 1 : 0);
}

main().catch((error) => {
  const message = error instanceof Error ? error.message : String(error);
  fail(message);
});