Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 196 additions & 3 deletions crates/iceberg/src/avro/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,83 @@ use crate::{Error, ErrorKind, Result, ensure_data_valid};

const ELEMENT_ID: &str = "element-id";
const FIELD_ID_PROP: &str = "field-id";
// Custom attribute on an Avro record field that carries the original Iceberg
// field name whenever that name had to be sanitized into a valid Avro name.
const ICEBERG_FIELD_NAME_PROP: &str = "iceberg-field-name";
const KEY_ID: &str = "key-id";
const VALUE_ID: &str = "value-id";
const MAP_LOGICAL_TYPE: &str = "map";
// This constant might be better maintained in avro-rs.
const LOGICAL_TYPE: &str = "logicalType";

fn is_valid_avro_name(name: &str) -> bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could use something from https://github.com/apache/avro-rs/blob/4edb1ce1ae1ab5bd3fafb08ca3f622946c01c9fd/avro/src/validator.rs#L4 but it looks like validate_record_field_name is pub(crate). Is it worth filing an issue upstream to see if we could expose something that would allow us to validate against their implementation?

let mut chars = name.chars();
match chars.next() {
None => false,
Some(first) => {
(first.is_ascii_alphabetic() || first == '_')
&& chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}
}
}

/// Turns an Iceberg field name into a name that is valid in Avro.
///
/// The input is walked one UTF-16 code unit at a time, which is the unit
/// Java's `String.charAt()` exposes; the function is intended to mirror Java
/// `AvroSchemaUtil.sanitize()`. ASCII letters, ASCII digits and `_` are copied
/// through unchanged. Any other code unit becomes `_x<HEX>`, where HEX is the
/// code unit in uppercase hexadecimal without leading zeros.
///
/// The leading code unit has two extra rules:
/// - an ASCII digit is kept but gets a `_` prefix (e.g., `1foo` -> `_1foo`)
/// - anything that is neither an ASCII letter nor `_` is escaped
///   (e.g., `.foo` -> `_x2Efoo`)
///
/// Characters above U+FFFF occupy two code units (a surrogate pair) and each
/// half is escaped on its own (e.g., U+1F600 -> `_xD83D_xDE00`).
///
/// An empty input produces an empty string.
///
/// NOTE(review): classification here is ASCII-only, while Java's `Character`
/// predicates are Unicode-aware, so output can differ from Java for non-ASCII
/// letters or digits — confirm whether identical names are required.
fn sanitize_avro_name(name: &str) -> String {
    let mut sanitized = String::with_capacity(name.len() + 16);

    for (index, code_unit) in name.encode_utf16().enumerate() {
        let is_leading = index == 0;
        let is_underscore = code_unit == u16::from(b'_');

        // Digits are only allowed verbatim after the first position.
        let copy_verbatim = if is_leading {
            is_ascii_alpha_u16(code_unit) || is_underscore
        } else {
            is_ascii_alphanum_u16(code_unit) || is_underscore
        };

        if copy_verbatim {
            sanitized.push(char::from(code_unit as u8));
        } else if is_leading && is_ascii_digit_u16(code_unit) {
            // A leading digit is preserved behind an underscore instead of
            // being hex-escaped.
            sanitized.push('_');
            sanitized.push(char::from(code_unit as u8));
        } else {
            sanitized.push_str(&format!("_x{:X}", code_unit));
        }
    }

    sanitized
}

/// Returns `true` when the UTF-16 code unit `c` is an ASCII letter
/// (`A`-`Z` or `a`-`z`).
#[inline]
fn is_ascii_alpha_u16(c: u16) -> bool {
    // Surrogate code units are not valid `char`s and come back as `None`.
    matches!(char::from_u32(u32::from(c)), Some(ch) if ch.is_ascii_alphabetic())
}

/// Returns `true` when the UTF-16 code unit `c` is an ASCII digit (`0`-`9`).
///
/// Digits from other scripts (for example Arabic-Indic or fullwidth digits)
/// are not accepted.
#[inline]
fn is_ascii_digit_u16(c: u16) -> bool {
    // Surrogate code units are not valid `char`s and come back as `None`.
    matches!(char::from_u32(u32::from(c)), Some(ch) if ch.is_ascii_digit())
}
Comment on lines +105 to +107
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    /**
     * Determines if the specified character is a digit.
     * <p>
     * A character is a digit if its general category type, provided
     * by {@code Character.getType(ch)}, is
     * {@code DECIMAL_DIGIT_NUMBER}.
     * <p>
     * Some Unicode character ranges that contain digits:
     * <ul>
     * <li>{@code '\u005Cu0030'} through {@code '\u005Cu0039'},
     *     ISO-LATIN-1 digits ({@code '0'} through {@code '9'})
     * <li>{@code '\u005Cu0660'} through {@code '\u005Cu0669'},
     *     Arabic-Indic digits
     * <li>{@code '\u005Cu06F0'} through {@code '\u005Cu06F9'},
     *     Extended Arabic-Indic digits
     * <li>{@code '\u005Cu0966'} through {@code '\u005Cu096F'},
     *     Devanagari digits
     * <li>{@code '\u005CuFF10'} through {@code '\u005CuFF19'},
     *     Fullwidth digits
     * </ul>
     *
     * Many other character ranges contain digits as well.
     *
     * <p><b>Note:</b> This method cannot handle <a
     * href="#supplementary"> supplementary characters</a>. To support
     * all Unicode characters, including supplementary characters, use
     * the {@link #isDigit(int)} method.
     *
     * @param   ch   the character to be tested.
     * @return  {@code true} if the character is a digit;
     *          {@code false} otherwise.
     * @see     Character#digit(char, int)
     * @see     Character#forDigit(int, int)
     * @see     Character#getType(char)
     */
    public static boolean isDigit(char ch) {
        return isDigit((int)ch);
    }

https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java#L551
Java's isDigit() actually covers more than just ascii digits. This is a pretty niche edge-case.

Trying to reason about this in my head and I don't think it matters for interop that the representations are identical because both will correctly underscore the field and restore it from the map? Can you check my reasoning here?


/// Returns `true` when the UTF-16 code unit `c` is an ASCII letter or an
/// ASCII digit.
#[inline]
fn is_ascii_alphanum_u16(c: u16) -> bool {
    // Both predicates are pure, so the order in which they run is irrelevant.
    is_ascii_digit_u16(c) || is_ascii_alpha_u16(c)
}

/// State for the `SchemaVisitor` implementation that builds Avro schema
/// pieces (such as Avro record fields) from an Iceberg schema.
struct SchemaToAvroSchema {
    // NOTE(review): presumably the name given to the resulting top-level Avro
    // record — confirm against the code that constructs this struct.
    schema: String,
}
Expand Down Expand Up @@ -86,8 +157,14 @@ impl SchemaVisitor for SchemaToAvroSchema {
None
};

let (avro_name, original_name) = if is_valid_avro_name(&field.name) {
(field.name.clone(), None)
} else {
(sanitize_avro_name(&field.name), Some(field.name.clone()))
};

let mut avro_record_field = AvroRecordField {
name: field.name.clone(),
name: avro_name,
schema: field_schema,
order: RecordFieldOrder::Ignore,
position: 0,
Expand All @@ -102,6 +179,12 @@ impl SchemaVisitor for SchemaToAvroSchema {
Value::Number(Number::from(field.id)),
);

if let Some(name) = original_name {
avro_record_field
.custom_attributes
.insert(ICEBERG_FIELD_NAME_PROP.to_string(), Value::String(name));
}

Ok(Either::Right(avro_record_field))
}

Expand Down Expand Up @@ -442,8 +525,15 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {

let optional = is_avro_optional(&avro_field.schema);

let mut field =
NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
// Prefer the original Iceberg field name stored during sanitization,
// falling back to the Avro field name if the property is absent.
let field_name = avro_field
.custom_attributes
.get(ICEBERG_FIELD_NAME_PROP)
.and_then(|v| v.as_str())
.unwrap_or(&avro_field.name);

let mut field = NestedField::new(field_id, field_name, field_type.unwrap(), !optional);

if let Some(doc) = &avro_field.doc {
field = field.with_doc(doc);
Expand Down Expand Up @@ -1212,4 +1302,107 @@ mod tests {
converter.primitive(&avro_schema).unwrap().unwrap()
);
}

/// `is_valid_avro_name` accepts names made of ASCII letters, digits and
/// underscores that do not start with a digit, and rejects everything else.
#[test]
fn test_is_valid_avro_name() {
    let accepted = ["hello", "_private", "field_123"];
    // Leading digit, dot, space, and the empty string.
    let rejected = ["123field", "field.name", "has space", ""];

    for name in accepted {
        assert!(is_valid_avro_name(name), "{:?} should be accepted", name);
    }
    for name in rejected {
        assert!(!is_valid_avro_name(name), "{:?} should be rejected", name);
    }
}

/// ASCII inputs: valid names pass through, a leading digit gets a `_` prefix,
/// and every other invalid character becomes `_x<HEX>`.
#[test]
fn test_sanitize_avro_name() {
    // (input, expected sanitized name)
    let cases = [
        ("valid_name", "valid_name"),
        ("123field", "_123field"),
        ("field.name", "field_x2Ename"),
        ("has space", "has_x20space"),
        (".dotfirst", "_x2Edotfirst"),
        ("a-b", "a_x2Db"),
    ];

    for (input, expected) in cases {
        assert_eq!(sanitize_avro_name(input), expected);
    }
}

/// Non-ASCII inputs are escaped per UTF-16 code unit, so supplementary
/// characters produce two escapes (one per surrogate half).
#[test]
fn test_sanitize_avro_name_unicode() {
    // (input, expected sanitized name)
    let cases = [
        // Non-ASCII BMP character: U+00E9 is the single code unit 0xE9.
        ("a\u{00E9}", "a_xE9"),
        // CJK character: U+4E2D is the single code unit 0x4E2D.
        ("\u{4E2D}", "_x4E2D"),
        // Supplementary character U+1F600 is the surrogate pair D83D, DE00.
        ("a\u{1F600}b", "a_xD83D_xDE00b"),
        // Supplementary character in the leading position.
        ("\u{1F600}", "_xD83D_xDE00"),
    ];

    for (input, expected) in cases {
        assert_eq!(sanitize_avro_name(input), expected);
    }
}

/// Sanitizing the empty string yields the empty string.
#[test]
fn test_sanitize_avro_name_empty() {
    let sanitized = sanitize_avro_name("");
    assert!(sanitized.is_empty());
}

/// Iceberg -> Avro -> Iceberg: invalid names are sanitized on the Avro side,
/// the original name is recorded in `iceberg-field-name`, and converting back
/// restores the original schema.
#[test]
fn test_sanitization_round_trip() {
    let iceberg_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "123column", PrimitiveType::String.into()).into(),
            NestedField::required(2, "normal_field", PrimitiveType::Int.into()).into(),
            NestedField::optional(3, "field.with.dots", PrimitiveType::Long.into()).into(),
        ])
        .build()
        .unwrap();

    let avro_schema = schema_to_avro_schema("test_schema", &iceberg_schema).unwrap();

    let AvroSchema::Record(record) = &avro_schema else {
        panic!("Expected record schema");
    };

    // Per field, in order: (Avro field name, value of `iceberg-field-name`).
    // Names that were already valid carry no such attribute.
    let expectations = [
        ("_123column", Some(Value::String("123column".to_string()))),
        ("normal_field", None),
        (
            "field_x2Ewith_x2Edots",
            Some(Value::String("field.with.dots".to_string())),
        ),
    ];
    for (position, (avro_name, original_name)) in expectations.iter().enumerate() {
        let avro_field = &record.fields[position];
        assert_eq!(avro_field.name, *avro_name);
        assert_eq!(
            avro_field.custom_attributes.get("iceberg-field-name"),
            original_name.as_ref()
        );
    }

    let converted_back = avro_schema_to_schema(&avro_schema).unwrap();
    assert_eq!(iceberg_schema, converted_back);
}

/// When an Avro field carries `iceberg-field-name`, the Iceberg field takes
/// that name; otherwise it keeps the Avro field name.
#[test]
fn test_avro_to_iceberg_uses_iceberg_field_name_property() {
    // Stand-in for an Avro schema written by Java with a sanitized name.
    let schema_json = r#"{
"type": "record",
"name": "test_schema",
"fields": [
{
"name": "_123column",
"type": "string",
"field-id": 1,
"iceberg-field-name": "123column"
},
{
"name": "normal_field",
"type": "int",
"field-id": 2
}
]
}"#;
    let parsed = AvroSchema::parse_str(schema_json).unwrap();
    let converted = avro_schema_to_schema(&parsed).unwrap();

    let fields = converted.as_struct().fields();
    for (position, expected_name) in ["123column", "normal_field"].iter().enumerate() {
        assert_eq!(fields[position].name, *expected_name);
    }
}
}
Loading