From 416488089da2fbaed014e7e8b67a560bed5f4948 Mon Sep 17 00:00:00 2001 From: "igor.petrenko" Date: Thu, 7 May 2026 09:44:19 +0300 Subject: [PATCH] CE-159 oap-logstream: fix concurrency --- .../logstream/disk/RowBinaryWriterTest.java | 51 +++++++++++++++++++ .../oap/logstream/disk/AbstractWriter.java | 27 ++++++---- .../oap/logstream/disk/RowBinaryWriter.java | 3 ++ pom.xml | 2 +- 4 files changed, 71 insertions(+), 12 deletions(-) diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java index 053a1c88f0..97313eef42 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java @@ -25,7 +25,9 @@ package oap.logstream.disk; import oap.compression.Compression; +import oap.io.IoStreams; import oap.logstream.LogId; +import oap.logstream.formats.RowBinaryAssertion; import oap.logstream.formats.rowbinary.RowBinaryUtils; import oap.template.TemplateEngineFixture; import oap.template.Types; @@ -41,9 +43,14 @@ import java.nio.file.Path; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import static oap.logstream.LogStreamProtocol.CURRENT_PROTOCOL_VERSION; import static oap.logstream.Timestamp.BPH_12; +import static oap.logstream.formats.RowBinaryAssertion.assertRowBinaryFile; +import static oap.logstream.formats.RowBinaryAssertion.header; +import static oap.logstream.formats.RowBinaryAssertion.row; import static org.assertj.core.api.Assertions.assertThat; import static org.joda.time.DateTimeZone.UTC; @@ -101,4 +108,48 @@ public void testWrite() throws IOException { List.of( "s112", 122L, List.of( "zz", "66" ), new DateTime( 2022, 3, 11, 15, 16, 15, UTC ) ) ) ); } + + @Test + public void testConcurrency() throws IOException { + Dates.setTimeFixed( 2022, 3, 8, 21, 11 ); + + String[] headers = new String[] { "COL1", "COL2", "COL3", "DATETIME" }; + byte[][] types = new byte[][] { new byte[] { Types.STRING.id }, + new byte[] { Types.LONG.id }, + new byte[] { Types.LIST.id, Types.STRING.id }, + new byte[] { Types.DATETIME.id } + }; + LogId logId = new LogId( "", "log", "log", Map.of( "p", "1" ), headers, types ); + Path logs = testDirectoryFixture.testPath( "logs" ); + + int count = 10; + + try( RowBinaryWriter writer = new RowBinaryWriter( templateEngineFixture.templateEngine, logs, FILE_PATTERN, logId, 1024, BPH_12, 20, "localhost" ) ) { + try( ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ) { + for( long i = 0; i < count; i++ ) { + + byte[] content = Compression.gzip( RowBinaryUtils.lines( List.of( + List.of( "s11", i, List.of( "1" ), new DateTime( 2022, 3, 11, 15, 16, 12, UTC ) ) + ) ) ); + + executorService.execute( () -> { + writer.write( CURRENT_PROTOCOL_VERSION, content ); + } ); + } + } + } + + Path path = logs.resolve( "1-file-02-4cd64dae0-1.rb.gz.rb.gz" ); + + RowBinaryAssertion.Row[] rows = new RowBinaryAssertion.Row[count]; + for( long i = 0; i < count; i++ ) { + rows[( int ) i] = row( i ); + } + + assertRowBinaryFile( path, IoStreams.Encoding.GZIP ) + .containsExactlyInAnyOrderEntriesOf( + header( "COL2" ), + rows + ); + } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java index 6d63ca6003..581cabc42d 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java @@ -153,18 +153,23 @@ protected Path filename() { } protected void closeOutput() throws LoggerException { - if( out != null ) try { - stopwatch.count( out::close ); - - long fileSize = Files.size( outFilename ); - log.trace( "closing output {} ({} bytes)", this, fileSize ); - Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize ); - Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) ); - } catch( IOException e ) { - throw new LoggerException( e ); + lock.lock(); + try { + if( out != null ) try { + stopwatch.count( out::close ); + + long fileSize = Files.size( outFilename ); + log.trace( "closing output {} ({} bytes)", this, fileSize ); + Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize ); + Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) ); + } catch( IOException e ) { + throw new LoggerException( e ); + } finally { + outFilename = null; + out = null; + } } finally { - outFilename = null; - out = null; + lock.unlock(); } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java index b754d5d11b..71bc85226b 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java @@ -28,6 +28,7 @@ public RowBinaryWriter( TemplateEngine templateEngine, Path logDirectory, String @Override public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException { + lock.lock(); try { refresh(); Path filename = filename(); @@ -84,6 +85,8 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b out = null; } throw new LoggerException( e ); + } finally { + lock.unlock(); } } } diff --git a/pom.xml b/pom.xml index 041ad076fd..17af7259e0 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.6.8 + 25.6.9 25.0.1 25.0.0