diff --git a/python/cudaq/contrib/__init__.py b/python/cudaq/contrib/__init__.py index 968595cc7ac..1aa337465b8 100644 --- a/python/cudaq/contrib/__init__.py +++ b/python/cudaq/contrib/__init__.py @@ -7,3 +7,4 @@ # ============================================================================ # from .qiskit_convert import from_qiskit, from_qasm +from .propagator import propagator diff --git a/python/cudaq/contrib/propagator.py b/python/cudaq/contrib/propagator.py new file mode 100644 index 00000000000..d432b9e13c9 --- /dev/null +++ b/python/cudaq/contrib/propagator.py @@ -0,0 +1,246 @@ +# ============================================================================ # +# Copyright (c) 2022 - 2026 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from typing import Optional + +import cudaq +import numpy as np + + +def _total_dimension(dimensions: Mapping[int, int]) -> int: + """Return the product of all local Hilbert-space dimensions.""" + dimension = 1 + for local_dimension in dimensions.values(): + dimension *= local_dimension + return dimension + + +def _identity_state(dimension: int): + """Return |I> used to evolve closed-system propagators directly.""" + identity = np.eye(dimension, dtype=np.complex128).reshape(-1) + return cudaq.State.from_data(identity) + + +def _basis_states(dimension: int): + """Return `Liouville` basis states used to reconstruct Lindblad maps.""" + states = [] + for index in range(dimension): + data = np.zeros(dimension, dtype=np.complex128) + data[index] = 1.0 + states.append(cudaq.State.from_data(data)) + return states + + +def _state_to_matrix(state, dimension: int) -> np.ndarray: + """Convert a `vectorized` propagated identity state back to a matrix.""" + data = np.array(state).reshape(-1) + expected_size = dimension * dimension + + if data.size != expected_size: + raise RuntimeError("Expected propagator state with size " + f"{expected_size}, got {data.size}.") + + return data.reshape((dimension, dimension)).T + + +def _closed_system_generator(hamiltonian): + """Represent `dU/dt = -i H U` as left multiplication by `-i H`.""" + generator = cudaq.SuperOperator() + generator += cudaq.SuperOperator.left_multiply(-1j * hamiltonian) + return generator + + +def _extract_propagator(result, dimension: int, + store_intermediate_results: bool): + """Extract closed-system propagators from a CUDA-Q evolve result.""" + if store_intermediate_results: + return [ + _state_to_matrix(state, dimension) + for state in result.intermediate_states() + ] + + return _state_to_matrix(result.final_state(), dimension) + + +def _extract_batched_basis_propagator(results, + store_intermediate_results: bool): + """Stack evolved `Liouville` basis states into dense Lindblad maps.""" + if store_intermediate_results: + intermediate_states = [ + list(single_result.intermediate_states()) + for single_result in results + ] + return [ + np.column_stack([ + np.array(states[time_index]).reshape(-1) + for states in intermediate_states + ]) + for time_index in range(len(intermediate_states[0])) + ] + + columns = [ + np.array(single_result.final_state()).reshape(-1) + for single_result in results + ] + return np.column_stack(columns) + + +def _is_operator_like(value) -> bool: + """Return True for CUDA-Q operators accepted by the dynamics backend.""" + return hasattr(value, "to_matrix") + + +def _is_collapse_operator_batch(collapse_operators) -> bool: + """Return True when collapse operators are grouped per Hamiltonian.""" + return bool(collapse_operators) and not _is_operator_like( + collapse_operators[0]) + + +def _collapse_operator_batches(collapse_operators, batch_size: int): + """Broadcast collapse operators to match the Hamiltonian batch size.""" + if not collapse_operators: + return [[] for _ in range(batch_size)] + + if _is_collapse_operator_batch(collapse_operators): + if len(collapse_operators) != batch_size: + raise ValueError("Batched collapse_operators must have the same " + "length as the Hamiltonian batch.") + return [list(ops) for ops in collapse_operators] + + return [collapse_operators for _ in range(batch_size)] + + +def propagator( + hamiltonian, + dimensions: Mapping[int, int], + schedule, + *, + collapse_operators=None, + store_intermediate_results: bool = False, + integrator=None, + max_batch_size: Optional[int] = None, +): + """Compute dynamics propagators. + + For closed-system dynamics, computes the matrix U satisfying the + Schrodinger-picture propagator equation with initial condition U(t0) = I. + + For open-system dynamics with collapse operators, computes the Lindblad + map S with initial condition S(t0) = I. This map acts on density + matrices after matrix-to-vector reshaping and propagates rho(t0) to + rho(t). + + Args: + hamiltonian: CUDA-Q operator H(t), or a sequence of operators for + batched propagator computation. + dimensions: Mapping from degree-of-freedom index to local dimension. + schedule: CUDA-Q dynamics schedule. + collapse_operators: Optional sequence of Lindblad collapse operators. + If provided, the helper returns the Lindblad map. + store_intermediate_results: If True, return propagators at the + intermediate schedule points saved by the dynamics backend. + integrator: Optional dynamics integrator. + max_batch_size: Optional maximum batch size for the dynamics backend. + + Returns: + For closed-system dynamics, returns a dense complex NumPy array with + shape ``(dim, dim)``. + + For open-system dynamics, returns a dense complex NumPy array with + shape ``(dim**2, dim**2)``. + + If ``store_intermediate_results`` is True, returns a list of dense + matrices. For a sequence of Hamiltonians, returns one such result per + Hamiltonian. + """ + collapse_operators = [] if collapse_operators is None else list( + collapse_operators) + + is_batched = isinstance(hamiltonian, + Sequence) and not hasattr(hamiltonian, "to_matrix") + hamiltonians = list(hamiltonian) if is_batched else [hamiltonian] + collapse_operator_batches = _collapse_operator_batches( + collapse_operators, len(hamiltonians)) + open_system = any(collapse_operator_batches) + + system_dimension = _total_dimension(dimensions) + propagator_dimension = (system_dimension * system_dimension + if open_system else system_dimension) + evolution_dimensions = dimensions + + if open_system: + generators = hamiltonians + else: + generators = [_closed_system_generator(h) for h in hamiltonians] + + if open_system: + initial_states = [ + _basis_states(propagator_dimension) for _ in generators + ] + else: + initial_states = [ + _identity_state(propagator_dimension) for _ in generators + ] + + save_mode = (cudaq.IntermediateResultSave.ALL if store_intermediate_results + else cudaq.IntermediateResultSave.NONE) + + evolve_collapse_operators = [] + if open_system: + evolve_collapse_operators = (collapse_operator_batches if is_batched + else collapse_operator_batches[0]) + + evolve_generators = generators if is_batched else generators[0] + evolve_initial_states = initial_states if is_batched else initial_states[0] + + if open_system and is_batched: + evolve_generators = [] + evolve_initial_states = [] + evolve_collapse_operators = [] + for generator, basis_states, collapse_batch in zip( + generators, initial_states, collapse_operator_batches): + for basis_state in basis_states: + evolve_generators.append(generator) + evolve_initial_states.append(basis_state) + evolve_collapse_operators.append(collapse_batch) + + result = cudaq.evolve( + evolve_generators, + evolution_dimensions, + schedule, + evolve_initial_states, + collapse_operators=evolve_collapse_operators, + observables=[], + store_intermediate_results=save_mode, + integrator=integrator, + max_batch_size=max_batch_size, + ) + + if open_system: + if is_batched: + return [ + _extract_batched_basis_propagator( + result[index * propagator_dimension:(index + 1) * + propagator_dimension], store_intermediate_results) + for index in range(len(generators)) + ] + return _extract_batched_basis_propagator(result, + store_intermediate_results) + + if is_batched: + return [ + _extract_propagator(single_result, propagator_dimension, + store_intermediate_results) + for single_result in result + ] + + return _extract_propagator(result, propagator_dimension, + store_intermediate_results) diff --git a/python/tests/contrib/test_propagator.py b/python/tests/contrib/test_propagator.py new file mode 100644 index 00000000000..3570f70508c --- /dev/null +++ b/python/tests/contrib/test_propagator.py @@ -0,0 +1,333 @@ +# ============================================================================ # +# Copyright (c) 2022 - 2026 NVIDIA Corporation & Affiliates. # +# All rights reserved. # +# # +# This source code and the accompanying materials are made available under # +# the terms of the Apache License 2.0 which accompanies this distribution. # +# ============================================================================ # + +import numpy as np +import pytest + +import cudaq +import cudaq.contrib +from cudaq import ScalarOperator, Schedule, spin + +scipy_integrate = pytest.importorskip("scipy.integrate") +scipy_linalg = pytest.importorskip("scipy.linalg") + +if cudaq.num_available_gpus() == 0: + pytest.skip("cudaq.contrib.propagator requires the dynamics backend", + allow_module_level=True) + + +@pytest.fixture(autouse=True) +def dynamics_target(): + cudaq.set_target("dynamics") + yield + cudaq.reset_target() + + +_X = np.array([[0.0, 1.0], [1.0, 0.0]], dtype=np.complex128) +_Z = np.array([[1.0, 0.0], [0.0, -1.0]], dtype=np.complex128) + + +def _solve_propagator_reference(hamiltonian_matrix, t_final): + dimension = hamiltonian_matrix(0.0).shape[0] + + def rhs(t, flattened_u): + u = flattened_u.reshape((dimension, dimension)) + du = -1j * hamiltonian_matrix(t) @ u + return du.reshape(-1) + + initial = np.eye(dimension, dtype=np.complex128).reshape(-1) + solution = scipy_integrate.solve_ivp( + rhs, + (0.0, t_final), + initial, + rtol=1e-10, + atol=1e-12, + ) + + assert solution.success + return solution.y[:, -1].reshape((dimension, dimension)) + + +def test_propagator_constant_z_hamiltonian(): + omega = 0.3 + t_final = 1.2 + + hamiltonian = 0.5 * omega * spin.z(0) + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator(hamiltonian, {0: 2}, schedule) + + expected = scipy_linalg.expm(-1j * 0.5 * omega * _Z * t_final) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + np.testing.assert_allclose(computed.conj().T @ computed, + np.eye(2), + atol=1e-4) + + +def test_propagator_constant_x_hamiltonian(): + t_final = 0.4 + + hamiltonian = spin.x(0) + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator(hamiltonian, {0: 2}, schedule) + expected = scipy_linalg.expm(-1j * _X * t_final) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + + +def test_propagator_batched_hamiltonians(): + t_final = 0.7 + omegas = [0.2, 0.5] + + hamiltonians = [0.5 * omega * spin.z(0) for omega in omegas] + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonians, + {0: 2}, + schedule, + max_batch_size=2, + ) + + assert len(computed) == len(omegas) + + for propagator, omega in zip(computed, omegas): + expected = scipy_linalg.expm(-1j * 0.5 * omega * _Z * t_final) + np.testing.assert_allclose(propagator, expected, atol=1e-4) + + +def test_propagator_intermediate_results(): + omega = 0.3 + steps = np.linspace(0.0, 1.2, 7) + + hamiltonian = 0.5 * omega * spin.z(0) + schedule = Schedule(steps, ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + schedule, + store_intermediate_results=True, + ) + + assert len(computed) == len(steps) + + for propagator, t in zip(computed, steps): + expected = scipy_linalg.expm(-1j * 0.5 * omega * _Z * t) + np.testing.assert_allclose(propagator, expected, atol=1e-4) + + +def test_propagator_commuting_time_dependent_hamiltonian(): + omega0 = 0.2 + omega1 = 0.4 + t_final = 1.1 + + hamiltonian = (0.5 * omega0 * spin.z(0) + + 0.5 * omega1 * ScalarOperator(lambda t: t) * spin.z(0)) + schedule = Schedule(np.linspace(0.0, t_final, 31), ["t"]) + + computed = cudaq.contrib.propagator(hamiltonian, {0: 2}, schedule) + + phase_integral = omega0 * t_final + 0.5 * omega1 * t_final**2 + expected = scipy_linalg.expm(-1j * 0.5 * phase_integral * _Z) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + + +def test_propagator_noncommuting_time_dependent_hamiltonian(): + ax = 0.2 + bz = 0.35 + t_final = 0.8 + + hamiltonian = (ax * ScalarOperator(lambda t: t) * spin.x(0) + + bz * ScalarOperator(lambda t: 1.0 - t) * spin.z(0)) + schedule = Schedule(np.linspace(0.0, t_final, 41), ["t"]) + + computed = cudaq.contrib.propagator(hamiltonian, {0: 2}, schedule) + + def hamiltonian_matrix(t): + return ax * t * _X + bz * (1.0 - t) * _Z + + expected = _solve_propagator_reference(hamiltonian_matrix, t_final) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + + +def _amplitude_damping_liouvillian(gamma): + sigma_minus = np.array([[0.0, 0.0], [1.0, 0.0]], dtype=np.complex128) + sigma_plus = sigma_minus.conj().T + identity = np.eye(2, dtype=np.complex128) + + collapse_product = sigma_plus @ sigma_minus + + return (gamma * np.kron(sigma_minus.conj(), sigma_minus) - + 0.5 * gamma * np.kron(identity, collapse_product) - + 0.5 * gamma * np.kron(collapse_product.T, identity)) + + +def test_open_system_propagator_amplitude_damping(): + gamma = 0.2 + t_final = 0.7 + + hamiltonian = 0.0 * spin.z(0) + collapse_operators = [np.sqrt(gamma) * spin.minus(0)] + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + ) + + expected = scipy_linalg.expm( + _amplitude_damping_liouvillian(gamma) * t_final) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + + +def test_open_system_propagator_shape(): + gamma = 0.1 + t_final = 0.3 + + hamiltonian = 0.0 * spin.z(0) + collapse_operators = [np.sqrt(gamma) * spin.minus(0)] + schedule = Schedule(np.linspace(0.0, t_final, 11), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + ) + + assert computed.shape == (4, 4) + + +def test_propagator_multi_degree_of_freedom(): + hamiltonian = 0.25 * spin.z(0) + 0.5 * spin.x(1) + 0.75 * spin.z(2) + dimensions = {0: 2, 1: 2, 2: 2} + schedule = Schedule(np.linspace(0.0, 0.4, 21), ["t"]) + + actual = cudaq.contrib.propagator(hamiltonian, dimensions, schedule) + + expected = scipy_linalg.expm(-1j * hamiltonian.to_matrix(dimensions) * 0.4) + + np.testing.assert_allclose(actual, expected, atol=1e-5) + + +def test_open_system_propagator_schedule_parameter_name(): + gamma = 0.2 + hamiltonian = 0.1 * spin.z(0) + collapse_operator = np.sqrt(gamma) * spin.minus(0) + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + Schedule([0.0, 0.1], ["time"]), + collapse_operators=[collapse_operator], + ) + + assert computed.shape == (4, 4) + + +def test_open_system_propagator_nonzero_hamiltonian(): + gamma = 0.2 + omega = 0.5 + t_final = 0.7 + + hamiltonian = 0.5 * omega * spin.z(0) + collapse_operators = [np.sqrt(gamma) * spin.minus(0)] + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + ) + + liouvillian = _amplitude_damping_liouvillian(gamma) + hamiltonian_matrix = hamiltonian.to_matrix({0: 2}) + coherent = (-1j * np.kron(np.eye(2), hamiltonian_matrix) + + 1j * np.kron(hamiltonian_matrix.T, np.eye(2))) + expected = scipy_linalg.expm((coherent + liouvillian) * t_final) + + np.testing.assert_allclose(computed, expected, atol=1e-4) + + +def test_open_system_propagator_batched_shared_collapse_operators(): + gamma = 0.2 + t_final = 0.7 + omegas = [0.2, 0.5] + + hamiltonians = [0.5 * omega * spin.z(0) for omega in omegas] + collapse_operators = [np.sqrt(gamma) * spin.minus(0)] + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonians, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + ) + + assert len(computed) == len(hamiltonians) + for actual, hamiltonian in zip(computed, hamiltonians): + hamiltonian_matrix = hamiltonian.to_matrix({0: 2}) + coherent = (-1j * np.kron(np.eye(2), hamiltonian_matrix) + + 1j * np.kron(hamiltonian_matrix.T, np.eye(2))) + expected = scipy_linalg.expm( + (coherent + _amplitude_damping_liouvillian(gamma)) * t_final) + np.testing.assert_allclose(actual, expected, atol=1e-4) + + +def test_open_system_propagator_batched_collapse_operator_lists(): + gammas = [0.1, 0.2] + t_final = 0.7 + + hamiltonians = [0.0 * spin.z(0), 0.0 * spin.z(0)] + collapse_operators = [[np.sqrt(gamma) * spin.minus(0)] for gamma in gammas] + schedule = Schedule(np.linspace(0.0, t_final, 21), ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonians, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + ) + + assert len(computed) == len(gammas) + for actual, gamma in zip(computed, gammas): + expected = scipy_linalg.expm( + _amplitude_damping_liouvillian(gamma) * t_final) + np.testing.assert_allclose(actual, expected, atol=1e-4) + + +def test_open_system_propagator_intermediate_results(): + gamma = 0.2 + times = np.linspace(0.0, 0.7, 6) + + hamiltonian = 0.0 * spin.z(0) + collapse_operators = [np.sqrt(gamma) * spin.minus(0)] + schedule = Schedule(times, ["t"]) + + computed = cudaq.contrib.propagator( + hamiltonian, + {0: 2}, + schedule, + collapse_operators=collapse_operators, + store_intermediate_results=True, + ) + + assert len(computed) == len(times) + for actual, time in zip(computed, times): + expected = scipy_linalg.expm( + _amplitude_damping_liouvillian(gamma) * time) + np.testing.assert_allclose(actual, expected, atol=1e-4)