Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,23 @@ import (
json "github.com/bytedance/sonic"
)

// useInt64API is a sonic API value built once, when the package is
// initialised, by freezing a Config with UseInt64 enabled. It decodes JSON
// integers into int64 instead of float64, preserving full precision for
// values whose magnitude exceeds 2^53. Non-integer numbers still decode as
// float64.
var useInt64API = json.Config{UseInt64: true}.Froze()

// Unmarshal parses the JSON-encoded bytes in b and stores the result in v.
// It delegates directly to sonic's package-level Unmarshal, so decoding uses
// sonic's default configuration; use UnmarshalUseInt64 when integers decoded
// into interface values must keep full int64 precision.
func Unmarshal(b []byte, v any) error {
return json.Unmarshal(b, v)
}

// UnmarshalUseInt64 behaves like Unmarshal but decodes JSON integers into
// int64 (rather than float64) when the destination is an interface{}. This
// matters for full-range bigint values from wal2json: float64 only has 53
// bits of mantissa and would silently round.
//
// Numbers written with a decimal point or an exponent still decode as
// float64, and the package tests pin down that an integer literal too large
// for int64 falls back to float64 rather than producing an error.
func UnmarshalUseInt64(b []byte, v any) error {
return useInt64API.Unmarshal(b, v)
}

// Marshal returns the JSON encoding of v. It delegates directly to sonic's
// package-level Marshal and returns that call's result and error unchanged.
func Marshal(v any) ([]byte, error) {
return json.Marshal(v)
}
105 changes: 105 additions & 0 deletions internal/json/json_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// SPDX-License-Identifier: Apache-2.0

package json

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestUnmarshalUseInt64(t *testing.T) {
t.Parallel()

tests := []struct {
name string
input string
expected any
}{
{
name: "small integer decodes as int64",
input: `{"value": 42}`,
expected: int64(42),
},
{
name: "bigint above 2^53 preserved",
input: `{"value": 9007199254740993}`,
expected: int64(9007199254740993),
},
{
name: "max int64 preserved",
input: `{"value": 9223372036854775807}`,
expected: int64(9223372036854775807),
},
{
name: "negative bigint preserved",
input: `{"value": -9223372036854775808}`,
expected: int64(-9223372036854775808),
},
{
name: "non-integer number decodes as float64",
input: `{"value": 1.5}`,
expected: 1.5,
},
{
// `1.0` is mathematically an integer but the JSON literal
// contains a decimal point, so sonic treats it as float64.
// Pin this down β€” Postgres serialises non-integer numerics
// this way and pgstream relies on the type discriminator.
name: "integer-valued literal with decimal point stays float64",
input: `{"value": 1.0}`,
expected: 1.0,
},
{
// Scientific notation always decodes as float64 regardless of
// whether the value is mathematically an integer.
name: "scientific notation decodes as float64",
input: `{"value": 1e10}`,
expected: 1e10,
},
{
name: "negative scientific notation decodes as float64",
input: `{"value": -1.5e2}`,
expected: -1.5e2,
},
{
name: "zero decodes as int64",
input: `{"value": 0}`,
expected: int64(0),
},
{
name: "small negative integer decodes as int64",
input: `{"value": -1}`,
expected: int64(-1),
},
{
// 2^63 doesn't fit in int64; sonic falls back to float64
// rather than wrapping around or returning an error.
name: "value above MaxInt64 falls back to float64",
input: `{"value": 9223372036854775808}`,
expected: float64(9223372036854775808),
},
{
name: "null decodes as nil",
input: `{"value": null}`,
expected: nil,
},
{
// Sanity check that the rule applies inside arrays / nested
// objects too β€” this is what makes the snapshot/jsonb fix
// for #686 work end-to-end.
name: "nested large integer preserved through array",
input: `{"value": [1, 9223372036854775807, 3]}`,
expected: []any{int64(1), int64(9223372036854775807), int64(3)},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var got map[string]any
require.NoError(t, UnmarshalUseInt64([]byte(tt.input), &got))
require.Equal(t, tt.expected, got["value"])
})
}
}
4 changes: 4 additions & 0 deletions internal/postgres/pg_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq"
pgxvec "github.com/pgvector/pgvector-go/pgx"
pgjson "github.com/xataio/pgstream/internal/json"
)

type QualifiedName struct {
Expand Down Expand Up @@ -164,6 +165,9 @@ type extensionType struct {
// extensionTypes lists the postgres extension types pgstream teaches pgx
// about on every connection.
var extensionTypes = []extensionType{
{name: "json", register: registerWithCodec("json", &pgtype.JSONCodec{Marshal: pgjson.Marshal, Unmarshal: pgjson.UnmarshalUseInt64})},
{name: "jsonb", register: registerWithCodec("jsonb", &pgtype.JSONBCodec{Marshal: pgjson.Marshal, Unmarshal: pgjson.UnmarshalUseInt64})},

{name: "hstore", register: registerWithCodec("hstore", pgtype.HstoreCodec{})},
{name: "vector", register: func(ctx context.Context, conn *pgx.Conn, _ uint32) error {
// pgxvec registers vector, halfvec and sparsevec in one call —
Expand Down
126 changes: 126 additions & 0 deletions pkg/stream/integration/pg_pg_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"os"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -947,3 +948,128 @@ func getRoles(t *testing.T, ctx context.Context, conn pglib.Querier) []string {

return roles
}

// Test_PostgresToPostgres_LargeIntegerPrecisionWAL is the WAL-replication
// counterpart of Test_SnapshotToPostgres_LargeIntegerPrecision. It pins down
// that INSERT / UPDATE events carrying large integers — both as plain bigint
// column values and as integer fields inside a jsonb payload — replicate
// bit-for-bit through the wal2json → listener → target path.
//
// Covers the WAL side of #824 (bigint) and #686 (jsonb large int).
//
// The test is skipped unless PGSTREAM_INTEGRATION_TESTS is set. The subtests
// run sequentially and each one builds on the rows left by the previous one.
func Test_PostgresToPostgres_LargeIntegerPrecisionWAL(t *testing.T) {
	if os.Getenv("PGSTREAM_INTEGRATION_TESTS") == "" {
		t.Skip("skipping integration test...")
	}

	cfg := &stream.Config{
		Listener:  testPostgresListenerCfg(),
		Processor: testPostgresProcessorCfg(),
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	runStream(t, ctx, cfg)

	testTable := "pg2pg_largeint_wal"

	// 9007199254740993 = 2^53 + 1, the smallest int64 that float64 cannot
	// represent exactly. 9223372036854775807 is MaxInt64.
	execQuery(t, ctx, fmt.Sprintf(
		`CREATE TABLE %s(
id bigint PRIMARY KEY,
amount bigint NOT NULL,
payload jsonb NOT NULL
)`, testTable))

	// row mirrors one record of the test table; the jsonb payload is read
	// back as text so it can be compared against its canonical rendering.
	type row struct {
		id      int64
		amount  int64
		payload string
	}

	// NOTE(review): targetConn is never closed in this test; consider
	// closing it on cleanup — confirm against the pglib connection API.
	targetConn, err := pglib.NewConn(ctx, targetPGURL)
	require.NoError(t, err)

	// fetch returns every row currently in the target table, ordered by id.
	fetch := func() ([]row, error) {
		rows, err := targetConn.Query(ctx,
			fmt.Sprintf("SELECT id, amount, payload::text FROM %s ORDER BY id", testTable))
		if err != nil {
			return nil, err
		}
		defer rows.Close()
		out := []row{}
		for rows.Next() {
			var r row
			if err := rows.Scan(&r.id, &r.amount, &r.payload); err != nil {
				return nil, err
			}
			out = append(out, r)
		}
		return out, rows.Err()
	}

	// waitFor polls the target once per second, for up to 20 seconds, until
	// its contents equal want. It takes the *testing.T of the calling
	// (sub)test explicitly: FailNow must run on the goroutine of the test it
	// belongs to, so failing the parent test from inside a subtest is not
	// allowed.
	waitFor := func(t *testing.T, want []row) {
		t.Helper()
		timer := time.NewTimer(20 * time.Second)
		defer timer.Stop()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()

		// Remember the most recent poll so that a timeout can report what
		// the target actually contained (or why it could not be read).
		var (
			lastGot []row
			lastErr error
		)
		for {
			select {
			case <-timer.C:
				cancel()
				t.Fatalf("timeout waiting for WAL replication; want %v, last fetch returned %v (err: %v)",
					want, lastGot, lastErr)
			case <-ticker.C:
				lastGot, lastErr = fetch()
				if lastErr != nil {
					// Keep polling: the failure may be transient, e.g. the
					// table not having been replicated to the target yet.
					continue
				}
				if reflect.DeepEqual(lastGot, want) {
					return
				}
			}
		}
	}

	t.Run("insert bigint above 2^53 + jsonb with large int", func(t *testing.T) {
		execQuery(t, ctx, fmt.Sprintf(
			`INSERT INTO %s(id, amount, payload) VALUES
(9007199254740993, 9007199254740993, '{"n":9007199254740993}'),
(9223372036854775807, 9223372036854775807, '{"n":9223372036854775807,"nested":{"k":1234567890123456789}}')`,
			testTable))

		waitFor(t, []row{
			{id: 9007199254740993, amount: 9007199254740993, payload: `{"n": 9007199254740993}`},
			{id: 9223372036854775807, amount: 9223372036854775807, payload: `{"n": 9223372036854775807, "nested": {"k": 1234567890123456789}}`},
		})
	})

	t.Run("update jsonb payload with new large int", func(t *testing.T) {
		execQuery(t, ctx, fmt.Sprintf(
			`UPDATE %s
SET payload = jsonb_set(payload, '{k2}', '987654321987654321'::jsonb)
WHERE id = 9223372036854775807`,
			testTable))

		// jsonb stores keys ordered by length then alphabetically, so
		// `n` (1) before `k2` (2) before `nested` (6).
		waitFor(t, []row{
			{id: 9007199254740993, amount: 9007199254740993, payload: `{"n": 9007199254740993}`},
			{id: 9223372036854775807, amount: 9223372036854775807, payload: `{"n": 9223372036854775807, "k2": 987654321987654321, "nested": {"k": 1234567890123456789}}`},
		})
	})

	t.Run("update bigint above 2^53 by +1", func(t *testing.T) {
		// The repro from #824: incrementing a bigint above 2^53 must
		// land on the destination as exactly source+1, not source+0.
		execQuery(t, ctx, fmt.Sprintf(
			`UPDATE %s SET amount = amount + 1 WHERE id = 9007199254740993`,
			testTable))

		waitFor(t, []row{
			{id: 9007199254740993, amount: 9007199254740994, payload: `{"n": 9007199254740993}`},
			{id: 9223372036854775807, amount: 9223372036854775807, payload: `{"n": 9223372036854775807, "k2": 987654321987654321, "nested": {"k": 1234567890123456789}}`},
		})
	})
}
Loading
Loading