feat: configurable bulk data loader #19

Closed
mlwelles wants to merge 2 commits into matthewmcneely:main from mlwelles:feature/bulk-loader

mlwelles commented Jun 4, 2026

Adds a configurable bulk loader for RDF/JSON files, replacing the previous live-loader:

  • load package — BatchSize, MutationWorkers, Schema, and file match/sort options.
  • loaddata.go (embedded / Namespace path) and loaddata_grpc.go (gRPC path).
  • Exposed via a new Client.LoadData(ctx, dataDir, opts...) method; removes live.go.
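The batching and worker settings named above can be illustrated with a small standard-library sketch. This is not the PR's code: `batches`, `process`, and the `apply` callback are hypothetical names, and the PR itself appears to use an errgroup rather than a bare `sync.WaitGroup`.

```go
package main

import (
	"fmt"
	"sync"
)

// batches splits items into consecutive slices of at most size elements.
// A non-positive size is treated as 1 so the loop always advances.
func batches(items []string, size int) [][]string {
	if size <= 0 {
		size = 1
	}
	var out [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

// process fans the batches out to a fixed number of worker goroutines and
// returns how many items were handled. apply is a stand-in for sending one
// mutation; it is an assumption, not part of the PR.
func process(items []string, batchSize, workers int, apply func([]string)) int {
	if workers <= 0 {
		workers = 1
	}
	ch := make(chan []string)
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		n  int
	)
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range ch {
				apply(b)
				mu.Lock()
				n += len(b)
				mu.Unlock()
			}
		}()
	}
	for _, b := range batches(items, batchSize) {
		ch <- b
	}
	close(ch) // lets the workers' range loops finish
	wg.Wait()
	return n
}

func main() {
	items := make([]string, 2500)
	handled := process(items, 1000, 2, func(b []string) {})
	fmt.Println("batches:", len(batches(items, 1000)), "items handled:", handled)
}
```

With one worker the batches are applied strictly in order; with more than one, ordering between batches is no longer guaranteed, which matters if later batches refer to nodes created by earlier ones.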

The build succeeds and the Go tests pass. This PR stacks on #18 (it uses RetryPolicy), so the diff includes that PR's commit.

Part of a series upstreaming work from a downstream fork; opened for review/discussion.


Summary by cubic

Adds a configurable bulk loader for RDF/JSON, replacing the live-loader and exposing a new Client.LoadData(ctx, dataDir, opts...). It supports schema application, file filtering/sorting, batching, concurrent workers, and gRPC retry handling.

  • New Features

    • New load package with options: WithBatchSize, WithMutationWorkers, WithSchema, WithFileMatch, WithFileSort.
    • Client.LoadData loads .rdf/.json (including .gz) from a directory; works with embedded and gRPC clients.
    • Pre-allocates UIDs for blank nodes and streams mutations in batches with concurrent workers.
    • Retries aborted gRPC mutations using RetryPolicy.
  • Migration

    • Replace any live-loader usage with Client.LoadData(ctx, dataDir, opts...); defaults match previous behavior (batch size 1000, 1 worker).
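The option names in this summary suggest Go's functional-options pattern. The sketch below shows how such options could be applied over the quoted defaults (batch size 1000, 1 worker). Only the names `WithBatchSize` and `WithMutationWorkers` come from the summary; the `Options` struct layout, `NewOptions`, and the rule that non-positive values are ignored are assumptions made for illustration.

```go
package main

import "fmt"

// Options holds loader settings. The field names follow the PR description;
// the struct itself is an assumed shape, not the PR's definition.
type Options struct {
	BatchSize       int
	MutationWorkers int
}

// Option mutates Options; this is the usual functional-options signature.
type Option func(*Options)

// WithBatchSize sets the batch size. Non-positive values are ignored here
// so that the default survives (an assumption of this sketch).
func WithBatchSize(n int) Option {
	return func(o *Options) {
		if n > 0 {
			o.BatchSize = n
		}
	}
}

// WithMutationWorkers sets the number of concurrent workers, with the same
// guard against non-positive values.
func WithMutationWorkers(n int) Option {
	return func(o *Options) {
		if n > 0 {
			o.MutationWorkers = n
		}
	}
}

// NewOptions (hypothetical helper) applies opts over the defaults quoted in
// the summary: batch size 1000 and 1 worker.
func NewOptions(opts ...Option) Options {
	o := Options{BatchSize: 1000, MutationWorkers: 1}
	for _, opt := range opts {
		if opt != nil {
			opt(&o)
		}
	}
	return o
}

func main() {
	fmt.Printf("%+v\n", NewOptions())
	fmt.Printf("%+v\n", NewOptions(WithBatchSize(500), WithMutationWorkers(4)))
}
```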

Written for commit 971559c. Summary will update on new commits.

mlwelles added 2 commits June 4, 2026 13:12
Add RetryPolicy / DefaultRetryPolicy and a runner that re-executes a
function on aborted Dgraph transactions with exponential backoff
(retry.go). Self-contained; a follow-up wires retry into the client via a
WithRetry method.

Add a bulk loader for RDF/JSON files:
- load package: BatchSize, MutationWorkers, Schema, file match/sort options.
- loaddata.go (embedded/Namespace path) + loaddata_grpc.go (gRPC path).
- exposed as client.LoadData(ctx, dataDir, opts...).

Replaces the previous live-loader (live.go). Uses RetryPolicy from the
retry layer.

cubic-dev-ai (bot) left a comment

6 issues found across 9 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="loaddata_grpc.go">

<violation number="1" location="loaddata_grpc.go:47">
P2: UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.</violation>
</file>

<file name="loaddata_test.go">

<violation number="1" location="loaddata_test.go:387">
P2: gRPC integration test cleanup is not deferred, so failures can skip `DropAll` cleanup and contaminate shared test state</violation>
</file>

<file name="load/options.go">

<violation number="1" location="load/options.go:32">
P1: Typed-nil `FileMatchFunc` passed via `WithFileMatch` causes a panic because interfaces holding typed nils are non-nil in Go. The `MatchFile` nil check won't short-circuit, and `FileMatchFunc.Match` will call `f(path)` on a nil function value, resulting in a runtime panic.</violation>
</file>

<file name="loaddata.go">

<violation number="1" location="loaddata.go:293">
P0: UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import</violation>
</file>

<file name="retry.go">

<violation number="1" location="retry.go:36">
P2: The `DefaultRetryPolicy` comment says 5 retries, but the code sets `MaxRetries: 10`, so documentation and implementation disagree. The documented count and the configured value need to match.</violation>

<violation number="2" location="retry.go:77">
P2: WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.</violation>
</file>

Comment thread loaddata.go
return g.Wait()
}

func (l *liveLoader) uid(ns uint64, val string) (string, error) {

P0: UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import
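To make the concern concrete, here is a minimal sketch of a resolver that remaps only blank nodes. It relies on Dgraph's RDF convention that blank nodes are written `_:name`; everything else (`resolver`, `newResolver`, the in-memory counter standing in for a UID allocator) is hypothetical and is not the PR's `uid` method or a proposed fix for it.

```go
package main

import (
	"fmt"
	"strings"
)

// isBlankNode reports whether an ID is an RDF blank node ("_:name").
func isBlankNode(id string) bool {
	return strings.HasPrefix(id, "_:")
}

// resolver assigns fresh UIDs to blank nodes only. The counter is a
// stand-in for a real UID allocator (an assumption of this sketch).
type resolver struct {
	next  uint64
	known map[string]string
}

func newResolver(start uint64) *resolver {
	return &resolver{next: start, known: make(map[string]string)}
}

// resolve returns a stable UID for a blank node and passes every other ID
// through unchanged, so explicit UIDs in the input are never rewritten.
func (r *resolver) resolve(id string) string {
	if !isBlankNode(id) {
		return id
	}
	if uid, ok := r.known[id]; ok {
		return uid
	}
	uid := fmt.Sprintf("%#x", r.next)
	r.next++
	r.known[id] = uid
	return uid
}

func main() {
	r := newResolver(1)
	for _, id := range []string{"_:alice", "0x2a", "_:bob", "_:alice"} {
		fmt.Printf("%s -> %s\n", id, r.resolve(id))
	}
}
```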

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata.go, line 293:

<comment>UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import</comment>

<file context>
@@ -0,0 +1,323 @@
+	return g.Wait()
+}
+
+func (l *liveLoader) uid(ns uint64, val string) (string, error) {
+	key := x.NamespaceAttr(ns, val)
+
</file context>

Comment thread load/options.go
type FileMatchFunc func(path string) bool

// Match implements FileMatch.
func (f FileMatchFunc) Match(path string) bool { return f(path) }

P1: Typed-nil FileMatchFunc passed via WithFileMatch causes a panic because interfaces holding typed nils are non-nil in Go. The MatchFile nil check won't short-circuit, and FileMatchFunc.Match will call f(path) on a nil function value, resulting in a runtime panic.
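The typed-nil behaviour described here can be reproduced in isolation. In the sketch below, the `FileMatch` interface is inferred from the "Match implements FileMatch" comment, and the nil guard inside `Match` (treating a nil function as "match everything") is one possible mitigation assumed for illustration, not the PR's behaviour.

```go
package main

import "fmt"

// FileMatch is assumed to be a single-method interface, as the
// "Match implements FileMatch" comment in the PR implies.
type FileMatch interface {
	Match(path string) bool
}

// FileMatchFunc adapts a plain function to FileMatch.
type FileMatchFunc func(path string) bool

// Match implements FileMatch. The nil guard is the hypothetical addition:
// a nil function is treated as "match every file" instead of panicking.
func (f FileMatchFunc) Match(path string) bool {
	if f == nil {
		return true
	}
	return f(path)
}

func main() {
	var fn FileMatchFunc // typed nil: a nil func value with a concrete type
	var m FileMatch = fn // the interface now holds (FileMatchFunc, nil)

	fmt.Println(m == nil)            // false: the interface itself is not nil
	fmt.Println(m.Match("data.rdf")) // true: the guard avoids calling a nil func
}
```

Without the guard, the second call would invoke a nil function value and panic, which is the failure the comment describes. An alternative is to reject or normalize typed nils at the point where the option is set.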

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At load/options.go, line 32:

<comment>Typed-nil `FileMatchFunc` passed via `WithFileMatch` causes a panic because interfaces holding typed nils are non-nil in Go. The `MatchFile` nil check won't short-circuit, and `FileMatchFunc.Match` will call `f(path)` on a nil function value, resulting in a runtime panic.</comment>

<file context>
@@ -0,0 +1,200 @@
+type FileMatchFunc func(path string) bool
+
+// Match implements FileMatch.
+func (f FileMatchFunc) Match(path string) bool { return f(path) }
+
+// FileSort reorders the list of data files before processing.
</file context>

Comment thread loaddata_grpc.go
if err != nil {
return nil, fmt.Errorf("get client from pool for UID allocation: %w", err)
}
start, end, err := dc.AllocateUIDs(context.TODO(), alloc)

P2: UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata_grpc.go, line 47:

<comment>UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.</comment>

<file context>
@@ -0,0 +1,160 @@
+		if err != nil {
+			return nil, fmt.Errorf("get client from pool for UID allocation: %w", err)
+		}
+		start, end, err := dc.AllocateUIDs(context.TODO(), alloc)
+		a.pool.put(dc)
+		if err != nil {
</file context>

Comment thread loaddata_test.go
require.NoError(t, err)
defer client.Close()

require.NoError(t, client.DropAll(ctx))

P2: gRPC integration test cleanup is not deferred, so failures can skip DropAll cleanup and contaminate shared test state

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata_test.go, line 387:

<comment>gRPC integration test cleanup is not deferred, so failures can skip `DropAll` cleanup and contaminate shared test state</comment>

<file context>
@@ -0,0 +1,472 @@
+	require.NoError(t, err)
+	defer client.Close()
+
+	require.NoError(t, client.DropAll(ctx))
+
+	// Build RDF data: 100 LoadTestPerson nodes, each linked to the previous one.
</file context>

Comment thread retry.go
}

// DefaultRetryPolicy mirrors dgraph4j's defaults:
// 5 retries, 100ms base delay, 5s max delay, 10% jitter.

P2: The DefaultRetryPolicy comment says 5 retries, but the code sets MaxRetries: 10, so documentation and implementation disagree. The documented count and the configured value need to match.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 36:

<comment>The `DefaultRetryPolicy` comment says 5 retries, but the code sets `MaxRetries: 10`, so documentation and implementation disagree. The documented count and the configured value need to match.</comment>

<file context>
@@ -0,0 +1,96 @@
+}
+
+// DefaultRetryPolicy mirrors dgraph4j's defaults:
+// 5 retries, 100ms base delay, 5s max delay, 10% jitter.
+var DefaultRetryPolicy = RetryPolicy{
+	MaxRetries: 10,
</file context>
Suggested change
// 5 retries, 100ms base delay, 5s max delay, 10% jitter.
// DefaultRetryPolicy mirrors dgraph4j's defaults:
// 10 retries, 100ms base delay, 5s max delay, 10% jitter.

Comment thread retry.go
// return client.Insert(ctx, &entity)
// })
func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
for attempt := range policy.MaxRetries + 1 {

P2: WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 77:

<comment>WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.</comment>

<file context>
@@ -0,0 +1,96 @@
+//	    return client.Insert(ctx, &entity)
+//	})
+func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
+	for attempt := range policy.MaxRetries + 1 {
+		err := fn()
+		if err == nil {
</file context>

mlwelles commented Jun 4, 2026

Closing this loader PR. After review we found the bulk loader has no production caller — the one project that drove its design ended up loading data with Dgraph's own bulk loader instead, and never achieved a performant load through this path. Its only dependents were a generated passthrough method and a test stub, both trivially removable. Rather than upstream ~680 LOC of core-coupled surface that isn't earning its keep, we're dropping it; upstream's existing live.go/Engine.Load ingest path is untouched and remains the supported loader.

mlwelles closed this Jun 4, 2026