#!/bin/bash
#
# pgtwin-migrate - OCF Resource Agent for PostgreSQL Migration/Upgrade
# Version: 2.0.0
# Release Date: 2026-01-19
# Status: EXPERIMENTAL - Not for production use without thorough testing
#
# A Pacemaker OCF agent that orchestrates PostgreSQL cluster migrations via
# logical replication (major version upgrades, vendor migrations, hosting
# provider changes, etc.)
#
# Provides zero-downtime PostgreSQL migrations using logical replication between
# parallel clusters. Manages forward replication, reverse replication, and VIP
# migration.
#
# Author: pgtwin project
# License: GPL-2.0-or-later
#
# Usage:
#   pgtwin-migrate {start|stop|monitor|meta-data|validate-all}
#
# Requirements:
#   - Two parallel PostgreSQL clusters (source and target)
#   - Both clusters managed by pgtwin OCF agent
#   - wal_level=logical on both clusters
#   - Replication VIPs configured for both clusters
#   - Production VIP managed separately
#
# Architecture:
#   Source Cluster (e.g., PG17):
#     - Primary: Writes go here during migration
#     - Secondary: Has Source Replication VIP (for forward replication)
#
#   Target Cluster (e.g., PG18):
#     - Primary: Receives data via logical replication
#     - Secondary: Has Target Replication VIP (for reverse replication + testing)
#
# Migration Phases:
#   1. HOT SYNC: Forward logical replication (source → target)
#   2. QUIESCE: Set source to read-only
#   3. FINAL SYNC: Wait for lag=0
#   4. CUTOVER READY: Signal ready for VIP migration
#   5. VIP MIGRATION: Pacemaker moves production VIP
#   6. REVERSE REPLICATION: Setup reverse replication (target → source)
#
#######################################################################
# Initialization:

# Pull in the standard OCF shell function library (provides ocf_log,
# ocf_is_true, exit-code constants, ...). OCF_ROOT is exported by Pacemaker;
# OCF_FUNCTIONS_DIR may be pre-set by the caller (e.g. for testing).
# Quoted to stay correct if either path ever contains whitespace.
: "${OCF_FUNCTIONS_DIR=${OCF_ROOT}/lib/heartbeat}"
. "${OCF_FUNCTIONS_DIR}/ocf-shellfuncs"

#######################################################################
# Defaults

OCF_RESKEY_source_cluster_default="postgres-clone"
OCF_RESKEY_target_cluster_default="postgres-clone-18"
OCF_RESKEY_source_replication_vip_default=""
OCF_RESKEY_target_replication_vip_default=""
OCF_RESKEY_production_vip_resource_default="postgres-vip"
OCF_RESKEY_target_vip_resource_default="postgres-vip-18"
OCF_RESKEY_original_production_vip_ip_default=""  # Original IP of production VIP (for rollback)
OCF_RESKEY_original_target_vip_ip_default=""  # Original IP of target VIP (for rollback)
OCF_RESKEY_publication_name_default="pgtwin_migrate_forward_pub"
OCF_RESKEY_subscription_name_default="pgtwin_migrate_forward_sub"
OCF_RESKEY_replication_slot_name_default="pgtwin_migrate_forward_slot"
OCF_RESKEY_reverse_publication_name_default="pgtwin_migrate_reverse_pub"
OCF_RESKEY_reverse_subscription_name_default="pgtwin_migrate_reverse_sub"
OCF_RESKEY_reverse_replication_slot_name_default="pgtwin_migrate_reverse_slot"
OCF_RESKEY_lag_threshold_default="1024"  # 1KB
OCF_RESKEY_pgport_default="5432"
OCF_RESKEY_migration_dbuser_default="pgmigrate"  # Database superuser for migration replication
OCF_RESKEY_pgdatabase_default="postgres"  # PostgreSQL database name (legacy, use databases instead)
OCF_RESKEY_databases_default=""  # Comma-separated list of databases to migrate (e.g., "postgres,myapp_prod,analytics")
OCF_RESKEY_source_pghome_default=""  # Will be discovered from source postgres primitive
OCF_RESKEY_target_pghome_default=""  # Will be discovered from target postgres primitive
OCF_RESKEY_source_pgpassfile_default=""  # Will be discovered from source postgres primitive
OCF_RESKEY_target_pgpassfile_default=""  # Will be discovered from target postgres primitive
OCF_RESKEY_source_node_role_default="Promoted"  # Promoted or Unpromoted (PostgreSQL 16+ supports Unpromoted for publishing)
OCF_RESKEY_production_cluster_default=""  # Which cluster is production (empty=source_cluster). Change this to trigger cutover.
OCF_RESKEY_cutover_ready_default="false"  # Deprecated trigger (replaced by production_cluster); referenced by meta-data
OCF_RESKEY_finalize_replication_default="false"  # When true, stop action cleans up all replication infrastructure
OCF_RESKEY_cutover_debug_default="false"  # Enable bash tracing (set -x) in async cutover for debugging
OCF_RESKEY_stability_timeout_default="300"  # Max seconds to wait for cluster stability before cutover

# Apply defaults for any parameter not supplied by Pacemaker. Quoted so that
# values containing whitespace cannot be word-split by the ':' builtin.
: "${OCF_RESKEY_source_cluster=${OCF_RESKEY_source_cluster_default}}"
: "${OCF_RESKEY_target_cluster=${OCF_RESKEY_target_cluster_default}}"
: "${OCF_RESKEY_source_replication_vip=${OCF_RESKEY_source_replication_vip_default}}"
: "${OCF_RESKEY_target_replication_vip=${OCF_RESKEY_target_replication_vip_default}}"
: "${OCF_RESKEY_production_vip_resource=${OCF_RESKEY_production_vip_resource_default}}"
: "${OCF_RESKEY_target_vip_resource=${OCF_RESKEY_target_vip_resource_default}}"
: "${OCF_RESKEY_original_production_vip_ip=${OCF_RESKEY_original_production_vip_ip_default}}"
: "${OCF_RESKEY_original_target_vip_ip=${OCF_RESKEY_original_target_vip_ip_default}}"
: "${OCF_RESKEY_publication_name=${OCF_RESKEY_publication_name_default}}"
: "${OCF_RESKEY_subscription_name=${OCF_RESKEY_subscription_name_default}}"
: "${OCF_RESKEY_replication_slot_name=${OCF_RESKEY_replication_slot_name_default}}"
: "${OCF_RESKEY_reverse_publication_name=${OCF_RESKEY_reverse_publication_name_default}}"
: "${OCF_RESKEY_reverse_subscription_name=${OCF_RESKEY_reverse_subscription_name_default}}"
: "${OCF_RESKEY_reverse_replication_slot_name=${OCF_RESKEY_reverse_replication_slot_name_default}}"
: "${OCF_RESKEY_lag_threshold=${OCF_RESKEY_lag_threshold_default}}"
: "${OCF_RESKEY_pgport=${OCF_RESKEY_pgport_default}}"
: "${OCF_RESKEY_migration_dbuser=${OCF_RESKEY_migration_dbuser_default}}"
: "${OCF_RESKEY_pgdatabase=${OCF_RESKEY_pgdatabase_default}}"
: "${OCF_RESKEY_databases=${OCF_RESKEY_databases_default}}"
: "${OCF_RESKEY_source_pghome=${OCF_RESKEY_source_pghome_default}}"
: "${OCF_RESKEY_target_pghome=${OCF_RESKEY_target_pghome_default}}"
: "${OCF_RESKEY_source_pgpassfile=${OCF_RESKEY_source_pgpassfile_default}}"
: "${OCF_RESKEY_target_pgpassfile=${OCF_RESKEY_target_pgpassfile_default}}"
: "${OCF_RESKEY_source_node_role=${OCF_RESKEY_source_node_role_default}}"
: "${OCF_RESKEY_production_cluster=${OCF_RESKEY_production_cluster_default}}"
: "${OCF_RESKEY_cutover_ready=${OCF_RESKEY_cutover_ready_default}}"
: "${OCF_RESKEY_finalize_replication=${OCF_RESKEY_finalize_replication_default}}"
: "${OCF_RESKEY_cutover_debug=${OCF_RESKEY_cutover_debug_default}}"
: "${OCF_RESKEY_stability_timeout=${OCF_RESKEY_stability_timeout_default}}"

# Set default production_cluster to source_cluster if not specified
if [ -z "$OCF_RESKEY_production_cluster" ]; then
    OCF_RESKEY_production_cluster="$OCF_RESKEY_source_cluster"
fi

#######################################################################
# Database List Initialization
#
# v2.0: If databases parameter is empty, databases are discovered dynamically
# at runtime from the production cluster. This enables automatic tracking
# of new databases without CRM configuration changes.
#
# Priority: databases > pgdatabase > auto-discovery

# DB_LIST / DB_COUNT are only seeded here for backward compatibility;
# get_database_list() refreshes them before each operation.
if [ -n "$OCF_RESKEY_databases" ]; then
    # Explicit comma-separated parameter takes precedence
    DB_LIST=$(echo "$OCF_RESKEY_databases" | tr ',' ' ')
    DB_COUNT=$(echo "$DB_LIST" | wc -w)
else
    if [ -n "$OCF_RESKEY_pgdatabase" ] && [ "$OCF_RESKEY_pgdatabase" != "postgres" ]; then
        # Legacy single-database parameter (backward compatibility)
        DB_LIST="$OCF_RESKEY_pgdatabase"
        DB_COUNT=1
    else
        # Leave empty: the list will be discovered dynamically at runtime
        DB_LIST=""
        DB_COUNT=0
    fi
fi

#######################################################################
# Functions

pgtwin_migrate_meta_data() {
    # Emit the OCF resource-agent meta-data XML on stdout.
    # The heredoc delimiter is intentionally unquoted so the
    # ${OCF_RESKEY_*_default} values are expanded into the XML; any '$'
    # that must appear literally in the output is escaped as '\$'.
    cat <<END
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="pgtwin-migrate" version="2.0.0">
<version>2.0.0</version>

<longdesc lang="en">
OCF Resource Agent for PostgreSQL Migration/Upgrade

This agent orchestrates zero-downtime PostgreSQL major version upgrades and
migrations using logical replication between two parallel PostgreSQL clusters.

Key Features:
- Zero-downtime major version upgrades (e.g., PostgreSQL 17 → 18)
- Vendor migrations (e.g., EDB → Community PostgreSQL)
- Hosting provider migrations (e.g., AWS → Azure)
- Infrastructure upgrades (e.g., old hardware → new hardware)

Migration Process:
1. HOT SYNC: Establishes logical replication from source to target
2. Monitor replication lag until threshold met
3. QUIESCE: Sets source database to read-only mode
4. FINAL SYNC: Waits for complete synchronization (lag=0)
5. CUTOVER READY: Signals readiness for VIP migration
6. VIP MIGRATION: Production VIP moves from source to target (Pacemaker-managed)
7. REVERSE REPLICATION: Establishes reverse replication for rollback capability

Requirements:
- Both clusters must have wal_level=logical
- Replication VIPs must be configured on secondary nodes
- Production VIP managed as separate resource
- Replication user with appropriate permissions
</longdesc>

<shortdesc lang="en">PostgreSQL Migration/Upgrade Agent</shortdesc>

<parameters>

<parameter name="source_cluster" required="1" unique="0">
<longdesc lang="en">
Source PostgreSQL cluster resource name (promotable clone).
Example: "postgres-clone" for PostgreSQL 17 cluster.
</longdesc>
<shortdesc lang="en">Source cluster resource name</shortdesc>
<content type="string" default="${OCF_RESKEY_source_cluster_default}" />
</parameter>

<parameter name="target_cluster" required="1" unique="0">
<longdesc lang="en">
Target PostgreSQL cluster resource name (promotable clone).
Example: "postgres-clone-18" for PostgreSQL 18 cluster.
</longdesc>
<shortdesc lang="en">Target cluster resource name</shortdesc>
<content type="string" default="${OCF_RESKEY_target_cluster_default}" />
</parameter>

<parameter name="source_replication_vip" required="1" unique="0">
<longdesc lang="en">
IP address of the Source Replication VIP.
This VIP should be colocated with the source cluster's unpromoted (secondary) node.
Used for forward logical replication (target subscribes from this VIP).

Example: "192.168.60.102"
</longdesc>
<shortdesc lang="en">Source Replication VIP address</shortdesc>
<content type="string" default="${OCF_RESKEY_source_replication_vip_default}" />
</parameter>

<parameter name="target_replication_vip" required="1" unique="0">
<longdesc lang="en">
IP address of the Target Replication VIP.
This VIP should be colocated with the target cluster's unpromoted (secondary) node.
Used for reverse logical replication (source subscribes from this VIP).
Also serves as testing VIP for pre-cutover validation.

Example: "192.168.60.103"
</longdesc>
<shortdesc lang="en">Target Replication VIP address</shortdesc>
<content type="string" default="${OCF_RESKEY_target_replication_vip_default}" />
</parameter>

<parameter name="production_vip_resource" required="0" unique="0">
<longdesc lang="en">
Production VIP resource name used by applications.

This is the VIP that applications connect to (e.g., "postgres-vip").
During cutover, this VIP will be AUTOMATICALLY migrated from the
source cluster to the target cluster.

AUTOMATIC VIP MIGRATION (v2.0+):
When cutover_ready=true, the agent will:
1. Prepare reverse replication (steps 1-7)
2. Verify target cluster can accept writes (step 8)
3. Automatically migrate this VIP to target cluster (step 9)
4. Signal CUTOVER_COMPLETE when done (step 10)

The VIP cutover happens via IP address swap with target_vip_resource:
- Source cluster VIP keeps its colocation but gets target's IP address
- Target cluster VIP keeps its colocation but gets source's IP address
- Applications continue using same production IP, now served by target cluster
- No constraint changes needed

Example: "postgres-vip"

If this parameter is empty or not set, automatic VIP migration
is SKIPPED and admin must migrate VIP manually.
</longdesc>
<shortdesc lang="en">Production VIP resource name</shortdesc>
<content type="string" default="${OCF_RESKEY_production_vip_resource_default}" />
</parameter>

<parameter name="target_vip_resource" required="0" unique="0">
<longdesc lang="en">
Name of the target cluster VIP IPaddr2 resource (e.g., postgres-vip-18).

During cutover (Step 9), this resource's IP address will be swapped with
the production_vip_resource. After cutover, this resource will have the
production IP and serve application traffic.

Example: "postgres-vip-18"

If not set, defaults to "postgres-vip-18".
</longdesc>
<shortdesc lang="en">Target cluster VIP resource name</shortdesc>
<content type="string" default="${OCF_RESKEY_target_vip_resource_default}" />
</parameter>

<parameter name="original_production_vip_ip" required="0" unique="0">
<longdesc lang="en">
Original IP address of the production VIP resource before migration.

This IP is used during rollback to restore the production VIP to its
original state. Without this parameter, rollback cannot reliably restore
VIP configuration after a failed cutover.

REQUIRED for safe rollback. If not set, rollback will attempt to toggle
the VIP swap which may not restore the correct state.

Example: "192.168.60.100"
</longdesc>
<shortdesc lang="en">Original production VIP IP address</shortdesc>
<content type="string" default="" />
</parameter>

<parameter name="original_target_vip_ip" required="0" unique="0">
<longdesc lang="en">
Original IP address of the target VIP resource before migration.

This IP is used during rollback to restore the target VIP to its
original state. Without this parameter, rollback cannot reliably restore
VIP configuration after a failed cutover.

REQUIRED for safe rollback. If not set, rollback will attempt to toggle
the VIP swap which may not restore the correct state.

Example: "192.168.60.101"
</longdesc>
<shortdesc lang="en">Original target VIP IP address</shortdesc>
<content type="string" default="" />
</parameter>

<parameter name="publication_name" required="0" unique="0">
<longdesc lang="en">
Logical replication publication name.
Used for both forward and reverse replication.
</longdesc>
<shortdesc lang="en">Publication name</shortdesc>
<content type="string" default="${OCF_RESKEY_publication_name_default}" />
</parameter>

<parameter name="subscription_name" required="0" unique="0">
<longdesc lang="en">
Logical replication subscription name.
Used for both forward and reverse replication.
</longdesc>
<shortdesc lang="en">Subscription name</shortdesc>
<content type="string" default="${OCF_RESKEY_subscription_name_default}" />
</parameter>

<parameter name="replication_slot_name" required="0" unique="0">
<longdesc lang="en">
Logical replication slot name.
Used for both forward and reverse replication.
</longdesc>
<shortdesc lang="en">Replication slot name</shortdesc>
<content type="string" default="${OCF_RESKEY_replication_slot_name_default}" />
</parameter>

<parameter name="lag_threshold" required="0" unique="0">
<longdesc lang="en">
Replication lag threshold in bytes before allowing cutover.
Migration will not proceed to QUIESCE phase until lag is below this threshold.
Default: 1024 bytes (1KB)
</longdesc>
<shortdesc lang="en">Lag threshold (bytes)</shortdesc>
<content type="integer" default="${OCF_RESKEY_lag_threshold_default}" />
</parameter>

<parameter name="pgport" required="0" unique="0">
<longdesc lang="en">
PostgreSQL port number.
Must match the port used by both source and target clusters.
Default: 5432
</longdesc>
<shortdesc lang="en">PostgreSQL port</shortdesc>
<content type="integer" default="${OCF_RESKEY_pgport_default}" />
</parameter>

<parameter name="migration_dbuser" required="0" unique="0">
<longdesc lang="en">
PostgreSQL database superuser for migration replication management.
This is a DATABASE USER (not a Unix user) that must have SUPERUSER privileges.

Required for:
- Creating publications and subscriptions (logical replication)
- Creating DDL triggers for schema replication
- Overriding read-only mode during cutover preparation

Note: All commands run as Unix user 'postgres', but authenticate to PostgreSQL
as this database user.

Default: pgmigrate
</longdesc>
<shortdesc lang="en">PostgreSQL database superuser for migration</shortdesc>
<content type="string" default="${OCF_RESKEY_migration_dbuser_default}" />
</parameter>

<parameter name="databases" required="0" unique="0">
<longdesc lang="en">
Comma-separated list of PostgreSQL databases to migrate.

Examples:
- Single database:  "postgres"
- Multiple databases: "postgres,myapp_prod,analytics,reporting"

All specified databases will:
- Have logical replication configured (publications/subscriptions)
- Be monitored for replication lag
- Be quiesced together during cutover (read-only mode)
- Cutover atomically with a single VIP swap
- Have reverse replication enabled post-cutover

Important:
- All databases must exist on BOTH source and target clusters before starting migration
- Database list cannot be changed during active migration (validated before cutover)
- For new databases appearing mid-migration, either:
  1. Stop migration, update list, restart fresh, OR
  2. Complete current migration, then migrate new databases separately

Backward Compatibility:
If not specified, defaults to 'postgres' for single-database migration.
This parameter replaces the legacy 'pgdatabase' parameter.

Default: "postgres" (single database migration)
</longdesc>
<shortdesc lang="en">Databases to migrate (comma-separated)</shortdesc>
<content type="string" default="${OCF_RESKEY_databases_default}" />
</parameter>

<parameter name="source_pghome" required="0" unique="0">
<longdesc lang="en">
PostgreSQL home directory for source cluster.
If not specified, will be automatically discovered from the source postgres primitive configuration
by extracting the parent directory of pgdata parameter.

Example: "/var/lib/pgsql"

The agent will use this as HOME when running psql commands against the source cluster.
</longdesc>
<shortdesc lang="en">Source PostgreSQL home directory</shortdesc>
<content type="string" default="${OCF_RESKEY_source_pghome_default}" />
</parameter>

<parameter name="target_pghome" required="0" unique="0">
<longdesc lang="en">
PostgreSQL home directory for target cluster.
If not specified, will be automatically discovered from the target postgres primitive configuration
by extracting the parent directory of pgdata parameter.

Example: "/var/lib/pgsql"

The agent will use this as HOME when running psql commands against the target cluster.
</longdesc>
<shortdesc lang="en">Target PostgreSQL home directory</shortdesc>
<content type="string" default="${OCF_RESKEY_target_pghome_default}" />
</parameter>

<parameter name="source_pgpassfile" required="0" unique="0">
<longdesc lang="en">
PostgreSQL password file for source cluster authentication.
If not specified, will be automatically discovered from the source postgres primitive configuration,
or will default to \$source_pghome/.pgpass.

Example: "/var/lib/pgsql/.pgpass"

The agent will use PGPASSFILE environment variable when running psql commands against the source cluster.
This file must contain credentials for the postgres superuser to connect to source cluster nodes.
</longdesc>
<shortdesc lang="en">Source PostgreSQL password file</shortdesc>
<content type="string" default="${OCF_RESKEY_source_pgpassfile_default}" />
</parameter>

<parameter name="target_pgpassfile" required="0" unique="0">
<longdesc lang="en">
PostgreSQL password file for target cluster authentication.
If not specified, will be automatically discovered from the target postgres primitive configuration,
or will default to \$target_pghome/.pgpass.

Example: "/var/lib/pgsql/.pgpass"

The agent will use PGPASSFILE environment variable when running psql commands against the target cluster.
This file must contain credentials for the postgres superuser to connect to target cluster nodes.
</longdesc>
<shortdesc lang="en">Target PostgreSQL password file</shortdesc>
<content type="string" default="${OCF_RESKEY_target_pgpassfile_default}" />
</parameter>

<parameter name="source_node_role" required="0" unique="0">
<longdesc lang="en">
Role of the source cluster node to use for logical replication publishing.

Options:
- "Promoted" (default): Use the PRIMARY node for logical replication
  - Compatible with all PostgreSQL versions
  - Zero lag, changes available immediately
  - Simpler setup, no special configuration needed
  - Recommended for initial deployment and PostgreSQL versions &lt; 16

- "Unpromoted": Use the STANDBY node for logical replication publishing
  - Requires PostgreSQL 16+ on source cluster
  - Offloads logical replication workload from production primary
  - Isolates migration traffic from production traffic
  - Requires hot_standby_feedback=on on standby
  - May have minimal replication lag (standby replay lag)
  - Recommended for large databases and high write loads
  - Automatically calls pg_log_standby_snapshot() to prevent "idle primary" hangs

IMPORTANT: The target cluster always uses the Promoted (primary) node because
subscriptions perform write operations. Only the source can use Unpromoted mode
since publishing is a read-only operation.

For PostgreSQL 15 and earlier, this MUST be "Promoted".
For PostgreSQL 16+, both options are supported.

Default: Promoted
</longdesc>
<shortdesc lang="en">Source node role (Promoted or Unpromoted)</shortdesc>
<content type="string" default="${OCF_RESKEY_source_node_role_default}" />
</parameter>

<parameter name="cutover_ready" required="0" unique="0">
<longdesc lang="en">
Signal to begin cutover preparation.

Values:
- "false" (default): Forward replication only
- "true": Trigger cutover preparation (promote operation)

When admin sets this to "true", pgtwin-migrate will FULLY AUTOMATICALLY
execute the complete cutover sequence:

1a. Quiesce source cluster (PG17 read-only mode)
1b. Quiesce target cluster (PG18 read-only mode)
2. Wait for forward replication lag = 0
3. Set up reverse replication (PG18 → PG17) using superuser override
4. Set up DDL trigger (PG18 → PG17) using superuser override
5. Verify reverse replication working
6. Unquiesce target cluster (PG18 read-write mode)
7. Test target cluster accepts writes (safety check)
8. AUTOMATICALLY migrate production VIP to target cluster
9. Signal "CUTOVER_COMPLETE" state

FULLY AUTOMATED: The admin ONLY needs to set cutover_ready=true.
Everything else happens automatically, including VIP migration.

TIMING: Set this parameter to "true" when you are ready for cutover.
pgtwin-migrate will execute as soon as conditions are met:
- Forward replication is healthy and active
- Both clusters are reachable
- Monitor cycle detects cutover_ready=true

DURATION: Complete cutover typically takes 2-5 minutes depending on
database size and replication lag.

IMPORTANT: During cutover (2-5 minutes), BOTH clusters will be
read-only to prevent data loss. Applications will be briefly unable
to write. After cutover completes, applications automatically connect
to the new cluster via the migrated VIP.

The migration_dbuser (pgmigrate) must have SUPERUSER privileges to
override read-only mode during setup operations.
</longdesc>
<shortdesc lang="en">Trigger cutover preparation</shortdesc>
<content type="boolean" default="false" />
</parameter>

<parameter name="cutover_debug" required="0" unique="0">
<longdesc lang="en">
Enable verbose debug logging for async cutover process.

Values:
- "false" (default): Normal logging
- "true": Enable bash tracing (set -x) for detailed execution trace

When enabled, the async cutover process will:
- Enable bash set -x for line-by-line command tracing
- Log all variable expansions and command executions
- Provide maximum visibility into cutover execution
- Help troubleshoot complex failure scenarios

Debug output is written to /var/lib/pgsql/.cutover-YYYYMMDD-HHMMSS.log
A symlink /var/lib/pgsql/.cutover.log points to the latest log file.

Historical cutover logs are retained for troubleshooting and audit purposes.

WARNING: Debug mode generates significant log output. Only enable
when troubleshooting cutover issues.

Note: Debug mode is also automatically enabled when Pacemaker
resource tracing is active (crm resource trace migration).
</longdesc>
<shortdesc lang="en">Enable debug logging for cutover</shortdesc>
<content type="boolean" default="${OCF_RESKEY_cutover_debug_default}" />
</parameter>

<parameter name="production_cluster" required="0" unique="0">
<longdesc lang="en">
v2.0 PARAMETER: Which cluster should be the production cluster.

Values:
- "" (empty, default): Use source_cluster as production
- "source_cluster_name": Source cluster is production (forward replication active)
- "target_cluster_name": Target cluster is production (reverse replication active)

BIDIRECTIONAL CUTOVER:
Changing this parameter triggers a cutover. The agent will:
1. Set old production cluster to READ-ONLY mode
2. Wait for replication to sync (lag = 0)
3. DELETE old direction replication (publications, subscriptions, slots, triggers)
4. CREATE new direction replication (fresh setup with copy_data=true)
5. Swap VIP addresses between clusters
6. Set old production cluster back to READ-WRITE mode
7. Update CIB state to reflect new production cluster

FAILBACK CAPABILITY:
To failback, simply change production_cluster back to the original value.
The agent will execute the cutover in the opposite direction.

Example:
- Initial: production_cluster="" (source is production, forward replication)
- Cutover: production_cluster="postgres-clone-18" (target is production, reverse replication)
- Failback: production_cluster="postgres-clone" (source is production again, forward replication)

NOTE: This parameter replaces the deprecated cutover_ready boolean.
</longdesc>
<shortdesc lang="en">Which cluster is production</shortdesc>
<content type="string" default="${OCF_RESKEY_production_cluster_default}" />
</parameter>

<parameter name="finalize_replication" required="0" unique="0">
<longdesc lang="en">
v2.0 PARAMETER: Clean up all replication infrastructure on stop.

Values:
- "false" (default): Preserve replication infrastructure on stop
  - Use this when migrating the resource between nodes
  - Resource can be restarted without losing replication state

- "true": Delete all replication infrastructure on stop
  - Drop all publications, subscriptions, replication slots, and DDL triggers
  - Use this when migration is complete and you want to finalize
  - Cannot be undone - all replication state will be removed

WORKFLOW:
1. Complete migration with cutover (production_cluster changed)
2. Verify everything working on new production cluster
3. When ready to finalize: crm resource update migration-forward finalize_replication=true
4. Stop the resource: crm resource stop migration-forward
5. Delete the resource: crm configure delete migration-forward

The finalize_replication parameter is checked during stop operation.
If true, all replication infrastructure is cleaned up before stopping.
</longdesc>
<shortdesc lang="en">Clean up replication on stop</shortdesc>
<content type="boolean" default="${OCF_RESKEY_finalize_replication_default}" />
</parameter>

<parameter name="stability_timeout" required="0" unique="0">
<longdesc lang="en">
v2.0 PARAMETER: Maximum seconds to wait for cluster stability before cutover.

Before executing a cutover, the agent validates that the target cluster is stable:
- Target cluster has a promoted (primary) node
- PostgreSQL on target is accepting connections
- Replication lag is within acceptable threshold

If the target cluster is not stable within this timeout, the cutover fails
with a clear error message.

Default: 300 seconds (5 minutes)
</longdesc>
<shortdesc lang="en">Cluster stability timeout</shortdesc>
<content type="integer" default="${OCF_RESKEY_stability_timeout_default}" />
</parameter>

</parameters>

<actions>
<action name="start"        timeout="120s" />
<action name="stop"         timeout="60s" />
<action name="monitor"      timeout="30s" interval="10s" depth="0" />
<action name="meta-data"    timeout="5s" />
<action name="validate-all" timeout="30s" />
</actions>
</resource-agent>
END
}

pgtwin_migrate_usage() {
    # Print a short usage summary listing the supported OCF actions.
    printf '%s\n' \
        "usage: $0 {start|stop|monitor|validate-all|meta-data}" \
        "" \
        "Expects to have a fully populated OCF RA-compliant environment set."
}

#######################################################################
# v2.0 Functions: Dynamic Database Discovery and Bidirectional Cutover
#######################################################################

get_current_databases() {
    # Query databases from the specified cluster
    # Usage: get_current_databases "source"|"target"
    # Outputs: space-separated list of database names on stdout
    # Returns: 0 on success; 1 when host/passfile are unset or the query
    #          fails or returns nothing
    #
    # This function enables dynamic database discovery - databases are
    # queried at runtime instead of requiring static configuration.

    local cluster="$1"
    local host passfile databases

    if [ "$cluster" = "source" ]; then
        host="$OCF_RESKEY_source_replication_vip"
        passfile="$OCF_RESKEY_source_pgpassfile"
    else
        host="$OCF_RESKEY_target_replication_vip"
        passfile="$OCF_RESKEY_target_pgpassfile"
    fi

    if [ -z "$host" ] || [ -z "$passfile" ]; then
        ocf_log warn "Cannot discover databases: missing host or passfile for $cluster"
        return 1
    fi

    # Query all user databases (exclude templates). Assignment is kept
    # separate from 'local' so psql's exit status is not masked.
    databases=$(PGPASSFILE="$passfile" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d postgres -t -A \
        -c "SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname" 2>/dev/null)

    if [ $? -ne 0 ] || [ -z "$databases" ]; then
        ocf_log warn "No databases found or query failed for $cluster cluster"
        return 1
    fi

    # Convert newlines to spaces and drop the trailing separator
    echo "$databases" | tr '\n' ' ' | sed 's/ $//'
}

get_database_list() {
    # Get database list - either from parameter or by discovery
    # Usage: get_database_list ["source"|"target"]
    #   cluster: which cluster to query for auto-discovery (default: based on production_cluster)
    # Sets: DB_LIST (space-separated names) and DB_COUNT global variables
    # Returns: 0 on success, 1 when auto-discovery fails

    local cluster="${1:-}"

    # If explicit list provided, use it
    if [ -n "$OCF_RESKEY_databases" ]; then
        DB_LIST=$(echo "$OCF_RESKEY_databases" | tr ',' ' ')
        DB_COUNT=$(echo "$DB_LIST" | wc -w)
        return 0
    fi

    # Legacy parameter (backward compatibility)
    if [ -n "$OCF_RESKEY_pgdatabase" ] && [ "$OCF_RESKEY_pgdatabase" != "postgres" ]; then
        DB_LIST="$OCF_RESKEY_pgdatabase"
        DB_COUNT=1
        return 0
    fi

    # Auto-discover from cluster
    if [ -z "$cluster" ]; then
        # Determine which cluster is production
        if [ "$OCF_RESKEY_production_cluster" = "$OCF_RESKEY_target_cluster" ]; then
            cluster="target"
        else
            cluster="source"
        fi
    fi

    # BUGFIX: declare separately from assignment; 'local x=$(cmd)' would make
    # $? reflect 'local' (always 0), so discovery failures were never detected.
    local discovered
    discovered=$(get_current_databases "$cluster")
    if [ $? -eq 0 ] && [ -n "$discovered" ]; then
        DB_LIST="$discovered"
        DB_COUNT=$(echo "$DB_LIST" | wc -w)
        ocf_log info "Auto-discovered $DB_COUNT database(s): $DB_LIST"
        return 0
    else
        ocf_log err "Failed to discover databases from $cluster cluster"
        return 1
    fi
}

get_current_production_cluster() {
    # Report which cluster is currently production, as recorded in the CIB
    # attribute 'migration-production-cluster'.
    #
    # When the attribute is absent or empty (e.g. first start), fall back to
    # source_cluster so the initial state is always "source is production"
    # (forward replication), regardless of the production_cluster parameter.
    # Cutover detection compares this value against that parameter to spot
    # a requested change.

    local recorded
    recorded=$(crm_attribute -q -n migration-production-cluster 2>/dev/null)
    if [ -z "$recorded" ]; then
        echo "$OCF_RESKEY_source_cluster"
    else
        echo "$recorded"
    fi
}

validate_cluster_stability() {
    # Validate target cluster is stable before cutover
    # Usage: validate_cluster_stability "cluster_resource_name"
    # Returns: 0 if stable, 1 if timeout
    #
    # Polls every 10s until all of the following hold, or until
    # stability_timeout (default 300s) elapses:
    #   1. the cluster has a Promoted (primary) node
    #   2. PostgreSQL behind the cluster's replication VIP accepts connections
    #   3. replication lag toward that cluster is within lag_threshold
    #
    # NOTE(review): 'elapsed' only counts sleep time; slow psql probes are not
    # accounted for, so total wall time can exceed max_wait somewhat.

    local target_cluster="$1"
    local max_wait="${OCF_RESKEY_stability_timeout:-300}"
    local wait_interval=10
    local elapsed=0

    ocf_log info "Validating cluster stability for $target_cluster (max wait: ${max_wait}s)"

    # Determine which VIP/passfile to use
    # (source cluster -> source replication VIP/passfile; otherwise target's)
    local host passfile
    if [ "$target_cluster" = "$OCF_RESKEY_source_cluster" ]; then
        host="$OCF_RESKEY_source_replication_vip"
        passfile="$OCF_RESKEY_source_pgpassfile"
    else
        host="$OCF_RESKEY_target_replication_vip"
        passfile="$OCF_RESKEY_target_pgpassfile"
    fi

    while [ $elapsed -lt $max_wait ]; do
        local all_checks_pass=true

        # Check 1: Promoted node exists
        local promoted=$(discover_cluster_node "$target_cluster" "Promoted")
        if [ -z "$promoted" ]; then
            ocf_log info "Waiting: $target_cluster has no promoted node (${elapsed}s/${max_wait}s)"
            all_checks_pass=false
        fi

        # Check 2: PostgreSQL accepting connections (if promoted exists)
        # Probes through the replication VIP, not the promoted node directly.
        if [ -n "$promoted" ] && [ "$all_checks_pass" = "true" ]; then
            if ! PGPASSFILE="$passfile" psql -h "$host" -p "$OCF_RESKEY_pgport" \
                -U "$OCF_RESKEY_migration_dbuser" -d postgres -c "SELECT 1" >/dev/null 2>&1; then
                ocf_log info "Waiting: PostgreSQL on $promoted not accepting connections (${elapsed}s/${max_wait}s)"
                all_checks_pass=false
            fi
        fi

        # Check 3: Replication lag within threshold (get lag to this cluster)
        # Only runs once checks 1-2 pass; an empty lag result counts as passing.
        if [ "$all_checks_pass" = "true" ]; then
            # Get lag from current production to target
            local lag=$(get_replication_lag_to_cluster "$target_cluster")
            if [ -n "$lag" ] && [ "$lag" -gt "$OCF_RESKEY_lag_threshold" ]; then
                ocf_log info "Waiting: Replication lag ${lag}KB > threshold ${OCF_RESKEY_lag_threshold}KB (${elapsed}s/${max_wait}s)"
                all_checks_pass=false
            fi
        fi

        # All checks passed
        if [ "$all_checks_pass" = "true" ]; then
            # Warning check: Both nodes online?
            # A missing unpromoted node is only warned about, never fatal.
            local unpromoted=$(discover_cluster_node "$target_cluster" "Unpromoted")
            if [ -z "$unpromoted" ]; then
                ocf_log warn "Target cluster has no unpromoted node - proceeding with single-node cluster"
            fi

            ocf_log info "✓ Cluster $target_cluster passed stability validation"
            return 0
        fi

        sleep $wait_interval
        elapsed=$((elapsed + wait_interval))
    done

    ocf_log err "Cluster stability validation timed out after ${max_wait}s"
    return 1
}

get_replication_lag_to_cluster() {
    # Get replication lag to the specified cluster
    # Usage: get_replication_lag_to_cluster "cluster_resource_name"
    # Outputs: lag in KB (0 when no subscription reports lag)
    #
    # Lag is measured on the *subscriber* side via pg_stat_subscription
    # (latest_end_lsn - confirmed_flush_lsn) and is the maximum across all
    # replicated databases.

    local target_cluster="$1"
    local max_lag=0

    # Determine direction based on which cluster we're measuring lag to
    local sub_host sub_passfile direction
    if [ "$target_cluster" = "$OCF_RESKEY_target_cluster" ]; then
        # Forward replication lag (source -> target): subscriptions on target
        direction="forward"
        sub_host="$OCF_RESKEY_target_replication_vip"
        sub_passfile="$OCF_RESKEY_target_pgpassfile"
    else
        # Reverse replication lag (target -> source): subscriptions on source
        direction="reverse"
        sub_host="$OCF_RESKEY_source_replication_vip"
        sub_passfile="$OCF_RESKEY_source_pgpassfile"
    fi

    # Populate DB_LIST quietly - this function's only stdout is the lag value
    get_database_list >/dev/null 2>&1

    for db in $DB_LIST; do
        local sub_name="pgtwin_migrate_${direction}_sub_${db}"
        local lag_bytes=$(PGPASSFILE="$sub_passfile" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" -t -A \
            -c "SELECT COALESCE(pg_wal_lsn_diff(latest_end_lsn, confirmed_flush_lsn), 0)
                FROM pg_stat_subscription WHERE subname = '${sub_name}';" 2>/dev/null)

        # FIX: only compare when psql returned a plain integer; previously a
        # failed/garbled query fed non-numeric text into [ -gt ], which spams
        # stderr and is treated as false only by accident
        if [[ "$lag_bytes" =~ ^[0-9]+$ ]] && [ "$lag_bytes" -gt "$max_lag" ]; then
            max_lag=$lag_bytes
        fi
    done

    # Convert to KB
    echo $((max_lag / 1024))
}

delete_replication_direction() {
    # Delete all replication infrastructure for a given direction
    # Usage: delete_replication_direction "forward"|"reverse"
    # Returns: always 0 (all drops are IF EXISTS / best-effort)
    #
    # This is the core function for cutover - we DELETE the old direction
    # completely rather than just disabling it (avoids stale subscription
    # issues).

    local direction="$1"

    # FIX: populate DB_LIST/DB_COUNT *before* logging the count - previously
    # the count was logged from whatever stale or unset value DB_COUNT held
    get_database_list >/dev/null 2>&1

    ocf_log info "Deleting $direction replication infrastructure for $DB_COUNT database(s)"

    # Publications live on the publishing cluster, subscriptions on the
    # consuming one: forward = source->target, reverse = target->source
    local pub_host pub_passfile sub_host sub_passfile
    if [ "$direction" = "forward" ]; then
        pub_host="$OCF_RESKEY_source_replication_vip"
        pub_passfile="$OCF_RESKEY_source_pgpassfile"
        sub_host="$OCF_RESKEY_target_replication_vip"
        sub_passfile="$OCF_RESKEY_target_pgpassfile"
    else
        pub_host="$OCF_RESKEY_target_replication_vip"
        pub_passfile="$OCF_RESKEY_target_pgpassfile"
        sub_host="$OCF_RESKEY_source_replication_vip"
        sub_passfile="$OCF_RESKEY_source_pgpassfile"
    fi

    for db in $DB_LIST; do
        local pub_name="pgtwin_migrate_${direction}_pub_${db}"
        local sub_name="pgtwin_migrate_${direction}_sub_${db}"

        # Drop subscription first (this also drops the replication slot)
        ocf_log debug "Dropping subscription $sub_name on $sub_host (database: $db)"
        PGPASSFILE="$sub_passfile" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" \
            -c "DROP SUBSCRIPTION IF EXISTS ${sub_name};" >/dev/null 2>&1

        # Drop publication
        ocf_log debug "Dropping publication $pub_name on $pub_host (database: $db)"
        PGPASSFILE="$pub_passfile" psql -h "$pub_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" \
            -c "DROP PUBLICATION IF EXISTS ${pub_name};" >/dev/null 2>&1

        # Drop the DDL-forwarding event trigger on the publishing side
        local trigger_name
        if [ "$direction" = "forward" ]; then
            trigger_name="replicate_ddl_to_target_trigger_${db}"
        else
            trigger_name="replicate_ddl_to_source_trigger_${db}"
        fi
        PGPASSFILE="$pub_passfile" psql -h "$pub_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" \
            -c "DROP EVENT TRIGGER IF EXISTS ${trigger_name};" >/dev/null 2>&1

        ocf_log info "  ✓ Deleted $direction replication for database: $db"
    done

    ocf_log info "✓ Deleted $direction replication infrastructure"
    return 0
}

create_replication_direction() {
    # Create all replication infrastructure for a given direction
    # Usage: create_replication_direction "forward"|"reverse"
    # Returns: 0 on success, 1 on failure

    local direction="$1"

    # FIX: populate DB_LIST/DB_COUNT before logging the count (previously the
    # count was logged before get_database_list had run)
    get_database_list >/dev/null 2>&1

    ocf_log info "Creating $direction replication infrastructure for $DB_COUNT database(s)"

    # Publisher is the promoted node of the publishing cluster, subscriber
    # the promoted node of the consuming cluster
    local pub_node sub_node
    if [ "$direction" = "forward" ]; then
        pub_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
        sub_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
    else
        pub_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
        sub_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
    fi

    if [ -z "$pub_node" ] || [ -z "$sub_node" ]; then
        ocf_log err "Cannot create $direction replication: missing nodes (pub=$pub_node, sub=$sub_node)"
        return 1
    fi

    for db in $DB_LIST; do
        # Publication + subscription for this database
        if ! setup_database_replication "$db" "$direction" "$pub_node" "$sub_node" "false" "false"; then
            ocf_log err "Failed to setup $direction replication for database: $db"
            return 1
        fi

        # DDL trigger on the publishing side forwards schema changes.
        # Best-effort: its result is deliberately not checked (matches
        # previous behavior).
        if [ "$direction" = "forward" ]; then
            setup_ddl_trigger "source" "$pub_node" "$sub_node" "target" "false" "$db"
        else
            setup_ddl_trigger "target" "$pub_node" "$sub_node" "source" "false" "$db"
        fi

        ocf_log info "  ✓ Created $direction replication for database: $db"
    done

    ocf_log info "✓ Created $direction replication infrastructure"
    return 0
}

check_and_trigger_cutover() {
    # Detect a change of the production_cluster parameter and, when one is
    # pending, validate the new production cluster and launch an async
    # cutover. Invoked from notify and monitor.
    # Returns: 0 when nothing to do or cutover launched OK, 1 on failure

    local desired_production="$OCF_RESKEY_production_cluster"
    local current_production
    current_production=$(get_current_production_cluster)

    # Nothing to do while the parameter and the CIB state agree
    if [ "$desired_production" = "$current_production" ]; then
        return 0
    fi

    ocf_log info "=========================================="
    ocf_log info "CUTOVER REQUESTED"
    ocf_log info "=========================================="
    ocf_log info "Current production: $current_production"
    ocf_log info "Desired production: $desired_production"
    ocf_log info "=========================================="

    # The active replication direction is torn down and its opposite is built:
    # source currently in production means forward is active, otherwise reverse
    local old_direction="reverse" new_direction="forward"
    if [ "$current_production" = "$OCF_RESKEY_source_cluster" ]; then
        old_direction="forward"
        new_direction="reverse"
    fi

    # Refuse to cut over onto an unstable cluster
    if ! validate_cluster_stability "$desired_production"; then
        ocf_log err "Cutover blocked - target cluster validation failed"
        ocf_log err "Resource will continue with current production: $current_production"
        return 1
    fi

    # Run the heavy lifting in the background so monitor does not time out;
    # propagate its launch status to the caller
    start_async_cutover_v2 "$current_production" "$desired_production" "$old_direction" "$new_direction"
}

finalize_all_replication() {
    # Tear down every piece of replication infrastructure and wipe the CIB
    # migration state - invoked when finalize_replication=true. Whichever
    # direction is currently active is the one deleted.
    # Returns: always 0

    ocf_log info "=========================================="
    ocf_log info "FINALIZING MIGRATION"
    ocf_log info "=========================================="
    ocf_log info "Deleting all replication infrastructure"

    local active_production
    active_production=$(get_current_production_cluster)

    # Source in production => forward replication is active; otherwise reverse
    if [ "$active_production" = "$OCF_RESKEY_source_cluster" ]; then
        delete_replication_direction "forward"
    else
        delete_replication_direction "reverse"
    fi

    # Remove migration bookkeeping from the CIB (missing attrs are ignored)
    ocf_log info "Clearing CIB migration state attributes"
    local attr
    for attr in migration-production-cluster migration-state migration-last-cutover; do
        crm_attribute -D -n "$attr" 2>/dev/null || true
    done

    ocf_log info "=========================================="
    ocf_log info "✓ MIGRATION FINALIZED"
    ocf_log info "=========================================="
    ocf_log info "All replication infrastructure removed"
    ocf_log info "Resource can be safely deleted"
    ocf_log info "=========================================="

    return 0
}

check_replication_slot_availability_v2() {
    # Check if sufficient replication slots are available
    # Usage: check_replication_slot_availability_v2 "source"|"target" db_count
    # Returns: 0 if OK, OCF_ERR_CONFIGURED if insufficient
    #
    # Existing pgtwin_migrate_% slots are excluded from the "used" count
    # because the agent reuses them.

    local cluster="$1"
    local db_count="$2"
    local slots_needed="$db_count"

    # Pick the replication VIP / passfile for the requested side
    local host passfile
    if [ "$cluster" = "source" ]; then
        host="$OCF_RESKEY_source_replication_vip"
        passfile="$OCF_RESKEY_source_pgpassfile"
    else
        host="$OCF_RESKEY_target_replication_vip"
        passfile="$OCF_RESKEY_target_pgpassfile"
    fi

    # Query current slot usage
    local max_slots=$(PGPASSFILE="$passfile" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d postgres -t -A \
        -c "SHOW max_replication_slots" 2>/dev/null)

    local migration_slots=$(PGPASSFILE="$passfile" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d postgres -t -A \
        -c "SELECT count(*) FROM pg_replication_slots WHERE slot_name LIKE 'pgtwin_migrate_%'" 2>/dev/null)

    local other_slots=$(PGPASSFILE="$passfile" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d postgres -t -A \
        -c "SELECT count(*) FROM pg_replication_slots WHERE slot_name NOT LIKE 'pgtwin_migrate_%'" 2>/dev/null)

    # FIX: validate the replies are plain integers, not merely non-empty -
    # a failed/garbled query previously fed non-numeric text straight into
    # the arithmetic below (consistent with check_replication_slot_availability)
    [[ "$max_slots" =~ ^[0-9]+$ ]] || max_slots=10
    [[ "$migration_slots" =~ ^[0-9]+$ ]] || migration_slots=0
    [[ "$other_slots" =~ ^[0-9]+$ ]] || other_slots=0

    # Calculate available slots (excluding our existing migration slots which will be reused)
    local available=$((max_slots - other_slots))

    if [ "$slots_needed" -gt "$available" ]; then
        ocf_log err "============================================"
        ocf_log err "INSUFFICIENT REPLICATION SLOTS ON $cluster"
        ocf_log err "============================================"
        ocf_log err "Databases to replicate: $db_count"
        ocf_log err "Slots needed on $cluster: $slots_needed"
        ocf_log err "Slots available: $available"
        ocf_log err "  max_replication_slots: $max_slots"
        ocf_log err "  Used by other applications: $other_slots"
        ocf_log err "  Used by pgtwin-migrate: $migration_slots"
        ocf_log err "--------------------------------------------"
        ocf_log err "RESOLUTION:"
        ocf_log err "  1. Increase max_replication_slots in postgresql.conf"
        ocf_log err "     Recommended: max_replication_slots = $((slots_needed + other_slots + 5))"
        ocf_log err "  2. Restart PostgreSQL or use ALTER SYSTEM + pg_reload_conf()"
        ocf_log err "  3. Cleanup resource: crm resource cleanup ${OCF_RESOURCE_INSTANCE}"
        ocf_log err "============================================"
        return $OCF_ERR_CONFIGURED
    fi

    ocf_log info "Replication slots OK on $cluster: need $slots_needed, have $available available"
    return 0
}

#######################################################################
# End of v2.0 Functions
#######################################################################

pgtwin_migrate_validate() {
    # Validate agent configuration and environment.
    # Returns: OCF_SUCCESS when all checks pass,
    #          OCF_ERR_CONFIGURED for missing/invalid parameters,
    #          OCF_ERR_INSTALLED when required tools are absent.
    ocf_log info "Validating pgtwin-migrate configuration"

    # Check required parameters
    if [ -z "$OCF_RESKEY_source_cluster" ]; then
        ocf_log err "source_cluster parameter is required"
        return $OCF_ERR_CONFIGURED
    fi

    if [ -z "$OCF_RESKEY_target_cluster" ]; then
        ocf_log err "target_cluster parameter is required"
        return $OCF_ERR_CONFIGURED
    fi

    if [ -z "$OCF_RESKEY_source_replication_vip" ]; then
        ocf_log err "source_replication_vip parameter is required"
        return $OCF_ERR_CONFIGURED
    fi

    if [ -z "$OCF_RESKEY_target_replication_vip" ]; then
        ocf_log err "target_replication_vip parameter is required"
        return $OCF_ERR_CONFIGURED
    fi

    # Validate VIP addresses format (dotted-quad IPv4)
    if ! [[ "$OCF_RESKEY_source_replication_vip" =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
        ocf_log err "source_replication_vip has invalid IP format: $OCF_RESKEY_source_replication_vip"
        return $OCF_ERR_CONFIGURED
    fi

    if ! [[ "$OCF_RESKEY_target_replication_vip" =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
        ocf_log err "target_replication_vip has invalid IP format: $OCF_RESKEY_target_replication_vip"
        return $OCF_ERR_CONFIGURED
    fi

    # Validate lag threshold: must be a non-negative integer.
    # FIX: the old test ([ "$x" -lt 0 ]) silently *passed* when the value was
    # empty or non-numeric, because [ itself errored (status 2) and the
    # if-branch was simply skipped.
    if ! [[ "$OCF_RESKEY_lag_threshold" =~ ^[0-9]+$ ]]; then
        ocf_log err "lag_threshold must be a non-negative integer (got: '$OCF_RESKEY_lag_threshold')"
        return $OCF_ERR_CONFIGURED
    fi

    # Check if psql is available
    if ! command -v psql >/dev/null 2>&1; then
        ocf_log err "psql command not found in PATH"
        return $OCF_ERR_INSTALLED
    fi

    # Check if crm_mon is available (for cluster queries)
    if ! command -v crm_mon >/dev/null 2>&1; then
        ocf_log err "crm_mon command not found in PATH"
        return $OCF_ERR_INSTALLED
    fi

    ocf_log info "Configuration validation passed"
    return $OCF_SUCCESS
}

#######################################################################
# Helper Functions

get_cluster_pghome() {
    # Resolve the PostgreSQL home directory for one side of the migration.
    # Usage: get_cluster_pghome "source"|"target"
    # Outputs: PGHOME path (falls back to /var/lib/pgsql)
    # Returns: 0 on success, 1 on invalid cluster_type (fallback still printed)

    local side="$1"
    local resource param

    case "$side" in
        source)
            resource="$OCF_RESKEY_source_cluster"
            param="$OCF_RESKEY_source_pghome"
            ;;
        target)
            resource="$OCF_RESKEY_target_cluster"
            param="$OCF_RESKEY_target_pghome"
            ;;
        *)
            ocf_log err "Invalid cluster_type: $side (must be 'source' or 'target')"
            echo "/var/lib/pgsql"
            return 1
            ;;
    esac

    # An explicitly configured parameter wins over discovery
    if [ -n "$param" ]; then
        ocf_log debug "Using configured ${side}_pghome: ${param}"
        echo "$param"
        return 0
    fi

    # Find the primitive behind the clone resource, then inspect its params
    local primitive
    primitive=$(crm configure show "$resource" 2>/dev/null | grep "^clone" | awk '{print $3}')

    if [ -n "$primitive" ]; then
        # Preferred: derive PGHOME from pgdata by stripping a trailing /data
        local pgdata
        pgdata=$(crm configure show "$primitive" 2>/dev/null | grep -oP 'pgdata="\K[^"]+')
        if [ -n "$pgdata" ]; then
            local derived="${pgdata%/data}"
            ocf_log debug "Discovered PGHOME from ${primitive} pgdata: ${derived}"
            echo "$derived"
            return 0
        fi

        # Otherwise: look for an explicit pghome attribute on the primitive
        local discovered
        discovered=$(crm configure show "$primitive" 2>/dev/null | grep -oP 'pghome="\K[^"]+')
        if [ -n "$discovered" ]; then
            ocf_log debug "Discovered PGHOME from ${primitive} pghome: ${discovered}"
            echo "$discovered"
            return 0
        fi
    fi

    # Default fallback
    ocf_log debug "Using default PGHOME for ${side}: /var/lib/pgsql"
    echo "/var/lib/pgsql"
    return 0
}

get_cluster_pgpassfile() {
    # Resolve the .pgpass path for one side of the migration.
    # Usage: get_cluster_pgpassfile "source"|"target"
    # Outputs: pgpassfile path (falls back to $PGHOME/.pgpass)
    # Returns: 0 on success, 1 on invalid cluster_type (fallback still printed)

    local side="$1"
    local resource param home

    case "$side" in
        source)
            resource="$OCF_RESKEY_source_cluster"
            param="$OCF_RESKEY_source_pgpassfile"
            home=$(get_cluster_pghome "source")
            ;;
        target)
            resource="$OCF_RESKEY_target_cluster"
            param="$OCF_RESKEY_target_pgpassfile"
            home=$(get_cluster_pghome "target")
            ;;
        *)
            ocf_log err "Invalid cluster_type: $side (must be 'source' or 'target')"
            echo "/var/lib/pgsql/.pgpass"
            return 1
            ;;
    esac

    # An explicitly configured parameter wins over discovery
    if [ -n "$param" ]; then
        ocf_log debug "Using configured ${side}_pgpassfile: ${param}"
        echo "$param"
        return 0
    fi

    # Inspect the primitive behind the clone for a pgpassfile attribute
    local primitive
    primitive=$(crm configure show "$resource" 2>/dev/null | grep "^clone" | awk '{print $3}')

    if [ -n "$primitive" ]; then
        local discovered
        discovered=$(crm configure show "$primitive" 2>/dev/null | grep -oP 'pgpassfile="\K[^"]+')
        if [ -n "$discovered" ]; then
            ocf_log debug "Discovered pgpassfile from ${primitive}: ${discovered}"
            echo "$discovered"
            return 0
        fi
    fi

    # Default: $PGHOME/.pgpass
    local fallback="${home}/.pgpass"
    ocf_log debug "Using default pgpassfile for ${side}: ${fallback}"
    echo "$fallback"
    return 0
}

run_as_postgres_user() {
    # Run a command as the 'postgres' Unix system user with HOME and
    # PGPASSFILE set for the given cluster side.
    # Usage: run_as_postgres_user "source"|"target" cmd [args...]
    # Returns: the command's exit status, or 1 if the postgres user is missing
    # Note: always the 'postgres' *system* user - not the database user

    local cluster_type="$1"
    shift  # Remaining arguments are the command to execute

    local system_user="postgres"

    # FIX: 'local var=$(cmd) || handler' never ran the handler, because
    # 'local' itself succeeds regardless of the substitution's status
    # (SC2155); split declaration from assignment so id(1) failures are
    # actually detected.
    local pguid pggid
    pguid=$(id -u "$system_user" 2>/dev/null) || {
        ocf_log err "Cannot determine UID for system user: $system_user"
        return 1
    }
    pggid=$(id -g "$system_user" 2>/dev/null) || {
        ocf_log err "Cannot determine GID for system user: $system_user"
        return 1
    }

    # Per-cluster environment, discovered from the cluster configuration
    local pghome pgpassfile
    pghome=$(get_cluster_pghome "$cluster_type")
    pgpassfile=$(get_cluster_pgpassfile "$cluster_type")

    # setpriv drops to the postgres uid/gid; env injects HOME/PGPASSFILE
    # into the command's environment within that context
    setpriv --reuid="$pguid" --regid="$pggid" --clear-groups \
        env HOME="$pghome" PGPASSFILE="$pgpassfile" "$@"
}

discover_promoted_node() {
    # Locate the promoted (primary) node of a clone resource.
    # Usage: discover_promoted_node "postgres-clone"
    # Outputs: node hostname on success
    # Returns: 0 when found, 1 on missing argument or discovery failure
    #
    # FIXED v1.6.16: Added hostname resolution after node discovery

    local clone="$1"

    if [ -z "$clone" ]; then
        ocf_log err "discover_promoted_node: clone_resource parameter required"
        return 1
    fi

    # Delegate to discover_cluster_node, which already resolves hostnames
    local node
    node=$(discover_cluster_node "$clone" "Promoted")

    if [ $? -ne 0 ] || [ -z "$node" ]; then
        ocf_log warn "Could not discover Promoted node for: $clone"
        return 1
    fi

    ocf_log info "Discovered Promoted node for ${clone}: ${node}"
    echo "$node"
    return 0
}

resolve_cluster_node_to_hostname() {
    # Resolve Pacemaker cluster node name to actual hostname/IP
    # This is needed when cluster node names differ from system hostnames
    #
    # Usage: resolve_cluster_node_to_hostname "node-name"
    # Outputs: resolved hostname, or the input name unchanged
    # Returns: always 0
    local cluster_node="$1"

    # Try to get the node's hostname from Pacemaker
    if command -v crm_node >/dev/null 2>&1; then
        # Query Pacemaker for node attributes
        # NOTE(review): 'crm_node -l' lines look like "<id> <name> ...", so
        # awk's $2 here is the node *name* that grep just matched - meaning
        # node_hostname always equals $cluster_node and the inequality below
        # seems never true, making this branch a no-op. If a separate
        # hostname column was intended, this likely needs $3 or a different
        # query - TODO confirm expected crm_node output format.
        local node_hostname=$(crm_node -l 2>/dev/null | grep "^[0-9]* ${cluster_node} " | awk '{print $2}')

        if [ -n "$node_hostname" ] && [ "$node_hostname" != "$cluster_node" ]; then
            ocf_log debug "Resolved cluster node '${cluster_node}' to hostname '${node_hostname}'"
            echo "$node_hostname"
            return 0
        fi
    fi

    # Last resort: return the cluster node name as-is (may not be resolvable)
    # In most cases, the cluster node name IS the hostname
    ocf_log debug "Using cluster node name as hostname: ${cluster_node}"
    echo "$cluster_node"
    return 0
}

discover_cluster_node() {
    # Find the node currently holding a given role for a clone resource,
    # using crm_mon XML output + XPath for reliable parsing (text parsing
    # was racy during cluster state transitions - see BUG FIX v1.6.15).
    # Usage: discover_cluster_node "postgres-clone" "Promoted"|"Unpromoted"
    # Outputs: connectable hostname on success
    # Returns: 0 when found, 1 on missing arguments or discovery failure

    local clone_resource="$1"
    local role="$2"  # Promoted or Unpromoted

    if [ -z "$clone_resource" ]; then
        ocf_log err "discover_cluster_node: clone_resource parameter required"
        return 1
    fi

    if [ -z "$role" ]; then
        ocf_log err "discover_cluster_node: role parameter required (Promoted or Unpromoted)"
        return 1
    fi

    ocf_log debug "Discovering ${role} node for ${clone_resource} using XML parsing"

    # The XPath expression pinpoints the node name of the clone member that
    # currently holds the requested role
    local found
    found=$(crm_mon --as-xml 2>/dev/null | xmllint --xpath \
        "string(//clone[@id='${clone_resource}']/resource[@role='${role}']/node/@name)" - 2>/dev/null)

    if [ -z "$found" ]; then
        ocf_log warn "Could not discover ${role} node for: $clone_resource"
        return 1
    fi

    ocf_log info "Discovered ${role} cluster node for ${clone_resource}: ${found}"

    # Map Pacemaker's node name onto a hostname clients can connect to
    if found=$(resolve_cluster_node_to_hostname "$found"); then
        ocf_log info "Resolved to connectable hostname: ${found}"
        echo "$found"
        return 0
    fi

    ocf_log warn "Could not resolve cluster node to hostname"
    return 1
}

get_replication_user() {
    # Determine the replication user by parsing the cluster's .pgpass file.
    # Usage: get_replication_user [cluster_type]   (defaults to "source")
    # Looks for entries whose database field is 'replication' or 'postgres';
    # falls back to "replicator" when the file is missing or unparseable.
    # Outputs: username; Returns: always 0

    local side="${1:-source}"
    local home
    home=$(get_cluster_pghome "$side")
    local passfile="${home}/.pgpass"

    if [ ! -f "$passfile" ]; then
        ocf_log warn ".pgpass file not found at $passfile"
        echo "replicator"  # Default fallback
        return 0
    fi

    # .pgpass format: hostname:port:database:username:password
    # Take the username field of the first non-comment matching entry
    local user
    user=$(grep -v "^#" "$passfile" | grep -E ":(replication|postgres):" | head -1 | cut -d: -f4)

    if [ -z "$user" ]; then
        ocf_log warn "Could not parse replication user from .pgpass, using default"
        echo "replicator"
    else
        ocf_log debug "Found replication user in .pgpass: $user"
        echo "$user"
    fi

    return 0
}

check_publication_exists() {
    # Test whether a named publication exists in a database.
    # Usage: check_publication_exists "cluster_type" "hostname" "publication_name" "database"
    # cluster_type ("source"|"target") selects which PGHOME/PGPASSFILE
    # environment run_as_postgres_user sets up.
    # Returns: 0 when the publication exists, 1 otherwise

    local side="$1" host="$2" publication="$3" dbname="$4"

    local count
    count=$(run_as_postgres_user "$side" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$dbname" -Atc \
        "SELECT count(*) FROM pg_publication WHERE pubname='${publication}';" 2>/dev/null)

    # pubname is unique, so the count is either 0 or 1
    [ "$count" = "1" ]
}

check_subscription_exists() {
    # Test whether a named subscription exists in the current database.
    # Usage: check_subscription_exists "cluster_type" "hostname" "subscription_name" "database"
    # cluster_type ("source"|"target") selects which PGHOME/PGPASSFILE
    # environment run_as_postgres_user sets up.
    # Returns: 0 when the subscription exists, 1 otherwise

    local side="$1" host="$2" subscription="$3" dbname="$4"

    # pg_subscription is cluster-wide; restrict to the connected database
    local count
    count=$(run_as_postgres_user "$side" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$dbname" -Atc \
        "SELECT count(*) FROM pg_subscription WHERE subname='${subscription}' AND subdbid = (SELECT oid FROM pg_database WHERE datname = current_database());" 2>/dev/null)

    [ "$count" = "1" ]
}

check_replication_slot_availability() {
    # Check if sufficient replication slots are available on source cluster
    # Usage: check_replication_slot_availability "source_host" "db_count"
    # Returns: 0 if sufficient slots available, 1 if not
    #          (also 0 when the checks themselves cannot be performed -
    #          unreadable settings are treated as "skip check", not failure)
    #
    # Migration requires slots for:
    #   - Forward replication: 1 slot per database (source → target)
    #   - Reverse replication: 1 slot per database (target → source) - created during cutover
    # Plus existing slots (HA physical slots, etc.)

    local source_host="$1"
    local db_count="$2"

    # Query max_replication_slots from source
    local max_slots=$(run_as_postgres_user "source" psql -h "$source_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -Atc \
        "SHOW max_replication_slots;" 2>/dev/null)

    # Best-effort: a failed or non-numeric reply skips the check entirely
    if [ -z "$max_slots" ] || ! [[ "$max_slots" =~ ^[0-9]+$ ]]; then
        ocf_log warn "Could not query max_replication_slots from source, skipping check"
        return 0
    fi

    # Query current slot count (excluding our migration slots that may already exist)
    # pgtwin_migrate_% slots are excluded because the agent reuses them.
    local current_slots=$(run_as_postgres_user "source" psql -h "$source_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -Atc \
        "SELECT count(*) FROM pg_replication_slots WHERE slot_name NOT LIKE 'pgtwin_migrate_%';" 2>/dev/null)

    if [ -z "$current_slots" ] || ! [[ "$current_slots" =~ ^[0-9]+$ ]]; then
        ocf_log warn "Could not query current replication slots from source, skipping check"
        return 0
    fi

    # Calculate slots needed for migration (forward only during start, reverse created during cutover)
    # Forward: 1 slot per database
    # We need to ensure there's room for reverse slots too (1 per database)
    local forward_slots_needed=$db_count
    local reverse_slots_needed=$db_count
    local total_slots_needed=$((forward_slots_needed + reverse_slots_needed))
    local available_slots=$((max_slots - current_slots))

    ocf_log info "Replication slot check:"
    ocf_log info "  max_replication_slots: ${max_slots}"
    ocf_log info "  Current non-migration slots: ${current_slots}"
    ocf_log info "  Available slots: ${available_slots}"
    ocf_log info "  Slots needed for migration: ${total_slots_needed} (${forward_slots_needed} forward + ${reverse_slots_needed} reverse)"

    if [ "$available_slots" -lt "$total_slots_needed" ]; then
        ocf_log err "Insufficient replication slots available!"
        ocf_log err "  Need ${total_slots_needed} slots but only ${available_slots} available"
        ocf_log err "  Current slots on source:"
        # The pipe runs the while-loop in a subshell; that is fine here
        # because the loop only emits log lines and sets no variables.
        run_as_postgres_user "source" psql -h "$source_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -c \
            "SELECT slot_name, slot_type, active FROM pg_replication_slots ORDER BY slot_name;" 2>/dev/null | while read line; do
            ocf_log err "    $line"
        done
        ocf_log err ""
        ocf_log err "Resolution options:"
        ocf_log err "  1. Increase max_replication_slots on source cluster (requires restart):"
        ocf_log err "     ALTER SYSTEM SET max_replication_slots = $((current_slots + total_slots_needed + 2));"
        ocf_log err "  2. Drop unused replication slots:"
        ocf_log err "     SELECT pg_drop_replication_slot('slot_name');"
        return 1
    fi

    ocf_log info "✓ Sufficient replication slots available"
    return 0
}

ensure_replication_slots() {
    # Ensure replication slots exist for all subscriptions in the given direction
    # This recovers from situations where slots were lost (failover, restart, pg_rewind)
    #
    # Usage: ensure_replication_slots "direction" "slot_host" "subscription_host"
    #   direction: "forward" or "reverse"
    #   slot_host: host where slots should exist (source for forward, target for reverse)
    #   subscription_host: host where subscriptions exist (target for forward, source for reverse)
    #
    # Forward replication: slots on source, subscriptions on target
    # Reverse replication: slots on target, subscriptions on source
    #
    # Returns: 0 if all slots OK (or their state could not be determined), 1 if recovery failed

    local direction="$1"
    local slot_host="$2"
    local subscription_host="$3"
    local recovered=0
    local failed=0

    # Determine which passfile to use based on direction
    local slot_passfile subscription_passfile
    if [ "$direction" = "forward" ]; then
        slot_passfile="$OCF_RESKEY_source_pgpassfile"
        subscription_passfile="$OCF_RESKEY_target_pgpassfile"
    else
        slot_passfile="$OCF_RESKEY_target_pgpassfile"
        subscription_passfile="$OCF_RESKEY_source_pgpassfile"
    fi

    ocf_log debug "Checking ${direction} replication slots on: ${slot_host}"

    for db in $DB_LIST; do
        local slot_name="pgtwin_migrate_${direction}_slot_${db}"
        local sub_name="pgtwin_migrate_${direction}_sub_${db}"

        # Only repair slots for subscriptions that actually exist on the
        # subscriber side; otherwise there is nothing to feed the slot.
        local sub_exists
        sub_exists=$(PGPASSFILE="$subscription_passfile" psql -h "$subscription_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
            "SELECT count(*) FROM pg_subscription WHERE subname = '${sub_name}';" 2>/dev/null)

        if [ "$sub_exists" != "1" ]; then
            ocf_log debug "Subscription ${sub_name} does not exist, skipping slot check"
            continue
        fi

        # Check if slot exists on slot_host
        local slot_exists
        slot_exists=$(PGPASSFILE="$slot_passfile" psql -h "$slot_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -Atc \
            "SELECT count(*) FROM pg_replication_slots WHERE slot_name = '${slot_name}';" 2>/dev/null)

        # FIX: an empty result means the slot query itself failed (host down,
        # auth error, ...). Previously that was indistinguishable from "slot
        # exists" and silently ignored; log it and skip instead of guessing.
        if [ -z "$slot_exists" ]; then
            ocf_log warn "Could not query replication slots on ${slot_host} for ${slot_name} - skipping"
            continue
        fi

        if [ "$slot_exists" = "0" ]; then
            ocf_log warn "${direction^} replication slot missing: ${slot_name} - attempting recovery"

            # Create the missing slot (pgoutput is the built-in logical
            # decoding plugin used by native publications/subscriptions)
            local result
            result=$(PGPASSFILE="$slot_passfile" psql -h "$slot_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "SELECT pg_create_logical_replication_slot('${slot_name}', 'pgoutput');" 2>&1)

            if echo "$result" | grep -q "$slot_name"; then
                ocf_log info "✓ Recovered ${direction} replication slot: ${slot_name}"
                recovered=$((recovered + 1))

                # Bounce the subscription so its apply worker reconnects
                # and starts streaming from the freshly created slot
                PGPASSFILE="$subscription_passfile" psql -h "$subscription_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                    "ALTER SUBSCRIPTION ${sub_name} DISABLE; ALTER SUBSCRIPTION ${sub_name} ENABLE;" >/dev/null 2>&1
            else
                ocf_log err "Failed to recover ${direction} replication slot: ${slot_name}"
                ocf_log err "Error: ${result}"
                failed=$((failed + 1))
            fi
        fi
    done

    if [ $recovered -gt 0 ]; then
        ocf_log info "${direction^} slot recovery complete: ${recovered} slot(s) recovered, ${failed} failed"
    fi

    if [ $failed -gt 0 ]; then
        return 1
    fi
    return 0
}

# Wrapper functions for backward compatibility and clearer call sites
ensure_forward_replication_slots() {
    # Forward direction: slots live on the source ($1), subscriptions on the target ($2).
    ensure_replication_slots "forward" "$1" "$2"
}

ensure_reverse_replication_slots() {
    # Reverse direction: slots live on the target ($2), subscriptions on the source ($1),
    # so the arguments are swapped before delegating.
    ensure_replication_slots "reverse" "$2" "$1"
}

get_subscription_state() {
    # Get subscription state and lag
    # Usage: get_subscription_state "cluster_type" "hostname" "subscription_name" "database"
    # Returns (stdout): "subname|state|lag_bytes" (psql -A field separator), or nothing if not found
    # Exit status: 0 when the subscription was found, 1 otherwise
    # State values: streaming (worker running), startup (enabled but no worker yet), disabled

    local cluster_type="$1"
    local host="$2"
    local sub_name="$3"
    local database="${4:-$OCF_RESKEY_pgdatabase}"  # Use provided database or fall back to parameter

    # PostgreSQL 17/18: Check if worker is running via pid column
    # If pid exists, worker is active (state='streaming')
    # If no pid but subenabled=true, worker is starting (state='startup')
    # If subenabled=false, subscription is disabled (state='disabled')
    # The subdbid filter scopes the lookup to the current database so a
    # same-named subscription in another database does not match.
    # NOTE(review): lag_bytes diffs the subscriber-local pg_current_wal_lsn()
    # against the worker's latest_end_lsn, which originates from the publisher;
    # the two LSNs come from different clusters, so treat the value as an
    # approximation only -- confirm before using it for cutover gating.
    local state_info=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc "
        SELECT
            s.subname,
            CASE
                WHEN MAX(sr.pid) IS NOT NULL THEN 'streaming'
                WHEN s.subenabled THEN 'startup'
                ELSE 'disabled'
            END as state,
            COALESCE(
                pg_wal_lsn_diff(
                    (SELECT pg_current_wal_lsn()),
                    MAX(sr.latest_end_lsn)
                )::bigint,
                -1
            ) as lag_bytes
        FROM pg_subscription s
        LEFT JOIN pg_stat_subscription sr ON s.oid = sr.subid
        WHERE s.subname = '${sub_name}' AND s.subdbid = (SELECT oid FROM pg_database WHERE datname = current_database())
        GROUP BY s.subname, s.subenabled;
    " 2>/dev/null)

    if [ -n "$state_info" ]; then
        echo "$state_info"
        return 0
    else
        return 1
    fi
}

get_replication_lag() {
    # Get replication lag from subscription
    # Usage: get_replication_lag "cluster_type" "hostname" "subscription_name" "database"
    # Prints: lag in bytes, 0 if synced or empty database, -1 if cannot determine

    local cluster_type="$1"  # "source" or "target"
    local host="$2"
    local sub_name="$3"
    local database="$4"

    # First check if subscription has any tables to replicate
    # Empty databases (no tables) should be considered synced (lag = 0)
    local table_count
    table_count=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc "
        SELECT COUNT(*) FROM pg_subscription_rel sr
        JOIN pg_subscription s ON s.oid = sr.srsubid
        WHERE s.subname = '${sub_name}';
    " 2>/dev/null)

    # FIX: an empty result means the query itself failed (connection/auth
    # error). Previously that was reported as lag 0 ("synced"), which could
    # green-light a premature cutover; report -1 (cannot determine) instead,
    # matching the documented contract.
    if [ -z "$table_count" ]; then
        echo "-1"
        return 0
    fi

    if [ "$table_count" = "0" ]; then
        # No tables to replicate - consider synced
        ocf_log debug "Subscription ${sub_name} has no tables - treating as synced"
        echo "0"
        return 0
    fi

    # Diff received_lsn vs latest_end_lsn from pg_stat_subscription; COALESCE
    # maps NULL (no stats row / worker not reporting yet) to -1.
    local lag
    lag=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc "
        SELECT COALESCE(
            pg_wal_lsn_diff(
                (SELECT received_lsn FROM pg_stat_subscription WHERE subname = '${sub_name}'),
                (SELECT latest_end_lsn FROM pg_stat_subscription WHERE subname = '${sub_name}')
            )::bigint,
            -1
        );
    " 2>/dev/null)

    if [ -z "$lag" ]; then
        echo "-1"
    else
        echo "$lag"
    fi
}

# ============================================================================
# REUSABLE REPLICATION SETUP FUNCTIONS
# ============================================================================

setup_publication() {
    # Create a FOR ALL TABLES publication on the specified node (idempotent)
    # Usage: setup_publication <cluster_type> <host> <pub_name> <database> [read_only_override]
    # Example: setup_publication "source" "pgtwin01" "pgtwin_migrate_pub_postgres" "postgres" "false"
    #
    # read_only_override="true" prefixes the DDL with a session-level
    # "SET default_transaction_read_only = off" (needed on a quiesced cluster).
    # Returns: 0 on success (including "already exists"), 1 on failure

    local cluster_type="$1"  # "source" or "target"
    local host="$2"
    local pub_name="$3"
    local database="$4"      # Database name
    local read_only_override="${5:-false}"  # "true" or "false" (optional, defaults to false)

    ocf_log info "Checking for publication: ${pub_name} on ${host} (database: ${database})"

    if check_publication_exists "$cluster_type" "$host" "$pub_name" "$database"; then
        ocf_log info "✓ Publication already exists: ${pub_name} on database: ${database}"

        # Verify publication is configured correctly (FOR ALL TABLES)
        local puballtables
        puballtables=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
            "SELECT puballtables FROM pg_publication WHERE pubname='${pub_name}';" 2>/dev/null)

        if [ "$puballtables" != "t" ]; then
            ocf_log warn "Publication ${pub_name} exists but not configured for ALL TABLES (puballtables=${puballtables})"
            ocf_log warn "Recommend: DROP PUBLICATION ${pub_name}; then retry migration setup"
        else
            ocf_log debug "✓ Publication ${pub_name} correctly configured (FOR ALL TABLES)"
        fi

        return 0
    fi

    ocf_log info "Creating publication: ${pub_name} on ${host}"

    if [ "$read_only_override" = "true" ]; then
        # Create temporary SQL file (safer than HEREDOC in async background process)
        local tmpfile
        tmpfile=$(mktemp) || {
            ocf_log err "Failed to create temporary file for publication DDL"
            return 1
        }
        cat > "$tmpfile" <<EOF
SET default_transaction_read_only = off;
CREATE PUBLICATION ${pub_name} FOR ALL TABLES;
EOF
        # FIX: psql -f exits 0 on SQL errors unless ON_ERROR_STOP is set, so a
        # failed CREATE PUBLICATION previously went undetected in this branch.
        run_as_postgres_user "$cluster_type" psql -v ON_ERROR_STOP=1 -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -f "$tmpfile" >/dev/null 2>&1
        local rc=$?
        rm -f "$tmpfile"
        if [ $rc -ne 0 ]; then
            ocf_log err "Failed to create publication: ${pub_name}"
            return 1
        fi
    else
        run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
            "CREATE PUBLICATION ${pub_name} FOR ALL TABLES;" >/dev/null 2>&1
        if [ $? -ne 0 ]; then
            ocf_log err "Failed to create publication: ${pub_name} on database: ${database}"
            return 1
        fi
    fi

    ocf_log info "✓ Publication created: ${pub_name} on database: ${database}"
    return 0
}

setup_subscription() {
    # Create subscription with immediate refresh to copy table structures
    # Usage: setup_subscription <cluster_type> <host> <sub_name> <conn_str> <pub_name> <slot_name> <database> [read_only_override]
    # Example: setup_subscription "target" "pgtwin12" "pgtwin_migrate_sub" "host=..." "pgtwin_migrate_pub" "pgtwin_migrate_slot" "postgres" "false"
    #
    # Idempotent: returns 0 immediately if the subscription already exists.
    # Returns: 0 on success, 1 on failure

    local cluster_type="$1"
    local host="$2"
    local sub_name="$3"
    local conn_str="$4"       # Connection string (should include dbname=...)
    local pub_name="$5"
    local slot_name="$6"
    local database="$7"       # Database name (for psql -d)
    local read_only_override="${8:-false}"  # Optional, defaults to false

    ocf_log info "Checking for subscription: ${sub_name} on ${host} (database: ${database})"

    if check_subscription_exists "$cluster_type" "$host" "$sub_name" "$database"; then
        ocf_log info "✓ Subscription already exists: ${sub_name} on database: ${database}"
        return 0
    fi

    ocf_log info "Creating subscription: ${sub_name} on ${host} (database: ${database})"

    if [ "$read_only_override" = "true" ]; then
        # Create temporary SQL file (safer than HEREDOC in async background process)
        local tmpfile
        tmpfile=$(mktemp) || {
            ocf_log err "Failed to create temporary file for subscription DDL"
            return 1
        }
        cat > "$tmpfile" <<EOF
SET default_transaction_read_only = off;
CREATE SUBSCRIPTION ${sub_name} CONNECTION '${conn_str}' PUBLICATION ${pub_name} WITH (create_slot = true, slot_name = '${slot_name}', copy_data = true);
EOF
        # FIX: psql -f exits 0 on SQL errors unless ON_ERROR_STOP is set, so a
        # failed CREATE SUBSCRIPTION previously went undetected in this branch.
        run_as_postgres_user "$cluster_type" psql -v ON_ERROR_STOP=1 -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -f "$tmpfile" >/dev/null 2>&1
        local rc=$?
        rm -f "$tmpfile"
        if [ $rc -ne 0 ]; then
            ocf_log err "Failed to create subscription: ${sub_name} on database: ${database}"
            return 1
        fi
    else
        # Split declaration from assignment so the command's exit code is kept
        local sub_output sub_rc
        sub_output=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" -c \
            "CREATE SUBSCRIPTION ${sub_name}
             CONNECTION '${conn_str}'
             PUBLICATION ${pub_name}
             WITH (create_slot = true, slot_name = '${slot_name}', copy_data = true);" 2>&1)
        sub_rc=$?
        if [ $sub_rc -ne 0 ]; then
            ocf_log err "Failed to create subscription: ${sub_name} on database: ${database}"
            ocf_log err "Error output: $sub_output"
            return 1
        fi
    fi

    ocf_log info "✓ Subscription created: ${sub_name} on database: ${database}"
    ocf_log info "   Initial table sync started (copy_data=true)"

    return 0
}

#######################################################################
# Unified database-level replication setup (handles both forward and reverse)
#######################################################################

setup_database_replication() {
    # Set up complete replication for ONE database (forward or reverse direction)
    # This is a self-contained function - only knows about this database
    #
    # Usage: setup_database_replication "db_name" "direction" "source_node" "target_node" ["skip_schema_copy"] ["create_disabled"] ["read_only_override"]
    #
    # Arguments:
    #   db                  - Database name
    #   direction           - "forward" or "reverse"
    #   source_node         - Source node hostname (for logging)
    #   target_node         - Target node hostname (for logging)
    #   skip_schema_copy    - Optional: "true" to skip schema copy, defaults to "false"
    #   create_disabled     - Optional: "true" to create disabled subscription, defaults to "false"
    #   read_only_override  - Optional: "true" to override read_only for subscription creation, defaults to "false"
    #
    # Steps: 1) publication on the publisher side, 2a) ensure database exists on
    # the subscriber side, 2b) optional schema copy (pg_dump custom format with
    # TOC filtering), 3) subscription (enabled or disabled) plus verification.
    #
    # Returns: 0 on success, 1 on failure

    local db="$1"
    local direction="$2"
    local source_node="$3"
    local target_node="$4"
    local skip_schema_copy="${5:-false}"
    local create_disabled="${6:-false}"
    local read_only_override="${7:-false}"

    # Validate direction
    if [ "$direction" != "forward" ] && [ "$direction" != "reverse" ]; then
        ocf_log err "Invalid direction: $direction (must be 'forward' or 'reverse')"
        return 1
    fi

    # Determine source and target based on direction
    # (connections always go via the replication VIPs, not the node names)
    local pub_cluster_type=""
    local pub_host=""
    local sub_cluster_type=""
    local sub_host=""
    local pub_name=""
    local sub_name=""
    local slot_name=""

    if [ "$direction" = "forward" ]; then
        # Forward: source -> target
        pub_cluster_type="source"
        pub_host="$OCF_RESKEY_source_replication_vip"
        sub_cluster_type="target"
        sub_host="$OCF_RESKEY_target_replication_vip"
        pub_name="pgtwin_migrate_forward_pub_${db}"
        sub_name="pgtwin_migrate_forward_sub_${db}"
        slot_name="pgtwin_migrate_forward_slot_${db}"
    else
        # Reverse: target -> source
        pub_cluster_type="target"
        pub_host="$OCF_RESKEY_target_replication_vip"
        sub_cluster_type="source"
        sub_host="$OCF_RESKEY_source_replication_vip"
        pub_name="pgtwin_migrate_reverse_pub_${db}"
        sub_name="pgtwin_migrate_reverse_sub_${db}"
        slot_name="pgtwin_migrate_reverse_slot_${db}"
    fi

    ocf_log info "=========================================="
    ocf_log info "Setting up ${direction} replication for database: ${db}"
    ocf_log info "  Publication: ${pub_cluster_type} ($pub_host)"
    ocf_log info "  Subscription: ${sub_cluster_type} ($sub_host)"
    ocf_log info "  Schema copy: $([ "$skip_schema_copy" = "true" ] && echo "SKIPPED" || echo "ENABLED")"
    ocf_log info "  Create disabled: $([ "$create_disabled" = "true" ] && echo "YES" || echo "NO")"
    ocf_log info "=========================================="

    # Step 1: Create publication on source
    ocf_log info "[${db}] Step 1: Creating publication on ${pub_cluster_type}"
    if ! setup_publication "$pub_cluster_type" "$pub_host" "$pub_name" "$db" "false"; then
        ocf_log err "[${db}] Failed to create publication"
        return 1
    fi
    ocf_log info "[${db}] ✓ Publication created on ${pub_cluster_type}"

    # Step 2a: Ensure database exists on target
    ocf_log info "[${db}] Step 2a: Ensuring database exists on ${sub_cluster_type}"
    local db_exists=$(run_as_postgres_user "$sub_cluster_type" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -Atc \
        "SELECT 1 FROM pg_database WHERE datname = '$db';" 2>/dev/null)

    if [ "$db_exists" != "1" ]; then
        ocf_log info "[${db}] Creating database on ${sub_cluster_type} cluster"
        local create_output=$(run_as_postgres_user "$sub_cluster_type" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d postgres -c \
            "CREATE DATABASE \"$db\";" 2>&1)
        local create_rc=$?
        if [ $create_rc -ne 0 ]; then
            ocf_log err "[${db}] Failed to create database on ${sub_cluster_type} cluster"
            ocf_log err "[${db}] Error output: $create_output"
            # Cleanup publication since we failed
            ocf_log err "[${db}] Cleaning up publication"
            run_as_postgres_user "$pub_cluster_type" psql -h "$pub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "DROP PUBLICATION IF EXISTS ${pub_name};" >/dev/null 2>&1
            return 1
        fi
        ocf_log info "[${db}] ✓ Database created on ${sub_cluster_type}"
    else
        ocf_log info "[${db}] ✓ Database already exists on ${sub_cluster_type}"
    fi

    # Step 2b: Copy schema from source to target (unless skipped)
    if [ "$skip_schema_copy" = "true" ]; then
        ocf_log info "[${db}] Step 2b: Schema copy SKIPPED (skip_schema_copy=true)"
    else
        ocf_log info "[${db}] Step 2b: Copying schema from ${pub_cluster_type} to ${sub_cluster_type}"

        # Use custom format dump with selective TOC restoration
        # This approach avoids copying problematic objects (functions, event triggers, publications)
        # while ensuring correct dependency order (sequences before tables, etc.)

        # Create temporary files for dump and TOC (in postgres-writable directory)
        # $$ (agent PID) keeps concurrent per-database setups from colliding
        local tmp_dir=$(get_cluster_pghome "$pub_cluster_type")
        local dump_file="${tmp_dir}/tmp_pgtwin_migrate_schema_${direction}_${db}_$$.dump"
        local toc_file="${tmp_dir}/tmp_pgtwin_migrate_toc_${direction}_${db}_$$.list"
        local filtered_toc="${tmp_dir}/tmp_pgtwin_migrate_filtered_${direction}_${db}_$$.list"

        # Step 1: Create custom format dump
        ocf_log debug "[${db}] Creating dump: $dump_file"
        run_as_postgres_user "$pub_cluster_type" pg_dump -Fc --schema-only --no-owner --no-privileges \
            -h "$pub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" \
            -d "$db" -f "$dump_file" 2>/dev/null
        local dump_rc=$?
        ocf_log debug "[${db}] pg_dump exit code: $dump_rc"

        if [ $dump_rc -ne 0 ]; then
            # Non-fatal by design: treated as "nothing to copy" and setup continues
            ocf_log info "[${db}] No tables to copy (empty database or pg_dump failed)"
            rm -f "$dump_file" "$toc_file" "$filtered_toc"
        else
            # Check if dump file was created
            if [ ! -f "$dump_file" ]; then
                ocf_log err "[${db}] Dump file not created: $dump_file"
                return 1
            fi
            ocf_log debug "[${db}] Dump file created: $(ls -lh $dump_file 2>&1)"

            # Step 2: Extract TOC from dump (using bash to avoid shell redirection issues)
            ocf_log debug "[${db}] Extracting TOC to: $toc_file"
            run_as_postgres_user "$pub_cluster_type" bash -c "pg_restore --list '$dump_file' > '$toc_file' 2>/dev/null"

            # Check if TOC file was created
            if [ ! -f "$toc_file" ]; then
                ocf_log err "[${db}] TOC file not created: $toc_file"
                rm -f "$dump_file"
                return 1
            fi
            ocf_log debug "[${db}] TOC entries: $(wc -l < $toc_file 2>&1)"

            # Step 3: Filter TOC to include only table-related objects
            # Include: TABLE, SEQUENCE, DEFAULT, CONSTRAINT, FK CONSTRAINT
            # Exclude: FUNCTION, EVENT TRIGGER, PUBLICATION (these cause problems)
            # IMPORTANT: Run as postgres user to ensure correct file permissions
            ocf_log debug "[${db}] Filtering TOC to: $filtered_toc"
            run_as_postgres_user "$pub_cluster_type" bash -c "grep -E ' (TABLE|SEQUENCE|DEFAULT|CONSTRAINT|FK) ' '$toc_file' | grep -v 'FUNCTION\|EVENT TRIGGER\|PUBLICATION' > '$filtered_toc' 2>/dev/null"

            # Check if filtered TOC has content
            if [ ! -f "$filtered_toc" ] || [ ! -s "$filtered_toc" ]; then
                ocf_log warn "[${db}] No table objects found in schema dump"
                rm -f "$dump_file" "$toc_file" "$filtered_toc"
                # Continue - not fatal, might be empty database
            else
                ocf_log debug "[${db}] Filtered TOC entries: $(wc -l < $filtered_toc 2>&1)"

                # Step 4: Restore using filtered TOC
                ocf_log debug "[${db}] Restoring schema to ${sub_cluster_type}"
                local restore_output=$(run_as_postgres_user "$sub_cluster_type" pg_restore --use-list="$filtered_toc" \
                    --no-owner --no-privileges \
                    -h "$sub_host" -p "$OCF_RESKEY_pgport" \
                    -U "$OCF_RESKEY_migration_dbuser" -d "$db" "$dump_file" 2>&1)
                local restore_rc=$?
                ocf_log debug "[${db}] pg_restore exit code: $restore_rc, output: $restore_output"

                # Clean up temporary files
                rm -f "$dump_file" "$toc_file" "$filtered_toc"

                if [ $restore_rc -eq 0 ]; then
                    ocf_log info "[${db}] ✓ Schema copied to ${sub_cluster_type} (custom format with TOC filtering)"
                else
                    ocf_log warn "[${db}] Schema copy had warnings (exit code: $restore_rc)"
                    ocf_log warn "[${db}] Output: $restore_output"
                    # Don't fail - this may be OK if tables already exist
                fi
            fi
        fi
    fi

    # Step 3: Create subscription on target
    if [ "$create_disabled" = "true" ]; then
        ocf_log info "[${db}] Step 3: Creating DISABLED subscription on ${sub_cluster_type}"
        # NOTE(review): conn_str is unused in this branch -
        # setup_subscription_disabled builds its own connection string from
        # pub_host internally; kept for symmetry with the enabled branch.
        local conn_str="host=${pub_host} port=${OCF_RESKEY_pgport} dbname=${db} user=${OCF_RESKEY_migration_dbuser}"

        if ! setup_subscription_disabled "$sub_cluster_type" "$sub_host" "$sub_name" \
            "$pub_name" "$slot_name" "$pub_host" "$db" "$read_only_override"; then
            ocf_log err "[${db}] Failed to create disabled subscription"
            # Cleanup publication
            ocf_log err "[${db}] Cleaning up publication"
            run_as_postgres_user "$pub_cluster_type" psql -h "$pub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "DROP PUBLICATION IF EXISTS ${pub_name};" >/dev/null 2>&1
            return 1
        fi
        ocf_log info "[${db}] ✓ DISABLED subscription created on ${sub_cluster_type}"
    else
        ocf_log info "[${db}] Step 3: Creating ENABLED subscription on ${sub_cluster_type}"
        local conn_str="host=${pub_host} port=${OCF_RESKEY_pgport} dbname=${db} user=${OCF_RESKEY_migration_dbuser}"

        if ! setup_subscription "$sub_cluster_type" "$sub_host" "$sub_name" "$conn_str" "$pub_name" "$slot_name" "$db" "false"; then
            ocf_log err "[${db}] Failed to create subscription"
            # Cleanup publication
            ocf_log err "[${db}] Cleaning up publication"
            run_as_postgres_user "$pub_cluster_type" psql -h "$pub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "DROP PUBLICATION IF EXISTS ${pub_name};" >/dev/null 2>&1
            return 1
        fi
        ocf_log info "[${db}] ✓ ENABLED subscription created on ${sub_cluster_type}"
    fi

    # Debug: Verify subscription is in the correct database
    local verify_sub=$(run_as_postgres_user "$sub_cluster_type" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
        "SELECT subname FROM pg_subscription WHERE subname = '${sub_name}' AND subdbid = (SELECT oid FROM pg_database WHERE datname = current_database());" 2>/dev/null)
    if [ "$verify_sub" = "$sub_name" ]; then
        ocf_log info "[${db}] ✓ Subscription verified in correct database"
    else
        ocf_log warn "[${db}] Subscription verification failed (non-fatal)"
    fi

    ocf_log info "[${db}] ✓ ${direction} replication setup complete"
    return 0
}

#######################################################################
# Database-level forward replication setup (legacy wrapper)
#######################################################################

setup_database_forward_replication() {
    # Backward-compatible wrapper around setup_database_replication():
    # forward direction, schema copy enabled, subscription created enabled.
    # Usage: setup_database_forward_replication "db_name" "source_node" "target_node"
    # Returns: 0 on success, 1 on failure (propagated from the delegate)
    setup_database_replication "$1" "forward" "$2" "$3" "false" "false"
}

# Create subscription with enabled=false (for reverse replication preparation)
# Args: cluster_type, host, sub_name, pub_name, slot_name, source_host, database, [read_only_override]
setup_subscription_disabled() {
    # Create a subscription with enabled=false (reverse-replication preparation):
    # the replication slot is created on source_host and starts retaining WAL,
    # but no apply worker runs until the subscription is enabled at cutover.
    # Args: cluster_type, host, sub_name, pub_name, slot_name, source_host, database, [read_only_override]
    # Returns: 0 on success (including "already exists"), 1 on failure
    local cluster_type="$1"
    local host="$2"
    local sub_name="$3"
    local pub_name="$4"
    local slot_name="$5"
    local source_host="$6"
    local database="$7"
    local read_only_override="${8:-false}"  # Optional, defaults to false

    # Check if subscription already exists (subdbid filter scopes the lookup
    # to the current database)
    local exists
    exists=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
        "SELECT EXISTS(SELECT 1 FROM pg_subscription WHERE subname = '${sub_name}' AND subdbid = (SELECT oid FROM pg_database WHERE datname = current_database()));" 2>/dev/null)

    if [ "$exists" = "t" ]; then
        ocf_log info "Subscription ${sub_name} already exists on ${host} (database: ${database})"

        # Check if it's enabled
        local is_enabled
        is_enabled=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
            "SELECT subenabled FROM pg_subscription WHERE subname = '${sub_name}' AND subdbid = (SELECT oid FROM pg_database WHERE datname = current_database());" 2>/dev/null)

        if [ "$is_enabled" = "t" ]; then
            ocf_log warn "Subscription ${sub_name} is ENABLED (expected disabled for preparation)"
        else
            ocf_log info "✓ Subscription ${sub_name} is correctly DISABLED (ready for cutover)"
        fi
        return 0
    fi

    # Build connection string
    local conn_str="host=${source_host} port=${OCF_RESKEY_pgport} dbname=${database} user=${OCF_RESKEY_migration_dbuser}"

    ocf_log info "Creating DISABLED subscription: ${sub_name} on ${host} (database: ${database})"
    ocf_log info "   This creates replication slot on ${source_host} but doesn't start replication"

    if [ "$read_only_override" = "true" ]; then
        # Create temporary SQL file with read_only override (for cutover when source is read-only)
        local tmpfile
        tmpfile=$(mktemp) || {
            ocf_log err "Failed to create temporary file for subscription DDL"
            return 1
        }
        cat > "$tmpfile" <<EOF
SET default_transaction_read_only = off;
CREATE SUBSCRIPTION ${sub_name}
CONNECTION '${conn_str}'
PUBLICATION ${pub_name}
WITH (
  enabled = false,
  create_slot = true,
  slot_name = '${slot_name}',
  copy_data = false
);
EOF
        # FIX: psql -f exits 0 on SQL errors unless ON_ERROR_STOP is set, so a
        # failed CREATE SUBSCRIPTION previously went undetected in this branch.
        run_as_postgres_user "$cluster_type" psql -v ON_ERROR_STOP=1 -h "$host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$database" -f "$tmpfile" >/dev/null 2>&1
        local rc=$?
        rm -f "$tmpfile"
        if [ $rc -ne 0 ]; then
            ocf_log err "Failed to create disabled subscription: ${sub_name}"
            return 1
        fi
    else
        # Normal mode (no read_only override needed)
        run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$database" -c \
            "CREATE SUBSCRIPTION ${sub_name}
             CONNECTION '${conn_str}'
             PUBLICATION ${pub_name}
             WITH (
               enabled = false,
               create_slot = true,
               slot_name = '${slot_name}',
               copy_data = false
             );" >/dev/null 2>&1

        if [ $? -ne 0 ]; then
            ocf_log err "Failed to create disabled subscription: ${sub_name}"
            return 1
        fi
    fi

    ocf_log info "✓ Disabled subscription created: ${sub_name}"
    ocf_log info "   Replication slot '${slot_name}' is now preserving WAL on ${source_host}"
    return 0
}

# Set cluster to read-only or read-write mode
# Args: cluster_type, host, read_only_value ("true" or "false"), database
# Note: default_transaction_read_only is CLUSTER-WIDE, but we need a database to connect to
set_cluster_read_only() {
    # Toggle default_transaction_read_only cluster-wide via ALTER SYSTEM,
    # reload the config, then verify the value written to postgresql.auto.conf.
    # Args: cluster_type, host, read_only_value ("true" or "false"), database
    # The database argument is only a connection target; the GUC applies to
    # every database in the cluster.
    local cluster_type="$1"
    local host="$2"
    local read_only="$3"
    local database="$4"

    case "$read_only" in
        true) ocf_log info "Setting cluster to READ-ONLY mode on ${host}" ;;
        *)    ocf_log info "Setting cluster to READ-WRITE mode on ${host}" ;;
    esac

    # Persist the parameter (cluster-wide, applies to all databases)
    if ! run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -c \
        "ALTER SYSTEM SET default_transaction_read_only = ${read_only};" >/dev/null 2>&1; then
        ocf_log err "Failed to set default_transaction_read_only = ${read_only}"
        return 1
    fi

    # Make the new setting take effect without a restart
    if ! run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -c \
        "SELECT pg_reload_conf();" >/dev/null 2>&1; then
        ocf_log err "Failed to reload PostgreSQL configuration"
        return 1
    fi

    # Verify via pg_file_settings (immune to role-level overrides):
    # the pgmigrate user carries a default_transaction_read_only=off role
    # override, so SHOW would always report 'off'. Read the file entry instead.
    local observed
    observed=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
        "SELECT setting FROM pg_file_settings WHERE name = 'default_transaction_read_only' AND sourcefile LIKE '%postgresql.auto.conf' ORDER BY seqno DESC LIMIT 1;" 2>/dev/null)

    if [ "$observed" != "$read_only" ]; then
        ocf_log err "Failed to verify read-only mode: expected ${read_only}, got ${observed}"
        return 1
    fi

    ocf_log info "✓ Cluster is now in ${read_only} mode (verified from postgresql.auto.conf: ${observed})"
    return 0
}

# Enable a disabled subscription and confirm the catalog agrees.
# Args: cluster_type, host, sub_name, database
# Returns: 0 when the subscription is enabled and verified, 1 otherwise
enable_subscription() {
    local cluster_type=$1
    local host=$2
    local sub_name=$3
    local database=$4

    ocf_log info "Enabling subscription: ${sub_name} on ${host} (database: ${database})"

    # Guard clause: bail out immediately if the ALTER fails
    if ! run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -c \
        "ALTER SUBSCRIPTION ${sub_name} ENABLE;" >/dev/null 2>&1; then
        ocf_log err "Failed to enable subscription: ${sub_name} on database: ${database}"
        return 1
    fi

    # Confirm pg_subscription now reports subenabled = 't' for this database
    local sub_state
    sub_state=$(run_as_postgres_user "$cluster_type" psql -h "$host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$database" -Atc \
        "SELECT subenabled FROM pg_subscription WHERE subname = '${sub_name}' AND subdbid = (SELECT oid FROM pg_database WHERE datname = current_database());" 2>/dev/null)

    if [ "$sub_state" != "t" ]; then
        ocf_log err "Failed to verify subscription enabled"
        return 1
    fi

    ocf_log info "✓ Subscription enabled: ${sub_name} on database: ${database}"
    return 0
}

# ============================================================================
# MULTI-DATABASE ITERATION HELPERS
# ============================================================================

init_database_migration() {
    # Initialize database migration state on the first start: snapshot the
    # configured database list as a baseline and seed one state file per
    # database. Subsequent calls are no-ops once the baseline exists.

    local pghome
    pghome=$(get_cluster_pghome "source")
    local baseline_file="${pghome}/.migration_databases_baseline"

    # Already initialized - keep existing baseline and per-database state files
    if [ -f "$baseline_file" ]; then
        return 0
    fi

    # Snapshot the database list (multi-database parameter wins over legacy)
    if [ -n "$OCF_RESKEY_databases" ]; then
        echo "$OCF_RESKEY_databases" > "$baseline_file"
        ocf_log info "Migration initialized with databases: $OCF_RESKEY_databases"
    else
        # Backward compatibility: single database mode
        echo "$OCF_RESKEY_pgdatabase" > "$baseline_file"
        ocf_log info "Migration initialized with database: $OCF_RESKEY_pgdatabase (legacy mode)"
    fi

    # Seed a state file for each database in DB_LIST (global, space-separated)
    local state_file
    for db in $DB_LIST; do
        state_file="${pghome}/.migration_state_${db}"
        cat > "$state_file" <<EOF
lag=unknown
quiesced=false
reverse_prepared=false
last_update=$(date +%s)
EOF
        ocf_log debug "Created state file for database: $db"
    done
}

validate_database_list_before_cutover() {
    # Validate that database list hasn't changed since migration started
    # Returns 0 if valid, 1 if mismatch detected
    #
    # Guards cutover against the 'databases' resource parameter being edited
    # mid-migration: compares the current parameter value against the baseline
    # snapshot written at migration start and, on mismatch, logs a detailed
    # diagnosis plus operator recovery options before blocking the cutover.

    local pghome=$(get_cluster_pghome "source")
    local baseline_file="${pghome}/.migration_databases_baseline"

    if [ ! -f "$baseline_file" ]; then
        ocf_log err "No baseline database list found - migration not initialized?"
        return 1
    fi

    local baseline_databases=$(cat "$baseline_file")
    local current_databases

    # Same precedence as initialization: multi-database parameter wins,
    # otherwise fall back to the legacy single-database parameter
    if [ -n "$OCF_RESKEY_databases" ]; then
        current_databases="$OCF_RESKEY_databases"
    else
        current_databases="$OCF_RESKEY_pgdatabase"
    fi

    # Exact string comparison: reordering the same set of databases (or a
    # whitespace change) also takes the mismatch path; the sorted diff below
    # is only used for diagnostic output.
    if [ "$current_databases" != "$baseline_databases" ]; then
        ocf_log err "=========================================="
        ocf_log err "⚠️  DATABASE LIST MISMATCH DETECTED ⚠️"
        ocf_log err "=========================================="
        ocf_log err "Database list has changed since migration started!"
        ocf_log err ""
        ocf_log err "Baseline (migration start): $baseline_databases"
        ocf_log err "Current (now):              $current_databases"
        ocf_log err ""

        # Determine what changed. Lists are comma-separated; split to one name
        # per line and sort because comm(1) requires sorted input.
        local baseline_sorted=$(echo "$baseline_databases" | tr ',' '\n' | sort)
        local current_sorted=$(echo "$current_databases" | tr ',' '\n' | sort)

        # comm -13: lines unique to the current list  -> databases added
        local added=$(comm -13 <(echo "$baseline_sorted") <(echo "$current_sorted") | tr '\n' ',' | sed 's/,$//')
        # comm -23: lines unique to the baseline list -> databases removed
        local removed=$(comm -23 <(echo "$baseline_sorted") <(echo "$current_sorted") | tr '\n' ',' | sed 's/,$//')

        if [ -n "$added" ]; then
            ocf_log err "Added databases: $added"
            ocf_log err "  → These databases have NOT been migrated!"
            ocf_log err "  → They will NOT be included in VIP cutover!"
        fi

        if [ -n "$removed" ]; then
            ocf_log err "Removed databases: $removed"
            ocf_log err "  → These databases were being migrated but now removed!"
            ocf_log err "  → Orphaned pub/sub may exist on clusters!"
        fi

        ocf_log err ""
        ocf_log err "CUTOVER BLOCKED - Manual intervention required:"
        ocf_log err ""
        ocf_log err "Option 1: Restore original database list"
        ocf_log err "  crm resource param $OCF_RESOURCE_INSTANCE set databases \"$baseline_databases\""
        ocf_log err ""
        ocf_log err "Option 2: Abort current migration and start fresh"
        ocf_log err "  1. Stop migration: crm resource stop $OCF_RESOURCE_INSTANCE"
        ocf_log err "  2. Clean up state files: rm -f ${pghome}/.migration_*"
        ocf_log err "  3. Update database list to desired value"
        ocf_log err "  4. Start fresh migration: crm resource start $OCF_RESOURCE_INSTANCE"
        ocf_log err ""
        ocf_log err "Option 3: Migrate new databases separately"
        ocf_log err "  - Complete current migration with baseline database list"
        ocf_log err "  - Create new pgtwin-migrate resource for new databases"
        ocf_log err "=========================================="

        return 1
    fi

    ocf_log info "✓ Database list validation passed"
    ocf_log info "  Migrating: $current_databases"
    return 0
}

update_database_state() {
    # Persist migration state for one database. The file is written to a
    # temporary path first and renamed so readers never observe a partial file.
    # Args: database, lag, quiesced, reverse_prepared

    local db_name=$1
    local lag_value=$2
    local quiesced_flag=$3
    local reverse_flag=$4

    local pghome
    pghome=$(get_cluster_pghome "source")
    local state_file="${pghome}/.migration_state_${db_name}"
    local temp_file="${state_file}.tmp"

    printf '%s\n' \
        "lag=$lag_value" \
        "quiesced=$quiesced_flag" \
        "reverse_prepared=$reverse_flag" \
        "last_update=$(date +%s)" > "$temp_file"

    # rename(2) is atomic within a filesystem
    mv "$temp_file" "$state_file"
}

get_database_state() {
    # Read one field from a database's migration state file.
    # Args:
    #   $1 database - database name (selects ${pghome}/.migration_state_<db>)
    #   $2 field    - field name (lag, quiesced, reverse_prepared, last_update)
    # Outputs: field value on stdout, or empty string if file/field is missing
    # Returns: 1 if the state file does not exist, otherwise grep/cut status

    local database=$1
    local field=$2

    local pghome
    pghome=$(get_cluster_pghome "source")
    local state_file="${pghome}/.migration_state_${database}"

    if [ ! -f "$state_file" ]; then
        echo ""
        return 1
    fi

    # BUGFIX: -f2- keeps the entire value even when it contains '=' characters;
    # the previous -f2 silently truncated such values at the first '='.
    grep "^${field}=" "$state_file" 2>/dev/null | cut -d'=' -f2-
}

# Swap VIP IP addresses (production VIP ↔ target VIP)
# After the swap, applications that keep connecting to the production VIP
# resource land on the target cluster, and vice versa.
# Returns: 0 on success, 1 on failure (manual-recovery hints are logged)
swap_vip_addresses() {
    # Get current IPs. BUGFIX: accept both ip=1.2.3.4 and ip="1.2.3.4" - the
    # previous pattern (ip=\K[0-9.]+) never matched the quoted form even
    # though the comment claimed both formats were handled.
    local production_vip_ip
    production_vip_ip=$(crm configure show "$OCF_RESKEY_production_vip_resource" 2>/dev/null | grep -oP 'params ip="?\K[0-9.]+' | head -1)
    local target_vip_ip
    target_vip_ip=$(crm configure show "$OCF_RESKEY_target_vip_resource" 2>/dev/null | grep -oP 'params ip="?\K[0-9.]+' | head -1)

    if [ -z "$production_vip_ip" ] || [ -z "$target_vip_ip" ]; then
        ocf_log err "Failed to get current VIP IP addresses"
        return 1
    fi

    ocf_log info "Swapping VIP IPs:"
    ocf_log info "   ${OCF_RESKEY_production_vip_resource}: ${production_vip_ip} → ${target_vip_ip}"
    ocf_log info "   ${OCF_RESKEY_target_vip_resource}: ${target_vip_ip} → ${production_vip_ip}"

    # Swap: production VIP gets target IP, target VIP gets production IP
    if ! crm resource param "$OCF_RESKEY_production_vip_resource" set ip "$target_vip_ip" 2>/dev/null; then
        ocf_log err "Failed to update production VIP to ${target_vip_ip}"
        return 1
    fi

    if ! crm resource param "$OCF_RESKEY_target_vip_resource" set ip "$production_vip_ip" 2>/dev/null; then
        # First update already applied - the cluster is in a half-swapped state
        ocf_log err "CRITICAL: Failed to update target VIP (production VIP already changed!)"
        ocf_log err "Manual recovery required:"
        ocf_log err "   crm resource param $OCF_RESKEY_target_vip_resource set ip $production_vip_ip"
        ocf_log err "Or rollback production VIP:"
        ocf_log err "   crm resource param $OCF_RESKEY_production_vip_resource set ip $production_vip_ip"
        return 1
    fi

    ocf_log info "✓ VIP addresses swapped successfully"
    ocf_log info "   Applications now connect to target cluster via ${production_vip_ip}"
    return 0
}

# Restore VIP IP addresses to original state (for rollback)
restore_vip_addresses() {
    # Restore VIPs to their original pre-migration IPs using configured parameters
    # This is NOT a toggle - it sets VIPs to known original values
    # Requires original_production_vip_ip and original_target_vip_ip parameters
    # Returns: 0 on success or nothing-to-do, 1 on failure

    if [ -z "$OCF_RESKEY_original_production_vip_ip" ] || [ -z "$OCF_RESKEY_original_target_vip_ip" ]; then
        ocf_log warn "Original VIP IPs not configured - falling back to toggle swap"
        ocf_log warn "For reliable rollback, configure original_production_vip_ip and original_target_vip_ip parameters"
        swap_vip_addresses
        return $?
    fi

    # Get current IPs. BUGFIX: accept both ip=1.2.3.4 and ip="1.2.3.4" - the
    # previous pattern (ip=\K[0-9.]+) never matched the quoted form, which
    # also defeated the "already at original state" short-circuit below.
    local current_production_vip_ip
    current_production_vip_ip=$(crm configure show "$OCF_RESKEY_production_vip_resource" 2>/dev/null | grep -oP 'params ip="?\K[0-9.]+' | head -1)
    local current_target_vip_ip
    current_target_vip_ip=$(crm configure show "$OCF_RESKEY_target_vip_resource" 2>/dev/null | grep -oP 'params ip="?\K[0-9.]+' | head -1)

    # Check if already at original state
    if [ "$current_production_vip_ip" = "$OCF_RESKEY_original_production_vip_ip" ] && \
       [ "$current_target_vip_ip" = "$OCF_RESKEY_original_target_vip_ip" ]; then
        ocf_log info "VIPs already at original state - no restore needed"
        return 0
    fi

    ocf_log info "Restoring VIP IPs to original state:"
    ocf_log info "   ${OCF_RESKEY_production_vip_resource}: ${current_production_vip_ip} → ${OCF_RESKEY_original_production_vip_ip}"
    ocf_log info "   ${OCF_RESKEY_target_vip_resource}: ${current_target_vip_ip} → ${OCF_RESKEY_original_target_vip_ip}"

    # Restore production VIP to original IP
    if ! crm resource param "$OCF_RESKEY_production_vip_resource" set ip "$OCF_RESKEY_original_production_vip_ip" 2>/dev/null; then
        ocf_log err "Failed to restore production VIP to ${OCF_RESKEY_original_production_vip_ip}"
        return 1
    fi

    # Restore target VIP to original IP
    if ! crm resource param "$OCF_RESKEY_target_vip_resource" set ip "$OCF_RESKEY_original_target_vip_ip" 2>/dev/null; then
        # Production VIP already restored - cluster left half-restored
        ocf_log err "CRITICAL: Failed to restore target VIP (production VIP already restored!)"
        ocf_log err "Manual recovery required:"
        ocf_log err "   crm resource param $OCF_RESKEY_target_vip_resource set ip $OCF_RESKEY_original_target_vip_ip"
        return 1
    fi

    ocf_log info "✓ VIP addresses restored to original state"
    return 0
}

setup_ddl_trigger() {
    # Create DDL event trigger for schema replication
    # Usage: setup_ddl_trigger <cluster_type> <source_host> <target_host> <target_name> <read_only_override> <database>
    # Example: setup_ddl_trigger "source" "pgtwin01" "pgtwin12" "pg18" "false" "postgres"
    # Creates: replicate_ddl_to_<target_name>() function and trigger
    #
    # The trigger forwards CREATE/ALTER/DROP TABLE statements on schema
    # 'public' to the peer cluster over dblink.

    local cluster_type="$1"
    local source_host="$2"
    local target_host="$3"
    local target_name="$4"  # e.g., "pg17" or "pg18"
    local read_only_override="$5"
    local database="${6:-${OCF_RESKEY_pgdatabase}}"  # Database name (default to legacy parameter for backward compat)

    local func_name="replicate_ddl_to_${target_name}"
    local conn_func_name="get_${target_name}_connection"
    local trigger_name="${func_name}_trigger"

    ocf_log info "Setting up DDL replication trigger: ${func_name} on ${source_host} for database: ${database}"

    # REFACTOR: the previous implementation duplicated the entire ~50-line SQL
    # script in two branches that differed only by one leading statement.
    # Build that optional statement here and use a single here-document.
    local ro_override=""
    if [ "$read_only_override" = "true" ]; then
        # Session-level override so the objects can be created while the
        # cluster is quiesced (default_transaction_read_only = on)
        ro_override="SET default_transaction_read_only = off;"
    fi

    # Create the DDL replication function and trigger
    run_as_postgres_user "$cluster_type" psql -h "$source_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$database" <<EOSQL >/dev/null 2>&1
${ro_override}
CREATE EXTENSION IF NOT EXISTS dblink;

CREATE OR REPLACE FUNCTION ${conn_func_name}() RETURNS text
LANGUAGE sql STABLE
AS \$\$ SELECT 'host=${target_host} port=${OCF_RESKEY_pgport} dbname=postgres user=${OCF_RESKEY_migration_dbuser}'; \$\$;

CREATE OR REPLACE FUNCTION ${func_name}() RETURNS event_trigger
LANGUAGE plpgsql AS \$\$
DECLARE
    ddl_command text;
    obj record;
    target_conn text;
    conn_name text := '${target_name}_ddl_conn';
BEGIN
    ddl_command := current_query();
    IF ddl_command ~* 'migration_control|ddl_audit_log' THEN RETURN; END IF;
    target_conn := ${conn_func_name}();
    FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
                WHERE object_type = 'table' AND schema_name = 'public'
    LOOP
        BEGIN
            RAISE NOTICE '→ Replicating DDL to ${target_name}: %', obj.object_identity;
            -- Create persistent connection
            PERFORM dblink_connect(conn_name, target_conn);
            -- Set session to allow writes (superuser override)
            PERFORM dblink_exec(conn_name, 'SET default_transaction_read_only = off', true);
            -- Execute DDL command
            PERFORM dblink_exec(conn_name, ddl_command, true);
            -- Close connection
            PERFORM dblink_disconnect(conn_name);
            RAISE NOTICE '✓ DDL replicated successfully';
        EXCEPTION WHEN OTHERS THEN
            -- Ensure connection is closed on error
            BEGIN
                PERFORM dblink_disconnect(conn_name);
            EXCEPTION WHEN OTHERS THEN
                -- Ignore disconnect errors
            END;
            RAISE WARNING '✗ Failed to replicate DDL: %', SQLERRM;
        END;
    END LOOP;
END;
\$\$;

DROP EVENT TRIGGER IF EXISTS ${trigger_name};
CREATE EVENT TRIGGER ${trigger_name}
    ON ddl_command_end
    WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE', 'DROP TABLE')
    EXECUTE FUNCTION ${func_name}();
EOSQL

    if [ $? -ne 0 ]; then
        ocf_log err "Failed to setup DDL trigger: ${func_name}"
        return 1
    fi

    ocf_log info "✓ DDL trigger setup complete: ${func_name}"
    return 0
}

# ============================================================================
# END OF REUSABLE FUNCTIONS
# ============================================================================

# ============================================================================
# SANITY CHECK FUNCTIONS
# Each function checks one aspect and optionally fixes it
# Parameters: varies by function, last parameter is always fix (true/false)
# Returns: 0 for OK/FIXED, 1 for ERROR
# ============================================================================

check_cluster_readwrite() {
    # Sanity check: verify a cluster accepts writes (i.e. is not quiesced),
    # optionally fixing it by resetting default_transaction_read_only.
    # Args: $1=cluster_type ("source"|"target"), $2=node_host, $3=fix (true|false, default=false)
    # Note: every message goes through 'ocf_log err' (including successes) so
    # the sanity-check output is visible regardless of the log level.
    local cluster_type="$1"
    local node_host="$2"
    local fix="${3:-false}"

    if [ -z "$node_host" ]; then
        ocf_log err "  ✗ FAILED: Cannot discover ${cluster_type} primary node"
        ocf_log err "    Recovery: Check cluster status: crm status"
        return 1
    fi

    # Inspect pg_file_settings rather than SHOW so a role-level override on
    # the migration user cannot mask the cluster-wide configured value
    local configured
    configured=$(run_as_postgres_user "$cluster_type" psql -h "$node_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$OCF_RESKEY_pgdatabase" -Atc \
        "SELECT COALESCE((SELECT setting FROM pg_file_settings WHERE name = 'default_transaction_read_only' AND sourcefile LIKE '%postgresql.auto.conf' ORDER BY seqno DESC LIMIT 1), 'false');" 2>/dev/null)

    # Happy path: cluster is writable
    case "$configured" in
        false|off|"")
            ocf_log err "  ✓ ${cluster_type^} is read-write"
            return 0
            ;;
    esac

    if [ "$fix" != "true" ]; then
        # Check-only mode: report and hand the operator the recovery commands
        ocf_log err "  ✗ FAILED: ${cluster_type^} is read-only (value: $configured)"
        ocf_log err "    Recovery: Run on ${cluster_type}:"
        ocf_log err "      ALTER SYSTEM SET default_transaction_read_only = off;"
        ocf_log err "      SELECT pg_reload_conf();"
        return 1
    fi

    # Fix mode: unquiesce the cluster in place
    ocf_log err "  ⚠ ${cluster_type^} is read-only (value: $configured) - attempting fix..."
    if run_as_postgres_user "$cluster_type" psql -h "$node_host" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$OCF_RESKEY_pgdatabase" <<EOSQL >/dev/null 2>&1
ALTER SYSTEM SET default_transaction_read_only = off;
SELECT pg_reload_conf();
EOSQL
    then
        ocf_log err "  🔧 FIXED: ${cluster_type^} unquiesced successfully"
        return 0
    fi

    ocf_log err "  ✗ ERROR: Failed to unquiesce ${cluster_type}"
    ocf_log err "    Manual recovery required: Run on ${cluster_type}:"
    ocf_log err "      ALTER SYSTEM SET default_transaction_read_only = off;"
    ocf_log err "      SELECT pg_reload_conf();"
    return 1
}

rollback_cutover() {
    # Rollback failed cutover to restore original state
    # Arguments: $1 = step where failure occurred, $2 = error message
    # Updated for 9-step zero-risk cutover sequence
    #
    # Best-effort recovery: each sub-step logs its own success/failure and
    # execution continues, so as much of the pre-cutover state as possible is
    # restored even when individual commands fail. Always returns 1 so the
    # caller treats the cutover attempt as failed.
    # NOTE(review): relies on globals DB_LIST (space-separated database names)
    # and DB_COUNT being set by the caller - confirm both are populated on
    # every code path that can reach a rollback.

    local failed_step="$1"
    local error_msg="$2"

    ocf_log err "=========================================="
    ocf_log err "CUTOVER FAILED AT STEP ${failed_step}"
    ocf_log err "Error: ${error_msg}"
    ocf_log err "=========================================="
    ocf_log err "Initiating automatic rollback..."

    # Discover cluster nodes (current primaries of both clusters)
    local source_primary=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted" 2>/dev/null)
    local target_primary=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted" 2>/dev/null)

    # Without the source primary we cannot restore read-write - abort early
    if [ -z "$source_primary" ]; then
        ocf_log err "Cannot discover source primary - rollback may be incomplete"
        ocf_log err "MANUAL INTERVENTION REQUIRED"
        return 1
    fi

    # Use first database for connection (cluster-wide operations)
    local first_db="${DB_LIST%% *}"

    # Rollback logic based on where we failed
    # Steps 1-2: Before reverse infrastructure creation
    # Steps 3-4: After reverse infrastructure creation, before VIP swap
    # Step 5: During VIP swap
    # Steps 6-9: After VIP swap succeeded
    case $failed_step in
        1|2)
            # Failed during read-only or sync wait
            # Just restore source to read-write
            ocf_log info "Rollback: Restoring source cluster to read-write"
            set_cluster_read_only "source" "$source_primary" "false" "$first_db"
            if [ $? -eq 0 ]; then
                ocf_log info "✓ Source cluster restored to production (read-write)"
            else
                ocf_log err "✗ Failed to restore source to read-write (MANUAL FIX REQUIRED)"
            fi
            ;;

        3|4)
            # Failed during reverse infrastructure creation (step 3) or verification (step 4)
            # VIP has NOT been swapped yet - safe to cleanup and restore
            # Cleanup reverse infrastructure, then restore source to read-write
            ocf_log info "Rollback: Cleaning up reverse infrastructure (if partially created)"
            for db in $DB_LIST; do
                local reverse_pub_name="pgtwin_migrate_reverse_pub_${db}"
                local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"
                local trigger_name="replicate_ddl_to_source_trigger_${db}"

                # Drop reverse subscription (if created)
                run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
                    -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                    "DROP SUBSCRIPTION IF EXISTS ${reverse_sub_name};" >/dev/null 2>&1

                # Drop reverse publication (if created)
                if [ -n "$target_primary" ]; then
                    run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                        -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                        "DROP PUBLICATION IF EXISTS ${reverse_pub_name};" >/dev/null 2>&1

                    # Drop reverse DDL trigger (if created)
                    run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                        -U "$OCF_RESKEY_migration_dbuser" -d "$db" <<EOSQL >/dev/null 2>&1
DROP EVENT TRIGGER IF EXISTS ${trigger_name} CASCADE;
DROP FUNCTION IF EXISTS replicate_ddl_to_source_${db}() CASCADE;
DROP FUNCTION IF EXISTS get_source_connection_${db}() CASCADE;
EOSQL
                fi
            done
            ocf_log info "✓ Reverse infrastructure cleaned up"

            ocf_log info "Rollback: Restoring source cluster to read-write"
            set_cluster_read_only "source" "$source_primary" "false" "$first_db"
            if [ $? -eq 0 ]; then
                ocf_log info "✓ Source cluster restored to production (read-write)"
            else
                ocf_log err "✗ Failed to restore source to read-write (MANUAL FIX REQUIRED)"
            fi
            ;;

        5)
            # Failed during VIP swap
            # Cleanup reverse infrastructure, swap VIPs back (may be partial), restore read-write
            ocf_log info "Rollback: Cleaning up reverse infrastructure"
            for db in $DB_LIST; do
                local reverse_pub_name="pgtwin_migrate_reverse_pub_${db}"
                local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"
                local trigger_name="replicate_ddl_to_source_trigger_${db}"

                run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
                    -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                    "DROP SUBSCRIPTION IF EXISTS ${reverse_sub_name};" >/dev/null 2>&1

                if [ -n "$target_primary" ]; then
                    run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                        -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                        "DROP PUBLICATION IF EXISTS ${reverse_pub_name};" >/dev/null 2>&1

                    run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                        -U "$OCF_RESKEY_migration_dbuser" -d "$db" <<EOSQL >/dev/null 2>&1
DROP EVENT TRIGGER IF EXISTS ${trigger_name} CASCADE;
DROP FUNCTION IF EXISTS replicate_ddl_to_source_${db}() CASCADE;
DROP FUNCTION IF EXISTS get_source_connection_${db}() CASCADE;
EOSQL
                fi
            done

            ocf_log info "Rollback: Restoring VIP IPs to original state"
            restore_vip_addresses  # Restore to configured original IPs
            if [ $? -eq 0 ]; then
                ocf_log info "✓ VIPs restored to original state"
            else
                ocf_log err "✗ Failed to swap VIPs back (MANUAL FIX REQUIRED)"
                ocf_log err "Manual commands:"
                ocf_log err "  Check current VIP IPs: crm configure show | grep 'primitive.*vip'"
                ocf_log err "  Restore if needed: crm resource param <resource> set ip <correct_ip>"
            fi

            ocf_log info "Rollback: Restoring source cluster to read-write"
            set_cluster_read_only "source" "$source_primary" "false" "$first_db"
            if [ $? -eq 0 ]; then
                ocf_log info "✓ Source cluster restored to production (read-write)"
            else
                ocf_log err "✗ Failed to restore source to read-write (MANUAL FIX REQUIRED)"
            fi
            ;;

        6|7|8|9)
            # Failed after VIP swap (step 5 succeeded)
            # Recovery strategy:
            # 1. Disable reverse subscriptions (if enabled) - ALL DATABASES
            # 2. Re-enable forward subscriptions (restore original replication) - ALL DATABASES
            # 3. Swap VIPs back to original state
            # 4. Restore source to read-write
            # NOTE(review): unlike the 3|4 branch, target_primary is NOT
            # checked for emptiness before the forward re-enable below - if
            # discovery failed, psql is invoked with -h "" and the re-enable
            # quietly becomes a logged warning. Confirm this is intended.

            ocf_log info "Rollback: Disabling reverse subscriptions (if enabled, ${DB_COUNT} databases)"
            for db in $DB_LIST; do
                local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"
                run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
                    -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                    "ALTER SUBSCRIPTION ${reverse_sub_name} DISABLE;" >/dev/null 2>&1
                if [ $? -eq 0 ]; then
                    ocf_log info "✓ Reverse subscription disabled for database: $db"
                else
                    ocf_log warn "Could not disable reverse subscription for database: $db (may not have been enabled)"
                fi
            done

            ocf_log info "Rollback: Re-enabling forward subscriptions (${DB_COUNT} databases)"
            for db in $DB_LIST; do
                local forward_sub_name="pgtwin_migrate_forward_sub_${db}"
                run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                    -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                    "ALTER SUBSCRIPTION ${forward_sub_name} ENABLE;" >/dev/null 2>&1
                if [ $? -eq 0 ]; then
                    ocf_log info "✓ Forward subscription re-enabled for database: $db"
                else
                    ocf_log warn "Could not re-enable forward subscription for database: $db"
                fi
            done

            ocf_log info "Rollback: Restoring VIP IPs to original state"
            restore_vip_addresses  # Restore to configured original IPs
            if [ $? -eq 0 ]; then
                ocf_log info "✓ VIPs restored to original state"
            else
                ocf_log err "✗ Failed to swap VIPs back (MANUAL FIX REQUIRED)"
            fi

            ocf_log info "Rollback: Restoring source cluster to read-write"
            set_cluster_read_only "source" "$source_primary" "false" "$first_db"
            if [ $? -eq 0 ]; then
                ocf_log info "✓ Source cluster restored to production (read-write)"
            else
                ocf_log err "✗ Failed to restore source to read-write (MANUAL FIX REQUIRED)"
            fi
            ;;

        *)
            # Unknown step: minimal recovery so the source keeps taking writes
            ocf_log err "Unknown failed step: ${failed_step}"
            ocf_log err "Attempting basic recovery (restore source to read-write)"
            set_cluster_read_only "source" "$source_primary" "false" "$first_db"
            ;;
    esac

    # Reset cluster state (cluster property - cluster-wide, persistent)
    crm_attribute -n migration-state -v "FORWARD_REPLICATION" 2>/dev/null
    ocf_log info "✓ Cluster state reset to FORWARD_REPLICATION"

    ocf_log err "=========================================="
    ocf_log err "ROLLBACK COMPLETE"
    ocf_log err "=========================================="
    ocf_log err "Current state:"
    ocf_log err "  - Source cluster: Production (read-write)"
    ocf_log err "  - Target cluster: Receiving forward replication"
    ocf_log err "  - Reverse subscription: Disabled (ready for retry)"
    ocf_log err ""
    ocf_log err "Next actions:"
    ocf_log err "  1. Investigate root cause in cutover log"
    ocf_log err "  2. Fix the issue"
    ocf_log err "  3. Set cutover_ready=false to clear trigger"
    ocf_log err "  4. When ready, set cutover_ready=true to retry"

    return 1
}

start_async_cutover_v2() {
    # v2.0: Start cutover in background with direction awareness
    # Usage: start_async_cutover_v2 "current_production" "desired_production" "old_direction" "new_direction"
    # Creates a progress state file plus a timestamped log (with a stable
    # .cutover.log symlink) and launches "$0 internal-cutover-v2" detached.
    # Returns 0 (also when a cutover is already in progress).

    local current_production="$1"
    local desired_production="$2"
    local old_direction="$3"
    local new_direction="$4"

    local pghome
    pghome=$(get_cluster_pghome "target")
    local cutover_state_file="${pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"
    local cutover_timestamp
    cutover_timestamp=$(date '+%Y%m%d-%H%M%S')
    local cutover_log="${pghome}/.cutover-${cutover_timestamp}.log"
    local cutover_log_symlink="${pghome}/.cutover.log"

    # The state file doubles as the "in progress" lock
    if [ -f "$cutover_state_file" ]; then
        ocf_log info "Cutover already in progress"
        return 0
    fi

    ocf_log info "Starting v2.0 asynchronous cutover"
    ocf_log info "  From: $current_production ($old_direction replication)"
    ocf_log info "  To: $desired_production ($new_direction replication)"
    ocf_log info "  Log: $cutover_log"

    # Create state file
    cat > "$cutover_state_file" <<EOF
status=running
start_time=$(date +%s)
step=0
current_step='Initializing v2.0 cutover'
error_message=''
failed_step=''
cutover_log='$cutover_log'
v2_current_production='$current_production'
v2_desired_production='$desired_production'
v2_old_direction='$old_direction'
v2_new_direction='$new_direction'
EOF
    chown postgres:postgres "$cutover_state_file"

    touch "$cutover_log"
    chown postgres:postgres "$cutover_log"
    rm -f "$cutover_log_symlink"
    ln -s "$(basename "$cutover_log")" "$cutover_log_symlink"

    # Launch the worker detached, exporting everything it needs.
    # BUGFIX: the previous code ran '( ...; nohup ... & echo $! )' in the
    # FOREGROUND and then read $! in the parent shell afterwards - but the
    # parent's $! refers to the parent's own last background job (stale or
    # empty), so the PID written to the state file was wrong. Capture the PID
    # echoed from inside the command substitution instead.
    local cutover_pid
    cutover_pid=$( export OCF_RESKEY_pgdata="$OCF_RESKEY_pgdata"
      export OCF_RESKEY_pgport="$OCF_RESKEY_pgport"
      export OCF_RESKEY_migration_dbuser="$OCF_RESKEY_migration_dbuser"
      export OCF_RESKEY_databases="$OCF_RESKEY_databases"
      export OCF_RESKEY_source_cluster="$OCF_RESKEY_source_cluster"
      export OCF_RESKEY_target_cluster="$OCF_RESKEY_target_cluster"
      export OCF_RESKEY_source_replication_vip="$OCF_RESKEY_source_replication_vip"
      export OCF_RESKEY_target_replication_vip="$OCF_RESKEY_target_replication_vip"
      export OCF_RESKEY_production_vip_resource="$OCF_RESKEY_production_vip_resource"
      export OCF_RESKEY_target_vip_resource="$OCF_RESKEY_target_vip_resource"
      export OCF_RESKEY_original_production_vip_ip="$OCF_RESKEY_original_production_vip_ip"
      export OCF_RESKEY_original_target_vip_ip="$OCF_RESKEY_original_target_vip_ip"
      export OCF_RESKEY_source_pgpassfile="$OCF_RESKEY_source_pgpassfile"
      export OCF_RESKEY_target_pgpassfile="$OCF_RESKEY_target_pgpassfile"
      export OCF_RESKEY_cutover_debug="$OCF_RESKEY_cutover_debug"
      export OCF_RESKEY_lag_threshold="$OCF_RESKEY_lag_threshold"
      export OCF_RESKEY_production_cluster="$OCF_RESKEY_production_cluster"
      export PGTWIN_V2_CURRENT_PRODUCTION="$current_production"
      export PGTWIN_V2_DESIRED_PRODUCTION="$desired_production"
      export PGTWIN_V2_OLD_DIRECTION="$old_direction"
      export PGTWIN_V2_NEW_DIRECTION="$new_direction"
      nohup "$0" internal-cutover-v2 >> "$cutover_log" 2>&1 &
      echo $! )

    echo "pid=$cutover_pid" >> "$cutover_state_file"

    ocf_log info "v2.0 cutover started in background (PID: $cutover_pid)"
    return 0
}

start_async_cutover() {
    # Launch cutover preparation as a detached background worker so the
    # monitor operation returns immediately (avoids monitor timeout).
    # Creates a state file to track progress; check_cutover_progress
    # inspects it on each monitor cycle.
    # Returns 0 when the cutover was started or is already in progress.

    # Use PGHOME instead of PGDATA to avoid basebackup contamination
    # Dynamically discover from target cluster configuration (works across failovers)
    local pghome
    pghome=$(get_cluster_pghome "target")
    # Include resource instance name to avoid confusion with other migration resources
    local cutover_state_file="${pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"

    # ENHANCEMENT v1.6.18: Timestamped cutover logs for history retention
    local cutover_timestamp
    cutover_timestamp=$(date '+%Y%m%d-%H%M%S')
    local cutover_log="${pghome}/.cutover-${cutover_timestamp}.log"
    local cutover_log_symlink="${pghome}/.cutover.log"

    # Idempotent: re-invocation while a cutover is running is a no-op
    if [ -f "$cutover_state_file" ]; then
        ocf_log info "Cutover preparation already in progress"
        return 0
    fi

    ocf_log info "Starting asynchronous cutover preparation"
    ocf_log info "All stdout/stderr will be captured to: $cutover_log"
    if [ "$OCF_RESKEY_cutover_debug" = "true" ]; then
        ocf_log info "Debug mode ENABLED - bash tracing (set -x) will be used"
    fi

    # Create state file with initial status
    # Note: Values must be quoted for safe sourcing (shell special chars)
    cat > "$cutover_state_file" <<EOF
status=running
start_time=$(date +%s)
step=0
current_step='Initializing'
error_message=''
failed_step=''
cutover_log='$cutover_log'
EOF
    chown postgres:postgres "$cutover_state_file"

    # Create timestamped log file with correct ownership
    touch "$cutover_log"
    chown postgres:postgres "$cutover_log"

    # Create/update symlink to latest log for convenience
    rm -f "$cutover_log_symlink"
    ln -s "$(basename "$cutover_log")" "$cutover_log_symlink"

    # BUGFIX: capture the worker PID via command substitution.  The old
    # code read $! in the parent AFTER the (foreground) subshell exited,
    # but $! only tracks jobs backgrounded by the parent shell itself,
    # so the recorded pid was empty/stale and the liveness checks in
    # check_cutover_progress were meaningless.  The subshell echoes its
    # own $!; the substitution returns immediately because the worker's
    # stdout/stderr are redirected to the log, not to the capture pipe.
    local cutover_pid
    cutover_pid=$(
        # Export all OCF parameters the background worker needs
        export OCF_RESKEY_pgdata OCF_RESKEY_pgport
        export OCF_RESKEY_migration_dbuser OCF_RESKEY_pgdatabase
        export OCF_RESKEY_source_cluster OCF_RESKEY_target_cluster
        export OCF_RESKEY_source_replication_vip OCF_RESKEY_target_replication_vip
        export OCF_RESKEY_production_vip_resource OCF_RESKEY_cutover_debug
        nohup "$0" internal-cutover >> "$cutover_log" 2>&1 &
        echo $!
    )

    # Save PID to state file
    echo "pid=$cutover_pid" >> "$cutover_state_file"

    ocf_log info "Cutover preparation started in background (PID: $cutover_pid)"
    ocf_log info "Monitor progress: tail -f $cutover_log"

    return 0
}

check_cutover_progress() {
    # Poll the async cutover's state file and report progress.
    # Designed to return quickly so the monitor operation never blocks.
    #
    # Decisions are driven by the AUTHORITATIVE state-file fields
    # (status/step written by the cutover worker); process liveness
    # (kill -0) is only a secondary safety net against races.
    #
    # Returns: 0               - no cutover running, running normally,
    #                            or completed successfully
    #          OCF_ERR_GENERIC - failure, dead worker, timeout, or
    #                            unexpected state

    # Use PGHOME instead of PGDATA to avoid basebackup contamination
    # Dynamically discover from target cluster configuration (works across failovers)
    local pghome
    pghome=$(get_cluster_pghome "target")
    # Include resource instance name to avoid confusion with other migration resources
    local cutover_state_file="${pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"

    if [ ! -f "$cutover_state_file" ]; then
        # No cutover in progress
        return 0
    fi

    # Read current state (status, step, pid, start_time, cutover_log, ...)
    source "$cutover_state_file"

    # Fall back to the symlink for compatibility with pre-v1.6.18 state files
    if [ -z "$cutover_log" ]; then
        cutover_log="${pghome}/.cutover.log"
    fi

    if [ "$status" = "complete" ] && [ "$step" = "complete" ]; then
        # State file indicates completion - verify process isn't still writing
        if [ -n "$pid" ] && kill -0 "$pid" 2>/dev/null; then
            # status=complete but worker still alive: it may still be
            # flushing final state - re-check on the next monitor cycle
            ocf_log debug "Cutover status=complete but process $pid still running - waiting for process to exit"
            return 0
        fi

        # SAFE: status=complete AND step=complete AND process is NOT running
        ocf_log info "=========================================="
        ocf_log info "✓ CUTOVER PREPARATION COMPLETED"
        ocf_log info "=========================================="
        ocf_log info "Status: $status (verified)"
        ocf_log info "Step: $step (verified)"
        ocf_log info "PID: ${pid:-none} (process exited)"
        ocf_log info "Resource: ${OCF_RESOURCE_INSTANCE}"
        ocf_log info "Cutover log: $cutover_log"

        # Set cluster attribute to CUTOVER_COMPLETE (cluster-wide, persistent)
        ocf_log info "Setting migration-state cluster attribute to CUTOVER_COMPLETE..."
        crm_attribute -n migration-state -v "CUTOVER_COMPLETE" 2>&1 | \
            while IFS= read -r line; do ocf_log info "crm_attribute: $line"; done

        # Clean up state file (resource-specific)
        rm -f "$cutover_state_file"
        ocf_log info "Cutover state file removed"

        # Auto-stop the migration resource by setting target-role=Stopped
        # This prevents the resource from restarting on cluster restarts
        ocf_log info "Migration complete - setting target-role=Stopped to prevent restarts..."
        crm_resource --meta --resource "$OCF_RESOURCE_INSTANCE" --set-parameter target-role --parameter-value Stopped 2>&1 | \
            while IFS= read -r line; do ocf_log info "crm_resource: $line"; done

        ocf_log info "Migration resource will stop on next monitor cycle (target-role=Stopped)"
        ocf_log info "You can safely delete it later with: crm configure delete ${OCF_RESOURCE_INSTANCE}"

        return 0

    elif [ "$status" = "failed" ]; then
        # State file explicitly indicates failure
        ocf_log err "=========================================="
        ocf_log err "CUTOVER PREPARATION FAILED"
        ocf_log err "=========================================="
        ocf_log err "Status: $status"
        ocf_log err "Failed at step: ${failed_step:-unknown}"
        ocf_log err "Error message: ${error_message:-none}"
        ocf_log err "Resource: ${OCF_RESOURCE_INSTANCE}"
        ocf_log err "Check detailed logs: $cutover_log"
        ocf_log err "Rollback should have been triggered automatically"
        ocf_log err "=========================================="
        rm -f "$cutover_state_file"
        return $OCF_ERR_GENERIC

    elif [ "$status" = "running" ]; then
        # Normal case - verify the worker process is actually alive
        if [ -n "$pid" ]; then
            if ! kill -0 "$pid" 2>/dev/null; then
                # Process died unexpectedly while status=running
                ocf_log err "=========================================="
                ocf_log err "CUTOVER PROCESS DIED UNEXPECTEDLY"
                ocf_log err "=========================================="
                ocf_log err "Status: $status (should be 'complete' or 'failed')"
                ocf_log err "PID: $pid (no longer running)"
                ocf_log err "Last known step: ${step:-unknown} - ${current_step:-unknown}"
                ocf_log err "Resource: ${OCF_RESOURCE_INSTANCE}"
                ocf_log err "Check logs: $cutover_log"
                ocf_log err "=========================================="
                # Trigger rollback for unexpected process death
                if [ -n "$step" ] && [ "$step" != "0" ] && [ "$step" != "complete" ]; then
                    rollback_cutover "$step" "Process died unexpectedly (PID $pid)"
                fi
                rm -f "$cutover_state_file"
                return $OCF_ERR_GENERIC
            fi
        else
            # No PID but status=running - inconsistent state (process may still be initializing)
            ocf_log warn "Cutover state shows status=running but no PID - waiting for state update"
        fi

        # Log progress
        # BUGFIX: this block (and the timeout check below) used to sit
        # AFTER the if/elif chain, where it was unreachable because
        # every branch returned - a hung cutover was never detected.
        if [ -n "$current_step" ]; then
            ocf_log info "Cutover progress: Step $step - $current_step"
        fi

        # Check for timeout (30 minutes)
        local current_time
        current_time=$(date +%s)
        local elapsed=$((current_time - start_time))
        if [ $elapsed -gt 1800 ]; then
            ocf_log err "Cutover preparation timeout (elapsed: ${elapsed}s)"
            if [ -n "$pid" ]; then
                # NOTE(review): SIGKILL as first resort gives the worker no
                # chance to clean up - consider TERM-then-KILL in future
                kill -9 "$pid" 2>/dev/null
            fi
            rm -f "$cutover_state_file"
            return $OCF_ERR_GENERIC
        fi

        # Process is running normally - continue monitoring
        return 0

    else
        # Unexpected status value or empty status
        ocf_log err "=========================================="
        ocf_log err "UNEXPECTED CUTOVER STATE"
        ocf_log err "=========================================="
        ocf_log err "Status: ${status:-empty} (expected: running, complete, or failed)"
        ocf_log err "Step: ${step:-unknown}"
        ocf_log err "PID: ${pid:-none}"
        ocf_log err "Resource: ${OCF_RESOURCE_INSTANCE}"
        ocf_log err "State file: $cutover_state_file"
        ocf_log err "=========================================="
        return $OCF_ERR_GENERIC
    fi
}


pgtwin_migrate_prepare_cutover_v2() {
    # v2.0 BIDIRECTIONAL CUTOVER (runs in the detached background worker)
    # Key changes from v1:
    #   - DELETES old direction replication (not just disable)
    #   - CREATES new direction replication (fresh)
    #   - Updates CIB with new production cluster
    #   - Supports both forward→reverse and reverse→forward
    #
    # Direction context arrives via the PGTWIN_V2_* environment variables
    # set by start_async_cutover_v2.  Progress is reported by appending
    # step=/status= lines to the per-resource state file, which the
    # monitor polls via check_cutover_progress.
    #
    # Returns: 0 on success; 1 on failure (after recording status=failed
    #          and attempting to restore the previous state).

    # Timestamped logging helper; stdout is captured to the cutover log
    log_ts() {
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"
    }

    # Get direction info from environment
    local current_production="$PGTWIN_V2_CURRENT_PRODUCTION"
    local desired_production="$PGTWIN_V2_DESIRED_PRODUCTION"
    local old_direction="$PGTWIN_V2_OLD_DIRECTION"
    local new_direction="$PGTWIN_V2_NEW_DIRECTION"

    # Load OCF functions (fresh process - ocf_log etc. not inherited)
    : ${OCF_FUNCTIONS_DIR=${OCF_ROOT}/lib/heartbeat}
    . ${OCF_FUNCTIONS_DIR}/ocf-shellfuncs

    local pghome
    pghome=$(get_cluster_pghome "target")
    local cutover_state_file="${pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"

    # Pick up cutover_log (and launcher-written state) from the state file
    source "$cutover_state_file"
    if [ -z "$cutover_log" ]; then
        cutover_log="${pghome}/.cutover.log"
    fi

    log_ts "=========================================="
    log_ts "v2.0 BIDIRECTIONAL CUTOVER STARTING"
    log_ts "=========================================="
    log_ts "Current production: $current_production"
    log_ts "Desired production: $desired_production"
    log_ts "Old direction: $old_direction (will be DELETED)"
    log_ts "New direction: $new_direction (will be CREATED)"

    # Get database list (sets the DB_LIST / DB_COUNT globals)
    get_database_list >/dev/null 2>&1
    log_ts "Databases: $DB_LIST ($DB_COUNT total)"

    # Determine old and new production nodes (Promoted member of each cluster)
    local old_production_node new_production_node
    if [ "$current_production" = "$OCF_RESKEY_source_cluster" ]; then
        old_production_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
        new_production_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
    else
        old_production_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
        new_production_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
    fi

    if [ -z "$old_production_node" ] || [ -z "$new_production_node" ]; then
        log_ts "ERROR: Cannot discover nodes (old=$old_production_node, new=$new_production_node)"
        echo "status=failed" >> "$cutover_state_file"
        echo "error_message='Cannot discover cluster nodes'" >> "$cutover_state_file"
        return 1
    fi

    log_ts "Old production node: $old_production_node"
    log_ts "New production node: $new_production_node"

    # Determine passfiles and replication-VIP hosts for old/new side
    local old_passfile new_passfile old_host new_host
    if [ "$current_production" = "$OCF_RESKEY_source_cluster" ]; then
        old_passfile="$OCF_RESKEY_source_pgpassfile"
        old_host="$OCF_RESKEY_source_replication_vip"
        new_passfile="$OCF_RESKEY_target_pgpassfile"
        new_host="$OCF_RESKEY_target_replication_vip"
    else
        old_passfile="$OCF_RESKEY_target_pgpassfile"
        old_host="$OCF_RESKEY_target_replication_vip"
        new_passfile="$OCF_RESKEY_source_pgpassfile"
        new_host="$OCF_RESKEY_source_replication_vip"
    fi

    # First database is used for cluster-wide commands (ALTER SYSTEM)
    local first_db="${DB_LIST%% *}"

    #------------------------------------------------------------------------
    # STEP 1: Set old production to READ-ONLY
    #------------------------------------------------------------------------
    echo "step=1" >> "$cutover_state_file"
    echo "current_step='Setting old production to read-only'" >> "$cutover_state_file"
    log_ts "Step 1/8: Setting old production ($current_production) to read-only"

    # BUGFIX: the exit status was previously ignored, so a failed
    # read-only switch let the cutover proceed while applications kept
    # writing to the old production (risking lost transactions).
    if ! PGPASSFILE="$old_passfile" psql -h "$old_host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$first_db" \
        -c "ALTER SYSTEM SET default_transaction_read_only = on; SELECT pg_reload_conf();" >/dev/null 2>&1; then
        log_ts "ERROR: Failed to set old production to read-only"
        echo "status=failed" >> "$cutover_state_file"
        echo "error_message='Failed to set old production read-only'" >> "$cutover_state_file"
        return 1
    fi

    log_ts "✓ Old production is now read-only"

    #------------------------------------------------------------------------
    # STEP 2: Wait for replication sync (lag = 0)
    #------------------------------------------------------------------------
    echo "step=2" >> "$cutover_state_file"
    echo "current_step='Waiting for replication sync'" >> "$cutover_state_file"
    log_ts "Step 2/8: Waiting for $old_direction replication sync"

    local max_wait=300
    local waited=0
    while [ $waited -lt $max_wait ]; do
        local max_lag=0
        for db in $DB_LIST; do
            local sub_name="pgtwin_migrate_${old_direction}_sub_${db}"
            # The subscription lives on the SUBSCRIBER side of the old
            # direction: target for forward, source for reverse
            local sub_host sub_passfile
            if [ "$old_direction" = "forward" ]; then
                sub_host="$OCF_RESKEY_target_replication_vip"
                sub_passfile="$OCF_RESKEY_target_pgpassfile"
            else
                sub_host="$OCF_RESKEY_source_replication_vip"
                sub_passfile="$OCF_RESKEY_source_pgpassfile"
            fi

            local lag=$(PGPASSFILE="$sub_passfile" psql -h "$sub_host" -p "$OCF_RESKEY_pgport" \
                -U "$OCF_RESKEY_migration_dbuser" -d "$db" -t -A \
                -c "SELECT COALESCE(pg_wal_lsn_diff(latest_end_lsn, confirmed_flush_lsn), 0)
                    FROM pg_stat_subscription WHERE subname = '${sub_name}';" 2>/dev/null)
            # Treat query failure / empty result as zero lag for this db
            lag=${lag:-0}
            if [ "$lag" -gt "$max_lag" ]; then
                max_lag=$lag
            fi
        done

        if [ "$max_lag" -eq 0 ]; then
            log_ts "✓ Replication synchronized (lag = 0)"
            break
        fi

        log_ts "Waiting for sync: lag = $max_lag bytes"
        sleep 5
        waited=$((waited + 5))
    done

    if [ $waited -ge $max_wait ]; then
        log_ts "ERROR: Timeout waiting for replication sync"
        # Roll back the read-only switch so the old production keeps serving
        PGPASSFILE="$old_passfile" psql -h "$old_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$first_db" \
            -c "ALTER SYSTEM SET default_transaction_read_only = off; SELECT pg_reload_conf();" >/dev/null 2>&1
        echo "status=failed" >> "$cutover_state_file"
        echo "error_message='Timeout waiting for sync'" >> "$cutover_state_file"
        return 1
    fi

    #------------------------------------------------------------------------
    # STEP 3: DELETE old direction replication
    #------------------------------------------------------------------------
    echo "step=3" >> "$cutover_state_file"
    echo "current_step='Deleting old replication direction'" >> "$cutover_state_file"
    log_ts "Step 3/8: Deleting $old_direction replication"

    delete_replication_direction "$old_direction"
    log_ts "✓ Deleted $old_direction replication infrastructure"

    #------------------------------------------------------------------------
    # STEP 4: CREATE new direction replication
    #------------------------------------------------------------------------
    echo "step=4" >> "$cutover_state_file"
    echo "current_step='Creating new replication direction'" >> "$cutover_state_file"
    log_ts "Step 4/8: Creating $new_direction replication"

    if ! create_replication_direction "$new_direction"; then
        log_ts "ERROR: Failed to create $new_direction replication"
        # Best-effort rollback: recreate old direction, restore read-write
        create_replication_direction "$old_direction"
        PGPASSFILE="$old_passfile" psql -h "$old_host" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$first_db" \
            -c "ALTER SYSTEM SET default_transaction_read_only = off; SELECT pg_reload_conf();" >/dev/null 2>&1
        echo "status=failed" >> "$cutover_state_file"
        echo "error_message='Failed to create new replication'" >> "$cutover_state_file"
        return 1
    fi

    log_ts "✓ Created $new_direction replication infrastructure"

    #------------------------------------------------------------------------
    # STEP 5: Swap VIP addresses (PRODUCTION CUTOVER - point of no return)
    #------------------------------------------------------------------------
    echo "step=5" >> "$cutover_state_file"
    echo "current_step='Swapping VIP addresses'" >> "$cutover_state_file"
    log_ts "Step 5/8: Swapping VIP addresses"

    swap_vip_addresses
    log_ts "✓ VIP addresses swapped"

    #------------------------------------------------------------------------
    # STEP 6: Set old production back to READ-WRITE
    #------------------------------------------------------------------------
    echo "step=6" >> "$cutover_state_file"
    echo "current_step='Restoring old production to read-write'" >> "$cutover_state_file"
    log_ts "Step 6/8: Setting old production back to read-write"

    if PGPASSFILE="$old_passfile" psql -h "$old_host" -p "$OCF_RESKEY_pgport" \
        -U "$OCF_RESKEY_migration_dbuser" -d "$first_db" \
        -c "ALTER SYSTEM SET default_transaction_read_only = off; SELECT pg_reload_conf();" >/dev/null 2>&1; then
        log_ts "✓ Old production is now read-write (ready for reverse replication)"
    else
        # VIPs are already swapped (past the point of no return) - warn, do
        # not abort; the new production is serving traffic either way
        log_ts "WARNING: Failed to restore read-write on old production - run manually: ALTER SYSTEM SET default_transaction_read_only = off"
    fi

    #------------------------------------------------------------------------
    # STEP 7: Update CIB with new production cluster
    #------------------------------------------------------------------------
    echo "step=7" >> "$cutover_state_file"
    echo "current_step='Updating CIB state'" >> "$cutover_state_file"
    log_ts "Step 7/8: Updating CIB with new production cluster"

    crm_attribute -n migration-production-cluster -v "$desired_production" 2>/dev/null
    crm_attribute -n migration-last-cutover -v "$(date -Iseconds)" 2>/dev/null

    log_ts "✓ CIB updated: production_cluster = $desired_production"

    #------------------------------------------------------------------------
    # STEP 8: Complete
    #------------------------------------------------------------------------
    echo "step=8" >> "$cutover_state_file"
    echo "current_step='Cutover complete'" >> "$cutover_state_file"
    echo "status=complete" >> "$cutover_state_file"

    log_ts "=========================================="
    log_ts "✓ v2.0 CUTOVER COMPLETE"
    log_ts "=========================================="
    log_ts "New production cluster: $desired_production"
    log_ts "Active replication: $new_direction"
    log_ts ""
    log_ts "To switch back, run:"
    log_ts "  crm resource update ${OCF_RESOURCE_INSTANCE} production_cluster=$current_production"
    log_ts ""
    log_ts "To finalize migration (cleanup):"
    log_ts "  crm resource update ${OCF_RESOURCE_INSTANCE} finalize_replication=true"
    log_ts "  crm resource stop ${OCF_RESOURCE_INSTANCE}"
    log_ts "=========================================="

    # Brief grace period so the monitor can observe status=complete
    # before the state file disappears
    sleep 2
    rm -f "$cutover_state_file"

    return 0
}

pgtwin_migrate_prepare_cutover() {
    # ZERO-RISK CUTOVER with ZERO DATA LOSS GUARANTEE
    # Creates and enables reverse replication BEFORE VIP swap
    # Called internally by async background process
    #
    # Steps:
    #  1: Set source (PG17) to read-only mode (apps blocked, pgmigrate unaffected)
    #  2: Wait for forward replication sync (lag=0)
    #  3: Create and enable reverse replication infrastructure (ENABLED!)
    #     - pgmigrate role bypasses read-only mode (role-level setting)
    #     - Apply worker inherits pgmigrate's bypass
    #  4: Verify reverse subscriptions are streaming (lag=0)
    #  5: Swap VIP IP addresses (PRODUCTION CUTOVER)
    #     - Apps start writing to target
    #     - Reverse replication ALREADY ACTIVE (zero risk!)
    #  6: Disable forward subscriptions (no longer needed)
    #  7: Set source (PG17) back to read-write
    #  8: Enable reverse DDL triggers
    #  9: Monitor reverse replication
    #
    # KEY INSIGHT: Reverse replication is enabled BEFORE VIP swap.
    # When apps start writing to target, changes are immediately
    # replicated to source. NO RISK WINDOW.
    #
    # PREREQUISITE: pgmigrate role must have:
    #   ALTER ROLE pgmigrate SET default_transaction_read_only = off;
    # This allows pgmigrate (and apply workers) to write while DB is read-only.
    #
    # On any failure: Automatic rollback to restore PG17 production
    # This function runs in background, not in monitor context

    # Helper function for timestamped logging
    log_ts() {
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*"
    }

    # Enable bash tracing if debug mode requested OR if Pacemaker started us with tracing
    local enable_debug="false"
    local log_reason=""
    if [ "$OCF_RESKEY_cutover_debug" = "true" ]; then
        enable_debug="true"
        log_reason="cutover_debug parameter set to true"
    elif [[ "$-" == *x* ]]; then
        enable_debug="true"
        log_reason="inherited from Pacemaker resource trace"
    fi

    if [ "$enable_debug" = "true" ]; then
        log_ts "=========================================="
        log_ts "DEBUG MODE ENABLED (set -x)"
        log_ts "Reason: $log_reason"
        log_ts "Shell options: $-"
        log_ts "=========================================="
        set -x  # Enable bash tracing for maximum visibility
    fi

    # Use PGHOME for state file (not PGDATA to avoid basebackup contamination)
    local pghome=$(get_cluster_pghome "source")
    # Include resource instance name to avoid confusion with other migration resources
    local cutover_state_file="${pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"

    # Read cutover_log path from state file
    source "$cutover_state_file"
    if [ -z "$cutover_log" ]; then
        # Fallback for compatibility
        cutover_log="${pghome}/.cutover.log"
    fi

    log_ts "=========================================="
    log_ts "CLEAN 7-STEP CUTOVER STARTING"
    log_ts "=========================================="
    log_ts "Debug mode: ${OCF_RESKEY_cutover_debug}"
    log_ts "Process PID: $$"
    log_ts "Log file: $cutover_log"
    log_ts "State file: $cutover_state_file"

    ocf_log info "=========================================="
    ocf_log info "CLEAN 7-STEP CUTOVER STARTING"
    ocf_log info "=========================================="

    # Discover cluster nodes
    local source_primary=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
    local target_primary=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")

    if [ -z "$source_primary" ] || [ -z "$target_primary" ]; then
        local error_msg="Failed to discover primary nodes (source: $source_primary, target: $target_primary)"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=0" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 0 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "Source primary: $source_primary"
    ocf_log info "Target primary: $target_primary"

    #------------------------------------------------------------------------
    # STEP 0: Validate database list (SAFETY CHECK)
    #------------------------------------------------------------------------
    ocf_log info "Step 0: Validating database list consistency"
    log_ts "Step 0: Validating database list consistency"
    log_ts "   Databases: $(echo $DB_LIST | tr ' ' ',')"
    log_ts "   Count: $DB_COUNT"

    if ! validate_database_list_before_cutover; then
        local error_msg="Database list validation failed - cannot proceed with cutover"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=0" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 0 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ Database list validated - no changes detected since migration started"
    log_ts "✓ Database list validated"

    #------------------------------------------------------------------------
    # STEP 1: Set source cluster (PG17) to read-only mode
    #------------------------------------------------------------------------
    echo "step=1" >> "$cutover_state_file"
    echo "current_step='Setting source cluster to read-only'" >> "$cutover_state_file"

    ocf_log info "Step 1/6: Setting source cluster to read-only"
    log_ts "Step 1/6: Setting source cluster to read-only"

    # Use first database for connection (setting is cluster-wide)
    local first_db="${DB_LIST%% *}"
    if ! set_cluster_read_only "source" "$source_primary" "true" "$first_db"; then
        local error_msg="Failed to set source cluster to read-only"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=1" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 1 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ Source cluster is now read-only (applications get write errors)"
    log_ts "✓ Source cluster is now read-only"

    #------------------------------------------------------------------------
    # STEP 2: Wait for forward replication sync (lag=0) - ALL DATABASES
    #------------------------------------------------------------------------
    echo "step=2" >> "$cutover_state_file"
    echo "current_step='Waiting for forward replication sync (all databases)'" >> "$cutover_state_file"

    ocf_log info "Step 2/6: Waiting for forward replication to sync (lag=0) for ALL databases"
    log_ts "Step 2/6: Waiting for forward replication to sync (${DB_COUNT} databases)"

    local max_wait=300  # 5 minutes
    local waited=0
    local all_synced=false

    while [ $waited -lt $max_wait ]; do
        all_synced=true
        local max_lag=0
        local lag_summary=""

        for db in $DB_LIST; do
            local sub_name="pgtwin_migrate_forward_sub_${db}"
            local lag=$(get_replication_lag "target" "$target_primary" "$sub_name" "$db")

            if [ "$lag" = "-1" ]; then
                ocf_log warn "Cannot determine replication lag for database: $db, retrying..."
                log_ts "Database $db: lag unknown, retrying..."
                all_synced=false
            elif [ "$lag" != "0" ]; then
                log_ts "Database $db: lag = $lag bytes"
                lag_summary="${lag_summary}$db:${lag}B "
                all_synced=false
                if [ "$lag" -gt "$max_lag" ]; then
                    max_lag=$lag
                fi
            else
                log_ts "Database $db: lag = 0 bytes (synced)"
            fi

            # Update state file
            update_database_state "$db" "$lag" "true" "false"
        done

        if [ "$all_synced" = "true" ]; then
            ocf_log info "✓ Forward replication synchronized for ALL databases (lag = 0 bytes)"
            log_ts "✓ Forward replication synchronized for ALL databases"
            break
        else
            ocf_log info "Waiting for sync: ${lag_summary}(max: ${max_lag}B)"
            log_ts "Waiting for sync: max lag = ${max_lag} bytes"
        fi

        sleep 5
        waited=$((waited + 5))
    done

    if [ "$all_synced" != "true" ]; then
        local error_msg="Timeout waiting for forward replication to sync (${waited}s elapsed, not all databases synced)"
        ocf_log err "$error_msg"
        for db in $DB_LIST; do
            local sub_name="pgtwin_migrate_forward_sub_${db}"
            local lag=$(get_replication_lag "target" "$target_primary" "$sub_name" "$db")
            ocf_log err "  Database $db: lag = $lag bytes"
        done
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=2" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 2 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    #------------------------------------------------------------------------
    # STEP 3: Create and enable reverse replication infrastructure
    # ZERO-RISK: Enable reverse replication BEFORE VIP swap
    # pgmigrate role has read_only bypass, so apply worker can write to read-only source
    #------------------------------------------------------------------------
    echo "step=3" >> "$cutover_state_file"
    echo "current_step='Creating reverse replication infrastructure (all databases)'" >> "$cutover_state_file"

    ocf_log info "Step 3/9: Creating and enabling reverse replication infrastructure"
    log_ts "Step 3/9: Creating reverse replication infrastructure (${DB_COUNT} databases)"
    log_ts "   ZERO-RISK: pgmigrate role bypasses read-only mode"
    log_ts "   Reverse replication will start BEFORE VIP swap"

    local reverse_setup_failed=false
    for db in $DB_LIST; do
        local reverse_pub_name="pgtwin_migrate_reverse_pub_${db}"
        local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"
        local reverse_slot_name="pgtwin_migrate_reverse_slot_${db}"
        local conn_str="host=${target_primary} port=${OCF_RESKEY_pgport} dbname=${db} user=${OCF_RESKEY_migration_dbuser}"

        log_ts "  Database $db: creating reverse publication on target..."

        # Create reverse publication on target (publisher)
        if ! setup_publication "target" "$target_primary" "$reverse_pub_name" "$db" "false"; then
            ocf_log err "Failed to create reverse publication for database: $db"
            log_ts "  Database $db: ✗ failed to create reverse publication"
            reverse_setup_failed=true
            break
        fi
        log_ts "  Database $db: ✓ reverse publication created on target"

        log_ts "  Database $db: creating ENABLED reverse subscription on source..."

        # Create ENABLED subscription on source (subscriber)
        # - enabled=true: Start replicating immediately
        # - copy_data=false: Don't copy existing data (source already has all data)
        # - create_slot=true: Create replication slot on target
        # NOTE: This works because pgmigrate has role-level bypass for read-only mode
        #       The apply worker inherits this setting and can write to read-only source
        run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
            "CREATE SUBSCRIPTION ${reverse_sub_name}
             CONNECTION '${conn_str}'
             PUBLICATION ${reverse_pub_name}
             WITH (
               enabled = true,
               create_slot = true,
               slot_name = '${reverse_slot_name}',
               copy_data = false
             );" >/dev/null 2>&1

        if [ $? -ne 0 ]; then
            ocf_log err "Failed to create reverse subscription for database: $db"
            log_ts "  Database $db: ✗ failed to create reverse subscription"
            # Cleanup publication
            run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
                -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
                "DROP PUBLICATION IF EXISTS ${reverse_pub_name};" >/dev/null 2>&1
            reverse_setup_failed=true
            break
        fi

        ocf_log info "✓ Reverse replication created for database: $db (ENABLED)"
        log_ts "  Database $db: ✓ reverse subscription created and ENABLED"
        update_database_state "$db" "0" "true" "true"
    done

    if [ "$reverse_setup_failed" = "true" ]; then
        local error_msg="Failed to create reverse replication infrastructure"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=3" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 3 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ Reverse replication infrastructure created for ALL databases"
    log_ts "✓ Reverse replication infrastructure created for ALL databases"

    #------------------------------------------------------------------------
    # STEP 4: Verify reverse subscriptions are streaming
    # Wait for subscriptions to connect and start streaming before VIP swap
    #------------------------------------------------------------------------
    echo "step=4" >> "$cutover_state_file"
    echo "current_step='Verifying reverse subscriptions streaming'" >> "$cutover_state_file"

    ocf_log info "Step 4/9: Verifying reverse subscriptions are streaming"
    log_ts "Step 4/9: Verifying reverse subscriptions are streaming (${DB_COUNT} databases)"

    # Give subscriptions time to connect (up to 30 seconds)
    local verify_max_wait=30
    local verify_waited=0
    local all_streaming=false

    while [ $verify_waited -lt $verify_max_wait ]; do
        all_streaming=true

        for db in $DB_LIST; do
            local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"

            # Check subscription state
            local sub_state=$(run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
                -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "SELECT srsubstate FROM pg_subscription_rel sr
                 JOIN pg_subscription s ON sr.srsubid = s.oid
                 WHERE s.subname = '${reverse_sub_name}'
                 LIMIT 1;" 2>/dev/null)

            # Check if subscription is enabled and connected
            local sub_enabled=$(run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" \
                -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
                "SELECT subenabled FROM pg_subscription WHERE subname = '${reverse_sub_name}';" 2>/dev/null)

            if [ "$sub_enabled" = "t" ]; then
                # 'r' = ready (streaming), empty = no tables (OK for empty DB)
                if [ "$sub_state" = "r" ] || [ -z "$sub_state" ]; then
                    log_ts "  Database $db: ✓ streaming (state=$sub_state)"
                else
                    log_ts "  Database $db: connecting (state=$sub_state)"
                    all_streaming=false
                fi
            else
                log_ts "  Database $db: subscription not enabled (enabled=$sub_enabled)"
                all_streaming=false
            fi
        done

        if [ "$all_streaming" = "true" ]; then
            ocf_log info "✓ All reverse subscriptions are streaming"
            log_ts "✓ All reverse subscriptions are streaming"
            break
        fi

        sleep 2
        verify_waited=$((verify_waited + 2))
    done

    if [ "$all_streaming" != "true" ]; then
        ocf_log warn "Some subscriptions may not be fully streaming yet, but proceeding (non-fatal)"
        log_ts "⚠ Some subscriptions may not be fully streaming (non-fatal)"
    fi

    #------------------------------------------------------------------------
    # STEP 5: Swap VIP IP addresses (PRODUCTION CUTOVER)
    # ZERO-RISK: Reverse replication is ALREADY ACTIVE!
    #------------------------------------------------------------------------
    echo "step=5" >> "$cutover_state_file"
    echo "current_step='Swapping VIP IP addresses (PRODUCTION CUTOVER)'" >> "$cutover_state_file"

    ocf_log info "Step 5/9: Swapping VIP IP addresses (PRODUCTION CUTOVER)"
    log_ts "Step 5/9: Swapping VIP IP addresses (PRODUCTION CUTOVER)"
    log_ts "=========================================="
    log_ts "ZERO-RISK: Reverse replication is ALREADY ACTIVE!"
    log_ts "When apps start writing to target, changes are immediately"
    log_ts "replicated to source. NO data loss window."
    log_ts "=========================================="

    if ! swap_vip_addresses; then
        local error_msg="Failed to swap VIP IP addresses"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=5" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 5 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ VIP addresses swapped - Applications now connect to target cluster"
    log_ts "✓ VIP addresses swapped - Applications now connect to target cluster"
    log_ts "   Reverse replication is ACTIVE - changes are being replicated to source"

    #------------------------------------------------------------------------
    # STEP 6: Disable forward subscriptions (SAFETY - prevent conflicts)
    #------------------------------------------------------------------------
    echo "step=6" >> "$cutover_state_file"
    echo "current_step='Disabling forward subscriptions (all databases)'" >> "$cutover_state_file"

    ocf_log info "Step 6/9: Disabling forward subscriptions (source → target) for ALL databases"
    log_ts "Step 6/9: Disabling forward subscriptions (${DB_COUNT} databases)"
    log_ts "   SAFETY: Preventing conflicts - applications now on target via VIP"

    local disable_failed=false
    for db in $DB_LIST; do
        local forward_sub_name="pgtwin_migrate_forward_sub_${db}"

        log_ts "  Database $db: disabling forward subscription..."

        run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
            "ALTER SUBSCRIPTION ${forward_sub_name} DISABLE;" >/dev/null 2>&1

        if [ $? -eq 0 ]; then
            ocf_log info "✓ Forward subscription disabled for database: $db"
            log_ts "  Database $db: ✓ forward subscription disabled"
        else
            ocf_log err "Failed to disable forward subscription for database: $db"
            log_ts "  Database $db: ✗ failed to disable forward subscription"
            disable_failed=true
            break
        fi
    done

    if [ "$disable_failed" = "true" ]; then
        local error_msg="Failed to disable forward subscriptions - risk of replication conflicts"
        ocf_log err "$error_msg"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=6" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 6 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ Forward subscriptions disabled for ALL databases (conflict-safe)"
    log_ts "✓ Forward subscriptions disabled"

    #------------------------------------------------------------------------
    # STEP 7: Set source cluster (PG17) back to read-write
    #------------------------------------------------------------------------
    echo "step=7" >> "$cutover_state_file"
    echo "current_step='Setting source cluster back to read-write'" >> "$cutover_state_file"

    ocf_log info "Step 7/9: Setting source cluster back to read-write"
    log_ts "Step 7/9: Setting source cluster back to read-write"
    log_ts "   NOTE: pgmigrate apply worker was already writing (via bypass)"
    log_ts "   This step enables DBA/admin write access"

    # Use first database for connection (setting is cluster-wide)
    if ! set_cluster_read_only "source" "$source_primary" "false" "$first_db"; then
        local error_msg="Failed to set source cluster to read-write"
        ocf_log err "$error_msg"
        ocf_log err "CRITICAL: Source cluster stuck in read-only mode"
        echo "status=failed" >> "$cutover_state_file"
        echo "failed_step=7" >> "$cutover_state_file"
        echo "error_message='$error_msg'" >> "$cutover_state_file"
        rollback_cutover 7 "$error_msg"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ Source cluster is now read-write (DBAs have full access)"
    log_ts "✓ Source cluster is now read-write"

    #------------------------------------------------------------------------
    # STEP 8: Enable reverse DDL triggers (target → source)
    #------------------------------------------------------------------------
    echo "step=8" >> "$cutover_state_file"
    echo "current_step='Enabling reverse DDL triggers (all databases)'" >> "$cutover_state_file"

    ocf_log info "Step 8/9: Enabling reverse DDL triggers (target → source) for ALL databases"
    log_ts "Step 8/9: Enabling reverse DDL triggers (${DB_COUNT} databases)"
    log_ts "   This ensures CREATE TABLE/ALTER TABLE/DROP TABLE are replicated"

    for db in $DB_LIST; do
        local trigger_name="replicate_ddl_to_source_trigger_${db}"

        log_ts "  Database $db: enabling reverse DDL trigger..."

        # Try to enable existing trigger first (fast path)
        if run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" -c \
            "ALTER EVENT TRIGGER ${trigger_name} ENABLE;" >/dev/null 2>&1; then
            ocf_log info "✓ Reverse DDL trigger enabled for database: $db"
            log_ts "  Database $db: ✓ reverse DDL trigger enabled"
        else
            # Fallback: trigger doesn't exist, create now
            ocf_log info "Reverse DDL trigger doesn't exist for $db, creating now..."
            log_ts "  Database $db: creating reverse DDL trigger"
            if ! setup_ddl_trigger "target" "$target_primary" "$source_primary" "source" "false" "$db"; then
                ocf_log warn "Reverse DDL trigger setup failed for database: $db - schema changes must be manually synchronized"
                log_ts "  Database $db: ⚠ reverse DDL trigger setup failed (non-fatal)"
                # Non-fatal: continue even if DDL trigger fails
            else
                ocf_log info "✓ Reverse DDL trigger created and active for database: $db"
                log_ts "  Database $db: ✓ reverse DDL trigger active"
            fi
        fi
    done

    #------------------------------------------------------------------------
    # STEP 9: Monitor reverse replication
    #------------------------------------------------------------------------
    echo "step=9" >> "$cutover_state_file"
    echo "current_step='Monitoring reverse replication (all databases)'" >> "$cutover_state_file"

    ocf_log info "Step 9/9: Monitoring reverse replication for ALL databases"
    log_ts "Step 9/9: Monitoring reverse replication (${DB_COUNT} databases)"

    # Wait briefly for any pending changes to replicate
    sleep 5

    # Check reverse replication lag (give it 60 seconds to sync)
    max_wait=60
    waited=0
    local all_synced=false

    while [ $waited -lt $max_wait ]; do
        all_synced=true
        local max_lag=0
        local lag_summary=""

        for db in $DB_LIST; do
            local reverse_sub_name="pgtwin_migrate_reverse_sub_${db}"
            local lag=$(get_replication_lag "source" "$source_primary" "$reverse_sub_name" "$db")

            if [ "$lag" = "0" ]; then
                log_ts "  Database $db: lag = 0 bytes (synced)"
            elif [ "$lag" != "-1" ]; then
                log_ts "  Database $db: lag = $lag bytes (syncing...)"
                lag_summary="${lag_summary}$db:${lag}B "
                all_synced=false
                if [ "$lag" -gt "$max_lag" ]; then
                    max_lag=$lag
                fi
            else
                log_ts "  Database $db: lag unknown"
                all_synced=false
            fi
        done

        if [ "$all_synced" = "true" ]; then
            ocf_log info "✓ Reverse replication synchronized for ALL databases (lag = 0 bytes)"
            log_ts "✓ Reverse replication synchronized for ALL databases"
            break
        else
            ocf_log info "Reverse replication syncing: ${lag_summary}(max: ${max_lag}B)"
        fi

        sleep 5
        waited=$((waited + 5))
    done

    if [ "$all_synced" != "true" ]; then
        ocf_log warn "Could not verify reverse replication lag for all databases, but subscriptions are enabled"
        log_ts "Could not verify reverse replication lag for all databases (non-fatal)"
    fi

    #------------------------------------------------------------------------
    # CUTOVER COMPLETE
    #------------------------------------------------------------------------
    echo "step=complete" >> "$cutover_state_file"
    echo "status=complete" >> "$cutover_state_file"
    echo "current_step='Cutover complete'" >> "$cutover_state_file"

    # Set cluster state to CUTOVER_COMPLETE (cluster property - cluster-wide, persistent)
    crm_attribute -n migration-state -v "CUTOVER_COMPLETE" 2>/dev/null

    ocf_log info "=========================================="
    ocf_log info "✓ CUTOVER COMPLETE - ZERO DOWNTIME ACHIEVED"
    ocf_log info "=========================================="
    ocf_log info "New production state:"
    ocf_log info "  - Applications → Target cluster (PostgreSQL newer version)"
    ocf_log info "  - Forward replication: Source → Target (disabled)"
    ocf_log info "  - Reverse replication: Target → Source (active)"
    ocf_log info "  - Source cluster: Standby receiving updates"
    ocf_log info ""
    ocf_log info "Next steps:"
    ocf_log info "  1. Verify applications are working on target cluster"
    ocf_log info "  2. Monitor reverse replication lag"
    ocf_log info "  3. Keep source cluster as backup or decommission later"

    log_ts "=========================================="
    log_ts "✓ CUTOVER COMPLETE"
    log_ts "=========================================="
    log_ts "Production cluster: Target"
    log_ts "Backup cluster: Source (receiving reverse replication)"
    log_ts "Forward subscription: Disabled (no longer needed)"

    return $OCF_SUCCESS
}

reconcile_migration_state() {
    # Reconcile migration state after completion (idempotency/self-healing).
    # Ensures the reverse DDL trigger exists on the target cluster for EVERY
    # migrated database, even if the migration was completed with an older
    # agent version (pre-v2.0 created a single, unsuffixed trigger; cutover
    # Step 8 of v2.0 creates one trigger per database, named
    # replicate_ddl_to_source_trigger_<db>).
    # Safe to run multiple times - detects and fixes missing components.
    #
    # Globals:  DB_LIST (read)  - space-separated database names set by
    #           get_database_list; falls back to OCF_RESKEY_pgdatabase if empty
    # Returns:  0 always - reconciliation failures are non-fatal

    ocf_log info "Reconciling migration state (verifying reverse DDL triggers)..."

    # Discover target cluster primary (where the DDL triggers must live)
    local target_primary
    target_primary=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted" 2>/dev/null)
    if [ -z "$target_primary" ]; then
        ocf_log warn "Cannot discover target cluster primary - skipping reconciliation"
        return 0  # Non-fatal: migration is complete, just can't verify
    fi

    # Discover source cluster primary (needed for DDL trigger connection)
    local source_primary
    source_primary=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted" 2>/dev/null)
    if [ -z "$source_primary" ]; then
        ocf_log warn "Cannot discover source cluster primary - skipping DDL trigger check"
        return 0  # Non-fatal
    fi

    # Check every migrated database; DB_LIST may be unset if the caller did
    # not run get_database_list, so degrade to the single configured database
    local db_list="${DB_LIST:-$OCF_RESKEY_pgdatabase}"
    local db
    for db in $db_list; do
        local trigger_name="replicate_ddl_to_source_trigger_${db}"
        local trigger_count

        # Accept either the per-database (v2.0) or the legacy unsuffixed
        # (pre-v2.0) trigger name as "state consistent"
        trigger_count=$(run_as_postgres_user "target" psql -h "$target_primary" -p "$OCF_RESKEY_pgport" \
            -U "$OCF_RESKEY_migration_dbuser" -d "$db" -Atc \
            "SELECT COUNT(*) FROM pg_event_trigger
             WHERE evtname IN ('${trigger_name}', 'replicate_ddl_to_source_trigger');" 2>/dev/null)

        if [ -z "$trigger_count" ]; then
            # Query itself failed (connection/auth) - cannot verify this DB
            ocf_log warn "Database $db: could not query event triggers - skipping"
            continue
        fi

        if [ "$trigger_count" -ge 1 ] 2>/dev/null; then
            ocf_log info "✓ Database $db: reverse DDL trigger exists (state consistent)"
            continue
        fi

        ocf_log warn "⚠ Database $db: reverse DDL trigger missing (migration completed with older version)"
        ocf_log info "Auto-healing: Creating reverse DDL trigger for $db now..."

        # Same call shape as cutover Step 8: per-database trigger creation
        if setup_ddl_trigger "target" "$target_primary" "$source_primary" "source" "false" "$db"; then
            ocf_log info "✓ Database $db: reverse DDL trigger created (state reconciled)"
            ocf_log info "  Future CREATE TABLE on target will auto-create on source"
        else
            ocf_log warn "Database $db: failed to create reverse DDL trigger - manual setup may be required"
            ocf_log warn "  See: FEATURE_REVERSE_DDL_REPLICATION_v1.0.4.md for manual steps"
        fi
    done

    return 0  # Always succeed - reconciliation failures are non-fatal
}

#######################################
# OCF "start" action for the pgtwin-migrate resource agent (v2.0).
#
# Idempotent entry point. Determines the current migration phase from
# cluster-wide CIB attributes (migration-state, migration-production-cluster)
# and validates each claimed phase against the ACTUAL presence of logical
# replication publications before trusting it (attributes persist until
# explicitly deleted, so stale values must be detected and cleared).
#
# Outcomes:
#   - Replication already configured and validated -> self-heal slots,
#     reconcile state, return $OCF_SUCCESS immediately.
#   - No (or stale) state -> fresh setup of the active direction:
#     forward (source -> target) or reverse (target -> source), chosen by
#     which cluster is currently production.
#
# Globals read:  OCF_RESKEY_* parameters, OCF_RESOURCE_INSTANCE,
#                DB_LIST / DB_COUNT (populated here via get_database_list)
# Globals set:   CIB attributes migration-state and
#                migration-production-cluster (via crm_attribute)
# Returns:       $OCF_SUCCESS on success, $OCF_ERR_GENERIC on failure
#######################################
pgtwin_migrate_start() {
    ocf_log info "Starting pgtwin-migrate v2.0"

    # v2.0: Determine which cluster should be production
    local desired_production="$OCF_RESKEY_production_cluster"
    # NOTE(review): no explicit -G here, unlike the migration-state query
    # below; crm_attribute defaults to a query when neither -v nor -D is
    # given, so behavior should match - confirm against the installed
    # Pacemaker version
    local current_production=$(crm_attribute -q -n migration-production-cluster 2>/dev/null)

    # If no CIB state, initialize to desired (defaults to source_cluster)
    if [ -z "$current_production" ]; then
        current_production="$desired_production"
    fi

    # Determine replication direction based on production cluster
    # (replication always flows FROM the production cluster TO the standby one)
    local active_direction
    if [ "$current_production" = "$OCF_RESKEY_source_cluster" ]; then
        active_direction="forward"
        ocf_log info "Production cluster: $current_production (source) - forward replication active"
    else
        active_direction="reverse"
        ocf_log info "Production cluster: $current_production (target) - reverse replication active"
    fi

    # v2.0: Dynamic database discovery (populates DB_LIST and DB_COUNT)
    if ! get_database_list; then
        ocf_log err "Failed to get database list"
        return $OCF_ERR_GENERIC
    fi
    ocf_log info "Database list: $DB_LIST ($DB_COUNT databases)"

    # Check migration state from cluster properties (cluster-wide, persistent)
    # Note: Attribute persists until explicitly deleted - handle stale values
    local current_state=$(crm_attribute -G -n migration-state -q 2>/dev/null)

    # Handle active migration in progress - validate state against reality
    if [ "$current_state" = "FORWARD_REPLICATION" ]; then
        ocf_log info "Migration state shows FORWARD_REPLICATION - validating against actual state"

        # Check if forward replication infrastructure actually exists
        local check_source_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
        local state_is_valid=false

        if [ -n "$check_source_node" ]; then
            # First database's publication is used as a cheap sentinel for
            # "setup completed" - setup is all-or-nothing across DB_LIST
            local first_db=$(echo $DB_LIST | awk '{print $1}')
            local first_pub="pgtwin_migrate_forward_pub_${first_db}"
            if check_publication_exists "source" "$OCF_RESKEY_source_replication_vip" "$first_pub" "$first_db"; then
                state_is_valid=true
            fi
        fi

        if [ "$state_is_valid" = "true" ]; then
            ocf_log info "✓ FORWARD_REPLICATION state validated - publications exist"
            ocf_log info "pgtwin-migrate already running (forward replication active)"

            # Ensure replication slots exist (recovery from cluster failover/restart)
            local target_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
            if [ -n "$check_source_node" ] && [ -n "$target_node" ]; then
                ensure_forward_replication_slots "$check_source_node" "$OCF_RESKEY_target_replication_vip"
            fi

            return $OCF_SUCCESS
        else
            # State says FORWARD_REPLICATION but no publications exist - stale state
            ocf_log warn "FORWARD_REPLICATION state is stale (no publications found)"
            ocf_log info "Clearing stale state and proceeding with fresh setup"
            crm_attribute -D -n migration-state 2>/dev/null || true
            current_state=""
        fi
    fi

    # Handle completed migration - validate state and reconcile
    if [ "$current_state" = "CUTOVER_COMPLETE" ]; then
        ocf_log info "Migration state shows CUTOVER_COMPLETE - validating against actual state"

        # Check if reverse replication infrastructure actually exists
        local target_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
        local state_is_valid=false

        if [ -n "$target_node" ]; then
            # Same sentinel approach as above, but for the reverse publication
            # created on the target cluster during cutover
            local first_db=$(echo $DB_LIST | awk '{print $1}')
            local first_reverse_pub="pgtwin_migrate_reverse_pub_${first_db}"
            if check_publication_exists "target" "$OCF_RESKEY_target_replication_vip" "$first_reverse_pub" "$first_db"; then
                state_is_valid=true
            fi
        fi

        if [ "$state_is_valid" = "true" ]; then
            ocf_log info "=========================================="
            ocf_log info "✓ Migration already COMPLETED (validated)"
            ocf_log info "=========================================="
            ocf_log info "Forward migration: ${OCF_RESKEY_source_cluster} → ${OCF_RESKEY_target_cluster} (complete)"
            ocf_log info "Production VIP (${OCF_RESKEY_production_vip_resource}): Points to target cluster"
            ocf_log info "Reverse replication: ${OCF_RESKEY_target_cluster} → ${OCF_RESKEY_source_cluster} (active)"
            ocf_log info ""

            # v1.0.6: Reconcile state (idempotency/self-healing)
            reconcile_migration_state

            # Ensure replication slots exist (recovery from cluster failover/restart)
            local source_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
            if [ -n "$source_node" ] && [ -n "$target_node" ]; then
                # After cutover, reverse replication is active (target → source)
                ensure_reverse_replication_slots "$OCF_RESKEY_source_replication_vip" "$target_node"
            fi

            ocf_log info "To migrate BACK to source cluster (reverse migration):"
            ocf_log info "  1. Stop this completed resource: crm resource stop ${OCF_RESOURCE_INSTANCE}"
            ocf_log info "  2. Create NEW migration resource with swapped source/target:"
            ocf_log info "     - source_cluster=${OCF_RESKEY_target_cluster}"
            ocf_log info "     - target_cluster=${OCF_RESKEY_source_cluster}"
            ocf_log info "     - Swap VIP resources accordingly"
            ocf_log info "  3. The existing reverse replication becomes forward replication for new migration"
            ocf_log info ""
            ocf_log info "To permanently clean up this resource:"
            ocf_log info "  crm configure delete ${OCF_RESOURCE_INSTANCE}"
            ocf_log info "=========================================="
            return $OCF_SUCCESS
        else
            # State says CUTOVER_COMPLETE but no reverse publications exist - stale state
            ocf_log warn "CUTOVER_COMPLETE state is stale (no reverse publications found)"
            ocf_log info "Clearing stale state and proceeding with fresh setup"
            crm_attribute -D -n migration-state 2>/dev/null || true
            current_state=""
        fi
    fi

    # v2.0: Check if setup is complete by verifying publications exist for active direction
    # Publications are created on the PRODUCTION cluster (forward=source, reverse=target)
    # This path covers the case where the CIB attribute was lost but the
    # replication infrastructure itself survived.
    local pub_cluster_type pub_host
    if [ "$active_direction" = "forward" ]; then
        pub_cluster_type="source"
        pub_host="$OCF_RESKEY_source_replication_vip"
    else
        pub_cluster_type="target"
        pub_host="$OCF_RESKEY_target_replication_vip"
    fi

    # Inline command substitution selects the cluster NAME matching the
    # active direction before discovering its Promoted node
    local check_pub_node=$(discover_cluster_node "$([ "$active_direction" = "forward" ] && echo "$OCF_RESKEY_source_cluster" || echo "$OCF_RESKEY_target_cluster")" "Promoted")
    if [ -n "$check_pub_node" ]; then
        # Check if publications exist for first database as indicator of setup completion
        local first_db=$(echo $DB_LIST | awk '{print $1}')
        local first_pub="pgtwin_migrate_${active_direction}_pub_${first_db}"
        if check_publication_exists "$pub_cluster_type" "$pub_host" "$first_pub" "$first_db"; then
            ocf_log info "pgtwin-migrate already running ($active_direction publications exist)"

            # Ensure replication slots exist (recovery from cluster failover/restart)
            if [ "$active_direction" = "forward" ]; then
                local target_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
                if [ -n "$target_node" ]; then
                    ensure_forward_replication_slots "$check_pub_node" "$OCF_RESKEY_target_replication_vip"
                fi
            else
                local source_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
                if [ -n "$source_node" ]; then
                    ensure_reverse_replication_slots "$OCF_RESKEY_source_replication_vip" "$check_pub_node"
                fi
            fi

            return $OCF_SUCCESS
        fi
    fi

    ocf_log info "No existing $active_direction replication detected, proceeding with setup"

    # Initialize database migration (creates baseline and state files)
    # NOTE(review): return value intentionally not checked here - presumably
    # best-effort; confirm init_database_migration cannot fail fatally
    init_database_migration

    ocf_log info "=========================================="
    ocf_log info "Setting up $active_direction replication for ${DB_COUNT} database(s)"
    ocf_log info "Databases: $(echo $DB_LIST | tr ' ' ',')"
    ocf_log info "Production cluster: $current_production"
    ocf_log info "=========================================="

    # v2.0: Determine publishing and subscribing clusters based on direction
    # (publisher = production cluster, subscriber = standby cluster)
    local pub_cluster sub_cluster pub_cluster_name sub_cluster_name
    if [ "$active_direction" = "forward" ]; then
        pub_cluster="source"
        sub_cluster="target"
        pub_cluster_name="$OCF_RESKEY_source_cluster"
        sub_cluster_name="$OCF_RESKEY_target_cluster"
    else
        pub_cluster="target"
        sub_cluster="source"
        pub_cluster_name="$OCF_RESKEY_target_cluster"
        sub_cluster_name="$OCF_RESKEY_source_cluster"
    fi

    # Step 1: Discover publishing cluster Promoted node
    ocf_log info "Discovering publishing cluster Promoted node from ${pub_cluster_name}"
    local pub_node=$(discover_cluster_node "$pub_cluster_name" "Promoted")
    if [ -z "$pub_node" ]; then
        ocf_log err "Failed to discover publishing cluster Promoted node"
        return $OCF_ERR_GENERIC
    fi
    ocf_log info "Publishing cluster Promoted node: ${pub_node}"

    # Step 2: Discover subscribing cluster Promoted node
    ocf_log info "Discovering subscribing cluster Promoted node from ${sub_cluster_name}"
    local sub_node=$(discover_cluster_node "$sub_cluster_name" "Promoted")
    if [ -z "$sub_node" ]; then
        ocf_log err "Failed to discover subscribing cluster Promoted node"
        return $OCF_ERR_GENERIC
    fi
    ocf_log info "Subscribing cluster Promoted node: ${sub_node}"

    # Step 3: Check replication slot availability on publishing cluster
    # (each database consumes one logical slot; DB_COUNT slots are needed)
    if [ -n "$pub_node" ]; then
        if ! check_replication_slot_availability "$pub_node" "$DB_COUNT"; then
            return $OCF_ERR_GENERIC
        fi
    fi

    # Step 4: If using Unpromoted source node role (forward only), call pg_log_standby_snapshot()
    # This prevents "idle primary" hanging issue when creating subscriptions from standby
    if [ "$active_direction" = "forward" ] && [ "$OCF_RESKEY_source_node_role" = "Unpromoted" ]; then
        ocf_log info "Source using Unpromoted node, calling pg_log_standby_snapshot() on source primary"
        local source_primary=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
        if [ -n "$source_primary" ]; then
            ocf_log info "Calling pg_log_standby_snapshot() on source primary: ${source_primary}"
            # Errors intentionally ignored: this is a best-effort nudge so a
            # standby-based snapshot build does not stall on an idle primary
            run_as_postgres_user "source" psql -h "$source_primary" -p "$OCF_RESKEY_pgport" -U "$OCF_RESKEY_migration_dbuser" -d "$OCF_RESKEY_pgdatabase" -Atc \
                "SELECT pg_log_standby_snapshot();" >/dev/null 2>&1
        fi
    fi

    # Step 5: Create replication infrastructure using unified function
    ocf_log info "Step 5: Setting up $active_direction replication for all databases"
    if ! create_replication_direction "$active_direction"; then
        ocf_log err "$active_direction replication setup failed"
        return $OCF_ERR_GENERIC
    fi

    ocf_log info "✓ $active_direction replication configured for all ${DB_COUNT} database(s)"

    # Step 6: Wait a moment for subscriptions to initialize
    sleep 2

    # Step 7: Verify replication is working for ALL databases
    # (non-streaming states are logged as warnings but do not fail the start;
    # subscriptions may still be catching up)
    ocf_log info "Verifying subscription states (${DB_COUNT} databases)"
    for db in $DB_LIST; do
        local sub_name="pgtwin_migrate_${active_direction}_sub_${db}"
        local sub_state=$(get_subscription_state "$sub_cluster" "$sub_node" "$sub_name" "$db")
        if [ -z "$sub_state" ]; then
            ocf_log warn "Could not verify subscription state for database: $db"
        else
            # get_subscription_state returns a '|'-delimited record; field 2
            # is the walsender state (e.g. streaming, catchup)
            local state=$(echo "$sub_state" | cut -d'|' -f2)
            ocf_log info "Database $db subscription state: ${state}"

            if [ "$state" != "streaming" ] && [ "$state" != "catchup" ]; then
                ocf_log warn "Subscription for database $db is not in streaming/catchup state: ${state}"
            fi
        fi
    done

    # Step 8: Set CIB state and production cluster
    local state_value
    if [ "$active_direction" = "forward" ]; then
        state_value="FORWARD_REPLICATION"
    else
        state_value="CUTOVER_COMPLETE"
    fi
    # '|| true' keeps start succeeding even if the CIB write fails; the
    # publication-existence checks above make the state recoverable
    crm_attribute -n migration-state -v "$state_value" 2>/dev/null || true
    crm_attribute -n migration-production-cluster -v "$current_production" 2>/dev/null || true

    ocf_log info "$active_direction replication setup completed successfully"
    if [ "$active_direction" = "forward" ]; then
        ocf_log info "Publications on Source: ${pub_node} @ ${OCF_RESKEY_source_replication_vip}"
        ocf_log info "Subscriptions on Target: ${sub_node} @ ${OCF_RESKEY_target_replication_vip}"
        ocf_log info ""
        ocf_log info "To cutover to target cluster:"
        ocf_log info "  crm resource update ${OCF_RESOURCE_INSTANCE} production_cluster=${OCF_RESKEY_target_cluster}"
    else
        ocf_log info "Publications on Target: ${pub_node} @ ${OCF_RESKEY_target_replication_vip}"
        ocf_log info "Subscriptions on Source: ${sub_node} @ ${OCF_RESKEY_source_replication_vip}"
        ocf_log info ""
        ocf_log info "To failback to source cluster:"
        ocf_log info "  crm resource update ${OCF_RESOURCE_INSTANCE} production_cluster=${OCF_RESKEY_source_cluster}"
    fi

    return $OCF_SUCCESS
}

pgtwin_migrate_stop() {
    # Stop the migration resource.
    #
    # Behavior is selected by the finalize_replication resource parameter:
    #   - false (default): plain stop; publications, subscriptions and slots
    #     are left in place so the resource can be restarted mid-migration.
    #   - true: full teardown of all replication infrastructure via
    #     finalize_all_replication.
    #
    # Always returns OCF_SUCCESS (a failing stop would escalate to fencing).
    ocf_log info "Stopping pgtwin-migrate v2.0"

    if ! ocf_is_true "$OCF_RESKEY_finalize_replication"; then
        # Normal stop path: preserve infrastructure for restart/migration.
        ocf_log info "Stopping migration resource (infrastructure preserved)"
        ocf_log info "Replication publications, subscriptions, and slots remain in place"
        ocf_log info ""
        ocf_log info "To clean up all replication infrastructure:"
        ocf_log info "  1. crm resource update ${OCF_RESOURCE_INSTANCE} finalize_replication=true"
        ocf_log info "  2. crm resource stop ${OCF_RESOURCE_INSTANCE}"
        ocf_log info "  3. crm configure delete ${OCF_RESOURCE_INSTANCE}"
        return $OCF_SUCCESS
    fi

    # finalize_replication=true: tear everything down.
    ocf_log info "finalize_replication=true - performing full cleanup"

    # Refresh the DB_LIST/DB_COUNT globals so cleanup covers every database.
    get_database_list >/dev/null 2>&1

    # Drop publications, subscriptions and replication slots on both clusters.
    finalize_all_replication
    return $OCF_SUCCESS
}

pgtwin_migrate_monitor() {
    # Monitor the replication status (v2.0)
    #
    # Per monitor cycle:
    #   - discover source/target cluster nodes (NOT_RUNNING until both exist)
    #   - report per-database subscription state and replication lag
    #   - periodic maintenance on every Nth cycle: slot health check,
    #     subscription auto-sync (refresh), fallback cutover detection,
    #     and replication setup for newly created databases
    #
    # Returns: OCF_SUCCESS when replication infrastructure is healthy (or the
    #          migration is completing), OCF_NOT_RUNNING when cluster nodes
    #          cannot be discovered.

    # v2.0: Dynamic database discovery (populates the DB_LIST/DB_COUNT globals)
    get_database_list >/dev/null 2>&1
    if [ -z "$DB_LIST" ]; then
        ocf_log warn "No databases discovered for monitoring"
    fi

    # Step 0: Check if cutover in progress
    local current_state=$(crm_attribute -G -n migration-state -q 2>/dev/null)
    if [ "$current_state" = "CUTOVER_IN_PROGRESS" ]; then
        ocf_log info "Cutover in progress..."
        return $OCF_SUCCESS
    fi

    # v2.0: Removed CUTOVER_COMPLETE auto-stop - resource stays running for bidirectional failover
    if [ "$current_state" = "CUTOVER_COMPLETE" ]; then
        ocf_log info "=========================================="
        ocf_log info "✓ MIGRATION COMPLETE (detected in monitor)"
        ocf_log info "=========================================="
        ocf_log info "Forward replication: Source → Target (disabled)"
        ocf_log info "Reverse replication: Target → Source (active)"
        ocf_log info "Production cluster: Target"
        ocf_log info "Backup cluster: Source"
        ocf_log info ""
        ocf_log info "NOTE: This is a safety check - auto-stop should have triggered during cutover"
        ocf_log info "=========================================="

        # Auto-stop the migration resource by setting target-role=Stopped
        # This is a SAFETY NET in case check_cutover_progress() didn't run
        # (e.g., resource manually restarted after completion)
        ocf_log info "Safety: Setting migration resource target-role=Stopped..."
        crm_resource --meta --resource "$OCF_RESOURCE_INSTANCE" --set-parameter target-role --parameter-value Stopped 2>&1 | \
            while IFS= read -r line; do ocf_log info "crm_resource: $line"; done

        # Clean up cluster attribute (no longer needed after migration complete)
        # Note: Attribute is in cluster properties and persists until explicitly deleted
        ocf_log info "Cleaning up migration-state cluster attribute..."
        crm_attribute -D -n migration-state 2>&1 | \
            while IFS= read -r line; do ocf_log info "crm_attribute cleanup: $line"; done

        # Return SUCCESS for this final monitor cycle
        # Next cycle won't run because target-role=Stopped
        return $OCF_SUCCESS
    fi

    # Track monitor cycles for better startup logging
    local state_dir="${HA_RSCTMP}"
    local monitor_counter_file="${state_dir}/pgtwin_migrate_${OCF_RESOURCE_INSTANCE}_monitor_count"
    local monitor_count=0

    if [ -f "$monitor_counter_file" ]; then
        monitor_count=$(cat "$monitor_counter_file" 2>/dev/null || echo "0")
    fi
    # Fix: sanitize the value so a corrupted/empty state file cannot break
    # the arithmetic expansion below
    case "$monitor_count" in
        ''|*[!0-9]*) monitor_count=0 ;;
    esac
    monitor_count=$((monitor_count + 1))
    echo "$monitor_count" > "$monitor_counter_file"

    # Step 1: Discover source and target nodes (based on configured roles)
    local source_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "$OCF_RESKEY_source_node_role")
    local target_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")

    if [ -z "$source_node" ] || [ -z "$target_node" ]; then
        if [ $monitor_count -le 5 ]; then
            ocf_log info "Monitor cycle $monitor_count: Waiting for clusters to start (source_node=$source_node, target_node=$target_node)"
            ocf_log info "This is EXPECTED during initial startup - clusters may still be promoting primaries"
        else
            ocf_log warn "Cannot monitor: failed to discover cluster nodes after $monitor_count attempts"
            ocf_log warn "Source cluster: $OCF_RESKEY_source_cluster, Source node: ${source_node:-NOT FOUND}"
            ocf_log warn "Target cluster: $OCF_RESKEY_target_cluster, Target node: ${target_node:-NOT FOUND}"
        fi
        return $OCF_NOT_RUNNING
    fi

    # Step 1b: Periodic slot health check (every 5th monitor cycle to reduce overhead)
    # This is a safety net for slot recovery if start/notify didn't catch it
    if [ $((monitor_count % 5)) -eq 0 ]; then
        local slot_check_host="$source_node"
        if [ "$OCF_RESKEY_source_node_role" = "Unpromoted" ]; then
            # Slots live on the primary even when we monitor the standby
            slot_check_host=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
        fi
        if [ -n "$slot_check_host" ]; then
            ensure_forward_replication_slots "$slot_check_host" "$OCF_RESKEY_target_replication_vip"
        fi
    fi

    # Step 2: Check subscriptions for ALL databases (non-fatal status reporting)
    local max_lag=0

    for db in $DB_LIST; do
        local sub_name="pgtwin_migrate_forward_sub_${db}"

        # Monitor subscription state and lag (non-fatal - just report status)
        local sub_state=$(get_subscription_state "target" "$target_node" "$sub_name" "$db")
        if [ -z "$sub_state" ]; then
            ocf_log debug "Could not get subscription state for database: $db (may still be initializing)"
            continue
        fi

        local state=$(echo "$sub_state" | cut -d'|' -f2)
        local lag_bytes=$(echo "$sub_state" | cut -d'|' -f3)

        # Fix: the lag field may be empty or non-numeric while the
        # subscription is initializing; coerce to 0 so the numeric
        # comparison below cannot error out
        case "$lag_bytes" in
            ''|*[!0-9]*) lag_bytes=0 ;;
        esac

        ocf_log debug "Database $db: subscription=${sub_name}, state=${state}, lag=${lag_bytes} bytes"

        # Log subscription state for monitoring (non-fatal)
        if [ "$state" != "streaming" ] && [ "$state" != "catchup" ]; then
            ocf_log info "Database $db: Subscription state: ${state} (waiting for streaming)"
        fi

        # Track maximum lag
        if [ "$lag_bytes" -gt "$max_lag" ]; then
            max_lag=$lag_bytes
        fi

        # Update per-database state file
        update_database_state "$db" "$lag_bytes" "false" "false"
    done

    # Log aggregate lag if significant
    if [ "$max_lag" -gt "$OCF_RESKEY_lag_threshold" ]; then
        ocf_log info "Maximum replication lag across ${DB_COUNT} database(s): ${max_lag} bytes (threshold: ${OCF_RESKEY_lag_threshold})"
    fi

    # Auto-sync subscription for new tables (every 2 minutes)
    # Only run on the target node itself (where subscription exists)
    local current_node=$(crm_node -n)

    ocf_log debug "Auto-sync check: current_node=$current_node, target_node=$target_node"

    if [ "$current_node" = "$target_node" ]; then
        # We're running on the target node - execute refresh locally
        local counter_file="${state_dir}/pgtwin_migrate_${OCF_RESOURCE_INSTANCE}_counter"
        local counter=0

        # Read current counter (sanitized like the monitor counter above)
        if [ -f "$counter_file" ]; then
            counter=$(cat "$counter_file" 2>/dev/null || echo "0")
        fi
        case "$counter" in
            ''|*[!0-9]*) counter=0 ;;
        esac

        # Increment counter
        counter=$((counter + 1))
        echo "$counter" > "$counter_file"

        # Refresh subscriptions every 12 monitor cycles (120 seconds with 10s interval)
        if [ $((counter % 12)) -eq 0 ]; then
            ocf_log info "Running subscription auto-sync for ${DB_COUNT} database(s) locally (cycle $counter on $current_node)"

            # Execute subscription refresh sequence LOCALLY for ALL databases (no SSH needed!)
            # Fix: the original tested $? after the loop, which only reflected
            # the last command of the last database; accumulate failures instead
            local sync_failed=0
            for db in $DB_LIST; do
                local sub_name="pgtwin_migrate_forward_sub_${db}"

                ocf_log debug "Auto-sync database: $db (subscription: $sub_name)"

                su - postgres -c "psql -d $db -c 'ALTER SUBSCRIPTION ${sub_name} DISABLE'" >/dev/null 2>&1 || sync_failed=1
                sleep 1
                su - postgres -c "psql -d $db -c 'ALTER SUBSCRIPTION ${sub_name} ENABLE'" >/dev/null 2>&1 || sync_failed=1
                sleep 1
                su - postgres -c "psql -d $db -c 'ALTER SUBSCRIPTION ${sub_name} REFRESH PUBLICATION WITH (copy_data = true)'" >/dev/null 2>&1 || sync_failed=1
            done

            if [ "$sync_failed" -eq 0 ]; then
                ocf_log info "Subscription auto-sync completed for all ${DB_COUNT} database(s) on $current_node"
            else
                ocf_log warn "Subscription auto-sync failed (non-critical)"
            fi
        fi
    else
        ocf_log debug "Monitor running on $current_node (target is $target_node), skipping auto-sync"
    fi

    # v2.0: Check for cutover by comparing production_cluster parameter with CIB state
    # This replaces the old cutover_ready=true mechanism
    local target_pghome=$(get_cluster_pghome "target")
    local cutover_state_file="${target_pghome}/.cutover_in_progress_${OCF_RESOURCE_INSTANCE}"

    # Check if cutover already in progress
    if [ -f "$cutover_state_file" ]; then
        # Cutover already started, check progress
        check_cutover_progress
        return $?
    fi

    # v2.0: Fallback cutover detection (every 5th cycle to reduce overhead)
    # Primary detection is via notify handler, this is the safety net
    if [ $((monitor_count % 5)) -eq 0 ]; then
        local current_production=$(get_current_production_cluster)
        local desired_production="$OCF_RESKEY_production_cluster"

        if [ -n "$desired_production" ] && [ "$desired_production" != "$current_production" ]; then
            ocf_log info "Monitor detected production_cluster change: $current_production → $desired_production"
            check_and_trigger_cutover
            return $?
        fi
    fi

    # v2.0: Check for databases without replication (every 10th cycle)
    # This handles: new databases created after migration started, missed databases, restarts
    if [ $((monitor_count % 10)) -eq 0 ]; then
        local current_production=$(get_current_production_cluster)
        local prod_type="source"
        local pub_vip="$OCF_RESKEY_source_replication_vip"
        if [ "$current_production" = "$OCF_RESKEY_target_cluster" ]; then
            prod_type="target"
            pub_vip="$OCF_RESKEY_target_replication_vip"
        fi

        # Determine active direction based on production cluster
        local check_direction="forward"
        if [ "$current_production" = "$OCF_RESKEY_target_cluster" ]; then
            check_direction="reverse"
        fi

        # Find databases that don't have publications set up yet
        local dbs_without_replication=""
        for db in $DB_LIST; do
            local pub_name="pgtwin_migrate_${check_direction}_pub_${db}"
            if ! check_publication_exists "$prod_type" "$pub_vip" "$pub_name" "$db" 2>/dev/null; then
                dbs_without_replication="$dbs_without_replication $db"
            fi
        done
        dbs_without_replication=$(echo "$dbs_without_replication" | sed 's/^ //')

        if [ -n "$dbs_without_replication" ]; then
            ocf_log info "Database(s) without $check_direction replication: $dbs_without_replication"

            # Determine pub/sub nodes
            local pub_node sub_node
            if [ "$check_direction" = "forward" ]; then
                pub_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
                sub_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
            else
                pub_node=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")
                sub_node=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
            fi

            if [ -n "$pub_node" ] && [ -n "$sub_node" ]; then
                # Check slot availability for new databases
                local new_db_count=$(echo "$dbs_without_replication" | wc -w)
                if check_replication_slot_availability_v2 "$prod_type" "$new_db_count"; then
                    # Setup replication for each database without replication
                    # setup_database_replication creates the DB on subscriber if needed
                    for db in $dbs_without_replication; do
                        ocf_log info "Setting up $check_direction replication for database: $db"
                        if setup_database_replication "$db" "$check_direction" "$pub_node" "$sub_node" "false" "false"; then
                            # Setup DDL trigger for the database
                            if [ "$check_direction" = "forward" ]; then
                                setup_ddl_trigger "source" "$pub_node" "$sub_node" "target" "false" "$db"
                            else
                                setup_ddl_trigger "target" "$pub_node" "$sub_node" "source" "false" "$db"
                            fi
                            ocf_log info "✓ Replication setup complete for database: $db"
                        else
                            ocf_log err "Failed to setup replication for database: $db"
                        fi
                    done
                fi
            else
                ocf_log warn "Cannot setup replication: cluster nodes not available"
            fi
        fi
    fi

    # Everything looks good
    ocf_log debug "Replication is running: max_lag=${max_lag} bytes"
    return $OCF_SUCCESS
}

pgtwin_migrate_notify() {
    # Pacemaker notification handler: after a promotion in either managed
    # cluster, re-verify that the logical replication slots the migration
    # depends on still exist, and recreate them if a failover removed them.
    #
    # To enable notifications, configure ordering with notify:
    #   order migration-after-source Optional: postgres-clone migration-forward
    # And set notify=true on migration-forward:
    #   meta notify="true"
    #
    # Environment variables set by Pacemaker:
    #   OCF_RESKEY_CRM_meta_notify_type: pre/post
    #   OCF_RESKEY_CRM_meta_notify_operation: start/stop/promote/demote
    #   OCF_RESKEY_CRM_meta_notify_start_uname: nodes starting
    #   OCF_RESKEY_CRM_meta_notify_promote_uname: nodes promoting
    #
    # Always returns OCF_SUCCESS - a notify must never fail the resource.

    local notify_type="${OCF_RESKEY_CRM_meta_notify_type}"
    local notify_op="${OCF_RESKEY_CRM_meta_notify_operation}"

    ocf_log info "Received notification: type=${notify_type}, operation=${notify_op}"

    # Only act on post-promote notifications (after a new primary is established)
    if [ "$notify_type" != "post" ] || [ "$notify_op" != "promote" ]; then
        return $OCF_SUCCESS
    fi

    ocf_log info "Post-promote notification: checking replication slot health"

    # Locate the current primary of each cluster.
    local src_primary tgt_primary
    src_primary=$(discover_cluster_node "$OCF_RESKEY_source_cluster" "Promoted")
    tgt_primary=$(discover_cluster_node "$OCF_RESKEY_target_cluster" "Promoted")

    if [ -z "$src_primary" ] || [ -z "$tgt_primary" ]; then
        ocf_log warn "Cannot check slots: cluster nodes not yet available"
        return $OCF_SUCCESS
    fi

    # The migration phase decides which replication direction's slots matter.
    local mig_state
    mig_state=$(crm_attribute -G -n migration-state -q 2>/dev/null)

    if [ "$mig_state" = "CUTOVER_COMPLETE" ]; then
        # After cutover, only reverse replication is active
        ocf_log info "Migration complete: checking reverse replication slots"
        ensure_reverse_replication_slots "$OCF_RESKEY_source_replication_vip" "$tgt_primary"
    else
        # Before cutover, forward replication is active
        ocf_log info "Migration in progress: checking forward replication slots"
        ensure_forward_replication_slots "$src_primary" "$OCF_RESKEY_target_replication_vip"
    fi

    return $OCF_SUCCESS
}

#######################################################################
# Main

# First dispatch: actions handled before (or without) validation.
# meta-data and usage need no runtime environment; the internal-cutover
# entry points are invoked by the agent itself outside a normal OCF call,
# so the OCF_RESKEY_* parameters may be absent and validation would
# spuriously fail.
case "$__OCF_ACTION" in
meta-data)          pgtwin_migrate_meta_data
                    exit $OCF_SUCCESS
                    ;;
usage|help)         pgtwin_migrate_usage
                    exit $OCF_SUCCESS
                    ;;
internal-cutover)   # Skip validation for internal-cutover (runs without OCF env vars)
                    pgtwin_migrate_prepare_cutover
                    exit $?
                    ;;
internal-cutover-v2)   # v2.0: Bidirectional cutover with delete/create
                    pgtwin_migrate_prepare_cutover_v2
                    exit $?
                    ;;
esac

# Validate configuration for all actions except meta-data and internal-cutover
# OCF convention on validation failure: "stop" must still succeed (so
# Pacemaker can recover the resource) and "monitor" reports not-running;
# all other actions propagate the validation error code.
pgtwin_migrate_validate
rc=$?
if [ $rc -ne $OCF_SUCCESS ]; then
    case "$__OCF_ACTION" in
    stop)       exit $OCF_SUCCESS ;;
    monitor)    exit $OCF_NOT_RUNNING ;;
    *)          exit $rc ;;
    esac
fi

# Second dispatch: the real OCF actions. Each handler's return status is
# captured in rc immediately after the case and becomes the script's exit
# code (validate-all exits directly since validation already passed above).
case "$__OCF_ACTION" in
start)              pgtwin_migrate_start ;;
stop)               pgtwin_migrate_stop ;;
monitor)            pgtwin_migrate_monitor ;;
notify)             pgtwin_migrate_notify ;;
validate-all)       exit $OCF_SUCCESS ;;
*)                  pgtwin_migrate_usage
                    exit $OCF_ERR_UNIMPLEMENTED
                    ;;
esac

# rc must be read directly after the case so it reflects the handler's status.
rc=$?
ocf_log debug "${OCF_RESOURCE_INSTANCE} $__OCF_ACTION : $rc"
exit $rc