diff --git a/binlog_streamer.go b/binlog_streamer.go index e74d070a..c1f8dbba 100644 --- a/binlog_streamer.go +++ b/binlog_streamer.go @@ -31,6 +31,11 @@ type BinlogStreamer struct { ErrorHandler ErrorHandler Filter CopyFilter + // SchemaChangeDetector, when set, intercepts row events for tables in + // transition (skipping them before NewBinlogDMLEvents would crash on a + // column-count mismatch) and receives DDL QueryEvents. + SchemaChangeDetector *SchemaChangeDetector + TableSchema TableSchemaCache LogTag string @@ -423,6 +428,17 @@ func (s *BinlogStreamer) handleRowsEvent(ev *replication.BinlogEvent, query []by return nil } + // Drop row events for tables mid-schema-change before parsing. Parsing + // would call into NewBinlogDMLEvents which panics when the row column + // count diverges from the cached schema — exactly what happens during a + // transition. + if s.SchemaChangeDetector != nil && s.SchemaChangeDetector.IsInTransition(db, table) { + metrics.Count("BinlogStreamer.SkippedRowsEventForDDL", 1, []MetricTag{ + {"table", table}, + }, 1.0) + return nil + } + dmlEvs, err := NewBinlogDMLEvents(tableFromSchemaCache, ev, pos, s.lastResumableBinlogPosition, query) if err != nil { return err diff --git a/config.go b/config.go index d696b79a..afff0660 100644 --- a/config.go +++ b/config.go @@ -808,6 +808,32 @@ type Config struct { // Optional: defaults to "info" LogLevel string + // AutomaticDDLHandling, when true, makes Ghostferry tolerate schema + // changes on either source or target during an active migration. The + // schema-change detector watches DDL on both sides; when source and + // target converge on a new schema for a migrated table, target rows for + // that table are wiped (scoped via CopyFilter.BuildDelete or TRUNCATE) + // and the table is re-iterated from source's current state. Other + // tables continue uninterrupted. Defaults to false (preserves current + // crash-on-DDL behavior). 
+ AutomaticDDLHandling bool + + // SchemaChangeTransitionTimeout bounds how long a single table may sit + // in transition (waiting for source/target to converge) before + // Ghostferry aborts. Only used when AutomaticDDLHandling is true. + // + // Optional: defaults to 24h. + SchemaChangeTransitionTimeout time.Duration + + // DDLTraceFile, when non-empty, makes the schema-change detector append + // one structured line per decision (OnSourceDDL, affectedMigratedTables, + // state transition, checkConvergence, recopy steps, RequeueTable start, + // iterateTable completion, OnTableIterationComplete). Intended for + // diagnosing why a recopy did or did not fire — the file stays small + // (one short line per decision) so it can be cat'd from a rails console + // after the move. Only used when AutomaticDDLHandling is true. + DDLTraceFile string + // ---------------------------------------------------------------------------------------------------------------- // Updatable config // The following configs are updatable via the `Config.Update` method and should be passed by pointer diff --git a/data_iterator.go b/data_iterator.go index 0586fd8a..a63c5aa2 100644 --- a/data_iterator.go +++ b/data_iterator.go @@ -1,12 +1,36 @@ package ghostferry import ( + "errors" "fmt" "sync" sql "github.com/Shopify/ghostferry/sqlwrapper" ) +// TransitionChecker reports whether a table's iteration should be aborted +// because the schema-change detector has marked it as in-transition or +// recopying. Decoupled as an interface so DataIterator stays unit-testable +// and to avoid an import cycle with the detector. +type TransitionChecker interface { + IsInTransition(schemaName, tableName string) bool +} + +// DDLTracer (optional) lets DataIterator emit lines into the schema-change +// detector's trace file at moments only DataIterator can observe — recopy +// goroutine spawn, recopy iteration completion. Decoupled as an interface to +// keep data_iterator.go free of the detector type. 
+type DDLTracer interface { + Trace(format string, args ...interface{}) +} + +// errSchemaDriftDetected is the sentinel returned from cursor.Each when we +// notice column drift between the cached schema and the live SELECT result, +// or when the table has entered transition mid-iteration. The data iterator +// catches it, marks the table complete, and lets the schema-change detector +// drive the eventual recopy. +var errSchemaDriftDetected = errors.New("schema drift detected mid-iteration; aborting cleanly for recopy") + type DataIterator struct { DB *sql.DB Concurrency int @@ -17,10 +41,29 @@ type DataIterator struct { StateTracker *StateTracker TableSorter DataIteratorSorter - TargetPaginationKeys *sync.Map - batchListeners []func(*RowBatch) error - doneListeners []func() error - logger Logger + // TransitionChecker (optional) lets the iterator abort cleanly when a + // table enters StateInTransition or StateRecopying mid-copy. Nil disables + // the check (legacy behavior). + TransitionChecker TransitionChecker + + // Tracer (optional) emits DDL-trace lines for events the detector cannot + // see — recopy goroutine spawn and iteration completion. + Tracer DDLTracer + + TargetPaginationKeys *sync.Map + batchListeners []func(*RowBatch) error + doneListeners []func() error + tableCompletionListeners []func(*TableSchema) + logger Logger + + // iterations tracks per-table in-flight iterateTable goroutines. The + // schema-change detector waits on the channel for a given table before + // running its bulk DELETE, so concurrent INSERTs from an abandoned worker + // can't survive the delete. The channel is closed when the goroutine + // returns (any reason, including errSchemaDriftDetected) and the entry + // is removed from the map. 
+ iterationsMu sync.Mutex + iterations map[string]chan struct{} } type TableMaxPaginationKey struct { @@ -72,94 +115,7 @@ func (d *DataIterator) Run(tables []*TableSchema) { if !ok { break } - - logger := d.logger.WithField("table", table.String()) - - targetPaginationKeyInterface, found := d.TargetPaginationKeys.Load(table.String()) - if !found { - err := fmt.Errorf("%s not found in TargetPaginationKeys, this is likely a programmer error", table.String()) - logger.WithError(err).Error("this is definitely a bug") - d.ErrorHandler.Fatal("data_iterator", err) - return - } - - startPaginationKey := d.StateTracker.LastSuccessfulPaginationKey(table.String(), table) - if startPaginationKey.IsMax() { - err := fmt.Errorf("%v has been marked as completed but a table iterator has been spawned, this is likely a programmer error which resulted in the inconsistent starting state", table.String()) - logger.WithError(err).Error("this is definitely a bug") - d.ErrorHandler.Fatal("data_iterator", err) - return - } - - cursor := d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(PaginationKey)) - if d.SelectFingerprint { - if len(cursor.ColumnsToSelect) == 0 { - cursor.ColumnsToSelect = []string{"*"} - } - - cursor.ColumnsToSelect = append(cursor.ColumnsToSelect, table.RowMd5Query()) - } - - err := cursor.Each(func(batch *RowBatch) error { - metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{ - MetricTag{"table", table.Name}, - MetricTag{"source", "table"}, - }, 1.0) - - if d.SelectFingerprint { - fingerprints := make(map[string][]byte) - rows := make([]RowData, batch.Size()) - paginationColumn := table.GetPaginationColumn() - - for i, rowData := range batch.Values() { - paginationKey, err := NewPaginationKeyFromRow(rowData, batch.PaginationKeyIndex(), paginationColumn) - if err != nil { - logger.WithError(err).Error("failed to get paginationKey data") - return err - } - - fingerprints[paginationKey.String()] = rowData[len(rowData)-1].([]byte) 
- rows[i] = rowData[:len(rowData)-1] - } - - batch = &RowBatch{ - values: rows, - paginationKeyIndex: batch.PaginationKeyIndex(), - table: table, - fingerprints: fingerprints, - columns: batch.columns[:len(batch.columns)-1], - } - } - - for _, listener := range d.batchListeners { - err := listener(batch) - if err != nil { - logger.WithError(err).Error("failed to process row batch with listeners") - return err - } - } - - return nil - }) - - if err != nil { - switch e := err.(type) { - case BatchWriterVerificationFailed: - logger.WithField("incorrect_tables", e.table).Error(e.Error()) - d.ErrorHandler.Fatal("inline_verifier", err) - default: - logger.WithError(err).Error("failed to iterate table") - d.ErrorHandler.Fatal("data_iterator", err) - } - - } - - logger.Debug("table iteration completed") - - // Right now the BatchWriter.WriteRowBatch happens synchronously in - // this method. If it ever becomes async, this MarkTableAsCompleted - // call MUST be done in WriteRowBatch somehow. - d.StateTracker.MarkTableAsCompleted(table.String()) + d.iterateTable(table) } }() } @@ -196,6 +152,215 @@ func (d *DataIterator) Run(tables []*TableSchema) { } } +func (d *DataIterator) iterateTable(table *TableSchema) { + logger := d.logger.WithField("table", table.String()) + + // Register an iteration-in-flight channel so the detector can block on it + // before running a bulk DELETE. Done is closed when the function returns + // for any reason — including the errSchemaDriftDetected abort path — so + // the detector unblocks promptly even on aborted iterations. 
+ d.iterationsMu.Lock() + if d.iterations == nil { + d.iterations = map[string]chan struct{}{} + } + done := make(chan struct{}) + d.iterations[table.String()] = done + d.iterationsMu.Unlock() + defer func() { + d.iterationsMu.Lock() + if cur, ok := d.iterations[table.String()]; ok && cur == done { + delete(d.iterations, table.String()) + } + d.iterationsMu.Unlock() + close(done) + }() + + targetPaginationKeyInterface, found := d.TargetPaginationKeys.Load(table.String()) + if !found { + err := fmt.Errorf("%s not found in TargetPaginationKeys, this is likely a programmer error", table.String()) + logger.WithError(err).Error("this is definitely a bug") + d.ErrorHandler.Fatal("data_iterator", err) + return + } + + startPaginationKey := d.StateTracker.LastSuccessfulPaginationKey(table.String(), table) + if startPaginationKey.IsMax() { + err := fmt.Errorf("%v has been marked as completed but a table iterator has been spawned, this is likely a programmer error which resulted in the inconsistent starting state", table.String()) + logger.WithError(err).Error("this is definitely a bug") + d.ErrorHandler.Fatal("data_iterator", err) + return + } + + cursor := d.CursorConfig.NewCursor(table, startPaginationKey, targetPaginationKeyInterface.(PaginationKey)) + if d.SelectFingerprint { + if len(cursor.ColumnsToSelect) == 0 { + cursor.ColumnsToSelect = []string{"*"} + } + + cursor.ColumnsToSelect = append(cursor.ColumnsToSelect, table.RowMd5Query()) + } + + err := cursor.Each(func(batch *RowBatch) error { + metrics.Count("RowEvent", int64(batch.Size()), []MetricTag{ + MetricTag{"table", table.Name}, + MetricTag{"source", "table"}, + }, 1.0) + + if d.SelectFingerprint { + fingerprints := make(map[string][]byte) + rows := make([]RowData, batch.Size()) + paginationColumn := table.GetPaginationColumn() + + for i, rowData := range batch.Values() { + paginationKey, err := NewPaginationKeyFromRow(rowData, batch.PaginationKeyIndex(), paginationColumn) + if err != nil { + 
logger.WithError(err).Error("failed to get paginationKey data") + return err + } + + fingerprints[paginationKey.String()] = rowData[len(rowData)-1].([]byte) + rows[i] = rowData[:len(rowData)-1] + } + + batch = &RowBatch{ + values: rows, + paginationKeyIndex: batch.PaginationKeyIndex(), + table: table, + fingerprints: fingerprints, + columns: batch.columns[:len(batch.columns)-1], + } + } + + // Abort iteration cleanly if the schema-change detector has flagged + // this table mid-copy, or if the live source columns drift from the + // cached schema (the race window before the DDL binlog event reaches + // the detector); the recopy mechanism re-iterates after convergence. + // Both checks need a TransitionChecker: with none, nothing would recopy. + if d.TransitionChecker != nil && d.TransitionChecker.IsInTransition(table.Schema, table.Name) { + logger.Info("aborting iteration: table entered transition mid-copy") + return errSchemaDriftDetected + } + if d.TransitionChecker != nil && columnsDriftFromCache(batch.columns, table) { + logger.WithFields(Fields{ + "liveColumns": batch.columns, + "cachedColumns": ConvertTableColumnsToStrings(table.Columns), + }).Info("aborting iteration: source column list drifted from cached schema") + return errSchemaDriftDetected + } + + for _, listener := range d.batchListeners { + err := listener(batch) + if err != nil { + logger.WithError(err).Error("failed to process row batch with listeners") + return err + } + } + + return nil + }) + + if err != nil { + if errors.Is(err, errSchemaDriftDetected) { + // Drop the partial cursor so the table is visibly back in the + // "not started" state — neither completed nor mid-copy. The + // detector's recopy() also calls ResetTable before re-iteration + // (via RequeueTable), but doing it here makes ferry's externally + // observable state accurate from the instant the drift is seen. + // + // Do NOT mark complete and do NOT fire completion listeners.
+ // The detector waits on AwaitIterationDrained (closed by the + // deferred channel close above) to know when this goroutine + // has exited and it is safe to DELETE. + d.StateTracker.ResetTable(table.String()) + logger.Info("iteration aborted for schema drift; table reset to pending, awaiting detector recopy") + return + } + switch e := err.(type) { + case BatchWriterVerificationFailed: + logger.WithField("incorrect_tables", e.table).Error(e.Error()) + d.ErrorHandler.Fatal("inline_verifier", err) + default: + logger.WithError(err).Error("failed to iterate table") + d.ErrorHandler.Fatal("data_iterator", err) + } + } + + logger.Debug("table iteration completed") + if d.Tracer != nil { + d.Tracer.Trace("iterateTable_completed table=%s", table.String()) + } + + // Right now the BatchWriter.WriteRowBatch happens synchronously in + // this method. If it ever becomes async, this MarkTableAsCompleted + // call MUST be done in WriteRowBatch somehow. + d.StateTracker.MarkTableAsCompleted(table.String()) + + for _, listener := range d.tableCompletionListeners { + listener(table) + } +} + +// AwaitIterationDrained blocks until any in-flight iterateTable goroutine +// for the given table name has returned. Returns immediately if no +// iteration is currently registered for the table. Used by the schema-change +// detector before issuing a bulk DELETE so concurrent INSERTs from an +// abandoned worker cannot survive the delete. +func (d *DataIterator) AwaitIterationDrained(tableName string) { + d.iterationsMu.Lock() + done, ok := d.iterations[tableName] + d.iterationsMu.Unlock() + if !ok { + return + } + <-done +} + +// RequeueTable resets state and spawns a fresh single-table iteration. Used +// by automatic DDL handling: after the schema-change detector clears the +// target rows for a table, it calls RequeueTable to repopulate from source's +// current state. Safe to call from a goroutine other than DataIterator.Run's +// worker pool. 
+// +// Caller is responsible for ensuring no in-flight worker is still copying +// this table. The detector achieves that by blocking on +// AwaitIterationDrained before deleting target rows. +func (d *DataIterator) RequeueTable(table *TableSchema) error { + if d.logger == nil { + d.logger = LogWithField("tag", "data_iterator") + } + logger := d.logger.WithField("table", table.String()) + + d.StateTracker.ResetTable(table.String()) + + tablesWithData, emptyTables, err := MaxPaginationKeys(d.DB, []*TableSchema{table}, d.logger) + if err != nil { + return fmt.Errorf("requeue: read max pagination key for %s: %w", table.String(), err) + } + + if len(emptyTables) > 0 { + // Source has no rows for this table — mark complete and notify. + d.StateTracker.MarkTableAsCompleted(table.String()) + for _, listener := range d.tableCompletionListeners { + listener(table) + } + logger.Info("requeue: source table empty; marked complete without iteration") + return nil + } + + maxKey, ok := tablesWithData[table] + if !ok { + return fmt.Errorf("requeue: %s missing from MaxPaginationKeys result", table.String()) + } + d.TargetPaginationKeys.Store(table.String(), maxKey) + + logger.Info("requeue: spawning fresh iteration goroutine for table") + if d.Tracer != nil { + d.Tracer.Trace("RequeueTable goroutine_start table=%s max_pk=%s", table.String(), maxKey.String()) + } + go d.iterateTable(table) + return nil +} + func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error) { d.batchListeners = append(d.batchListeners, listener) } @@ -203,3 +368,28 @@ func (d *DataIterator) AddBatchListener(listener func(*RowBatch) error) { func (d *DataIterator) AddDoneListener(listener func() error) { d.doneListeners = append(d.doneListeners, listener) } + +// AddTableCompletionListener registers a callback fired whenever a single +// table finishes iteration (initial copy or requeue).
+func (d *DataIterator) AddTableCompletionListener(listener func(*TableSchema)) { + d.tableCompletionListeners = append(d.tableCompletionListeners, listener) +} + +// columnsDriftFromCache returns true when the live SELECT columns no longer +// match the cached TableSchema columns by name and order. The cursor's batch +// columns come straight from rows.Columns(), so this catches DDL on source +// before the QUERY_EVENT for that DDL has been processed. +func columnsDriftFromCache(liveColumns []string, cached *TableSchema) bool { + if cached == nil || cached.Table == nil { + return false + } + if len(liveColumns) != len(cached.Columns) { + return true + } + for i, name := range liveColumns { + if name != cached.Columns[i].Name { + return true + } + } + return false +} diff --git a/ddl_events.go b/ddl_events.go new file mode 100644 index 00000000..128ba494 --- /dev/null +++ b/ddl_events.go @@ -0,0 +1,346 @@ +package ghostferry + +import ( + "regexp" + "strings" + "sync" + + "github.com/go-mysql-org/go-mysql/replication" + "github.com/pingcap/tidb/pkg/parser" + "github.com/pingcap/tidb/pkg/parser/ast" + + // test_driver registers the value-expression types the TiDB parser needs. + // Despite the name, it is the standard driver every non-TiDB consumer of + // the parser uses (sqlc, soar, bytebase, etc.) — the production driver + // lives in pkg/types and pulls in the rest of TiDB. + _ "github.com/pingcap/tidb/pkg/parser/test_driver" +) + +// DDLStatement represents a parsed DDL statement extracted from a binlog +// QueryEvent. Only the fields needed to drive schema-change reactions are +// populated — we use the TiDB parser to extract them and discard the rest of +// the AST. +type DDLStatement struct { + // SchemaName is the database the statement targets. Falls back to the + // QueryEvent's default schema when the statement does not qualify the table. + SchemaName string + // TableName is the primary table the statement targets. 
For RENAME with + // multiple pairs the parser populates RenamePairs and leaves this empty. + TableName string + // RawQuery is the original SQL text, useful for logging. + RawQuery string + // DDLType is one of "ALTER", "RENAME", "CREATE", "DROP", "TRUNCATE". + DDLType string + + // RenamePairs is populated for RENAME TABLE statements. A single statement + // can rename multiple tables (e.g. the gh-ost cutover swap): + // RENAME TABLE foo TO _foo_del, _foo_gho TO foo + RenamePairs []RenamePair +} + +// TableRef identifies a single table in a parsed DDL statement. +type TableRef struct { + SchemaName string + TableName string +} + +// RenamePair captures one rename clause (FROM → TO) within a RENAME TABLE +// statement. +type RenamePair struct { + From TableRef + To TableRef +} + +// SchemaImpact classifies how a schema change affects an in-progress copy. +type SchemaImpact int + +const ( + // ImpactMetadataOnly means the cached schema can be refreshed in place and + // no row recopy is required. Index changes, default value changes, comment + // changes, and column reorders fall into this bucket. + ImpactMetadataOnly SchemaImpact = iota + // ImpactRequiresRecopy means rows already on the target are no longer + // guaranteed to match source state and the table must be re-copied. + ImpactRequiresRecopy +) + +// leadingNoiseRe strips MySQL hint comments and SQL-style comments so the +// keyword sniff in IsDDLQuery can run without a full parse. +var leadingNoiseRe = regexp.MustCompile(`(?s)^(\s|/\*.*?\*/|--[^\n]*\n|#[^\n]*\n)+`) + +// alterLegacyModifierRe strips MariaDB/legacy MySQL keywords (ONLINE, OFFLINE, +// IGNORE) that may appear between ALTER and TABLE. The TiDB parser does not +// accept them — real MySQL 5.7+ does not emit them in binlogs either, but +// older fixtures still carry the syntax so we normalize before parsing. 
+var alterLegacyModifierRe = regexp.MustCompile(`(?i)^ALTER(\s+(?:ONLINE|OFFLINE|IGNORE))+\s+TABLE\b`) + +// Shadow-table suffixes — these are conventions, not SQL, so a regex is the +// right tool. gh-ost uses _gho/_del/_ghc, pt-osc uses _new/_old. +var ( + ghostShadowRe = regexp.MustCompile(`^_(.+)_(gho|del|ghc)$`) + ptOscShadowRe = regexp.MustCompile(`^_(.+)_(new|old)$`) +) + +// parserPool reuses TiDB parser instances. Per the TiDB docs, a parser is +// not goroutine-safe and not lightweight to construct, so a sync.Pool keeps +// allocations bounded under the binlog event rate. +var parserPool = sync.Pool{ + New: func() interface{} { return parser.New() }, +} + +// IsDDLQuery returns true when the QueryEvent payload is a DDL statement we +// might react to. Transactional control statements (BEGIN, COMMIT, SAVEPOINT, +// ROLLBACK) and other non-DDL queries return false so callers can drop them +// without invoking the full parser. +func IsDDLQuery(query []byte) bool { + keyword := firstKeyword(query) + if keyword == "" { + return false + } + switch strings.ToUpper(keyword) { + case "ALTER", "RENAME", "CREATE", "DROP", "TRUNCATE": + return true + } + return false +} + +// ParseDDLFromQueryEvent extracts a DDLStatement from a binlog QueryEvent. +// Returns nil for non-DDL statements (BEGIN, COMMIT, etc.) and for DDL the +// parser doesn't recognize or fails to parse. +func ParseDDLFromQueryEvent(ev *replication.QueryEvent) *DDLStatement { + if ev == nil || len(ev.Query) == 0 { + return nil + } + + rawQuery := string(ev.Query) + defaultSchema := string(ev.Schema) + + // Cheap keyword sniff first — skip the full parse for BEGIN/COMMIT/etc. + // The binlog stream is mostly transactional control statements; running + // the parser on every one would dominate CPU.
+ if !IsDDLQuery(ev.Query) { + return nil + } + + p := parserPool.Get().(*parser.Parser) + defer parserPool.Put(p) + + stmtNode, err := p.ParseOneStmt(normalizeForParser(rawQuery), "", "") + if err != nil { + // Statements the parser can't handle (vendor-specific syntax, etc.) + // are dropped silently. Returning nil is consistent with the previous + // behavior of returning nil for unrecognized DDL — callers are expected + // to fall back to a conservative recopy if they care. + return nil + } + + switch s := stmtNode.(type) { + case *ast.AlterTableStmt: + schema, table := tableNameOrDefault(s.Table, defaultSchema) + return &DDLStatement{ + RawQuery: rawQuery, + DDLType: "ALTER", + SchemaName: schema, + TableName: table, + } + case *ast.CreateTableStmt: + schema, table := tableNameOrDefault(s.Table, defaultSchema) + return &DDLStatement{ + RawQuery: rawQuery, + DDLType: "CREATE", + SchemaName: schema, + TableName: table, + } + case *ast.DropTableStmt: + // DROP TABLE supports a comma-separated list. The detector only + // inspects the primary table, so we surface the first; callers that + // need every dropped table can extend AllAffectedTables. 
+ if len(s.Tables) == 0 { + return nil + } + schema, table := tableNameOrDefault(s.Tables[0], defaultSchema) + return &DDLStatement{ + RawQuery: rawQuery, + DDLType: "DROP", + SchemaName: schema, + TableName: table, + } + case *ast.TruncateTableStmt: + schema, table := tableNameOrDefault(s.Table, defaultSchema) + return &DDLStatement{ + RawQuery: rawQuery, + DDLType: "TRUNCATE", + SchemaName: schema, + TableName: table, + } + case *ast.RenameTableStmt: + if len(s.TableToTables) == 0 { + return nil + } + pairs := make([]RenamePair, 0, len(s.TableToTables)) + for _, t2t := range s.TableToTables { + fromSchema, fromTable := tableNameOrDefault(t2t.OldTable, defaultSchema) + toSchema, toTable := tableNameOrDefault(t2t.NewTable, defaultSchema) + pairs = append(pairs, RenamePair{ + From: TableRef{SchemaName: fromSchema, TableName: fromTable}, + To: TableRef{SchemaName: toSchema, TableName: toTable}, + }) + } + stmt := &DDLStatement{ + RawQuery: rawQuery, + DDLType: "RENAME", + RenamePairs: pairs, + } + if len(pairs) == 1 { + stmt.SchemaName = pairs[0].From.SchemaName + stmt.TableName = pairs[0].From.TableName + } + return stmt + } + return nil +} + +// AllAffectedTables returns every table referenced by the statement. For +// non-RENAME DDL this is a single-element slice; for RENAME it includes both +// sides of every pair so callers can react regardless of which side names a +// migrated table. +func (s *DDLStatement) AllAffectedTables() []TableRef { + if s == nil { + return nil + } + if len(s.RenamePairs) > 0 { + out := make([]TableRef, 0, 2*len(s.RenamePairs)) + for _, p := range s.RenamePairs { + out = append(out, p.From, p.To) + } + return out + } + if s.TableName == "" { + return nil + } + return []TableRef{{SchemaName: s.SchemaName, TableName: s.TableName}} +} + +// IsShadowTable reports whether the given table name matches a known +// online-schema-change shadow pattern (gh-ost or pt-osc). 
+func IsShadowTable(tableName string) bool { + return ghostShadowRe.MatchString(tableName) || ptOscShadowRe.MatchString(tableName) +} + +// OriginalTableFromShadow returns the original (unmangled) table name for a +// gh-ost or pt-osc shadow table, or "" if the name is not a recognised shadow +// pattern. +func OriginalTableFromShadow(tableName string) string { + if m := ghostShadowRe.FindStringSubmatch(tableName); m != nil { + return m[1] + } + if m := ptOscShadowRe.FindStringSubmatch(tableName); m != nil { + return m[1] + } + return "" +} + +// ClassifyImpact compares two schemas for the same logical table and decides +// whether a change between them needs a row recopy. +// +// Conservative by design: anything we can't positively identify as +// metadata-only is treated as ImpactRequiresRecopy. We can only inspect what +// go-mysql's schema package exposes (column names, raw types, index list, PK +// columns), so changes outside that set — for example NOT NULL ↔ NULL — fall +// through to recopy. 
+func ClassifyImpact(oldSchema, newSchema *TableSchema) SchemaImpact { + if oldSchema == nil || newSchema == nil || oldSchema.Table == nil || newSchema.Table == nil { + return ImpactRequiresRecopy + } + + oldCols, newCols := oldSchema.Columns, newSchema.Columns + + if len(oldCols) != len(newCols) { + return ImpactRequiresRecopy + } + + newByName := make(map[string]int, len(newCols)) + for i, c := range newCols { + newByName[c.Name] = i + } + + for _, oc := range oldCols { + ni, ok := newByName[oc.Name] + if !ok { + return ImpactRequiresRecopy + } + nc := newCols[ni] + if !columnTypesEquivalent(oc.RawType, nc.RawType) { + return ImpactRequiresRecopy + } + if oc.IsUnsigned != nc.IsUnsigned { + return ImpactRequiresRecopy + } + } + + if oldSchema.PaginationKeyIndex != newSchema.PaginationKeyIndex { + return ImpactRequiresRecopy + } + // A pagination key that appears, disappears, or is renamed needs a recopy. + oldPK, newPK := oldSchema.PaginationKeyColumn, newSchema.PaginationKeyColumn + if (oldPK == nil) != (newPK == nil) || (oldPK != nil && oldPK.Name != newPK.Name) { + return ImpactRequiresRecopy + } + + return ImpactMetadataOnly +} + +// normalizeForParser rewrites legacy ALTER syntax the TiDB parser rejects but +// older binlog fixtures may still contain. +func normalizeForParser(sql string) string { + return alterLegacyModifierRe.ReplaceAllString(sql, "ALTER TABLE") +} + +// tableNameOrDefault extracts (schema, table) from a parsed *ast.TableName, +// falling back to the QueryEvent's default schema when the SQL did not +// qualify the table.
+func tableNameOrDefault(t *ast.TableName, defaultSchema string) (string, string) { + if t == nil { + return defaultSchema, "" + } + schema := t.Schema.O + if schema == "" { + schema = defaultSchema + } + return schema, t.Name.O +} + +func stripLeadingNoise(q []byte) []byte { + for { + loc := leadingNoiseRe.FindIndex(q) + if loc == nil || loc[0] != 0 { + return q + } + q = q[loc[1]:] + } +} + +func firstKeyword(q []byte) string { + stripped := stripLeadingNoise(q) + end := 0 + for end < len(stripped) { + c := stripped[end] + if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') { + end++ + continue + } + break + } + if end == 0 { + return "" + } + return string(stripped[:end]) +} + +// columnTypesEquivalent compares two RawType strings ignoring trivial +// whitespace and case differences. We don't try to decide whether e.g. +// VARCHAR(64) → VARCHAR(128) is safe — it isn't necessarily, since the binlog +// row payloads pre- and post-ALTER use different byte widths. +func columnTypesEquivalent(a, b string) bool { + return strings.EqualFold(strings.TrimSpace(a), strings.TrimSpace(b)) +} diff --git a/ferry.go b/ferry.go index 9666576c..b12c8c49 100644 --- a/ferry.go +++ b/ferry.go @@ -20,6 +20,7 @@ import ( _ "net/http/pprof" siddontangmysql "github.com/go-mysql-org/go-mysql/mysql" + "github.com/go-mysql-org/go-mysql/replication" "github.com/go-sql-driver/mysql" ) @@ -65,6 +66,13 @@ type Ferry struct { DataIterator *DataIterator BatchWriter *BatchWriter + // SchemaChangeDetector is non-nil when Config.AutomaticDDLHandling is set. + // It owns the state machine that lets the ferry tolerate DDL on either + // source or target during an active migration. 
+ SchemaChangeDetector *SchemaChangeDetector + TargetBinlogTail *TargetBinlogTail + ddlTraceFile *os.File + StateTracker *StateTracker ErrorHandler ErrorHandler Throttler Throttler @@ -116,6 +124,13 @@ func (f *Ferry) NewDataIterator() *DataIterator { dataIterator.CursorConfig.BuildSelect = f.CopyFilter.BuildSelect } + // Propagate the schema-change detector so standalone data copies (delta + // joined-table copy, primary-key table copy at cutover) also abort + // cleanly on column drift instead of crashing with "Unknown column ...". + if f.SchemaChangeDetector != nil { + dataIterator.TransitionChecker = f.SchemaChangeDetector + } + return dataIterator } @@ -559,6 +574,54 @@ func (f *Ferry) Initialize() (err error) { } } + if f.Config.AutomaticDDLHandling { + transitionTimeout := f.Config.SchemaChangeTransitionTimeout + if transitionTimeout == 0 { + transitionTimeout = 24 * time.Hour + } + + detector := NewSchemaChangeDetector(f.SourceDB, f.TargetDB, f.Tables) + detector.CopyFilter = f.Config.CopyFilter + detector.RecopyTrigger = f.DataIterator + detector.Verifier = f.inlineVerifier + detector.Throttler = f.Throttler + detector.StateTracker = f.StateTracker + detector.ErrorHandler = f.ErrorHandler + detector.TransitionTimeout = transitionTimeout + detector.CascadingPaginationColumnConfig = f.Config.CascadingPaginationColumnConfig + detector.DatabaseRewrites = f.Config.DatabaseRewrites + detector.TableRewrites = f.Config.TableRewrites + detector.Init() + + if err := detector.PreflightCheck(f.Tables.AsSlice()); err != nil { + return err + } + + if f.Config.DDLTraceFile != "" { + tf, err := os.OpenFile(f.Config.DDLTraceFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644) + if err != nil { + return fmt.Errorf("open DDLTraceFile %s: %w", f.Config.DDLTraceFile, err) + } + detector.TraceWriter = tf + f.ddlTraceFile = tf + f.logger.WithField("path", f.Config.DDLTraceFile).Info("DDL trace file opened") + } + + f.SchemaChangeDetector = detector + 
f.BinlogStreamer.SchemaChangeDetector = detector + f.DataIterator.TransitionChecker = detector + f.DataIterator.Tracer = detector + if f.inlineVerifier != nil { + f.inlineVerifier.QuiescenceChecker = detector + } + f.TargetBinlogTail = &TargetBinlogTail{ + DB: f.TargetDB, + DBConfig: f.Config.Target, + ErrorHandler: f.ErrorHandler, + OnDDL: detector.OnTargetDDL, + } + } + f.logger.Info("ferry initialized") return nil } @@ -583,6 +646,36 @@ func (f *Ferry) Start() error { f.BinlogStreamer.AddEventListener(f.inlineVerifier.binlogEventListener) } + if f.SchemaChangeDetector != nil { + // Hand source-side DDL QueryEvents to the detector. The handler is + // invoked from the binlog streamer's read loop so it must remain + // non-blocking — handleDDL kicks the convergence check onto its own + // goroutine. + err := f.BinlogStreamer.AddBinlogEventHandler(replication.QUERY_EVENT, + func(ev *replication.BinlogEvent, query []byte, es *BinlogEventState) ([]byte, error) { + qe, ok := ev.Event.(*replication.QueryEvent) + if !ok { + return query, nil + } + if !IsDDLQuery(qe.Query) { + return query, nil + } + stmt := ParseDDLFromQueryEvent(qe) + if stmt == nil { + return query, nil + } + f.SchemaChangeDetector.OnSourceDDL(stmt) + return query, nil + }) + if err != nil { + return err + } + + // Bridge DataIterator table completions into the detector so it can + // flip Recopying → Normal once a re-iteration finishes. + f.DataIterator.AddTableCompletionListener(f.SchemaChangeDetector.OnTableIterationComplete) + } + // The starting binlog coordinates must be determined first. 
If it is // determined after the DataIterator starts, the DataIterator might // miss some records that are inserted between the time the @@ -633,6 +726,12 @@ func (f *Ferry) Run() { f.logger.Info("starting ferry run") f.OverallState.Store(StateCopying) + defer func() { + if f.ddlTraceFile != nil { + _ = f.ddlTraceFile.Close() + } + }() + if f.Config.EnablePProf { go func() { err := http.ListenAndServe("localhost:6060", nil) @@ -658,6 +757,22 @@ func (f *Ferry) Run() { handleError("throttler", f.Throttler.Run(ctx)) }() + if f.SchemaChangeDetector != nil { + supportingServicesWg.Add(1) + go func() { + defer supportingServicesWg.Done() + f.SchemaChangeDetector.RunTimeoutChecker(ctx) + }() + + if f.TargetBinlogTail != nil { + supportingServicesWg.Add(1) + go func() { + defer supportingServicesWg.Done() + handleError("target_binlog_tail", f.TargetBinlogTail.Run(ctx)) + }() + } + } + if f.Config.ControlServerConfig.Enabled { go f.ControlServer.Run() } @@ -768,6 +883,19 @@ func (f *Ferry) Run() { f.OverallState.Store(StateWaitingForCutover) f.waitUntilAutomaticCutoverIsTrue() + // Block before any cutover work runs until every table is back in + // StateNormal. If a schema-change recopy is mid-flight (DELETE on target + // done but re-iteration not yet complete), running VerifyBeforeCutover + // against it would surface a checksum mismatch the recopy is about to + // fix. AwaitAllNormal honors TransitionTimeout so a stuck transition + // surfaces here rather than hanging cutover indefinitely. 
+ if f.SchemaChangeDetector != nil { + if err := f.SchemaChangeDetector.AwaitAllNormal(); err != nil { + f.logger.WithError(err).Error("schema-change drain failed before cutover") + f.ErrorHandler.Fatal("schema_change_detector", err) + } + } + if f.inlineVerifier != nil { // Stops the periodic verification of binlogs in the inline verifier // This should be okay as we enqueue the binlog events into the verifier, @@ -1005,7 +1133,16 @@ func (f *Ferry) Progress() *Progress { tableName := table.String() lastSuccessfulPaginationKeyInterface, foundInProgress := serializedState.LastSuccessfulPaginationKeys[tableName] - if serializedState.CompletedTables[tableName] { + // Detector overrides the default action while a DDL transition is in + // flight so shop-mover (and any other consumer of move.stats) can see + // "clearing_for_recopy" / "awaiting_ddl_convergence" / "recopying" + // instead of stale "completed" or premature "waiting". + if override := f.SchemaChangeDetector.TableAction(table.Schema, table.Name); override != "" { + currentAction = override + if override == TableActionRecopying { + s.ActiveDataIterators += 1 + } + } else if serializedState.CompletedTables[tableName] { currentAction = TableActionCompleted } else if foundInProgress { currentAction = TableActionCopying @@ -1054,6 +1191,12 @@ func (f *Ferry) Progress() *Progress { return s } +// callbackHTTPClient bounds the per-call wait so a slow or hung callback +// receiver cannot wedge the reporter goroutine indefinitely. Without this, +// a single hung POST stalls all subsequent state/progress callbacks, and +// shop-mover's stall watchdog fires because no further updates arrive. +var callbackHTTPClient = &http.Client{Timeout: 10 * time.Second} + func (f *Ferry) ReportProgress() { callback := f.Config.ProgressCallback // make a copy as we need to set the Payload. 
progress := f.Progress() @@ -1064,7 +1207,7 @@ func (f *Ferry) ReportProgress() { } callback.Payload = string(data) - err = callback.Post(&http.Client{}) + err = callback.Post(callbackHTTPClient) if err != nil { f.logger.WithError(err).Warn("failed to post status, but that's probably okay") } @@ -1081,7 +1224,7 @@ func (f *Ferry) ReportState() { } callback.Payload = string(state) - err = callback.Post(&http.Client{}) + err = callback.Post(callbackHTTPClient) if err != nil { f.logger.WithError(err).Errorf("failed to post state to callback %s", callback.URI) } diff --git a/filter.go b/filter.go index 9c367156..1e8405da 100644 --- a/filter.go +++ b/filter.go @@ -23,6 +23,12 @@ type CopyFilter interface { // otherwise. // Returning an error here will cause the ferry to be aborted. ApplicableEvent(DMLEvent) (bool, error) + + // BuildDelete returns a DELETE query scoped to the same subset of rows the + // filter selects from BuildSelect. It is invoked when automatic DDL + // handling clears the target rows for a table that needs recopy after a + // schema change. Returning an error aborts the ferry. 
+ BuildDelete(*TableSchema) (sq.DeleteBuilder, error) } type TableFilter interface { diff --git a/go.mod b/go.mod index 0dbdbfd0..8652a0be 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-sql-driver/mysql v1.7.1 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/gorilla/mux v1.6.1 + github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d github.com/rs/zerolog v1.35.0 github.com/shopspring/decimal v1.2.0 github.com/sirupsen/logrus v1.8.1 @@ -31,8 +32,8 @@ require ( github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec // indirect + github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 // indirect github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a // indirect - github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect diff --git a/go.sum b/go.sum index 4ae0504a..e13e35cd 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/pingcap/errors v0.11.0/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec h1:3EiGmeJWoNixU+EwllIn26x6s4njiWRXewdx2zlYa84= github.com/pingcap/errors v0.11.5-0.20250318082626-8f80e5cb09ec/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= +github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/log v1.1.1-0.20241212030209-7e3ff8601a2a h1:WIhmJBlNGmnCWH6TLMdZfNEDaiU8cFpZe3iaqDbQ0M8= github.com/pingcap/log 
v1.1.1-0.20241212030209-7e3ff8601a2a/go.mod h1:ORfBOFp1eteu2odzsyaxI+b8TzJwgjwyQcGhI+9SfEA= github.com/pingcap/tidb/pkg/parser v0.0.0-20250421232622-526b2c79173d h1:3Ej6eTuLZp25p3aH/EXdReRHY12hjZYs3RrGp7iLdag= diff --git a/inline_verifier.go b/inline_verifier.go index d69e48ff..0c33f1fd 100644 --- a/inline_verifier.go +++ b/inline_verifier.go @@ -157,6 +157,38 @@ func (s *BinlogVerifyStore) Add(table *TableSchema, paginationKey string) { } } +// RemoveTable drops every reverify entry for a single table. Used by +// automatic DDL handling: when a table is about to be re-copied, any pending +// re-verification of its rows would target rows that are about to be deleted +// and re-inserted with potentially different schemas. +func (s *BinlogVerifyStore) RemoveTable(schemaName, tableName string) { + s.mutex.Lock() + defer s.mutex.Unlock() + + dbStore, exists := s.store[schemaName] + if !exists { + return + } + tableStore, exists := dbStore[tableName] + if !exists { + return + } + + var removed uint64 + for _, count := range tableStore { + removed += uint64(count) + } + delete(dbStore, tableName) + if len(dbStore) == 0 { + delete(s.store, schemaName) + } + if removed > s.currentRowCount { + s.currentRowCount = 0 + } else { + s.currentRowCount -= removed + } +} + func (s *BinlogVerifyStore) RemoveVerifiedBatch(batch BinlogVerifyBatch) { s.mutex.Lock() defer s.mutex.Unlock() @@ -269,6 +301,13 @@ type InlineVerifier struct { StateTracker *StateTracker ErrorHandler ErrorHandler + // QuiescenceChecker (optional) reports whether a given table is currently + // mid-schema-change. The periodic reverify loop skips batches for such + // tables — verifying old reverify entries against a possibly-converged + // or just-deleted target would surface spurious mismatches that the + // recopy is about to fix anyway. Nil disables the check. 
+ QuiescenceChecker TableQuiescenceChecker + reverifyStore *BinlogVerifyStore verifyDuringCutoverStarted AtomicBoolean @@ -282,6 +321,24 @@ type InlineVerifier struct { backgroundVerificationWg *sync.WaitGroup } +// TableQuiescenceChecker reports whether the inline verifier should skip a +// table's reverify entries because it is mid-schema-change. Decoupled as an +// interface to keep InlineVerifier unit-testable without the full detector. +type TableQuiescenceChecker interface { + IsTableQuiesced(schemaName, tableName string) bool +} + +// FlushTableEntries drops every queued re-verification entry for the given +// table. Called by automatic DDL handling before re-copy so we don't try to +// re-verify rows that are about to be deleted and re-inserted with a new +// schema. +func (v *InlineVerifier) FlushTableEntries(schemaName, tableName string) { + if v == nil || v.reverifyStore == nil { + return + } + v.reverifyStore.RemoveTable(schemaName, tableName) +} + // This is called from the control server, which is triggered by pushing Run // Verification during cutover. // This step is necessary to ensure the binlogs are verified in Ghostferry. @@ -780,6 +837,17 @@ func (v *InlineVerifier) verifyAllEventsInStore() (bool, map[string]map[string][ v.logger.WithField("batches", len(allBatches)).Debug("verifyAllEventsInStore") for _, batch := range allBatches { + // Skip batches whose table is mid-schema-change. The detector will + // flush these entries from reverifyStore as part of recopy; running + // the checksum now would compare reverify-time fingerprints against + // a target whose rows are about to be deleted (or already are), + // surfacing a mismatch the recopy is about to fix. Cutover blocks + // on AwaitAllNormal, so deferring until the table returns to normal + // is safe. 
+ if v.QuiescenceChecker != nil && v.QuiescenceChecker.IsTableQuiesced(batch.SchemaName, batch.TableName) { + continue + } + batchMismatches, err := v.verifyBinlogBatch(batch) if err != nil { return false, nil, err diff --git a/progress.go b/progress.go index 85105452..a3505db9 100644 --- a/progress.go +++ b/progress.go @@ -5,9 +5,12 @@ import ( ) const ( - TableActionWaiting = "waiting" - TableActionCopying = "copying" - TableActionCompleted = "completed" + TableActionWaiting = "waiting" + TableActionCopying = "copying" + TableActionCompleted = "completed" + TableActionAwaitingDDLConvergence = "awaiting_ddl_convergence" + TableActionClearingForRecopy = "clearing_for_recopy" + TableActionRecopying = "recopying" ) type TableProgress struct { diff --git a/schema_change_detector.go b/schema_change_detector.go new file mode 100644 index 00000000..d87f25ed --- /dev/null +++ b/schema_change_detector.go @@ -0,0 +1,1013 @@ +package ghostferry + +import ( + "context" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/Shopify/ghostferry/sqlwrapper" + "github.com/go-mysql-org/go-mysql/schema" +) + +// TableState tracks where a single migrated table sits in the schema-change +// reaction state machine. +type TableState int + +const ( + // StateNormal is the steady state: source and target schemas agree, the + // data iterator and binlog streamer process events normally. + StateNormal TableState = iota + // StateInTransition means a schema change has been observed on at least + // one side but the two sides have not converged yet. Binlog row events + // for the table are skipped to avoid column-count crashes. + StateInTransition + // StateRecopying means the target has been cleared and the data iterator + // is in the process of re-copying the table from source's current state. + // Binlog row events for the table are NOT skipped during this phase: + // IsInTransition only drops rows for StateInTransition.
+ StateRecopying +) + +func (s TableState) String() string { + switch s { + case StateNormal: + return "normal" + case StateInTransition: + return "in_transition" + case StateRecopying: + return "recopying" + } + return "unknown" +} + +// RecopyTrigger is the slice of DataIterator the detector needs to re-queue +// a table for re-iteration. Defined as an interface so the detector can be +// unit-tested without the full DataIterator wired up. +type RecopyTrigger interface { + RequeueTable(table *TableSchema) error + // AwaitIterationDrained blocks until any in-flight iterateTable goroutine + // for the named table has returned. The detector uses this — not + // StateTracker.IsTableComplete — to know when its bulk DELETE is safe. + AwaitIterationDrained(tableName string) +} + +// VerifierFlusher flushes per-table reverify entries when a table is about +// to be re-copied. Optional — nil is acceptable. +type VerifierFlusher interface { + FlushTableEntries(schemaName, tableName string) +} + +// SchemaChangeDetector reacts to DDL on either the source or target side of +// an in-progress migration. It owns a per-table state machine, drives recopy +// when source and target converge on a new schema, and exposes IsInTransition +// so the binlog streamer can skip rows for tables mid-change. +type SchemaChangeDetector struct { + SourceDB *sqlwrapper.DB + TargetDB *sqlwrapper.DB + + // SchemaCache is the live cache the rest of the ferry consults. The + // detector mutates entries in place when refreshing schemas after a + // metadata-only change; for full recopy it replaces the entry as well. + SchemaCache TableSchemaCache + + // CopyFilter scopes BuildDelete calls during recopy. When nil, recopy + // falls back to TRUNCATE TABLE. + CopyFilter CopyFilter + + // RecopyTrigger is invoked after a successful target delete to put the + // table back in DataIterator's work queue. 
+ RecopyTrigger RecopyTrigger + + // Verifier (optional) gets a per-table flush call when a table enters + // StateRecopying. + Verifier VerifierFlusher + + // Throttler (optional) is consulted before issuing the bulk delete so the + // detector cooperates with the rest of the ferry's pacing. + Throttler Throttler + + // StateTracker is consulted to determine whether a table's initial copy + // has completed before the detector issues a bulk DELETE on the target. + // Concurrently DELETEing rows that an in-flight iterator is INSERTing + // would silently lose data — see recopy() comments. + StateTracker *StateTracker + + ErrorHandler ErrorHandler + + // TransitionTimeout bounds how long a table may sit in StateInTransition + // before the ferry aborts. Zero disables the timeout. + TransitionTimeout time.Duration + + // CascadingPaginationColumnConfig is required to recompute pagination + // metadata when refreshing a table's schema. + CascadingPaginationColumnConfig *CascadingPaginationColumnConfig + + // DatabaseRewrites and TableRewrites mirror the ferry's source→target + // name translation. Required when the source and target use different + // schema/table names — without them, target schema fetches and recopy + // DELETEs would hit the source-named (and possibly source-resident) + // table on the target connection, producing spurious convergence. + DatabaseRewrites map[string]string + TableRewrites map[string]string + + // TraceWriter, when non-nil, receives one short line per decision the + // detector makes (parsed DDL, affected tables, state transitions, + // convergence outcomes, recopy steps). Diagnosing whether a recopy fired + // or stalled is a cat-the-file operation when this is wired up. + TraceWriter io.Writer + + mu sync.RWMutex + tableState map[string]TableState + transitionStartedAt map[string]time.Time + // clearTargetDone tracks per-table channels covering the synchronous + // target-row DELETE inside recopy(). 
The channel is created when + // recopy() begins and closed once the DELETE has returned. Its + // presence under d.mu lets concurrent recopy() calls deduplicate + // (only one DELETE per convergence) and lets TableAction render the + // "clearing for recopy" UI state. Cleared in OnTableIterationComplete + // when the table returns to Normal. + clearTargetDone map[string]chan struct{} + logger Logger + loggerOnce sync.Once + traceMu sync.Mutex + + // migratedTablesByDB indexes migrated tables under both source-side and + // target-side schema/table names so DDLs picked up on either binlog stream + // resolve to a known table. Lookups must always normalize back to source- + // side names via targetToSource{Schema,Table} before keying the rest of + // the detector — tableState, SchemaCache, etc. all use source-side names. + migratedTablesByDB map[string]map[string]struct{} + targetToSourceSchema map[string]string + targetToSourceTable map[string]string +} + +// NewSchemaChangeDetector returns a zero-initialized detector. Callers must +// call Init before sending DDL events. +func NewSchemaChangeDetector(sourceDB, targetDB *sqlwrapper.DB, cache TableSchemaCache) *SchemaChangeDetector { + return &SchemaChangeDetector{ + SourceDB: sourceDB, + TargetDB: targetDB, + SchemaCache: cache, + tableState: map[string]TableState{}, + transitionStartedAt: map[string]time.Time{}, + clearTargetDone: map[string]chan struct{}{}, + } +} + +func (d *SchemaChangeDetector) ensureLogger() { + d.loggerOnce.Do(func() { + if d.logger == nil { + d.logger = LogWithField("tag", "schema_change_detector") + } + }) +} + +// trace appends a short timestamped line to TraceWriter. No-op when +// TraceWriter is nil. Errors writing to the trace are intentionally swallowed +// — the trace is for diagnostics; it must never fail the migration. 
+func (d *SchemaChangeDetector) trace(format string, args ...interface{}) { + if d == nil || d.TraceWriter == nil { + return + } + line := fmt.Sprintf(format, args...) + d.traceMu.Lock() + defer d.traceMu.Unlock() + _, _ = fmt.Fprintf(d.TraceWriter, "%s %s\n", time.Now().UTC().Format(time.RFC3339Nano), line) +} + +// Trace satisfies DDLTracer — same body as trace(), exposed so DataIterator +// can emit lines through the detector's tracer. +func (d *SchemaChangeDetector) Trace(format string, args ...interface{}) { + d.trace(format, args...) +} + +// Init populates the migrated-tables index from the schema cache. Call once +// after the cache has been built. +// +// The cache is keyed by source-side schema/table names. We index migrated +// tables under both the source-side and the target-side identity so that +// DDLs picked up on either binlog stream resolve to the same TableRef +// (which we always normalize back to source-side names — the rest of the +// detector keys on those). +func (d *SchemaChangeDetector) Init() { + d.ensureLogger() + + d.migratedTablesByDB = map[string]map[string]struct{}{} + d.targetToSourceSchema = map[string]string{} + d.targetToSourceTable = map[string]string{} + + for srcDB, tgtDB := range d.DatabaseRewrites { + d.targetToSourceSchema[tgtDB] = srcDB + } + for srcTbl, tgtTbl := range d.TableRewrites { + d.targetToSourceTable[tgtTbl] = srcTbl + } + + for _, t := range d.SchemaCache { + if t == nil || t.Table == nil { + continue + } + // Index under source-side names. + if _, ok := d.migratedTablesByDB[t.Schema]; !ok { + d.migratedTablesByDB[t.Schema] = map[string]struct{}{} + } + d.migratedTablesByDB[t.Schema][t.Name] = struct{}{} + + // Also index under target-side names so DDLs landing on the target + // binlog tail with their rewritten schema are recognized. 
+ tgtSchema, tgtTable := t.Schema, t.Name + if rewritten, ok := d.DatabaseRewrites[t.Schema]; ok { + tgtSchema = rewritten + } + if rewritten, ok := d.TableRewrites[t.Name]; ok { + tgtTable = rewritten + } + if tgtSchema != t.Schema || tgtTable != t.Name { + if _, ok := d.migratedTablesByDB[tgtSchema]; !ok { + d.migratedTablesByDB[tgtSchema] = map[string]struct{}{} + } + d.migratedTablesByDB[tgtSchema][tgtTable] = struct{}{} + } + } + + d.mu.Lock() + for _, t := range d.SchemaCache { + d.tableState[t.String()] = StateNormal + } + d.mu.Unlock() +} + +// PreflightCheck refuses to start the migration when source and target are +// already drifted, or when there's a shadow table in flight on either side +// for any of the migrated tables. The caller (typically shop-mover) is +// expected to perform similar checks; this is a defense-in-depth fallback so +// ghostferry doesn't bind itself to a target that's mid-DDL. +// +// Returns a non-nil error when the migration should not start. The error +// message names every problematic table so operators can see the full picture +// in one log line. +func (d *SchemaChangeDetector) PreflightCheck(tables []*TableSchema) error { + d.ensureLogger() + + var schemaIssues []string + var shadowIssues []string + + for _, table := range tables { + targetSchema, err := d.fetchTableSchema(d.TargetDB, table.Schema, table.Name) + if err != nil { + return fmt.Errorf("preflight: failed to read target schema for %s: %w", table.String(), err) + } + if targetSchema == nil { + // Missing on target is fine — the initial copy will create rows. 
+ } else if !schemasEquivalent(table, targetSchema) { + schemaIssues = append(schemaIssues, table.String()) + } + } + + databases := map[string]struct{}{} + for _, table := range tables { + databases[table.Schema] = struct{}{} + } + + migratedSet := func(db string) map[string]struct{} { + set := map[string]struct{}{} + for _, t := range tables { + if t.Schema == db { + set[t.Name] = struct{}{} + } + } + return set + } + + checkSide := func(db *sqlwrapper.DB, side string) error { + for dbname := range databases { + tableNames, err := showTablesFrom(db, dbname) + if err != nil { + return fmt.Errorf("preflight: failed to list %s tables in %s: %w", side, dbname, err) + } + migrated := migratedSet(dbname) + for _, name := range tableNames { + original := OriginalTableFromShadow(name) + if original == "" { + continue + } + if _, isMigrated := migrated[original]; isMigrated { + shadowIssues = append(shadowIssues, + fmt.Sprintf("%s on %s side (shadow of %s.%s)", name, side, dbname, original)) + } + } + } + return nil + } + + if err := checkSide(d.SourceDB, "source"); err != nil { + return err + } + if err := checkSide(d.TargetDB, "target"); err != nil { + return err + } + + if len(schemaIssues) == 0 && len(shadowIssues) == 0 { + return nil + } + + parts := []string{} + if len(schemaIssues) > 0 { + parts = append(parts, "tables with mismatched source/target schemas: "+strings.Join(schemaIssues, ", ")) + } + if len(shadowIssues) > 0 { + parts = append(parts, "shadow tables present (DDL likely in flight): "+strings.Join(shadowIssues, ", ")) + } + return fmt.Errorf("preflight check failed: %s", strings.Join(parts, "; ")) +} + +// IsInTransition reports whether row events for the given table should be +// dropped before parsing. Called from the binlog row handler hot path — +// keeps the lock window small. 
+// +// Only StateInTransition is treated as "skip": at that point source and +// target schemas differ and parsing the row event would either crash on +// column-count mismatch (binlog parse path) or land in BatchWriter as an +// INSERT against a column the destination does not have. +// +// StateRecopying is NOT skipped. By the time we flip to Recopying the cache +// has been refreshed to the converged schema, the target rows have been +// deleted, and a fresh iteration is running. Binlog events arriving in this +// window must be applied — the iterator's pagination cursor only sees rows +// up to its current position, so any UPDATE on an already-iterated row is +// only captured via the binlog. Skipping here would leave those updates +// missing on target and surface as cutover checksum mismatches. +func (d *SchemaChangeDetector) IsInTransition(schemaName, tableName string) bool { + if d == nil { + return false + } + key := fullTableName(schemaName, tableName) + d.mu.RLock() + state := d.tableState[key] + d.mu.RUnlock() + return state == StateInTransition +} + +// TableAction returns a non-empty TableAction* string when the detector has +// authoritative state for the given table that should override the +// DataIterator-derived progress action. Returns "" when the table is in +// StateNormal (or the detector is nil), in which case Ferry.Progress() falls +// back to its default Completed/Copying/Waiting derivation. +func (d *SchemaChangeDetector) TableAction(schemaName, tableName string) string { + if d == nil { + return "" + } + key := fullTableName(schemaName, tableName) + d.mu.RLock() + state := d.tableState[key] + _, hasClear := d.clearTargetDone[key] + d.mu.RUnlock() + + switch state { + case StateInTransition: + if hasClear { + // recopy() has begun the synchronous DELETE — surface this so + // the UI doesn't sit on "Awaiting DDL convergence" while the + // DELETE is the actual long pole. 
+ return TableActionClearingForRecopy + } + return TableActionAwaitingDDLConvergence + case StateRecopying: + return TableActionRecopying + } + return "" +} + +// OnSourceDDL is registered with the source BinlogStreamer's QueryEvent +// handler. It is invoked synchronously from the streamer goroutine so it must +// stay non-blocking. +func (d *SchemaChangeDetector) OnSourceDDL(stmt *DDLStatement) { + d.handleDDL(stmt, "source") +} + +// OnTargetDDL is the corresponding hook for TargetBinlogTail. +func (d *SchemaChangeDetector) OnTargetDDL(stmt *DDLStatement) { + d.handleDDL(stmt, "target") +} + +func (d *SchemaChangeDetector) handleDDL(stmt *DDLStatement, side string) { + if stmt == nil { + return + } + d.ensureLogger() + + d.trace("OnDDL side=%s ddlType=%s schema=%s table=%s rawQuery=%q", + side, stmt.DDLType, stmt.SchemaName, stmt.TableName, truncateQuery(stmt.RawQuery)) + + affected := d.affectedMigratedTables(stmt) + d.trace("affectedMigratedTables side=%s out=%s", side, formatTableRefs(affected)) + if len(affected) == 0 { + return + } + + for _, ref := range affected { + key := fullTableName(ref.SchemaName, ref.TableName) + + d.mu.Lock() + state := d.tableState[key] + transitioned := false + if state == StateNormal { + d.tableState[key] = StateInTransition + d.transitionStartedAt[key] = time.Now() + transitioned = true + d.logger.WithFields(Fields{ + "table": key, + "side": side, + "ddlType": stmt.DDLType, + }).Info("table entered transition") + } + d.mu.Unlock() + + if transitioned { + d.trace("transition table=%s from=normal to=in_transition side=%s ddlType=%s", key, side, stmt.DDLType) + } else { + d.trace("transition_skipped table=%s already_state=%s side=%s", key, state, side) + } + + // Convergence check runs in its own goroutine so we never block the + // binlog handler on a target schema fetch. 
The DELETE itself only + // fires once both sides have converged on the new schema — running + // it earlier overlaps with whatever DDL is still pending on the + // other side and contends on metadata locks (e.g. with gh-ost on + // the target). + go d.checkConvergence(ref) + } +} + +// affectedMigratedTables returns the set of migrated tables this DDL touches. +// Includes shadow-table CREATE/RENAME mappings: a CREATE _foo_gho on a +// migrated table foo puts foo into transition, and the gh-ost cutover RENAME +// puts both halves into transition. +func (d *SchemaChangeDetector) affectedMigratedTables(stmt *DDLStatement) []TableRef { + if d.migratedTablesByDB == nil { + return nil + } + + candidates := stmt.AllAffectedTables() + out := make([]TableRef, 0, len(candidates)) + seen := map[string]struct{}{} + + // addRef appends a (source-keyed) TableRef, deduping. We always normalize + // matches back to source-side names because the rest of the detector + // (tableState, SchemaCache lookups, IsInTransition queries from the + // streamer) keys on those. + addRef := func(sourceSchema, sourceTable string) { + key := sourceSchema + "." + sourceTable + if _, dup := seen[key]; dup { + return + } + seen[key] = struct{}{} + out = append(out, TableRef{SchemaName: sourceSchema, TableName: sourceTable}) + } + + considerName := func(schemaName, tableName string) { + if schemaName == "" || tableName == "" { + return + } + tables, ok := d.migratedTablesByDB[schemaName] + if !ok { + return + } + + // Translate target-side identity back to source-side. When the DDL + // arrives on the target binlog tail, schemaName/tableName are the + // target's, but we want to key the rest of the detector under the + // source-side names that match the schema cache. 
+ sourceSchema := schemaName + if rewritten, ok := d.targetToSourceSchema[schemaName]; ok { + sourceSchema = rewritten + } + sourceTable := tableName + if rewritten, ok := d.targetToSourceTable[tableName]; ok { + sourceTable = rewritten + } + + // Direct match: a migrated table is the literal subject of the DDL. + if _, isMigrated := tables[tableName]; isMigrated { + addRef(sourceSchema, sourceTable) + return + } + + // Shadow-table match: DDL on __gho/_del/_ghc/_new/_old means + // gh-ost or pt-osc is mid-migration on a migrated original. + if original := OriginalTableFromShadow(tableName); original != "" { + if _, isMigrated := tables[original]; isMigrated { + // Normalize the shadow's "original" through the rewrite map + // in case the rewrite is on the table name rather than the + // schema. Falls back to original when no rewrite exists. + originalSource := original + if rewritten, ok := d.targetToSourceTable[original]; ok { + originalSource = rewritten + } + addRef(sourceSchema, originalSource) + } + } + } + + for _, ref := range candidates { + considerName(ref.SchemaName, ref.TableName) + } + return out +} + +func (d *SchemaChangeDetector) checkConvergence(ref TableRef) { + key := fullTableName(ref.SchemaName, ref.TableName) + logger := d.logger.WithField("table", key) + + d.mu.RLock() + state := d.tableState[key] + d.mu.RUnlock() + if state != StateInTransition { + d.trace("checkConvergence_skip table=%s state=%s", key, state) + return + } + + sourceSchema, err := d.fetchTableSchema(d.SourceDB, ref.SchemaName, ref.TableName) + if err != nil { + logger.WithError(err).Warn("convergence: failed to read source schema; will retry on next DDL") + d.trace("checkConvergence_error table=%s side=source err=%q", key, err.Error()) + return + } + targetSchemaName, targetTableName := d.rewriteToTarget(ref.SchemaName, ref.TableName) + targetSchema, err := d.fetchTableSchema(d.TargetDB, targetSchemaName, targetTableName) + if err != nil { + 
logger.WithError(err).Warn("convergence: failed to read target schema; will retry on next DDL") + d.trace("checkConvergence_error table=%s side=target target_table=%s.%s err=%q", + key, targetSchemaName, targetTableName, err.Error()) + return + } + + d.trace("checkConvergence table=%s source_cols=%d target_cols=%d source_present=%t target_present=%t", + key, columnCount(sourceSchema), columnCount(targetSchema), sourceSchema != nil, targetSchema != nil) + + if sourceSchema == nil || targetSchema == nil { + // One side dropped or hasn't yet recreated. Stay in transition. + logger.Debug("convergence: one side missing the table; remaining in transition") + d.trace("convergence_pending table=%s reason=one_side_missing", key) + return + } + + if !schemasEquivalent(sourceSchema, targetSchema) { + logger.Debug("convergence: source and target still differ; remaining in transition") + d.trace("convergence_pending table=%s reason=schemas_differ source_cols=%s target_cols=%s", + key, formatColumnNames(sourceSchema), formatColumnNames(targetSchema)) + return + } + + cached := d.SchemaCache[key] + impact := ClassifyImpact(cached, sourceSchema) + d.trace("convergence_reached table=%s impact=%s cached_cols=%d fresh_cols=%d", + key, impactString(impact), columnCount(cached), columnCount(sourceSchema)) + + if impact == ImpactMetadataOnly { + d.applyMetadataOnly(ref, sourceSchema) + return + } + + d.recopy(ref, sourceSchema) +} + +func (d *SchemaChangeDetector) applyMetadataOnly(ref TableRef, fresh *TableSchema) { + key := fullTableName(ref.SchemaName, ref.TableName) + logger := d.logger.WithField("table", key) + + d.mu.Lock() + // Concurrent checkConvergence calls can race here: a parallel recopy() + // may have already refreshed SchemaCache[key] to the new schema. From + // this caller's vantage point, ClassifyImpact then reports + // metadata_only — but the truth is recopy is mid-DELETE. 
Both + // clearTargetDone[key] (DELETE in progress) and StateRecopying + // (DELETE finished, iteration running) signal that a recopy already + // owns this transition; abort without touching state or cache. + if _, clearing := d.clearTargetDone[key]; clearing { + d.mu.Unlock() + d.trace("metadata_only_skipped table=%s reason=recopy_in_progress", key) + return + } + if d.tableState[key] == StateRecopying { + d.mu.Unlock() + d.trace("metadata_only_skipped table=%s reason=already_recopying", key) + return + } + if d.tableState[key] != StateInTransition { + // Already flipped back to Normal by another path. Nothing to do. + d.mu.Unlock() + d.trace("metadata_only_skipped table=%s reason=not_in_transition", key) + return + } + if d.SchemaCache != nil { + d.SchemaCache[key] = fresh + } + d.tableState[key] = StateNormal + delete(d.transitionStartedAt, key) + d.mu.Unlock() + + logger.Info("convergence: metadata-only change applied; table back to normal") + d.trace("metadata_only_applied table=%s state=in_transition→normal", key) +} + +func (d *SchemaChangeDetector) recopy(ref TableRef, fresh *TableSchema) { + key := fullTableName(ref.SchemaName, ref.TableName) + logger := d.logger.WithField("table", key) + + // Claim the recopy under lock. clearTargetDone[key] is the in-progress + // signal: present-and-open while DELETE runs, present-and-closed for + // the brief window after DELETE finishes and before state flips, and + // absent both before DELETE starts and after the table returns to + // Normal. This serves two roles: (1) it deduplicates concurrent recopy + // calls when checkConvergence fires more than once for the same table, + // and (2) it lets TableAction surface the "clearing" UI state. 
+ d.mu.Lock() + if d.tableState[key] == StateRecopying { + d.mu.Unlock() + d.trace("recopy step=0_skipped table=%s reason=already_recopying", key) + return + } + if _, alreadyClearing := d.clearTargetDone[key]; alreadyClearing { + d.mu.Unlock() + d.trace("recopy step=0_skipped table=%s reason=already_clearing", key) + return + } + clearDone := make(chan struct{}) + d.clearTargetDone[key] = clearDone + if d.SchemaCache != nil { + d.SchemaCache[key] = fresh + } + d.mu.Unlock() + d.trace("recopy step=1_refresh_cache table=%s", key) + + d.trace("recopy step=2_flush_verifier table=%s verifier_present=%t", key, d.Verifier != nil) + if d.Verifier != nil { + d.Verifier.FlushTableEntries(ref.SchemaName, ref.TableName) + } + + // Drain any in-flight iterateTable for this table before issuing the + // DELETE. The iterator aborts itself via TransitionChecker once it + // observes InTransition, but concurrent INSERT IGNORE would survive + // the DELETE and leave inconsistent state. + if d.RecopyTrigger != nil { + d.RecopyTrigger.AwaitIterationDrained(key) + } + + if d.Throttler != nil { + WaitForThrottle(d.Throttler) + } + + d.trace("recopy step=3_clear_target_start table=%s", key) + if err := d.deleteTargetRows(fresh); err != nil { + logger.WithError(err).Error("clear_target failed") + d.trace("recopy step=3_clear_target_failed table=%s err=%q", key, err.Error()) + // Drop the in-progress channel so a future retry can re-enter. + // In practice ErrorHandler.Fatal aborts the ferry, but cleanup is + // cheap and keeps the state machine well-formed. 
+ d.mu.Lock() + delete(d.clearTargetDone, key) + d.mu.Unlock() + close(clearDone) + if d.ErrorHandler != nil { + d.ErrorHandler.Fatal("schema_change_detector", err) + } + return + } + d.trace("recopy step=3_clear_target_done table=%s", key) + + d.mu.Lock() + d.tableState[key] = StateRecopying + d.mu.Unlock() + close(clearDone) + d.trace("recopy step=4_state table=%s state=in_transition→recopying", key) + + if d.RecopyTrigger == nil { + logger.Warn("recopy: no RecopyTrigger configured; target is empty for this table but no re-iteration will run") + d.trace("recopy step=5_requeue_skipped table=%s reason=no_trigger", key) + return + } + d.trace("recopy step=5_requeue table=%s", key) + if err := d.RecopyTrigger.RequeueTable(fresh); err != nil { + logger.WithError(err).Error("recopy: failed to requeue table for re-iteration") + d.trace("recopy step=5_requeue_failed table=%s err=%q", key, err.Error()) + if d.ErrorHandler != nil { + d.ErrorHandler.Fatal("schema_change_detector", err) + } + return + } + d.trace("recopy step=5_requeue_done table=%s", key) +} + +// IsTableQuiesced reports whether the given table is currently mid-schema- +// change (StateInTransition or StateRecopying). Background work that would +// surface spurious failures during a transition — most notably the inline +// verifier's periodic reverify cycle — should skip these tables until the +// detector reports them back to StateNormal. +// +// Distinct from IsInTransition: that method is the binlog row-event hot +// path and is intentionally narrower (only StateInTransition skips events, +// because StateRecopying needs binlog application to capture UPDATEs to +// rows the cursor has already passed). 
+func (d *SchemaChangeDetector) IsTableQuiesced(schemaName, tableName string) bool { + if d == nil { + return false + } + key := fullTableName(schemaName, tableName) + d.mu.RLock() + state := d.tableState[key] + d.mu.RUnlock() + return state != StateNormal +} + +// AwaitAllNormal blocks until every tracked table is in StateNormal. Ferry +// calls this immediately before VerifyBeforeCutover so the verifier never +// runs against a table whose target rows are in the middle of being +// deleted/recopied. Honors TransitionTimeout — if any table is still not +// Normal after the timeout, returns an error so the caller can abort. +func (d *SchemaChangeDetector) AwaitAllNormal() error { + if d == nil { + return nil + } + d.ensureLogger() + logged := false + deadline := time.Time{} + if d.TransitionTimeout > 0 { + deadline = time.Now().Add(d.TransitionTimeout) + } + for { + if d.allNormal() { + return nil + } + if !logged { + d.logger.Info("waiting for any in-flight schema-change recopies to finish before cutover") + logged = true + } + if !deadline.IsZero() && time.Now().After(deadline) { + return fmt.Errorf("timeout (%s) waiting for schema-change recopies to drain", d.TransitionTimeout) + } + time.Sleep(500 * time.Millisecond) + } +} + +func (d *SchemaChangeDetector) allNormal() bool { + d.mu.RLock() + defer d.mu.RUnlock() + for _, s := range d.tableState { + if s != StateNormal { + return false + } + } + return true +} + +// OnTableIterationComplete is wired up as a DataIterator table-completion +// listener at startup. Whenever an iteration finishes (initial copy or a +// requeue), the detector inspects the table's state and, if it was in +// StateRecopying, transitions it back to StateNormal. Initial-copy +// completions for tables in StateNormal are no-ops. 
+func (d *SchemaChangeDetector) OnTableIterationComplete(table *TableSchema) { + if table == nil { + return + } + key := fullTableName(table.Schema, table.Name) + d.mu.Lock() + prev := d.tableState[key] + if prev == StateRecopying { + d.tableState[key] = StateNormal + delete(d.transitionStartedAt, key) + // Drop the completed clearTargetDone channel so a future DDL on + // this table allocates a fresh one for its early DELETE. + delete(d.clearTargetDone, key) + d.logger.WithField("table", key).Info("recopy completed; table back to normal") + } + d.mu.Unlock() + d.trace("OnTableIterationComplete table=%s prev_state=%s flipped_to_normal=%t", + key, prev, prev == StateRecopying) +} + +// CheckTimeouts returns an error when any table has been stuck in transition +// past TransitionTimeout. Caller should run this from a periodic ticker. +func (d *SchemaChangeDetector) CheckTimeouts(now time.Time) error { + if d.TransitionTimeout <= 0 { + return nil + } + d.mu.RLock() + defer d.mu.RUnlock() + for key, started := range d.transitionStartedAt { + if now.Sub(started) > d.TransitionTimeout { + return fmt.Errorf("table %s stuck in transition for %s (timeout %s)", + key, now.Sub(started), d.TransitionTimeout) + } + } + return nil +} + +// RunTimeoutChecker drives CheckTimeouts on a ticker until ctx is cancelled. +// Aborts the ferry via ErrorHandler on any timeout. 
+func (d *SchemaChangeDetector) RunTimeoutChecker(ctx context.Context) { + if d.TransitionTimeout <= 0 { + return + } + ticker := time.NewTicker(d.TransitionTimeout / 8) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + if err := d.CheckTimeouts(now); err != nil { + if d.ErrorHandler != nil { + d.ErrorHandler.Fatal("schema_change_detector", err) + } + return + } + } + } +} + +func (d *SchemaChangeDetector) deleteTargetRows(table *TableSchema) error { + // Build a transient TableSchema bearing the target-side schema and + // table names so BuildDelete / TRUNCATE produce queries that resolve on + // the target connection. The original table object is left untouched — + // it is shared with the rest of the ferry which still keys on source + // names. + targetTable := d.tableForTarget(table) + + if d.CopyFilter != nil { + builder, err := d.CopyFilter.BuildDelete(targetTable) + if err != nil { + d.logger.WithFields(Fields{ + "table": targetTable.String(), + "err": err.Error(), + }).Warn("BuildDelete unavailable; falling back to no-op (recopy will rely on INSERT IGNORE)") + return nil + } + query, args, err := builder.ToSql() + if err != nil { + return fmt.Errorf("BuildDelete ToSql: %w", err) + } + if _, err := d.TargetDB.Exec(query, args...); err != nil { + return fmt.Errorf("delete target rows for %s: %w", targetTable.String(), err) + } + return nil + } + + // No CopyFilter (full-table copy): TRUNCATE is the safe equivalent. + q := fmt.Sprintf("TRUNCATE TABLE %s", QuotedTableName(targetTable)) + if _, err := d.TargetDB.Exec(q); err != nil { + return fmt.Errorf("truncate target table %s: %w", targetTable.String(), err) + } + return nil +} + +// rewriteToTarget translates a source-side schema/table name into the +// target-side equivalent using the ferry's rewrite maps. Returns the input +// unchanged when no rewrite is configured. 
+func (d *SchemaChangeDetector) rewriteToTarget(schemaName, tableName string) (string, string) { + if rewritten, ok := d.DatabaseRewrites[schemaName]; ok { + schemaName = rewritten + } + if rewritten, ok := d.TableRewrites[tableName]; ok { + tableName = rewritten + } + return schemaName, tableName +} + +// tableForTarget returns a *TableSchema with Schema/Name rewritten to the +// target-side identity. Column metadata is shared with the input — only the +// outer name fields differ — so callers can safely use it for QuotedTableName +// and BuildDelete without disturbing the original. +func (d *SchemaChangeDetector) tableForTarget(src *TableSchema) *TableSchema { + if src == nil || src.Table == nil { + return src + } + targetSchema, targetTable := d.rewriteToTarget(src.Schema, src.Name) + if targetSchema == src.Schema && targetTable == src.Name { + return src + } + clonedInner := *src.Table + clonedInner.Schema = targetSchema + clonedInner.Name = targetTable + cloned := *src + cloned.Table = &clonedInner + return &cloned +} + +func (d *SchemaChangeDetector) fetchTableSchema(db *sqlwrapper.DB, schemaName, tableName string) (*TableSchema, error) { + t, err := schema.NewTableFromSqlDB(db.DB, schemaName, tableName) + if err != nil { + // MySQL surfaces a missing table as an error; treat it as "not present" + // so callers can distinguish from real failures. 
+ if isTableNotFoundError(err) { + return nil, nil + } + return nil, err + } + + visibleIndexes := make([]*schema.Index, 0, len(t.Indexes)) + for _, idx := range t.Indexes { + if idx.Visible { + visibleIndexes = append(visibleIndexes, idx) + } + } + t.Indexes = visibleIndexes + + ts := &TableSchema{Table: t} + if d.CascadingPaginationColumnConfig != nil { + col, idx, err := ts.paginationKeyColumn(d.CascadingPaginationColumnConfig) + if err == nil { + ts.PaginationKeyColumn = col + ts.PaginationKeyIndex = idx + } + } + return ts, nil +} + +func isTableNotFoundError(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "Error 1146") || strings.Contains(strings.ToLower(msg), "doesn't exist") +} + +// Helpers used to format trace lines. Kept here so trace strings stay +// consistent across decision points. + +func truncateQuery(q string) string { + const max = 200 + q = strings.ReplaceAll(q, "\n", " ") + if len(q) <= max { + return q + } + return q[:max] + "…" +} + +func formatTableRefs(refs []TableRef) string { + if len(refs) == 0 { + return "[]" + } + parts := make([]string, len(refs)) + for i, r := range refs { + parts[i] = fmt.Sprintf("%s.%s", r.SchemaName, r.TableName) + } + return "[" + strings.Join(parts, ",") + "]" +} + +func columnCount(t *TableSchema) int { + if t == nil || t.Table == nil { + return -1 + } + return len(t.Columns) +} + +func formatColumnNames(t *TableSchema) string { + if t == nil || t.Table == nil { + return "[]" + } + names := make([]string, len(t.Columns)) + for i, c := range t.Columns { + names[i] = c.Name + } + return "[" + strings.Join(names, ",") + "]" +} + +func impactString(i SchemaImpact) string { + switch i { + case ImpactMetadataOnly: + return "metadata_only" + case ImpactRequiresRecopy: + return "requires_recopy" + } + return "unknown" +} + +// schemasEquivalent compares two schemas by structural fields the binlog +// stream cares about: column names, column types, and 
pagination column. +// Index/comment/default differences do not count as a divergence. +func schemasEquivalent(a, b *TableSchema) bool { + if a == nil || b == nil || a.Table == nil || b.Table == nil { + return false + } + if len(a.Columns) != len(b.Columns) { + return false + } + for i := range a.Columns { + ac := a.Columns[i] + bc := b.Columns[i] + if ac.Name != bc.Name { + return false + } + if !columnTypesEquivalent(ac.RawType, bc.RawType) { + return false + } + if ac.IsUnsigned != bc.IsUnsigned { + return false + } + } + return true +} diff --git a/sharding/filter.go b/sharding/filter.go index a29145b4..00478033 100644 --- a/sharding/filter.go +++ b/sharding/filter.go @@ -139,6 +139,31 @@ func (f *ShardedCopyFilter) BuildSelect(columns []string, table *ghostferry.Tabl OrderBy(quotedPaginationKey), nil // LIMIT comes from the subquery. } +func (f *ShardedCopyFilter) BuildDelete(table *ghostferry.TableSchema) (sq.DeleteBuilder, error) { + quotedTable := ghostferry.QuotedTableName(table) + + // Joined tables are reference-counted/copy-on-write rows shared across + // shards. Deleting them based on this shard's view would corrupt other + // shards' data on the same target, so refuse — the detector falls back to + // an INSERT IGNORE recopy that leaves stale rows alone (acceptable since + // joined-table rows are immutable by contract). + if _, exists := f.JoinedTables[table.Name]; exists { + return sq.DeleteBuilder{}, fmt.Errorf("BuildDelete is not supported for joined table %s", table.Name) + } + + if _, exists := f.PrimaryKeyTables[table.Name]; exists { + // PrimaryKeyTables hold one row per shard keyed by the pagination + // column (which IS the sharding key here). + quotedPaginationKey := "`" + table.GetPaginationColumn().Name + "`" + return sq.Delete(quotedTable). + Where(sq.Eq{quotedPaginationKey: f.ShardingValue}), nil + } + + quotedShardingKey := "`" + f.ShardingKey + "`" + return sq.Delete(quotedTable). 
+ Where(sq.Eq{quotedShardingKey: f.ShardingValue}), nil +} + func (f *ShardedCopyFilter) shardingKeyIndexHint(table *ghostferry.TableSchema, indexHint string) string { indexName := f.shardingKeyIndexName(table) diff --git a/state_tracker.go b/state_tracker.go index 9f97221d..3eb4a5f7 100644 --- a/state_tracker.go +++ b/state_tracker.go @@ -257,6 +257,18 @@ func (s *StateTracker) IsTableComplete(table string) bool { return s.completedTables[table] } +// ResetTable wipes any persisted progress for a table so the next iteration +// starts from the beginning. Used by automatic DDL handling when a table is +// re-queued after a schema change. +func (s *StateTracker) ResetTable(table string) { + s.CopyRWMutex.Lock() + defer s.CopyRWMutex.Unlock() + + delete(s.completedTables, table) + delete(s.lastSuccessfulPaginationKeys, table) + delete(s.rowStatsWrittenPerTable, table) +} + // This is reasonably accurate if the rows copied are distributed uniformly // between paginationKey = 0 -> max(paginationKey). It would not be accurate if the distribution is // concentrated in a particular region. diff --git a/target_binlog_tail.go b/target_binlog_tail.go new file mode 100644 index 00000000..4a66698f --- /dev/null +++ b/target_binlog_tail.go @@ -0,0 +1,182 @@ +package ghostferry + +import ( + "context" + "crypto/tls" + "errors" + "fmt" + "time" + + sql "github.com/Shopify/ghostferry/sqlwrapper" + + "github.com/go-mysql-org/go-mysql/replication" +) + +// TargetBinlogTail is a lightweight binlog reader pointed at the target +// database. Its sole purpose is to surface DDL events that land on the target +// — for example a tier-scheduled ALTER, or a gh-ost shadow-table CREATE — so +// the schema-change detector can pair them with source-side DDL and trigger a +// recopy when both sides converge. +// +// Unlike the main BinlogStreamer the target tail does not parse row events +// and does not feed downstream listeners. RowEvents are dropped on the floor. 
+type TargetBinlogTail struct { + DB *sql.DB + DBConfig *DatabaseConfig + MyServerId uint32 + ErrorHandler ErrorHandler + + // OnDDL is invoked for every successfully parsed DDL QueryEvent observed + // on the target. Non-DDL QUERY_EVENTs (BEGIN, COMMIT, SAVEPOINT) are + // dropped before the callback fires. + OnDDL func(*DDLStatement) + + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + + stopRequested bool + logger Logger +} + +func (t *TargetBinlogTail) ensureLogger() { + if t.logger == nil { + t.logger = LogWithField("tag", "target_binlog_tail") + } +} + +func (t *TargetBinlogTail) connect() error { + t.ensureLogger() + + var tlsConfig *tls.Config + if t.DBConfig != nil && t.DBConfig.TLS != nil { + var err error + tlsConfig, err = t.DBConfig.TLS.BuildConfig() + if err != nil { + return err + } + } + + if t.MyServerId == 0 { + // The target tail must use a server_id distinct from any source-side + // reader. Caller is expected to either set this explicitly or accept a + // random one — collisions are exceedingly unlikely against a target + // shop pod which is not itself a replication source. 
+ t.MyServerId = randomServerId() + } + + syncerConfig := replication.BinlogSyncerConfig{ + ServerID: t.MyServerId, + Host: t.DBConfig.Host, + Port: t.DBConfig.Port, + User: t.DBConfig.User, + Password: t.DBConfig.Pass, + TLSConfig: tlsConfig, + UseDecimal: true, + UseFloatWithTrailingZero: true, + TimestampStringLocation: time.UTC, + Logger: NewSlogLogger(t.logger), + } + t.binlogSyncer = replication.NewBinlogSyncer(syncerConfig) + + startPos, err := ShowMasterStatusBinlogPosition(t.DB) + if err != nil { + t.logger.WithError(err).Error("failed to read target binlog position") + return err + } + + t.logger.WithFields(Fields{ + "file": startPos.Name, + "position": startPos.Pos, + "host": t.DBConfig.Host, + "port": t.DBConfig.Port, + }).Info("starting target binlog tail") + + t.binlogStreamer, err = t.binlogSyncer.StartSync(startPos) + if err != nil { + t.logger.WithError(err).Error("unable to start target binlog tail") + return err + } + return nil +} + +// Run streams the target's binlog and dispatches DDL QueryEvents to OnDDL +// until ctx is cancelled or Stop is called. Connection failures are fatal — +// the design treats schema-drift detection as load-bearing for correctness, so +// silently degrading the tail would let a target-side ALTER slip past while +// source rows continued to apply. 
+func (t *TargetBinlogTail) Run(ctx context.Context) error { + t.ensureLogger() + + if t.OnDDL == nil { + return errors.New("TargetBinlogTail.OnDDL must be set before Run") + } + + if err := t.connect(); err != nil { + t.ErrorHandler.Fatal("target_binlog_tail", err) + return err + } + defer t.binlogSyncer.Close() + + for !t.stopRequested { + select { + case <-ctx.Done(): + t.logger.Info("target binlog tail stopping (context cancelled)") + return nil + default: + } + + ev, err := t.readEvent(ctx) + if err == context.DeadlineExceeded { + continue + } + if err == context.Canceled { + return nil + } + if err != nil { + t.logger.WithError(err).Error("target binlog tail read failed") + t.ErrorHandler.Fatal("target_binlog_tail", err) + return err + } + + t.handleEvent(ev) + } + return nil +} + +func (t *TargetBinlogTail) readEvent(ctx context.Context) (*replication.BinlogEvent, error) { + readCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + return t.binlogStreamer.GetEvent(readCtx) +} + +func (t *TargetBinlogTail) handleEvent(ev *replication.BinlogEvent) { + queryEv, ok := ev.Event.(*replication.QueryEvent) + if !ok { + return + } + if !IsDDLQuery(queryEv.Query) { + return + } + stmt := ParseDDLFromQueryEvent(queryEv) + if stmt == nil { + return + } + + t.logger.WithFields(Fields{ + "schema": stmt.SchemaName, + "table": stmt.TableName, + "ddlType": stmt.DDLType, + }).Info("target DDL observed") + + defer func() { + if r := recover(); r != nil { + t.logger.WithField("panic", fmt.Sprintf("%v", r)).Error("OnDDL handler panicked; continuing") + } + }() + t.OnDDL(stmt) +} + +// Stop signals the tail loop to exit at the next iteration. 
+func (t *TargetBinlogTail) Stop() { + t.stopRequested = true +} diff --git a/test/go/ddl_events_test.go b/test/go/ddl_events_test.go new file mode 100644 index 00000000..72982f1b --- /dev/null +++ b/test/go/ddl_events_test.go @@ -0,0 +1,133 @@ +package test + +import ( + "testing" + + "github.com/Shopify/ghostferry" + "github.com/go-mysql-org/go-mysql/replication" + "github.com/stretchr/testify/assert" +) + +func qe(query, defaultSchema string) *replication.QueryEvent { + return &replication.QueryEvent{ + Query: []byte(query), + Schema: []byte(defaultSchema), + } +} + +func TestIsDDLQuery(t *testing.T) { + cases := map[string]bool{ + "ALTER TABLE foo ADD COLUMN bar INT": true, + " alter table foo add column bar int": true, + "/* hint */ ALTER TABLE foo DROP bar": true, + "-- comment\nALTER TABLE foo DROP bar\n": true, + "CREATE TABLE foo (id INT)": true, + "DROP TABLE foo": true, + "TRUNCATE TABLE foo": true, + "RENAME TABLE foo TO bar": true, + "BEGIN": false, + "COMMIT": false, + "SAVEPOINT sp1": false, + "ROLLBACK": false, + "INSERT INTO foo VALUES (1)": false, + "SELECT 1": false, + "": false, + } + for q, expected := range cases { + assert.Equalf(t, expected, ghostferry.IsDDLQuery([]byte(q)), "query: %q", q) + } +} + +func TestParseDDLAlter(t *testing.T) { + stmt := ghostferry.ParseDDLFromQueryEvent(qe("ALTER TABLE `gftest`.`mytable` ADD COLUMN x INT", "fallback")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "ALTER", stmt.DDLType) + assert.Equal(t, "gftest", stmt.SchemaName) + assert.Equal(t, "mytable", stmt.TableName) + } + + stmt = ghostferry.ParseDDLFromQueryEvent(qe("ALTER TABLE mytable ADD COLUMN x INT", "fallback")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "fallback", stmt.SchemaName) + assert.Equal(t, "mytable", stmt.TableName) + } + + stmt = ghostferry.ParseDDLFromQueryEvent(qe("alter online table foo drop column bar", "db")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "ALTER", stmt.DDLType) + assert.Equal(t, "foo", stmt.TableName) + } 
+} + +func TestParseDDLCreateDropTruncate(t *testing.T) { + stmt := ghostferry.ParseDDLFromQueryEvent(qe("CREATE TABLE IF NOT EXISTS `db`.`_foo_gho` (id INT)", "x")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "CREATE", stmt.DDLType) + assert.Equal(t, "db", stmt.SchemaName) + assert.Equal(t, "_foo_gho", stmt.TableName) + } + + stmt = ghostferry.ParseDDLFromQueryEvent(qe("DROP TABLE IF EXISTS `db`.`_foo_del`", "x")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "DROP", stmt.DDLType) + assert.Equal(t, "_foo_del", stmt.TableName) + } + + stmt = ghostferry.ParseDDLFromQueryEvent(qe("TRUNCATE TABLE foo", "db")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "TRUNCATE", stmt.DDLType) + assert.Equal(t, "foo", stmt.TableName) + } +} + +func TestParseDDLRenameSinglePair(t *testing.T) { + stmt := ghostferry.ParseDDLFromQueryEvent(qe("RENAME TABLE `db`.`old` TO `db`.`new`", "fallback")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "RENAME", stmt.DDLType) + assert.Equal(t, 1, len(stmt.RenamePairs)) + assert.Equal(t, "old", stmt.RenamePairs[0].From.TableName) + assert.Equal(t, "new", stmt.RenamePairs[0].To.TableName) + } +} + +func TestParseDDLRenameGhostSwap(t *testing.T) { + stmt := ghostferry.ParseDDLFromQueryEvent(qe( + "RENAME TABLE `db`.`foo` TO `db`.`_foo_del`, `db`.`_foo_gho` TO `db`.`foo`", + "fallback")) + if assert.NotNil(t, stmt) { + assert.Equal(t, "RENAME", stmt.DDLType) + assert.Equal(t, 2, len(stmt.RenamePairs)) + assert.Equal(t, "foo", stmt.RenamePairs[0].From.TableName) + assert.Equal(t, "_foo_del", stmt.RenamePairs[0].To.TableName) + assert.Equal(t, "_foo_gho", stmt.RenamePairs[1].From.TableName) + assert.Equal(t, "foo", stmt.RenamePairs[1].To.TableName) + + all := stmt.AllAffectedTables() + assert.Equal(t, 4, len(all)) + } +} + +func TestParseDDLNonDDL(t *testing.T) { + for _, q := range []string{"BEGIN", "COMMIT", "INSERT INTO foo VALUES (1)", ""} { + stmt := ghostferry.ParseDDLFromQueryEvent(qe(q, "db")) + assert.Nilf(t, stmt, "expected nil for 
non-DDL query %q", q) + } +} + +func TestShadowTableDetection(t *testing.T) { + cases := map[string]string{ + "_foo_gho": "foo", + "_foo_del": "foo", + "_foo_ghc": "foo", + "_foo_new": "foo", + "_foo_old": "foo", + "_my_table_gho": "my_table", + "foo": "", + "foo_gho": "", + "_foo_other": "", + } + for name, expectedOriginal := range cases { + assert.Equalf(t, expectedOriginal != "", ghostferry.IsShadowTable(name), "IsShadowTable(%q)", name) + assert.Equalf(t, expectedOriginal, ghostferry.OriginalTableFromShadow(name), "OriginalTableFromShadow(%q)", name) + } +}