diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java deleted file mode 100644 index ee00ef024f..0000000000 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/AbstractFinisherTest.java +++ /dev/null @@ -1,107 +0,0 @@ -package oap.logstream.disk; - -import oap.logstream.Timestamp; -import oap.storage.cloud.FileSystemConfiguration; -import oap.testng.Fixtures; -import oap.testng.TestDirectoryFixture; -import oap.util.Dates; -import org.joda.time.DateTime; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.attribute.FileTime; -import java.util.List; -import java.util.Map; - -import static oap.util.Pair.__; -import static org.assertj.core.api.Assertions.assertThat; -import static org.joda.time.DateTimeZone.UTC; - -public class AbstractFinisherTest extends Fixtures { - private final TestDirectoryFixture testDirectoryFixture; - - public AbstractFinisherTest() { - testDirectoryFixture = fixture( new TestDirectoryFixture() ); - } - - @Test - public void testSort() throws IOException { - long safeInterval = 10; - Timestamp timestamp = Timestamp.BPH_6; - - Path logs = testDirectoryFixture.testPath( "logs" ); - Files.createDirectory( logs ); - MockFinisher finisher = new MockFinisher( new FileSystemConfiguration( Map.of( - "fs.default.clouds.scheme", "s3", "fs.default.jclouds.container", "test" ) ), logs, safeInterval, List.of( "*.txt" ), timestamp ); - finisher.start(); - - finisher.priorityByType.put( "type2", 10 ); - - Path file11 = Files.createFile( logs.resolve( "file1-type1.txt" ) ); - Path file12 = Files.createFile( logs.resolve( "file1-type2.txt" ) ); - Path file21 = Files.createFile( logs.resolve( "file2-type1.txt" ) ); - Path file22 = Files.createFile( logs.resolve( "file2-type2.txt" ) ); - - LogMetadata type1 = new LogMetadata( "", "type1", "", Map.of(), new String[] {}, new byte[][] {} ); - LogMetadata type2 = new LogMetadata( "", "type2", "", Map.of(), new String[] {}, new byte[][] {} ); - - type1.writeFor( file11 ); - type2.writeFor( file12 ); - type1.writeFor( file21 ); - type2.writeFor( file22 ); - - long time = new DateTime( 2025, 4, 6, 14, 13, 39, 0, UTC ).getMillis(); - Files.setLastModifiedTime( file11, FileTime.fromMillis( time + 1 ) ); - Files.setLastModifiedTime( file12, FileTime.fromMillis( time + 2 ) ); - Files.setLastModifiedTime( file21, FileTime.fromMillis( time + 3 ) ); - Files.setLastModifiedTime( file22, FileTime.fromMillis( time + 4 ) ); - - Dates.setTimeFixed( time + Dates.m( 60 / timestamp.bucketsPerHour ) + safeInterval + 10 ); - - - finisher.run(); - - assertThat( finisher.files ).hasSize( 4 ); - - assertThat( finisher.files.subList( 0, 2 ) ).containsAnyOf( - __( file12, new DateTime( 2025, 4, 6, 14, 10, 0, 0, UTC ) ), - __( file22, new DateTime( 2025, 4, 6, 14, 10, 0, 0, UTC ) ) - ); - - assertThat( finisher.files.subList( 2, 4 ) ).containsAnyOf( - __( file11, new DateTime( 2025, 4, 6, 14, 10, 0, 0, UTC ) ), - __( file21, new DateTime( 2025, 4, 6, 14, 10, 0, 0, UTC ) ) - ); - } - - @Test - public void testSafeInterval() throws IOException { - long safeInterval = Dates.s( 30 ); - - Timestamp timestamp = Timestamp.BPH_6; - - Path logs = testDirectoryFixture.testPath( "logs" ); - Files.createDirectory( logs ); - MockFinisher finisher = new MockFinisher( new FileSystemConfiguration( Map.of( - "fs.default.clouds.scheme", "s3", "fs.default.jclouds.container", "test" ) ), logs, safeInterval, List.of( "*.txt" ), timestamp ); - finisher.start(); - - Path file11 = Files.createFile( logs.resolve( "file1-type1-2025-04-05-15-02-10m.txt" ) ); - - LogMetadata type1 = new LogMetadata( "", "type1", "", Map.of(), new String[] {}, new byte[][] {} ); - type1.writeFor( file11 ); - - Files.setLastModifiedTime( file11, FileTime.fromMillis( new DateTime( 2025, 4, 5, 15, 20, 31, 0, UTC ).getMillis() ) ); - - Dates.setTimeFixed( 2025, 4, 5, 15, 21, 0, 0 ); - - System.out.println( timestamp.currentBucket( new DateTime( UTC ) ) ); - System.out.println( timestamp.toStartOfBucket( new DateTime( UTC ) ) ); - - finisher.run(); - - assertThat( finisher.files ).isEmpty(); - } -} diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogFileTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogFileTest.java new file mode 100644 index 0000000000..d904be68c9 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogFileTest.java @@ -0,0 +1,285 @@ +package oap.logstream.disk; + +import oap.logstream.LogId; +import oap.template.Types; +import oap.testng.Fixtures; +import oap.testng.TestDirectoryFixture; +import org.joda.time.DateTime; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.attribute.FileTime; +import java.util.Map; + +import static oap.testng.Asserts.assertFile; +import static org.assertj.core.api.Assertions.assertThat; +import static org.joda.time.DateTimeZone.UTC; + + +public class LogFileTest extends Fixtures { + private final TestDirectoryFixture testDirectoryFixture; + + public LogFileTest() { + testDirectoryFixture = fixture( new TestDirectoryFixture() ); + } + + private static LogId newLogId() { + return new LogId( "fpp", "type", "host", Map.of(), + new String[] { "h1" }, new byte[][] { new byte[] { Types.STRING.id } } ); + } + + @Test + public void testSave() { + Path file = testDirectoryFixture.testPath( "file" ); + + new LogFile( file ).create( new LogId( "fpp", "type", "host", Map.of(), + new String[] { "h1", "h2" }, new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } } ) ); + + assertFile( Path.of( file + ".metadata.yaml" ) ).hasContent( """ + --- + filePrefixPattern: "fpp" + type: "type" + clientHostname: "host" + headers: + - "h1" + - "h2" + types: + - - 11 + - - 11 + """ ); + } + + @Test + public void testLoadWithoutHeaders() throws IOException { + Files.writeString( testDirectoryFixture.testPath( "file.gz.metadata.yaml" ), """ + --- + filePrefixPattern: "fpp" + type: "type" + clientHostname: "host" + """ ); + + LogFile logFile = new LogFile( testDirectoryFixture.testPath( "file.gz" ) ); + + LogMetadata metadata = logFile.getLogMetadata(); + assertThat( metadata.headers ).isNull(); + assertThat( metadata.types ).isNull(); + } + + @Test + public void testSaveLoad() { + Path file = testDirectoryFixture.testPath( "file" ); + + LogFile logFile = new LogFile( file ).create( new LogId( "fpp", "type", "host", Map.of(), + new String[] { "h1", "h2" }, new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } } ) ); + + DateTime dt = new DateTime( 2019, 11, 29, 10, 9, 0, 0, UTC ); + logFile.addProperty( "time", dt.toString() ); + + LogMetadata newLm = new LogFile( file ).getLogMetadata(); + assertThat( newLm.getDateTime( "time" ) ).isEqualTo( dt ); + } + + @Test + public void testLoadFromPath() { + Path base = testDirectoryFixture.testPath( "file.gz" ); + LogFile logFile = new LogFile( base ); + + assertThat( LogFile.loadFromPath( logFile.pathFor( LogFile.EXTENSION_LOG_METADATA ) ).outFilename ).isEqualTo( base ); + assertThat( LogFile.loadFromPath( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ).outFilename ).isEqualTo( base ); + assertThat( LogFile.loadFromPath( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).outFilename ).isEqualTo( base ); + assertThat( LogFile.loadFromPath( base ).outFilename ).isEqualTo( base ); + } + + @Test + public void testBeginTransactionNoFile() { + assertThat( new LogFile( testDirectoryFixture.testPath( "file" ) ).beginTransaction() ).isEqualTo( 0L ); + } + + @Test + public void testCommitTransaction() throws IOException { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + logFile.commitTransaction( 10 ); + assertThat( Files.readString( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ) ).isEqualTo( "10" ); + logFile.commitTransaction( 5 ); + assertThat( Files.readString( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ) ).isEqualTo( "15" ); + } + + @Test + public void testReadyForUpload() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + logFile.readyForUpload(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).exists(); + logFile.readyForUpload(); + } + + @Test + public void testIsCompleted() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + assertThat( logFile.isCompleted() ).isFalse(); + logFile.readyForUpload(); + assertThat( logFile.isCompleted() ).isTrue(); + } + + @Test + public void testIsValid() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + assertThat( logFile.isValid() ).isFalse(); + logFile.create( newLogId() ); + assertThat( logFile.isValid() ).isFalse(); + logFile.commitTransaction( 0 ); + assertThat( logFile.isValid() ).isTrue(); + logFile.close(); + } + + @Test + public void testExistsAndValid() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + assertThat( logFile.existsAndValid() ).isFalse(); + + logFile.create( newLogId() ); + logFile.commitTransaction( 0 ); + assertThat( logFile.existsAndValid() ).isTrue(); + + logFile.readyForUpload(); + assertThat( logFile.existsAndValid() ).isTrue(); + + logFile.close(); + } + + @Test + public void testWriteAndCommitTransaction() throws IOException { + Path file = testDirectoryFixture.testPath( "file" ); + byte[] data = "hello".getBytes( StandardCharsets.UTF_8 ); + + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.writeAndCommitTransaction( data, 0, data.length ); + + assertThat( Files.readAllBytes( file ) ).isEqualTo( data ); + assertThat( Files.readString( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ) ) + .isEqualTo( String.valueOf( data.length ) ); + logFile.close(); + } + + @Test + public void testBeginTransactionWriteAndCommitTransaction() throws IOException { + Path file = testDirectoryFixture.testPath( "file" ); + byte[] first = "hello".getBytes( StandardCharsets.UTF_8 ); + byte[] second = "world".getBytes( StandardCharsets.UTF_8 ); + + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.beginTransactionWriteAndCommitTransaction( first, 0, first.length ); + logFile.beginTransactionWriteAndCommitTransaction( second, 0, second.length ); + + assertThat( Files.readAllBytes( file ) ).isEqualTo( "helloworld".getBytes( StandardCharsets.UTF_8 ) ); + logFile.close(); + } + + @Test + public void testGetDataSize() { + Path file = testDirectoryFixture.testPath( "file" ); + byte[] data = "hello".getBytes( StandardCharsets.UTF_8 ); + + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.writeAndCommitTransaction( data, 0, data.length ); + + assertThat( logFile.getDataSize() ).isEqualTo( data.length ); + logFile.close(); + } + + @Test + public void testGetMaxModificationTime() throws IOException { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + assertThat( logFile.getMaxModificationTime() ).isEqualTo( -1L ); + + logFile.create( newLogId() ); + logFile.commitTransaction( 0 ); + logFile.readyForUpload(); + + Files.setLastModifiedTime( file, FileTime.fromMillis( 1_000_000L ) ); + Files.setLastModifiedTime( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ), FileTime.fromMillis( 2_000_000L ) ); + Files.setLastModifiedTime( logFile.pathFor( LogFile.EXTENSION_LOG_METADATA ), FileTime.fromMillis( 3_000_000L ) ); + Files.setLastModifiedTime( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ), FileTime.fromMillis( 4_000_000L ) ); + + assertThat( logFile.getMaxModificationTime() ).isEqualTo( 4_000_000L ); + logFile.close(); + } + + @Test + public void testClose() { + Path file = testDirectoryFixture.testPath( "file" ); + new LogFile( file ).close(); + + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.close(); + } + + @Test + public void testGetTransactionPosition() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ); + assertThat( logFile.getTransactionPosition() ).isEqualTo( 0L ); + + logFile.commitTransaction( 10 ); + assertThat( logFile.getTransactionPosition() ).isEqualTo( 10L ); + + logFile.commitTransaction( 5 ); + assertThat( logFile.getTransactionPosition() ).isEqualTo( 15L ); + } + + @Test + public void testMoveTo() { + Path baseDir = testDirectoryFixture.testPath( "base" ); + Path destDir = testDirectoryFixture.testPath( "dest" ); + + Path file = baseDir.resolve( "a/b/file.gz" ); + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.commitTransaction( 0 ); + logFile.readyForUpload(); + logFile.close(); + + logFile.moveTo( destDir, baseDir ); + + assertThat( file ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_METADATA ) ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).doesNotExist(); + + LogFile destLogFile = new LogFile( destDir.resolve( "a/b/file.gz" ) ); + assertThat( destLogFile.outFilename ).exists(); + assertThat( destLogFile.pathFor( LogFile.EXTENSION_LOG_METADATA ) ).exists(); + assertThat( destLogFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ).exists(); + assertThat( destLogFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).exists(); + } + + @Test + public void testDelete() { + Path file = testDirectoryFixture.testPath( "file" ); + LogFile logFile = new LogFile( file ).create( newLogId() ); + logFile.commitTransaction( 0 ); + logFile.readyForUpload(); + logFile.close(); + + assertThat( file ).exists(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_METADATA ) ).exists(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ).exists(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).exists(); + + logFile.delete(); + + assertThat( file ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_METADATA ) ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_TRANSACTION ) ).doesNotExist(); + assertThat( logFile.pathFor( LogFile.EXTENSION_LOG_COMPLETED ) ).doesNotExist(); + + logFile.delete(); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java deleted file mode 100644 index d8d51cd098..0000000000 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/LogMetadataTest.java +++ /dev/null @@ -1,83 +0,0 @@ -package oap.logstream.disk; - -import oap.template.Types; -import oap.testng.Fixtures; -import oap.testng.TestDirectoryFixture; -import org.joda.time.DateTime; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Map; - -import static oap.testng.Asserts.assertFile; -import static org.assertj.core.api.Assertions.assertThat; -import static org.joda.time.DateTimeZone.UTC; - - -public class LogMetadataTest extends Fixtures { - private final TestDirectoryFixture testDirectoryFixture; - - public LogMetadataTest() { - testDirectoryFixture = fixture( new TestDirectoryFixture() ); - } - - @Test - public void testSave() { - Path file = testDirectoryFixture.testPath( "file" ); - - LogMetadata metadata = new LogMetadata( "fpp", "type", "host", Map.of(), - new String[] { "h1", "h2" }, new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } } ); - metadata.writeFor( file ); - - assertFile( Path.of( file.toString() + ".metadata.yaml" ) ).hasContent( """ - --- - filePrefixPattern: "fpp" - type: "type" - clientHostname: "host" - headers: - - "h1" - - "h2" - types: - - - 11 - - - 11 - """ ); - } - - @Test - public void testLoadWithoutHeaders() throws IOException { - Files.writeString( testDirectoryFixture.testPath( "file.gz.metadata.yaml" ), """ - --- - filePrefixPattern: "fpp" - type: "type" - clientHostname: "host" - """ ); - - LogMetadata metadata = LogMetadata.readFor( testDirectoryFixture.testPath( "file.gz" ) ); - assertThat( metadata.headers ).isNull(); - assertThat( metadata.types ).isNull(); - } - - @Test - public void testSaveLoad() { - Path file = testDirectoryFixture.testPath( "file" ); - - LogMetadata metadata = new LogMetadata( "fpp", "type", "host", Map.of(), - new String[] { "h1", "h2" }, new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } } ); - metadata.writeFor( file ); - - DateTime dt = new DateTime( 2019, 11, 29, 10, 9, 0, 0, UTC ); - LogMetadata.addProperty( file, "time", dt.toString() ); - - LogMetadata newLm = LogMetadata.readFor( file ); - assertThat( newLm.getDateTime( "time" ) ).isEqualTo( dt ); - } - - @Test - public void testPathForDataFromMetadata() { - Path metadataFilePath = testDirectoryFixture.testPath( "file.gz.metadata.yaml" ); - - assertThat( LogMetadata.pathForDataFromMetadata( metadataFilePath ) ).isEqualTo( testDirectoryFixture.testPath( "file.gz" ) ); - } -} diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java deleted file mode 100644 index 4a4a6ad5f9..0000000000 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/MockFinisher.java +++ /dev/null @@ -1,30 +0,0 @@ -package oap.logstream.disk; - -import oap.logstream.Timestamp; -import oap.storage.cloud.FileSystem; -import oap.storage.cloud.FileSystemConfiguration; -import oap.util.Pair; -import org.joda.time.DateTime; - -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; - -import static oap.util.Pair.__; - -public class MockFinisher extends AbstractFinisher { - public final ArrayList> files = new ArrayList<>(); - - protected MockFinisher( FileSystemConfiguration fileSystemConfiguration, Path sourceDirectory, long safeInterval, List mask, Timestamp timestamp ) { - super( fileSystemConfiguration, sourceDirectory, safeInterval, mask, timestamp ); - } - - @Override - protected void cleanup() { - } - - @Override - protected void process( FileSystem fileSystem, Path path, DateTime bucketTime ) { - files.add( __( path, bucketTime ) ); - } -} diff --git a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java index 97313eef42..5221ce919d 100644 --- a/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java +++ b/oap-formats/oap-logstream/oap-logstream-test/src/test/java/oap/logstream/disk/RowBinaryWriterTest.java @@ -152,4 +152,42 @@ public void testConcurrency() throws IOException { rows ); } + + @Test + public void testWriteToNewVersionWhenCompleted() throws IOException { + Dates.setTimeFixed( 2022, 3, 8, 21, 11 ); + + String[] headers = new String[] { "COL1" }; + byte[][] types = new byte[][] { new byte[] { Types.STRING.id } }; + LogId logId = new LogId( "", "log", "log", Map.of( "p", "1" ), headers, types ); + Path logs = testDirectoryFixture.testPath( "logs" ); + + byte[] content1 = Compression.gzip( RowBinaryUtils.lines( List.of( List.of( "row1" ) ) ) ); + byte[] content2 = Compression.gzip( RowBinaryUtils.lines( List.of( List.of( "row2" ) ) ) ); + + Path v1 = logs.resolve( "1-file-02-47b82ddc0-1.rb.gz.rb.gz" ); + Path v2 = logs.resolve( "1-file-02-47b82ddc0-2.rb.gz.rb.gz" ); + + try( RowBinaryWriter writer = new RowBinaryWriter( templateEngineFixture.templateEngine, logs, FILE_PATTERN, logId, 1024, BPH_12, 20, "localhost" ) ) { + writer.write( CURRENT_PROTOCOL_VERSION, content1 ); + + writer.refresh(); + assertThat( v1 ).exists(); + assertThat( new LogFile( v1 ).isCompleted() ).isFalse(); + + // refresh(true) forces closeOutput() — closes v1 and marks it completed via readyForUpload() + writer.refresh( true ); + assertThat( v1 ).exists(); + assertThat( new LogFile( v1 ).isCompleted() ).isTrue(); + + // v1 is completed — next write must go to v2 + writer.write( CURRENT_PROTOCOL_VERSION, content2 ); + assertThat( v2 ).exists(); + assertThat( new LogFile( v2 ).isCompleted() ).isFalse(); + } + + byte[] rb = Compression.ungzip( Files.readAllBytes( v2 ) ); + Pair>, List> read = RowBinaryUtils.read( rb, 0, rb.length, null, null ); + assertThat( read._1 ).contains( List.of( "row2" ) ); + } } diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/CompletedLogLoggerException.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/CompletedLogLoggerException.java new file mode 100644 index 0000000000..2733c94309 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/CompletedLogLoggerException.java @@ -0,0 +1,39 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) Open Application Platform Authors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package oap.logstream; + +public class CompletedLogLoggerException extends LoggerException { + public CompletedLogLoggerException( String message ) { + super( message ); + } + + public CompletedLogLoggerException( Throwable cause ) { + super( cause ); + } + + public CompletedLogLoggerException( String message, Throwable cause ) { + super( message, cause ); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java index c445aadbd4..46e7a7f8e5 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/LogIdTemplate.java @@ -66,7 +66,7 @@ public String render( TemplateEngine templateEngine, String template, DateTime t context.putAll( variables ); - return templateEngine.getRuntimeTemplate( "LogIdTemplate", new TypeRef>() {}, template, TemplateAccumulators.STRING, null, null ).render( context ).get(); + return templateEngine.getTemplate( "LogIdTemplate_" + logId.logType, new TypeRef>() {}, template, TemplateAccumulators.STRING, null, null ).render( context ).get(); } public void init( Map context, DateTime time, Timestamp timestamp, int version, String hostname ) { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java deleted file mode 100644 index edf32c8b27..0000000000 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractFinisher.java +++ /dev/null @@ -1,173 +0,0 @@ -package oap.logstream.disk; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import oap.concurrent.Executors; -import oap.concurrent.ThreadPoolExecutor; -import oap.io.Files; -import oap.logstream.Timestamp; -import oap.storage.cloud.FileSystem; -import oap.storage.cloud.FileSystemConfiguration; -import oap.util.Dates; -import oap.util.Lists; -import org.joda.time.DateTime; -import org.joda.time.DateTimeUtils; -import org.joda.time.Duration; - -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -import static org.joda.time.DateTimeZone.UTC; - -@Slf4j -public abstract class AbstractFinisher implements Runnable { - public static final String CORRUPTED_DIRECTORY = ".corrupted"; - public final Path sourceDirectory; - public final long safeInterval; - public final List mask; - public final Path corruptedDirectory; - protected final FileSystemConfiguration fileSystemConfiguration; - private final Timestamp timestamp; - public int threads = -1; - public LinkedHashMap priorityByType = new LinkedHashMap<>(); - protected int bufferSize = 1024 * 256 * 4 * 4; - - - @SneakyThrows - protected AbstractFinisher( FileSystemConfiguration fileSystemConfiguration, Path sourceDirectory, long safeInterval, List mask, Timestamp timestamp ) { - this.fileSystemConfiguration = fileSystemConfiguration; - this.sourceDirectory = sourceDirectory; - this.safeInterval = safeInterval; - this.mask = mask; - this.corruptedDirectory = sourceDirectory.resolve( CORRUPTED_DIRECTORY ); - this.timestamp = timestamp; - } - - public void start() { - if( threads <= 0 ) { - threads = Runtime.getRuntime().availableProcessors(); - } - log.info( "threads = {}, sourceDirectory = {}, corruptedDirectory = {}, mask = {}, safeInterval = {}, bufferSize = {}", - threads, sourceDirectory, corruptedDirectory, mask, Dates.durationToString( safeInterval ), bufferSize ); - } - - @Override - public void run() { - run( false ); - } - - @SuppressWarnings( "checkstyle:ModifiedControlVariable" ) - @SneakyThrows - public void run( boolean forceSync ) { - DateTime start = new DateTime( UTC ); - log.debug( "force {} let's start packing of {} in {}", forceSync, mask, sourceDirectory ); - - log.debug( "current timestamp is {}", timestamp.toStartOfBucket( DateTime.now( UTC ) ) ); - DateTime bucketStartTime = timestamp.toStartOfBucket( start ); - Duration elapsed = new Duration( bucketStartTime, start ); - if( elapsed.getMillis() < safeInterval ) { - log.debug( "not safe to process yet ({}ms), some of the files could still be open, waiting...", elapsed ); - cleanup(); - log.debug( "packing is skipped" ); - return; - } - ThreadPoolExecutor pool = Executors.newFixedBlockingThreadPool( threads, new ThreadFactoryBuilder().setNameFormat( "finisher-%d" ).build() ); - - - List logs = Files.wildcard( sourceDirectory, mask ); - logs = Lists.filter( logs, path -> { - if( path.startsWith( corruptedDirectory ) ) return false; - if( LogMetadata.isMetadata( path ) ) return false; - - DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( Files.getLastModifiedTime( path ), UTC ) ); - if( !forceSync && !lastModifiedTime.isBefore( bucketStartTime ) ) { - log.debug( "skipping (current timestamp) {}", path ); - return false; - } - - return true; - } ); - - List logInfos = Lists.map( logs, path -> { - LogMetadata logMetadata = LogMetadata.readFor( path ); - long lastModifiedTime = Files.getLastModifiedTime( path ); - - return new LogInfo( path, lastModifiedTime, logMetadata.type, priorityByType.getOrDefault( logMetadata.type, 0 ) ); - } ); - - - logInfos.sort( ( li1, li2 ) -> { - Comparator comparator = Comparator - .comparingInt( logInfo -> logInfo.priority ).reversed() - .thenComparingLong( logInfo -> logInfo.lastModifiedTime ); - - return comparator.compare( li1, li2 ); - } ); - - - int priority = 0; - ArrayList> futures = new ArrayList<>(); - - if( !logInfos.isEmpty() ) { - log.info( "uploading..." ); - } - - try( FileSystem fileSystem = new FileSystem( fileSystemConfiguration ) ) { - for( int i = 0; i < logInfos.size(); i++ ) { - LogInfo logInfo = logInfos.get( i ); - if( priority == logInfo.priority ) { - DateTime lastModifiedTime = timestamp.toStartOfBucket( new DateTime( logInfo.lastModifiedTime, UTC ) ); - futures.add( CompletableFuture.runAsync( () -> process( fileSystem, logInfo.path, lastModifiedTime ), pool ) ); - } else { - CompletableFuture allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture[0] ) ); - allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES ); - futures.clear(); - - priority = logInfo.priority; - i--; - } - } - CompletableFuture allOf = CompletableFuture.allOf( futures.toArray( new CompletableFuture[0] ) ); - allOf.get( 60 / timestamp.bucketsPerHour, TimeUnit.MINUTES ); - } - - pool.shutdown(); - - long fullTimeout = DateTime.now().getMillis() + TimeUnit.MINUTES.toMillis( 20 ); - while( !pool.awaitTermination( 1, TimeUnit.MINUTES ) ) { - if( DateTime.now().getMillis() <= fullTimeout ) { - log.debug( "Timeout passed, but pool still is working... {} tasks left", pool.shutdownNow().size() ); - break; - } - log.debug( "Waiting for finishing..." ); - } - cleanup(); - if( !logInfos.isEmpty() ) { - log.info( "uploading... Done. (files {} duration {})", - logInfos.size(), Dates.durationToString( DateTimeUtils.currentTimeMillis() - start.getMillis() ) ); - } else { - log.debug( "packing is done" ); - } - } - - protected abstract void cleanup(); - - protected abstract void process( FileSystem fileSystem, Path path, DateTime bucketTime ); - - @ToString - @AllArgsConstructor - private static class LogInfo { - public final Path path; - public final long lastModifiedTime; - public final String type; - public final int priority; - } -} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java index 581cabc42d..65abfd06cd 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/AbstractWriter.java @@ -39,14 +39,12 @@ import org.joda.time.DateTime; import java.io.Closeable; -import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; @Slf4j -public abstract class AbstractWriter implements Closeable { +public abstract class AbstractWriter implements Closeable { public final LogFormat logFormat; protected final TemplateEngine templateEngine; protected final Path logDirectory; @@ -58,8 +56,7 @@ public abstract class AbstractWriter implements Closeable { protected final int maxVersions; protected final String hostname; protected final ReentrantLock lock = new ReentrantLock(); - protected T out; - protected Path outFilename; + protected LogFile logFile; protected String lastPattern; protected int fileVersion = 1; protected boolean closed = false; @@ -155,18 +152,17 @@ protected Path filename() { protected void closeOutput() throws LoggerException { lock.lock(); try { - if( out != null ) try { - stopwatch.count( out::close ); + if( logFile != null ) try { + stopwatch.count( logFile::close ); - long fileSize = Files.size( outFilename ); + long fileSize = logFile.getDataSize(); log.trace( "closing output {} ({} bytes)", this, fileSize ); Metrics.summary( "logstream_logging_server_bucket_size" ).record( fileSize ); Metrics.summary( "logstream_logging_server_bucket_time_seconds" ).record( Dates.nanosToSeconds( stopwatch.elapsed() ) ); - } catch( IOException e ) { - throw new LoggerException( e ); + + logFile.readyForUpload(); } finally { - outFilename = null; - out = null; + logFile = null; } } finally { lock.unlock(); diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java index d8af4f221d..ce50188250 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/DiskLoggerBackend.java @@ -83,8 +83,9 @@ public class DiskLoggerBackend extends AbstractLoggerBackend implements Cloneabl public final Path logDirectory; public final Timestamp timestamp; public final int bufferSize; - public final LoadingCache> writers; + public final LoadingCache writers; public final ScheduledExecutorService pool; + public final String hostname; protected final TemplateEngine templateEngine; public String filePattern = "{{ YEAR }}-{{ MONTH }}/{{ DAY }}/{{ LOG_TYPE }}_v{{ LOG_VERSION }}_{{ CLIENT_HOST }}-{{ YEAR }}-{{ MONTH }}-{{ DAY }}-{{ HOUR }}-{{ INTERVAL }}.tsv.gz"; public long requiredFreeSpace = DEFAULT_FREE_SPACE_REQUIRED; @@ -92,7 +93,6 @@ public class DiskLoggerBackend extends AbstractLoggerBackend implements Cloneabl public long refreshInitDelay = Dates.s( 10 ); public long refreshPeriod = Dates.s( 10 ); public volatile boolean closed; - public final String hostname; public DiskLoggerBackend( TemplateEngine templateEngine, Path logDirectory, Timestamp timestamp, int bufferSize, String hostname ) { this( templateEngine, logDirectory, new WriterConfiguration(), timestamp, bufferSize, hostname ); @@ -121,7 +121,7 @@ public DiskLoggerBackend( TemplateEngine templateEngine, Path logDirectory, Writ } ) .build( new CacheLoader<>() { @Override - public AbstractWriter load( LogId id ) { + public AbstractWriter load( LogId id ) { FilePatternConfiguration fp = filePatternByType.getOrDefault( id.logType.toUpperCase(), new FilePatternConfiguration( filePattern ) ); log.trace( "new writer id '{}' filePattern '{}'", id, fp ); @@ -172,7 +172,7 @@ public String log( ProtocolVersion protocolVersion, String hostName, String file Metrics.counter( "logstream_logging_disk_counter", List.of( Tag.of( "from", hostName ) ) ).increment(); Metrics.summary( "logstream_logging_disk_buffers", List.of( Tag.of( "from", hostName ) ) ).record( length ); - AbstractWriter writer = writers.get( new LogId( filePreffix, logType, hostName, properties, headers, types ) ); + AbstractWriter writer = writers.get( new LogId( filePreffix, logType, hostName, properties, headers, types ) ); log.trace( "logging {} bytes to {}", length, writer ); try { @@ -219,7 +219,7 @@ public void refresh() { public void refresh( boolean forceSync ) { log.trace( "refresh forceSync {}", forceSync ); - for( AbstractWriter writer : writers.asMap().values() ) { + for( AbstractWriter writer : writers.asMap().values() ) { try { writer.refresh( forceSync ); } catch( Exception e ) { diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFile.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFile.java new file mode 100644 index 0000000000..919aeebbc0 --- /dev/null +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogFile.java @@ -0,0 +1,317 @@ +package oap.logstream.disk; + +import lombok.EqualsAndHashCode; +import lombok.ToString; +import oap.json.Binder; +import oap.logstream.LogId; +import oap.logstream.LoggerException; +import org.apache.commons.io.FilenameUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; + +import static java.nio.charset.StandardCharsets.UTF_8; + +@ToString( exclude = "out" ) +@EqualsAndHashCode( of = "outFilename" ) +public class LogFile implements AutoCloseable { + public static final String EXTENSION_LOG_METADATA = ".metadata.yaml"; + public static final String EXTENSION_LOG_TRANSACTION = ".metadata.transaction"; + public static final String EXTENSION_LOG_COMPLETED = ".metadata.completed"; + + public final Path outFilename; + @Nullable + protected FileChannel out; + + public LogFile( Path outFilename ) { + this( outFilename, null ); + } + + protected LogFile( Path outFilename, @Nullable FileChannel out ) throws LoggerException { + this.outFilename = outFilename; + this.out = out; + } + + public static LogFile loadFromPath( Path path ) { + String s = path.toString(); + if( s.endsWith( EXTENSION_LOG_METADATA ) ) { + return new LogFile( getMainFilePath( path, EXTENSION_LOG_METADATA ) ); + } else if( s.endsWith( EXTENSION_LOG_TRANSACTION ) ) { + return new LogFile( getMainFilePath( path, EXTENSION_LOG_TRANSACTION ) ); + } else if( s.endsWith( EXTENSION_LOG_COMPLETED ) ) { + return new LogFile( getMainFilePath( path, EXTENSION_LOG_COMPLETED ) ); + } + return new LogFile( path ); + } + + private static Path getMainFilePath( Path path, String ext ) { + return Paths.get( path.toString().substring( 0, path.toString().length() - ext.length() ) ); + } + + public Path pathFor( String extension ) { + return Path.of( outFilename.toString() + extension ); + } + + public LogFile create( LogId logId ) { + try { + oap.io.Files.ensureDirectory( outFilename.getParent() ); + out = FileChannel.open( outFilename, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.APPEND ); + + syncLogMetadata( new LogMetadata( logId ) ); + + return this; + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public long beginTransaction() throws LoggerException { + try { + Path path = getTransactionFile(); + + long dataSize; + if( Files.exists( path ) ) { + dataSize = Long.parseLong( Files.readString( path, UTF_8 ) ); + } else { + dataSize = 0; + } + + return dataSize; + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public long getTransactionPosition() throws LoggerException { + return beginTransaction(); + } + + public void commitTransaction( int length ) throws LoggerException { + try { + Path path = getTransactionFile(); + Path tmpPath = pathFor( EXTENSION_LOG_TRANSACTION + ".tmp" ); + + long dataSize; + if( Files.exists( path ) ) { + dataSize = Long.parseLong( Files.readString( path, UTF_8 ) ); + } else { + dataSize = 0; + } + + + dataSize += length; + + Files.writeString( tmpPath, String.valueOf( dataSize ), UTF_8, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE ); + Files.move( tmpPath, path, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public void readyForUpload() throws LoggerException { + try { + Files.createFile( pathFor( EXTENSION_LOG_COMPLETED ) ); + } catch( FileAlreadyExistsException ignored ) { + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public long getDataSize() throws LoggerException { + try { + return Files.size( outFilename ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + @Override + public void close() { + try { + if( out != null ) { + out.close(); + } + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public void writeAndCommitTransaction( byte[] bytes, int offset, int length ) throws LoggerException { + try { + ByteBuffer byteBuffer = ByteBuffer.wrap( bytes, offset, length ); + do { + out.write( byteBuffer ); + } while( byteBuffer.hasRemaining() ); + out.force( true ); + + commitTransaction( length ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public void beginTransactionWriteAndCommitTransaction( byte[] buffer, int offset, int length ) throws LoggerException { + try { + long position = beginTransaction(); + + ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length ); + do { + out.position( position ); + out.write( byteBuffer ); + } while( byteBuffer.hasRemaining() ); + out.force( true ); + + commitTransaction( length ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public LogMetadata getLogMetadata() { + return Binder.yaml.unmarshal( LogMetadata.class, getMetadataFile() ); + } + + public void addProperty( String name, String value ) { + LogMetadata metadata = getLogMetadata(); + metadata.setProperty( name, value ); + syncLogMetadata( metadata ); + } + + private void syncLogMetadata( LogMetadata logMetadata ) { + try { + Path path = getMetadataFile(); + Path tmpFile = Path.of( path + ".tmp" ); + Binder.yaml.marshal( tmpFile, logMetadata ); + + Files.move( tmpFile, path, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE ); + } catch( IOException e ) { + throw new UncheckedIOException( e ); + } + } + + public boolean existsAndValid() { + boolean mainFile = Files.exists( outFilename ); + boolean transactionFile = Files.exists( getTransactionFile() ); + boolean metadataFile = Files.exists( getMetadataFile() ); + + if( mainFile ) { + return transactionFile && metadataFile; + } + + return false; + } + + public boolean isCompleted() { + return Files.exists( getCompletedFile() ); + } + + public boolean isValid() { + boolean mainFile = Files.exists( outFilename ); + boolean transactionFile = Files.exists( getTransactionFile() ); + boolean metadataFile = Files.exists( getMetadataFile() ); + + return mainFile && transactionFile && metadataFile; + } + + public long getMaxModificationTime() throws LoggerException { + try { + Path transactionFile = getTransactionFile(); + Path metadataFile = getMetadataFile(); + Path completedFile = getCompletedFile(); + + boolean mainFile = Files.exists( outFilename ); + boolean transactionFileExists = Files.exists( transactionFile ); + boolean metadataFileExists = Files.exists( metadataFile ); + boolean completedFileExists = Files.exists( completedFile ); + + long time = -1; + + if( mainFile ) { + time = Long.max( time, Files.getLastModifiedTime( outFilename ).toMillis() ); + } + if( transactionFileExists ) { + time = Long.max( time, Files.getLastModifiedTime( transactionFile ).toMillis() ); + } + if( metadataFileExists ) { + time = Long.max( time, Files.getLastModifiedTime( metadataFile ).toMillis() ); + } + if( completedFileExists ) { + time = Long.max( time, Files.getLastModifiedTime( completedFile ).toMillis() ); + } + + return time; + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public Path getCompletedFile() { + return pathFor( EXTENSION_LOG_COMPLETED ); + } + + public Path getMetadataFile() { + return pathFor( EXTENSION_LOG_METADATA ); + } + + public Path getTransactionFile() { + return pathFor( EXTENSION_LOG_TRANSACTION ); + } + + public void delete() throws LoggerException { + try { + Files.deleteIfExists( outFilename ); + Files.deleteIfExists( getMetadataFile() ); + Files.deleteIfExists( getTransactionFile() ); + Files.deleteIfExists( getCompletedFile() ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + public void moveTo( Path destinationDirectory, Path baseDirectory ) throws LoggerException { + Path transactionFile = getTransactionFile(); + Path metadataFile = getMetadataFile(); + Path completedFile = getCompletedFile(); + + boolean mainFileExists = Files.exists( outFilename ); + boolean transactionFileExists = Files.exists( transactionFile ); + boolean metadataFileExists = Files.exists( metadataFile ); + boolean completedFileExists = Files.exists( completedFile ); + + if( mainFileExists ) { + moveFile( outFilename, baseDirectory, destinationDirectory ); + } + if( transactionFileExists ) { + moveFile( transactionFile, baseDirectory, destinationDirectory ); + } + if( metadataFileExists ) { + moveFile( metadataFile, baseDirectory, destinationDirectory ); + } + if( completedFileExists ) { + moveFile( completedFile, baseDirectory, destinationDirectory ); + } + } + + private void moveFile( Path file, Path baseDirectory, Path destinationDirectory ) throws LoggerException { + try { + Path targetPath = getTargetPath( baseDirectory, file, destinationDirectory ); + Files.createDirectories( targetPath.getParent() ); + Files.move( file, targetPath, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE ); + } catch( IOException e ) { + throw new LoggerException( e ); + } + } + + private Path getTargetPath( Path baseDirectory, Path file, Path destinationDirectory ) { + return destinationDirectory.resolve( FilenameUtils.separatorsToUnix( file.toString().substring( baseDirectory.toString().length() + 1 ) ) ); + } +} diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java index b651ee0d98..b9b861cfba 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/LogMetadata.java @@ -5,21 +5,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonGetter; import com.fasterxml.jackson.annotation.JsonIgnore; -import com.google.common.base.Preconditions; import lombok.EqualsAndHashCode; import lombok.ToString; -import oap.io.Files; -import oap.json.Binder; import oap.logstream.LogId; import oap.util.Maps; import org.joda.time.DateTime; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -31,9 +22,6 @@ @ToString @EqualsAndHashCode( exclude = "clientHostname" ) public class LogMetadata { - public static final String EXTENSION_LOG_METADATA = ".metadata.yaml"; - public static final String EXTENSION_LOG_TRANSACTION = ".metadata.transaction"; - public final String type; public final String clientHostname; @JsonIgnore @@ -63,106 +51,6 @@ public LogMetadata( LogId logId ) { logId.clientHostname, logId.properties, logId.headers, logId.types ); } - public static LogMetadata readFor( Path file ) { - return Binder.yaml.unmarshal( LogMetadata.class, pathFor( file ) ); - } - - public static Path pathFor( Path file, String extension ) { - return Path.of( file.toString() + extension ); - } - - public static Path pathFor( Path file ) { - return pathFor( file, EXTENSION_LOG_METADATA ); - } - - public static Path pathFor( String file, String extension ) { - return Path.of( file + extension ); - } - - public static Path pathFor( String file ) { - return pathFor( file, EXTENSION_LOG_METADATA ); - } - - public static Path pathForDataFromMetadata( Path metadataPath ) { - Preconditions.checkArgument( isMetadata( metadataPath ) ); - - String metadataPathString = metadataPath.toString(); - return Paths.get( metadataPathString.substring( 0, metadataPathString.indexOf( EXTENSION_LOG_METADATA ) ) ); - } - - public static Path pathForTransactionFromMetadata( Path metadataPath ) { - return Paths.get( pathForDataFromMetadata( metadataPath ) + EXTENSION_LOG_TRANSACTION ); - } - - public static Path pathForDataFromTransaction( Path transactionPath ) { - - String transactionPathString = transactionPath.toString(); - return Paths.get( transactionPathString.substring( 0, transactionPathString.indexOf( EXTENSION_LOG_TRANSACTION ) ) ); - } - - public static Path pathForMetadataFromTransaction( Path transactionPath ) { - return Paths.get( pathForDataFromTransaction( transactionPath ) + EXTENSION_LOG_METADATA ); - } - - public static boolean isMetadata( Path filename ) { - return filename.toString().endsWith( EXTENSION_LOG_METADATA ); - } - - public static boolean isTransaction( Path filename ) { - return filename.toString().endsWith( EXTENSION_LOG_TRANSACTION ); - } - - public static void rename( Path filename, Path newFile ) { - Path from = pathFor( filename ); - if( Files.exists( from ) ) - Files.rename( from, pathFor( newFile ) ); - } - - public static void addProperty( Path path, String name, String value ) { - LogMetadata metadata = LogMetadata.readFor( path ); - metadata.setProperty( name, value ); - metadata.writeFor( path ); - } - - public static long beginTransaction( Path file ) throws IOException { - Path path = pathFor( file, EXTENSION_LOG_TRANSACTION ); - - long dataSize; - if( java.nio.file.Files.exists( path ) ) { - dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) ); - } else { - dataSize = 0; - } - - return dataSize; - } - - public static void commitTransaction( Path file, int length ) throws IOException { - Path path = pathFor( file, EXTENSION_LOG_TRANSACTION ); - Path tmpPath = pathFor( file, EXTENSION_LOG_TRANSACTION + ".tmp" ); - - long dataSize; - if( java.nio.file.Files.exists( path ) ) { - dataSize = Long.parseLong( java.nio.file.Files.readString( path, StandardCharsets.UTF_8 ) ); - } else { - dataSize = 0; - } - - - dataSize += length; - - java.nio.file.Files.writeString( tmpPath, String.valueOf( dataSize ), StandardCharsets.UTF_8, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.CREATE ); - java.nio.file.Files.move( tmpPath, path, StandardCopyOption.REPLACE_EXISTING ); - } - - public static void deleteTransaction( Path file ) throws IOException { - java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_TRANSACTION ) ); - } - - public static void deleteLogMetadata( Path file ) throws IOException { - java.nio.file.Files.delete( pathFor( file, EXTENSION_LOG_METADATA ) ); - } - @JsonAnyGetter public Map getProperties() { return properties; @@ -173,14 +61,6 @@ public void setProperty( String name, String value ) { properties.put( name, value ); } - public void writeFor( Path file ) { - Path path = pathFor( file ); - Path tmpFile = Path.of( path + ".tmp" ); - Binder.yaml.marshal( tmpFile, this ); - - Files.rename( tmpFile, path ); - } - public DateTime getDateTime( String name ) { return Maps.get( properties, name ) .map( v -> new DateTime( v, UTC ) ) diff --git a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java index 71bc85226b..9aeda25d3c 100644 --- a/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java +++ b/oap-formats/oap-logstream/oap-logstream/src/main/java/oap/logstream/disk/RowBinaryWriter.java @@ -1,7 +1,6 @@ package oap.logstream.disk; import lombok.extern.slf4j.Slf4j; -import oap.io.Files; import oap.logstream.LogId; import oap.logstream.LogIdTemplate; import oap.logstream.LogStreamProtocol; @@ -13,15 +12,12 @@ import oap.util.FastByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; import java.nio.file.Path; -import java.nio.file.StandardOpenOption; import java.util.List; import java.util.zip.GZIPOutputStream; @Slf4j -public class RowBinaryWriter extends AbstractWriter { +public class RowBinaryWriter extends AbstractWriter { public RowBinaryWriter( TemplateEngine templateEngine, Path logDirectory, String filePattern, LogId logId, int bufferSize, Timestamp timestamp, int maxVersions, String hostname ) { super( templateEngine, LogFormat.ROW_BINARY_GZ, logDirectory, filePattern, logId, bufferSize, timestamp, maxVersions, hostname ); } @@ -32,47 +28,34 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b try { refresh(); Path filename = filename(); - if( out == null ) - if( !java.nio.file.Files.exists( filename ) ) { + if( logFile == null ) { + LogFile checkLogFile = new LogFile( filename ); + if( !checkLogFile.existsAndValid() && !checkLogFile.isCompleted() ) { log.debug( "[{}] open new file v{}", filename, fileVersion ); - outFilename = filename; - Files.ensureDirectory( filename.getParent() ); - out = FileChannel.open( filename, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE, StandardOpenOption.APPEND ); + logFile = checkLogFile.create( logId ); + LogIdTemplate logIdTemplate = new LogIdTemplate( logId ); - new LogMetadata( logId ).withProperty( "VERSION", logIdTemplate.getHashWithVersion( fileVersion, Inet.hostname() ) ).writeFor( filename ); + logFile.addProperty( "VERSION", logIdTemplate.getHashWithVersion( fileVersion, Inet.hostname() ) ); + + log.trace( "[{}] write headers {}", filename, logId.headers ); FastByteArrayOutputStream outputStream = new FastByteArrayOutputStream(); GZIPOutputStream gzip = new GZIPOutputStream( outputStream ); RowBinaryOutputStream rbOut = new RowBinaryOutputStream( gzip, List.of( logId.headers ), logId.types ); rbOut.close(); - ByteBuffer byteBuffer = ByteBuffer.wrap( outputStream.array, 0, outputStream.length ); - do { - out.write( byteBuffer ); - } while( byteBuffer.hasRemaining() ); - out.force( true ); - - LogMetadata.commitTransaction( filename, outputStream.length ); - - log.trace( "[{}] write headers {}", filename, logId.headers ); + logFile.writeAndCommitTransaction( outputStream.array, 0, outputStream.length ); } else { log.debug( "[{}] file exists v{}", filename, fileVersion ); fileVersion += 1; if( fileVersion > maxVersions ) throw new IllegalStateException( "version > " + maxVersions ); return write( protocolVersion, buffer, offset, length ); } - log.trace( "writing {} bytes to {}", length, this ); - - long position = LogMetadata.beginTransaction( filename ); + } - ByteBuffer byteBuffer = ByteBuffer.wrap( buffer, offset, length ); - do { - out.position( position ); - out.write( byteBuffer ); - } while( byteBuffer.hasRemaining() ); - out.force( true ); + log.trace( "writing {} bytes to {}", length, this ); - LogMetadata.commitTransaction( filename, length ); + logFile.beginTransactionWriteAndCommitTransaction( buffer, offset, length ); return filename.toString(); @@ -81,8 +64,7 @@ public String write( LogStreamProtocol.ProtocolVersion protocolVersion, byte[] b try { closeOutput(); } finally { - outFilename = null; - out = null; + logFile = null; } throw new LoggerException( e ); } finally { diff --git a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java index 0550a9cb96..b9b0bf2e9e 100644 --- a/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java +++ b/oap-message/oap-message-client/src/main/java/oap/message/client/MessageSender.java @@ -347,12 +347,12 @@ private Messages.MessageInfo onOkRespone( Messages.MessageInfo messageInfo, Inpu case MessageProtocol.STATUS_UNKNOWN_ERROR_NO_RETRY -> { Metrics.counter( "oap.messages", "type", MessageProtocol.messageTypeToString( message.messageType ), "status", "error_no_retry" ).increment(); log.error( "[{}] unknown error -> no retry", uniqueName ); - lastStatus.put( message.messageType, Pair.__( MessageStatus.ERROR, status ) ); + lastStatus.put( message.messageType, Pair.__( MessageStatus.ERROR_NO_RETRY, status ) ); } case MessageProtocol.STATUS_UNKNOWN_MESSAGE_TYPE -> { Metrics.counter( "oap.messages", "type", MessageProtocol.messageTypeToString( message.messageType ), "status", "unknown_message_type" ).increment(); log.error( "[{}] unknown message type: {}", uniqueName, status ); - lastStatus.put( message.messageType, Pair.__( MessageStatus.ERROR, status ) ); + lastStatus.put( message.messageType, Pair.__( MessageStatus.ERROR_NO_RETRY, status ) ); } default -> { String clientStatus = MessageProtocol.getStatus( status ); diff --git a/oap-message/oap-message-common/src/main/java/oap/message/MessageProtocol.java b/oap-message/oap-message-common/src/main/java/oap/message/MessageProtocol.java index 5332b4daa8..777867816a 100644 --- a/oap-message/oap-message-common/src/main/java/oap/message/MessageProtocol.java +++ b/oap-message/oap-message-common/src/main/java/oap/message/MessageProtocol.java @@ -31,6 +31,7 @@ import java.io.Serial; import java.io.Serializable; import java.util.HashMap; +import java.util.Properties; public final class MessageProtocol { public static final short PROTOCOL_VERSION_1 = 1; @@ -50,9 +51,9 @@ public final class MessageProtocol { private static final HashMap typeMap = new HashMap<>(); static { - var properties = Resources.readAllProperties( "META-INF/oap-messages.properties" ); - for( var propertyName : properties.stringPropertyNames() ) { - var key = propertyName.trim(); + Properties properties = Resources.readAllProperties( "META-INF/oap-messages.properties" ); + for( String propertyName : properties.stringPropertyNames() ) { + String key = propertyName.trim(); if( key.startsWith( "type." ) ) { key = key.substring( 5 ); @@ -69,18 +70,19 @@ public final class MessageProtocol { public static String messageStatusToString( short status ) { return switch( status ) { case STATUS_OK -> "OK"; - case STATUS_UNKNOWN_ERROR, STATUS_UNKNOWN_ERROR_NO_RETRY -> "UNKNOWN_ERROR"; + case STATUS_UNKNOWN_ERROR -> "UNKNOWN_ERROR"; + case STATUS_UNKNOWN_ERROR_NO_RETRY -> "UNKNOWN_ERROR_NO_RETRY"; case STATUS_ALREADY_WRITTEN -> "ALREADY_WRITTEN"; case STATUS_UNKNOWN_MESSAGE_TYPE -> "UNKNOWN_MESSAGE_TYPE"; default -> { - var str = statusMap.get( status ); + String str = statusMap.get( status ); yield str != null ? str : "Unknown status: " + status; } }; } public static String messageTypeToString( byte messageType ) { - var str = typeMap.get( messageType ); + String str = typeMap.get( messageType ); return str != null ? str : String.valueOf( messageType ); } diff --git a/oap-stdlib/src/main/java/oap/io/Files.java b/oap-stdlib/src/main/java/oap/io/Files.java index 19b5092a7c..42422940c8 100644 --- a/oap-stdlib/src/main/java/oap/io/Files.java +++ b/oap-stdlib/src/main/java/oap/io/Files.java @@ -586,14 +586,14 @@ public static long usableSpaceAtDirectory( Path path ) { @SuppressWarnings( "checkstyle:UnnecessaryParentheses" ) public static boolean wildcardMatch( String filename, String wildcardMatcher ) { - var wmPosition = 0; - var fnPosition = 0; + int wmPosition = 0; + int fnPosition = 0; - var mp = 0; - var cp = 0; + int mp = 0; + int cp = 0; - var fnLength = filename.length(); - var wmLength = wildcardMatcher.length(); + int fnLength = filename.length(); + int wmLength = wildcardMatcher.length(); char wm = 0; diff --git a/pom.xml b/pom.xml index 19d239b7dc..aad5ad2e3f 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ - 25.6.12 + 25.6.13 25.0.1 25.0.0