org.apache.kafka
kafka-clients
diff --git a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseContainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseContainerExtension.java
index ce116bb2..2dcb4b75 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseContainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseContainerExtension.java
@@ -7,11 +7,13 @@
import org.testcontainers.containers.Network;
import org.testcontainers.utility.DockerImageName;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
+import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
@@ -76,7 +78,9 @@ private void executeMigration(Connection connection, String path) {
var split = sql.split(";");
for (var exec : split) {
if (exec != null && !exec.trim().isEmpty()) {
- connection.createStatement().execute(exec);
+ try (var statement = connection.createStatement()) {
+ statement.execute(exec);
+ }
}
}
} catch (SQLException e) {
@@ -87,11 +91,14 @@ private void executeMigration(Connection connection, String path) {
private String getFile(String fileName) {
var classLoader = ClickhouseContainerExtension.class.getClassLoader();
- try {
- return IOUtils.toString(classLoader.getResourceAsStream(fileName), StandardCharsets.UTF_8);
+ try (var inputStream = Optional.ofNullable(classLoader.getResourceAsStream(fileName))
+ .orElseThrow(() -> new ClickhouseStartingException(
+ "Migration file not found: " + fileName,
+ new FileNotFoundException(fileName)))) {
+ return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
} catch (IOException e) {
log.error("Error when getFile e: ", e);
- return "";
+ throw new ClickhouseStartingException("Error when reading migration file: " + fileName, e);
}
}
}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseTestcontainerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseTestcontainerFactory.java
index ae4da713..4a30dea4 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseTestcontainerFactory.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/clickhouse/ClickhouseTestcontainerFactory.java
@@ -40,9 +40,7 @@ private ClickhouseContainerExtension getOrCreateSingletonContainer(String databa
}
private ClickhouseContainerExtension create(String databaseName, String[] migrations) {
- try (var container = new ClickhouseContainerExtension(databaseName, migrations)) {
- return container;
- }
+ return new ClickhouseContainerExtension(databaseName, migrations);
}
private static class SingletonHolder {
diff --git a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/demo/DemoClickhouseTestcontainer.java b/src/main/java/dev/vality/testcontainers/annotations/clickhouse/demo/DemoClickhouseTestcontainer.java
deleted file mode 100644
index 36e28d7d..00000000
--- a/src/main/java/dev/vality/testcontainers/annotations/clickhouse/demo/DemoClickhouseTestcontainer.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package dev.vality.testcontainers.annotations.clickhouse.demo;
-
-import dev.vality.testcontainers.annotations.clickhouse.ClickhouseTestcontainer;
-
-/**
- * this is a demo example of filling in annotation, do not use
- */
-@ClickhouseTestcontainer(
- dbNameShouldBeDropped = "databaseName",
- properties = "clickhouse.db.connection.timeout=60000",
- migrations = {
- "sql/db_init.sql",
- "sql/V2__create_events_p2p.sql",
- "sql/V3__create_fraud_payments.sql",
- "sql/V4__create_payment.sql",
- "sql/V5__add_fields.sql",
- "sql/V6__add_result_fields_payment.sql",
- "sql/V7__add_fields.sql"})
-public @interface DemoClickhouseTestcontainer {
-}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTest.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTest.java
new file mode 100644
index 00000000..8b250d11
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTest.java
@@ -0,0 +1,100 @@
+package dev.vality.testcontainers.annotations.kafka;
+
+import dev.vality.testcontainers.annotations.KafkaTestConfig;
+import dev.vality.testcontainers.annotations.kafka.config.KafkaConsumer;
+import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.core.annotation.AliasFor;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.annotation.DirtiesContext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Аннотация {@code @EmbeddedKafkaTest} подключает и запускает embedded Kafka
+ * {@link org.springframework.kafka.test.EmbeddedKafkaBroker}, также
+ * настройки embedded брокера будут проинициализированы в контекст тестового приложения
+ * Аннотация требует дополнительной конфигурации {@link EmbeddedKafkaTest#topics()}
+ *
Примеры
+ * В примере ниже создается обертка над аннотацией для конкретного приложения с инициализацией
+ * конкретных топиков приложения. Эту обертку можно позже переиспользовать для любых тестов,
+ * требующих embedded Kafka без запуска Docker контейнера
+ *
{@code
+ * @Target({ElementType.TYPE})
+ * @Retention(RetentionPolicy.RUNTIME)
+ * @EmbeddedKafkaTest(
+ * properties = {
+ * "kafka.topics.invoicing.consume.enabled=true",
+ * "kafka.topics.invoice-template.consume.enabled=true",
+ * "kafka.state.cache.size=0"},
+ * topics = {
+ * "magista-invoicing-test",
+ * "magista-invoice-template-test"})
+ * public @interface CustomEmbeddedKafkaTest {
+ * }}
+ * В примере ниже {@link EmbeddedKafkaTest} подключается напрямую
+ * к {@link SpringBootTest} для проведения теста консьюмера, который читает данные из топика
+ *
{@code
+ * @EmbeddedKafkaTest(
+ * properties = {
+ * "kafka.topics.invoicing.id=reporter-invoicing-test",
+ * "kafka.topics.invoicing.enabled=true"},
+ * topics = "reporter-invoicing-test")
+ * @SpringBootTest
+ * public class KafkaListenerTest {
+ *
+ * @Autowired
+ * private KafkaProducer> testThriftKafkaProducer;
+ *
+ * ...
+ * }}
+ *
+ * @see KafkaTestcontainer @KafkaTestcontainer
+ * @see KafkaTestcontainerSingleton @KafkaTestcontainerSingleton
+ * @see EmbeddedKafka @EmbeddedKafka
+ * @see KafkaProducer KafkaProducer
+ * @see KafkaConsumer KafkaConsumer
+ * @see KafkaTestConfig KafkaTestConfig
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@EmbeddedKafka
+@ExtendWith(EmbeddedKafkaTestExtension.class)
+@KafkaTestConfig
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+public @interface EmbeddedKafkaTest {
+
+ /**
+ * Аналогичный параметр как у аннотации {@link SpringBootTest#properties()}
+ *
+ * пример — properties = {"kafka.topics.invoicing.consume.enabled=true",...}
+ */
+ String[] properties() default {};
+
+ /**
+ * Обязательный параметр — здесь перечисляются имена топиков, которые требуется создать при старте embedded Kafka
+ *
+ * пример — topics = {"magista-invoicing-test",...}
+ */
+ @AliasFor(annotation = EmbeddedKafka.class, attribute = "topics")
+ String[] topics() default {};
+
+ /**
+ * Очищать топики между тестами
+ *
+ * @return true - данные между тестами удаляются из embedded Kafka
+ */
+ boolean cleanupTopics() default true;
+
+ /**
+ * Топики, которые не нужно очищать между тестами.
+ * Используется только если {@link #cleanupTopics()} = true
+ *
+ * пример — excludeCleanupTopics = {"magista-invoicing-test"}
+ */
+ String[] excludeCleanupTopics() default {};
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestContextCustomizerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestContextCustomizerFactory.java
new file mode 100644
index 00000000..9cac00f3
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestContextCustomizerFactory.java
@@ -0,0 +1,35 @@
+package dev.vality.testcontainers.annotations.kafka;
+
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.test.context.ContextConfigurationAttributes;
+import org.springframework.test.context.ContextCustomizer;
+import org.springframework.test.context.ContextCustomizerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+public class EmbeddedKafkaTestContextCustomizerFactory implements ContextCustomizerFactory {
+
+ @Override
+ public ContextCustomizer createContextCustomizer(
+ Class> testClass,
+ List configAttributes) {
+ return (context, mergedConfig) ->
+ findAnnotation(testClass).ifPresent(annotation -> init(context, annotation));
+ }
+
+ private Optional findAnnotation(Class> testClass) {
+ return AnnotationSupport.findAnnotation(testClass, EmbeddedKafkaTest.class);
+ }
+
+ private void init(ConfigurableApplicationContext context, EmbeddedKafkaTest annotation) {
+ TestPropertyValues.of(
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "kafka.ssl.enabled=false")
+ .and(annotation.properties())
+ .applyTo(context);
+ }
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestExtension.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestExtension.java
new file mode 100644
index 00000000..1b4fa670
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestExtension.java
@@ -0,0 +1,96 @@
+package dev.vality.testcontainers.annotations.kafka;
+
+import lombok.SneakyThrows;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+public class EmbeddedKafkaTestExtension implements BeforeEachCallback {
+
+ private static final int WAIT_TIMEOUT_SECONDS = 10;
+
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ AnnotationSupport.findAnnotation(context.getTestClass(), EmbeddedKafkaTest.class)
+ .filter(EmbeddedKafkaTest::cleanupTopics)
+ .ifPresent(annotation -> cleanupTopics(context, annotation));
+ }
+
+ @SneakyThrows
+ private void cleanupTopics(ExtensionContext context, EmbeddedKafkaTest annotation) {
+ var topics = Arrays.stream(annotation.topics())
+ .filter(topic -> !List.of(annotation.excludeCleanupTopics()).contains(topic))
+ .toList();
+ if (topics.isEmpty()) {
+ return;
+ }
+
+ var applicationContext = SpringExtension.getApplicationContext(context);
+ var embeddedKafkaBroker = applicationContext.getBean(EmbeddedKafkaBroker.class);
+ try (var adminClient = AdminClient.create(buildAdminProperties(embeddedKafkaBroker))) {
+ var existingTopics = adminClient.listTopics().names().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ var missingTopics = topics.stream()
+ .filter(topic -> !existingTopics.contains(topic))
+ .toList();
+ if (!missingTopics.isEmpty()) {
+ embeddedKafkaBroker.addTopics(missingTopics.toArray(String[]::new));
+ }
+ var recordsToDelete = buildRecordsToDelete(embeddedKafkaBroker, topics);
+ if (!recordsToDelete.isEmpty()) {
+ adminClient.deleteRecords(recordsToDelete).all().get(WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ }
+ }
+ }
+
+ private Map buildRecordsToDelete(
+ EmbeddedKafkaBroker embeddedKafkaBroker,
+ List topics) {
+ try (var consumer = new KafkaConsumer(buildConsumerProperties(embeddedKafkaBroker))) {
+ var topicPartitions = topics.stream()
+ .flatMap(topic -> buildTopicPartitions(topic, embeddedKafkaBroker.getPartitionsPerTopic()).stream())
+ .toList();
+ consumer.assign(topicPartitions);
+ var endOffsets = consumer.endOffsets(topicPartitions);
+ var recordsToDelete = new HashMap();
+ endOffsets.forEach((topicPartition, offset) -> {
+ if (offset > 0) {
+ recordsToDelete.put(topicPartition, RecordsToDelete.beforeOffset(offset));
+ }
+ });
+ return recordsToDelete;
+ }
+ }
+
+ private List buildTopicPartitions(String topic, int partitions) {
+ return java.util.stream.IntStream.range(0, partitions)
+ .mapToObj(partition -> new TopicPartition(topic, partition))
+ .toList();
+ }
+
+ private Properties buildAdminProperties(EmbeddedKafkaBroker embeddedKafkaBroker) {
+ var properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
+ return properties;
+ }
+
+ private Properties buildConsumerProperties(EmbeddedKafkaBroker embeddedKafkaBroker) {
+ var properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, "embedded-kafka-cleanup-" + UUID.randomUUID());
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ return properties;
+ }
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaContainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaContainerExtension.java
index bd15dae6..66307727 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaContainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaContainerExtension.java
@@ -38,7 +38,17 @@ default void createTopics(List excludedTopics) {
var topics = topics().stream()
.filter(topic -> !excludedTopics.contains(topic))
.toList();
- var newTopics = topics.stream()
+ if (topics.isEmpty()) {
+ return;
+ }
+ var existingTopics = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
+ var topicsToCreate = topics.stream()
+ .filter(topic -> !existingTopics.contains(topic))
+ .toList();
+ if (topicsToCreate.isEmpty()) {
+ return;
+ }
+ var newTopics = topicsToCreate.stream()
.map(topic -> new NewTopic(topic, 1, (short) 1))
.peek(newTopic -> log.info(newTopic.toString()))
.collect(Collectors.toList());
@@ -47,12 +57,12 @@ default void createTopics(List excludedTopics) {
.atMost(Duration.ofSeconds(WAIT_TIMEOUT))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> topicsResult.all().get(1, TimeUnit.SECONDS));
- var adminClientTopics = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
- log.info("Topics list from 'AdminClient' after [TOPICS CREATED]: {}", adminClientTopics);
- assertThat(adminClientTopics.size())
- .isEqualTo(topics.size());
+ var topicsAfterCreate = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
+ log.info("Topics list from 'AdminClient' after [TOPICS CREATED]: {}", topicsAfterCreate);
+ assertThat(topicsAfterCreate)
+ .containsAll(topicsToCreate);
var actual = execInContainerKafkaTopicsListCommand();
- assertThat(topics.stream().allMatch(actual::contains))
+ assertThat(topicsToCreate.stream().allMatch(actual::contains))
.isTrue();
} catch (ExecutionException | TimeoutException ex) {
throw new KafkaStartingException("Error when topic creating, ", ex);
@@ -64,24 +74,30 @@ default void createTopics(List excludedTopics) {
default void deleteTopics(List excludedTopics) {
try (var admin = createAdminClient()) {
- var adminClientTopics = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
- if (adminClientTopics.isEmpty()) {
+ var existingTopics = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
+ if (existingTopics.isEmpty()) {
return;
}
var topics = topics().stream()
.filter(topic -> !excludedTopics.contains(topic))
.toList();
- var topicsResult = admin.deleteTopics(topics);
+ var topicsToDelete = topics.stream()
+ .filter(existingTopics::contains)
+ .toList();
+ if (topicsToDelete.isEmpty()) {
+ return;
+ }
+ var topicsResult = admin.deleteTopics(topicsToDelete);
Awaitility.await()
.atMost(Duration.ofSeconds(WAIT_TIMEOUT))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(() -> topicsResult.all().get(1, TimeUnit.SECONDS));
- adminClientTopics = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
- log.info("Topics list from 'AdminClient' after [TOPICS DELETED]: {} (should be empty)", adminClientTopics);
- assertThat(adminClientTopics)
- .isEmpty();
+ var topicsAfterDelete = admin.listTopics().names().get(WAIT_TIMEOUT, TimeUnit.SECONDS);
+ log.info("Topics list from 'AdminClient' after [TOPICS DELETED]: {}", topicsAfterDelete);
+ assertThat(topicsAfterDelete.stream().noneMatch(topicsToDelete::contains))
+ .isTrue();
var actual = execInContainerKafkaTopicsListCommand();
- assertThat(topics.stream().noneMatch(actual::contains))
+ assertThat(topicsToDelete.stream().noneMatch(actual::contains))
.isTrue();
} catch (ExecutionException | TimeoutException ex) {
throw new KafkaStartingException("Error when topic deleting, ", ex);
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension.java
index 99bcb134..d1eb2d37 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerExtension.java
@@ -69,10 +69,14 @@ public void beforeAll(ExtensionContext context) {
var container = KafkaTestcontainerFactory.singletonContainer(annotation.provider(), topics);
if (!container.isRunning()) {
GenericContainerUtil.startContainer(container);
- } else {
- container.deleteTopics(List.of());
+ container.createTopics(List.of());
+ } else if (annotation.truncateTopics()) {
+ var excludedTopics = Optional.ofNullable(annotation.excludeTruncateTopics())
+ .map(List::of)
+ .orElse(List.of());
+ container.deleteTopics(excludedTopics);
+ container.createTopics(excludedTopics);
}
- container.createTopics(List.of());
THREAD_CONTAINER.set(container);
}
}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerFactory.java
index 528e1df3..5c3aa57b 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerFactory.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/kafka/KafkaTestcontainerFactory.java
@@ -44,16 +44,8 @@ private KafkaContainerExtension getOrCreateSingletonContainer(Provider provider,
private KafkaContainerExtension create(Provider provider, List topics) {
return switch (provider) {
- case APACHE -> {
- try (var container = new ApacheKafkaContainer(topics)) {
- yield container;
- }
- }
- case CONFLUENT -> {
- try (var container = new ConfluentKafkaContainer(topics)) {
- yield container;
- }
- }
+ case APACHE -> new ApacheKafkaContainer(topics);
+ case CONFLUENT -> new ConfluentKafkaContainer(topics);
};
}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/kafka/demo/DemoKafkaTestcontainer.java b/src/main/java/dev/vality/testcontainers/annotations/kafka/demo/DemoKafkaTestcontainer.java
deleted file mode 100644
index 2a94c094..00000000
--- a/src/main/java/dev/vality/testcontainers/annotations/kafka/demo/DemoKafkaTestcontainer.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package dev.vality.testcontainers.annotations.kafka.demo;
-
-import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainer;
-
-/**
- * this is a demo example of filling in annotation, do not use
- */
-@KafkaTestcontainer(
- properties = "kafka.topics.invoicing.consume.enabled=true",
- topicsKeys = "kafka.topics.invoicing.id")
-public @interface DemoKafkaTestcontainer {
-}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/minio/MinioTestcontainerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/minio/MinioTestcontainerFactory.java
index 30997cb9..9945356b 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/minio/MinioTestcontainerFactory.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/minio/MinioTestcontainerFactory.java
@@ -54,7 +54,7 @@ private GenericContainer> getOrCreateSingletonContainer() {
}
private GenericContainer> create() {
- try (GenericContainer> container = new GenericContainer<>(
+ GenericContainer> container = new GenericContainer<>(
DockerImageName
.parse(MINIO_IMAGE_NAME)
.withTag(loadDefaultLibraryProperty(TAG_PROPERTY)))
@@ -62,11 +62,10 @@ private GenericContainer> create() {
.withEnv("MINIO_ROOT_USER", loadDefaultLibraryProperty(MINIO_USER))
.withEnv("MINIO_ROOT_PASSWORD", loadDefaultLibraryProperty(MINIO_PASSWORD))
.withCommand("server /data{1...12}")
- .waitingFor(getWaitStrategy("/minio/health/live", 200, 9000, Duration.ofMinutes(1)))) {
- container.withNetworkAliases("minio-" + UUID.randomUUID());
- container.withNetwork(Network.SHARED);
- return container;
- }
+ .waitingFor(getWaitStrategy("/minio/health/live", 200, 9000, Duration.ofMinutes(1)));
+ container.withNetworkAliases("minio-" + UUID.randomUUID());
+ container.withNetwork(Network.SHARED);
+ return container;
}
private static class SingletonHolder {
diff --git a/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerExtension.java
index 54ed723f..471a470c 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerExtension.java
@@ -47,10 +47,10 @@ public void beforeEach(ExtensionContext context) {
var container = THREAD_CONTAINER.get();
if (container != null && container.isRunning()) {
var builder = RestClient.builder(new HttpHost(container.getHost(), container.getFirstMappedPort()));
- var client = builder.build();
- var deleteRequest = new Request("DELETE", "/*");
- client.performRequest(deleteRequest);
- client.close();
+ try (var client = builder.build()) {
+ var deleteRequest = new Request("DELETE", "/*");
+ client.performRequest(deleteRequest);
+ }
}
}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerFactory.java
index 140aec87..37a9859b 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerFactory.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/opensearch/OpensearchTestcontainerFactory.java
@@ -42,21 +42,20 @@ private GenericContainer> getOrCreateSingletonContainer() {
}
private GenericContainer> create() {
- try (var container = new GenericContainer<>(
+ var container = new GenericContainer<>(
DockerImageName
.parse(OPENSEARCH_IMAGE_NAME)
- .withTag(loadDefaultLibraryProperty(TAG_PROPERTY)))) {
- container.withNetworkAliases("opensearch-" + UUID.randomUUID());
- container.withNetwork(Network.SHARED);
- container.withExposedPorts(9200, 9600);
- container.setWaitStrategy((new HttpWaitStrategy())
- .forPort(9200)
- .forStatusCodeMatching(response -> response == 200 || response == 401));
- container.withEnv("discovery.type", "single-node");
- container.withEnv("DISABLE_INSTALL_DEMO_CONFIG", "true");
- container.withEnv("DISABLE_SECURITY_PLUGIN", "true");
- return container;
- }
+ .withTag(loadDefaultLibraryProperty(TAG_PROPERTY)));
+ container.withNetworkAliases("opensearch-" + UUID.randomUUID());
+ container.withNetwork(Network.SHARED);
+ container.withExposedPorts(9200, 9600);
+ container.setWaitStrategy((new HttpWaitStrategy())
+ .forPort(9200)
+ .forStatusCodeMatching(response -> response == 200 || response == 401));
+ container.withEnv("discovery.type", "single-node");
+ container.withEnv("DISABLE_INSTALL_DEMO_CONFIG", "true");
+ container.withEnv("DISABLE_SECURITY_PLUGIN", "true");
+ return container;
}
private static class SingletonHolder {
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTest.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTest.java
new file mode 100644
index 00000000..61e3c77a
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTest.java
@@ -0,0 +1,97 @@
+package dev.vality.testcontainers.annotations.postgresql;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.annotation.DirtiesContext;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Аннотация {@code @EmbeddedPostgresqlTest} подключает и запускает embedded PostgreSQL
+ * {@link io.zonky.test.db.postgres.embedded.EmbeddedPostgres}, также
+ * настройки embedded базы данных будут проинициализированы в контекст тестового приложения
+ * Аннотация не требует дополнительной конфигурации
+ *
Примеры
+ * В примере ниже создается обертка над аннотацией для конкретного приложения с инициализацией
+ * дополнительных Spring properties. Эту обертку можно позже переиспользовать для любых тестов,
+ * требующих embedded PostgreSQL без запуска Docker контейнера
+ *
{@code
+ * @Target({ElementType.TYPE})
+ * @Retention(RetentionPolicy.RUNTIME)
+ * @EmbeddedPostgresqlTest(
+ * properties = {
+ * "spring.flyway.schemas=public",
+ * "spring.jpa.hibernate.ddl-auto=none"})
+ * public @interface CustomEmbeddedPostgresqlTest {
+ * }}
+ * В примере ниже {@link EmbeddedPostgresqlTest} подключается напрямую
+ * к {@link SpringBootTest} для проведения теста DAO слоя, при котором идет запись и чтение данных из базы данных
+ *
{@code
+ * @EmbeddedPostgresqlTest
+ * @SpringBootTest
+ * public class AdjustmentDaoTest {
+ *
+ * @Autowired
+ * private AdjustmentDao adjustmentDao;
+ *
+ * ...
+ * }}
+ *
+ * @see PostgresqlTestcontainer @PostgresqlTestcontainer
+ * @see PostgresqlTestcontainerSingleton @PostgresqlTestcontainerSingleton
+ * @see EmbeddedPostgresqlTestExtension EmbeddedPostgresqlTestExtension
+ * @see io.zonky.test.db.postgres.embedded.EmbeddedPostgres EmbeddedPostgres
+ */
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(EmbeddedPostgresqlTestExtension.class)
+@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
+public @interface EmbeddedPostgresqlTest {
+
+ /**
+ * Аналогичный параметр как у аннотации {@link SpringBootTest#properties()}
+ *
+ * пример — properties = {"spring.flyway.schemas=public",...}
+ */
+ String[] properties() default {};
+
+ /**
+ * Имя embedded PostgreSQL базы данных
+ *
+ * пример — database = "postgres"
+ */
+ String database() default "postgres";
+
+ /**
+ * Имя embedded PostgreSQL пользователя
+ *
+ * пример — username = "postgres"
+ */
+ String username() default "postgres";
+
+ /**
+ * Пароль embedded PostgreSQL пользователя.
+ * Zonky embedded PostgreSQL по умолчанию использует пустой пароль
+ *
+ * пример — password = ""
+ */
+ String password() default "";
+
+ /**
+ * Очищать таблицы между тестами
+ *
+ * @return true - данные между тестами удаляются из embedded PostgreSQL
+ */
+ boolean truncateTables() default true;
+
+ /**
+ * Таблицы, которые не нужно очищать между тестами.
+ * Используется только если {@link #truncateTables()} = true
+ *
+ * пример — excludeTruncateTables = {"schema_history", "reference_data"}
+ */
+ String[] excludeTruncateTables() default {};
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestContextCustomizerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestContextCustomizerFactory.java
new file mode 100644
index 00000000..1b766aff
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestContextCustomizerFactory.java
@@ -0,0 +1,45 @@
+package dev.vality.testcontainers.annotations.postgresql;
+
+import org.springframework.boot.test.util.TestPropertyValues;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.test.context.ContextConfigurationAttributes;
+import org.springframework.test.context.ContextCustomizer;
+import org.springframework.test.context.ContextCustomizerFactory;
+
+import java.util.List;
+
+public class EmbeddedPostgresqlTestContextCustomizerFactory implements ContextCustomizerFactory {
+
+ @Override
+ public ContextCustomizer createContextCustomizer(
+ Class> testClass,
+ List configAttributes) {
+ return (context, mergedConfig) ->
+ EmbeddedPostgresqlTestExtension.findAnnotation(testClass)
+ .ifPresent(annotation -> init(context, annotation));
+ }
+
+ private void init(ConfigurableApplicationContext context, EmbeddedPostgresqlTest annotation) {
+ var postgresql = EmbeddedPostgresqlTestExtension.getOrStart(annotation);
+ var jdbcUrl = postgresql.jdbcUrl();
+ var username = annotation.username();
+ var password = annotation.password();
+ TestPropertyValues.of(
+ "spring.datasource.url=" + jdbcUrl,
+ "spring.datasource.username=" + username,
+ "spring.datasource.password=" + password,
+ "spring.flyway.url=" + jdbcUrl,
+ "spring.flyway.user=" + username,
+ "spring.flyway.password=" + password,
+ "postgres.db.url=" + jdbcUrl,
+ "postgres.db.user=" + username,
+ "postgres.db.username=" + username,
+ "postgres.db.password=" + password,
+ "flyway.url=" + jdbcUrl,
+ "flyway.user=" + username,
+ "flyway.password=" + password,
+ "flyway.postgresql.transactional.lock=false")
+ .and(annotation.properties())
+ .applyTo(context);
+ }
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestExtension.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestExtension.java
new file mode 100644
index 00000000..d3fe7392
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestExtension.java
@@ -0,0 +1,80 @@
+package dev.vality.testcontainers.annotations.postgresql;
+
+import io.zonky.test.db.postgres.embedded.EmbeddedPostgres;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.support.AnnotationSupport;
+
+import java.util.List;
+import java.util.Optional;
+
+public class EmbeddedPostgresqlTestExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback {
+
+ private static final ThreadLocal THREAD_POSTGRESQL = new ThreadLocal<>();
+
+ @Override
+ public void beforeAll(ExtensionContext context) {
+ findAnnotation(context).ifPresent(annotation -> THREAD_POSTGRESQL.set(getOrStart(annotation)));
+ }
+
+ @Override
+ public void beforeEach(ExtensionContext context) {
+ findAnnotation(context).ifPresent(annotation -> {
+ if (annotation.truncateTables()) {
+ var postgresql = getOrStart(annotation);
+ PostgresqlDatabaseCleaner.cleanupDatabaseTables(
+ postgresql.jdbcUrl(),
+ annotation.username(),
+ annotation.password(),
+ List.of(annotation.excludeTruncateTables()));
+ }
+ });
+ }
+
+ @Override
+ public void afterAll(ExtensionContext context) {
+ findAnnotation(context).ifPresent(annotation -> {
+ var postgresql = THREAD_POSTGRESQL.get();
+ THREAD_POSTGRESQL.remove();
+ if (postgresql != null) {
+ postgresql.close();
+ }
+ });
+ }
+
+ private static Optional findAnnotation(ExtensionContext context) {
+ return AnnotationSupport.findAnnotation(context.getTestClass(), EmbeddedPostgresqlTest.class);
+ }
+
+ static Optional findAnnotation(Class> testClass) {
+ return AnnotationSupport.findAnnotation(testClass, EmbeddedPostgresqlTest.class);
+ }
+
+ static EmbeddedPostgresql getOrStart(EmbeddedPostgresqlTest annotation) {
+ var postgresql = THREAD_POSTGRESQL.get();
+ if (postgresql == null) {
+ postgresql = EmbeddedPostgresql.start(annotation);
+ THREAD_POSTGRESQL.set(postgresql);
+ }
+ return postgresql;
+ }
+
+ record EmbeddedPostgresql(EmbeddedPostgres delegate, String jdbcUrl) {
+
+ @SneakyThrows
+ private static EmbeddedPostgresql start(EmbeddedPostgresqlTest annotation) {
+ var postgres = EmbeddedPostgres.start();
+ return new EmbeddedPostgresql(
+ postgres,
+ postgres.getJdbcUrl(annotation.database(), annotation.username()));
+ }
+
+ @SneakyThrows
+ private void close() {
+ delegate.close();
+ }
+ }
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlContainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlContainerExtension.java
index aeacc350..2ab2d5a9 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlContainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlContainerExtension.java
@@ -6,9 +6,8 @@
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.*;
+import java.util.List;
+import java.util.UUID;
import static dev.vality.testcontainers.annotations.util.SpringApplicationPropertiesLoader.loadDefaultLibraryProperty;
@@ -17,13 +16,6 @@ public class PostgresqlContainerExtension extends PostgreSQLContainer EXCLUDE_SCHEMAS = Set.of("information_schema");
- private static final String PG_ = "pg_";
- private static final String SQL_ = "sql_";
public PostgresqlContainerExtension() {
super(DockerImageName
@@ -35,62 +27,6 @@ public PostgresqlContainerExtension() {
@SneakyThrows
public void cleanupDatabaseTables(List excludedTables) {
- try (var connection = DriverManager.getConnection(getJdbcUrl(), getUsername(), getPassword())) {
- for (var schema : getSchemas(connection)) {
- var tables = getUserTables(connection, schema, excludedTables);
- if (!tables.isEmpty()) {
- truncateTables(connection, schema, tables);
- }
- }
- }
- }
-
- @SneakyThrows
- private Set getSchemas(Connection connection) {
- var schemas = new HashSet();
- try (
- var statement = connection.createStatement();
- var resultSet = statement.executeQuery(CURRENT_SCHEMA_QUERY)) {
- while (resultSet.next()) {
- var schema = resultSet.getString("schema_name");
- if (!EXCLUDE_SCHEMAS.contains(schema)
- && !schema.startsWith(PG_) && !schema.startsWith(SQL_)) {
- schemas.add(schema);
- }
- }
- }
- return schemas;
- }
-
- @SneakyThrows
- private List getUserTables(Connection connection, String schema, List excludedTables) {
- var tables = new ArrayList();
- try (var statement = connection.prepareStatement(TABLES_QUERY)) {
- statement.setString(1, schema);
- try (var resultSet = statement.executeQuery()) {
- while (resultSet.next()) {
- var tableName = resultSet.getString("tablename");
- boolean isTruncatable = !tableName.startsWith(PG_)
- && !tableName.startsWith(SQL_)
- && !excludedTables.contains(tableName);
- if (isTruncatable) {
- tables.add(tableName);
- }
- }
- }
- }
- return tables;
- }
-
- @SneakyThrows
- private void truncateTables(Connection connection, String schema, List tables) {
- try (var statement = connection.createStatement()) {
- statement.execute("SET session_replication_role = 'replica'");
- for (var table : tables) {
- log.debug("Truncating table: {}", table);
- statement.execute(String.format(TRUNCATE_TABLE_QUERY, schema, table));
- }
- statement.execute("SET session_replication_role = 'origin'");
- }
+ PostgresqlDatabaseCleaner.cleanupDatabaseTables(getJdbcUrl(), getUsername(), getPassword(), excludedTables);
}
}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlDatabaseCleaner.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlDatabaseCleaner.java
new file mode 100644
index 00000000..530db85d
--- /dev/null
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlDatabaseCleaner.java
@@ -0,0 +1,91 @@
+package dev.vality.testcontainers.annotations.postgresql;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PostgresqlDatabaseCleaner {
+
+ private static final String CURRENT_SCHEMA_QUERY = "SELECT schema_name FROM information_schema.schemata";
+ private static final String TABLES_QUERY = "SELECT tablename FROM pg_tables " +
+ "WHERE schemaname = ? AND tablename NOT LIKE 'flyway%'AND tablename NOT LIKE 'schema_version'";
+ private static final String TRUNCATE_TABLE_QUERY = "TRUNCATE TABLE %s.%s CASCADE";
+ private static final Set EXCLUDE_SCHEMAS = Set.of("information_schema");
+ private static final String PG_ = "pg_";
+ private static final String SQL_ = "sql_";
+
+ @SneakyThrows
+ public static void cleanupDatabaseTables(
+ String jdbcUrl,
+ String username,
+ String password,
+ List excludedTables) {
+ try (var connection = DriverManager.getConnection(jdbcUrl, username, password)) {
+ for (var schema : getSchemas(connection)) {
+ var tables = getUserTables(connection, schema, excludedTables);
+ if (!tables.isEmpty()) {
+ truncateTables(connection, schema, tables);
+ }
+ }
+ }
+ }
+
+ @SneakyThrows
+ private static Set getSchemas(Connection connection) {
+ var schemas = new HashSet();
+ try (
+ var statement = connection.createStatement();
+ var resultSet = statement.executeQuery(CURRENT_SCHEMA_QUERY)) {
+ while (resultSet.next()) {
+ var schema = resultSet.getString("schema_name");
+ if (!EXCLUDE_SCHEMAS.contains(schema)
+ && !schema.startsWith(PG_) && !schema.startsWith(SQL_)) {
+ schemas.add(schema);
+ }
+ }
+ }
+ return schemas;
+ }
+
+ @SneakyThrows
+ private static List getUserTables(Connection connection, String schema, List excludedTables) {
+ var tables = new ArrayList();
+ try (var statement = connection.prepareStatement(TABLES_QUERY)) {
+ statement.setString(1, schema);
+ try (var resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ var tableName = resultSet.getString("tablename");
+ boolean isTruncatable = !tableName.startsWith(PG_)
+ && !tableName.startsWith(SQL_)
+ && !excludedTables.contains(tableName);
+ if (isTruncatable) {
+ tables.add(tableName);
+ }
+ }
+ }
+ }
+ return tables;
+ }
+
+ @SneakyThrows
+ private static void truncateTables(Connection connection, String schema, List tables) {
+ try (var statement = connection.createStatement()) {
+ statement.execute("SET session_replication_role = 'replica'");
+ for (var table : tables) {
+ log.debug("Truncating table: {}", table);
+ statement.execute(String.format(TRUNCATE_TABLE_QUERY, schema, table));
+ }
+ statement.execute("SET session_replication_role = 'origin'");
+ }
+ }
+}
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerExtension.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerExtension.java
index 4d5b9a66..e52e5a31 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerExtension.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerExtension.java
@@ -50,8 +50,11 @@ public void beforeAll(ExtensionContext context) {
var container = PostgresqlTestcontainerFactory.singletonContainer();
if (!container.isRunning()) {
GenericContainerUtil.startContainer(container);
- } else {
- container.cleanupDatabaseTables(List.of());
+ } else if (findSingletonAnnotation(context).get().truncateTables()) {
+ var excludedTables = Optional.ofNullable(findSingletonAnnotation(context).get().excludeTruncateTables())
+ .map(List::of)
+ .orElse(List.of());
+ container.cleanupDatabaseTables(excludedTables);
}
THREAD_CONTAINER.set(container);
diff --git a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerFactory.java b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerFactory.java
index 418804ca..c3df4630 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerFactory.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/postgresql/PostgresqlTestcontainerFactory.java
@@ -38,9 +38,7 @@ private PostgresqlContainerExtension getOrCreateSingletonContainer() {
}
private PostgresqlContainerExtension create() {
- try (var container = new PostgresqlContainerExtension()) {
- return container;
- }
+ return new PostgresqlContainerExtension();
}
private static class SingletonHolder {
diff --git a/src/main/java/dev/vality/testcontainers/annotations/util/SpringApplicationPropertiesLoader.java b/src/main/java/dev/vality/testcontainers/annotations/util/SpringApplicationPropertiesLoader.java
index e7c22018..9f29d00a 100644
--- a/src/main/java/dev/vality/testcontainers/annotations/util/SpringApplicationPropertiesLoader.java
+++ b/src/main/java/dev/vality/testcontainers/annotations/util/SpringApplicationPropertiesLoader.java
@@ -30,7 +30,12 @@ public class SpringApplicationPropertiesLoader {
);
public static String loadDefaultLibraryProperty(String key) {
- var tag = loadPropertiesByFile().get(key);
+ Object tag;
+ try {
+ tag = loadPropertiesByFile().get(key);
+ } catch (NoSuchFileException ex) {
+ tag = null;
+ }
if (tag == null) {
tag = getSource(findPropertiesFileParametersByName("testcontainers-annotations")).get(key);
}
diff --git a/src/main/resources/META-INF/spring.factories b/src/main/resources/META-INF/spring.factories
index 06273234..269b58e6 100644
--- a/src/main/resources/META-INF/spring.factories
+++ b/src/main/resources/META-INF/spring.factories
@@ -2,6 +2,8 @@
org.springframework.test.context.ContextCustomizerFactory=\
dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerExtension.PostgresqlTestcontainerContextCustomizerFactory,\
dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerExtension.KafkaTestcontainerContextCustomizerFactory,\
+dev.vality.testcontainers.annotations.kafka.EmbeddedKafkaTestContextCustomizerFactory,\
dev.vality.testcontainers.annotations.clickhouse.ClickhouseTestcontainerExtension.ClickhouseTestcontainerContextCustomizerFactory,\
dev.vality.testcontainers.annotations.minio.MinioTestcontainerExtension.MinioTestcontainerContextCustomizerFactory,\
-dev.vality.testcontainers.annotations.opensearch.OpensearchTestcontainerExtension.OpensearchTestcontainerContextCustomizerFactory
+dev.vality.testcontainers.annotations.opensearch.OpensearchTestcontainerExtension.OpensearchTestcontainerContextCustomizerFactory,\
+dev.vality.testcontainers.annotations.postgresql.EmbeddedPostgresqlTestContextCustomizerFactory
diff --git a/src/test/java/dev/vality/testcontainers/annotations/TestcontainerAnnotationsTest.java b/src/test/java/dev/vality/testcontainers/annotations/TestcontainerAnnotationsTest.java
deleted file mode 100644
index 442ee411..00000000
--- a/src/test/java/dev/vality/testcontainers/annotations/TestcontainerAnnotationsTest.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package dev.vality.testcontainers.annotations;
-
-import org.junit.jupiter.api.Test;
-
-public class TestcontainerAnnotationsTest {
-
- @Test
- public void setUp() {
-
- }
-}
diff --git a/src/test/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestIntegrationTest.java b/src/test/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestIntegrationTest.java
new file mode 100644
index 00000000..547b3378
--- /dev/null
+++ b/src/test/java/dev/vality/testcontainers/annotations/kafka/EmbeddedKafkaTestIntegrationTest.java
@@ -0,0 +1,125 @@
+package dev.vality.testcontainers.annotations.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@EmbeddedKafkaTest(
+ topics = EmbeddedKafkaTestIntegrationTest.TOPIC,
+ properties = {
+ "app.kafka.topic=" + EmbeddedKafkaTestIntegrationTest.TOPIC,
+ "spring.kafka.consumer.group-id=embedded-kafka-test"
+ }
+)
+@SpringBootTest(classes = EmbeddedKafkaTestIntegrationTest.Config.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class EmbeddedKafkaTestIntegrationTest {
+
+ static final String TOPIC = "embedded-kafka-test-topic";
+
+ @Autowired
+ private Environment environment;
+
+ @Autowired
+ private EmbeddedKafkaBroker embeddedKafkaBroker;
+
+ @Test
+ @Order(1)
+ void shouldStartEmbeddedKafkaAndExposeBootstrapProperties() throws Exception {
+ var bootstrapServers = environment.getRequiredProperty("spring.kafka.bootstrap-servers");
+
+ assertThat(bootstrapServers)
+ .isEqualTo(embeddedKafkaBroker.getBrokersAsString());
+ assertThat(environment.getRequiredProperty("kafka.bootstrap-servers"))
+ .isEqualTo(embeddedKafkaBroker.getBrokersAsString());
+ assertThat(environment.getRequiredProperty("kafka.ssl.enabled"))
+ .isEqualTo("false");
+ assertThat(environment.getRequiredProperty("app.kafka.topic"))
+ .isEqualTo(TOPIC);
+
+ try (var adminClient = AdminClient.create(adminProperties(bootstrapServers))) {
+ assertThat(adminClient.listTopics().names().get(10, TimeUnit.SECONDS))
+ .contains(TOPIC);
+ }
+
+ try (var producer = new KafkaProducer(producerProperties(bootstrapServers))) {
+ producer.send(new ProducerRecord<>(TOPIC, "key", "value")).get(10, TimeUnit.SECONDS);
+ producer.flush();
+ }
+
+ assertThat(readValues("first-reader", bootstrapServers))
+ .contains("value");
+ }
+
+ @Test
+ @Order(2)
+ void shouldCleanupTopicsBeforeEachTest() {
+ var bootstrapServers = environment.getRequiredProperty("spring.kafka.bootstrap-servers");
+
+ assertThat(readValues("second-reader", bootstrapServers))
+ .isEmpty();
+ }
+
+ private Properties adminProperties(String bootstrapServers) {
+ var properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ return properties;
+ }
+
+ private Properties producerProperties(String bootstrapServers) {
+ var properties = new Properties();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.ACKS_CONFIG, "all");
+ return properties;
+ }
+
+ private List readValues(String groupId, String bootstrapServers) {
+ try (var consumer = new KafkaConsumer(consumerProperties(groupId, bootstrapServers))) {
+ consumer.subscribe(List.of(TOPIC));
+ var values = new ArrayList();
+ for (var record : consumer.poll(Duration.ofMillis(500)).records(TOPIC)) {
+ values.add(record.value());
+ }
+ return values;
+ }
+ }
+
+ private Properties consumerProperties(String groupId, String bootstrapServers) {
+ var properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ return properties;
+ }
+
+ @Configuration
+ static class Config {
+ }
+}
diff --git a/src/test/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestIntegrationTest.java b/src/test/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestIntegrationTest.java
new file mode 100644
index 00000000..5e7bc4f7
--- /dev/null
+++ b/src/test/java/dev/vality/testcontainers/annotations/postgresql/EmbeddedPostgresqlTestIntegrationTest.java
@@ -0,0 +1,82 @@
+package dev.vality.testcontainers.annotations.postgresql;
+
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.core.env.Environment;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@EmbeddedPostgresqlTest
+@SpringBootTest(classes = EmbeddedPostgresqlTestIntegrationTest.Config.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
+class EmbeddedPostgresqlTestIntegrationTest {
+
+ private static final String TABLE_NAME = "embedded_postgresql_test";
+
+ @Autowired
+ private Environment environment;
+
+ @Test
+ @Order(1)
+ void shouldStartEmbeddedPostgresqlAndExposeDatasourceProperties() throws Exception {
+ assertThat(environment.getRequiredProperty("spring.datasource.url"))
+ .startsWith("jdbc:postgresql://localhost:");
+ assertThat(environment.getRequiredProperty("spring.datasource.username"))
+ .isEqualTo("postgres");
+ assertThat(environment.getRequiredProperty("spring.datasource.password"))
+ .isEmpty();
+ assertThat(environment.getRequiredProperty("postgres.db.url"))
+ .isEqualTo(environment.getRequiredProperty("spring.datasource.url"));
+
+ try (var connection = connection()) {
+ execute(connection, "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (id INT PRIMARY KEY)");
+ execute(connection, "INSERT INTO " + TABLE_NAME + " (id) VALUES (1)");
+
+ assertThat(countRows(connection))
+ .isEqualTo(1);
+ }
+ }
+
+ @Test
+ @Order(2)
+ void shouldCleanupTablesBeforeEachTest() throws Exception {
+ try (var connection = connection()) {
+ assertThat(countRows(connection))
+ .isZero();
+ }
+ }
+
+ private Connection connection() throws Exception {
+ return DriverManager.getConnection(
+ environment.getRequiredProperty("spring.datasource.url"),
+ environment.getRequiredProperty("spring.datasource.username"),
+ environment.getRequiredProperty("spring.datasource.password"));
+ }
+
+ private void execute(Connection connection, String sql) throws Exception {
+ try (var statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
+ }
+
+ private int countRows(Connection connection) throws Exception {
+ try (
+ var statement = connection.createStatement();
+ var resultSet = statement.executeQuery("SELECT COUNT(*) FROM " + TABLE_NAME)) {
+ resultSet.next();
+ return resultSet.getInt(1);
+ }
+ }
+
+ @Configuration
+ static class Config {
+ }
+}