Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
package oap.logstream.disk;

import oap.compression.Compression;
import oap.io.IoStreams;
import oap.logstream.LogId;
import oap.logstream.formats.RowBinaryAssertion;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
Expand All @@ -41,9 +43,14 @@
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static oap.logstream.LogStreamProtocol.CURRENT_PROTOCOL_VERSION;
import static oap.logstream.Timestamp.BPH_12;
import static oap.logstream.formats.RowBinaryAssertion.assertRowBinaryFile;
import static oap.logstream.formats.RowBinaryAssertion.header;
import static oap.logstream.formats.RowBinaryAssertion.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.joda.time.DateTimeZone.UTC;

Expand Down Expand Up @@ -101,4 +108,48 @@ public void testWrite() throws IOException {
List.of( "s112", 122L, List.of( "zz", "66" ), new DateTime( 2022, 3, 11, 15, 16, 15, UTC ) )
) );
}

@Test
public void testConcurrency() throws IOException {
Dates.setTimeFixed( 2022, 3, 8, 21, 11 );

String[] headers = new String[] { "COL1", "COL2", "COL3", "DATETIME" };
byte[][] types = new byte[][] { new byte[] { Types.STRING.id },
new byte[] { Types.LONG.id },
new byte[] { Types.LIST.id, Types.STRING.id },
new byte[] { Types.DATETIME.id }
};
LogId logId = new LogId( "", "log", "log", Map.of( "p", "1" ), headers, types );
Path logs = testDirectoryFixture.testPath( "logs" );

int count = 10;

try( RowBinaryWriter writer = new RowBinaryWriter( templateEngineFixture.templateEngine, logs, FILE_PATTERN, logId, 1024, BPH_12, 20, "localhost" ) ) {
try( ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor() ) {
for( long i = 0; i < count; i++ ) {

byte[] content = Compression.gzip( RowBinaryUtils.lines( List.of(
List.of( "s11", i, List.of( "1" ), new DateTime( 2022, 3, 11, 15, 16, 12, UTC ) )
) ) );

executorService.execute( () -> {
writer.write( CURRENT_PROTOCOL_VERSION, content );
} );
}
}
}

Path path = logs.resolve( "1-file-02-4cd64dae0-1.rb.gz.rb.gz" );

RowBinaryAssertion.Row[] rows = new RowBinaryAssertion.Row[count];
for( long i = 0; i < count; i++ ) {
rows[( int ) i] = row( i );
}

assertRowBinaryFile( path, IoStreams.Encoding.GZIP )
.containsExactlyInAnyOrderEntriesOf(
header( "COL2" ),
rows
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,23 @@ protected Path filename() {
}

protected void closeOutput() throws LoggerException {
if( out != null ) try {
stopwatch.count( out::close );

long fileSize = Files.size( outFilename );
log.trace( "closing output {} ({} bytes)", this, fileSize );
Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize );
Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) );
} catch( IOException e ) {
throw new LoggerException( e );
lock.lock();
try {
if( out != null ) try {
stopwatch.count( out::close );

long fileSize = Files.size( outFilename );
log.trace( "closing output {} ({} bytes)", this, fileSize );
Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize );
Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) );
} catch( IOException e ) {
throw new LoggerException( e );
} finally {
outFilename = null;
out = null;
}
} finally {
outFilename = null;
out = null;
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public RowBinaryWriter( TemplateEngine templateEngine, Path logDirectory, String

@Override
public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] buffer, int offset, int length ) throws LoggerException {
lock.lock();
try {
refresh();
Path filename = filename();
Expand Down Expand Up @@ -84,6 +85,8 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b
out = null;
}
throw new LoggerException( e );
} finally {
lock.unlock();
}
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
</distributionManagement>

<properties>
<oap.project.version>25.6.8</oap.project.version>
<oap.project.version>25.6.9</oap.project.version>

<oap.deps.config.version>25.0.1</oap.deps.config.version>
<oap.deps.oap-teamcity.version>25.0.0</oap.deps.oap-teamcity.version>
Expand Down
Loading