<?php
/**
 * Account API v1 (leech only) + SQLite source of truth.
 *
 * Auth: TEMP pass via ?pass=getlinkv6
 * DB: data/api_main.sqlite
 *
 * submit_link:
 *  - create job
 *  - try API engine (api_core/engine/current) to run leech() directly
 *  - fallback to web-submit (POST ?id=leech) if engine fails
 *
 * query_status/get_result:
 *  - sync status from data/files/*.dat
 */

header('Content-Type: application/json; charset=utf-8');
header('Cache-Control: no-store');

date_default_timezone_set('Asia/Ho_Chi_Minh');

const API_FALLBACK_PASS = 'getlinkv6';
// Admin pass (can be rotated separately later).
const API_ADMIN_PASS = 'demo21@21k9';
// Bridge: allow newer stubs to pass GL_ACCOUNT_DIR while core continues using API_ACCOUNT_DIR.
if (defined('GL_ACCOUNT_DIR') && !defined('API_ACCOUNT_DIR')) {
  define('API_ACCOUNT_DIR', GL_ACCOUNT_DIR);
}

// Account directory is provided by per-account stub api.php.
// Fallback to SCRIPT_FILENAME for safety (e.g. if api_core/api.php is called directly).
$__accDir = defined('API_ACCOUNT_DIR') ? API_ACCOUNT_DIR : dirname((string)($_SERVER['SCRIPT_FILENAME'] ?? __FILE__));
if (!defined('API_ACCOUNT_DIR')) define('API_ACCOUNT_DIR', $__accDir);
define('API_DATA_DIR', API_ACCOUNT_DIR . '/data');
define('API_SQLITE_PATH', API_ACCOUNT_DIR . '/data/api_main.sqlite');
define('API_TRAFFIC_SQLITE_PATH', API_ACCOUNT_DIR . '/data/traffic.sqlite');
define('API_COOKIE_JAR', API_ACCOUNT_DIR . '/data/api_cookiejar.txt');
// Derive base URL dynamically (DO NOT hardcode per-account URL).
$__proto = (string)($_SERVER['HTTP_X_FORWARDED_PROTO'] ?? $_SERVER['HTTP_X_FORWARDED_PROTOCOL'] ?? '');
$__https = (!empty($_SERVER['HTTPS']) && $_SERVER['HTTPS'] !== 'off');
$__scheme = $__proto !== '' ? explode(',', $__proto)[0] : ($__https ? 'https' : 'http');
$__host = (string)($_SERVER['HTTP_HOST'] ?? $_SERVER['SERVER_NAME'] ?? 'localhost');
$__path = rtrim(str_replace('\\', '/', dirname((string)($_SERVER['SCRIPT_NAME'] ?? '/'))), '/');
$__base = $__scheme . '://' . $__host . ($__path !== '' ? $__path : '') . '/';
define('API_BASE_URL', $__base);
define('API_SELF_URL', API_BASE_URL . 'index.php');

function json_out($arr, int $code = 200): void {
  http_response_code($code);
  echo json_encode($arr, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
  exit;
}

/**
 * RD-like error output.
 * - error_code: int (Real-Debrid style)
 * - error_code_str: string (internal/legacy)
 * - retry_after_sec + Retry-After header for 429
 */
function json_error(int $httpCode, int $rdErrorCode, string $error, string $errorCodeStr, array $extra = [], ?int $retryAfterSec = null): void {
  if ($retryAfterSec !== null) {
    header('Retry-After: ' . (string)$retryAfterSec);
    $extra['retry_after_sec'] = $retryAfterSec;
  }
  $payload = array_merge([
    'ok' => false,
    'error' => $error,
    'error_code' => $rdErrorCode,
    'error_code_str' => $errorCodeStr,
  ], $extra);
  json_out($payload, $httpCode);
}

function require_pass(): void {
  $pass = $_GET['pass'] ?? ($_SERVER['HTTP_X_GL_PASS'] ?? '');
  if (!hash_equals(API_FALLBACK_PASS, (string)$pass)) {
    json_error(401, 9, 'Permission denied (invalid pass)', 'AUTH_INVALID');
  }


function require_admin(): void {
  // Admin auth: prefer header X-GL-ADMIN-PASS; fallback to ?admin_pass=...
  $pass = (string)($_SERVER['HTTP_X_GL_ADMIN_PASS'] ?? ($_GET['admin_pass'] ?? ''));
  if (!hash_equals(API_ADMIN_PASS, $pass)) {
    json_error(401, 9, 'Permission denied (admin)', 'AUTH_ADMIN_INVALID');
  }
}
}

function ensure_jobs_schema(PDO $pdo): void {
  // Backfill/migrate columns for older DBs (SQLite has no IF NOT EXISTS for ADD COLUMN).
  $cols = $pdo->query("PRAGMA table_info('jobs')")->fetchAll(PDO::FETCH_ASSOC);
  $have = [];
  foreach ($cols as $c) $have[$c['name']] = true;
  $need = [
    // list/quota fields (mirror legacy .dat)
    'norm_url' => 'TEXT',
    'job_type' => 'TEXT',
    'client_ip' => 'TEXT',
    'client_network' => 'TEXT',
    'owner' => 'TEXT',
    'showfile' => 'INTEGER',
    'releech' => 'INTEGER',
    'speed' => 'TEXT',
    'meta_json' => 'TEXT',

    // existing fields
    'direct_url_internal' => 'TEXT',
    'direct_url_pve' => 'TEXT',
    'direct_url_public' => 'TEXT',
    'direct_url_public_updated_at' => 'INTEGER',
    'direct_url_source' => 'TEXT',
    'mng_hash' => 'TEXT',
    'legacy_hash' => 'TEXT',
    'mng_last_db_check_at' => 'INTEGER',
    'mng_last_poll_at' => 'INTEGER',
    'mng_last_status' => 'TEXT',
    'mng_last_direct_url' => 'TEXT',
    'mng_poll_fail_count' => 'INTEGER'
  ];
  foreach ($need as $name => $type) {
    if (!isset($have[$name])) {
      $pdo->exec("ALTER TABLE jobs ADD COLUMN {$name} {$type}");
    }
  }
}

function db(): PDO {
  static $pdo = null;
  if ($pdo instanceof PDO) {
    ensure_jobs_schema($pdo);
    return $pdo;
  }
  if (!is_dir(API_DATA_DIR)) @mkdir(API_DATA_DIR, 0775, true);
  $pdo = new PDO('sqlite:' . API_SQLITE_PATH);
  $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  $pdo->exec('PRAGMA journal_mode=WAL;');
  $pdo->exec('PRAGMA synchronous=NORMAL;');

  $pdo->exec('CREATE TABLE IF NOT EXISTS jobs (
    job_id TEXT PRIMARY KEY,
    origin_url TEXT NOT NULL,
    norm_url TEXT,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    status TEXT NOT NULL,
    status_code TEXT,
    job_type TEXT,
    host TEXT,
    filename TEXT,
    size_text TEXT,
    size_bytes INTEGER,
    bw_charge_bytes INTEGER,
    mtime INTEGER,
    path TEXT,
    hash TEXT,
    direct_url_latest TEXT,
    direct_url_updated_at INTEGER,
    direct_url_source TEXT,
    direct_url_internal TEXT,
    direct_url_pve TEXT,
    direct_url_public TEXT,
    direct_url_public_updated_at INTEGER,
    client_ip TEXT,
    client_network TEXT,
    owner TEXT,
    showfile INTEGER,
    releech INTEGER,
    speed TEXT,
    meta_json TEXT,
    source_file TEXT,
    source_key TEXT,
    mng_hash TEXT,
    legacy_hash TEXT,
    mng_last_db_check_at INTEGER,
    mng_last_poll_at INTEGER,
    mng_last_status TEXT,
    mng_last_direct_url TEXT,
    mng_poll_fail_count INTEGER
  );');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_jobs_updated ON jobs(updated_at);');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_jobs_created ON jobs(created_at);');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_jobs_host_created ON jobs(host,created_at);');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_jobs_norm_created ON jobs(norm_url,created_at);');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status);');

  $pdo->exec('CREATE TABLE IF NOT EXISTS leech_entries (
    entry_id TEXT PRIMARY KEY,
    mtime INTEGER,
    host TEXT,
    origin_url TEXT,
    status_code TEXT,
    filename TEXT,
    size_text TEXT,
    size_bytes INTEGER,
    bw_charge_bytes INTEGER,
    path TEXT,
    hash TEXT,
    source_file TEXT,
    source_key TEXT
  );');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_entries_mtime ON leech_entries(mtime);');
  $pdo->exec('CREATE INDEX IF NOT EXISTS idx_entries_host_mtime ON leech_entries(host,mtime);');

  // Global rate limit (per-account)
  $pdo->exec('CREATE TABLE IF NOT EXISTS ratelimit (
    k TEXT PRIMARY KEY,
    window_start INTEGER,
    count INTEGER
  );');

  ensure_jobs_schema($pdo);

  return $pdo;
}

function day_key_for_ts(int $ts): string {
  // YYYY-MM-DD in Asia/Ho_Chi_Minh (timezone already set globally)
  return date('d.m.Y', $ts);
}


function traffic_next_day_key(string $dayKey): string {
  // dayKey format: dd.mm.YYYY
  $dt = DateTime::createFromFormat('d.m.Y', $dayKey, new DateTimeZone('Asia/Ho_Chi_Minh'));
  if (!$dt) return date('d.m.Y', strtotime('tomorrow'));
  $dt->modify('+1 day');
  return $dt->format('d.m.Y');
}

function traffic_record_debt_inc_after_done(PDO $t, string $dayKey, string $hostCanon, int $usedBeforeBytes, int $usedAfterBytes): void {
  // Boss policy: debt = (used - daily_soft) carry to tomorrow; record incremental debt only.
  try {
    $cfg = load_snapshot_config();
    $q = traffic_quota_bytes_for($t, $cfg, $dayKey, 'host', $hostCanon);
    $dailySoft = (int)($q['daily_bytes'] ?? 0);
    if ($dailySoft <= 0) return;

    $overBefore = $usedBeforeBytes - $dailySoft;
    if ($overBefore < 0) $overBefore = 0;
    $overAfter = $usedAfterBytes - $dailySoft;
    if ($overAfter < 0) $overAfter = 0;
    $debtInc = $overAfter - $overBefore;
    if ($debtInc <= 0) return;

    $tomorrowKey = traffic_next_day_key($dayKey);
    traffic_daily_adjust_add($t, $tomorrowKey, $hostCanon, (int)$debtInc, 0, 'debt_from_'.$dayKey, 'api');
  } catch (Throwable $e) {
    // ignore
  }
}

function traffic_db(): PDO {
  static $t = null;
  if ($t instanceof PDO) return $t;
  if (!is_dir(API_DATA_DIR)) @mkdir(API_DATA_DIR, 0775, true);
  $t = new PDO('sqlite:' . API_TRAFFIC_SQLITE_PATH);
  $t->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
  $t->exec('PRAGMA journal_mode=WAL;');
  $t->exec('PRAGMA synchronous=NORMAL;');
  $t->exec('PRAGMA busy_timeout=800;');
  ensure_traffic_schema($t);
  return $t;
}

function ensure_traffic_schema(PDO $t): void {
  $t->exec('CREATE TABLE IF NOT EXISTS traffic_meta (k TEXT PRIMARY KEY, v TEXT);');

  $t->exec('CREATE TABLE IF NOT EXISTS traffic_seen (
    day_key TEXT NOT NULL,
    host TEXT NOT NULL,
    norm_url TEXT NOT NULL,
    size_bytes INTEGER NOT NULL DEFAULT 0,
    first_seen_at INTEGER NOT NULL,
    source TEXT,
    ref_id TEXT,
    PRIMARY KEY(day_key, host, norm_url)
  );');
  $t->exec('CREATE INDEX IF NOT EXISTS idx_seen_day_host ON traffic_seen(day_key, host);');

  $t->exec('CREATE TABLE IF NOT EXISTS traffic_ledger (
    day_key TEXT NOT NULL,
    host TEXT NOT NULL,
    norm_url TEXT NOT NULL,
    size_known INTEGER NOT NULL DEFAULT 0,
    size_bytes INTEGER NOT NULL DEFAULT 0,
    reserve_bytes INTEGER NOT NULL DEFAULT 0,
    final_bytes INTEGER NOT NULL DEFAULT 0,
    status_code TEXT,
    status_code_num INTEGER,
    state TEXT,
    job_id_last TEXT,
    reserve_until_ts INTEGER NOT NULL DEFAULT 0,
    needs_reconcile INTEGER NOT NULL DEFAULT 1,
    created_at INTEGER NOT NULL,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY(day_key, host, norm_url)
  );');
  $t->exec('CREATE INDEX IF NOT EXISTS idx_ledger_day_host ON traffic_ledger(day_key, host);');
  $t->exec('CREATE INDEX IF NOT EXISTS idx_ledger_reconcile ON traffic_ledger(needs_reconcile, day_key);');

  // Backfill new columns for older DBs
  try { $t->exec("ALTER TABLE traffic_ledger ADD COLUMN status_code_num INTEGER"); } catch (Throwable $e) {}


  $t->exec('CREATE TABLE IF NOT EXISTS traffic_quota_override (
    scope TEXT NOT NULL,
    host TEXT NOT NULL,
    effective_from_day TEXT NOT NULL,
    daily_add_bytes INTEGER NOT NULL DEFAULT 0,
    max_bytes INTEGER NOT NULL DEFAULT 0,
    updated_at INTEGER NOT NULL,
    reason TEXT,
    actor TEXT,
    PRIMARY KEY(scope, host)
  );');

  $t->exec('CREATE TABLE IF NOT EXISTS traffic_daily_adjust (
    day_key TEXT NOT NULL,
    host TEXT NOT NULL,
    delta_final_bytes INTEGER NOT NULL DEFAULT 0,
    delta_reserved_bytes INTEGER NOT NULL DEFAULT 0,
    updated_at INTEGER NOT NULL,
    reason TEXT,
    actor TEXT,
    PRIMARY KEY(day_key, host)
  );');

  $t->exec('CREATE TABLE IF NOT EXISTS traffic_admin_log (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ts INTEGER NOT NULL,
    action TEXT NOT NULL,
    actor TEXT,
    detail_json TEXT
  );');



  $t->exec('CREATE TABLE IF NOT EXISTS traffic_daily (
    day_key TEXT NOT NULL,
    host TEXT NOT NULL,
    uniq_links INTEGER NOT NULL DEFAULT 0,
    reserved_bytes INTEGER NOT NULL DEFAULT 0,
    final_bytes INTEGER NOT NULL DEFAULT 0,
    updated_at INTEGER NOT NULL,
    PRIMARY KEY(day_key, host)
  );');
  $t->exec('CREATE INDEX IF NOT EXISTS idx_daily_day_host ON traffic_daily(day_key, host);');
  $t->exec('CREATE INDEX IF NOT EXISTS idx_daily_day ON traffic_daily(day_key);');

  // Backfill new columns for older DBs
  try { $t->exec("ALTER TABLE traffic_daily ADD COLUMN final_bytes INTEGER NOT NULL DEFAULT 0"); } catch (Throwable $e) {}

}

function traffic_meta_get(PDO $t, string $k, string $default = ''): string {
  $st = $t->prepare('SELECT v FROM traffic_meta WHERE k=?');
  $st->execute([$k]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  if (!$r || !isset($r['v'])) return $default;
  return (string)$r['v'];
}

function traffic_meta_set(PDO $t, string $k, string $v): void {
  $t->prepare('INSERT INTO traffic_meta(k,v) VALUES(?,?) ON CONFLICT(k) DO UPDATE SET v=excluded.v')->execute([$k, $v]);
}


function traffic_ledger_upsert(PDO $t, array $args): array {
  $dayKey = (string)($args['day_key'] ?? '');
  $host = (string)($args['host'] ?? '');
  $norm = (string)($args['norm_url'] ?? '');
  $now = (int)($args['now'] ?? time());

  if ($dayKey === '' || $host === '' || $norm === '') return ['ok'=>false,'error'=>'missing fields'];

  $sizeKnown = !empty($args['size_known']) ? 1 : 0;
  $sizeBytes = (int)($args['size_bytes'] ?? 0);
  if ($sizeBytes < 0) $sizeBytes = 0;

  $reserveBytes = (int)($args['reserve_bytes'] ?? 0);
  $finalBytes = (int)($args['final_bytes'] ?? 0);
  if ($reserveBytes < 0) $reserveBytes = 0;
  if ($finalBytes < 0) $finalBytes = 0;

  $statusCode = array_key_exists('status_code', $args) ? (string)$args['status_code'] : null;
  $statusCodeNum = array_key_exists('status_code_num', $args) ? (int)$args['status_code_num'] : null;
  if ($statusCodeNum === null && $statusCode !== null && $statusCode !== '' && ctype_digit($statusCode)) $statusCodeNum = (int)$statusCode;
  // Normalize pending legacy code: use numeric 0 for pending instead of string 'new'
  if ($statusCode !== null) {
    $scTrim = trim($statusCode);
    if ($scTrim === 'new') {
      $statusCode = '0';
      $statusCodeNum = 0;
    }
  }

  $state = array_key_exists('state', $args) ? (string)$args['state'] : null;
  $jobIdLast = array_key_exists('job_id_last', $args) ? (string)$args['job_id_last'] : null;
  $reserveUntil = (int)($args['reserve_until_ts'] ?? 0);
  $needs = array_key_exists('needs_reconcile', $args) ? ((int)$args['needs_reconcile'] ? 1 : 0) : 1;

  $t->prepare(
    'INSERT INTO traffic_ledger(day_key,host,norm_url,size_known,size_bytes,reserve_bytes,final_bytes,status_code,status_code_num,state,job_id_last,reserve_until_ts,needs_reconcile,created_at,updated_at)'
    . ' VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)'
    . ' ON CONFLICT(day_key,host,norm_url) DO UPDATE SET'
    . ' size_known=excluded.size_known,'
    . ' size_bytes=excluded.size_bytes,'
    . ' reserve_bytes=CASE WHEN state="done" OR (state="rejected" AND COALESCE(excluded.state,state)<>"done") THEN reserve_bytes ELSE excluded.reserve_bytes END,'
    . ' final_bytes=CASE WHEN state="done" OR (state="rejected" AND COALESCE(excluded.state,state)<>"done") THEN final_bytes ELSE excluded.final_bytes END,'
    . ' status_code=CASE WHEN state="done" OR (state="rejected" AND COALESCE(excluded.state,state)<>"done") THEN status_code ELSE COALESCE(excluded.status_code, status_code) END,'
    . ' status_code_num=CASE WHEN state="done" OR (state="rejected" AND COALESCE(excluded.state,state)<>"done") THEN status_code_num ELSE COALESCE(excluded.status_code_num, status_code_num) END,'
    . ' state=CASE WHEN state="done" OR (state="rejected" AND COALESCE(excluded.state,state)<>"done") THEN state ELSE COALESCE(excluded.state, state) END,'
    . ' job_id_last=COALESCE(excluded.job_id_last, job_id_last),'
    . ' reserve_until_ts=CASE WHEN excluded.reserve_until_ts>0 THEN excluded.reserve_until_ts ELSE reserve_until_ts END,'
    . ' needs_reconcile=CASE WHEN excluded.needs_reconcile=1 THEN 1 ELSE needs_reconcile END,'
    . ' updated_at=excluded.updated_at'
  )->execute([
    $dayKey,$host,$norm,$sizeKnown,$sizeBytes,$reserveBytes,$finalBytes,$statusCode,$statusCodeNum,$state,$jobIdLast,$reserveUntil,$needs,$now,$now
  ]);

  return ['ok'=>true];
}


function traffic_daily_rebuild_from_ledger(PDO $t, string $dayKey, string $host): void {
  $now = time();
  $st = $t->prepare('SELECT COUNT(*) AS c, COALESCE(SUM(reserve_bytes),0) AS r, COALESCE(SUM(final_bytes),0) AS f FROM traffic_ledger WHERE day_key=? AND host=?');
  $st->execute([$dayKey, $host]);
  $row = $st->fetch(PDO::FETCH_ASSOC) ?: [];
  $c = (int)($row['c'] ?? 0);
  $rb = (int)($row['r'] ?? 0);
  $fb = (int)($row['f'] ?? 0);

  $t->prepare('INSERT INTO traffic_daily(day_key,host,uniq_links,reserved_bytes,final_bytes,updated_at) VALUES(?,?,?,?,?,?)'
    . ' ON CONFLICT(day_key,host) DO UPDATE SET uniq_links=excluded.uniq_links,reserved_bytes=excluded.reserved_bytes,final_bytes=excluded.final_bytes,updated_at=excluded.updated_at')
    ->execute([$dayKey,$host,$c,$rb,$fb,$now]);
}

function traffic_daily_rebuild_account_from_ledger(PDO $t, string $dayKey): void {
  $now = time();
  $st = $t->prepare("SELECT COUNT(*) AS c, COALESCE(SUM(reserve_bytes),0) AS r, COALESCE(SUM(final_bytes),0) AS f FROM traffic_ledger WHERE day_key=? AND host!='__account__'");
  $st->execute([$dayKey]);
  $row = $st->fetch(PDO::FETCH_ASSOC) ?: [];
  $c = (int)($row['c'] ?? 0);
  $rb = (int)($row['r'] ?? 0);
  $fb = (int)($row['f'] ?? 0);

  $t->prepare('INSERT INTO traffic_daily(day_key,host,uniq_links,reserved_bytes,final_bytes,updated_at) VALUES(?,?,?,?,?,?)'
    . ' ON CONFLICT(day_key,host) DO UPDATE SET uniq_links=excluded.uniq_links,reserved_bytes=excluded.reserved_bytes,final_bytes=excluded.final_bytes,updated_at=excluded.updated_at')
    ->execute([$dayKey,'__account__',$c,$rb,$fb,$now]);
}

function traffic_ledger_mark_done(PDO $t, string $dayKey, string $host, string $norm, int $sizeBytes, string $jobId = ''): void {
  $now = time();
  if ($sizeBytes < 0) $sizeBytes = 0;
  traffic_ledger_upsert($t, [
    'day_key'=>$dayKey,
    'host'=>$host,
    'norm_url'=>$norm,
    'now'=>$now,
    'size_known'=>($sizeBytes > 0) ? 1 : 0,
    'size_bytes'=>$sizeBytes,
    'reserve_bytes'=>0,
    'final_bytes'=>($sizeBytes > 0) ? $sizeBytes : 0,
    'status_code'=>'1',
    'status_code_num'=>1,
    'state'=>'done',
    'job_id_last'=>$jobId,
    'reserve_until_ts'=>0,
    'needs_reconcile'=>1,
  ]);
}

function traffic_ledger_mark_reject(PDO $t, string $dayKey, string $host, string $norm, string $statusCode, string $state = 'rejected', int $sizeBytes = 0, string $jobId = ''): void {
  $now = time();
  if ($sizeBytes < 0) $sizeBytes = 0;
  traffic_ledger_upsert($t, [
    'day_key'=>$dayKey,
    'host'=>$host,
    'norm_url'=>$norm,
    'now'=>$now,
    'size_known'=>($sizeBytes > 0) ? 1 : 0,
    'size_bytes'=>$sizeBytes,
    'reserve_bytes'=>0,
    'final_bytes'=>0,
    'status_code'=>$statusCode,
    'status_code_num'=>ctype_digit((string)$statusCode) ? (int)$statusCode : null,
    'state'=>$state,
    'job_id_last'=>$jobId,
    'reserve_until_ts'=>0,
    'needs_reconcile'=>1,
  ]);
}

function traffic_reconcile_run(PDO $t, int $daysBack = 7, int $cleanupDays = 30): array {
  $now = time();

  // Cleanup old ledger rows (> cleanupDays) by created_at
  $cutoff = $now - ($cleanupDays * 86400);
  $t->prepare('DELETE FROM traffic_ledger WHERE created_at < ?')->execute([$cutoff]);

  $cutoffTs = $now - ($daysBack * 86400);


  $changed = []; // day|host => true
  $rows = 0;
  $batch = 5000;

  while (true) {
    $st = $t->prepare(
      'SELECT day_key,host,norm_url,size_known,size_bytes,reserve_bytes,final_bytes,status_code,state,reserve_until_ts,created_at'
      . ' FROM traffic_ledger'
      . ' WHERE created_at >= ?'
      . ' AND (needs_reconcile=1 OR (reserve_until_ts>0 AND reserve_until_ts < ?))'
      . ' ORDER BY (reserve_until_ts>0 AND reserve_until_ts < ?) DESC, reserve_until_ts ASC, day_key ASC'
      . ' LIMIT ' . (int)$batch
    );
    $st->execute([$cutoffTs, $now, $now]);

    $got = 0;
    while ($r = $st->fetch(PDO::FETCH_ASSOC)) {
      $got += 1;
      $rows += 1;

      $day = (string)($r['day_key'] ?? '');
      $host = (string)($r['host'] ?? '');
      $norm = (string)($r['norm_url'] ?? '');
      if ($day === '' || $host === '' || $norm === '') continue;

      $sizeKnown = (int)($r['size_known'] ?? 0);
      $sizeBytes = (int)($r['size_bytes'] ?? 0);
      if ($sizeBytes < 0) $sizeBytes = 0;

      $statusCode = (string)($r['status_code'] ?? '');
      $state = (string)($r['state'] ?? '');
      $reserveUntil = (int)($r['reserve_until_ts'] ?? 0);
      $createdAt = (int)($r['created_at'] ?? $now);

      $newState = $state;
      $newReserve = (int)($r['reserve_bytes'] ?? 0);
      $newFinal = (int)($r['final_bytes'] ?? 0);

      // TTL 3 days: if not done, cancel
      if (($now - $createdAt) > 3*86400) {
        if ($newState !== 'done' && $statusCode !== '1') {
          $newState = 'cancelled';
          $newReserve = 0;
          $newFinal = 0;
        }
      }

      $rejectCodes = ['5','6','7','8','dead'];
      if (in_array($statusCode, $rejectCodes, true) || in_array($newState, ['dead','rejected','cancelled'], true)) {
        $newReserve = 0;
        $newFinal = 0;
      } elseif ($statusCode === '1' || $newState === 'done') {
        $newState = 'done';
        $newReserve = 0;
        $newFinal = ($sizeKnown && $sizeBytes > 0) ? $sizeBytes : 0;
      } else {
        // pending
        if (!$sizeKnown || $sizeBytes <= 0) {
          $newReserve = 0; // updating
        } else {
          if ($reserveUntil > 0 && $now > $reserveUntil) {
            $newReserve = 0; // refund after 1h
          } else {
            $newReserve = $sizeBytes;
          }
        }
        $newFinal = 0; // only done counts
      }

      $t->prepare('UPDATE traffic_ledger SET state=?, reserve_bytes=?, final_bytes=?, needs_reconcile=0, updated_at=? WHERE day_key=? AND host=? AND norm_url=?')
        ->execute([$newState, $newReserve, $newFinal, $now, $day, $host, $norm]);

      $changed[$day . '|' . $host] = true;
      $changed[$day . '|__account__'] = true;
    }

    if ($got < $batch) break;
  }
  // Rebuild daily aggregates for changed pairs (reserved_bytes + final_bytes)
  foreach ($changed as $k => $_v) {
    [$dayKey2, $host2] = explode('|', $k, 2);
    if ($host2 === '__account__') {
      $st2 = $t->prepare("SELECT COUNT(*) AS c, COALESCE(SUM(reserve_bytes),0) AS r, COALESCE(SUM(final_bytes),0) AS f FROM traffic_ledger WHERE day_key=? AND host!='__account__'");
      $st2->execute([$dayKey2]);
      $row = $st2->fetch(PDO::FETCH_ASSOC) ?: [];
      $c = (int)($row['c'] ?? 0);
      $rb = (int)($row['r'] ?? 0);
      $fb = (int)($row['f'] ?? 0);
      $t->prepare('INSERT INTO traffic_daily(day_key,host,uniq_links,reserved_bytes,final_bytes,updated_at) VALUES(?,?,?,?,?,?)'
        . ' ON CONFLICT(day_key,host) DO UPDATE SET uniq_links=excluded.uniq_links,reserved_bytes=excluded.reserved_bytes,final_bytes=excluded.final_bytes,updated_at=excluded.updated_at')
        ->execute([$dayKey2,'__account__',$c,$rb,$fb,$now]);
    } else {
      $st2 = $t->prepare('SELECT COUNT(*) AS c, COALESCE(SUM(reserve_bytes),0) AS r, COALESCE(SUM(final_bytes),0) AS f FROM traffic_ledger WHERE day_key=? AND host=?');
      $st2->execute([$dayKey2, $host2]);
      $row = $st2->fetch(PDO::FETCH_ASSOC) ?: [];
      $c = (int)($row['c'] ?? 0);
      $rb = (int)($row['r'] ?? 0);
      $fb = (int)($row['f'] ?? 0);
      $t->prepare('INSERT INTO traffic_daily(day_key,host,uniq_links,reserved_bytes,final_bytes,updated_at) VALUES(?,?,?,?,?,?)'
        . ' ON CONFLICT(day_key,host) DO UPDATE SET uniq_links=excluded.uniq_links,reserved_bytes=excluded.reserved_bytes,final_bytes=excluded.final_bytes,updated_at=excluded.updated_at')
        ->execute([$dayKey2,$host2,$c,$rb,$fb,$now]);
    }
  }

  return ['ok'=>true,'rows'=>$rows,'changed_pairs'=>count($changed)];
}

function traffic_reconcile_maybe(PDO $t): void {
  $today = today_key();
  $last = traffic_meta_get($t, 'reconcile_last_day', '');
  if ($last !== $today) {
    traffic_meta_set($t, 'reconcile_last_day', $today);
    traffic_reconcile_run($t, 7, 30);
  }
}


function admin_actor(): string {
  // Best-effort actor id
  $ip = get_client_ip();
  return $ip !== '' ? $ip : 'unknown';
}

function traffic_admin_log(PDO $t, string $action, array $detail = []): void {
  try {
    $t->prepare('INSERT INTO traffic_admin_log(ts,action,actor,detail_json) VALUES(?,?,?,?)')
      ->execute([time(), $action, admin_actor(), json_encode($detail, JSON_UNESCAPED_SLASHES|JSON_UNESCAPED_UNICODE)]);
  } catch (Throwable $e) {
    // ignore
  }
}

function traffic_quota_get_effective(PDO $t, string $scope, string $host, string $dayKey): array {
  // Returns override if effective_from_day <= dayKey
  $st = $t->prepare('SELECT * FROM traffic_quota_override WHERE scope=? AND host=?');
  $st->execute([$scope, $host]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  if (!$r) return ['ok'=>true,'override'=>false];
  $eff = (string)($r['effective_from_day'] ?? '');
  if ($eff !== '' && strcmp($dayKey, $eff) < 0) {
    return ['ok'=>true,'override'=>False,'row'=>$r];
  }
  return ['ok'=>true,'override'=>true,'row'=>$r];
}


function traffic_quota_bytes_for(PDO $t, array $cfg, string $dayKey, string $scope, string $hostCanon): array {
  // Returns bytes limits used for enforcement/display (Boss semantics):
  // - daily_bytes: SOFT cap bytes
  // - over_bytes: HARD max total allowed for the day (bytes), NOT extra
  // - cap_bytes:  hard cap bytes (== over_bytes when set; fallback to daily_bytes if over_bytes=0)
  // IMPORTANT: Do NOT treat (daily_bytes + over_bytes) as a combined cap.
  $dailyBytes = 0;
  $overBytes = 0;
  $override = false;
  $effDay = '';

  if ($scope === 'account') {
    $accCapMb = (int)($cfg['maxbw_mb'] ?? 0);
    $accBonusMb = (int)($cfg['plusbw_mb'] ?? 0);
    $accDailyAddMb = (int)($cfg['daily_add_mb'] ?? max(0, $accCapMb - $accBonusMb));
    $dailyBytes = $accDailyAddMb > 0 ? $accDailyAddMb * 1024 * 1024 : 0;
    $overBytes = $accCapMb > 0 ? $accCapMb * 1024 * 1024 : 0;
    try {
      $q = traffic_quota_get_effective($t, 'account', '__account__', $dayKey);
      if (!empty($q['override']) && !empty($q['row'])) {
        $dailyBytes = (int)($q['row']['daily_add_bytes'] ?? $dailyBytes);
        $override = true;
        $effDay = (string)($q['row']['effective_from_day'] ?? '');
      }
    } catch (Throwable $e) {
      // ignore
    }
  } else {
    $hostCfg = $cfg['hosts'][$hostCanon] ?? ($cfg['hosts']['other'] ?? []);
    $dmb = (int)($hostCfg['daily_bw_mb'] ?? 0);
    $omb = (int)($hostCfg['overlimit_bw_mb'] ?? 0);
    $dailyBytes = $dmb > 0 ? $dmb * 1024 * 1024 : 0;
    $overBytes = $omb > 0 ? $omb * 1024 * 1024 : 0;
    try {
      $q = traffic_quota_get_effective($t, 'host', $hostCanon, $dayKey);
      if (!empty($q['override']) && !empty($q['row'])) {
        $dailyBytes = (int)($q['row']['daily_add_bytes'] ?? $dailyBytes);
        $override = true;
        $effDay = (string)($q['row']['effective_from_day'] ?? '');
      }
    } catch (Throwable $e) {
      // ignore
    }
  }

  $cap = ($overBytes > 0 ? $overBytes : $dailyBytes);
  return ['daily_bytes'=>$dailyBytes,'over_bytes'=>$overBytes,'cap_bytes'=>$cap,'override'=>$override,'effective_from_day'=>$effDay];
}

function traffic_daykey_to_ts(string $dayKey): int {
  $dt = DateTime::createFromFormat('d.m.Y H:i:s', $dayKey . ' 00:00:00');
  if (!$dt) return 0;
  return (int)$dt->getTimestamp();
}

function traffic_account_daily_used_bytes(PDO $t, string $dayKey): int {
  try {
    $st = $t->prepare("SELECT COALESCE(reserved_bytes,0)+COALESCE(final_bytes,0) AS used FROM traffic_daily WHERE day_key=? AND host='__account__' LIMIT 1");
    $st->execute([$dayKey]);
    $r = $st->fetch(PDO::FETCH_ASSOC) ?: [];
    $used = (int)($r['used'] ?? 0);
    $adj = traffic_daily_adjust_get($t, $dayKey, '__account__');
    $used += (int)($adj['delta_reserved_bytes'] ?? 0) + (int)($adj['delta_final_bytes'] ?? 0);
    return (int)$used;
  } catch (Throwable $e) {
    return 0;
  }
}

// PA3 (2026-02-22): account wallet carry-over semantics.
// Uses config snapshot vars: maxbw_mb (cap), plusbw_mb (activation bonus once),
// and daily add fallback = max(0, maxbw_mb - plusbw_mb) when daily_add_mb is absent.
// left_today can be negative (overuse debt), by Boss request.
function traffic_account_wallet_state(PDO $t, array $cfg, string $dayKey): array {
  $capMb = (int)($cfg['maxbw_mb'] ?? 0);
  $bonusMb = (int)($cfg['plusbw_mb'] ?? 0);
  $dailyAddMb = (int)($cfg['daily_add_mb'] ?? max(0, $capMb - $bonusMb));

  $capBytes = $capMb > 0 ? $capMb * 1024 * 1024 : 0;
  $bonusBytes = $bonusMb > 0 ? $bonusMb * 1024 * 1024 : 0;
  $dailyAddBytes = $dailyAddMb > 0 ? $dailyAddMb * 1024 * 1024 : 0;

  $dayTs = traffic_daykey_to_ts($dayKey);
  if ($dayTs <= 0) return ['cap_bytes'=>$capBytes,'bonus_bytes'=>$bonusBytes,'daily_add_bytes'=>$dailyAddBytes,'start_today'=>0,'used_today'=>0,'left_today'=>0,'end_yesterday'=>0];

  $createdDayTs = $dayTs;
  try {
    $st = $t->prepare("SELECT day_key FROM traffic_daily WHERE host='__account__' ORDER BY day_key ASC LIMIT 1");
    $st->execute();
    $r = $st->fetch(PDO::FETCH_ASSOC);
    if (is_array($r) && !empty($r['day_key'])) {
      $ts0 = traffic_daykey_to_ts((string)$r['day_key']);
      if ($ts0 > 0) $createdDayTs = $ts0;
    }
  } catch (Throwable $e) {}
  if ($createdDayTs > $dayTs) $createdDayTs = $dayTs;

  $stateEnd = 0;
  $stateStart = 0;
  for ($ts = $createdDayTs; $ts <= $dayTs; $ts += 86400) {
    $dk = date('d.m.Y', $ts);
    $stateStart = ($ts == $createdDayTs) ? ($dailyAddBytes + $bonusBytes) : ($stateEnd + $dailyAddBytes);
    if ($capBytes > 0 && $stateStart > $capBytes) $stateStart = $capBytes;
    $used = traffic_account_daily_used_bytes($t, $dk);
    $stateEnd = $stateStart - $used;
    if ($ts == $dayTs) {
      return [
        'cap_bytes'=>$capBytes,
        'bonus_bytes'=>$bonusBytes,
        'daily_add_bytes'=>$dailyAddBytes,
        'start_today'=>(int)$stateStart,
        'used_today'=>(int)$used,
        'left_today'=>(int)$stateEnd,
        'end_yesterday'=>(int)($stateStart - $dailyAddBytes),
      ];
    }
  }
  return ['cap_bytes'=>$capBytes,'bonus_bytes'=>$bonusBytes,'daily_add_bytes'=>$dailyAddBytes,'start_today'=>0,'used_today'=>0,'left_today'=>0,'end_yesterday'=>0];
}

function traffic_daily_adjust_get(PDO $t, string $dayKey, string $host): array {
  $st = $t->prepare('SELECT delta_final_bytes,delta_reserved_bytes FROM traffic_daily_adjust WHERE day_key=? AND host=?');
  $st->execute([$dayKey, $host]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  if (!$r) return ['delta_final_bytes'=>0,'delta_reserved_bytes'=>0];
  return ['delta_final_bytes'=>(int)($r['delta_final_bytes'] ?? 0), 'delta_reserved_bytes'=>(int)($r['delta_reserved_bytes'] ?? 0)];
}



function traffic_daily_adjust_add(PDO $t, string $dayKey, string $host, int $deltaFinalBytes = 0, int $deltaReservedBytes = 0, string $reason = '', string $actor = ''): void {
  // Upsert additive deltas (used for "debt" carry to tomorrow).
  // NOTE: SQLite may be busy due to concurrent ledger writes; set busy_timeout to reduce spurious failures.
  try { $t->exec('PRAGMA busy_timeout=2000'); } catch (Throwable $e) { /* ignore */ }
  if ($deltaFinalBytes === 0 && $deltaReservedBytes === 0) return;
  $isDebt = (stripos((string)$reason, 'debt') !== false);
  if ($isDebt && !api_debt_write_enabled()) {
    phase1_compat_log('debt_write_skipped_api', ['day_key'=>$dayKey,'host'=>$host,'delta_final'=>$deltaFinalBytes,'delta_reserved'=>$deltaReservedBytes,'reason'=>$reason,'actor'=>$actor]);
    return;
  }
  $now = time();
  // Ensure table exists (schema ensured in traffic_db())
  $t->prepare('INSERT INTO traffic_daily_adjust(day_key,host,delta_final_bytes,delta_reserved_bytes,updated_at,reason,actor) VALUES(?,?,?,?,?,?,?)'
    . ' ON CONFLICT(day_key,host) DO UPDATE SET delta_final_bytes=delta_final_bytes+excluded.delta_final_bytes, delta_reserved_bytes=delta_reserved_bytes+excluded.delta_reserved_bytes, updated_at=excluded.updated_at')
    ->execute([$dayKey, $host, $deltaFinalBytes, $deltaReservedBytes, $now, ($reason !== '' ? $reason : null), ($actor !== '' ? $actor : null)]);
}
function traffic_record_seen(PDO $t, array $args): array {
  $dayKey = (string)($args['day_key'] ?? '');
  $host = (string)($args['host'] ?? '');
  $norm = (string)($args['norm_url'] ?? '');
  $bytes = (int)($args['size_bytes'] ?? 0);
  $src = (string)($args['source'] ?? '');
  $ref = (string)($args['ref_id'] ?? '');
  $now = time();

  if ($dayKey === '' || $host === '' || $norm === '') return ['ok'=>false,'error'=>'missing fields'];
  if ($bytes < 0) $bytes = 0;

  $ins = $t->prepare('INSERT OR IGNORE INTO traffic_seen(day_key,host,norm_url,size_bytes,first_seen_at,source,ref_id) VALUES(?,?,?,?,?,?,?)');
  $ins->execute([$dayKey, $host, $norm, $bytes, $now, $src, $ref]);
  $new = ($ins->rowCount() > 0);

  if ($new) {
    // Ledger (reserve/final): reserve only when size is known; final counts only when DONE.
    // Policy: size updating => reserve=0; reserve will be refunded to 0 after 1h by reconcile.
    $sizeKnown = ($bytes > 0) ? 1 : 0;
    $reserve = $sizeKnown ? $bytes : 0;
    $reserveUntil = $sizeKnown ? ($now + 3600) : 0;

    // PA3 (2026-02-09): prevent legacy_sync/seen from reserving beyond host daily cap
    try {
      $cfg = load_snapshot_config();
      $q = traffic_quota_bytes_for($t, $cfg, $dayKey, 'host', $host);
      $cap = (int)($q['cap_bytes'] ?? 0);
      if ($cap > 0 && $reserve > 0) {
        $usedNow = (int)traffic_get_used_bytes_today($t, $dayKey, $host);
        if (($usedNow + $reserve) > $cap) {
          traffic_ledger_upsert($t, [
            'day_key'=>$dayKey,
            'host'=>$host,
            'norm_url'=>$norm,
            'now'=>$now,
            'size_known'=>$sizeKnown,
            'size_bytes'=>$bytes,
            'reserve_bytes'=>0,
            'final_bytes'=>0,
            'status_code'=>'rejected',
            'state'=>'rejected',
            'job_id_last'=>$ref,
            'reserve_until_ts'=>0,
            'needs_reconcile'=>1,
          ]);
          return ['ok'=>true,'new'=>$new,'blocked'=>true,'reason'=>'host_cap'];
        }
      }
    } catch (Throwable $e) {}

    traffic_ledger_upsert($t, [
      'day_key'=>$dayKey,
      'host'=>$host,
      'norm_url'=>$norm,
      'now'=>$now,
      'size_known'=>$sizeKnown,
      'size_bytes'=>$bytes,
      'reserve_bytes'=>$reserve,
      'final_bytes'=>0,
      'status_code'=>'new',
      'state'=>'new',
      'job_id_last'=>$ref,
      'reserve_until_ts'=>$reserveUntil,
      'needs_reconcile'=>1,
    ]);

    $t->prepare('INSERT INTO traffic_daily(day_key,host,uniq_links,reserved_bytes,updated_at) VALUES(?,?,?,?,?)
      ON CONFLICT(day_key,host) DO UPDATE SET uniq_links=uniq_links+1,reserved_bytes=reserved_bytes+excluded.reserved_bytes,updated_at=excluded.updated_at')
      ->execute([$dayKey, $host, 1, $bytes, $now]);
  }

  return ['ok'=>true,'new'=>$new];
}

function traffic_sync_from_legacy_sqlite(PDO $t, int $days = 7, int $limit = 500, int $budgetMs = 200): array {
  // Incremental sync from local legacy.sqlite into traffic.sqlite.
  // Throttle: once per 2 seconds.
  $now = time();
  $last = (int)traffic_meta_get($t, 'legacy_sync_last_at', '0');
  if ($last > 0 && ($now - $last) < 2) {
    return ['ok'=>true,'skipped'=>true,'reason'=>'throttle','age_sec'=>$now-$last];
  }

  traffic_meta_set($t, 'legacy_sync_last_at', (string)$now);

  $legacyPath = API_DATA_DIR . '/legacy.sqlite';
  if (!is_file($legacyPath)) return ['ok'=>true,'skipped'=>true,'reason'=>'legacy_missing'];

  $cursorU = (int)traffic_meta_get($t, 'legacy_sync_cursor_updated_at', '0');
  $cursorK = (string)traffic_meta_get($t, 'legacy_sync_cursor_job_key', '');

  // Safety: legacy_jobs2 may insert late rows with the same updated_at but smaller job_key;
  // use a small lookback window to avoid missing them. Idempotent upserts make this safe.
  $sinceU = $cursorU > 2 ? ($cursorU - 2) : 0;

  $minTs = $now - ($days * 86400) - 86400; // small buffer
  $t0 = microtime(true);
  $rows = 0;
  $newSeen = 0;

  try {
    $lpdo = new PDO('sqlite:' . $legacyPath);
    $lpdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    @$lpdo->exec('PRAGMA query_only=ON;');
    @$lpdo->exec('PRAGMA busy_timeout=800;');

    $sql = 'SELECT job_key, host, norm_url, origin_url, size_bytes, bw_charge_bytes, mtime, updated_at
'
      . 'FROM legacy_jobs2
'
      . 'WHERE updated_at >= ?
'
      . 'ORDER BY updated_at ASC, job_key ASC
'
      . 'LIMIT ' . (int)$limit;

    $st = $lpdo->prepare($sql);
    $st->execute([$sinceU]);

    while ($r = $st->fetch(PDO::FETCH_ASSOC)) {
      $rows += 1;
      $u = (int)($r['updated_at'] ?? 0);
      $k = (string)($r['job_key'] ?? '');
      $cursorU = $u;
      $cursorK = $k;

      $mtime = (int)($r['mtime'] ?? 0);
      if ($mtime <= 0) continue;
      if ($mtime < $minTs) continue;

      $host = canonical_host_from_url('https://' . ((string)($r['host'] ?? '')) . '/');
      if ($host === '') $host = canonical_host_from_url((string)($r['origin_url'] ?? ''));
      if ($host === '') continue;

      $norm = (string)($r['norm_url'] ?? '');
      if ($norm === '') $norm = normalize_url_for_uniqueness(normalize_url((string)($r['origin_url'] ?? '')));
      if ($norm === '') continue;

      $bytes = (int)($r['bw_charge_bytes'] ?? 0);
      if ($bytes <= 0) $bytes = (int)($r['size_bytes'] ?? 0);
      if ($bytes <= 0) continue; // skip unknown/zero bytes
      if ($bytes < 0) $bytes = 0;

      $dayKey = day_key_for_ts($mtime);
      $ins = traffic_record_seen($t, [
        'day_key' => $dayKey,
        'host' => $host,
        'norm_url' => $norm,
        'size_bytes' => $bytes,
        'source' => 'legacy_sync',
        'ref_id' => $k,
      ]);

      // Bridge legacy -> traffic_ledger (Boss policy):
      // - DONE (status_code=1) counts final_bytes=size
      // - Reject/dead statuses (5/6/7/8/dead) do NOT count BW (reserve=0, final=0)
      // - NEW/processing/leeching: reserve=size for up to 1 hour (then reconcile will refund to 0 if still not DONE)
      try {
        $sc = (string)($r['status_code'] ?? ($r['status'] ?? ''));
        if ($sc === '') $sc = (string)($r['status'] ?? '');

        $isReject = in_array($sc, ['5','6','7','8','dead'], true);

        if ($sc === '1') {
          $usedBeforeDebt = traffic_get_used_bytes_today($t, $dayKey, $host);
          traffic_ledger_mark_done($t, $dayKey, $host, $norm, $bytes, $k);
        } elseif ($isReject) {
          $st = ($sc === 'dead') ? 'dead' : 'rejected';
          traffic_ledger_mark_reject($t, $dayKey, $host, $norm, $sc, $st, $bytes, $k);
        } else {
          // Pending: reserve within 1h only if size is known
          $sizeKnown = ($bytes > 0) ? 1 : 0;
          $reserve = $sizeKnown ? $bytes : 0;
          $reserveUntil = $sizeKnown ? ($mtime + 3600) : 0;
          traffic_ledger_upsert($t, [
            'day_key'=>$dayKey,
            'host'=>$host,
            'norm_url'=>$norm,
            'now'=>time(),
            'size_known'=>$sizeKnown,
            'size_bytes'=>$bytes,
            'reserve_bytes'=>$reserve,
            'final_bytes'=>0,
            'status_code'=>($sc !== '' && $sc !== 'new') ? $sc : '0',
            'status_code_num'=>ctype_digit((string)$sc) ? (int)$sc : 0,
            'state'=>'new',
            'job_id_last'=>$k,
            'reserve_until_ts'=>$reserveUntil,
            'needs_reconcile'=>1,
          ]);
        }

        traffic_daily_rebuild_from_ledger($t, $dayKey, $host);
        traffic_daily_rebuild_account_from_ledger($t, $dayKey);
        if (isset($usedBeforeDebt)) {
          $usedAfterDebt = traffic_get_used_bytes_today($t, $dayKey, $host);
          traffic_record_debt_inc_after_done($t, $dayKey, $host, (int)$usedBeforeDebt, (int)$usedAfterDebt);
          unset($usedBeforeDebt);
        }
      } catch (Throwable $e) {
        // ignore
      }

      if (!empty($ins['new'])) $newSeen += 1;

      if ((microtime(true) - $t0) * 1000.0 > $budgetMs) break;
    }

    traffic_meta_set($t, 'legacy_sync_cursor_updated_at', (string)$cursorU);
    // Keep job_key cursor empty; we rely on updated_at lookback window.
    traffic_meta_set($t, 'legacy_sync_cursor_job_key', '');

    return ['ok'=>true,'skipped'=>false,'rows'=>$rows,'new_seen'=>$newSeen,'cursor_updated_at'=>$cursorU,'cursor_job_key'=>$cursorK,'took_ms'=>(int)round((microtime(true)-$t0)*1000)];
  } catch (Throwable $e) {
    return ['ok'=>false,'error'=>$e->getMessage(),'rows'=>$rows,'new_seen'=>$newSeen,'took_ms'=>(int)round((microtime(true)-$t0)*1000)];
  }
}

function traffic_get_daily(PDO $t, string $dayKey): array {
  $st = $t->prepare('SELECT host, uniq_links, reserved_bytes, final_bytes FROM traffic_daily WHERE day_key=?');
  $st->execute([$dayKey]);
  $out = [];
  while ($r = $st->fetch(PDO::FETCH_ASSOC)) {
    $h = (string)($r['host'] ?? '');
    if ($h === '') continue;
    $adj = traffic_daily_adjust_get($t, $dayKey, $h);
    $rb = (int)($r['reserved_bytes'] ?? 0) + (int)($adj['delta_reserved_bytes'] ?? 0);
    $fb = (int)($r['final_bytes'] ?? 0) + (int)($adj['delta_final_bytes'] ?? 0);
    if ($rb < 0) $rb = 0;
    if ($fb < 0) $fb = 0;
    $out[$h] = ['uniq_links'=>(int)($r['uniq_links'] ?? 0), 'reserved_bytes'=>$rb, 'final_bytes'=>$fb];
  }
  return $out;
}

function traffic_is_seen_today(PDO $t, string $dayKey, string $host, string $normUrl): bool {
  $st = $t->prepare('SELECT 1 FROM traffic_seen WHERE day_key=? AND host=? AND norm_url=? LIMIT 1');
  $st->execute([$dayKey, $host, $normUrl]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  return !empty($r);
}

function traffic_get_used_bytes_today(PDO $t, string $dayKey, string $host = ''): int {
  if ($host !== '') {
    $st = $t->prepare('SELECT reserved_bytes, final_bytes FROM traffic_daily WHERE day_key=? AND host=?');
    $st->execute([$dayKey, $host]);
    $r = $st->fetch(PDO::FETCH_ASSOC);
    $rb = (int)($r['reserved_bytes'] ?? 0) + (int)($r['final_bytes'] ?? 0);
    $adj = traffic_daily_adjust_get($t, $dayKey, $host);
    $rb += (int)($adj['delta_reserved_bytes'] ?? 0);
    $rb += (int)($adj['delta_final_bytes'] ?? 0);
    if ($rb < 0) $rb = 0;
    return $rb;
  }

  $st = $t->prepare('SELECT COALESCE(SUM(reserved_bytes),0) AS r, COALESCE(SUM(final_bytes),0) AS f FROM traffic_daily WHERE day_key=?');
  $st->execute([$dayKey]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  $rb = (int)($r['r'] ?? 0) + (int)($r['f'] ?? 0);
  $adj = traffic_daily_adjust_get($t, $dayKey, '__account__');
  $rb += (int)($adj['delta_reserved_bytes'] ?? 0);
  $rb += (int)($adj['delta_final_bytes'] ?? 0);
  if ($rb < 0) $rb = 0;
  return $rb;
}

function traffic_get_details(PDO $t, int $startTs, int $endTs): array {
  $start = day_key_for_ts($startTs);
  $end = day_key_for_ts($endTs);

  $st = $t->prepare('SELECT day_key, host, reserved_bytes FROM traffic_daily WHERE day_key>=? AND day_key<=? ORDER BY day_key ASC');
  $st->execute([$start, $end]);
  $days = [];
  while ($r = $st->fetch(PDO::FETCH_ASSOC)) {
    $dk = (string)($r['day_key'] ?? '');
    $h = (string)($r['host'] ?? '');
    $b = (int)($r['reserved_bytes'] ?? 0);
    if ($dk === '' || $h === '') continue;
    if (!isset($days[$dk])) $days[$dk] = ['host'=>[], 'bytes'=>0];
    $days[$dk]['host'][$h] = $b;
    $days[$dk]['bytes'] += $b;
  }
  return $days;
}

function curl_request(string $url, string $method = 'GET', ?array $postFields = null, array $extraHeaders = []): array {
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $url);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch, CURLOPT_HEADER, true);
  curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
  curl_setopt($ch, CURLOPT_MAXREDIRS, 5);
  curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 15);
  curl_setopt($ch, CURLOPT_TIMEOUT, 40);
  curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
  curl_setopt($ch, CURLOPT_COOKIEJAR, API_COOKIE_JAR);
  curl_setopt($ch, CURLOPT_COOKIEFILE, API_COOKIE_JAR);

  $headers = [
    'User-Agent: GetlinkAPI/0.1 (internal)',
    'Accept: text/html,application/json;q=0.9,*/*;q=0.8',
  ];

  if (!empty($extraHeaders)) {
    foreach ($extraHeaders as $h) {
      if (is_string($h) && $h !== '') $headers[] = $h;
    }
  }

  $method = strtoupper($method);
  if ($method === 'POST') {
    curl_setopt($ch, CURLOPT_POST, true);
    if ($postFields !== null) {
      curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postFields));
      $headers[] = 'Content-Type: application/x-www-form-urlencoded';
    }
  }

  curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);

  $resp = curl_exec($ch);
  if ($resp === false) {
    $err = curl_error($ch);
    curl_close($ch);
    return ['ok'=>false,'error'=>$err,'status'=>0,'headers'=>'','body'=>''];
  }

  $status = curl_getinfo($ch, CURLINFO_HTTP_CODE);
  $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
  curl_close($ch);

  $rawHeaders = substr($resp, 0, $headerSize);
  $body = substr($resp, $headerSize);

  return ['ok'=>true,'status'=>$status,'headers'=>$rawHeaders,'body'=>$body];
}

function is_logged_in_leech_page(string $html): bool {
  return (stripos($html, 'id="urllist"') !== false) || (stripos($html, 'name="urllist"') !== false) || (stripos($html, 'Leech Link Phiên bản') !== false);
}

function ensure_logged_in(): void {
  $r = curl_request(API_BASE_URL . '?id=leech', 'GET');
  $body = $r['body'] ?? '';
  if (!is_logged_in_leech_page($body)) {
    curl_request(API_BASE_URL . 'login.php', 'POST', [
      'secure' => API_FALLBACK_PASS,
      'remember-me' => '1',
    ]);
    $r2 = curl_request(API_BASE_URL . '?id=leech', 'GET');
    if (!is_logged_in_leech_page($r2['body'] ?? '')) {
      json_error(401, 13, 'Invalid login', 'LOGIN_FAILED');
    }
  }
}

function debug_enabled(): bool {
  if (empty($_GET['debug'])) return false;
  return (string)($_GET['passdebug'] ?? '') === 'demo21@21k9';
}

function get_client_ip(): string {
  // Prefer proxy headers if present (Nginx/CF), fallback to REMOTE_ADDR.
  $xff = (string)($_SERVER['HTTP_X_FORWARDED_FOR'] ?? '');
  if ($xff !== '') {
    $parts = array_map('trim', explode(',', $xff));
    if (!empty($parts[0]) && filter_var($parts[0], FILTER_VALIDATE_IP)) return $parts[0];
  }
  $xri = (string)($_SERVER['HTTP_X_REAL_IP'] ?? '');
  if ($xri !== '' && filter_var($xri, FILTER_VALIDATE_IP)) return $xri;
  $ip = (string)($_SERVER['REMOTE_ADDR'] ?? '');
  return $ip !== '' ? $ip : '127.0.0.1';
}

function normalize_job_type($v): string {
  $t = strtolower(trim((string)$v));
  if ($t === 'leech' || $t === 'get') return $t;
  // legacy sometimes stores "direct" or other labels; treat as leech by default
  return 'leech';
}

function normalize_bool_int($v, int $default = 0): int {
  if ($v === null) return $default;
  if (is_bool($v)) return $v ? 1 : 0;
  if (is_numeric($v)) return ((int)$v) ? 1 : 0;
  $s = strtolower(trim((string)$v));
  if ($s === '1' || $s === 'true' || $s === 'yes' || $s === 'on') return 1;
  if ($s === '0' || $s === 'false' || $s === 'no' || $s === 'off') return 0;
  return $default;
}

function normalize_url(string $url): string {
  $u = trim($url);
  // Keep normalized for matching in .dat (legacy may store either form)
  $u = preg_replace('~^https?://rg\.to/file/~i', 'https://rapidgator.net/file/', $u);
  // strip common wrapper
  $u = preg_replace('~\.html$~i', '', $u);
  return $u;
}

function parse_size_to_bytes(?string $sizeText): ?int {
  if (!$sizeText) return null;
  $s = trim(str_replace(',', '.', $sizeText));
  if (!preg_match('~^([0-9]+(?:\.[0-9]+)?)\s*(bytes|kb|mb|gb|tb)?~i', $s, $m)) return null;
  $num = (float)$m[1];
  $unit = strtolower($m[2] ?? '');
  if ($unit === '' || $unit === 'bytes') return (int)round($num);
  $mul = 1;
  if ($unit === 'kb') $mul = 1024;
  elseif ($unit === 'mb') $mul = 1024*1024;
  elseif ($unit === 'gb') $mul = 1024*1024*1024;
  elseif ($unit === 'tb') $mul = 1024*1024*1024*1024;
  return (int)round($num * $mul);
}

function diehost_list(): array {
  // From legacy class.user/config.all.php ($this->diehost)
  return [
    'uploaded.net','uploaded.to','ul.to','hotfile.com','rapidshare.com','megaupload.com','filesonic.com',
    'fileserve.com','wupload.com','megashares.com','datafile.com','uploading.com','freakshare.com',
    'netload.in','oron.com','filepost.com','uploadstation.com','shareflare.net','bitshare.com',
    'extabit.com','share-online.biz','vip-file.com','yourfilelink.com'
  ];
}

function is_diehost(string $urlOrHost): bool {
  $host = strtolower(trim($urlOrHost));
  if (stristr($host, '://')) {
    $p = @parse_url($host);
    if (is_array($p) && !empty($p['host'])) $host = strtolower((string)$p['host']);
  }
  $host = preg_replace('~^www\.~i', '', $host);

  $filter = legacy_filterhost();
  foreach ($filter as $canon => $aliases) {
    if ($host === $canon) { $host = $canon; break; }
    foreach ($aliases as $a) {
      if ($host === $a) { $host = $canon; break 2; }
    }
  }

  return in_array($host, diehost_list(), true);
}

function checklink_precheck(string $url): array {
  // Uses local legacy checklink (fast, resolves shortlinks, returns filename/filesize)
  // FAIL-OPEN policy: caller should ignore errors and proceed.
  $endpoint = 'http://127.0.0.1:8081/checklink/?url=' . urlencode($url);
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $endpoint);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 5);
  curl_setopt($ch, CURLOPT_TIMEOUT, 10);
  $body = curl_exec($ch);
  $err = curl_error($ch);
  $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  if ($body === false || $code <= 0) return ['ok'=>false,'error'=>$err ?: 'curl failed'];
  $json = json_decode($body, true);
  if (!is_array($json)) {
    $body2 = trim((string)$body, "\xEF\xBB\xBF");
    $json = json_decode($body2, true);
  }
  if (!is_array($json)) return ['ok'=>false,'error'=>'invalid json','raw'=>$body];

  $fullurl = (string)($json['fullurl'] ?? ($json['url'] ?? ''));
  $filename = (string)($json['filename'] ?? '');
  $filesizeText = (string)($json['filesize'] ?? '');
  $filesizeBytes = parse_size_to_bytes($filesizeText);

  return [
    'ok'=>true,
    'fullurl'=>$fullurl,
    'filename'=>$filename,
    'filesize_text'=>$filesizeText,
    'filesize_bytes'=>$filesizeBytes,
    'raw'=>$json,
  ];
}

function legacy_filterhost(): array {
  // From legacy class.user/config.all.php
  return [
    'uploaded.net'   => ['uploaded.to','ul.to'],
    'rapidgator.net' => ['rapidgator.to','rg.to','rapidgator.com'],
    'turbobit.net'   => ['turbo-bit.net','trbbt.net','tbit.to','turbo.to','turbobit.cc','turb.to','turb.cc'],
    '1fichier.com'   => ['alterupload.com','cjoint.net','dl4free.com','tenvoi.com','pjointe.com','megadl.fr','piecejointe.net','mesfichiers.org','dfichiers.com','desfichiers.com'],
    'hitfile.net'    => ['hitf.to','hitf.cc','hitfile.cc'],
    'mega.nz'        => ['mega1.nz','mega.co.nz'],
    'youtube.com'    => ['youtube1.com','youtu.be'],
    'upstore.net'    => ['upstore1.net','upsto.re'],
    'k2s.cc'         => ['keep2share.cc','keep2s.cc','k2share.cc'],
    'katfile.com'    => ['katfile.online','katfile.cloud'],
    'fboom.me'       => ['fileboom.me','fileboom.com'],
    '4share.vn'      => ['up2.4share.vn','up.4share.vn'],
    'nitro.download' => ['nitroflare.com'],
  ];
}

function extract_rapidgator_file_id(string $url): string {
  $p = @parse_url($url);
  $path = is_array($p) ? (string)($p['path'] ?? '') : '';
  if (preg_match('~^/file/([^/]+)~i', $path, $m)) return (string)$m[1];
  return '';
}

function canonical_host_from_url(string $url): string {
  $host = '';
  $p = @parse_url($url);
  if (is_array($p) && !empty($p['host'])) $host = strtolower((string)$p['host']);
  $host = preg_replace('~^www\.~i', '', $host);

  $filter = legacy_filterhost();
  foreach ($filter as $canon => $aliases) {
    if ($host === $canon) return $canon;
    foreach ($aliases as $a) {
      if ($host === $a) return $canon;
    }
  }
  return $host;
}

function today_key(): string {
  return date('d.m.Y');
}

function today_start_ts(): int {
  return strtotime('today 00:00:00');
}

function normalize_url_for_uniqueness(string $url): string {
  $u = trim((string)$url);
  if ($u === '') return '';

  // strip fragment
  $u = preg_replace('~#.*$~', '', $u);

  if (stripos($u, 'http://') === 0 || stripos($u, 'https://') === 0) {
    $p = @parse_url($u);
    if (!is_array($p) || empty($p['host'])) return '';

    $hostRaw = strtolower((string)$p['host']);
    $hostRaw = preg_replace('~^www\.~i', '', $hostRaw);
    $host = canonical_host_from_url('https://' . $hostRaw . '/');

    $path = (string)($p['path'] ?? '');
    $q = (string)($p['query'] ?? '');

    // Legacy parity: for rapidgator family, drop query to avoid dedupe split.
    if ($host === 'rapidgator.net') $q = '';

    // strip trailing .html
    $path = preg_replace('~\.html$~i', '', $path);

    $out = $host . $path;
    if ($q !== '') $out .= '?' . $q;

    // Legacy parity: canonicalize rapid file id URLs with trailing '/'.
    $out = preg_replace('~(rapidgator\.net/file/[a-f0-9]{32})/+$~i', '$1', $out);
    return ltrim($out, '/');
  }

  // already normalized-like
  $u = preg_replace('~^www\.~i', '', $u);
  $u = preg_replace('~\.html$~i', '', $u);
  $u = preg_replace('~(rapidgator\.net/file/[a-f0-9]{32})/+$~i', '$1', $u);
  return ltrim($u, '/');
}

function is_supported_host_canon(string $hostCanon): bool {
  static $cache = null;
  if (!is_array($cache)) {
    $cache = [];
    try {
      $cfg = load_snapshot_config();
      if (isset($cfg['hosts']) && is_array($cfg['hosts'])) {
        foreach ($cfg['hosts'] as $k => $_v) {
          if (is_string($k) && $k !== 'other' && $k !== '') $cache[$k] = true;
        }
      }
    } catch (Throwable $e) {
      // ignore
    }
  }
  if ($hostCanon === '') return false;
  // hard allow common supported hosts even if snapshot missing
  if (in_array($hostCanon, ['rapidgator.net','hitfile.net'], true)) return true;
  return !empty($cache[$hostCanon]);
}

function legacy_links_quota(int $todayStart, string $hostCanon): array {
  // Count UNIQUE URLs in day (Boss requirement).
  // Prefer legacy.sqlite (fast), fallback to scanning data/files/*.dat only when DB fails or returns 0.

  $legacyPath = API_DATA_DIR . '/legacy.sqlite';
  if (is_file($legacyPath)) {
    try {
      $lpdo = new PDO('sqlite:' . $legacyPath);
      $lpdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
      @$lpdo->exec('PRAGMA query_only=ON;');
      @$lpdo->exec('PRAGMA busy_timeout=800;');

      $acc = 0;
      $host = 0;

      $q1 = $lpdo->prepare("SELECT COUNT(DISTINCT norm_url) AS c FROM legacy_jobs WHERE mtime>=?");
      $q1->execute([$todayStart]);
      $acc = (int)(($q1->fetch(PDO::FETCH_ASSOC)['c'] ?? 0));

      $q2 = $lpdo->prepare("SELECT COUNT(DISTINCT norm_url) AS c FROM legacy_jobs WHERE mtime>=? AND host=?");
      $q2->execute([$todayStart, $hostCanon]);
      $host = (int)(($q2->fetch(PDO::FETCH_ASSOC)['c'] ?? 0));

      // Return immediately when DB has data. If both are 0, allow fallback to .dat (Boss rule).
      if ($acc > 0 || $host > 0) {
        return [
          'ok'=>true,
          'acc_links_today'=>$acc,
          'host_links_today'=>$host,
          'source'=>'legacy_sqlite',
        ];
      }
    } catch (Throwable $e) {
      // fall back to .dat
    }
  }

  // --- DAT fallback ---
  $files = glob(API_DATA_DIR . '/files/*.dat');
  rsort($files);

  $seenAcc = [];  // normUrl => true
  $seenHost = []; // normUrl => true

  foreach ($files as $path) {
    $fm = @filemtime($path);
    if ($fm !== false && $fm < $todayStart) break;

    $raw = @file_get_contents($path);
    if ($raw === false) continue;
    $obj = json_decode($raw, true);
    if (!is_array($obj)) continue;
    $leech = $obj['leech'] ?? [];
    if (!is_array($leech)) continue;

    foreach ($leech as $v) {
      if (!is_array($v)) continue;
      $mtime = (int)($v['mtime'] ?? 0);
      if ($mtime < $todayStart) continue;

      $u = (string)($v['url'] ?? ($v['fullurl'] ?? ($v['link'] ?? '')));
      $norm = normalize_url_for_uniqueness($u);
      if ($norm === '') continue;

      $seenAcc[$norm] = true;
      $hc = canonical_host_from_url($u);
      if ($hc === $hostCanon) $seenHost[$norm] = true;
    }
  }

  return [
    'ok'=>true,
    'acc_links_today'=>count($seenAcc),
    'host_links_today'=>count($seenHost),
    'source'=>'dat',
  ];
}

function legacy_bw_quota(string $dayKey, string $hostCanon): array {
  // Try newest bwplus snapshot
  $files = glob(API_DATA_DIR . '/bwplus/*.dat');
  rsort($files);
  $files = array_slice($files, 0, 5);

  foreach ($files as $path) {
    $raw = @file_get_contents($path);
    if ($raw === false) continue;
    $obj = json_decode($raw, true);
    if (!is_array($obj)) continue;
    if (!isset($obj[$dayKey]) || !is_array($obj[$dayKey])) continue;

    $day = $obj[$dayKey];
    $accUsed = null;
    $hostUsed = null;

    // Many legacy formats store acc used in day['acc']['daungay'] (bytes)
    if (isset($day['acc']['daungay'])) $accUsed = (int)$day['acc']['daungay'];

    // Host used: try a few common locations (may differ by version)
    if (isset($day['done'][$hostCanon]['total'])) $hostUsed = (int)$day['done'][$hostCanon]['total'];
    elseif (isset($day['leeching'][$hostCanon]['total'])) $hostUsed = (int)$day['leeching'][$hostCanon]['total'];

    return [
      'ok'=>true,
      'acc_used_today_bytes'=>$accUsed,
      'host_used_today_bytes'=>$hostUsed,
      'source'=>basename($path),
    ];
  }

  return ['ok'=>false,'error'=>'no bw data'];
}

function sqlite_quota(PDO $pdo, int $todayStart, string $hostCanon): array {
  // Fallback: UNIQUE URL counts + "used" computed as MAX(size_bytes) per normalized URL.
  // This matches Boss rule: reserve filesize per unique URL/day; resubmitting same URL doesn't increase used.
  $q = $pdo->prepare("SELECT origin_url, host, created_at, status, bw_charge_bytes, size_bytes FROM jobs WHERE created_at>=? AND status NOT IN ('rejected')");
  $q->execute([$todayStart]);
  $rows = $q->fetchAll(PDO::FETCH_ASSOC);

  $seenAcc = [];      // normUrl => true
  $seenHost = [];     // normUrl => true
  $accMax = [];       // normUrl => maxBytes
  $hostMax = [];      // normUrl => maxBytes (only if host matches)

  foreach ($rows as $r) {
    $u = (string)($r['origin_url'] ?? '');
    $norm = normalize_url_for_uniqueness($u);
    if ($norm === '') continue;

    $seenAcc[$norm] = true;

    $hc = (string)($r['host'] ?? '');
    if ($hc === '') $hc = canonical_host_from_url($u);
    if ($hc === $hostCanon) $seenHost[$norm] = true;

    // Prefer size_bytes; bw_charge_bytes is for real BW but in fallback mode we treat size as reservation.
    $bytes = (int)($r['size_bytes'] ?? 0);
    if ($bytes < 0) $bytes = 0;

    if (!isset($accMax[$norm]) || $bytes > $accMax[$norm]) $accMax[$norm] = $bytes;
    if ($hc === $hostCanon) {
      if (!isset($hostMax[$norm]) || $bytes > $hostMax[$norm]) $hostMax[$norm] = $bytes;
    }
  }

  $accUsed = array_sum($accMax);
  $hostUsed = array_sum($hostMax);

  return [
    'acc_links_today'=>count($seenAcc),
    'host_links_today'=>count($seenHost),
    'acc_used_today_bytes'=>(int)$accUsed,
    'host_used_today_bytes'=>(int)$hostUsed,
  ];
}

function bytes_to_mb(?int $bytes): ?float {
  if ($bytes === null) return null;
  return round($bytes / (1024*1024), 2);
}

function build_links_today_items(PDO $pdo, array $opt): array {
  $todayStart = (int)($opt['todayStart'] ?? today_start_ts());
  $source = (string)($opt['source'] ?? 'all'); // all|dat|sqlite|legacy_sqlite
  $hostFilter = (string)($opt['host'] ?? ''); // canonical host or empty
  $debug = !empty($opt['debug']);

  $map = []; // norm => item

  // --- DAT source ---
  if ($source === 'dat') {
    // Optimization: scanning all .dat files can be expensive when the folder grows.
    // Only scan the newest N files; still stop early once filemtime < todayStart.
    $maxDatFiles = 1600; // tweakable (recommended 1200-2000)
    $files = glob(API_DATA_DIR . '/files/*.dat');
    rsort($files);
    if (count($files) > $maxDatFiles) $files = array_slice($files, 0, $maxDatFiles);

    foreach ($files as $path) {
      $fm = @filemtime($path);
      if ($fm !== false && $fm < $todayStart) break;

      $raw = @file_get_contents($path);
      if ($raw === false) continue;
      $obj = json_decode($raw, true);
      if (!is_array($obj)) continue;
      $leech = $obj['leech'] ?? [];
      if (!is_array($leech)) continue;

      foreach ($leech as $k => $v) {
        if (!is_array($v)) continue;
        $mtime = (int)($v['mtime'] ?? 0);
        if ($mtime < $todayStart) continue;

        $u = (string)($v['url'] ?? ($v['fullurl'] ?? ($v['link'] ?? '')));
        $norm = normalize_url_for_uniqueness($u);
        if ($norm === '') continue;

        $hc = canonical_host_from_url($u);
        if ($hostFilter !== '' && $hc !== $hostFilter) continue;
        if (!is_supported_host_canon($hc)) continue;
      if (!is_supported_host_canon($hc)) continue;

        $size = (int)($v['msize'] ?? 0);
        if ($size < 0) $size = 0;

        if (!isset($map[$norm])) {
          $map[$norm] = [
            'norm' => $norm,
            'origin_url' => $u,
            'host' => $hc,
            'size_bytes' => $size,
            'first_seen_ts' => $mtime,
            'last_seen_ts' => $mtime,
            'source' => ['dat'],
            'dat' => $debug ? ['source_file' => basename($path), 'source_key' => is_string($k)?$k:null] : null,
            'sqlite' => null,
            'legacy_sqlite' => null,
          ];
        } else {
          $it =& $map[$norm];
          if ($mtime && $mtime < (int)$it['first_seen_ts']) $it['first_seen_ts'] = $mtime;

          // Prefer the most recently submitted/updated item when duplicates exist.
          if ($mtime && $mtime > (int)($it['last_seen_ts'] ?? 0)) {
            $it['origin_url'] = $u;
            $it['host'] = $hc;
            $it['last_seen_ts'] = $mtime;
          }

          if ($size > (int)$it['size_bytes']) $it['size_bytes'] = $size;
          if (!in_array('dat', $it['source'], true)) $it['source'][] = 'dat';
        }
      }
    }
  }

  // --- SQLITE source (api_main.sqlite) ---
  if ($source === 'all' || $source === 'sqlite') {
    $q = $pdo->prepare("SELECT job_id, origin_url, host, created_at, updated_at, status, size_bytes, direct_url_latest FROM jobs WHERE created_at>=? AND status NOT IN ('rejected','submitted') ORDER BY created_at DESC");
    $q->execute([$todayStart]);
    while ($r = $q->fetch(PDO::FETCH_ASSOC)) {
      $u = (string)($r['origin_url'] ?? '');
      $norm = normalize_url_for_uniqueness($u);
      if ($norm === '') continue;

      $hc = (string)($r['host'] ?? '');
      if ($hc === '') $hc = canonical_host_from_url($u);
      if ($hostFilter !== '' && $hc !== $hostFilter) continue;
      if (!is_supported_host_canon($hc)) continue;

      $size = (int)($r['size_bytes'] ?? 0);
      if ($size < 0) $size = 0;

      if (!isset($map[$norm])) {
        $map[$norm] = [
          'norm' => $norm,
          'origin_url' => $u,
          'host' => $hc,
          'size_bytes' => $size,
          'first_seen_ts' => (int)($r['created_at'] ?? 0),
          'last_seen_ts' => (int)($r['updated_at'] ?? $r['created_at'] ?? 0),
          'source' => ['sqlite'],
          'dat' => null,
          'legacy_sqlite' => null,
          'sqlite' => $debug ? ['job_ids' => [$r['job_id']], 'status' => (string)($r['status'] ?? ''), 'direct_url_latest' => (string)($r['direct_url_latest'] ?? '')] : null,
        ];
      } else {
        $it =& $map[$norm];

        // Prefer the most recently submitted/updated item when duplicates exist.
        $candLast = (int)($r['updated_at'] ?? ($r['created_at'] ?? 0));
        if ($candLast > (int)($it['last_seen_ts'] ?? 0)) {
          $it['origin_url'] = $u;
          $it['host'] = $hc;
          $it['last_seen_ts'] = $candLast;
        }
        if ((int)($r['created_at'] ?? 0) > 0 && (int)($it['first_seen_ts'] ?? 0) > 0) {
          $it['first_seen_ts'] = min((int)$it['first_seen_ts'], (int)$r['created_at']);
        }

        if ($size > (int)$it['size_bytes']) $it['size_bytes'] = $size;
        if (!in_array('sqlite', $it['source'], true)) $it['source'][] = 'sqlite';
        if ($debug) {
          if (!isset($it['sqlite']['job_ids'])) $it['sqlite']['job_ids'] = [];
          $it['sqlite']['job_ids'][] = $r['job_id'];
        }
      }
    }
  }

  // --- LEGACY SQLITE source (legacy.sqlite) ---
  if ($source === 'all' || $source === 'legacy_sqlite') {
    $legacyPath = API_DATA_DIR . '/legacy.sqlite';
    if (is_file($legacyPath)) {
      try {
        $lpdo = new PDO('sqlite:' . $legacyPath);
        $lpdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $q = $lpdo->prepare("SELECT job_id, origin_url, norm_url, host, status, status_code, size_bytes, bw_charge_bytes, mtime, direct_url, updated_at, created_at, source_file, source_key FROM legacy_jobs WHERE mtime>=? ORDER BY mtime DESC");
        $q->execute([$todayStart]);
        while ($r = $q->fetch(PDO::FETCH_ASSOC)) {
          $u = (string)($r['origin_url'] ?? '');
          $norm = (string)($r['norm_url'] ?? '');
          if ($norm === '') $norm = normalize_url_for_uniqueness($u);
          if ($norm === '') continue;

          $hc = (string)($r['host'] ?? '');
          if ($hc === '') $hc = canonical_host_from_url($u);
          if ($hostFilter !== '' && $hc !== $hostFilter) continue;
          if (!is_supported_host_canon($hc)) continue;

          $size = (int)($r['size_bytes'] ?? 0);
          if ($size < 0) $size = 0;

          $mtime = (int)($r['mtime'] ?? 0);
          $updated = (int)($r['updated_at'] ?? 0);
          $lastSeen = $updated > 0 ? $updated : ($mtime ?: 0);

          if (!isset($map[$norm])) {
            $map[$norm] = [
              'norm' => $norm,
              'origin_url' => $u,
              'host' => $hc,
              'size_bytes' => $size,
              'first_seen_ts' => $mtime,
              'last_seen_ts' => $lastSeen,
              'source' => ['legacy_sqlite'],
              'dat' => null,
              'sqlite' => null,
              'legacy_sqlite' => $debug ? [
                'job_ids' => [$r['job_id']],
                'status' => (string)($r['status'] ?? ''),
                'status_code' => (string)($r['status_code'] ?? ''),
                'direct_url' => (string)($r['direct_url'] ?? ''),
                'source_file' => (string)($r['source_file'] ?? ''),
                'source_key' => (string)($r['source_key'] ?? ''),
              ] : null,
            ];
          } else {
            $it =& $map[$norm];
            if ($mtime && $mtime < (int)$it['first_seen_ts']) $it['first_seen_ts'] = $mtime;

            // Prefer the most recently submitted/updated item when duplicates exist.
            if ($lastSeen && $lastSeen > (int)($it['last_seen_ts'] ?? 0)) {
              $it['origin_url'] = $u;
              $it['host'] = $hc;
              $it['last_seen_ts'] = $lastSeen;
            }

            if ($size > (int)$it['size_bytes']) $it['size_bytes'] = $size;
            if (!in_array('legacy_sqlite', $it['source'], true)) $it['source'][] = 'legacy_sqlite';
            if ($debug) {
              if (!isset($it['legacy_sqlite']['job_ids'])) $it['legacy_sqlite']['job_ids'] = [];
              $it['legacy_sqlite']['job_ids'][] = $r['job_id'];
            }
          }
        }
      } catch (Throwable $e) {
        // fail-open: ignore legacy sqlite errors
      }
    }
  }

  // --- DAT fallback for source=all ---
  // Only scan .dat when DB sources fail (exceptions) or return 0 items.
  if ($source === 'all' && count($map) === 0) {
    $maxDatFiles = 1600;
    $files = glob(API_DATA_DIR . '/files/*.dat');
    rsort($files);
    if (count($files) > $maxDatFiles) $files = array_slice($files, 0, $maxDatFiles);

    foreach ($files as $path) {
      $fm = @filemtime($path);
      if ($fm !== false && $fm < $todayStart) break;

      $raw = @file_get_contents($path);
      if ($raw === false) continue;
      $obj = json_decode($raw, true);
      if (!is_array($obj)) continue;
      $leech = $obj['leech'] ?? [];
      if (!is_array($leech)) continue;

      foreach ($leech as $k => $v) {
        if (!is_array($v)) continue;
        $mtime = (int)($v['mtime'] ?? 0);
        if ($mtime < $todayStart) continue;

        $u = (string)($v['url'] ?? ($v['fullurl'] ?? ($v['link'] ?? '')));
        $norm = normalize_url_for_uniqueness($u);
        if ($norm === '') continue;

        $hc = canonical_host_from_url($u);
        if ($hostFilter !== '' && $hc !== $hostFilter) continue;
        if (!is_supported_host_canon($hc)) continue;

        $size = (int)($v['msize'] ?? 0);
        if ($size < 0) $size = 0;

        if (!isset($map[$norm])) {
          $map[$norm] = [
            'norm' => $norm,
            'origin_url' => $u,
            'host' => $hc,
            'size_bytes' => $size,
            'first_seen_ts' => $mtime,
            'last_seen_ts' => $mtime,
            'source' => ['dat'],
            'dat' => $debug ? ['source_file' => basename($path), 'source_key' => is_string($k)?$k:null] : null,
            'sqlite' => null,
            'legacy_sqlite' => null,
          ];
        } else {
          $it =& $map[$norm];
          if ($mtime && $mtime < (int)$it['first_seen_ts']) $it['first_seen_ts'] = $mtime;

          // Prefer the most recently submitted/updated item when duplicates exist.
          if ($mtime && $mtime > (int)($it['last_seen_ts'] ?? 0)) {
            $it['origin_url'] = $u;
            $it['host'] = $hc;
            $it['last_seen_ts'] = $mtime;
          }

          if ($size > (int)$it['size_bytes']) $it['size_bytes'] = $size;
          if (!in_array('dat', $it['source'], true)) $it['source'][] = 'dat';
        }
      }
    }
  }

  // enforce de-dup by norm
  $items = array_values($map);
  $uniq = [];
  foreach ($items as $it) {
    $n = (string)($it['norm'] ?? '');
    if ($n === '') continue;
    $uniq[$n] = $it;
  }
  return array_values($uniq);
}

function list_links_today(PDO $pdo, array $opt): array {
  $todayStart = (int)($opt['todayStart'] ?? today_start_ts());
  $dayKey = (string)($opt['dayKey'] ?? today_key());
  $source = (string)($opt['source'] ?? 'all'); // all|dat|sqlite|legacy_sqlite
  $hostFilter = (string)($opt['host'] ?? ''); // canonical host or empty
  $debug = !empty($opt['debug']);
  $limit = max(1, (int)($opt['limit'] ?? 200));
  $offset = max(0, (int)($opt['offset'] ?? 0));

  $items = build_links_today_items($pdo, [
    'todayStart' => $todayStart,
    'source' => $source,
    'host' => $hostFilter,
    'debug' => $debug,
  ]);

  // Summaries (from merged unique items)
  $accLinks = count($items);
  $accUsedBytes = 0;
  $hostLinks = 0;
  $hostUsedBytes = 0;
  foreach ($items as $it) {
    $accUsedBytes += (int)($it['size_bytes'] ?? 0);
    if ($hostFilter !== '' && ($it['host'] ?? '') === $hostFilter) {
      $hostLinks++;
      $hostUsedBytes += (int)($it['size_bytes'] ?? 0);
    }
  }

  // Pagination on stable order (last_seen desc)
  usort($items, function($a,$b){ return (int)($b['last_seen_ts'] ?? 0) <=> (int)($a['last_seen_ts'] ?? 0); });
  $paged = array_slice($items, $offset, $limit);

  // Debug cross-check: sum from returned items
  $returnedUsedBytes = 0;
  foreach ($paged as $it) $returnedUsedBytes += (int)($it['size_bytes'] ?? 0);

  if (!$debug) {
    // drop heavy fields
    foreach ($paged as &$it) { unset($it['dat']); unset($it['sqlite']); }
  }

  return [
    'ok' => true,
    'day_key' => $dayKey,
    'today_start' => $todayStart,
    'host' => $hostFilter,
    'source' => $source,
    'counts' => [
      'acc_links_today' => $accLinks,
      'acc_used_today_bytes' => (int)$accUsedBytes,
      'acc_used_today_mb' => bytes_to_mb($accUsedBytes),
      'host_links_today' => $hostLinks,
      'host_used_today_bytes' => (int)$hostUsedBytes,
      'host_used_today_mb' => bytes_to_mb($hostUsedBytes),
      'returned_used_today_bytes' => (int)$returnedUsedBytes,
      'returned_used_today_mb' => bytes_to_mb($returnedUsedBytes),
    ],
    'debug' => [
      'tz' => date_default_timezone_get(),
      'now' => date('c'),
      'today_start_iso' => date('c', $todayStart),
    ],
    'page' => ['limit'=>$limit,'offset'=>$offset,'returned'=>count($paged),'total'=>$accLinks],
    'items' => $paged,
  ];
}
function load_snapshot_config(): array {
  static $cfg = null;
  if (is_array($cfg)) return $cfg;

  $cfgPath = '/var/www/html.main/mygl/api_core/engine/current/config.snapshot.php';
  $snap = [];
  if (is_file($cfgPath)) {
    $tmp = @require $cfgPath;
    if (is_array($tmp)) $snap = $tmp;
  }
  $cfg = $snap;

  // Phase1-B2 (compare-only): inspect runtime legacy config and log diff, do NOT switch behavior yet.
  $accDir = runtime_account_dir();
  $prePath = runtime_resolve_pre_config_path($accDir);
  $rtCore = [];
  $rtHostsCount = 0;
  $rtHostsMap = [];
  if ($prePath !== '' && is_file($prePath)) {
    $txt = (string)@file_get_contents($prePath);
    if ($txt !== '') { $rtCore = runtime_parse_pre_config_text($txt); $rtHostsCount = runtime_pre_hosts_count($txt); $rtHostsMap = runtime_parse_pre_hosts_map($txt); }
  }

  $useRuntime = runtime_config_switch_enabled();
  if ($useRuntime && !empty($rtCore)) {
    if (!empty($rtCore['limitMBIP'])) $cfg['daily_add_mb'] = runtime_eval_mb_expr($rtCore['limitMBIP']);
    if (!empty($rtCore['plusbw'])) $cfg['plusbw_mb'] = runtime_eval_mb_expr($rtCore['plusbw']);
    if (!empty($rtCore['maxbw'])) $cfg['maxbw_mb'] = runtime_eval_mb_expr($rtCore['maxbw']);
    if (!empty($rtCore['ttl'])) $cfg['ttl_minutes'] = runtime_eval_mb_expr($rtCore['ttl']);
    if (isset($rtCore['reset_bw']) && $rtCore['reset_bw'] !== '') {
      $rb = strtolower((string)$rtCore['reset_bw']);
      $cfg['reset_bw'] = (strpos($rb, 'true') !== false || $rb === '1');
    }
  
    if (!empty($rtHostsMap)) $cfg['hosts'] = $rtHostsMap;
  }

  if (function_exists('phase1_compat_log')) {
    phase1_compat_log('config_compare_b2', [
      'mode' => ($useRuntime ? ((!empty($rtHostsMap) ? 'runtime_core_hosts_enabled' : 'runtime_core_enabled')) : 'compare_only'),
      'account_dir' => $accDir,
      'runtime_pre' => $prePath,
      'runtime_core' => $rtCore,
      'snapshot_core' => [
        'daily_add_mb' => (string)($snap['daily_add_mb'] ?? ''),
        'plusbw_mb' => (string)($snap['plusbw_mb'] ?? ''),
        'maxbw_mb' => (string)($snap['maxbw_mb'] ?? ''),
        'ttl_minutes' => (string)($snap['ttl_minutes'] ?? ''),
        'reset_bw' => (string)($snap['reset_bw'] ?? ''),
      ],
      'snapshot_hosts' => (int)count((array)($snap['hosts'] ?? [])),
      'runtime_hosts_count' => (int)$rtHostsCount,
      'effective_hosts' => (int)count((array)($cfg['hosts'] ?? [])),
      'effective_core' => [
        'daily_add_mb' => (string)($cfg['daily_add_mb'] ?? ''),
        'plusbw_mb' => (string)($cfg['plusbw_mb'] ?? ''),
        'maxbw_mb' => (string)($cfg['maxbw_mb'] ?? ''),
        'ttl_minutes' => (string)($cfg['ttl_minutes'] ?? ''),
        'reset_bw' => (string)($cfg['reset_bw'] ?? ''),
      ],
      'use_runtime_flag' => $useRuntime,
    ]);
  }

  return $cfg;
}



// Phase1-B1 (safe/read-only): runtime config helper stubs for legacy parity migration.
function runtime_account_dir(): string {
  $sf = (string)($_SERVER['SCRIPT_FILENAME'] ?? '');
  if ($sf !== '' && is_file($sf)) return rtrim(dirname($sf), '/');
  return '';
}

function runtime_resolve_pre_config_path(string $accountDir): string {
  if ($accountDir === '') return '';
  $cfgPhp = $accountDir . '/config.php';
  if (!is_file($cfgPhp)) return '';
  $txt = (string)@file_get_contents($cfgPhp);
  if ($txt === '') return '';

  // Match include("...config.pre....php") / require_once('...') style.
  $inc = '';
  if (preg_match('/(?:include|require|include_once|require_once)\s*\(\s*["\']([^"\']*config\.pre[^"\']*\.php)["\']\s*\)\s*;/i', $txt, $m)) {
    $inc = (string)($m[1] ?? '');
  }
  if ($inc === '') return '';

  $cand = realpath($accountDir . '/' . $inc);
  if ($cand && is_file($cand)) return $cand;
  $cand2 = realpath('/var/www/html.main/mygl/' . ltrim($inc, './'));
  if ($cand2 && is_file($cand2)) return $cand2;
  return '';
}

function runtime_eval_mb_expr($v): int {
  $s = trim((string)$v);
  if ($s === '') return 0;
  if (preg_match('/^\d+$/', $s)) return (int)$s;
  if (preg_match('/^(\d+)\s*\*\s*(\d+)$/', $s, $m)) return (int)$m[1] * (int)$m[2];
  if (preg_match('/^(\d+)\s*\*\s*(\d+)\s*\*\s*(\d+)$/', $s, $m)) return (int)$m[1]*(int)$m[2]*(int)$m[3];
  return (int)$s;
}

function runtime_pre_hosts_count(string $txt): int {
  if ($txt === '') return 0;
  $body = '';
  if (preg_match('/\$this->hosts\s*=\s*array\((.*?)\);\s*\$this->maxsv/s', $txt, $m)) $body = (string)($m[1] ?? '');
  elseif (preg_match('/\$this->hosts\s*=\s*array\((.*?)\);/s', $txt, $m)) $body = (string)($m[1] ?? '');
  if ($body === '') return 0;
  if (!preg_match_all("/'[^']+'\s*=>\s*array\s*\(/", $body, $mm)) return 0;
  return (int)count($mm[0] ?? []);
}


function runtime_parse_pre_hosts_map(string $txt): array {
  $out = [];
  if ($txt === '') return $out;
  $body = '';
  if (preg_match('/\$this->hosts\s*=\s*array\((.*?)\);\s*\$this->maxsv/s', $txt, $m)) $body = (string)($m[1] ?? '');
  elseif (preg_match('/\$this->hosts\s*=\s*array\((.*?)\);/s', $txt, $m)) $body = (string)($m[1] ?? '');
  if ($body === '') return $out;

  if (preg_match_all("/'([^']+)'\s*=>\s*array\((.*?)\)\s*,?/s", $body, $rows, PREG_SET_ORDER)) {
    foreach ($rows as $r) {
      $h = strtolower(trim((string)($r[1] ?? '')));
      $b = (string)($r[2] ?? '');
      if ($h === '') continue;
      $pick = function(string $pat, string $src): string {
        if (preg_match($pat, $src, $m2)) return trim((string)$m2[1]);
        return '';
      };
      $daily = runtime_eval_mb_expr($pick("/'daily_bw'\s*=>\s*([^,\n]+)/i", $b));
      $over  = runtime_eval_mb_expr($pick("/'overlimit_bw'\s*=>\s*([^,\n]+)/i", $b));
      $maxL  = runtime_eval_mb_expr($pick("/'max_link'\s*=>\s*([^,\n]+)/i", $b));
      $out[$h] = [
        'daily_bw_mb' => $daily,
        'overlimit_bw_mb' => $over,
        'max_link' => $maxL,
        'counts_to_total' => true,
      ];
    }
  }
  return $out;
}

function runtime_parse_pre_config_text(string $txt): array {
  $pick = function(string $pat, string $src): string {
    if (preg_match($pat, $src, $m)) return trim((string)$m[1]);
    return '';
  };
  return [
    'limitMBIP' => $pick('/\$limitMBIP\s*=\s*([^;]+);/i', $txt),
    'plusbw' => $pick('/\$plusbw\s*=\s*([^;]+);/i', $txt),
    'maxbw' => $pick('/\$maxbw\s*=\s*([^;]+);/i', $txt),
    'ttl' => $pick('/\$ttl\s*=\s*([^;]+);/i', $txt),
    'reset_bw' => $pick("/'reset_bw'\s*=>\s*([^,\n]+)/i", $txt),
  ];
}
function load_api_routing(): array {
  $cfg = load_snapshot_config();
  if (isset($cfg['routing']) && is_array($cfg['routing'])) return $cfg['routing'];
  return [];
}

function phase1_compat_enabled(): bool {
  if (isset($_GET['compat_debug']) && (string)$_GET['compat_debug'] === '1') return true;
  $v = getenv('OPENCLAW_API_COMPAT_DEBUG');
  return ($v === '1' || strtolower((string)$v) === 'true');
}

function phase1_compat_log(string $event, array $data = []): void {
  if (!phase1_compat_enabled()) return;
  try {
    if (!is_dir(API_DATA_DIR . '/logs')) @mkdir(API_DATA_DIR . '/logs', 0775, true);
    $line = date('c') . ' | ' . $event . ' | ' . json_encode($data, JSON_UNESCAPED_UNICODE|JSON_UNESCAPED_SLASHES) . "
";
    @file_put_contents(API_DATA_DIR . '/logs/compat_phase1.log', $line, FILE_APPEND);
  } catch (Throwable $e) {}
}

function api_debt_write_enabled(): bool {
  // Phase 1 policy: legacy writer is primary; API debt write disabled by default.
  $v = getenv('OPENCLAW_API_DEBT_WRITE');
  if ($v !== false && $v !== '') return ($v === '1' || strtolower((string)$v) === 'true');
  return false;
}


function runtime_config_switch_enabled(): bool {
  if (isset($_GET['runtime_cfg']) && (string)$_GET['runtime_cfg'] === '1') return true;
  $v = getenv('OPENCLAW_API_USE_RUNTIME_CONFIG');
  return ($v === '1' || strtolower((string)$v) === 'true');
}

function ratelimit_allow(PDO $pdo, string $key, int $windowSeconds, int $maxCount): bool {
  // Fail-open: ratelimit is a performance guard; it must not break query_status.
  try {
    $now = time();
    $row = $pdo->prepare('SELECT window_start,count FROM ratelimit WHERE k=?');
    $row->execute([$key]);
    $r = $row->fetch(PDO::FETCH_ASSOC);
    $ws = (int)($r['window_start'] ?? 0);
    $cnt = (int)($r['count'] ?? 0);

    if ($ws <= 0 || ($now - $ws) >= $windowSeconds) {
      $pdo->prepare('INSERT INTO ratelimit(k,window_start,count) VALUES(?,?,1) ON CONFLICT(k) DO UPDATE SET window_start=excluded.window_start,count=excluded.count')
        ->execute([$key, $now]);
      return true;
    }

    if ($cnt >= $maxCount) return false;
    $pdo->prepare('UPDATE ratelimit SET count=count+1 WHERE k=?')->execute([$key]);
    return true;
  } catch (PDOException $e) {
    $msg = strtolower($e->getMessage());
    if (strpos($msg, 'database is locked') !== false || strpos($msg, 'busy') !== false) {
      return true;
    }
    // unexpected error: also fail-open
    return true;
  } catch (Throwable $e) {
    return true;
  }
}

function ratelimit_min_interval(PDO $pdo, string $key, int $minSeconds): array {
  // Enforce minimum interval between actions (ms precision).
  // Returns ['ok'=>bool,'allow'=>bool,'retry_after_sec'=>int]
  $minMs = max(0, (int)$minSeconds) * 1000;
  $nowMs = (int)floor(microtime(true) * 1000);

  $row = $pdo->prepare('SELECT window_start,count FROM ratelimit WHERE k=?');
  try {
    $row->execute([$key]);
  } catch (PDOException $e) {
    $msg = strtolower($e->getMessage());
    if (strpos($msg, 'database is locked') !== false || strpos($msg, 'busy') !== false) {
      return ['ok'=>true,'allow'=>true,'retry_after_sec'=>0,'note'=>'ratelimit_locked_failopen'];
    }
    return ['ok'=>false,'allow'=>true,'retry_after_sec'=>0,'note'=>'ratelimit_error_failopen'];
  }
  $r = $row->fetch(PDO::FETCH_ASSOC);
  $ws = (int)($r['window_start'] ?? 0);

  // Backward compat: older rows may store seconds; detect and convert.
  if ($ws > 0 && $ws < 2000000000) { // < ~2033-05-18 in seconds
    $ws = $ws * 1000;
  }

  if ($ws <= 0 || ($nowMs - $ws) >= $minMs) {
    $pdo->prepare('INSERT INTO ratelimit(k,window_start,count) VALUES(?,?,1) ON CONFLICT(k) DO UPDATE SET window_start=excluded.window_start,count=excluded.count')
      ->execute([$key, $nowMs]);
    return ['ok'=>true,'allow'=>true,'retry_after_sec'=>0];
  }

  $retryMs = $minMs - ($nowMs - $ws);
  if ($retryMs < 0) $retryMs = 0;
  $retrySec = (int)ceil($retryMs / 1000);
  if ($retrySec < 1) $retrySec = 1;
  return ['ok'=>true,'allow'=>false,'retry_after_sec'=>$retrySec];
}

function leechmng_endpoint_for_host(string $host): ?string {
  $rt = load_api_routing();
  $m = $rt['mng_normal'] ?? null;
  if (is_array($m)) {
    if (isset($m[$host])) return (string)$m[$host];
    if (isset($m['other'])) return (string)$m['other'];
  }
  return null;
}

function legacy_latest_dat_file(): string {
  $files = glob(API_DATA_DIR . '/files/*.dat');
  rsort($files);
  if (!empty($files)) return (string)$files[0];
  $ms = (int)round(microtime(true) * 1000);
  return API_DATA_DIR . '/files/' . $ms . '.dat';
}

function legacy_dat_merge(array $a, array $b): array {
  // Recursive merge where $b overwrites $a (similar to legacy update_array)
  foreach ($b as $k => $v) {
    if (is_array($v)) {
      $a[$k] = legacy_dat_merge(isset($a[$k]) && is_array($a[$k]) ? $a[$k] : [], $v);
    } else {
      $a[$k] = $v;
    }
  }
  return $a;
}

function legacy_dat_upsert_reject(array $args): array {
  // sqlite-only mode: mirror legacy job state into legacy.sqlite (legacy_jobs2)
  // IMPORTANT: Do NOT write data/files/*.dat (avoid duplicate UI rows + keep sqlite as the single source of truth).
  $url = (string)($args['url'] ?? '');
  $host = (string)($args['host'] ?? '');
  $status = (int)($args['status'] ?? 0);
  $filename = (string)($args['filename'] ?? '');
  $sizeText = (string)($args['size_text'] ?? '');
  $msize = (int)($args['size_bytes'] ?? 0);
  $bwtam = (int)($args['bw_charge_bytes'] ?? $msize);
  $owner = (string)($args['owner'] ?? '');
  $ip = (string)($args['ip'] ?? ($_SERVER['REMOTE_ADDR'] ?? '127.0.0.1'));
  $network = (string)($args['network'] ?? '');
  $directUrl = (string)($args['direct_url'] ?? '');

  if ($url === '' || $host === '') return ['ok'=>false,'error'=>'missing fields'];

  // Normalize legacy URL to match legacy UI behavior (strip .html, rg.to alias)
  $url2 = preg_replace('~^https?://rg\.to/file/~i', 'https://rapidgator.net/file/', $url);
  $url2 = preg_replace('~\.html$~i', '', $url2);

  // Canonical legacy key (no suffix)
  $hash = md5($url2 . API_SELF_URL);
  $key = $hash;
  $hashShort = substr(md5($hash), 0, 10);
  $pathShort = substr(md5($hashShort), 0, 5);

  // Fail-closed: never store internal pve/autodel as direct_url for legacy UI.
  if ($directUrl !== '' && !is_public_directlink($directUrl)) {
    $directUrl = '';
    // if no public link, do not mark as ready in sqlite-only mode
    if ($status === 1 || $status === 3) $status = 0;
  }

  $statusTxt = 'processing';
  if ($status === 1 || $status === 3) $statusTxt = 'ready';
  elseif ($status === 4) $statusTxt = 'failed';
  elseif (in_array($status, [5,6,7], true)) $statusTxt = 'rejected';

  $norm2 = normalize_url_for_uniqueness($url2);
  if ($norm2 === '') $norm2 = $url;
  $norm2 = preg_replace('~^https?://~i', '', $norm2);
  $norm2 = preg_replace('~^www\.~i', '', $norm2);
  $norm2 = preg_replace('~/$~', '', $norm2);

  // lock to avoid sqlite corruption under concurrent API calls
  $lockPath = API_DATA_DIR . '/.legacy_sqlite_write.lock';
  $lockFp = @fopen($lockPath, 'c');
  $locked = false;
  if ($lockFp) {
    $start = time();
    while (!($locked = @flock($lockFp, LOCK_EX | LOCK_NB))) {
      if (time() - $start >= 3) break;
      usleep(100000);
    }
  }

  try {
    $legacySqlite = API_DATA_DIR . '/legacy.sqlite';
    $db = new PDO('sqlite:' . $legacySqlite);
    $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $db->exec('PRAGMA journal_mode=WAL;');
    $db->exec('PRAGMA synchronous=NORMAL;');
    $db->exec('PRAGMA busy_timeout=2000;');

    $db->exec('CREATE TABLE IF NOT EXISTS legacy_jobs2 (
      job_key TEXT PRIMARY KEY,
      base_id TEXT,
      origin_url TEXT,
      norm_url TEXT,
      host TEXT,
      status TEXT,
      status_code TEXT,
      filename TEXT,
      size_text TEXT,
      size_bytes INTEGER,
      bw_charge_bytes INTEGER,
      mtime INTEGER,
      path TEXT,
      hash10 TEXT,
      direct_url TEXT,
      owner TEXT,
      client_ip TEXT,
      client_network TEXT,
      job_type TEXT,
      showfile INTEGER,
      releech INTEGER,
      speed TEXT,
      source_file TEXT,
      meta_json TEXT,
      created_at INTEGER,
      updated_at INTEGER
    );');

    $now2 = time();
    $stmt2 = $db->prepare('INSERT INTO legacy_jobs2(
      job_key, base_id, origin_url, norm_url, host, status, status_code, filename, size_text, size_bytes, bw_charge_bytes,
      mtime, path, hash10, direct_url, owner, client_ip, client_network, job_type, showfile, releech, speed,
      source_file, meta_json, created_at, updated_at
    ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
    ON CONFLICT(job_key) DO UPDATE SET
      base_id=excluded.base_id,
      origin_url=excluded.origin_url,
      norm_url=excluded.norm_url,
      host=excluded.host,
      status=excluded.status,
      status_code=excluded.status_code,
      filename=excluded.filename,
      size_text=excluded.size_text,
      size_bytes=excluded.size_bytes,
      bw_charge_bytes=excluded.bw_charge_bytes,
      mtime=excluded.mtime,
      path=excluded.path,
      hash10=excluded.hash10,
      direct_url=excluded.direct_url,
      owner=excluded.owner,
      client_ip=excluded.client_ip,
      client_network=excluded.client_network,
      job_type=excluded.job_type,
      showfile=excluded.showfile,
      releech=excluded.releech,
      speed=excluded.speed,
      source_file=excluded.source_file,
      meta_json=excluded.meta_json,
      updated_at=excluded.updated_at');

    $stmt2->execute([
      (string)$key,
      (string)$key,
      (string)$url2,
      (string)$norm2,
      (string)$host,
      (string)$statusTxt,
      (string)$status,
      (string)$filename,
      (string)$sizeText,
      (int)$msize,
      (int)$bwtam,
      (int)$now2,
      (string)$pathShort,
      (string)$hashShort,
      (string)$directUrl,
      (string)$owner,
      (string)$ip,
      (string)$network,
      'leech',
      1,
      0,
      '0',
      'sqlite_only',
      null,
      (int)$now2,
      (int)$now2
    ]);

    return ['ok'=>true,'mode'=>'sqlite_only','key'=>$key];
  } catch (Throwable $e) {
    return ['ok'=>false,'error'=>$e->getMessage()];
  } finally {
    if ($locked) @flock($lockFp, LOCK_UN);
    if ($lockFp) @fclose($lockFp);
  }
}

function load_account_identity(): array {
  // Legacy identity lives in data/info.php
  $SecureID = $email = $mobile = '';
  $infoPhp = API_ACCOUNT_DIR . '/data/info.php';
  if (is_file($infoPhp)) {
    @include $infoPhp;
  }
  return [
    'secureid' => is_string($SecureID ?? null) ? $SecureID : '',
    'email' => is_string($email ?? null) ? $email : '',
    'mobile' => is_string($mobile ?? null) ? $mobile : '',
  ];
}

function leechmng_log_event(array $event): void {
  // Lightweight log for debugging leechmng sync.
  $dir = API_DATA_DIR . '/logs';
  @mkdir($dir, 0775, true);
  $event['ts'] = time();
  $line = json_encode($event, JSON_UNESCAPED_SLASHES);
  if ($line === false) return;
  @file_put_contents($dir . '/leechmng_api.log', $line . "\n", FILE_APPEND | LOCK_EX);
}

function leechmng_extract_directlink(array $resp): ?string {
  if (empty($resp['ok']) || empty($resp['raw']) || !is_array($resp['raw'])) return null;
  $raw = $resp['raw'];
  if (isset($raw['directlink']) && is_string($raw['directlink']) && stristr($raw['directlink'], 'http')) return $raw['directlink'];
  if (isset($raw['directlink']['url']) && is_string($raw['directlink']['url']) && stristr($raw['directlink']['url'], 'http')) return $raw['directlink']['url'];
  return null;
}

function is_public_directlink(string $url): bool {
  // Heuristic: legacy public links are https://us*.20gbps.info/dlink6868/...
  return (bool)preg_match('~^https?://[^/]*20gbps\.info/dlink6868/~i', $url);
}

function is_internal_directlink(string $url): bool {
  if ($url === '') return false;

  // Internal/origin directlinks are typically on pve*.getlink.biz with /autodel/
  // Seen formats:
  //  - http://pve5.getlink.biz:8089/autodel/<filename>
  //  - http://pve0045.getlink.biz:8989/autodel/<filename>
  if (stristr($url, '/autodel/')) return true;

  // Be permissive on pve host + common ports (legacy infra varies).
  if (preg_match('~^https?://pve\d+\.getlink\.biz:(?:8089|8989)~i', $url)) return true;

  return false;
}

function convert_internal_to_public(string $internalUrl): array {
  // Convert origin/internal link (pve*/autodel) -> public dlink6868.
  // This is the critical "bridge" between leechmng DB (often stores internal) and API output (must be public).
  if ($internalUrl === '') return ['ok'=>false,'error'=>'empty'];
  if (is_public_directlink($internalUrl)) return ['ok'=>true,'public'=>$internalUrl];

  try {
    if (!defined('API_SELF_URL')) define('API_SELF_URL', API_BASE_URL . 'index.php');

    // Ensure snapshot config is loaded for bootstrap.php (it expects global $__gl_config).
    global $__gl_config;
    if (!is_array($__gl_config ?? null)) {
      $cfgPath = '/var/www/html.main/mygl/api_core/engine/current/config.snapshot.php';
      $tmpCfg = is_file($cfgPath) ? @require $cfgPath : null;
      if (is_array($tmpCfg)) $__gl_config = $tmpCfg;
      else $__gl_config = [];
    }

    require_once '/var/www/html.main/mygl/api_core/engine/current/bootstrap.php';
    $engine = getlink_api_engine();

    // legacy get() behavior depends on these flags
    $engine->leech = true;
    // some code paths expect myip/owner
    $engine->myip = '127.0.0.1';
    $engine->owner = 1;

    // Defensive: legacy code may read from globals/REQUEST.
    // Keep it minimal but explicit to avoid state bleed between requests.
    $_GET = [];
    $_POST = [];
    $_REQUEST = [];
    $_POST['download'] = 'down';
    $_GET['id'] = 'get';
    $_REQUEST = array_merge($_GET, $_POST);

    $new = $engine->get($internalUrl);

    // Debug trace (only local file; no user data leak outside server)
    leechmng_log_event([
      'kind' => 'convert_internal_to_public',
      'internal' => $internalUrl,
      'engine_ret_type' => is_array($new) ? 'array' : gettype($new),
      'engine_ret_keys' => is_array($new) ? array_keys($new) : null,
      'engine_ret' => is_array($new) ? $new : (string)$new,
    ]);

    if (is_array($new) && isset($new['dlink']) && is_string($new['dlink']) && stristr($new['dlink'], 'http')) {
      return ['ok'=>true,'public'=>$new['dlink'],'raw'=>$new];
    }
    return ['ok'=>false,'error'=>'no dlink','raw'=>$new];
  } catch (Throwable $e) {
    leechmng_log_event([
      'kind' => 'convert_internal_to_public_err',
      'internal' => $internalUrl,
      'error' => $e->getMessage(),
    ]);
    return ['ok'=>false,'error'=>$e->getMessage()];
  }
}

function convert_internal_to_public_subprocess(string $internalUrl, int $timeoutSec = 5): array {
  // Fail-closed: never return internal URL to clients.
  // Run legacy conversion in a subprocess with a hard timeout to avoid hanging API workers.
  $u = trim((string)$internalUrl);
  if ($u === '') return ['ok'=>false,'error'=>'empty'];
  if (is_public_directlink($u)) return ['ok'=>true,'public'=>$u,'mode'=>'already_public'];
  if (!is_internal_directlink($u)) return ['ok'=>false,'error'=>'not_internal'];

  $timeoutSec = (int)$timeoutSec;
  if ($timeoutSec <= 0) $timeoutSec = 5;

  $worker = '/var/www/html.main/mygl/api_core/convert_worker.php';
  if (!is_file($worker)) return ['ok'=>false,'error'=>'worker_missing'];

  $php = PHP_BINARY ?: 'php';
  $cmd = $php . ' ' . escapeshellarg($worker) . ' ' . escapeshellarg($u);

  $descriptorspec = [
    0 => ['pipe','r'],
    1 => ['pipe','w'],
    2 => ['pipe','w'],
  ];

  // Start in its own process group so we can SIGKILL the whole group on timeout.
  $proc = @proc_open('setsid ' . $cmd, $descriptorspec, $pipes);
  if (!is_resource($proc)) return ['ok'=>false,'error'=>'proc_open_fail'];

  @fclose($pipes[0]);
  stream_set_blocking($pipes[1], false);
  stream_set_blocking($pipes[2], false);

  $out = '';
  $err = '';
  $start = microtime(true);

  while (true) {
    $st = proc_get_status($proc);
    $out .= stream_get_contents($pipes[1]);
    $err .= stream_get_contents($pipes[2]);

    if (!$st['running']) break;

    if ((microtime(true) - $start) >= $timeoutSec) {
      // Kill process group
      $pid = (int)($st['pid'] ?? 0);
      if ($pid > 0) {
        @posix_kill(-$pid, 9);
        @posix_kill($pid, 9);
      }
      @proc_terminate($proc, 9);
      @fclose($pipes[1]);
      @fclose($pipes[2]);
      @proc_close($proc);
      return ['ok'=>false,'error'=>'timeout','timeout_sec'=>$timeoutSec];
    }

    usleep(50000); // 50ms
  }

  @fclose($pipes[1]);
  @fclose($pipes[2]);
  @proc_close($proc);

  $out = trim((string)$out);
  if ($out === '') return ['ok'=>false,'error'=>'empty_output','stderr'=>trim($err)];

  $j = json_decode($out, true);
  if (!is_array($j)) return ['ok'=>false,'error'=>'bad_json','raw'=>$out,'stderr'=>trim($err)];

  if (!empty($j['ok']) && !empty($j['public']) && is_string($j['public']) && is_public_directlink((string)$j['public'])) {
    return ['ok'=>true,'public'=>(string)$j['public'],'mode'=>'subprocess'];
  }

  return ['ok'=>false,'error'=>(string)($j['error'] ?? 'convert_failed'),'mode'=>'subprocess','stderr'=>trim($err)];
}

function api_legacy_hash_for_link(string $effectiveLink): string {
  return md5((string)$effectiveLink . API_SELF_URL);
}

function api_jobs_set_legacy_hash(PDO $pdo, string $jobId, string $effectiveLink): void {
  $legacyHash = api_legacy_hash_for_link($effectiveLink);
  try {
    $pdo->prepare("UPDATE jobs SET legacy_hash=COALESCE(NULLIF(legacy_hash,''), ?) WHERE job_id=?")
      ->execute([$legacyHash, $jobId]);
  } catch (Throwable $e) {
    // best-effort
  }
}

function leechmng_savelink(string $endpoint, string $passleech, array $postFields): array {
  // Calls leechmng savelink endpoint (same endpoint as normal leechmng URL).
  $cookie = 'auth=' . md5($passleech);
  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $endpoint);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch, CURLOPT_POST, true);
  curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($postFields));
  curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 8);
  curl_setopt($ch, CURLOPT_TIMEOUT, 15);
  curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
  curl_setopt($ch, CURLOPT_COOKIE, $cookie);
  curl_setopt($ch, CURLOPT_USERAGENT, 'GetlinkAPI/0.1 (leechmng-savelink)');
  $body = curl_exec($ch);
  $err = curl_error($ch);
  $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);
  if ($body === false || $code <= 0) return ['ok'=>false,'error'=>$err ?: 'curl failed', 'http_code'=>$code];

  $json = json_decode($body, true);
  if (!is_array($json)) {
    $body2 = trim((string)$body, "\xEF\xBB\xBF");
    $json = json_decode($body2, true);
  }
  if (!is_array($json)) return ['ok'=>false,'error'=>'invalid json','raw'=>$body, 'http_code'=>$code];
  return ['ok'=>true,'raw'=>$json,'status'=>(string)($json['status'] ?? ''), 'http_code'=>$code];
}

function leechmng_check_hash(PDO $pdo, string $endpoint, string $passleech, string $hash, string $status = '0'): array {
  // Returns ['ok'=>bool,'status'=>string|null,'direct_url'=>string|null,'raw'=>mixed]
  // NOTE: leechmng/index.php expects POST(hash) (full md5 from savelink), not our older short-hash.
  $cookie = 'auth=' . md5($passleech);
  $post = 'hash=' . urlencode($hash);

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $endpoint);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch, CURLOPT_POST, true);
  curl_setopt($ch, CURLOPT_POSTFIELDS, $post);
  curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 8);
  curl_setopt($ch, CURLOPT_TIMEOUT, 15);
  curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
  curl_setopt($ch, CURLOPT_COOKIE, $cookie);
  curl_setopt($ch, CURLOPT_USERAGENT, 'GetlinkAPI/0.1 (leechmng-fastpath)');
  $body = curl_exec($ch);
  $err = curl_error($ch);
  $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  if ($body === false || $code <= 0) {
    return ['ok'=>false,'error'=>$err ?: 'curl failed'];
  }

  $json = json_decode($body, true);
  if (!is_array($json)) {
    $body2 = trim((string)$body, "\xEF\xBB\xBF");
    $json = json_decode($body2, true);
  }
  if (!is_array($json)) return ['ok'=>false,'error'=>'invalid json','raw'=>$body];

  $direct = null;
  // Some leechmng variants return directlink as string or under ['directlink']
  if (isset($json['directlink']) && is_string($json['directlink']) && stristr($json['directlink'], 'http')) {
    $direct = $json['directlink'];
  } elseif (isset($json['directlink']['url']) && is_string($json['directlink']['url']) && stristr($json['directlink']['url'], 'http')) {
    $direct = $json['directlink']['url'];
  } elseif (isset($json['dlink']) && is_string($json['dlink']) && stristr($json['dlink'], 'http')) {
    $direct = $json['dlink'];
  }

  return [
    'ok'=>true,
    'status'=>(string)($json['status'] ?? ''),
    'direct_url'=>$direct,
    'raw'=>$json,
  ];
}

function leechmng_http_lookup_fuzzy(PDO $pdo, string $endpoint, string $passleech, string $baseHash, int $maxSuffix = 5): array {
  // leechmng may append suffixes to hash to store multiple rows. Mirror the DB fuzzy lookup.
  $best = ['ok'=>false,'status'=>null,'direct_url'=>null,'raw'=>null,'error'=>'not found','hash_used'=>null];

  for ($i = 0; $i <= $maxSuffix; $i++) {
    $h = ($i === 0) ? $baseHash : ($baseHash . (string)$i);
    $r = leechmng_check_hash($pdo, $endpoint, $passleech, $h, '0');
    if (!($r['ok'] ?? false)) continue;
    $r['hash_used'] = $h;

    if (!($best['ok'] ?? false)) {
      $best = $r;
      continue;
    }

    $bdl = (string)($best['direct_url'] ?? '');
    $rdl = (string)($r['direct_url'] ?? '');
    $bst = (string)($best['status'] ?? '');
    $rst = (string)($r['status'] ?? '');

    $better = false;
    if ($bdl === '' && $rdl !== '') $better = true;
    elseif ($bst === '' && $rst !== '') $better = true;

    if ($better) $best = $r;
  }

  return $best;
}

function leechmng_local_db_path_for_host(string $hostCanon): ?string {
  // Mirror of leechmng config.php mapping (simplified).
  // Prefer SNAPSHOT DBs to avoid SQLITE locks/WAL contention.
  $hostCanon = trim($hostCanon);
  if ($hostCanon === '') return null;

  // Live DB first (contains latest rows); snapshot is fallback when live is missing/locked.
  $bases = [
    '/var/www/leech/getlink.db',
    '/var/www/leech/getlink.db_snap',
  ];

  foreach ($bases as $base) {
    // Default per-host DB
    $path = $base . '/' . $hostCanon . '.db';
    if (is_file($path)) return $path;

    // Fallback group DBs
    $fallbacks = [
      $base . '/other_data.db',
      $base . '/merged_bash.db',
      $base . '/baoloi_data.db',
    ];
    foreach ($fallbacks as $p) if (is_file($p)) return $p;
  }

  return null;
}

function leechmng_local_db_lookup(string $hostCanon, string $hash): array {
  // Returns ['ok'=>bool,'status'=>string|null,'direct_url'=>string|null,'row'=>array|null,'error'=>string|null]
  $dbPath = leechmng_local_db_path_for_host($hostCanon);
  if (!$dbPath) return ['ok'=>false,'error'=>'no local mng db'];
  if ($hash === '') return ['ok'=>false,'error'=>'empty hash'];

  try {
    // Open read-only when possible (SQLite URI); snapshot DBs are simple files (no WAL) so this is safe.
    $dsn = 'sqlite:' . $dbPath;
    if (str_starts_with($dbPath, '/var/www/leech/getlink.db_snap/')) {
      $dsn = 'sqlite:file:' . $dbPath . '?mode=ro';
    }

    $pdo = new PDO($dsn, null, null, [
      PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
      PDO::ATTR_TIMEOUT => 1,
    ]);
    // reduce lock sensitivity
    @$pdo->exec('PRAGMA query_only=ON;');
    @$pdo->exec('PRAGMA busy_timeout=800;');

    $q = $pdo->prepare('SELECT status,directlink,filename,filesize,time,link,hash FROM link WHERE hash=? ORDER BY time DESC LIMIT 1');
    $q->execute([$hash]);
    $row = $q->fetch(PDO::FETCH_ASSOC);
    if (!$row) return ['ok'=>true,'status'=>null,'direct_url'=>null,'row'=>null];

    $direct = null;
    $dl = (string)($row['directlink'] ?? '');
    if ($dl !== '' && stristr($dl, 'http')) $direct = $dl;

    return [
      'ok'=>true,
      'status'=>isset($row['status']) ? (string)$row['status'] : null,
      'direct_url'=>$direct,
      'row'=>$row,
    ];
  } catch (Throwable $e) {
    return ['ok'=>false,'error'=>$e->getMessage()];
  }
}

function leechmng_local_db_lookup_by_link(string $hostCanon, string $link): array {
  // Returns ['ok'=>bool,'status'=>string|null,'direct_url'=>string|null,'row'=>array|null,'error'=>string|null]
  $dbPath = leechmng_local_db_path_for_host($hostCanon);
  if (!$dbPath) return ['ok'=>false,'error'=>'no local mng db'];
  if ($link === '') return ['ok'=>false,'error'=>'empty link'];

  try {
    $dsn = 'sqlite:' . $dbPath;
    if (str_starts_with($dbPath, '/var/www/leech/getlink.db_snap/')) {
      $dsn = 'sqlite:file:' . $dbPath . '?mode=ro';
    }

    $pdo = new PDO($dsn, null, null, [
      PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
      PDO::ATTR_TIMEOUT => 1,
    ]);
    @$pdo->exec('PRAGMA query_only=ON;');
    @$pdo->exec('PRAGMA busy_timeout=800;');

    $q = $pdo->prepare('SELECT status,directlink,filename,filesize,time,link,hash FROM link WHERE link=? ORDER BY time DESC LIMIT 1');
    $q->execute([$link]);
    $row = $q->fetch(PDO::FETCH_ASSOC);
    if (!$row) return ['ok'=>true,'status'=>null,'direct_url'=>null,'row'=>null];

    $direct = null;
    $dl = (string)($row['directlink'] ?? '');
    if ($dl !== '' && stristr($dl, 'http')) $direct = $dl;

    return [
      'ok'=>true,
      'status'=>isset($row['status']) ? (string)$row['status'] : null,
      'direct_url'=>$direct,
      'row'=>$row,
    ];
  } catch (Throwable $e) {
    return ['ok'=>false,'error'=>$e->getMessage()];
  }
}


function leechmng_local_db_lookup_fuzzy(string $hostCanon, string $baseHash, int $maxSuffix = 5): array {
  // leechmng sometimes appends suffixes to hash to store multiple rows for same URL.
  // Try baseHash, then baseHash.'1'.. baseHash.'N', and pick the newest row; prefer ones with directlink.
  $best = ['ok'=>true,'status'=>null,'direct_url'=>null,'row'=>null,'hash_used'=>null];

  for ($i = 0; $i <= $maxSuffix; $i++) {
    $h = ($i === 0) ? $baseHash : ($baseHash . (string)$i);
    $r = leechmng_local_db_lookup($hostCanon, $h);
    if (!($r['ok'] ?? false)) continue;
    if (empty($r['row'])) continue;

    if (empty($best['row'])) {
      $best = $r;
      $best['hash_used'] = $h;
      continue;
    }

    $bt = (int)($best['row']['time'] ?? 0);
    $rt = (int)($r['row']['time'] ?? 0);
    $bdl = (string)($best['direct_url'] ?? '');
    $rdl = (string)($r['direct_url'] ?? '');

    $better = false;
    if ($rt > $bt) $better = true;
    elseif ($rt === $bt && $bdl === '' && $rdl !== '') $better = true;

    if ($better) {
      $best = $r;
      $best['hash_used'] = $h;
    }
  }

  // Fallback: prefix match (hash LIKE baseHash%)
  // This can catch suffixes beyond maxSuffix while keeping the pattern index-friendly.
  if (empty($best['row'])) {
    try {
      $dbPath = leechmng_local_db_path_for_host($hostCanon);
      if ($dbPath) {
        $dsn = 'sqlite:' . $dbPath;
        if (str_starts_with($dbPath, '/var/www/leech/getlink.db_snap/')) {
          $dsn = 'sqlite:file:' . $dbPath . '?mode=ro';
        }
        $pdo2 = new PDO($dsn, null, null, [
          PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
          PDO::ATTR_TIMEOUT => 1,
        ]);
        @$pdo2->exec('PRAGMA query_only=ON;');
        @$pdo2->exec('PRAGMA busy_timeout=800;');

        $q = $pdo2->prepare("SELECT status,directlink,filename,filesize,time,link,hash FROM link WHERE hash LIKE ? ORDER BY time DESC LIMIT 1");
        $q->execute([$baseHash . '%']);
        $row = $q->fetch(PDO::FETCH_ASSOC);
        if (is_array($row) && !empty($row)) {
          $direct = null;
          $dl = (string)($row['directlink'] ?? '');
          if ($dl !== '' && stristr($dl, 'http')) $direct = $dl;
          return [
            'ok'=>true,
            'status'=>isset($row['status']) ? (string)$row['status'] : null,
            'direct_url'=>$direct,
            'row'=>$row,
            'hash_used'=>(string)($row['hash'] ?? ''),
          ];
        }
      }
    } catch (Throwable $e) {
      // ignore
    }
  }

  return $best;
}

function sync_from_files(PDO $pdo, string $jobId, array $opt = []): array {
  $debug = !empty($opt['debug']) && debug_enabled();

  $stmt = $pdo->prepare('SELECT * FROM jobs WHERE job_id=?');
  $stmt->execute([$jobId]);
  $job = $stmt->fetch(PDO::FETCH_ASSOC);
  if (!$job) return ['ok'=>false,'error'=>'Job not found'];

  // Never downgrade: if we already have a direct_url_latest, treat job as ready and return immediately.
  // This prevents later .dat parsing (which may only have status=processing) from overwriting a ready job.
  if (!empty($job['direct_url_latest'])) {
    $direct = (string)$job['direct_url_latest'];
    $src = (string)($job['direct_url_source'] ?? 'sqlite');

    $now = time();
    $age = $now - (int)($job['created_at'] ?? $now);
    $stale = ($age > 3*86400);
    $refreshUrl = API_BASE_URL . 'api.php?action=refresh_public_link&job_id=' . urlencode($jobId);


    // self-heal: ensure sqlite status is ready when direct_url_latest exists
    if (($job['status'] ?? '') !== 'ready') {
      $pdo->prepare('UPDATE jobs SET status=?, status_code=?, updated_at=? WHERE job_id=?')
        ->execute(['ready', ($job['status_code'] ?? '1') ?: '1', time(), $jobId]);
      $job['status'] = 'ready';
      if (empty($job['status_code'])) $job['status_code'] = '1';
    }

    $out = [
      'ok'=>true,
      'job_id'=>$jobId,
      'status'=>'ready',
      'status_code'=>(string)($job['status_code'] ?? '1'),
      'host'=>$job['host'] ?? null,
      'filename'=>$job['filename'] ?? null,
      'size_text'=>$job['size_text'] ?? null,
      'direct_url'=>$direct,
      // direct_url_internal is sensitive; only expose under debug+passdebug
      'direct_url_internal'=>$debug ? ($job['direct_url_internal'] ?? null) : null,
      // direct_url_pve: only expose under debug+passdebug
      // Backward-compat: if column is empty but direct_url_internal starts with pve, derive it.
      'direct_url_pve'=>$debug ? (($job['direct_url_pve'] ?? null) ?: (preg_match('~^https?://pve~i', (string)($job['direct_url_internal'] ?? '')) ? (string)$job['direct_url_internal'] : null)) : null,
      'direct_url_public'=>$job['direct_url_public'] ?? $direct,
      'direct_url_pve'=>$debug ? (($job['direct_url_pve'] ?? null) ?: (preg_match('~^https?://pve~i', (string)($job['direct_url_internal'] ?? '')) ? (string)$job['direct_url_internal'] : null)) : null,
      'source'=>$src,
      'direct_url_source'=>$src,
      'stale'=>$stale,
      'need_refresh'=>$stale,
      'refresh_url'=>$stale ? $refreshUrl : null,
      'hint'=>$stale ? 'Link older than 3 days. Call refresh_public_link(job_id) to regenerate.' : null,
    ];
    if (!empty($opt['debug'])) {
      $out['debug'] = $out['debug'] ?? [];
      $out['debug']['nodowngrade'] = true;
    }
    return $out;
  }

  

  // TTL guard: stop auto-heal/sync after 3 days to avoid unlimited polling load.
  // Boss rule: after TTL, require resubmit (do not query legacy sqlite/getlink.db/.dat/leechmng).
  $nowTtl = time();
  $ageTtl = $nowTtl - (int)($job['created_at'] ?? $nowTtl);
  if (($job['direct_url_latest'] ?? '') === '' && $ageTtl > 3*86400) {
    $out = [
      'ok'=>true,
      'status'=>(string)($job['status'] ?? 'processing'),
      'status_code'=>(string)(($job['status_code'] ?? '0') ?: '0'),
      'host'=>$job['host'] ?? null,
      'filename'=>$job['filename'] ?? null,
      'size_text'=>$job['size_text'] ?? null,
      'direct_url'=>null,
      'direct_url_public'=>null,
      'source'=>'ttl',
      'direct_url_source'=>'ttl',
      'need_resubmit'=>true,
      'hint'=>'Job expired (>3 days). Please resubmit the original link.',
    ];
    if (!empty($opt['debug']) && debug_enabled()) {
      $out['debug'] = [
        'ttl_days'=>3,
        'age_sec'=>$ageTtl,
        'ttl_skipped'=>true,
      ];
    }
    return $out;
  }
$origin = normalize_url($job['origin_url']);
  $originUniq = normalize_url_for_uniqueness($origin);
  $originHost = canonical_host_from_url($origin);
  $originRgId = ($originHost === 'rapidgator.net') ? extract_rapidgator_file_id($origin) : '';

  // --- Fast-path (DB-first): read local mirrored leechmng sqlite first (cheap), then fallback to HTTP poll (rate-limited)
  // Only when job not already ready.
  // Always check local getlink.db when direct_url_latest is empty.
  // Rationale: engine may push a directlink into getlink.db very quickly; don't skip this just because other paths mark status.
  if (($job['direct_url_latest'] ?? '') === '') {
    $now = time();
    $age = $now - (int)($job['created_at'] ?? $now);
    $host = canonical_host_from_url($origin);

    // Hash used by leechmng DB (see leechmng/index.php: $hash = $_POST['md5'])
    // For API submits we persist a per-submit hash so leechmng can append duplicate URL submits.
    // Legacy hash (engine path) is md5(url + self). This is the correct key for getlink.db and for HTTP poll when engine pushes.
    $legacyHash = (string)($job['legacy_hash'] ?? '');
    if ($legacyHash === '') $legacyHash = md5($origin . API_SELF_URL);

    // mng_hash is per-submit hash used only when we push via savelink fallback.
    $mngHash = (string)($job['mng_hash'] ?? '');

    $mngDbPath = leechmng_local_db_path_for_host($host);

    // 1) DB-first lookup (local rsynced DB)
    // Add a small per-job cooldown to avoid hammering sqlite when clients poll too frequently.
    $dbCooldown = 3; // seconds
    $lastDb = (int)($job['mng_last_db_check_at'] ?? 0);

    if ($lastDb <= 0 || ($now - $lastDb) >= $dbCooldown) {
            // TTL: only auto-heal from getlink.db/leechmng within 3 days to avoid unlimited polling load.
            if ($age > 3*86400) {
              if ($debug) {
                $out['debug'] = $out['debug'] ?? [];
                $out['debug']['autoheal_skipped'] = true;
                $out['debug']['autoheal_ttl_days'] = 3;
                $out['debug']['age_sec'] = $age;
              }
            } else {
              // Prefer exact link match first, then legacy hash fuzzy.
              $dbChk = leechmng_local_db_lookup_by_link($host, (string)($job['origin_url'] ?? ''));
              if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_by_link($host, $origin);
              if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_fuzzy($host, $legacyHash, 5);
            }
      if (isset($dbChk) && ($dbChk['ok'] ?? false)) {
        $pdo->prepare('UPDATE jobs SET mng_last_db_check_at=?, mng_last_status=?, mng_last_direct_url=?, mng_poll_fail_count=? WHERE job_id=?')
          ->execute([$now, (string)($dbChk['status'] ?? ''), (string)($dbChk['direct_url'] ?? ''), 0, $jobId]);

        if (!empty($dbChk['direct_url'])) {
          $directUrlRaw = (string)$dbChk['direct_url'];
          $pve = preg_match('~^https?://pve~i', $directUrlRaw) ? $directUrlRaw : null;
          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$directUrlRaw, $pve, $now, $jobId]);

          $directPublic = null;
          if (is_public_directlink($directUrlRaw)) {
            $directPublic = $directUrlRaw;
          } elseif (is_internal_directlink($directUrlRaw)) {
            // Boss rule: verify PVE alive before conversion (timeout => dead)
            if (preg_match('~^https?://pve~i', $directUrlRaw)) {
              $pv = verify_url_alive($directUrlRaw, 3);
              if (empty($pv['alive'])) {
                $directUrlRaw = '';
              }
            }
            if ($directUrlRaw !== '') {
              // Option B: hard-timeout conversion (fail-closed, never leak internal)
              if (ratelimit_allow($pdo, 'convert_job:' . $jobId . ':30s', 30, 1) && ratelimit_allow($pdo, 'convert_global:1m', 60, 20)) {
                $conv = convert_internal_to_public_subprocess($directUrlRaw, 5);
                if (($conv['ok'] ?? false) && !empty($conv['public'])) $directPublic = (string)$conv['public'];
              }
            }
          }
          if ($directPublic) {
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, host=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
              ->execute([$now,'ready','1',$host,$directPublic,$now,$directPublic,$now,'mng_db',$jobId]);
            // Legacy UI: update data/files/*.dat so index.php shows public link immediately (best-effort).
            // legacy_dat_upsert_reject also mirrors into legacy.sqlite (fallback) after this patch.
            try {
              $id2 = load_account_identity();
              @legacy_dat_upsert_reject([
                'url' => (string)($job['origin_url'] ?? $origin),
                'host' => $host,
                'status' => 1,
                'filename' => (string)($job['filename'] ?? ''),
                'size_text' => (string)($job['size_text'] ?? ''),
                'size_bytes' => (int)($job['size_bytes'] ?? 0),
                'bw_charge_bytes' => (int)($job['size_bytes'] ?? 0),
                'direct_url' => $directPublic,
                'owner' => (string)($id2['secureid'] ?? ''),
              ]);
            } catch (Throwable $e) {
              // ignore
            }

            // Record traffic_seen (Boss: count unique URL/day even when link appears later via cache-heal)
            try {
              $t = traffic_db();
              $norm = normalize_url_for_uniqueness((string)($job['origin_url'] ?? ''));
              if ($norm === '') $norm = normalize_url_for_uniqueness($origin);
              $bytes = (int)($job['size_bytes'] ?? 0);
              if ($bytes <= 0) $bytes = (int)($job['filesize_bytes'] ?? 0);
              if ($bytes <= 0) $bytes = (int)($job['size'] ?? 0);
              if ($bytes < 0) $bytes = 0;
              if ($bytes > 0 && $norm !== '') {
                traffic_record_seen($t, [
                  'day_key' => today_key(),
                  'host' => $host,
                  'norm_url' => $norm,
                  'size_bytes' => $bytes,
                  'source' => 'query_status_heal',
                  'ref_id' => $jobId,
                ]);
              }
            } catch (Throwable $e) {
              // ignore
            }

            $out = [
              'ok'=>true,
              'status'=>'ready',
              'status_code'=>'1',
              'host'=>$host,
              'filename'=>$job['filename'] ?? null,
              'size_text'=>$job['size_text'] ?? null,
              'direct_url'=>$directPublic,
              'direct_url_internal'=>$debug ? $directUrlRaw : null,
              'direct_url_public'=>$directPublic,
              'source'=>'mng_db',
              'direct_url_source'=>'mng_db',
            ];
            if ($debug) {
              $out['debug'] = [
                'mng_hash' => $legacyHash,
                'mng_hash_used' => (string)($dbChk['hash_used'] ?? ''),
                'mng_db_path' => $mngDbPath,
                'mng_db_status' => (string)($dbChk['status'] ?? ''),
                'mng_db_direct_url' => (string)($dbChk['direct_url'] ?? ''),
                'age_sec' => $age,
                'db_cooldown_sec' => $dbCooldown,
              ];
            }
            
            // Ledger: DONE counts final_bytes (reserve=0), reconcile back to submit day.
            try {
              $tL = traffic_db();
              $submitDay = day_key_for_ts((int)($job['created_at'] ?? time()));
              $normL = normalize_url_for_uniqueness((string)($job['origin_url'] ?? ''));
              if ($normL === '') $normL = normalize_url_for_uniqueness($origin);
              $sz = (int)($job['size_bytes'] ?? 0);
              if ($sz <= 0) $sz = (int)($job['filesize_bytes'] ?? 0);
              if ($sz <= 0) $sz = (int)($job['size'] ?? 0);
              $usedBeforeDebt = traffic_get_used_bytes_today($tL, $submitDay, $host);
              traffic_ledger_mark_done($tL, $submitDay, $host, $normL, $sz, $jobId);
              traffic_daily_rebuild_from_ledger($tL, $submitDay, $host);
              traffic_daily_rebuild_account_from_ledger($tL, $submitDay);
              $usedAfterDebt = traffic_get_used_bytes_today($tL, $submitDay, $host);
              traffic_record_debt_inc_after_done($tL, $submitDay, $host, (int)$usedBeforeDebt, (int)$usedAfterDebt);

            } catch (Throwable $e) {
              // ignore
            }

return $out;
          }
        }
      }
    } else {
      if ($debug) {
        // expose that we intentionally skipped DB check
        $out['debug'] = $out['debug'] ?? [];
        $out['debug']['db_skipped'] = true;
        $out['debug']['db_cooldown_sec'] = $dbCooldown;
        $out['debug']['db_last_check_age_sec'] = $now - $lastDb;
      }
    }

      /* duplicate old DB-fastpath block removed (now handled above with cooldown) */

        /* duplicate block removed */

    // 2) HTTP fallback poll (bucketed cooldown)
    // Tight within 5 minutes, then use big milestones to avoid overload.
    $cooldown = 3600; // default: 1h
    if ($age <= 60) $cooldown = 10;
    elseif ($age <= 180) $cooldown = 20;
    elseif ($age <= 300) $cooldown = 30;
    elseif ($age <= 900) $cooldown = 120;
    elseif ($age <= 3600) $cooldown = 600;        // 15-60m: 10m
    elseif ($age <= 3*3600) $cooldown = 3600;     // 1-3h: 1h
    elseif ($age <= 12*3600) $cooldown = 3*3600;  // 3-12h: 3h
    elseif ($age <= 24*3600) $cooldown = 12*3600; // 12-24h: 12h
    elseif ($age <= 48*3600) $cooldown = 24*3600; // 24-48h: 24h
    else $cooldown = 48*3600;                     // >48h: 48h

    $lastPoll = (int)($job['mng_last_poll_at'] ?? 0);
    $failCnt = (int)($job['mng_poll_fail_count'] ?? 0);

    if ($lastPoll <= 0 || ($now - $lastPoll) >= $cooldown) {
      // global rate limit: 60/min
      if (ratelimit_allow($pdo, 'leechmng_global', 60, 60)) {
        $rt = load_api_routing();
        $endpoint = leechmng_endpoint_for_host($host);
        $passleech = (string)($rt['passleech'] ?? '');

        if ($endpoint && $passleech) {
          $chk = leechmng_http_lookup_fuzzy($pdo, $endpoint, $passleech, $legacyHash, 5);

          if (($chk['ok'] ?? false) && !empty($chk['direct_url'])) {
            $directUrlRaw = (string)$chk['direct_url'];
            $pve = preg_match('~^https?://pve~i', $directUrlRaw) ? $directUrlRaw : null;
            $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
              ->execute([$directUrlRaw, $pve, $now, $jobId]);

            $directPublic = null;
            if (is_public_directlink($directUrlRaw)) $directPublic = $directUrlRaw;
            elseif (is_internal_directlink($directUrlRaw)) {
              // Option B: hard-timeout conversion (fail-closed, never leak internal)
              if (ratelimit_allow($pdo, 'convert_job:' . $jobId . ':30s', 30, 1) && ratelimit_allow($pdo, 'convert_global:1m', 60, 20)) {
                $conv = convert_internal_to_public_subprocess($directUrlRaw, 5);
                if (($conv['ok'] ?? false) && !empty($conv['public'])) $directPublic = (string)$conv['public'];
              }
            }

            if ($directPublic) {
              $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, host=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=?, mng_last_poll_at=?, mng_last_status=?, mng_last_direct_url=?, mng_poll_fail_count=? WHERE job_id=?')
                ->execute([$now,'ready','1',$host,$directPublic,$now,$directPublic,$now,'mng_http',$now,(string)($chk['status'] ?? ''), (string)$chk['direct_url'],0,$jobId]);
            // Legacy UI: update data/files/*.dat so index.php shows public link immediately (best-effort).
            // legacy_dat_upsert_reject also mirrors into legacy.sqlite (fallback) after this patch.
            try {
              $id2 = load_account_identity();
              @legacy_dat_upsert_reject([
                'url' => (string)($job['origin_url'] ?? $origin),
                'host' => $host,
                'status' => 1,
                'filename' => (string)($job['filename'] ?? ''),
                'size_text' => (string)($job['size_text'] ?? ''),
                'size_bytes' => (int)($job['size_bytes'] ?? 0),
                'bw_charge_bytes' => (int)($job['size_bytes'] ?? 0),
                'direct_url' => $directPublic,
                'owner' => (string)($id2['secureid'] ?? ''),
              ]);
            } catch (Throwable $e) {
              // ignore
            }

              $out = [
                'ok'=>true,
                'status'=>'ready',
                'status_code'=>'1',
                'host'=>$host,
                'filename'=>$job['filename'] ?? null,
                'size_text'=>$job['size_text'] ?? null,
                'direct_url'=>$directPublic,
                'direct_url_internal'=>$debug ? $directUrlRaw : null,
                'direct_url_public'=>$directPublic,
                'source'=>'mng_http',
              'direct_url_source'=>'mng_http',
              ];
              if ($debug) {
                $out['debug'] = [
                  'mng_hash' => $legacyHash,
                  'mng_http_hash_used' => (string)($chk['hash_used'] ?? ''),
                  'mng_db_path' => $mngDbPath,
                  'mng_http_status' => (string)($chk['status'] ?? ''),
                  'mng_http_direct_url' => (string)($chk['direct_url'] ?? ''),
                  'age_sec' => $age,
                  'cooldown_sec' => $cooldown,
                ];
              }
              return $out;
            }
          }

          // update poll metadata
          $pdo->prepare('UPDATE jobs SET mng_last_poll_at=?, mng_last_status=?, mng_last_direct_url=?, mng_poll_fail_count=? WHERE job_id=?')
            ->execute([$now, (string)($chk['status'] ?? ''), (string)($chk['direct_url'] ?? ''), ($chk['ok'] ?? false) ? 0 : ($failCnt+1), $jobId]);
        }
      }
    }
  }

  // --- LEGACY SQLITE fast-path (avoid scanning .dat when possible) ---
  $legacyPath = API_DATA_DIR . '/legacy.sqlite';
  if (is_file($legacyPath)) {
    try {
      $lpdo = new PDO('sqlite:' . $legacyPath);
      $lpdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
      @$lpdo->exec('PRAGMA query_only=ON;');
      @$lpdo->exec('PRAGMA busy_timeout=800;');

      // Try match by normalized unique URL (best), also allow exact origin_url match.
      $q = $lpdo->prepare("SELECT job_id, origin_url, norm_url, host, status, status_code, direct_url, mtime, updated_at FROM legacy_jobs WHERE (norm_url=? AND norm_url!='') OR origin_url=? ORDER BY mtime DESC LIMIT 5");
      $q->execute([$originUniq, $origin]);
      $rows = $q->fetchAll(PDO::FETCH_ASSOC);

      $best = null;
      foreach ($rows as $r) {
        $dl = (string)($r['direct_url'] ?? '');
        if ($dl !== '' && stristr($dl, 'http')) { $best = $r; break; }
        if ($best === null) $best = $r;
      }

      if (is_array($best) && !empty($best)) {
        $legacyStatus = (string)($best['status'] ?? '');
        $legacyCode = (string)($best['status_code'] ?? '');
        $legacyHost = (string)($best['host'] ?? $originHost);
        if ($legacyHost === '') $legacyHost = $originHost;
        $legacyDl = (string)($best['direct_url'] ?? '');

        // If we have a direct link from legacy mirror, use it.
        if ($legacyDl !== '' && stristr($legacyDl, 'http')) {
          $now = time();
          $pve = preg_match('~^https?://pve~i', $legacyDl) ? $legacyDl : null;
          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$legacyDl, $pve, $now, $jobId]);

          $directPublic = null;
          if (is_public_directlink($legacyDl)) $directPublic = $legacyDl;
          elseif (is_internal_directlink($legacyDl)) {
            $conv = convert_internal_to_public($legacyDl);
            if (($conv['ok'] ?? false) && !empty($conv['public'])) $directPublic = (string)$conv['public'];
          }

          if ($directPublic) {
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, host=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
              ->execute([$now,'ready','1',$legacyHost,$directPublic,$now,$directPublic,$now,'legacy_sqlite',$jobId]);

            $out = [
              'ok'=>true,
              'job_id'=>$jobId,
              'status'=>'ready',
              'status_code'=>'1',
              'host'=>$legacyHost,
              'filename'=>$job['filename'] ?? null,
              'size_text'=>$job['size_text'] ?? null,
              'direct_url'=>$directPublic,
              'direct_url_internal'=>$debug ? $legacyDl : null,
              'direct_url_public'=>$directPublic,
              'source'=>'legacy_sqlite',
              'direct_url_source'=>'legacy_sqlite',
            ];
            if ($debug) {
              $out['debug'] = $out['debug'] ?? [];
              $out['debug']['legacy_job_id'] = (string)($best['job_id'] ?? '');
              $out['debug']['legacy_mtime'] = (int)($best['mtime'] ?? 0);
            }
            return $out;
          }

          // If we cannot convert, keep internal only and continue other paths.
        } else {
          // No direct link yet; we can still sync status without scanning .dat.
          // Only update sqlite status if legacy indicates a later state.
          if ($legacyStatus !== '' && ($job['status'] ?? '') === 'processing') {
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, host=? WHERE job_id=?')
              ->execute([time(), $legacyStatus, $legacyCode ?: ($job['status_code'] ?? '0'), $legacyHost, $jobId]);
          }
        }
      }
    } catch (Throwable $e) {
      // ignore and fallback to .dat scan
    }
  }

  $files = glob(API_DATA_DIR . '/files/*.dat');
  rsort($files);
  $files = array_slice($files, 0, 5); // only newest few

  $found = null;
  $foundMeta = null;

  foreach ($files as $path) {
    $raw = @file_get_contents($path);
    if ($raw === false) continue;
    $obj = json_decode($raw, true);
    if (!is_array($obj)) continue;

    $leech = $obj['leech'] ?? [];
    if (is_array($leech)) {
      foreach ($leech as $k => $v) {
        if (!is_array($v)) continue;
        $u = normalize_url((string)($v['url'] ?? ''));
        // Legacy sometimes stores full url with filename + .html wrapper, while API stores a shorter origin_url.
        // 1) Try strict match.
        // 2) Try uniqueness-normalized match.
        // 3) Special-case Rapidgator: match by /file/<id> (API may store only /file/<id>, legacy stores /file/<id>/<name>.html).
        $uUniq = normalize_url_for_uniqueness($u);
        $uHost = canonical_host_from_url($u);
        $uRgId = ($uHost === 'rapidgator.net') ? extract_rapidgator_file_id($u) : '';

        $isMatch = false;
        if ($u === $origin) $isMatch = true;
        elseif ($originUniq !== '' && $uUniq === $originUniq) $isMatch = true;
        elseif ($originRgId !== '' && $uRgId !== '' && $originRgId === $uRgId) $isMatch = true;

        if ($isMatch) {
          // Pick best entry for same origin:
          // - prefer newer mtime
          // - if same mtime, prefer one that has directlink
          // - if still tied, prefer higher status
          $vm = (int)($v['mtime'] ?? 0);
          $fm = $found ? (int)($found['mtime'] ?? 0) : 0;
          $vdl = '';
          if (isset($v['directlink']['url']) && is_string($v['directlink']['url'])) $vdl = $v['directlink']['url'];
          $fdl = '';
          if ($found && isset($found['directlink']['url']) && is_string($found['directlink']['url'])) $fdl = $found['directlink']['url'];
          $vs = (int)($v['status'] ?? 0);
          $fs = $found ? (int)($found['status'] ?? 0) : 0;

          $better = false;
          if ($found === null) $better = true;
          elseif ($vm > $fm) $better = true;
          elseif ($vm === $fm && $fdl === '' && $vdl !== '') $better = true;
          elseif ($vm === $fm && $vdl !== '' && $fdl !== '' && $vs > $fs) $better = true;
          elseif ($vm === $fm && $vdl !== '' && $fdl === '' ) $better = true;
          elseif ($vm === $fm && $vdl === '' && $fdl === '' && $vs > $fs) $better = true;

          if ($better) {
            $found = $v;
            $foundMeta = ['source_file'=>basename($path), 'source_key'=>$k];
          }
        }
      }
    }
  }

  if (!$found) {
    // still processing but not yet in file list
    $pdo->prepare('UPDATE jobs SET updated_at=?, status=? WHERE job_id=?')->execute([time(), 'processing', $jobId]);
    return ['ok'=>true,'status'=>'processing'];
  }

  $statusCode = (string)($found['status'] ?? '');
  $status = 'processing';
  // Legacy status meanings vary by branch; in our observed .dat:
  // - status=1 often already includes a usable public directlink (treat as ready)
  // - status=0/2 processing
  // - status=3 ready (some builds)
  // - status=4 failed
  if ($statusCode === '1') $status = 'ready';
  elseif ($statusCode === '3') $status = 'ready';
  elseif ($statusCode === '4') $status = 'failed';
  else $status = 'processing';

  // Persist legacy .dat fields into sqlite for fast list/quota. Store unknown extras in meta_json.
  $meta = $found;
  // remove fields we already store explicitly
  foreach (['url','hash','host','type','status','filename','size','msize','bwtam','directlink','path','mtime','speed','ip','network','owner','showfile','releech'] as $kk) {
    if (isset($meta[$kk])) unset($meta[$kk]);
  }
  $metaJson = !empty($meta) ? json_encode($meta, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) : null;

  $normUrl = normalize_url_for_uniqueness(normalize_url((string)($found['url'] ?? '')));

  $jobType = normalize_job_type($found['type'] ?? 'leech');
  $clientIp = (string)($found['ip'] ?? '');
  if ($clientIp === '' || $clientIp === '127.0.0.1') $clientIp = get_client_ip();
  $clientNetwork = (string)($found['network'] ?? '');
  if ($clientNetwork === '' && isset($_COOKIE['network'])) $clientNetwork = (string)$_COOKIE['network'];

  $owner = (string)($found['owner'] ?? '');
  if ($owner === '' || $owner === API_FALLBACK_PASS) {
    $id = load_account_identity();
    if (!empty($id['secureid'])) $owner = (string)$id['secureid'];
  }

  $showfile = normalize_bool_int($found['showfile'] ?? null, 1);
  $releech = normalize_bool_int($found['releech'] ?? null, 0);

  $upd = $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, host=?, filename=?, size_text=?, size_bytes=?, bw_charge_bytes=?, mtime=?, path=?, hash=?, norm_url=?, job_type=?, speed=?, client_ip=?, client_network=?, owner=?, showfile=?, releech=?, meta_json=?, source_file=?, source_key=? WHERE job_id=?');
  $upd->execute([
    time(),
    $status,
    $statusCode,
    $found['host'] ?? null,
    $found['filename'] ?? null,
    $found['size'] ?? null,
    $found['msize'] ?? null,
    $found['bwtam'] ?? null,
    $found['mtime'] ?? null,
    $found['path'] ?? null,
    $found['hash'] ?? null,
    $normUrl !== '' ? $normUrl : null,
    $jobType,
    (string)($found['speed'] ?? ''),
    $clientIp,
    $clientNetwork,
    $owner,
    $showfile,
    $releech,
    $metaJson,
    $foundMeta['source_file'] ?? null,
    $foundMeta['source_key'] ?? null,
    $jobId
  ]);

  // direct link may be present even when status is still 1 (legacy behavior)
  $directUrlRaw = null;
  if (isset($found['directlink']['url']) && is_string($found['directlink']['url']) && $found['directlink']['url'] !== '') {
    $directUrlRaw = $found['directlink']['url'];
  }

  $directPublic = null;
  if ($directUrlRaw) {
    // Store internal/raw always
    $pve = preg_match('~^https?://pve~i', $directUrlRaw) ? $directUrlRaw : null;
    $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
      ->execute([$directUrlRaw, $pve, time(), $jobId]);

    if (is_public_directlink($directUrlRaw)) {
      $directPublic = $directUrlRaw;
    } elseif (is_internal_directlink($directUrlRaw)) {
      $conv = convert_internal_to_public($directUrlRaw);
      if (($conv['ok'] ?? false) && !empty($conv['public'])) {
        $directPublic = (string)$conv['public'];
        $pdo->prepare('UPDATE jobs SET direct_url_public=?, direct_url_public_updated_at=? WHERE job_id=?')
          ->execute([$directPublic, time(), $jobId]);
      }
    }

    if ($directPublic) {
      // When we have a public directlink, treat job as ready regardless of legacy numeric status.
      // Also persist direct_url_public to keep sqlite consistent (important for refresh/export list).
      $now = time();
      $pdo->prepare('UPDATE jobs SET status=?, status_code=?, direct_url_latest=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
        ->execute(['ready','1',$directPublic, $directPublic, $now, $now, 'dat', $jobId]);
    }
  }

  // Guard: never report ready without a directlink.
  if (($status ?? '') === 'ready' && empty($directPublic)) {
    $status = 'processing';
    $statusCode = '0';
  }

  return [
    'ok'=>true,
    'status'=>$status,
    'status_code'=>$statusCode,
    'host'=>$found['host'] ?? null,
    'filename'=>$found['filename'] ?? null,
    'size_text'=>$found['size'] ?? null,
    'direct_url'=>$directPublic,
    'direct_url_internal'=>$debug ? $directUrlRaw : null,
    'direct_url_public'=>$directPublic,
    'source'=>'dat',
    'direct_url_source'=>'dat',
  ];
}

function rewrite_region_us_to_sg(string $url): string {
  // Boss rule: simply replace https://us* -> https://sg*
  if (stripos($url, 'https://us') === 0) return 'https://sg' . substr($url, 10);
  return $url;
}

function verify_url_alive(string $url, int $timeoutSec = 3): array {
  // Lightweight liveness check.
  // Use GET with Range to avoid downloading full files; HEAD is unreliable on some CDNs.
  // Returns: ['ok'=>bool,'alive'=>bool,'http_code'=>int,'error'=>string|null]
  $u = trim($url);
  if ($u === '') return ['ok'=>false,'alive'=>false,'http_code'=>0,'error'=>'empty'];

  $ch = curl_init();
  curl_setopt($ch, CURLOPT_URL, $u);
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
  curl_setopt($ch, CURLOPT_FOLLOWLOCATION, true);
  curl_setopt($ch, CURLOPT_MAXREDIRS, 3);
  curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, min(3, $timeoutSec));
  curl_setopt($ch, CURLOPT_TIMEOUT, $timeoutSec);
  curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
  curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
  curl_setopt($ch, CURLOPT_HTTPHEADER, [
    'Range: bytes=0-0',
    'User-Agent: GetlinkAPI/0.1 (verify)',
    'Accept: */*',
  ]);
  $body = curl_exec($ch);
  $err = curl_error($ch);
  $code = (int)curl_getinfo($ch, CURLINFO_HTTP_CODE);
  curl_close($ch);

  if ($body === false || $code <= 0) {
    // Boss rule: timeout/connect fail -> treat as dead
    return ['ok'=>false,'alive'=>false,'http_code'=>$code,'error'=>$err ?: 'curl failed'];
  }

  $alive = in_array($code, [200,206,302], true);
  if (!$alive) {
    // dead includes 403/404/410/5xx
    return ['ok'=>true,'alive'=>false,'http_code'=>$code,'error'=>null];
  }

  return ['ok'=>true,'alive'=>true,'http_code'=>$code,'error'=>null];
}

// ===== router =====
$action = $_GET['action'] ?? '';
if ($action === '') json_error(400, 1, 'Missing parameter (action)', 'BAD_REQUEST');
require_pass();

$pdo = db();
$rawBody = file_get_contents('php://input') ?: '';
$body = json_decode($rawBody, true);
if (!is_array($body)) $body = [];

if ($action === 'list_links_today') {
  $host = trim((string)($_GET['host'] ?? ''));
  if ($host !== '') $host = canonical_host_from_url('https://' . $host . '/');
  $source = (string)($_GET['source'] ?? 'all');
  $limit = (int)($_GET['limit'] ?? 200);
  $offset = (int)($_GET['offset'] ?? 0);
  $debug = debug_enabled();

  $out = list_links_today($pdo, [
    'host' => $host,
    'source' => $source,
    'limit' => $limit,
    'offset' => $offset,
    'debug' => $debug,
  ]);
  json_out($out);
}

if ($action === 'traffic') {
  $debug = debug_enabled();
  $days = (int)($_GET['days'] ?? 7);
  if ($days < 1) $days = 1;
  if ($days > 31) $days = 31;

  // Sync incremental from legacy.sqlite into traffic.sqlite (throttle 2s)
  $sync = null;
  try {
    $t = traffic_db();
    $sync = traffic_sync_from_legacy_sqlite($t, $days, 500, 200);
  } catch (Throwable $e) {
    $sync = ['ok'=>false,'error'=>$e->getMessage()];
  }

  $cfg = load_snapshot_config();
  $dayKey = today_key();
  $todayStart = today_start_ts();

  $t = traffic_db();
  $daily = traffic_get_daily($t, $dayKey);

  $out = [];

  foreach ($daily as $host => $v) {
    $usedBytes = (int)($v['reserved_bytes'] ?? 0) + (int)($v['final_bytes'] ?? 0);
    $usedLinks = (int)($v['uniq_links'] ?? 0);

    $hostCfg = $cfg['hosts'][$host] ?? ($cfg['hosts']['other'] ?? []);
    $dailyMb = (int)($hostCfg['daily_bw_mb'] ?? 0);
    $overMb = (int)($hostCfg['overlimit_bw_mb'] ?? 0);
    $maxLink = (int)($hostCfg['max_link'] ?? 0);

    $dailyBytes = $dailyMb > 0 ? $dailyMb * 1024 * 1024 : 0;
    $overBytes = $overMb > 0 ? $overMb * 1024 * 1024 : 0;

    // Boss rule:
    // - If already used >= dailyBytes, block any further submits for this host today.
    // - Else allow submit only if (used + filesize) <= overBytes.
    $allowMoreBytes = 0;
    if ($dailyBytes > 0 && $overBytes > 0) {
      if ($usedBytes < $dailyBytes) {
        $allowMoreBytes = $overBytes - $usedBytes;
        if ($allowMoreBytes < 0) $allowMoreBytes = 0;
      }
    } elseif ($dailyBytes > 0) {
      // No overlimit configured: strict daily.
      $allowMoreBytes = ($usedBytes < $dailyBytes) ? ($dailyBytes - $usedBytes) : 0;
      if ($allowMoreBytes < 0) $allowMoreBytes = 0;
    }

    $leftLinks = $maxLink > 0 ? ($maxLink - $usedLinks) : 0;
    if ($leftLinks < 0) $leftLinks = 0;

    $type = 'bytes';
    $limit = $dailyBytes;
    $extra = $overBytes; // NOTE: this is NOT extra; it's the overlimit threshold
    $left = $allowMoreBytes;

    if ($dailyBytes <= 0 && $maxLink > 0) {
      $type = 'links';
      $limit = $maxLink;
      $extra = 0;
      $left = $leftLinks;
    }

    $out[$host] = [
      'left' => (int)$left,
      'bytes' => (int)$usedBytes,
      'links' => (int)$usedLinks,
      'limit' => (int)$limit,
      'type' => $type,
      'extra' => (int)$extra,
      'reset' => 'daily',
    ];
  }

  $accUsedBytes = 0;
  $accUsedLinks = 0;
  foreach ($daily as $h => $v) {
    if ($h === '__account__') continue;
    $accUsedBytes += (int)($v['reserved_bytes'] ?? 0) + (int)($v['final_bytes'] ?? 0);
    $accUsedLinks += (int)($v['uniq_links'] ?? 0);
  }

  $accMaxLink = (int)($cfg['acc_max_link'] ?? 0);
  $accWallet = traffic_account_wallet_state($t, $cfg, $dayKey);
  $accLimitBytes = (int)($accWallet['cap_bytes'] ?? 0);
  $accExtraBytes = (int)($accWallet['bonus_bytes'] ?? 0);
  $accLeftBytes = (int)($accWallet['left_today'] ?? 0); // may be negative by design

  $accLeftLinks = $accMaxLink > 0 ? ($accMaxLink - $accUsedLinks) : 0;
  if ($accLeftLinks < 0) $accLeftLinks = 0;

  $accType = 'bytes';
  $accLimit = $accLimitBytes;
  $accExtra = $accExtraBytes;
  $accLeft = $accLeftBytes;
  if ($accLimitBytes <= 0 && $accMaxLink > 0) {
    $accType = 'links';
    $accLimit = $accMaxLink;
    $accExtra = 0;
    $accLeft = $accLeftLinks;
  }

  $out['__account__'] = [
    'left' => (int)$accLeft,
    'bytes' => (int)$accUsedBytes,
    'links' => (int)$accUsedLinks,
    'limit' => (int)$accLimit,
    'type' => $accType,
    'extra' => (int)$accExtra,
    'reset' => 'daily',
  ];

  phase1_compat_log('traffic_account_snapshot', [
    'day_key'=>$dayKey,
    'left'=>$accLeft,
    'used'=>$accUsedBytes,
    'limit'=>$accLimit,
    'type'=>$accType,
    'daily_add_bytes'=>(int)($accWallet['daily_add_bytes'] ?? 0),
    'start_today'=>(int)($accWallet['start_today'] ?? 0),
  ]);

  if ($debug) {
    $out['__debug__'] = [
      'day_key' => $dayKey,
      'sync' => $sync,
      'measure' => 'reserved_plus_final',
      'today_start_iso' => date('c', $todayStart),
    ];
  }

  json_out($out);
}

if ($action === 'traffic_details') {
  $debug = debug_enabled();
  $days = (int)($_GET['days'] ?? 7);
  if ($days < 1) $days = 1;
  if ($days > 31) $days = 31;

  $sync = null;
  try {
    $t = traffic_db();
    $sync = traffic_sync_from_legacy_sqlite($t, $days, 500, 200);
  } catch (Throwable $e) {
    $sync = ['ok'=>false,'error'=>$e->getMessage()];
  }

  $t = traffic_db();
  $end = time();
  $start = $end - (($days - 1) * 86400);
  $details = traffic_get_details($t, $start, $end);

  if ($debug) {
    $details['__debug__'] = [
      'days' => $days,
      'sync' => $sync,
      'measure' => 'reserved_plus_final',
    ];
  }

  json_out($details);
}



// ===== admin endpoints =====
if ($action === 'admin_reconcile_run') {
  require_admin();
  $days = (int)($_GET['days'] ?? 7);
  if ($days < 1) $days = 1;
  if ($days > 30) $days = 30;
  $cleanup = (int)($_GET['cleanup_days'] ?? 30);
  if ($cleanup < 7) $cleanup = 7;
  if ($cleanup > 180) $cleanup = 180;
  $t = traffic_db();
  $r = traffic_reconcile_run($t, $days, $cleanup);
  json_out(['ok'=>true,'action'=>'reconcile','days'=>$days,'cleanup_days'=>$cleanup,'result'=>$r]);
}

if ($action === 'admin_ledger_list') {
  require_admin();
  $dayKey = trim((string)($_GET['day_key'] ?? today_key()));
  $host = trim((string)($_GET['host'] ?? ''));
  $limit = (int)($_GET['limit'] ?? 200);
  if ($limit < 1) $limit = 1;
  if ($limit > 2000) $limit = 2000;
  $t = traffic_db();

  if ($host !== '') {
    $st = $t->prepare('SELECT * FROM traffic_ledger WHERE day_key=? AND host=? ORDER BY updated_at DESC LIMIT ' . (int)$limit);
    $st->execute([$dayKey, $host]);
  } else {
    $st = $t->prepare('SELECT * FROM traffic_ledger WHERE day_key=? ORDER BY updated_at DESC LIMIT ' . (int)$limit);
    $st->execute([$dayKey]);
  }
  $rows = $st->fetchAll(PDO::FETCH_ASSOC);
  json_out(['ok'=>true,'day_key'=>$dayKey,'host'=>$host,'count'=>count($rows),'rows'=>$rows]);
}

if ($action === 'admin_ledger_cleanup') {
  require_admin();
  $cleanup = (int)($_GET['cleanup_days'] ?? 30);
  if ($cleanup < 7) $cleanup = 7;
  if ($cleanup > 365) $cleanup = 365;
  $t = traffic_db();
  $cutoff = time() - ($cleanup * 86400);
  $st = $t->prepare('DELETE FROM traffic_ledger WHERE created_at < ?');
  $st->execute([$cutoff]);
  json_out(['ok'=>true,'action'=>'cleanup','cleanup_days'=>$cleanup,'deleted'=>$st->rowCount()]);
}



if ($action === 'admin_daily_status') {
  require_admin();
  $dayKey = trim((string)($_GET['day_key'] ?? today_key()));
  $t = traffic_db();
  $cfg = load_snapshot_config();

  // ensure reconcile ran today
  try { traffic_reconcile_maybe($t); } catch (Throwable $e) {}

  $daily = traffic_get_daily($t, $dayKey);

  $out = [];
  foreach ($daily as $host => $v) {
    $usedReserved = (int)($v['reserved_bytes'] ?? 0);
    $usedFinal = (int)($v['final_bytes'] ?? 0);
    $links = (int)($v['uniq_links'] ?? 0);

    if ($host === '__account__') {
      $q = traffic_quota_bytes_for($t, $cfg, $dayKey, 'account', '__account__');
    } else {
      $q = traffic_quota_bytes_for($t, $cfg, $dayKey, 'host', $host);
    }

    $leftReserved = $q['cap_bytes'] - $usedReserved;
    if ($leftReserved < 0) $leftReserved = 0;
    $leftFinal = $q['cap_bytes'] - $usedFinal;
    if ($leftFinal < 0) $leftFinal = 0;

    $out[$host] = [
      'uniq_links' => $links,
      'used_reserved_bytes' => $usedReserved,
      'used_final_bytes' => $usedFinal,
      'left_reserved_bytes' => $leftReserved,
      'left_final_bytes' => $leftFinal,
      'quota' => [
        'daily_bytes' => (int)($q['daily_bytes'] ?? 0),
        'over_bytes' => (int)($q['over_bytes'] ?? 0),
        'cap_bytes' => (int)($q['cap_bytes'] ?? 0),
        'override' => (bool)($q['override'] ?? false),
        'effective_from_day' => (string)($q['effective_from_day'] ?? ''),
      ],
    ];
  }

  json_out(['ok'=>true,'day_key'=>$dayKey,'hosts'=>$out]);
}

if ($action === 'admin_quota_set') {
  require_admin();
  $scope = trim((string)($_GET['scope'] ?? ''));
  $host = trim((string)($_GET['host'] ?? ''));
  if ($scope !== 'account' && $scope !== 'host') json_error(400, 1, 'Missing/invalid parameter (scope)', 'BAD_REQUEST');
  if ($scope === 'account') { $host = '__account__'; }
  if ($host === '') json_error(400, 1, 'Missing parameter (host)', 'BAD_REQUEST');

  $dayKey = trim((string)($_GET['effective_from_day'] ?? today_key()));
  $dailyGb = (float)($_GET['daily_gb'] ?? 0);
  $dailyBytes = (int)round($dailyGb * 1024 * 1024 * 1024);
  $maxGb = (float)($_GET['max_gb'] ?? 0);
  $maxBytes = (int)round($maxGb * 1024 * 1024 * 1024);
  $reason = trim((string)($_GET['reason'] ?? ''));

  if ($dailyBytes < 0) $dailyBytes = 0;
  if ($maxBytes < 0) $maxBytes = 0;

  $t = traffic_db();
  $t->prepare('INSERT INTO traffic_quota_override(scope,host,effective_from_day,daily_add_bytes,max_bytes,updated_at,reason,actor) VALUES(?,?,?,?,?,?,?,?)'
    . ' ON CONFLICT(scope,host) DO UPDATE SET effective_from_day=excluded.effective_from_day,daily_add_bytes=excluded.daily_add_bytes,max_bytes=excluded.max_bytes,updated_at=excluded.updated_at,reason=excluded.reason,actor=excluded.actor')
    ->execute([$scope,$host,$dayKey,$dailyBytes,$maxBytes,time(),$reason,admin_actor()]);

  traffic_admin_log($t, 'quota_set', ['scope'=>$scope,'host'=>$host,'effective_from_day'=>$dayKey,'daily_add_bytes'=>$dailyBytes,'max_bytes'=>$maxBytes,'reason'=>$reason]);
  json_out(['ok'=>true,'scope'=>$scope,'host'=>$host,'effective_from_day'=>$dayKey,'daily_add_bytes'=>$dailyBytes,'max_bytes'=>$maxBytes]);
}

if ($action === 'admin_quota_get') {
  require_admin();
  $scope = trim((string)($_GET['scope'] ?? ''));
  $host = trim((string)($_GET['host'] ?? ''));
  if ($scope === '') $scope = 'host';
  if ($scope !== 'account' && $scope !== 'host') json_error(400, 1, 'Missing/invalid parameter (scope)', 'BAD_REQUEST');
  if ($scope === 'account') { $host = '__account__'; }
  if ($host === '') json_error(400, 1, 'Missing parameter (host)', 'BAD_REQUEST');
  $dayKey = trim((string)($_GET['day_key'] ?? today_key()));
  $t = traffic_db();
  $r = traffic_quota_get_effective($t, $scope, $host, $dayKey);
  json_out(['ok'=>true,'scope'=>$scope,'host'=>$host,'day_key'=>$dayKey,'effective_override'=>(bool)($r['override'] ?? false),'row'=>$r['row'] ?? null]);
}

if ($action === 'admin_daily_adjust') {
  require_admin();
  $dayKey = trim((string)($_GET['day_key'] ?? today_key()));
  $host = trim((string)($_GET['host'] ?? ''));
  if ($host === '') json_error(400, 1, 'Missing parameter (host)', 'BAD_REQUEST');
  $deltaGb = (float)($_GET['delta_final_gb'] ?? 0);
  $deltaFinal = (int)round($deltaGb * 1024 * 1024 * 1024);
  $reason = trim((string)($_GET['reason'] ?? ''));
  $t = traffic_db();
  $t->prepare('INSERT INTO traffic_daily_adjust(day_key,host,delta_final_bytes,delta_reserved_bytes,updated_at,reason,actor) VALUES(?,?,?,?,?,?,?)'
    . ' ON CONFLICT(day_key,host) DO UPDATE SET delta_final_bytes=delta_final_bytes+excluded.delta_final_bytes, updated_at=excluded.updated_at, reason=excluded.reason, actor=excluded.actor')
    ->execute([$dayKey,$host,$deltaFinal,0,time(),$reason,admin_actor()]);
  traffic_admin_log($t, 'daily_adjust', ['day_key'=>$dayKey,'host'=>$host,'delta_final_bytes'=>$deltaFinal,'reason'=>$reason]);
  json_out(['ok'=>true,'day_key'=>$dayKey,'host'=>$host,'delta_final_bytes'=>$deltaFinal]);
}

if ($action === 'admin_ledger_fix_size') {
  require_admin();
  $dayKey = trim((string)($_GET['day_key'] ?? ''));
  $host = trim((string)($_GET['host'] ?? ''));
  $norm = trim((string)($_GET['norm_url'] ?? ''));
  if ($dayKey === '' || $host === '' || $norm === '') json_error(400, 1, 'Missing parameters (day_key, host, norm_url)', 'BAD_REQUEST');
  $sizeGb = (float)($_GET['size_gb'] ?? 0);
  $sizeBytes = (int)round($sizeGb * 1024 * 1024 * 1024);
  $reason = trim((string)($_GET['reason'] ?? ''));
  if ($sizeBytes < 0) $sizeBytes = 0;
  $t = traffic_db();
  // fetch current
  $st = $t->prepare('SELECT state,status_code FROM traffic_ledger WHERE day_key=? AND host=? AND norm_url=?');
  $st->execute([$dayKey,$host,$norm]);
  $r = $st->fetch(PDO::FETCH_ASSOC);
  if (!$r) json_error(404, 7, 'Resource not found (ledger row)', 'NOT_FOUND');
  $state = (string)($r['state'] ?? '');
  $statusCode = (string)($r['status_code'] ?? '');
  $isDone = ($state === 'done' || $statusCode === '1');

  $t->prepare('UPDATE traffic_ledger SET size_known=?, size_bytes=?, final_bytes=?, reserve_bytes=?, needs_reconcile=1, updated_at=? WHERE day_key=? AND host=? AND norm_url=?')
    ->execute([$sizeBytes>0?1:0, $sizeBytes, $isDone ? $sizeBytes : 0, 0, time(), $dayKey, $host, $norm]);

  traffic_admin_log($t, 'ledger_fix_size', ['day_key'=>$dayKey,'host'=>$host,'norm_url'=>$norm,'size_bytes'=>$sizeBytes,'reason'=>$reason]);
  // rebuild daily
  traffic_daily_rebuild_from_ledger($t, $dayKey, $host);
  traffic_daily_rebuild_account_from_ledger($t, $dayKey);
  json_out(['ok'=>true,'day_key'=>$dayKey,'host'=>$host,'norm_url'=>$norm,'size_bytes'=>$sizeBytes,'done'=>$isDone]);
}

if ($action === 'refresh_public_link') {
  $jobId = trim((string)($_GET['job_id'] ?? ''));
  if ($jobId === '') json_error(400, 1, 'Missing parameter (job_id)', 'BAD_REQUEST');

  $verify = !isset($_GET['verify']) || (string)$_GET['verify'] !== '0';
  $force = !empty($_GET['force']);
  $prefer = (string)($_GET['prefer_region'] ?? 'sg');
  if ($prefer !== 'sg' && $prefer !== 'us') $prefer = 'sg';

  // rate limit: per job + per ip
  $ip = get_client_ip();

  // Min-interval: 2s per IP + job (anti-abuse)
  if ($ip !== '' && $ip !== '127.0.0.1' && $ip !== '::1') {
    $rl = ratelimit_min_interval($pdo, 'min:refresh:' . $ip . ':' . $jobId, 2);
    if (empty($rl['allow'])) {
      json_error(429, 5, 'Slow down (min interval 2s)', 'RATE_LIMIT_MIN_INTERVAL', ['scope'=>'refresh','job_id'=>$jobId], (int)($rl['retry_after_sec'] ?? 2));
    }
  }

  // Per-job cooldown (anti-spam burst)
  if (!ratelimit_allow($pdo, 'refresh_job:' . $jobId . ':cooldown', 30, 1)) {
    json_error(429, 5, 'Slow down (refresh job cooldown 30s)', 'RATE_LIMIT_JOB_COOLDOWN', [], 30);
  }

  // Per-job buckets: 3/15m, 5/1h, 20/day
  if (!ratelimit_allow($pdo, 'refresh_job:' . $jobId . ':15m', 15*60, 3)) {
    json_error(429, 5, 'Slow down (refresh job 3/15m limit exceeded)', 'RATE_LIMIT_JOB_15M', [], 15*60);
  }
  if (!ratelimit_allow($pdo, 'refresh_job:' . $jobId . ':1h', 60*60, 5)) {
    json_error(429, 5, 'Slow down (refresh job 5/1h limit exceeded)', 'RATE_LIMIT_JOB_1H', [], 60*60);
  }
  if (!ratelimit_allow($pdo, 'refresh_job:' . $jobId . ':1d', 24*60*60, 20)) {
    json_error(429, 5, 'Slow down (refresh job 20/day limit exceeded)', 'RATE_LIMIT_JOB_DAY', [], 24*60*60);
  }

  // Per-IP buckets (protect service): refresh 100/hour, 500/day
  if ($ip !== '') {
    if (!ratelimit_allow($pdo, 'refresh_ip:' . $ip . ':1h', 60*60, 100)) {
      json_error(429, 5, 'Slow down (refresh IP 100/1h limit exceeded)', 'RATE_LIMIT_IP_1H', [], 60*60);
    }
    if (!ratelimit_allow($pdo, 'refresh_ip:' . $ip . ':1d', 24*60*60, 500)) {
      json_error(429, 5, 'Slow down (refresh IP 500/day limit exceeded)', 'RATE_LIMIT_IP_DAY', [], 24*60*60);
    }
  }

  // load job
  $stmt = $pdo->prepare('SELECT * FROM jobs WHERE job_id=?');
  $stmt->execute([$jobId]);
  $job = $stmt->fetch(PDO::FETCH_ASSOC);
  if (!$job) json_error(404, 7, 'Resource not found (job_id)', 'NOT_FOUND');

  // If no public yet, attempt sync once
  if (empty($job['direct_url_public'])) {
    @sync_from_files($pdo, $jobId, ['debug'=>false]);
    $stmt->execute([$jobId]);
    $job = $stmt->fetch(PDO::FETCH_ASSOC) ?: $job;
  }

  $oldPublic = (string)($job['direct_url_public'] ?? '');
  if ($oldPublic === '') {
    json_out([
      'ok'=>true,
      'job_id'=>$jobId,
      'action'=>'need_wait',
      'status'=>(string)($job['status'] ?? 'processing'),
      'hint'=>'No public link yet. Poll query_status.',
      'need_resubmit'=>false,
    ]);
  }

  $verifyRes = null;
  if ($force) {
    $verifyRes = ['enabled'=>$verify,'alive'=>false,'http_code'=>0,'error'=>'force'];
  } elseif ($verify) {
    $vr = verify_url_alive($oldPublic, 3);
    $verifyRes = ['enabled'=>true,'alive'=>(bool)($vr['alive'] ?? false),'http_code'=>(int)($vr['http_code'] ?? 0),'error'=>$vr['error'] ?? null];

    if (($vr['alive'] ?? false) === true) {
      $newPublic = $oldPublic;
      if ($prefer === 'sg') $newPublic = rewrite_region_us_to_sg($oldPublic);
      // (prefer=us not implemented yet; keep as-is)

      json_out([
        'ok'=>true,
        'job_id'=>$jobId,
        'action'=>'rewrite_region',
        'verify'=>$verifyRes,
        'old_public'=>$oldPublic,
        'new_public'=>$newPublic,
        'prefer_region'=>$prefer,
        'need_resubmit'=>false,
      ]);
    }
  } else {
    $verifyRes = ['enabled'=>false];
  }

  // Dead (or forced): find pve
  $pve = '';
  $pve0 = (string)($job['direct_url_pve'] ?? '');
  if ($pve0 !== '' && preg_match('~^https?://pve~i', $pve0)) $pve = $pve0;
  if ($pve === '') {
    $in0 = (string)($job['direct_url_internal'] ?? '');
    if ($in0 !== '' && preg_match('~^https?://pve~i', $in0)) $pve = $in0;
  }

  // If still no pve: try getlink.db by legacy hash
  if ($pve === '') {
    $host = (string)($job['host'] ?? '');
    $legacyHash = (string)($job['legacy_hash'] ?? '');
    if ($host !== '' && $legacyHash !== '') {
      $dbChk = leechmng_local_db_lookup_fuzzy($host, $legacyHash, 5);
      if (!empty($dbChk['direct_url']) && preg_match('~^https?://pve~i', (string)$dbChk['direct_url'])) {
        $pve = (string)$dbChk['direct_url'];
      }
    }
  }

  if ($pve === '') {
    json_out([
      'ok'=>true,
      'job_id'=>$jobId,
      'action'=>'need_resubmit',
      'reason'=>'no_pve_found',
      'verify'=>$verifyRes,
      'old_public'=>$oldPublic,
      'need_resubmit'=>true,
    ]);
  }

  // Verify pve (Boss: timeout -> dead)
  $pveV = verify_url_alive($pve, 3);
  if (empty($pveV['alive'])) {
    json_out([
      'ok'=>true,
      'job_id'=>$jobId,
      'action'=>'need_resubmit',
      'reason'=>'pve_dead',
      'verify'=>$verifyRes,
      'pve_verify'=>['alive'=>false,'http_code'=>(int)($pveV['http_code'] ?? 0),'error'=>$pveV['error'] ?? null],
      'old_public'=>$oldPublic,
      'need_resubmit'=>true,
    ]);
  }

  $conv = convert_internal_to_public($pve);
  if (empty($conv['ok']) || empty($conv['public'])) {
    json_error(503, 25, 'Service unavailable (convert pve->public failed)', 'GATE_FAILED', [
      'job_id' => $jobId,
      'verify' => $verifyRes,
      'old_public' => $oldPublic,
      'need_resubmit' => true,
      'upstream_error' => (string)($conv['error'] ?? 'convert failed'),
    ]);

  }

  $newPublic = (string)$conv['public'];
  $now = time();
  $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
    ->execute([$now,'ready','1',$newPublic,$now,$newPublic,$now,'gate_refresh',$jobId]);

  json_out([
    'ok'=>true,
    'job_id'=>$jobId,
    'action'=>'regenerate_from_pve',
    'verify'=>$verifyRes,
    'old_public'=>$oldPublic,
    'new_public'=>$newPublic,
    'need_resubmit'=>false,
  ]);
}

if ($action === 'resubmit_link') {
  $link = trim((string)($_GET['link'] ?? ''));
  if ($link === '') json_error(400, 1, 'Missing parameter (link)', 'BAD_REQUEST');

  // Rate limit per normalized link + per IP
  $norm = normalize_url_for_uniqueness(normalize_url($link));
  if ($norm === '') $norm = normalize_url_for_uniqueness($link);
  $ip = get_client_ip();

  // Min-interval: 2s per IP + normalized link (anti-abuse)
  if ($ip !== '' && $ip !== '127.0.0.1' && $ip !== '::1') {
    $normHash = substr(sha1($norm), 0, 16);
    $rl = ratelimit_min_interval($pdo, 'min:resubmit:' . $ip . ':' . $normHash, 2);
    if (empty($rl['allow'])) {
      json_error(429, 5, 'Slow down (min interval 2s)', 'RATE_LIMIT_MIN_INTERVAL', ['scope'=>'resubmit','link_norm'=>$norm], (int)($rl['retry_after_sec'] ?? 2));
    }
  }

  // Per-link buckets: 3/15m, 5/1h, 20/day
  if (!ratelimit_allow($pdo, 'resubmit_link:' . $norm . ':15m', 15*60, 3)) {
    json_error(429, 5, 'Slow down (resubmit link 3/15m limit exceeded)', 'RATE_LIMIT_RESUBMIT_LINK_15M', ['link_norm'=>$norm], 15*60);
  }
  if (!ratelimit_allow($pdo, 'resubmit_link:' . $norm . ':1h', 60*60, 5)) {
    json_error(429, 5, 'Slow down (resubmit link 5/1h limit exceeded)', 'RATE_LIMIT_RESUBMIT_LINK_1H', ['link_norm'=>$norm], 60*60);
  }
  if (!ratelimit_allow($pdo, 'resubmit_link:' . $norm . ':1d', 24*60*60, 20)) {
    json_error(429, 5, 'Slow down (resubmit link 20/day limit exceeded)', 'RATE_LIMIT_RESUBMIT_LINK_DAY', ['link_norm'=>$norm], 24*60*60);
  }

  // Per-IP buckets (protect service): resubmit 50/hour, 200/day
  if ($ip !== '') {
    if (!ratelimit_allow($pdo, 'resubmit_ip:' . $ip . ':1h', 60*60, 50)) {
      json_error(429, 5, 'Slow down (resubmit IP 50/1h limit exceeded)', 'RATE_LIMIT_RESUBMIT_IP_1H', [], 60*60);
    }
    if (!ratelimit_allow($pdo, 'resubmit_ip:' . $ip . ':1d', 24*60*60, 200)) {
      json_error(429, 5, 'Slow down (resubmit IP 200/day limit exceeded)', 'RATE_LIMIT_RESUBMIT_IP_DAY', [], 24*60*60);
    }
  }

  // Reuse submit pipeline via internal HTTP call
  $url = API_BASE_URL . 'api.php?action=submit_link_get&pass=' . urlencode((string)($_GET['pass'] ?? '')) . '&link=' . urlencode($link);
  $r = curl_request($url, 'GET', null, ['X-GL-Internal: 1']);
  $body = (string)($r['body'] ?? '');
  $j = json_decode($body, true);
  if (!is_array($j)) {
    json_error(503, 25, 'Service unavailable (upstream error)', 'UPSTREAM_ERROR', ['raw'=>$body]);
  }
  $j['action'] = 'resubmitted';
  json_out($j);
}

if ($action === 'submit_link' || $action === 'submit_link_get') {
  // submit_link: POST JSON {link:"..."}
  // submit_link_get: GET ?link=...
  $link = '';
  if ($action === 'submit_link') {
    $link = trim((string)($body['link'] ?? ($_GET['link'] ?? '')));
  } else {
    $link = trim((string)($_GET['link'] ?? ''));
  }
  if ($link === '') json_error(400, 1, 'Missing parameter (link)', 'BAD_REQUEST');

  // Precheck (FAIL-OPEN): use local checklink to fetch filename/filesize.
  // Policy (C): do NOT rewrite the submitted URL based on checklink fullurl.
  // Rationale: dead/locked links may redirect to generic pages (e.g. /article/premium) which would poison leechmng + quotas.
  $submittedLink = $link;
  $pre = checklink_precheck($link);

  // Effective link for engine/hash: normalize wrapper URLs (.html) but keep submittedLink for audit/response.
  // This ensures legacy_hash matches what engine/leechmng will key on.
  $effectiveLink = normalize_url($submittedLink);
  if ($effectiveLink === '') $effectiveLink = $submittedLink;

  // Diehost/maintenance: reject early (based on legacy diehost list)
  if (is_diehost($effectiveLink)) {
    json_error(400, 16, 'Unsupported hoster (host is not supported/maintenance)', 'HOST_DIE', [
      'origin_url' => $submittedLink,
      'host' => canonical_host_from_url($effectiveLink),
      'hint' => 'Please use a supported host or try again later.',
    ]);
  }

  // Canonical host (groups aliases like rg.to/rapidgator.com -> rapidgator.net)
  $hostCanon = canonical_host_from_url($effectiveLink);

  // Min-interval: 2s per IP + host (anti-abuse)
  if (empty($_SERVER['HTTP_X_GL_INTERNAL'])) {
  $ip = get_client_ip();
    if ($ip !== '' && $ip !== '127.0.0.1' && $ip !== '::1') {
      $rl = ratelimit_min_interval($pdo, 'min:submit:' . $ip . ':' . $hostCanon, 2);
      if (empty($rl['allow'])) {
        json_error(429, 5, 'Slow down (min interval 2s)', 'RATE_LIMIT_MIN_INTERVAL', ['scope'=>'submit','host'=>$hostCanon], (int)($rl['retry_after_sec'] ?? 2));
      }
    }
  }
  $rt = load_api_routing();
  $cfg = load_snapshot_config();

  // Daily reconcile (bounded backfill + cleanup)
  try { $t = traffic_db(); traffic_reconcile_maybe($t); } catch (Throwable $e) {}


  // Account daily link limit (unique URLs/day)
  $accMaxLink = (int)($cfg['acc_max_link'] ?? 0);
  if ($accMaxLink > 0) {
    $lstAll = list_links_today($pdo, [
      'todayStart' => today_start_ts(),
      'dayKey' => today_key(),
      'source' => 'sqlite',
      'host' => '',
      'limit' => 1,
      'offset' => 0,
      'debug' => false,
    ]);
    $accCountToday = (int)($lstAll['counts']['acc_links_today'] ?? 0);

    if ($accCountToday >= $accMaxLink) {
      $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
      $now = time();
      $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
      $pdo->prepare('INSERT INTO jobs(job_id, origin_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?)')
        ->execute([
          $jobId,
          $submittedLink,
          $now,
          $now,
          'rejected',
          '7',
          $hostCanon,
          ($pre['filename'] ?? null) ?: null,
          ($pre['filesize_text'] ?? null) ?: null,
          ($pre['filesize_bytes'] ?? null) ?: null,
          $mngHash,
        ]);
      api_jobs_set_legacy_hash($pdo, (string)$jobId, (string)$effectiveLink);


      @legacy_dat_upsert_reject([
        'url' => $effectiveLink,
        'host' => $hostCanon,
        'status' => 7,
        'filename' => (string)($pre['filename'] ?? ''),
        'size_text' => (string)($pre['filesize_text'] ?? ''),
        'size_bytes' => (int)($pre['filesize_bytes'] ?? 0),
        'bw_charge_bytes' => (int)($pre['filesize_bytes'] ?? 0),
        'direct_url' => '',
        'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
      ]);

      $retryAfter = (today_start_ts() + 86400) - time();
      if ($retryAfter < 1) $retryAfter = 60;
      
      // Ledger: status 7 reject does not count BW (reserve=0, final=0) but counts uniq link/day.
      try {
        $t = traffic_db();
        traffic_ledger_mark_reject($t, today_key(), '__account__', normalize_url_for_uniqueness($submittedLink), '7', 'rejected', (int)($pre['filesize_bytes'] ?? 0), $jobId);
        traffic_daily_rebuild_account_from_ledger($t, today_key());
      } catch (Throwable $e) {
        // ignore
      }

json_error(429, 36, 'Fair usage limit reached (account link limit exceeded)', 'ACCOUNT_LINK_LIMIT_REACHED', [
        'origin_url' => $submittedLink,
        'host' => $hostCanon,
        'acc_max_link' => $accMaxLink,
        'count_today' => $accCountToday,
        'job_id' => $jobId,
        'limit_type' => 'links',
        'scope' => 'account',
        'hint' => 'Please wait for daily reset or reduce the number of generated links.',
      ], $retryAfter);
    }
  }

  // Max_link host check. If over limit: record in DB + notify leechmng with special status.
  $maxLink = null;
  if (isset($cfg['hosts'][$hostCanon]['max_link'])) {
    $maxLink = (int)$cfg['hosts'][$hostCanon]['max_link'];
  }
  // If not in snapshot, fall back to 'other'
  if (($maxLink === null || $maxLink <= 0) && isset($cfg['hosts']['other']['max_link'])) {
    $maxLink = (int)$cfg['hosts']['other']['max_link'];
  }

  if ($maxLink && $maxLink > 0) {
    // Limit UNIQUE URLs per host per day (00:00 Asia/Ho_Chi_Minh)
    $todayStart = today_start_ts();

    // Prefer merged unique list as single source of truth
    $lstHost = list_links_today($pdo, [
      'todayStart' => $todayStart,
      'dayKey' => today_key(),
      // IMPORTANT: submit must be fast; avoid scanning legacy .dat on every submit.
      'source' => 'sqlite',
      'host' => $hostCanon,
      'limit' => 1,
      'offset' => 0,
      'debug' => false,
    ]);
    $countToday = (int)($lstHost['counts']['host_links_today'] ?? 0);

    if ($countToday >= $maxLink) {
      // Create job (keeps audit trail)
      $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
      $now = time();
      $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
      $pdo->prepare('INSERT INTO jobs(job_id, origin_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?)')
        ->execute([
          $jobId,
          $submittedLink,
          $now,
          $now,
          'rejected',
          '7',
          $hostCanon,
          ($pre['filename'] ?? null) ?: null,
          ($pre['filesize_text'] ?? null) ?: null,
          ($pre['filesize_bytes'] ?? null) ?: null,
          $mngHash,
        ]);
      api_jobs_set_legacy_hash($pdo, (string)$jobId, (string)$effectiveLink);



      // DB-first cache-bypass for reject status=7 (host max_link). Verify PVE alive before conversion.
      try {
        $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $submittedLink);
        if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $effectiveLink);
        if (empty($dbChk['direct_url'])) {
          $legacyHash2 = md5($effectiveLink . API_SELF_URL);
          $dbChk = leechmng_local_db_lookup_fuzzy($hostCanon, $legacyHash2, 5);
        }
        if (!empty($dbChk['direct_url'])) {
          $dl2 = (string)$dbChk['direct_url'];
          if (is_internal_directlink($dl2) && preg_match('~^https?://pve~i', $dl2)) {
            $pv = verify_url_alive($dl2, 3);
            if (empty($pv['alive'])) throw new Exception('pve dead');
          }

          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$dl2, (preg_match('~^https?://pve~i', $dl2) ? $dl2 : null), time(), $jobId]);

          $pub2 = null;
          if (is_public_directlink($dl2)) $pub2 = $dl2;
          elseif (is_internal_directlink($dl2)) {
            $conv2 = convert_internal_to_public($dl2);
            if (($conv2['ok'] ?? false) && !empty($conv2['public'])) $pub2 = (string)$conv2['public'];
          }

          if ($pub2) {
            $now2 = time();
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
              ->execute([$now2,'ready','1',$pub2,$now2,$pub2,$now2,'mng_db',$jobId]);

            // Ledger: DONE should count final_bytes (reserve=0).
            try {
              $tL = traffic_db();
              $normX = normalize_url_for_uniqueness($submittedLink);
              $dayKeyDebt = today_key();
              $usedBeforeDebt = traffic_get_used_bytes_today($tL, $dayKeyDebt, $hostCanon);
              traffic_ledger_mark_done($tL, today_key(), $hostCanon, $normX, (int)($pre['filesize_bytes'] ?? 0), $jobId);
              traffic_daily_rebuild_from_ledger($tL, today_key(), $hostCanon);
              traffic_daily_rebuild_account_from_ledger($tL, today_key());
              $usedAfterDebt = traffic_get_used_bytes_today($tL, $dayKeyDebt, $hostCanon);
              traffic_record_debt_inc_after_done($tL, $dayKeyDebt, $hostCanon, (int)$usedBeforeDebt, (int)$usedAfterDebt);

            } catch (Throwable $e) {
              // ignore
            }


            @legacy_dat_upsert_reject([
              'url' => $effectiveLink,
              'host' => $hostCanon,
              'status' => 1,
              'filename' => (string)($pre['filename'] ?? ''),
              'size_text' => (string)($pre['filesize_text'] ?? ''),
              'size_bytes' => (int)($pre['filesize_bytes'] ?? 0),
              'bw_charge_bytes' => (int)($pre['filesize_bytes'] ?? 0),
              'direct_url' => $pub2,
              'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
            ]);

            // Option A: record traffic_seen
            try {
              $t4 = traffic_db();
              $bytesT = (int)($pre['filesize_bytes'] ?? 0);
              if ($bytesT > 0) {
                $candNorm2 = normalize_url_for_uniqueness($submittedLink);
                if ($candNorm2 !== '') {
                  traffic_record_seen($t4, [
                    'day_key' => today_key(),
                    'host' => $hostCanon,
                    'norm_url' => $candNorm2,
                    'size_bytes' => $bytesT,
                    'source' => 'api_submit',
                    'ref_id' => $jobId,
                  ]);
                }
              }
            } catch (Throwable $e) {
              // ignore
            }

            // Still notify leechmng for tracking (best-effort)
            try {
              $endpointLm = leechmng_endpoint_for_host($hostCanon);
              $passleech = (string)($rt['passleech'] ?? '');
              $id = load_account_identity();
              if ($endpointLm && $passleech) {
                $post = [
                  'user' => API_SELF_URL,
                  'email' => $id['email'],
                  'mobile' => $id['mobile'],
                  'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
                  'url' => $effectiveLink,
                  'md5' => md5($effectiveLink . API_SELF_URL),
                  'host' => $hostCanon,
                  'ip' => $_SERVER['REMOTE_ADDR'] ?? '127.0.0.1',
                  'isp' => '',
                  'status' => 7,
                  'filename' => (string)($pre['filename'] ?? ''),
                  'filesize' => (string)($pre['filesize_bytes'] ?? 0),
                ];
                $rLm2 = leechmng_savelink($endpointLm, $passleech, $post);
                leechmng_log_event(['kind'=>'notify_reject_dbfirst','job_id'=>$jobId,'status'=>7,'endpoint'=>$endpointLm,'ok'=>$rLm2['ok']??false,'http_code'=>$rLm2['http_code']??null,'resp'=>$rLm2]);
              }
            } catch (Throwable $e) {
              // ignore
            }

            // Ledger: bypass (host limit) still counts BW when we return a public link (Boss rule).
            try {
              $tL = traffic_db();
              $submitDay = day_key_for_ts($now);
              $normX = normalize_url_for_uniqueness($submittedLink);
              $sz = (int)($r['raw']['filesize'] ?? $candidateSize);
              if ($sz < 0) $sz = 0;
              if ($hostCanon !== '' && $normX !== '' && $sz > 0) {
                $usedBeforeDebt = traffic_get_used_bytes_today($tL, $submitDay, $hostCanon);
              traffic_ledger_mark_done($tL, $submitDay, $hostCanon, $normX, $sz, $jobId);
                traffic_daily_rebuild_from_ledger($tL, $submitDay, $hostCanon);
                traffic_daily_rebuild_account_from_ledger($tL, $submitDay);
              $usedAfterDebt = traffic_get_used_bytes_today($tL, $submitDay, $hostCanon);
              traffic_record_debt_inc_after_done($tL, $submitDay, $hostCanon, (int)$usedBeforeDebt, (int)$usedAfterDebt);

              }
            } catch (Throwable $e) {
              // ignore
            }

            json_out([
              'ok' => true,
              'job_id' => $jobId,
              'status' => 'ready',
              'status_code' => '1',
              'origin_url' => $submittedLink,
              'host' => $hostCanon,
              'direct_url' => $pub2,
              'direct_url_public' => $pub2,
              'note' => 'db-first bypass max_link (status=7)',
            ]);
          }
        }
      } catch (Throwable $e) {
        // ignore
      }
      // Notify leechmng for tracking (status=7 for max_link)
      $endpoint = leechmng_endpoint_for_host($hostCanon);
      $passleech = (string)($rt['passleech'] ?? '');
      $id = load_account_identity();
      if ($endpoint && $passleech) {
        // Use per-submit hash so leechmng appends a new record even for the same URL.
        $fullHash = $mngHash;
        $post = [
          // keep UI callback working: leechmng will call back to this user url
          'user' => API_SELF_URL,
          'email' => $id['email'],
          'mobile' => $id['mobile'],
          'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
          'url' => $effectiveLink,
          'md5' => $fullHash,
          'host' => $hostCanon,
          'ip' => $_SERVER['REMOTE_ADDR'] ?? '127.0.0.1',
          'isp' => '',
          'status' => 7,
          'filename' => (string)($pre['filename'] ?? ''),
          'filesize' => (string)($pre['filesize_bytes'] ?? 0),
        ];
        $r = leechmng_savelink($endpoint, $passleech, $post);
        $direct = leechmng_extract_directlink($r);
        leechmng_log_event(['kind'=>'reject_max_link','job_id'=>$jobId,'status'=>7,'endpoint'=>$endpoint,'ok'=>$r['ok']??false,'http_code'=>$r['http_code']??null,'resp'=>$r]);

        // NOTE: bypass-on-directlink is disabled (would leak origin server directlink).
      } else {
        leechmng_log_event(['kind'=>'reject_max_link','job_id'=>$jobId,'status'=>7,'endpoint'=>$endpoint,'ok'=>false,'error'=>'missing endpoint/passleech']);
      }

      // If leechmng returned a directlink, allow ONLY if we can convert to public.
      if (isset($r) && is_array($r)) {
        $dl = leechmng_extract_directlink($r);
        if ($dl) {
          // store internal
          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$dl, time(), $jobId]);

          $pub = null;
          if (is_public_directlink($dl)) {
            $pub = $dl;
          } elseif (is_internal_directlink($dl)) {
            $conv = convert_internal_to_public($dl);
            if (($conv['ok'] ?? false) && !empty($conv['public'])) $pub = (string)$conv['public'];
          }

          if ($pub) {
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=? WHERE job_id=?')
              ->execute([time(), 'ready', '1', $pub, time(), $pub, $jobId]);

            @legacy_dat_upsert_reject([
              'url' => $effectiveLink,
              'host' => $hostCanon,
              'status' => 1,
              'filename' => (string)($r['raw']['filename'] ?? ($pre['filename'] ?? '')),
              'size_text' => (string)($pre['filesize_text'] ?? ''),
              'size_bytes' => (int)($r['raw']['filesize'] ?? ($pre['filesize_bytes'] ?? 0)),
              'bw_charge_bytes' => (int)($r['raw']['filesize'] ?? ($pre['filesize_bytes'] ?? 0)),
              'direct_url' => $pub,
              'owner' => (string)($id['secureid'] ?? ''),
            ]);

            json_out([
              'ok' => true,
              'job_id' => $jobId,
              'status' => 'ready',
              'status_code' => '1',
              'origin_url' => $submittedLink,
              'host' => $hostCanon,
              'direct_url' => $pub,
              'note' => 'leechmng returned directlink; bypassed max_link (public only)',
            ]);
          }
        }
      }

      // IMPORTANT: leechmng is remote; to make THIS account UI show the job, we must write local legacy .dat
      @legacy_dat_upsert_reject([
        'url' => $effectiveLink,
        'host' => $hostCanon,
        'status' => 7,
        'filename' => (string)($pre['filename'] ?? ''),
        'size_text' => (string)($pre['filesize_text'] ?? ''),
        'size_bytes' => (int)($pre['filesize_bytes'] ?? 0),
        'bw_charge_bytes' => (int)($pre['filesize_bytes'] ?? 0),
        'direct_url' => '',
        'owner' => (string)($id['secureid'] ?? ''),
      ]);



      // DB-first cache-bypass for reject status=7 (account max_link). Verify PVE alive before conversion.
      try {
        $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $submittedLink);
        if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $effectiveLink);
        if (empty($dbChk['direct_url'])) {
          $legacyHash2 = md5($effectiveLink . API_SELF_URL);
          $dbChk = leechmng_local_db_lookup_fuzzy($hostCanon, $legacyHash2, 5);
        }
        if (!empty($dbChk['direct_url'])) {
          $dl2 = (string)$dbChk['direct_url'];
          if (is_internal_directlink($dl2) && preg_match('~^https?://pve~i', $dl2)) {
            $pv = verify_url_alive($dl2, 3);
            if (empty($pv['alive'])) throw new Exception('pve dead');
          }

          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$dl2, (preg_match('~^https?://pve~i', $dl2) ? $dl2 : null), time(), $jobId]);

          $pub2 = null;
          if (is_public_directlink($dl2)) $pub2 = $dl2;
          elseif (is_internal_directlink($dl2)) {
            $conv2 = convert_internal_to_public($dl2);
            if (($conv2['ok'] ?? false) && !empty($conv2['public'])) $pub2 = (string)$conv2['public'];
          }

          if ($pub2) {
            $now2 = time();
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
              ->execute([$now2,'ready','1',$pub2,$now2,$pub2,$now2,'mng_db',$jobId]);

            @legacy_dat_upsert_reject([
              'url' => $effectiveLink,
              'host' => $hostCanon,
              'status' => 1,
              'filename' => (string)($pre['filename'] ?? ''),
              'size_text' => (string)($pre['filesize_text'] ?? ''),
              'size_bytes' => (int)($pre['filesize_bytes'] ?? 0),
              'bw_charge_bytes' => (int)($pre['filesize_bytes'] ?? 0),
              'direct_url' => $pub2,
              'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
            ]);

            // Option A: record traffic_seen
            try {
              $t5 = traffic_db();
              $bytesT = (int)($pre['filesize_bytes'] ?? 0);
              $candNorm2 = normalize_url_for_uniqueness($submittedLink);
              if ($bytesT > 0 && $candNorm2 !== '') {
                traffic_record_seen($t5, [
                  'day_key' => today_key(),
                  'host' => $hostCanon,
                  'norm_url' => $candNorm2,
                  'size_bytes' => $bytesT,
                  'source' => 'api_submit',
                  'ref_id' => $jobId,
                ]);
              }
            } catch (Throwable $e) {
              // ignore
            }

            // Still notify leechmng for tracking (best-effort)
            try {
              $endpointLm = leechmng_endpoint_for_host($hostCanon);
              $passleech = (string)($rt['passleech'] ?? '');
              $id = load_account_identity();
              if ($endpointLm && $passleech) {
                $post = [
                  'user' => API_SELF_URL,
                  'email' => $id['email'],
                  'mobile' => $id['mobile'],
                  'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
                  'url' => $effectiveLink,
                  'md5' => md5($effectiveLink . API_SELF_URL),
                  'host' => $hostCanon,
                  'ip' => get_client_ip(),
                  'isp' => '',
                  'status' => 7,
                  'filename' => (string)($pre['filename'] ?? ''),
                  'filesize' => (string)($pre['filesize_bytes'] ?? 0),
                ];
                $rLm2 = leechmng_savelink($endpointLm, $passleech, $post);
                leechmng_log_event(['kind'=>'notify_reject_dbfirst','job_id'=>$jobId,'status'=>7,'endpoint'=>$endpointLm,'ok'=>$rLm2['ok']??false,'http_code'=>$rLm2['http_code']??null,'resp'=>$rLm2]);
              }
            } catch (Throwable $e) {
              // ignore
            }

            json_out([
              'ok' => true,
              'job_id' => $jobId,
              'status' => 'ready',
              'status_code' => '1',
              'origin_url' => $submittedLink,
              'host' => $hostCanon,
              'direct_url' => $pub2,
              'direct_url_public' => $pub2,
              'note' => 'db-first bypass account max_link (status=7)',
            ]);
          }
        }
      } catch (Throwable $e) {
        // ignore
      }
      $retryAfter = (today_start_ts() + 86400) - time();
      if ($retryAfter < 1) $retryAfter = 60;
      
      // Ledger: status 7 reject does not count BW (reserve=0, final=0) but counts uniq link/day.
      try {
        $t = traffic_db();
        $normX = normalize_url_for_uniqueness($submittedLink);
        traffic_ledger_mark_reject($t, today_key(), $hostCanon, $normX, '7', 'rejected', (int)($pre['filesize_bytes'] ?? 0), $jobId);
        traffic_daily_rebuild_from_ledger($t, today_key(), $hostCanon);
        traffic_daily_rebuild_account_from_ledger($t, today_key());
      } catch (Throwable $e) {
        // ignore
      }

json_error(429, 18, 'Hoster limit reached (host link limit exceeded)', 'HOST_LINK_LIMIT_REACHED', [
        'origin_url' => $submittedLink,
        'host' => $hostCanon,
        'max_link' => $maxLink,
        'count_today' => $countToday,
        'job_id' => $jobId,
        'limit_type' => 'links',
        'scope' => 'host',
        'hint' => 'Please wait for host link limit reset or try another host.',
      ], $retryAfter);
    }
  }

// BW quota checks (daily, reserved size per UNIQUE URL/day).
  // Uses merged items (.dat + sqlite) and adds current URL if not already present.
  $dayKey = today_key();
  $todayStart = today_start_ts();
  $candidateNorm = normalize_url_for_uniqueness($submittedLink);
  $candidateSize = (int)($pre['filesize_bytes'] ?? 0);
  if ($candidateSize < 0) $candidateSize = 0;

  // Ledger: record unique URL/day for host; reserve bytes only when size is known (<=1h).
  try {
    $t = traffic_db();
    $sizeKnown = ($candidateSize > 0) ? 1 : 0;
    $reserve = $sizeKnown ? $candidateSize : 0;
    $reserveUntil = $sizeKnown ? (time() + 3600) : 0;
    traffic_ledger_upsert($t, [
      'day_key' => $dayKey,
      'host' => $hostCanon,
      'norm_url' => $candidateNorm,
      'now' => time(),
      'size_known' => $sizeKnown,
      'size_bytes' => $candidateSize,
      'reserve_bytes' => $reserve,
      'final_bytes' => 0,
      'status_code' => 'new',
      'state' => 'new',
      'job_id_last' => '',
      'reserve_until_ts' => $reserveUntil,
      'needs_reconcile' => 1,
    ]);
  } catch (Throwable $e) {
    // ignore
  }


  // Account-wide
  // IMPORTANT: submit must be fast; avoid scanning legacy .dat on every submit.
  // BW enforcement is still reasonably accurate using API sqlite jobs as source of truth.
  $dayKeyT = $dayKey;
  if (!isset($t) || !($t instanceof PDO)) { $t = traffic_db(); }
  $allItems = build_links_today_items($pdo, [
    'todayStart' => $todayStart,
    'source' => 'sqlite',
    'host' => '',
    'debug' => false,
  ]);
  $accUsedBytes = 0;
  $seenNorm = [];
  foreach ($allItems as $it) {
    $n = (string)($it['norm'] ?? '');
    if ($n === '' || isset($seenNorm[$n])) continue;
    $seenNorm[$n] = true;
    $accUsedBytes += (int)($it['size_bytes'] ?? 0);
  }
  $candidateExists = ($candidateNorm !== '' && isset($seenNorm[$candidateNorm]));
  $accUsedNextBytes = $accUsedBytes + (!$candidateExists ? $candidateSize : 0);

  $accWallet = traffic_account_wallet_state($t, $cfg, $dayKeyT);
  $accLimitBytes = (int)($accWallet['cap_bytes'] ?? 0);

  if ($accLimitBytes > 0 && $accUsedNextBytes > $accLimitBytes) {
    $leechStatus = 6;
    phase1_compat_log('enforce_account_reject', ['day_key'=>$dayKeyT,'host'=>$hostCanon,'used_next'=>$accUsedNextBytes,'cap_bytes'=>$accLimitBytes,'status'=>$leechStatus]);
    $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
    $now = time();
    $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
    $pdo->prepare('INSERT INTO jobs(job_id, origin_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?)')
      ->execute([
        $jobId, $submittedLink, $now, $now,
        'rejected', (string)$leechStatus, $hostCanon,
        ($pre['filename'] ?? null) ?: null,
        ($pre['filesize_text'] ?? null) ?: null,
        $candidateSize ?: null,
        $mngHash,
      ]);

    // Log leechmng status (5 cancel / 6 delay)
    $endpoint = leechmng_endpoint_for_host($hostCanon);
    $passleech = (string)($rt['passleech'] ?? '');
    $id = load_account_identity();
    if ($endpoint && $passleech) {
      // Use per-submit hash so leechmng appends a new record even for the same URL.
      $fullHash = $mngHash;
      $post = [
        'user' => API_SELF_URL,
        'email' => $id['email'],
        'mobile' => $id['mobile'],
        'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
        'url' => $effectiveLink,
        'md5' => $fullHash,
        'host' => $hostCanon,
        'ip' => $_SERVER['REMOTE_ADDR'] ?? '127.0.0.1',
        'isp' => '',
        'status' => $leechStatus,
        'filename' => (string)($pre['filename'] ?? ''),
        'filesize' => (string)$candidateSize,
      ];
      $r = leechmng_savelink($endpoint, $passleech, $post);
      $direct = leechmng_extract_directlink($r);
      leechmng_log_event(['kind'=>'reject_bw_acc','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpoint,'ok'=>$r['ok']??false,'http_code'=>$r['http_code']??null,'resp'=>$r]);

      // NOTE: bypass-on-directlink is disabled (would leak origin server directlink).
    } else {
      leechmng_log_event(['kind'=>'reject_bw_acc','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpoint,'ok'=>false,'error'=>'missing endpoint/passleech']);
    }

    // If leechmng returned a directlink, allow ONLY if we can convert to public.
    if (isset($r) && is_array($r)) {
      $dl = leechmng_extract_directlink($r);
      if ($dl) {
        $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_updated_at=? WHERE job_id=?')
          ->execute([$dl, time(), $jobId]);

        $pub = null;
        if (is_public_directlink($dl)) $pub = $dl;
        elseif (is_internal_directlink($dl)) {
          $conv = convert_internal_to_public($dl);
          if (($conv['ok'] ?? false) && !empty($conv['public'])) $pub = (string)$conv['public'];
        }

        if ($pub) {
          $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=? WHERE job_id=?')
            ->execute([time(), 'ready', '1', $pub, time(), $pub, $jobId]);
          @legacy_dat_upsert_reject([
            'url' => $effectiveLink,
            'host' => $hostCanon,
            'status' => 1,
            'filename' => (string)($r['raw']['filename'] ?? ($pre['filename'] ?? '')),
            'size_text' => (string)($pre['filesize_text'] ?? ''),
            'size_bytes' => (int)($r['raw']['filesize'] ?? $candidateSize),
            'bw_charge_bytes' => (int)($r['raw']['filesize'] ?? $candidateSize),
            'direct_url' => $pub,
            'owner' => (string)($id['secureid'] ?? ''),
          ]);
          json_out([
            'ok' => true,
            'job_id' => $jobId,
            'status' => 'ready',
            'status_code' => '1',
            'origin_url' => $submittedLink,
            'host' => $hostCanon,
            'direct_url' => $pub,
            'note' => 'leechmng returned directlink; bypassed BW limit (public only)',
          ]);
        }
      }
    }

    @legacy_dat_upsert_reject([
      'url' => $effectiveLink,
      'host' => $hostCanon,
      'status' => $leechStatus,
      'filename' => (string)($pre['filename'] ?? ''),
      'size_text' => (string)($pre['filesize_text'] ?? ''),
      'size_bytes' => (int)$candidateSize,
      'bw_charge_bytes' => (int)$candidateSize,
      'direct_url' => '',
      'owner' => (string)($id['secureid'] ?? ''),
    ]);

    $retryAfter = (today_start_ts() + 86400) - time();
    if ($retryAfter < 1) $retryAfter = 60;
    // RD-like: Traffic exhausted (23). We still use HTTP 429 so clients can retry after reset.
    
      // Ledger: status 5 reject does not count BW (reserve=0, final=0) but counts uniq link/day.
      try {
        $t = traffic_db();
        traffic_ledger_mark_reject($t, $dayKeyT, '__account__', $candNorm, '5', 'rejected', (int)$candBytes, $jobId);
        // also mark host side if known
        if (!empty($hostCanon)) traffic_ledger_mark_reject($t, $dayKeyT, $hostCanon, $candNorm, '5', 'rejected', (int)$candBytes, $jobId);
        traffic_daily_rebuild_account_from_ledger($t, $dayKeyT);
        if (!empty($hostCanon)) traffic_daily_rebuild_from_ledger($t, $dayKeyT, $hostCanon);
      } catch (Throwable $e) {
        // ignore
      }

json_error(429, 23, 'Traffic exhausted (account bandwidth limit exceeded)', 'ACCOUNT_BANDWIDTH_LIMIT_REACHED', [
      'origin_url' => $submittedLink,
      'job_id' => $jobId,
      'status_code' => (string)$leechStatus,
      'limit_type' => 'bytes',
      'scope' => 'account',
      'quota' => [
        'acc_cap_bytes' => $accLimitBytes,
        'acc_used_today_mb' => bytes_to_mb($accUsedBytes),
        'acc_used_next_mb' => bytes_to_mb($accUsedNextBytes),
      ],
      'hint' => ($leechStatus === 5) ? 'Account bandwidth limit exceeded. Please wait for reset or upgrade.' : 'Account bandwidth daily limit reached. Please retry later.',
    ], $retryAfter);
  }

  // Host-specific
  $hostCfg = $cfg['hosts'][$hostCanon] ?? ($cfg['hosts']['other'] ?? []);
  $hostDailyMb = (int)($hostCfg['daily_bw_mb'] ?? 0);
    try {
      $tq = traffic_db();
      $q = traffic_quota_get_effective($tq, 'host', $hostCanon, $dayKeyT);
      if (!empty($q['override']) && !empty($q['row'])) {
        $hostDailyMb = (int)round(((int)($q['row']['daily_add_bytes'] ?? 0)) / (1024*1024));
      }
    } catch (Throwable $e) {
      // ignore
    }

  $hostOverMb = (int)($hostCfg['overlimit_bw_mb'] ?? 0);
  if ($hostDailyMb > 0) {
    $hostItems = build_links_today_items($pdo, [
      'todayStart' => $todayStart,
      // IMPORTANT: submit must be fast; avoid scanning legacy .dat on every submit.
      'source' => 'sqlite',
      'host' => $hostCanon,
      'debug' => false,
    ]);
    $hostUsedBytes = 0;
    $seenHost = [];
    foreach ($hostItems as $it) {
      $n = (string)($it['norm'] ?? '');
      if ($n === '' || isset($seenHost[$n])) continue;
      $seenHost[$n] = true;
      $hostUsedBytes += (int)($it['size_bytes'] ?? 0);
    }
    $candidateExistsHost = ($candidateNorm !== '' && isset($seenHost[$candidateNorm]));
    $hostUsedNextBytes = $hostUsedBytes + (!$candidateExistsHost ? $candidateSize : 0);

    $hostDailyBytes = $hostDailyMb * 1024 * 1024;
    $hostOverBytes = $hostOverMb > 0 ? $hostOverMb * 1024 * 1024 : 0;

    // NOTE (Boss semantics / FIX): Host BW enforcement is handled by TRAFFIC_ENFORCE_V1 (traffic.sqlite: reserved+final+adjust)
    // to avoid drift with legacy/sqlite-scan. Do NOT reject here based on daily_soft.
    if (false && $hostUsedNextBytes > $hostDailyBytes) {
      $leechStatus = 6;
      $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
      $now = time();
      $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
      $pdo->prepare('INSERT INTO jobs(job_id, origin_url, norm_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)')
        ->execute([
          $jobId, $submittedLink, $candidateNorm, $now, $now,
          'rejected', (string)$leechStatus, $hostCanon,
          ($pre['filename'] ?? null) ?: null,
          ($pre['filesize_text'] ?? null) ?: null,
          $candidateSize ?: null,
          $mngHash,
        ]);
      api_jobs_set_legacy_hash($pdo, (string)$jobId, (string)$effectiveLink);

      $endpoint = leechmng_endpoint_for_host($hostCanon);
      $passleech = (string)($rt['passleech'] ?? '');
      $id = load_account_identity();
      if ($endpoint && $passleech) {
        // Use per-submit hash so leechmng appends a new record even for the same URL.
        $fullHash = $mngHash;
        $post = [
          'user' => API_SELF_URL,
          'email' => $id['email'],
          'mobile' => $id['mobile'],
          'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
          'url' => $effectiveLink,
          'md5' => $fullHash,
          'host' => $hostCanon,
          'ip' => $_SERVER['REMOTE_ADDR'] ?? '127.0.0.1',
          'isp' => '',
          'status' => $leechStatus,
          'filename' => (string)($pre['filename'] ?? ''),
          'filesize' => (string)$candidateSize,
        ];
        $r = leechmng_savelink($endpoint, $passleech, $post);
        $direct = leechmng_extract_directlink($r);
        leechmng_log_event(['kind'=>'reject_bw_host','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpoint,'ok'=>$r['ok']??false,'http_code'=>$r['http_code']??null,'resp'=>$r]);

        // NOTE: bypass-on-directlink is disabled (would leak origin server directlink).
      } else {
        leechmng_log_event(['kind'=>'reject_bw_host','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpoint,'ok'=>false,'error'=>'missing endpoint/passleech']);
      }

      // If leechmng returned a directlink, allow ONLY if we can convert to public.
      if (isset($r) && is_array($r)) {
        $dl = leechmng_extract_directlink($r);
        if ($dl) {
          $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_updated_at=? WHERE job_id=?')
            ->execute([$dl, time(), $jobId]);

          $pub = null;
          if (is_public_directlink($dl)) $pub = $dl;
          elseif (is_internal_directlink($dl)) {
            $conv = convert_internal_to_public($dl);
            if (($conv['ok'] ?? false) && !empty($conv['public'])) $pub = (string)$conv['public'];
          }

          if ($pub) {
            $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=? WHERE job_id=?')
              ->execute([time(), 'ready', '1', $pub, time(), $pub, $jobId]);
            @legacy_dat_upsert_reject([
              'url' => $effectiveLink,
              'host' => $hostCanon,
              'status' => 1,
              'filename' => (string)($r['raw']['filename'] ?? ($pre['filename'] ?? '')),
              'size_text' => (string)($pre['filesize_text'] ?? ''),
              'size_bytes' => (int)($r['raw']['filesize'] ?? $candidateSize),
              'bw_charge_bytes' => (int)($r['raw']['filesize'] ?? $candidateSize),
              'direct_url' => $pub,
              'owner' => (string)($id['secureid'] ?? ''),
            ]);
            json_out([
              'ok' => true,
              'job_id' => $jobId,
              'status' => 'ready',
              'status_code' => '1',
              'origin_url' => $submittedLink,
              'host' => $hostCanon,
              'direct_url' => $pub,
              'note' => 'leechmng returned directlink; bypassed host BW limit (public only)',
            ]);
          }
        }
      }

      @legacy_dat_upsert_reject([
        'url' => $effectiveLink,
        'host' => $hostCanon,
        'status' => $leechStatus,
        'filename' => (string)($pre['filename'] ?? ''),
        'size_text' => (string)($pre['filesize_text'] ?? ''),
        'size_bytes' => (int)$candidateSize,
        'bw_charge_bytes' => (int)$candidateSize,
        'direct_url' => '',
        'owner' => (string)($id['secureid'] ?? ''),
      ]);

      $retryAfter = (today_start_ts() + 86400) - time();
      if ($retryAfter < 1) $retryAfter = 60;
      
        // Ledger: host-limit (status 6) should NOT reserve/count BW at submit time.
        // If later promoted to DONE (status 1) via query_status, it will count BW for submit day.
        try {
          $t = traffic_db();
          $dayKeyX = today_key();
          $normX = normalize_url_for_uniqueness($submittedLink);
          $bytesX = (int)$candidateSize;
          if ($bytesX < 0) $bytesX = 0;
          if ($hostCanon !== '' && $normX !== '') {
            traffic_ledger_mark_reject($t, $dayKeyX, $hostCanon, $normX, '6', 'rejected', $bytesX, $jobId);
            traffic_daily_rebuild_from_ledger($t, $dayKeyX, $hostCanon);
            traffic_daily_rebuild_account_from_ledger($t, $dayKeyX);
          }
        } catch (Throwable $e) {
          // ignore
        }

json_error(429, 18, 'Hoster limit reached (host bandwidth limit exceeded)', 'HOST_BANDWIDTH_LIMIT_REACHED', [
        'origin_url' => $submittedLink,
        'host' => $hostCanon,
        'job_id' => $jobId,
        'status_code' => (string)$leechStatus,
        'limit_type' => 'bytes',
        'scope' => 'host',
        'quota' => [
          'host_daily_mb' => $hostDailyMb,
          'host_overlimit_mb' => $hostOverMb,
          'host_used_today_mb' => bytes_to_mb($hostUsedBytes),
          'host_used_next_mb' => bytes_to_mb($hostUsedNextBytes),
        ],
        'hint' => 'Host bandwidth limit reached. Please retry later.',
      ], $retryAfter);
    }
  }

  $link = $submittedLink;

  $jobId = substr(hash('sha256', $link . '|' . microtime(true)), 0, 16);
  $now = time();

  // Store canonical host (groups aliases like rg.to/rapidgator.com -> rapidgator.net)
  $hostCanon = canonical_host_from_url($effectiveLink);

  $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
  $legacyHash = md5($effectiveLink . API_SELF_URL);
  $normUrl = normalize_url_for_uniqueness(normalize_url($effectiveLink));
  if ($normUrl === '') $normUrl = normalize_url_for_uniqueness($effectiveLink);


  
  // === TRAFFIC_ENFORCE_V1 ===
  // Prefer traffic.sqlite (merged ledger: API + legacy sync) for quota enforcement.
  // Fallback to legacy api_main sqlite scan on errors.
  try {
    $t = traffic_db();

    $dbg2 = debug_enabled();
    if ($dbg2) {
      if (!is_dir(API_DATA_DIR . '/logs')) @mkdir(API_DATA_DIR . '/logs', 0775, true);
      @file_put_contents(API_DATA_DIR . '/logs/traffic_enforce_dbg.log', date('c')." enter_enforce\n", FILE_APPEND);
    }

    // Pull recent legacy updates (throttle 2s inside)
    @traffic_sync_from_legacy_sqlite($t, 7, 500, 200);

    $dayKeyT = today_key();
    // Candidate identity (unique URL/day)
    // IMPORTANT: derive from already-computed $normUrl/$pre to avoid ordering bugs.
    $candNorm = $normUrl;
    $candBytes = (int)($pre['filesize_bytes'] ?? 0);
    if ($candBytes < 0) $candBytes = 0;

    $already = false;
    if ($candNorm !== '' && $hostCanon !== '') {
      $already = traffic_is_seen_today($t, $dayKeyT, $hostCanon, $candNorm);
    }

    // If already counted today (unique URL/day), do not block by BW limits (no extra charge).
    if ($already) {
      // Allow submit (still rate-limited elsewhere).
    }

    // Account bytes
    $accUsedBytes = traffic_get_used_bytes_today($t, $dayKeyT, '');
    $accUsedNextBytes = $accUsedBytes + (!$already ? $candBytes : 0);

    $accWallet = traffic_account_wallet_state($t, $cfg, $dayKeyT);
    $accLimitBytes = (int)($accWallet['cap_bytes'] ?? 0);

    if ($accLimitBytes > 0 && $accUsedNextBytes > $accLimitBytes) {
      $leechStatus = 6;
      $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
      $now = time();
      $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
      $pdo->prepare('INSERT INTO jobs(job_id, origin_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?)')
        ->execute([
          $jobId, $submittedLink, $now, $now,
          'rejected', (string)$leechStatus, $hostCanon,
          ($pre['filename'] ?? null) ?: null,
          ($pre['filesize_text'] ?? null) ?: null,
          $candBytes ?: null,
          $mngHash,
        ]);
      api_jobs_set_legacy_hash($pdo, (string)$jobId, (string)$effectiveLink);

      // Push reject status to leechmng/getlink.db for tracking (best-effort)
      try {
        $endpointLm = leechmng_endpoint_for_host($hostCanon);
        $passleech = (string)($rt['passleech'] ?? '');
        $id = load_account_identity();
        if ($endpointLm && $passleech) {
          $post = [
            'user' => API_SELF_URL,
            'email' => $id['email'],
            'mobile' => $id['mobile'],
            'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
            'url' => $effectiveLink,
            'md5' => md5($effectiveLink . API_SELF_URL),
            'host' => $hostCanon,
            'ip' => get_client_ip(),
            'isp' => '',
            'status' => $leechStatus,
            'filename' => (string)($pre['filename'] ?? ''),
            'filesize' => (string)$candBytes,
          ];
          $rLm = leechmng_savelink($endpointLm, $passleech, $post);
          leechmng_log_event(['kind'=>'reject_bw_acc_traffic','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpointLm,'ok'=>$rLm['ok']??false,'http_code'=>$rLm['http_code']??null,'resp'=>$rLm]);
        }
      } catch (Throwable $e) {
        // ignore
      }


      @legacy_dat_upsert_reject([
        'url' => $effectiveLink,
        'host' => $hostCanon,
        'status' => $leechStatus,
        'filename' => (string)($pre['filename'] ?? ''),
        'size_text' => (string)($pre['filesize_text'] ?? ''),
        'size_bytes' => (int)$candBytes,
        'bw_charge_bytes' => (int)$candBytes,
        'direct_url' => '',
        'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
      ]);

      $retryAfter = (today_start_ts() + 86400) - time();
      if ($retryAfter < 1) $retryAfter = 60;
      json_error(429, 23, 'Traffic exhausted (account bandwidth limit exceeded)', 'ACCOUNT_BANDWIDTH_LIMIT_REACHED', [
        'origin_url' => $submittedLink,
        'job_id' => $jobId,
        'status_code' => (string)$leechStatus,
        'limit_type' => 'bytes',
        'scope' => 'account',
        'quota' => [
          'acc_cap_bytes' => $accLimitBytes,
          'acc_used_today_mb' => bytes_to_mb($accUsedBytes),
          'acc_used_next_mb' => bytes_to_mb($accUsedNextBytes),
        ],
      ], $retryAfter);
    }

    // Host bytes
    $hostCfg = $cfg['hosts'][$hostCanon] ?? ($cfg['hosts']['other'] ?? []);
    $hostDailyMb = (int)($hostCfg['daily_bw_mb'] ?? 0);
    $hostOverMb = (int)($hostCfg['overlimit_bw_mb'] ?? 0);
    if ($hostDailyMb > 0) {
      $hostUsedBytes = traffic_get_used_bytes_today($t, $dayKeyT, $hostCanon);
      $hostUsedNextBytes = $hostUsedBytes + (!$already ? $candBytes : 0);

      $hostDailyBytes = $hostDailyMb * 1024 * 1024;
      $hostHardBytes = $hostOverMb > 0 ? $hostOverMb * 1024 * 1024 : $hostDailyBytes;

      // Boss semantics: overlimit_bw_mb is HARD max/day (total), not extra.
      // Allow leech if used_next <= hard_max; if used_next crosses daily soft cap, record debt for tomorrow.
      if ($hostHardBytes > 0 && $hostUsedNextBytes > $hostHardBytes) {
        $leechStatus = 6;
        $jobId = substr(hash('sha256', $submittedLink . '|' . microtime(true)), 0, 16);
        $now = time();
        $mngHash = md5($effectiveLink . API_SELF_URL . ':' . $jobId);
        $pdo->prepare('INSERT INTO jobs(job_id, origin_url, created_at, updated_at, status, status_code, host, filename, size_text, size_bytes, mng_hash) VALUES(?,?,?,?,?,?,?,?,?,?,?)')
          ->execute([
            $jobId, $submittedLink, $now, $now,
            'rejected', (string)$leechStatus, $hostCanon,
            ($pre['filename'] ?? null) ?: null,
            ($pre['filesize_text'] ?? null) ?: null,
            $candBytes ?: null,
            $mngHash,
          ]);



        // DB-first cache-bypass for reject status=6 (Boss: DB first, verify PVE alive, then convert to public).
        // If hit, still push leechmng in background for tracking.
        try {
          $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $submittedLink);
          if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $effectiveLink);
          if (empty($dbChk['direct_url'])) {
            $legacyHash2 = md5($effectiveLink . API_SELF_URL);
            $dbChk = leechmng_local_db_lookup_fuzzy($hostCanon, $legacyHash2, 5);
          }

          if (!empty($dbChk['direct_url'])) {
            $dl2 = (string)$dbChk['direct_url'];
            // Verify PVE/internal link alive before conversion (timeout => dead)
            if (is_internal_directlink($dl2) && preg_match('~^https?://pve~i', $dl2)) {
              $pv = verify_url_alive($dl2, 3);
              if (empty($pv['alive'])) {
                throw new Exception('pve dead');
              }
            }

            // Store internal
            $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
              ->execute([$dl2, (preg_match('~^https?://pve~i', $dl2) ? $dl2 : null), time(), $jobId]);

            $pub2 = null;
            if (is_public_directlink($dl2)) {
              $pub2 = $dl2;
            } elseif (is_internal_directlink($dl2)) {
              $conv2 = convert_internal_to_public($dl2);
              if (($conv2['ok'] ?? false) && !empty($conv2['public'])) $pub2 = (string)$conv2['public'];
            }

            if ($pub2) {
              $now3 = time();
              $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
                ->execute([$now3,'ready','1',$pub2,$now3,$pub2,$now3,'mng_db',$jobId]);

                  // Ledger: DONE should count final_bytes (reserve=0).
                  try {
                    $tL = traffic_db();
                    $usedBeforeDebt = traffic_get_used_bytes_today($tL, $dayKey, $hostCanon);
                    traffic_ledger_mark_done($tL, $dayKey, $hostCanon, $candNorm, (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes), $jobId);
                    traffic_daily_rebuild_from_ledger($tL, $dayKey, $hostCanon);
                    traffic_daily_rebuild_account_from_ledger($tL, $dayKey);
                    $usedAfterDebt = traffic_get_used_bytes_today($tL, $dayKey, $hostCanon);
                    traffic_record_debt_inc_after_done($tL, $dayKey, $hostCanon, (int)$usedBeforeDebt, (int)$usedAfterDebt);

                  } catch (Throwable $e) {
                    // ignore
                  }


              @legacy_dat_upsert_reject([
                'url' => $effectiveLink,
                'host' => $hostCanon,
                'status' => 1,
                'filename' => (string)($pre['filename'] ?? ''),
                'size_text' => (string)($pre['filesize_text'] ?? ''),
                'size_bytes' => (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes),
                'bw_charge_bytes' => (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes),
                'direct_url' => $pub2,
                'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
              ]);

              // Option A: record traffic for unique URL/day (best-effort)
              try {
                $t3 = traffic_db();
                $bytesT2 = (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes);
                if ($bytesT2 < 0) $bytesT2 = 0;
                if ($bytesT2 > 0 && $candNorm !== '') {
                  traffic_record_seen($t3, [
                    'day_key' => today_key(),
                    'host' => $hostCanon,
                    'norm_url' => $candNorm,
                    'size_bytes' => $bytesT2,
                    'source' => 'api_submit',
                    'ref_id' => $jobId,
                  ]);
                }
              } catch (Throwable $e) {
                // ignore
              }

              // Still notify leechmng for tracking (best-effort)
              try {
                $endpointLm = leechmng_endpoint_for_host($hostCanon);
                $passleech = (string)($rt['passleech'] ?? '');
                $id = load_account_identity();
                if ($endpointLm && $passleech) {
                  $post = [
                    'user' => API_SELF_URL,
                    'email' => $id['email'],
                    'mobile' => $id['mobile'],
                    'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
                    'url' => $effectiveLink,
                    'md5' => md5($effectiveLink . API_SELF_URL),
                    'host' => $hostCanon,
                    'ip' => get_client_ip(),
                    'isp' => '',
                    'status' => $leechStatus,
                    'filename' => (string)($pre['filename'] ?? ''),
                    'filesize' => (string)$candBytes,
                  ];
                  $rLm2 = leechmng_savelink($endpointLm, $passleech, $post);
                  leechmng_log_event(['kind'=>'notify_reject_dbfirst','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpointLm,'ok'=>$rLm2['ok']??false,'http_code'=>$rLm2['http_code']??null,'resp'=>$rLm2]);
                }
              } catch (Throwable $e) {
                // ignore
              }

              json_out([
                'ok' => true,
                'job_id' => $jobId,
                'status' => 'ready',
                'status_code' => '1',
                'origin_url' => $submittedLink,
                'host' => $hostCanon,
                'direct_url' => $pub2,
                'direct_url_public' => $pub2,
                'note' => 'db-first bypass host BW limit (status=6)',
              ]);
            }
          }
        } catch (Throwable $e) {
          // ignore
        }
        // Push reject status to leechmng/getlink.db for tracking (best-effort)
        try {
          $endpointLm = leechmng_endpoint_for_host($hostCanon);
          $passleech = (string)($rt['passleech'] ?? '');
          $id = load_account_identity();
          if ($endpointLm && $passleech) {
            $post = [
              'user' => API_SELF_URL,
              'email' => $id['email'],
              'mobile' => $id['mobile'],
              'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
              'url' => $effectiveLink,
              'md5' => $mngHash,
              'host' => $hostCanon,
              'ip' => get_client_ip(),
              'isp' => '',
              'status' => $leechStatus,
              'filename' => (string)($pre['filename'] ?? ''),
              'filesize' => (string)$candBytes,
            ];
            $rLm = leechmng_savelink($endpointLm, $passleech, $post);
            leechmng_log_event(['kind'=>'reject_bw_host_traffic','job_id'=>$jobId,'status'=>$leechStatus,'endpoint'=>$endpointLm,'ok'=>$rLm['ok']??false,'http_code'=>$rLm['http_code']??null,'resp'=>$rLm]);

            // If leechmng already has a cached file (PVE), it may return a directlink even under host BW status=6.
            // Legacy behavior: allow returning public link to improve UX (account still has BW).
            $dl = leechmng_extract_directlink($rLm);
            // Verify PVE/internal link alive before conversion (timeout => dead)
            if ($dl && is_internal_directlink($dl) && preg_match('~^https?://pve~i', $dl)) {
              $pv = verify_url_alive($dl, 3);
              if (empty($pv['alive'])) $dl = '';
            }
            if ($dl) {
              // Store internal
              $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_updated_at=? WHERE job_id=?')
                ->execute([$dl, time(), $jobId]);

              $pub = null;
              if (is_public_directlink($dl)) {
                $pub = $dl;
              } elseif (is_internal_directlink($dl)) {
                $conv = convert_internal_to_public($dl);
                if (($conv['ok'] ?? false) && !empty($conv['public'])) $pub = (string)$conv['public'];
              }

              if ($pub) {
                $now2 = time();
                // Mark job ready
                $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
                  ->execute([$now2,'ready','1',$pub,$now2,$pub,$now2,'mng_http',$jobId]);

                // Update legacy UI .dat line (ready)
                @legacy_dat_upsert_reject([
                  'url' => $effectiveLink,
                  'host' => $hostCanon,
                  'status' => 1,
                  'filename' => (string)($rLm['raw']['filename'] ?? ($pre['filename'] ?? '')),
                  'size_text' => (string)($pre['filesize_text'] ?? ''),
                  'size_bytes' => (int)($rLm['raw']['filesize'] ?? $candBytes),
                  'bw_charge_bytes' => (int)($rLm['raw']['filesize'] ?? $candBytes),
                  'direct_url' => $pub,
                  'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
                ]);

                // Option A: record traffic for unique URL/day (best-effort)
                try {
                  $t2 = traffic_db();
                  $bytesT = (int)($rLm['raw']['filesize'] ?? $candBytes);
                  if ($bytesT < 0) $bytesT = 0;
                  if ($bytesT > 0 && $candNorm !== '') {
                    traffic_record_seen($t2, [
                      'day_key' => today_key(),
                      'host' => $hostCanon,
                      'norm_url' => $candNorm,
                      'size_bytes' => $bytesT,
                      'source' => 'api_submit',
                      'ref_id' => $jobId,
                    ]);
                  }
                } catch (Throwable $e) {
                  // ignore
                }

                json_out([
                  'ok' => true,
                  'job_id' => $jobId,
                  'status' => 'ready',
                  'status_code' => '1',
                  'origin_url' => $submittedLink,
                  'host' => $hostCanon,
                  'direct_url' => $pub,
                  'direct_url_public' => $pub,
                  'note' => 'cache-hit bypass host BW limit (status=6)',
                ]);
              }
            }


            // DB cache-hit: lookup local getlink.db mirror for PVE link (leechmng may not return directlink immediately).
            try {
              // Prefer exact link match first (WHERE link=origin_url), then fallback to legacy hash fuzzy.
              $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $submittedLink);
              if (empty($dbChk['direct_url'])) $dbChk = leechmng_local_db_lookup_by_link($hostCanon, $effectiveLink);
              if (empty($dbChk['direct_url'])) {
                $legacyHash2 = md5($effectiveLink . API_SELF_URL);
                $dbChk = leechmng_local_db_lookup_fuzzy($hostCanon, $legacyHash2, 5);
              }
              if (!empty($dbChk['direct_url'])) {
                $dl2 = (string)$dbChk['direct_url'];
                $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_pve=?, direct_url_updated_at=? WHERE job_id=?')
                  ->execute([$dl2, (preg_match('~^https?://pve~i', $dl2) ? $dl2 : null), time(), $jobId]);

                $pub2 = null;
                if (is_public_directlink($dl2)) {
                  $pub2 = $dl2;
                } elseif (is_internal_directlink($dl2)) {
                  $conv2 = convert_internal_to_public($dl2);
                  if (($conv2['ok'] ?? false) && !empty($conv2['public'])) $pub2 = (string)$conv2['public'];
                }

                if ($pub2) {
                  $now3 = time();
                  $pdo->prepare('UPDATE jobs SET updated_at=?, status=?, status_code=?, direct_url_public=?, direct_url_public_updated_at=?, direct_url_latest=?, direct_url_updated_at=?, direct_url_source=? WHERE job_id=?')
                    ->execute([$now3,'ready','1',$pub2,$now3,$pub2,$now3,'mng_db',$jobId]);

                  @legacy_dat_upsert_reject([
                    'url' => $effectiveLink,
                    'host' => $hostCanon,
                    'status' => 1,
                    'filename' => (string)($pre['filename'] ?? ''),
                    'size_text' => (string)($pre['filesize_text'] ?? ''),
                    'size_bytes' => (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes),
                    'bw_charge_bytes' => (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes),
                    'direct_url' => $pub2,
                    'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
                  ]);

                  // Option A: record traffic for unique URL/day (best-effort)
                  try {
                    $t3 = traffic_db();
                    $bytesT2 = (int)(($pre['filesize_bytes'] ?? 0) ?: $candBytes);
                    if ($bytesT2 < 0) $bytesT2 = 0;
                    if ($bytesT2 > 0 && $candNorm !== '') {
                      traffic_record_seen($t3, [
                        'day_key' => today_key(),
                        'host' => $hostCanon,
                        'norm_url' => $candNorm,
                        'size_bytes' => $bytesT2,
                        'source' => 'api_submit',
                        'ref_id' => $jobId,
                      ]);
                    }
                  } catch (Throwable $e) {
                    // ignore
                  }

                                    // Debt (cached bypass over hard max): if this unique URL/day causes overage beyond daily soft cap,
                  // record debt to tomorrow even when hard max is exceeded (Boss policy: cached still allowed).
                  if (!$already && $candBytes > 0 && $hostDailyBytes > 0) {
                    $overBefore = $hostUsedBytes - $hostDailyBytes;
                    if ($overBefore < 0) $overBefore = 0;
                    $overAfter = ($hostUsedBytes + $candBytes) - $hostDailyBytes;
                    if ($overAfter < 0) $overAfter = 0;
                    $debtInc = $overAfter - $overBefore;
                    if ($debtInc < 0) $debtInc = 0;
                    if ($debtInc > 0) {
                      try {
                        $tomorrowKey = date('d.m.Y', strtotime('tomorrow'));
                        traffic_daily_adjust_add($t, $tomorrowKey, $hostCanon, (int)$debtInc, 0);
                      } catch (Throwable $e) {
                        // ignore
                      }
                    }
                  }

json_out([
                    'ok' => true,
                    'job_id' => $jobId,
                    'status' => 'ready',
                    'status_code' => '1',
                    'origin_url' => $submittedLink,
                    'host' => $hostCanon,
                    'direct_url' => $pub2,
                    'direct_url_public' => $pub2,
                    'note' => 'db-hit bypass host BW limit (status=6)',
                  ]);
                }
              }
            } catch (Throwable $e) {
              // ignore
            }

          }
        } catch (Throwable $e) {
          // ignore
        }


        @legacy_dat_upsert_reject([
          'url' => $effectiveLink,
          'host' => $hostCanon,
          'status' => $leechStatus,
          'filename' => (string)($pre['filename'] ?? ''),
          'size_text' => (string)($pre['filesize_text'] ?? ''),
          'size_bytes' => (int)$candBytes,
          'bw_charge_bytes' => (int)$candBytes,
          'direct_url' => '',
          'owner' => (string)((load_account_identity()['secureid'] ?? '') ?: ''),
        ]);

        $retryAfter = (today_start_ts() + 86400) - time();
        if ($retryAfter < 1) $retryAfter = 60;
        json_error(429, 18, 'Hoster limit reached (host bandwidth limit exceeded)', 'HOST_BANDWIDTH_LIMIT_REACHED', [
          'origin_url' => $submittedLink,
          'host' => $hostCanon,
          'job_id' => $jobId,
          'status_code' => (string)$leechStatus,
          'limit_type' => 'bytes',
          'scope' => 'host',
          'quota' => [
            'host_daily_mb' => $hostDailyMb,
            'host_overlimit_mb' => $hostOverMb,
            'host_used_today_mb' => bytes_to_mb($hostUsedBytes),
            'host_used_next_mb' => bytes_to_mb($hostUsedNextBytes),
          ],
        ], $retryAfter);
      } else {
        // Under hard max: allow submit. If this submit crosses daily soft cap, record DEBT to tomorrow.
        // Debt is incremental to avoid double-charging when already over daily.
        if (!$already && $candBytes > 0 && $hostDailyBytes > 0) {
          $overBefore = $hostUsedBytes - $hostDailyBytes;
          if ($overBefore < 0) $overBefore = 0;
          $overAfter = $hostUsedNextBytes - $hostDailyBytes;
          if ($overAfter < 0) $overAfter = 0;
          $debtInc = $overAfter - $overBefore;
          if ($debtInc < 0) $debtInc = 0;
          if ($debtInc > 0) {
            try {
              $tomorrowKey = date('d.m.Y', strtotime('tomorrow'));
              traffic_daily_adjust_add($t, $tomorrowKey, $hostCanon, (int)$debtInc, 0);
            } catch (Throwable $e) {
              // ignore
            }
          }
        }
      }
    }

  } catch (Throwable $e) {
    if (debug_enabled()) {
      if (!is_dir(API_DATA_DIR . '/logs')) @mkdir(API_DATA_DIR . '/logs', 0775, true);
      @file_put_contents(API_DATA_DIR . '/logs/traffic_enforce_dbg.log', date('c')." enforce_exception: " . $e->getMessage() . "\n", FILE_APPEND);
    }
    // Fallback to old sqlite-scan enforcement below.
  }
  // === /TRAFFIC_ENFORCE_V1 ===

  $clientIp = get_client_ip();
  $id = load_account_identity();
  $owner = (string)($id['secureid'] ?? '');
  if ($owner === '') $owner = API_FALLBACK_PASS;

  $pdo->prepare('INSERT INTO jobs(job_id, origin_url, norm_url, created_at, updated_at, status, host, filename, size_text, size_bytes, mng_hash, legacy_hash, job_type, client_ip, client_network, owner, showfile, releech, speed) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)')
      ->execute([
        $jobId,
        $submittedLink,
        $normUrl ?: null,
        $now,
        $now,
        'submitted',
        $hostCanon ?: null,
        ($pre['filename'] ?? null) ?: null,
        ($pre['filesize_text'] ?? null) ?: null,
        ($pre['filesize_bytes'] ?? null) ?: null,
        $mngHash,
        $legacyHash,
        'leech',
        $clientIp,
        (string)($_COOKIE['network'] ?? ''),
        $owner,
        1,
        0,
        '0'
      ]);

  // Boss requirement: UI local should show immediately (queued)
  $id = load_account_identity();
  @legacy_dat_upsert_reject([
    'url' => $effectiveLink,
    'host' => $hostCanon,
    'status' => 0,
    'filename' => (string)($pre['filename'] ?? ''),
    'size_text' => (string)($pre['filesize_text'] ?? ''),
    'size_bytes' => (int)($pre['filesize_bytes'] ?? 0),
    'bw_charge_bytes' => (int)($pre['filesize_bytes'] ?? 0),
    'direct_url' => '',
    'owner' => (string)($id['secureid'] ?? ''),
  ]);

  // Policy C (Boss): run engine first; only if engine fails (or produces no visible job) then push via leechmng_savelink.
  // Rationale: engine/legacy leech already pushes to leechmng; calling savelink unconditionally causes duplicate records.



  // Option A (Boss): ALWAYS push to leechmng on submit (no dedupe needed).
  // Goal: every submitted link creates a new row in leechmng/getlink.db for tracking, even if cached already.
  try {
    $endpointLm = leechmng_endpoint_for_host($hostCanon);
    $passleech = (string)($rt['passleech'] ?? '');
    if ($endpointLm && $passleech) {
      // Per-submit hash so leechmng appends a new record even for the same URL.
      $fullHash = $mngHash;
      $post = [
        'user' => API_SELF_URL,
        'email' => $id['email'],
        'mobile' => $id['mobile'],
        'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
        'url' => $effectiveLink,
        'md5' => $fullHash,
        'host' => $hostCanon,
        'ip' => get_client_ip(),
        'isp' => '',
        // status=5 must never return public link, but still can be recorded.
        'status' => 0,
        'filename' => (string)($pre['filename'] ?? ''),
        'filesize' => (string)($pre['filesize_bytes'] ?? 0),
      ];
      $rLm = leechmng_savelink($endpointLm, $passleech, $post);
      leechmng_log_event(['kind'=>'submit_savelink_always_v2','job_id'=>$jobId,'status'=>0,'endpoint'=>$endpointLm,'ok'=>$rLm['ok']??false,'http_code'=>$rLm['http_code']??null,'resp'=>$rLm]);
      // Option 3B: create marker so engine can skip duplicate leechmng push for this submit (best-effort).
      try {
        $marksDir = API_DATA_DIR . '/mng_submit_marks';
        if (!is_dir($marksDir)) @mkdir($marksDir, 0775, true);
        $markPath = $marksDir . '/' . $jobId . '.ok';
        $tmpPath = $markPath . '.tmp';
        $mark = [
          'ts' => time(),
          'job_id' => $jobId,
          'host' => $hostCanon,
          'effectiveLink' => $effectiveLink,
          'mng_hash' => $mngHash,
          'api_self_url' => API_SELF_URL,
          'endpoint' => $endpointLm,
        ];
        @file_put_contents($tmpPath, json_encode($mark, JSON_UNESCAPED_SLASHES) . "
");
        @rename($tmpPath, $markPath);
        // lightweight cleanup: 1/200 chance
        if (mt_rand(1,200) === 1) {
          foreach (glob($marksDir . '/*.ok') as $f) {
            $mt = @filemtime($f);
            if ($mt && (time() - $mt > 7*24*60*60)) @unlink($f);
          }
        }
      } catch (Throwable $e2) {
        // ignore
      }

    }
  } catch (Throwable $e) {
    leechmng_log_event(['kind'=>'submit_savelink_always_v2_err','job_id'=>$jobId,'error'=>$e->getMessage()]);
  }
  // --- 1) Try engine snapshot first ---
  // IMPORTANT: doing this synchronously can block submit_link_get and cause timeouts.
  // We spawn a background PHP process to run engine->leech_api() and return immediately.
  $engineOk = false;
  $engineErr = null;

  $engineOk = true; // queued
  $engineErr = null;
  try {
    $php = PHP_BINARY ?: 'php';
    $snippet = '';
    // Ensure engine bootstrap sees correct self URL when running under CLI (otherwise defaults to http://127.0.0.1/).
    $snippet .= "define('GL_ACCOUNT_DIR', " . var_export(API_ACCOUNT_DIR, true) . ");";
    $snippet .= "\ndefine('GL_JOB_ID', " . var_export($jobId, true) . ");";
    $snippet .= "define('API_BASE_URL', " . var_export(API_BASE_URL, true) . ");";
    $snippet .= "\ndefine('API_SELF_URL', " . var_export(API_SELF_URL, true) . ");";
    $snippet .= "\nrequire_once '/var/www/html.main/mygl/api_core/engine/current/bootstrap.php';";
    $snippet .= "\n$" . "engine = getlink_api_engine();";
    $snippet .= "\nob_start();";
    // Use effectiveLink (normalized) so engine/leechmng keying matches legacy_hash
    $snippet .= "\n$" . "engine->leech_api(" . var_export($effectiveLink, true) . ");";
    $snippet .= "\nob_end_clean();";

    $cmd = 'nohup ' . escapeshellcmd($php) . ' -r ' . escapeshellarg($snippet) . ' >/dev/null 2>&1 &';
    @exec($cmd);
  } catch (Throwable $e) {
    // If spawning fails, we'll fall back to savelink/web below.
    $engineOk = false;
    $engineErr = $e->getMessage();
  }

  // --- 2) Fallback condition ---
  // IMPORTANT: engine itself already pushes to leechmng. Any additional savelink call would create duplicate records.
  // So we only fall back to savelink on *hard fail* (exception).
  $needSavelink = !$engineOk;

  // --- 3) Fallback path (Option 2): savelink first, if savelink unavailable/failed then web-submit ---
  // Goal: keep code leechmng unchanged and still avoid duplicate leechmng records.
  // Rule: after a hard engine fail, we do *at most one* of:
  //  - leechmng_savelink (preferred)
  //  - legacy UI web-submit (only if savelink can't run or fails)

  $lmDirect = null;
  $didSavelink = false;
  $savelinkOk = false;

  if ($needSavelink) {
    $endpointLm = leechmng_endpoint_for_host($hostCanon);
    $passleech = (string)($rt['passleech'] ?? '');

    if ($endpointLm && $passleech) {
      $didSavelink = true;
      // Use per-submit hash so leechmng appends a new record even for the same URL.
      $fullHash = $mngHash;
      $post = [
        'user' => API_SELF_URL,
        'email' => $id['email'],
        'mobile' => $id['mobile'],
        'acc_type' => $id['secureid'] ? $id['secureid'] : 'pre',
        'url' => $effectiveLink,
        'md5' => $fullHash,
        'host' => $hostCanon,
        'ip' => $_SERVER['REMOTE_ADDR'] ?? '127.0.0.1',
        'isp' => '',
        'status' => 0,
        'filename' => (string)($pre['filename'] ?? ''),
        'filesize' => (string)($pre['filesize_bytes'] ?? 0),
      ];
      $r = leechmng_savelink($endpointLm, $passleech, $post);
      $savelinkOk = !empty($r['ok']);
      $lmDirect = leechmng_extract_directlink($r);
      leechmng_log_event(['kind'=>'submit_ok_fallback_savelink','job_id'=>$jobId,'status'=>0,'endpoint'=>$endpointLm,'ok'=>$r['ok']??false,'http_code'=>$r['http_code']??null,'resp'=>$r,'engine_error'=>$engineErr]);

      if ($lmDirect) {
        $pdo->prepare('UPDATE jobs SET direct_url_internal=?, direct_url_updated_at=? WHERE job_id=?')
          ->execute([$lmDirect, time(), $jobId]);
      }
      if ($savelinkOk) $engineOk = true;
    }

    // If savelink cannot run or failed, fall back to legacy web-submit.
    if (!$savelinkOk) {
      ensure_logged_in();
      $wr = curl_request(API_BASE_URL . '?id=leech&rand=' . mt_rand(), 'POST', [
        'urllist' => $effectiveLink,
        'leech' => '1',
        'download' => 'leech',
        'svleech' => '',
      ]);
      leechmng_log_event(['kind'=>'submit_fallback_web','job_id'=>$jobId,'ok'=>$wr['ok']??false,'status'=>$wr['status']??0,'engine_error'=>$engineErr,'did_savelink'=>$didSavelink]);
      // Web submit doesn't guarantee immediate dlink; we still consider the submission accepted.
      $engineOk = true;
    }
  }

  $resp = [
    'ok'=>true,
    'job_id'=>$jobId,
    'status'=>'submitted',
    'created_at'=>$now,
    'origin_url'=>$submittedLink,
    'host'=>$hostCanon,
    'engine'=>$engineOk,
    'engine_error'=>$engineErr
  ];

  // Optional quota info
  if (!empty($_GET['include_quota'])) {
    $dayKey = today_key();
    $todayStart = today_start_ts();
    $hostCanonQuota = $hostCanon ?: canonical_host_from_url($effectiveLink);

    // Use list_links_today() as the single source of truth for unique URL/day + reserved size.
    $lstAll = list_links_today($pdo, [
      'todayStart' => $todayStart,
      'dayKey' => $dayKey,
      // include_quota is used by submit responses; keep it fast.
      'source' => 'sqlite',
      'host' => '',
      'limit' => 1,
      'offset' => 0,
      'debug' => false,
    ]);
    $lstHost = list_links_today($pdo, [
      'todayStart' => $todayStart,
      'dayKey' => $dayKey,
      // include_quota is used by submit responses; keep it fast.
      'source' => 'sqlite',
      'host' => $hostCanonQuota,
      'limit' => 1,
      'offset' => 0,
      'debug' => false,
    ]);

    $resp['quota'] = [
      'day_key' => $dayKey,
      'host' => $hostCanonQuota,
      'acc_links_today' => (int)($lstAll['counts']['acc_links_today'] ?? 0),
      'host_links_today' => (int)($lstHost['counts']['host_links_today'] ?? 0),
      'acc_used_today_mb' => $lstAll['counts']['acc_used_today_mb'] ?? 0,
      'host_used_today_mb' => $lstHost['counts']['host_used_today_mb'] ?? 0,
      'source' => [
        'links' => 'merged',
        'bw' => 'reserved_size',
      ],
    ];
  }



  // Traffic (RD-like): record reserved bytes per unique URL/day (best-effort; never fail submit)
  // IMPORTANT: do this AFTER all quota checks pass (avoid counting rejected submits).
  try {
    $t = traffic_db();
    $dayKey = today_key();
    $bytes = (int)($pre['filesize_bytes'] ?? 0);
    if ($bytes < 0) $bytes = 0;
    if ($normUrl !== '' && $hostCanon !== '' && $bytes > 0) {
      traffic_record_seen($t, [
        'day_key' => $dayKey,
        'host' => $hostCanon,
        'norm_url' => $normUrl,
        'size_bytes' => $bytes,
        'source' => 'api_submit',
        'ref_id' => $jobId,
      ]);
    }
  } catch (Throwable $e) {
    // ignore
  }
  json_out($resp);
}

if ($action === 'query_status' || $action === 'get_result') {
  $jobId = trim((string)($body['job_id'] ?? ($_GET['job_id'] ?? '')));
  if ($jobId === '') json_error(400, 1, 'Missing parameter (job_id)', 'BAD_REQUEST');

  $st = sync_from_files($pdo, $jobId, ['debug' => !empty($_GET['debug'])]);
  if (!$st['ok']) json_error(404, 7, (string)($st['error'] ?? 'Resource not found (job_id)'), 'NOT_FOUND');

  // If we already have a direct link (from leechmng fast-path or files), we may treat as DONE.
  if (!empty($st['direct_url'])) {
    // Read authoritative job status + created_at (submit day)
    $jr = $pdo->prepare('SELECT host,norm_url,size_bytes,status_code,created_at FROM jobs WHERE job_id=?');
    $jr->execute([$jobId]);
    $jrow = $jr->fetch(PDO::FETCH_ASSOC) ?: [];

    $jobStatus = (string)($jrow['status_code'] ?? '');

    // status=5 (ACCOUNT BW exhausted): never return public link
    if ($jobStatus === '5') {
      $st['status'] = $st['status'] ?? 'processing';
      $st['status_code'] = '5';
      unset($st['direct_url'], $st['direct_url_public'], $st['direct_url_latest'], $st['direct_url_internal'], $st['direct_url_pve']);
    } else {
      // status=6 (HOST BW exhausted) may be cache-hit; when link becomes available, promote to DONE (status=1)
      $st['status'] = 'ready';
      $st['status_code'] = '1';

      // Ledger: count BW for the day of submit (Boss rule)
      try {
        $host = (string)($jrow['host'] ?? '');
        $norm = (string)($jrow['norm_url'] ?? '');
        // fallback to origin_url if jobs.norm_url is missing
        if ($norm === '') $norm = normalize_url_for_uniqueness((string)($st['origin_url'] ?? ''));
        $sz = (int)($jrow['size_bytes'] ?? 0);
        if ($sz < 0) $sz = 0;
        $createdAt = (int)($jrow['created_at'] ?? 0);
        $submitDay = $createdAt > 0 ? day_key_for_ts($createdAt) : today_key();

        if ($host !== '' && $norm !== '' && $sz > 0) {
          $tL = traffic_db();
          $now2 = time();
          traffic_ledger_upsert($tL, [
            'day_key'=>$submitDay,
            'host'=>$host,
            'norm_url'=>$norm,
            'now'=>$now2,
            'size_known'=>1,
            'size_bytes'=>$sz,
            'reserve_bytes'=>0,
            'final_bytes'=>$sz,
            'status_code'=>'1',
            'status_code_num'=>1,
            'state'=>'done',
            'job_id_last'=>$jobId,
            'reserve_until_ts'=>0,
            'needs_reconcile'=>0,
          ]);
          traffic_daily_rebuild_from_ledger($tL, $submitDay, $host);
          traffic_daily_rebuild_account_from_ledger($tL, $submitDay);
        }
      } catch (Throwable $e) {
        // ignore
      }
    }
  }

  if ($action === 'get_result' && ($st['status'] ?? '') !== 'ready') {
    json_out(['ok'=>true,'job_id'=>$jobId,'status'=>$st['status'] ?? 'processing']);
  }

  json_out(array_merge(['ok'=>true,'job_id'=>$jobId], $st));
}

json_error(400, 3, 'Unknown method (action)', 'BAD_REQUEST');
