feat: configurable bulk data loader#19
Conversation
Add RetryPolicy / DefaultRetryPolicy and a runner that re-executes a function on aborted Dgraph transactions with exponential backoff (retry.go). Self-contained; a follow-up wires retry into the client via a WithRetry method.
Add a bulk loader for RDF/JSON files: - load package: BatchSize, MutationWorkers, Schema, file match/sort options. - loaddata.go (embedded/Namespace path) + loaddata_grpc.go (gRPC path). - exposed as client.LoadData(ctx, dataDir, opts...). Replaces the previous live-loader (live.go). Uses RetryPolicy from the retry layer.
There was a problem hiding this comment.
6 issues found across 9 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="loaddata_grpc.go">
<violation number="1" location="loaddata_grpc.go:47">
P2: UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.</violation>
</file>
<file name="loaddata_test.go">
<violation number="1" location="loaddata_test.go:387">
P2: gRPC integration test cleanup is not deferred, so failures can skip `DropAll` cleanup and contaminate shared test state</violation>
</file>
<file name="load/options.go">
<violation number="1" location="load/options.go:32">
P1: Typed-nil `FileMatchFunc` passed via `WithFileMatch` causes a panic because interfaces holding typed nils are non-nil in Go. The `MatchFile` nil check won't short-circuit, and `FileMatchFunc.Match` will call `f(path)` on a nil function value, resulting in a runtime panic.</violation>
</file>
<file name="loaddata.go">
<violation number="1" location="loaddata.go:293">
P0: UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import</violation>
</file>
<file name="retry.go">
<violation number="1" location="retry.go:36">
P2: `DefaultRetryPolicy` comment says 5 retries, but code sets `MaxRetries: 10` — documentation and implementation disagree. Both the retry count and the documented max need to match.</violation>
<violation number="2" location="retry.go:77">
P2: WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.</violation>
</file>
Reply with feedback, questions, or to request a fix.
Re-trigger cubic
| return g.Wait() | ||
| } | ||
|
|
||
| func (l *liveLoader) uid(ns uint64, val string) (string, error) { |
There was a problem hiding this comment.
P0: UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata.go, line 293:
<comment>UID pre-resolution remaps all node IDs (not only blank nodes), risking data corruption during import</comment>
<file context>
@@ -0,0 +1,323 @@
+ return g.Wait()
+}
+
+func (l *liveLoader) uid(ns uint64, val string) (string, error) {
+ key := x.NamespaceAttr(ns, val)
+
</file context>
| type FileMatchFunc func(path string) bool | ||
|
|
||
| // Match implements FileMatch. | ||
| func (f FileMatchFunc) Match(path string) bool { return f(path) } |
There was a problem hiding this comment.
P1: Typed-nil FileMatchFunc passed via WithFileMatch causes a panic because interfaces holding typed nils are non-nil in Go. The MatchFile nil check won't short-circuit, and FileMatchFunc.Match will call f(path) on a nil function value, resulting in a runtime panic.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At load/options.go, line 32:
<comment>Typed-nil `FileMatchFunc` passed via `WithFileMatch` causes a panic because interfaces holding typed nils are non-nil in Go. The `MatchFile` nil check won't short-circuit, and `FileMatchFunc.Match` will call `f(path)` on a nil function value, resulting in a runtime panic.</comment>
<file context>
@@ -0,0 +1,200 @@
+type FileMatchFunc func(path string) bool
+
+// Match implements FileMatch.
+func (f FileMatchFunc) Match(path string) bool { return f(path) }
+
+// FileSort reorders the list of data files before processing.
</file context>
| if err != nil { | ||
| return nil, fmt.Errorf("get client from pool for UID allocation: %w", err) | ||
| } | ||
| start, end, err := dc.AllocateUIDs(context.TODO(), alloc) |
There was a problem hiding this comment.
P2: UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata_grpc.go, line 47:
<comment>UID allocation RPC uses context.TODO() instead of propagating the caller's context, preventing cancellation/deadline from interrupting in-flight AllocateUIDs calls.</comment>
<file context>
@@ -0,0 +1,160 @@
+ if err != nil {
+ return nil, fmt.Errorf("get client from pool for UID allocation: %w", err)
+ }
+ start, end, err := dc.AllocateUIDs(context.TODO(), alloc)
+ a.pool.put(dc)
+ if err != nil {
</file context>
| require.NoError(t, err) | ||
| defer client.Close() | ||
|
|
||
| require.NoError(t, client.DropAll(ctx)) |
There was a problem hiding this comment.
P2: gRPC integration test cleanup is not deferred, so failures can skip DropAll cleanup and contaminate shared test state
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At loaddata_test.go, line 387:
<comment>gRPC integration test cleanup is not deferred, so failures can skip `DropAll` cleanup and contaminate shared test state</comment>
<file context>
@@ -0,0 +1,472 @@
+ require.NoError(t, err)
+ defer client.Close()
+
+ require.NoError(t, client.DropAll(ctx))
+
+ // Build RDF data: 100 LoadTestPerson nodes, each linked to the previous one.
</file context>
| } | ||
|
|
||
| // DefaultRetryPolicy mirrors dgraph4j's defaults: | ||
| // 5 retries, 100ms base delay, 5s max delay, 10% jitter. |
There was a problem hiding this comment.
P2: DefaultRetryPolicy comment says 5 retries, but code sets MaxRetries: 10 — documentation and implementation disagree. Both the retry count and the documented max need to match.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 36:
<comment>`DefaultRetryPolicy` comment says 5 retries, but code sets `MaxRetries: 10` — documentation and implementation disagree. Both the retry count and the documented max need to match.</comment>
<file context>
@@ -0,0 +1,96 @@
+}
+
+// DefaultRetryPolicy mirrors dgraph4j's defaults:
+// 5 retries, 100ms base delay, 5s max delay, 10% jitter.
+var DefaultRetryPolicy = RetryPolicy{
+ MaxRetries: 10,
</file context>
| // 5 retries, 100ms base delay, 5s max delay, 10% jitter. | |
| // DefaultRetryPolicy mirrors dgraph4j's defaults: | |
| // 10 retries, 100ms base delay, 5s max delay, 10% jitter. |
| // return client.Insert(ctx, &entity) | ||
| // }) | ||
| func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error { | ||
| for attempt := range policy.MaxRetries + 1 { |
There was a problem hiding this comment.
P2: WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At retry.go, line 77:
<comment>WithRetry can panic when MaxRetries is negative (or math.MaxInt due to overflow) because the integer range loop executes zero iterations and falls through to panic("unreachable"). Missing input validation on the caller-provided RetryPolicy creates a reachable runtime crash path.</comment>
<file context>
@@ -0,0 +1,96 @@
+// return client.Insert(ctx, &entity)
+// })
+func (c client) WithRetry(ctx context.Context, policy RetryPolicy, fn func() error) error {
+ for attempt := range policy.MaxRetries + 1 {
+ err := fn()
+ if err == nil {
</file context>
|
Closing this loader PR. After review we found the bulk loader has no production caller — the one project that drove its design ended up loading data with Dgraph's own bulk loader instead, and never achieved a performant load through this path. Its only dependents were a generated passthrough method and a test stub, both trivially removable. Rather than upstream ~680 LOC of core-coupled surface that isn't earning its keep, we're dropping it; upstream's existing |
Adds a configurable bulk loader for RDF/JSON files, replacing the previous live-loader:
loadpackage —BatchSize,MutationWorkers,Schema, and file match/sort options.loaddata.go(embedded /Namespacepath) andloaddata_grpc.go(gRPC path).Client.LoadData(ctx, dataDir, opts...)method; removeslive.go.Builds and
go tests green. Stacks on #18 (usesRetryPolicy), so the diff includes that PR's commit.Part of a series upstreaming work from a downstream fork; opened for review/discussion.
Summary by cubic
Adds a configurable bulk loader for RDF/JSON, replacing the live-loader and exposing a new
Client.LoadData(ctx, dataDir, opts...). It supports schema application, file filtering/sorting, batching, concurrent workers, and gRPC retry handling.New Features
loadpackage with options:WithBatchSize,WithMutationWorkers,WithSchema,WithFileMatch,WithFileSort.Client.LoadDataloads.rdf/.json(including.gz) from a directory; works with embedded and gRPC clients.RetryPolicy.Migration
Client.LoadData(ctx, dataDir, opts...); defaults match previous behavior (batch size 1000, 1 worker).Written for commit 971559c. Summary will update on new commits.