diff --git a/cmd/index_analyzer/main.go b/cmd/index_analyzer/main.go index b7422da4..c84ad734 100644 --- a/cmd/index_analyzer/main.go +++ b/cmd/index_analyzer/main.go @@ -147,13 +147,14 @@ func analyzeIndex( } tokens := [][]byte{} + tokenUnpackBuf := &token.UnpackBuffer{} for { data := readTokenBlock() if len(data) == 0 { // empty block - section separator break } block := token.Block{} - if err := block.Unpack(data); err != nil { + if err := block.Unpack(data, b.Info.BinaryDataVer, tokenUnpackBuf); err != nil { logger.Fatal("error unpacking tokens", zap.Error(err)) } for i := range block.Len() { @@ -189,6 +190,7 @@ func analyzeIndex( lidsUniq := map[[16]byte]int{} lidsLens := make([]int, len(tokens)) tokenLIDs := []uint32{} + lidUnpackBuf := &lids.UnpackBuffer{} for { data := readLIDBlock() if len(data) == 0 { // empty block - section separator @@ -196,7 +198,7 @@ func analyzeIndex( } block := &lids.Block{} - if err := block.Unpack(data, ver, &lids.UnpackBuffer{}); err != nil { + if err := block.Unpack(data, ver, b.Info.BinaryDataVer, lidUnpackBuf); err != nil { logger.Fatal("error unpacking lids block", zap.Error(err)) } diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index de29ac4c..df908fea 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -272,6 +272,7 @@ func startStore( TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel, DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel, DocBlockSize: int(cfg.DocsSorting.DocBlockSize), + TokenFreqThreshold: cfg.Sealing.Tokens.FreqThreshold, }, Fraction: frac.Config{ Search: frac.SearchConfig{ diff --git a/compaction/merge.go b/compaction/merge.go new file mode 100644 index 00000000..928b3044 --- /dev/null +++ b/compaction/merge.go @@ -0,0 +1,162 @@ +package compaction + +import ( + "errors" + "os" + + "github.com/ozontech/seq-db/consts" + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed" + "github.com/ozontech/seq-db/indexwriter" +) + +func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) { + writer := indexwriter.New(params) + src := NewMergeSource(filename, srcs) + + if err := createAndWrite( + filename+consts.OffsetsTmpFileSuffix, + filename+consts.OffsetsFileSuffix, + func(f *os.File) error { return writer.WriteOffsetsFile(f, src) }, + ); err != nil { + return nil, err + } + + if err := createAndWrite( + filename+consts.IDTmpFileSuffix, + filename+consts.IDFileSuffix, + func(f *os.File) error { return writer.WriteIDFile(f, src) }, + ); err != nil { + return nil, err + } + + if err := createAndWriteBoth( + filename+consts.TokenTmpFileSuffix, + filename+consts.TokenFileSuffix, + filename+consts.LIDTmpFileSuffix, + filename+consts.LIDFileSuffix, + func(tf, lf *os.File) error { return writer.WriteTokenTriplet(tf, lf, src) }, + ); err != nil { + return nil, err + } + + if err := createAndWrite( + filename+consts.InfoTmpFileSuffix, + filename+consts.InfoFileSuffix, + func(f *os.File) error { return writer.WriteInfoFile(f, src) }, + ); err != nil { + return nil, err + } + + if err := mergeDocs(filename, srcs...); err != nil { + return nil, err + } + + info := src.Info() + info.IndexOnDisk = 0 + + for _, suffix := range []string{ + consts.InfoFileSuffix, + consts.TokenFileSuffix, + consts.OffsetsFileSuffix, + consts.IDFileSuffix, + consts.LIDFileSuffix, + } { + st, err := os.Stat(info.Path + suffix) + if err != nil { + return nil, err + } + info.IndexOnDisk += uint64(st.Size()) + } + + lidsTable := writer.LIDsTable() + preloaded := &sealed.PreloadedData{ + Info: info, + TokenTable: writer.TokenTable(), + BlocksData: sealed.BlocksData{ + LIDsTable: &lidsTable, + IDsTable: writer.IDsTable(), + BlocksOffsets: src.BlockOffsets(), + }, + } + + return preloaded, nil +} + +func mergeDocs(filename string, srcs ...Source) error { + return createAndWrite( + filename+consts.DocsTmpFileSuffix, + filename+consts.DocsFileSuffix, + func(f *os.File) error { + var docsSize uint64 + for _, src := range srcs { + for loc, err := range src.DocBlock() { + if err != nil { + return err + } + + payload, offset := loc.First, loc.Second + if _, err := f.WriteAt(payload, int64(offset+docsSize)); err != nil { + return err + } + } + + docsSize += src.Info().DocsOnDisk + } + + return nil + }, + ) +} + +func syncAndClose(f *os.File) error { + if err := f.Sync(); err != nil { + f.Close() + return err + } + return f.Close() +} + +func createAndWrite( + tmp, final string, + write func(*os.File) error, +) error { + f, err := os.Create(tmp) + if err != nil { + return err + } + + if err := errors.Join(write(f), syncAndClose(f)); err != nil { + return err + } + + return os.Rename(tmp, final) +} + +func createAndWriteBoth( + atmp, afinal, + btmp, bfinal string, + write func(*os.File, *os.File) error, +) error { + a, err := os.Create(atmp) + if err != nil { + return err + } + + b, err := os.Create(btmp) + if err != nil { + a.Close() + return err + } + + writeErr := write(a, b) + if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil { + return err + } + + if err := os.Rename(atmp, afinal); err != nil { + return err + } + + return os.Rename(btmp, bfinal) +} diff --git a/compaction/merge_source.go b/compaction/merge_source.go new file mode 100644 index 00000000..f2e49da7 --- /dev/null +++ b/compaction/merge_source.go @@ -0,0 +1,445 @@ +package compaction + +import ( + "bytes" + "iter" + "slices" + "sync" + + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/indexwriter" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/util" +) + +type ( + Document = util.Pair[seq.ID, []byte] + DocBlockLocation = util.Pair[[]byte, uint64] + TokenPosting = util.Pair[[]byte, []uint32] + DocLocation = util.Pair[seq.ID, seq.DocPos] + IndexedDocBlock = util.Pair[[]byte, []seq.DocPos] +) + +type Source interface { + indexwriter.Source + DocBlock() iter.Seq2[DocBlockLocation, error] +} + +type MergeSource struct { + filename string + + // sources is a slice of [sealing.Source] + // which provide view into underlying fractions. + sources []Source + + info *common.Info + infoOnce sync.Once + + offsets []uint64 + offsetsOnce sync.Once + + // docBlockCount is populated during [MergeSource.BlockOffsets] call. + // This slice is used for changing block indexes in [seq.DocPos]. + docBlockCount []int + + // lidMapping describes the transformation of lids + // after k-merge of several fractions. + // + // i-th index of [lidMapping] correponds to i-th fraction. + // j-th index of i-th [lidMapping] corresponds to rename of j-th lid. + lidMapping [][]uint32 +} + +func NewMergeSource(filename string, sources []Source) *MergeSource { + lidMapping := make([][]uint32, len(sources)) + + for i, src := range sources { + lidMapping[i] = make( + []uint32, + // Increment for [seq.SystemID]. + src.Info().DocsTotal+1, + ) + } + + s := &MergeSource{ + filename: filename, + sources: sources, + lidMapping: lidMapping, + } + + s.info = s.prepareInfo() + return s +} + +func (s *MergeSource) prepareInfo() *common.Info { + info := common.NewInfo(s.filename, 0, 0) + + var ( + from seq.MID = seq.MaxID.MID + to seq.MID = seq.MinID.MID + ) + + for _, src := range s.sources { + from = min(from, src.Info().From) + to = max(to, src.Info().To) + } + + info.From, info.To = from, to + info.SealingTime = info.CreationTime + + info.InitEmptyDistribution() + return info +} + +func (s *MergeSource) Info() *common.Info { + s.infoOnce.Do(func() { + for i := range s.sources { + sinfo := s.sources[i].Info() + + s.info.DocsRaw += sinfo.DocsRaw + s.info.DocsTotal += sinfo.DocsTotal + s.info.DocsOnDisk += sinfo.DocsOnDisk + + // NOTE(dkharms): [IndexOnDisk] is calculated later. + } + }) + + return s.info +} + +func (s *MergeSource) BlockOffsets() []uint64 { + s.offsetsOnce.Do(func() { + var ( + docsSize uint64 + offsets []uint64 + ) + + s.docBlockCount = append(s.docBlockCount, 0) + for i := 0; i < len(s.sources); i++ { + for _, offset := range s.sources[i].BlockOffsets() { + offsets = append(offsets, uint64(offset)+docsSize) + } + docsSize += s.sources[i].Info().DocsOnDisk + s.docBlockCount = append(s.docBlockCount, len(offsets)) + } + + s.offsets = offsets + }) + + return s.offsets +} + +func (s *MergeSource) ID() iter.Seq2[DocLocation, error] { + // TODO(dkharms): For now, I will use stupid-simple linear scan for k-way merge. + // + // Its time complexity O(k*n) so it's not efficient enough if we compare it + // against time complexity of min-heap (which is O(n*log(k))) + // or another great data structure -- tournament tree -- which is O(n*log(k)) as well. + // + // However, tournament tree performs less comparisons than min-heap + // and it is around log(k) vs 2*log(k). + + type cursor struct { + next func() (DocLocation, error, bool) + stop func() + + loc DocLocation + lidOld uint32 + + ok bool + } + + return func(yield func(DocLocation, error) bool) { + var cursors []cursor + + defer func() { + for _, c := range cursors { + c.stop() + } + }() + + for i := range s.sources { + src := s.sources[i] + next, stop := iter.Pull2(src.ID()) + + // Skip [seq.SystemID] and [seq.SystemDocPos]. + _, _, _ = next() + + loc, err, ok := next() + cursors = append(cursors, cursor{ + next: next, stop: stop, + loc: loc, lidOld: 1, + ok: ok && err == nil, + }) + + if err != nil { + yield(DocLocation{}, err) + return + } + } + + lid := uint32(1) + // We've previosly dropped [seq.SystemID] from + // iterators however we do have to emit one such id. + if !yield(DocLocation{First: seq.SystemID, Second: seq.SystemDocPos}, nil) { + return + } + + for { + var ( + id seq.ID = seq.MinID + idx int = -1 + ) + + for i, c := range cursors { + // We exhausted i-th cursor so there is nothing pull. + if !c.ok { + continue + } + + if seq.Less(id, c.loc.First) { + id = c.loc.First + idx = i + } + } + + // All pull-iterators are exhausted. + // Close all iterators and return. + if idx == -1 { + break + } + + c := cursors[idx] + + minID, lidOld := c.loc.First, c.lidOld + s.info.AddMID(uint64(minID.MID)) + + blockIdx, offset := c.loc.Second.Unpack() + minDocPos := seq.PackDocPos(uint32(s.docBlockCount[idx]+int(blockIdx)), offset) + + if !yield(DocLocation{First: minID, Second: minDocPos}, nil) { + return + } + + // Rename lid from picked cursor to the new value. + s.lidMapping[idx][lidOld] = lid + + var err error + c.loc, err, c.ok = c.next() + c.lidOld += 1 + + if err != nil { + cursors[idx] = c + yield(DocLocation{}, err) + return + } + + lid += 1 + cursors[idx] = c + } + } +} + +func (s *MergeSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { + // TODO(dkharms): For now, I will use stupid-simple linear scan for k-way merge. + // + // Its time complexity O(k*n) so it's not efficient enough if we compare it + // against time complexity of min-heap (which is O(n*log(k))) + // or another great data structure -- tournament tree -- which is O(n*log(k)) as well. + // + // However, tournament tree performs less comparisons than min-heap + // and it is around log(k) vs 2*log(k). + + type cursor struct { + next func() (string, iter.Seq2[TokenPosting, error], bool) + stop func() + + field string + tokIt iter.Seq2[TokenPosting, error] + + ok bool + } + + minimal := func(cursors []cursor) (string, bool) { + var ( + set bool + field string + ) + + for _, c := range cursors { + if !c.ok { + continue + } + + if !set { + field = c.field + set = true + continue + } + + field = min(field, c.field) + } + + return field, set + } + + return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { + var cursors []cursor + + for i := range s.sources { + src := s.sources[i] + + next, stop := iter.Pull2(src.TokenTriplet()) + field, tokIt, has := next() + + cursors = append(cursors, cursor{ + next: next, stop: stop, + field: field, tokIt: tokIt, + ok: has, + }) + } + + defer func() { + for _, c := range cursors { + c.stop() + } + }() + + for { + field, ok := minimal(cursors) + if !ok { + break + } + + var ( + idxs []int + iters []iter.Seq2[TokenPosting, error] + ) + + for i, c := range cursors { + if !c.ok || c.field != field { + continue + } + + idxs = append(idxs, i) + iters = append(iters, c.tokIt) + } + + if !yield(field, s.postingsForField(idxs, iters)) { + return + } + + // Advance all cursors that were on this field. + for _, idx := range idxs { + c := cursors[idx] + c.field, c.tokIt, c.ok = c.next() + cursors[idx] = c + } + } + } +} + +func (s *MergeSource) postingsForField( + idxs []int, iters []iter.Seq2[TokenPosting, error], +) iter.Seq2[TokenPosting, error] { + type cursor struct { + next func() (TokenPosting, error, bool) + stop func() + + idx int + posting TokenPosting + + ok bool + } + + minimal := func(cursors []cursor) ([]byte, bool) { + var ( + set bool + token []byte + ) + + for _, c := range cursors { + if !c.ok { + continue + } + + if !set { + token = c.posting.First + set = true + continue + } + + if bytes.Compare(c.posting.First, token) < 0 { + token = c.posting.First + } + } + + return token, set + } + + // NB: This buffer will be reused across + // all calls within current field. + var lidRenamed []uint32 + + return func(yield func(TokenPosting, error) bool) { + var cursors []cursor + + defer func() { + for _, c := range cursors { + c.stop() + } + }() + + for i := range iters { + next, stop := iter.Pull2(iters[i]) + posting, err, ok := next() + + cursors = append(cursors, cursor{ + next: next, stop: stop, + idx: idxs[i], posting: posting, + ok: ok && err == nil, + }) + + if err != nil { + yield(TokenPosting{}, err) + return + } + } + + for { + token, ok := minimal(cursors) + if !ok { + break + } + + // Collect and remap lids from all cursors at this token, then advance them. + for i, c := range cursors { + if !c.ok || !bytes.Equal(c.posting.First, token) { + continue + } + + for _, lid := range c.posting.Second { + lidRenamed = append(lidRenamed, s.lidMapping[c.idx][lid]) + } + + var err error + c.posting, err, c.ok = c.next() + + if err != nil { + cursors[i] = c + yield(TokenPosting{}, err) + return + } + + cursors[i] = c + } + + slices.Sort(lidRenamed) + if !yield(TokenPosting{First: token, Second: lidRenamed}, nil) { + return + } + + lidRenamed = lidRenamed[:0] + } + } +} diff --git a/compaction/merge_source_test.go b/compaction/merge_source_test.go new file mode 100644 index 00000000..bb5fb3b1 --- /dev/null +++ b/compaction/merge_source_test.go @@ -0,0 +1,352 @@ +package compaction + +import ( + "cmp" + "fmt" + "iter" + "math/rand" + "slices" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/seq" +) + +type mockSealingSource struct { + ids []seq.ID + pos []seq.DocPos + blocks []uint64 + docsOnDisk uint64 + fields map[string]map[string][]uint32 +} + +func (m *mockSealingSource) Info() *common.Info { + return &common.Info{ + DocsRaw: m.docsOnDisk, + DocsTotal: uint32(len(m.ids)), + DocsOnDisk: m.docsOnDisk, + + From: slices.MinFunc(m.ids, func(x, y seq.ID) int { + return cmp.Compare(x.MID, y.MID) + }).MID, + + To: slices.MaxFunc(m.ids, func(x, y seq.ID) int { + return cmp.Compare(x.MID, y.MID) + }).MID, + } +} + +func (m *mockSealingSource) BlockOffsets() []uint64 { + return m.blocks +} + +func (m *mockSealingSource) ID() iter.Seq2[DocLocation, error] { + return func(yield func(DocLocation, error) bool) { + docloc := DocLocation{First: seq.SystemID, Second: seq.SystemDocPos} + if !yield(docloc, nil) { + return + } + + for i, id := range m.ids { + docloc = DocLocation{First: id, Second: m.pos[i]} + if !yield(docloc, nil) { + return + } + } + } +} + +func (m *mockSealingSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { + fields := make([]string, 0, len(m.fields)) + for f := range m.fields { + fields = append(fields, f) + } + + slices.Sort(fields) + return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { + for _, field := range fields { + if !yield(field, m.postingsForField(field)) { + return + } + } + } +} + +func (m *mockSealingSource) postingsForField(field string) iter.Seq2[TokenPosting, error] { + return func(yield func(TokenPosting, error) bool) { + tokens := make([]string, 0, len(m.fields[field])) + for t := range m.fields[field] { + tokens = append(tokens, t) + } + + slices.Sort(tokens) + for _, tok := range tokens { + posting := TokenPosting{ + First: []byte(tok), + Second: m.fields[field][tok], + } + + if !yield(posting, nil) { + return + } + } + } +} + +func (m *mockSealingSource) DocBlock() iter.Seq2[DocBlockLocation, error] { + return func(yield func(DocBlockLocation, error) bool) { + if !yield(DocBlockLocation{}, nil) { + return + } + } +} + +func (m *mockSealingSource) LastError() error { + return nil +} + +func TestMergeSource(t *testing.T) { + first := &mockSealingSource{ + ids: []seq.ID{ + {MID: 3}, + {MID: 2}, + {MID: 1}, + }, + + pos: []seq.DocPos{ + seq.PackDocPos(0, 0), + seq.PackDocPos(0, 1024), + seq.PackDocPos(0, 2048), + }, + + fields: map[string]map[string][]uint32{ + "level": { + "error": {1, 3}, + "info": {2, 3}, + }, + }, + + blocks: []uint64{0}, + docsOnDisk: 1024, + } + + second := &mockSealingSource{ + ids: []seq.ID{ + {MID: 6}, + {MID: 5}, + }, + + pos: []seq.DocPos{ + seq.PackDocPos(0, 0), + seq.PackDocPos(0, 2048), + }, + + fields: map[string]map[string][]uint32{ + "level": { + "debug": {1}, + "info": {2}, + }, + }, + + blocks: []uint64{0}, + docsOnDisk: 2048, + } + + source := NewMergeSource("inmemory", []Source{first, second}) + + t.Run("offsets", func(t *testing.T) { + // Validate correctness of [storage.DocBlock] calculation. + offsets := source.BlockOffsets() + require.Equal(t, []uint64{0, 1024}, offsets) + }) + + t.Run("ids", func(t *testing.T) { + var ( + ids []seq.ID + docpos []seq.DocPos + ) + + for loc, err := range source.ID() { + require.NoError(t, err) + ids = append(ids, loc.First) + docpos = append(docpos, loc.Second) + } + + require.Equal(t, + []seq.ID{ + seq.SystemID, + // [seq.ID] from the second source. + {MID: 6}, + {MID: 5}, + // [seq.ID] from the first source. + {MID: 3}, + {MID: 2}, + {MID: 1}, + }, + ids, + ) + + require.Equal(t, + []seq.DocPos{ + seq.SystemDocPos, + // [seq.DocPos] from the second source. + seq.PackDocPos(1, 0), seq.PackDocPos(1, 2048), + // [seq.DocPos] from the first source. + seq.PackDocPos(0, 0), seq.PackDocPos(0, 1024), seq.PackDocPos(0, 2048), + }, + docpos, + ) + }) + + t.Run("tokens-lids", func(t *testing.T) { + var ( + fields []string + tokens [][]byte + lids [][]uint32 + ) + + for field, fieldIt := range source.TokenTriplet() { + fields = append(fields, field) + + for posting, err := range fieldIt { + require.NoError(t, err) + tokens = append(tokens, posting.First) + lids = append(lids, slices.Clone(posting.Second)) + } + } + + // Both sources have the same and the only field. + require.Equal(t, []string{"level"}, fields) + + // Ensure tokens are sorted in ascending order. + require.Equal(t, + [][]byte{[]byte("debug"), []byte("error"), []byte("info")}, + tokens, + ) + + // Ensure correctness of lids remapping: + // ------------------------- + // seq.MID 6 5 | 3 2 1 + // seq.LID (old) 1 2 | 1 2 3 + // seq.LID (new) 1 2 | 3 4 5 + // ------------------------- + require.Equal(t, + [][]uint32{ + // Sequence of [seq.LID] for token `debug`. + {1}, + // Sequence of [seq.LID] for token `error`. + {3, 5}, + // Sequence of [seq.LID] for token `info`. + {2, 4, 5}, + }, + lids, + ) + }) + + t.Run("info", func(t *testing.T) { + merged := source.Info() + finfo, sinfo := first.Info(), second.Info() + + // Validate correctness of fraction time-range. + require.Equal(t, merged.From, min(finfo.From, sinfo.From)) + require.Equal(t, merged.To, max(finfo.To, sinfo.To)) + + // Validate correctness of total documents of merged fractions. + require.Equal(t, merged.DocsTotal, finfo.DocsTotal+sinfo.DocsTotal) + require.Equal(t, merged.DocsOnDisk, finfo.DocsOnDisk+sinfo.DocsOnDisk) + require.Equal(t, merged.DocsRaw, finfo.DocsRaw+sinfo.DocsRaw) + + // Validate correctness of distribution. + require.NotNil(t, merged.Distribution) + require.True(t, merged.IsIntersecting(finfo.From, finfo.To)) + require.True(t, merged.IsIntersecting(sinfo.From, sinfo.To)) + require.True(t, merged.IsIntersecting(min(finfo.From, sinfo.From), max(finfo.To, sinfo.To))) + }) +} + +func BenchmarkMergeSource(b *testing.B) { + const ( + numSources = 4 + docsPerSource = 512_000 + + // Total count of pairs of (field, token) will be + // [numFields] * [numTokens]. + numFields = 512 + numTokens = 16384 + ) + + rng := rand.New(rand.NewSource(42)) + + fieldNames := make([]string, numFields) + for i := range fieldNames { + fieldNames[i] = fmt.Sprintf("field-%d", i) + } + + tokenNames := make([]string, numTokens) + for i := range tokenNames { + tokenNames[i] = fmt.Sprintf("token-%d", i) + } + + makeSource := func(midOffset seq.MID) Source { + ids := make([]seq.ID, docsPerSource) + pos := make([]seq.DocPos, docsPerSource) + + for j := range ids { + // IDs must be in descending MID order within each source. + ids[j] = seq.ID{MID: midOffset + seq.MID(docsPerSource-j)} + pos[j] = seq.PackDocPos(0, uint64(j*64)) + } + + // Assign each lid to a random (field, token) pair from the vocabulary + // so that total lids per source equals [docsPerSource]. + fields := make(map[string]map[string][]uint32) + for lid := uint32(1); lid <= uint32(docsPerSource); lid++ { + field := fieldNames[rng.Intn(numFields)] + token := tokenNames[rng.Intn(numTokens)] + + if fields[field] == nil { + fields[field] = make(map[string][]uint32) + } + + fields[field][token] = append(fields[field][token], lid) + } + + for _, tokens := range fields { + for tok, lids := range tokens { + slices.Sort(lids) + tokens[tok] = lids + } + } + + return &mockSealingSource{ + ids: ids, + pos: pos, + blocks: []uint64{0}, + docsOnDisk: docsPerSource * 64, + fields: fields, + } + } + + sources := make([]Source, numSources) + for i := range sources { + sources[i] = makeSource(seq.MID(i * docsPerSource)) + } + + b.ResetTimer() + b.ReportAllocs() + + for b.Loop() { + ms := NewMergeSource("bench", sources) + + ms.BlockOffsets() + for range ms.ID() { + } + + for _, tokIt := range ms.TokenTriplet() { + for range tokIt { + } + } + } +} diff --git a/config/config.go b/config/config.go index 0d929a7f..f32ee574 100644 --- a/config/config.go +++ b/config/config.go @@ -75,6 +75,12 @@ type Config struct { // BlockSize sets max lids (postings) saved per LIDs block. BlockSize int `config:"block_size" default:"65536"` } `config:"lids"` + + Tokens struct { + // FreqThreshold specifies minimum number of lids (postings) a token should have + // so that frequency for that token will be stored inside token blocks. + FreqThreshold int `config:"freq_threshold" default:"50"` + } `config:"tokens"` } `config:"sealing"` Cluster struct { diff --git a/config/frac_version.go b/config/frac_version.go index 73c3261a..624b718b 100644 --- a/config/frac_version.go +++ b/config/frac_version.go @@ -21,6 +21,9 @@ const ( // BinaryDataV4 - delta bitpack encoded MIDs and LIDs BinaryDataV4 + + // BinaryDataV5 - token block has doc frequencies for heavy tokens + BinaryDataV5 ) -const CurrentFracVersion = BinaryDataV4 +const CurrentFracVersion = BinaryDataV5 diff --git a/consts/consts.go b/consts/consts.go index ccaba4e2..1ea8120b 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -23,6 +23,7 @@ const ( DefaultBulkRequestsLimit = 32 DefaultSearchRequestsLimit = 32 + DefaultTokenFreqThreshold = 50 BulkMaxTries = 3 @@ -56,6 +57,7 @@ const ( WalFileSuffix = ".wal" DocsFileSuffix = ".docs" + DocsTmpFileSuffix = "._docs" DocsDelFileSuffix = ".docs.del" SdocsFileSuffix = ".sdocs" diff --git a/frac/common/info.go b/frac/common/info.go index b82f6b99..2a3805aa 100644 --- a/frac/common/info.go +++ b/frac/common/info.go @@ -82,6 +82,13 @@ func (s *Info) BuildDistribution(mids []uint64) { } } +func (s *Info) AddMID(mid uint64) { + if s.Distribution == nil { + return + } + s.Distribution.Add(seq.MID(mid)) +} + func (s *Info) InitEmptyDistribution() bool { from := s.From.Time() creationTime := time.UnixMilli(int64(s.CreationTime)) diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go index 05f89696..e0fd362e 100644 --- a/frac/common/seal_params.go +++ b/frac/common/seal_params.go @@ -10,4 +10,6 @@ type SealParams struct { DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. LIDBlockSize int DocBlockSize int // DocBlockSize is decompressed payload size of document block. + + TokenFreqThreshold int // TokenFreqThreshold Min lids count to store frequency for a token. } diff --git a/frac/fraction_concurrency_test.go b/frac/fraction_concurrency_test.go index 560760cd..40024a6e 100644 --- a/frac/fraction_concurrency_test.go +++ b/frac/fraction_concurrency_test.go @@ -16,9 +16,9 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" testcommon "github.com/ozontech/seq-db/tests/common" diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 244aeb99..ab69509d 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -23,10 +23,10 @@ import ( "github.com/ozontech/seq-db/cache" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/processor" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" "github.com/ozontech/seq-db/node" "github.com/ozontech/seq-db/parser" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" @@ -1901,11 +1901,11 @@ func (s *FractionTestSuite) TestFractionInfo() { s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) case *Remote: s.Require().Equal(uint64(0), info.MetaOnDisk, "meta on disk doesn't match. actual value") - s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1450), + s.Require().True(info.IndexOnDisk > uint64(1300) && info.IndexOnDisk < uint64(1500), "index on disk doesn't match. actual value: %d", info.IndexOnDisk) default: s.Require().Fail("unsupported fraction type") diff --git a/frac/remote.go b/frac/remote.go index 9630b8ca..b249a27e 100644 --- a/frac/remote.go +++ b/frac/remote.go @@ -191,7 +191,7 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, diff --git a/frac/sealed.go b/frac/sealed.go index 4bde6d5b..20759b8c 100644 --- a/frac/sealed.go +++ b/frac/sealed.go @@ -392,6 +392,7 @@ func (f *Sealed) Release() { func (f *Sealed) Suicide() { f.Release() + // Rename docs atomically first — this commits the intent to delete. oldPath := f.BaseFileName + consts.DocsFileSuffix newPath := f.BaseFileName + consts.DocsDelFileSuffix @@ -513,7 +514,7 @@ func (f *Sealed) createDataProvider(ctx context.Context) *sealedDataProvider { blocksOffsets: f.blocksData.BlocksOffsets, lidsTable: f.blocksData.LIDsTable, lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs), - tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, tokenReader, f.indexCache.Tokens), tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable), idsTable: &f.blocksData.IDsTable, diff --git a/frac/sealed/block_offsets.go b/frac/sealed/block_offsets.go index 2be59942..d644a0f7 100644 --- a/frac/sealed/block_offsets.go +++ b/frac/sealed/block_offsets.go @@ -6,13 +6,17 @@ import ( ) type BlockOffsets struct { - IDsTotal uint32 // todo: the best place for this field is Info block - Offsets []uint64 + Offsets []uint64 } func (b *BlockOffsets) Pack(buf []byte) []byte { buf = binary.LittleEndian.AppendUint32(buf, uint32(len(b.Offsets))) - buf = binary.LittleEndian.AppendUint32(buf, b.IDsTotal) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + // + // I've created a task which will require fraction binary version bumping + // to get rid of this: https://github.com/ozontech/seq-db/issues/409 + buf = binary.LittleEndian.AppendUint32(buf, 0) var prev uint64 for _, pos := range b.Offsets { @@ -26,13 +30,16 @@ func (b *BlockOffsets) Unpack(data []byte) error { if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing offsets count)") } + idsBlocksCount := binary.LittleEndian.Uint32(data) data = data[4:] if len(data) < 4 { return errors.New("blocks offset decoding error: truncated header (missing IDsTotal)") } - b.IDsTotal = binary.LittleEndian.Uint32(data) + + // NOTE(dkharms): Previously we stored here amount of documents ids. + _ = binary.LittleEndian.Uint32(data) data = data[4:] offset := uint64(0) @@ -42,15 +49,20 @@ func (b *BlockOffsets) Unpack(data []byte) error { if n == 0 { return errors.New("blocks offset decoding error: varint returned 0") } + if n < 0 { return errors.New("blocks offset decoding error: varint overflow") } + data = data[n:] offset += uint64(delta) + b.Offsets = append(b.Offsets, offset) } + if uint32(len(b.Offsets)) != idsBlocksCount { return errors.New("blocks offset decoding error: offset count mismatch") } + return nil } diff --git a/frac/sealed/seqids/loader.go b/frac/sealed/seqids/loader.go index defa865f..334e07a8 100644 --- a/frac/sealed/seqids/loader.go +++ b/frac/sealed/seqids/loader.go @@ -13,7 +13,6 @@ import ( type Table struct { MinBlockIDs []seq.ID // from max to min - IDBlocksTotal uint32 IDsTotal uint32 StartBlockIndex uint32 } diff --git a/frac/sealed/token/block_loader.go b/frac/sealed/token/block_loader.go index 77c73798..f95296e1 100644 --- a/frac/sealed/token/block_loader.go +++ b/frac/sealed/token/block_loader.go @@ -5,46 +5,109 @@ import ( "encoding/binary" "fmt" "math" + "sort" "unsafe" "go.uber.org/zap" "github.com/ozontech/seq-db/cache" + "github.com/ozontech/seq-db/config" "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/packer" "github.com/ozontech/seq-db/pattern" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" ) -const sizeOfUint32 = uint32(unsafe.Sizeof(uint32(0))) - type Block struct { - Payload []byte - Offsets []uint32 + Payload []byte + Offsets []uint32 + FreqIndexes []uint16 // indexes of tokens which have doc freqs (frequencies) + Freqs []uint32 // frequencies of certain tokens (how many docs have this token included at least once) } func (b *Block) Size() int { const selfSize = int(unsafe.Sizeof(Block{})) - return selfSize + cap(b.Payload) + cap(b.Offsets)*int(sizeOfUint32) + return selfSize + + cap(b.Payload) + + cap(b.Offsets)*util.SizeOfUint32 + + cap(b.FreqIndexes)*util.SizeOfUint16 + + cap(b.Freqs)*util.SizeOfUint32 +} + +func (b Block) Pack(dst []byte, buf []uint32) []byte { + dst = binary.LittleEndian.AppendUint32(dst, uint32(len(b.Payload))) + dst = append(dst, b.Payload...) + dst = packer.CompressDeltaBitpackUint16(dst, b.FreqIndexes, buf) + return packer.CompressDeltaBitpackUint32(dst, b.Freqs, buf) } -func (b Block) Pack(dst []byte) []byte { - return append(dst, b.Payload...) +func (b *Block) Unpack(data []byte, fracVer config.BinaryDataVersion, unpackBuf *UnpackBuffer) error { + if fracVer >= config.BinaryDataV5 { + unpackBuf.Reset(fracVer) + return b.unpackV5(data, unpackBuf) + } + return b.unpackV1(data) +} + +func (b *Block) unpackV1(data []byte) error { + b.Payload = append([]byte{}, data...) + return b.parseTokenPayload(b.Payload) +} + +func (b *Block) unpackV5(data []byte, buf *UnpackBuffer) error { + if len(data) < util.SizeOfUint32 { + return fmt.Errorf("token block too short: %d bytes", len(data)) + } + + payloadLen := binary.LittleEndian.Uint32(data[:util.SizeOfUint32]) + data = data[util.SizeOfUint32:] + if uint32(len(data)) < payloadLen { + return fmt.Errorf("invalid token block payload length: %d, data len %d", payloadLen, len(data)) + } + + payload := data[:payloadLen] + data = data[payloadLen:] + + b.Payload = append(b.Payload[:0], payload...) + + if err := b.parseTokenPayload(payload); err != nil { + return err + } + + var err error + var freqIndexes []uint16 + data, freqIndexes, err = packer.DecompressDeltaBitpackUint16(data, buf.decompressedUint16, buf.compressed) + if err != nil { + return err + } + b.FreqIndexes = append(b.FreqIndexes, freqIndexes...) + + var freqs []uint32 + _, freqs, err = packer.DecompressDeltaBitpackUint32(data, buf.decompressedUint32, buf.compressed) + if err != nil { + return err + } + b.Freqs = append(b.Freqs, freqs...) + + return nil } -func (b *Block) Unpack(data []byte) error { +func (b *Block) parseTokenPayload(data []byte) error { + b.Offsets = b.Offsets[:0] + var offset uint32 - b.Payload = data for i := 0; len(data) != 0; i++ { l := binary.LittleEndian.Uint32(data) - data = data[sizeOfUint32:] - offset += sizeOfUint32 + data = data[util.SizeOfUint32:] + offset += uint32(util.SizeOfUint32) if l == math.MaxUint32 { continue } if l > uint32(len(data)) { return fmt.Errorf("wrong field block for token %d, in pos %d", i, offset) } - b.Offsets = append(b.Offsets, offset-sizeOfUint32) + b.Offsets = append(b.Offsets, offset-uint32(util.SizeOfUint32)) data = data[l:] offset += l } @@ -55,10 +118,24 @@ func (b *Block) Len() int { return len(b.Offsets) } +// GetFreq returns frequency for a token if stored or 0 otherwise +func (b *Block) GetFreq(index int) uint32 { + if b.Freqs == nil { + return 0 + } + + idx := uint16(index) + found := sort.Search(len(b.FreqIndexes), func(i int) bool { return b.FreqIndexes[i] >= idx }) + if found < len(b.FreqIndexes) && b.FreqIndexes[found] == idx { + return b.Freqs[found] + } + return 0 +} + func (b *Block) GetToken(index int) []byte { offset := b.Offsets[index] l := binary.LittleEndian.Uint32(b.Payload[offset:]) - offset += sizeOfUint32 // skip val length + offset += uint32(util.SizeOfUint32) // skip val length return b.Payload[offset : offset+l] } @@ -90,16 +167,26 @@ func (b *Block) find(from, to int, searcher pattern.Searcher) ([]int, error) { // NOT THREAD SAFE. Do not use concurrently. // Use your own BlockLoader instance for each search query type BlockLoader struct { - fracName string - cache *cache.Cache[*Block] - reader *storage.IndexReader + fracName string + fracVer config.BinaryDataVersion + cache *cache.Cache[*Block] + reader *storage.IndexReader + unpackBuf *UnpackBuffer + blockBuf []byte } -func NewBlockLoader(fracName string, reader *storage.IndexReader, c *cache.Cache[*Block]) *BlockLoader { +func NewBlockLoader( + fracName string, + fracVer config.BinaryDataVersion, + reader *storage.IndexReader, + c *cache.Cache[*Block], +) *BlockLoader { return &BlockLoader{ - fracName: fracName, - cache: c, - reader: reader, + fracName: fracName, + fracVer: fracVer, + cache: c, + reader: reader, + unpackBuf: &UnpackBuffer{}, } } @@ -120,11 +207,34 @@ func (l *BlockLoader) Load(index uint32) *Block { } func (l *BlockLoader) read(index uint32) (*Block, error) { - data, _, err := l.reader.ReadIndexBlock(index, nil) + var err error + l.blockBuf, _, err = l.reader.ReadIndexBlock(index, l.blockBuf) if err != nil { return nil, err } - block := Block{} - err = block.Unpack(data) - return &block, err + block := &Block{} + err = block.Unpack(l.blockBuf, l.fracVer, l.unpackBuf) + return block, err +} + +type UnpackBuffer struct { + decompressedUint32 []uint32 // temporary buffer for bitpack + decompressedUint16 []uint16 // temporary buffer for bitpack + compressed []uint32 // temporary buffer for bitpack +} + +func (b *UnpackBuffer) Reset(fracVer config.BinaryDataVersion) { + if fracVer < config.BinaryDataV5 { + return + } + if b.decompressedUint32 == nil { + b.decompressedUint32 = make([]uint32, 0, 256) + } else { + b.decompressedUint32 = b.decompressedUint32[:0] + } + if b.compressed == nil { + b.compressed = make([]uint32, 0, 256) + } else { + b.compressed = b.compressed[:0] + } } diff --git a/frac/sealed/token/block_loader_test.go b/frac/sealed/token/block_loader_test.go new file mode 100644 index 00000000..738331f8 --- /dev/null +++ b/frac/sealed/token/block_loader_test.go @@ -0,0 +1,91 @@ +package token + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/ozontech/seq-db/config" +) + +func TestBlock_PackUnpack_NoFreq(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("foo"), []byte("bar")), + } + + var buf []uint32 + packed := src.Pack(nil, buf) + var dst Block + require.NoError(t, dst.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, 2, dst.Len()) + assert.Equal(t, []byte("foo"), dst.GetToken(0)) + assert.Equal(t, []byte("bar"), dst.GetToken(1)) + + assert.Empty(t, dst.FreqIndexes) + assert.Empty(t, dst.Freqs) +} + +func TestBlock_PackUnpack_WithFreq(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("dog"), []byte("cat"), []byte("horse"), []byte("duck")), + FreqIndexes: []uint16{0, 2}, + Freqs: []uint32{100, 200}, + } + + var buf []uint32 + packed := src.Pack(nil, buf) + var dst Block + require.NoError(t, dst.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, src.Payload, dst.Payload) + + assert.Equal(t, uint32(100), dst.GetFreq(0)) + assert.Equal(t, uint32(0), dst.GetFreq(1)) + assert.Equal(t, uint32(200), dst.GetFreq(2)) + assert.Equal(t, uint32(0), dst.GetFreq(3)) +} + +func TestBlock_Unpack_Legacy(t *testing.T) { + legacy := packTokenPayload([]byte("legacy")) + + var dst Block + require.NoError(t, dst.Unpack(legacy, config.BinaryDataV4, &UnpackBuffer{})) + + assert.Equal(t, legacy, dst.Payload) + assert.Equal(t, []uint32{0}, dst.Offsets) + assert.Empty(t, dst.FreqIndexes) + assert.Empty(t, dst.Freqs) +} + +func TestBlock_UnpackBufferReuse(t *testing.T) { + src := Block{ + Payload: packTokenPayload([]byte("a"), []byte("b")), + FreqIndexes: []uint16{1}, + Freqs: []uint32{64}, + } + + var packBuf []uint32 + packed := src.Pack(nil, packBuf) + + var dst1, dst2 Block + require.NoError(t, dst1.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + require.NoError(t, dst2.Unpack(packed, config.BinaryDataV5, &UnpackBuffer{})) + + assert.Equal(t, dst1.FreqIndexes, dst2.FreqIndexes) + assert.Equal(t, dst1.Freqs, dst2.Freqs) + + assert.Equal(t, uint32(0), dst2.GetFreq(0)) + assert.Equal(t, uint32(64), dst2.GetFreq(1)) +} + +func packTokenPayload(tokens ...[]byte) []byte { + var payload []byte + for _, tok := range tokens { + payload = binary.LittleEndian.AppendUint32(payload, uint32(len(tok))) + payload = append(payload, tok...) + } + return payload +} diff --git a/frac/sealed_loader.go b/frac/sealed_loader.go index 893b75a4..10d95a2d 100644 --- a/frac/sealed_loader.go +++ b/frac/sealed_loader.go @@ -36,7 +36,7 @@ func (l *LegacyLoader) Load(blocksData *sealed.BlocksData, info *common.Info, re l.skipSection() // skip token table blocks var err error - blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info.BinaryDataVer) + blocksData.IDsTable, blocksData.BlocksOffsets, err = l.loadIDs(info) if err != nil { logger.Fatal("legacy load ids error", zap.Error(err)) } @@ -77,7 +77,7 @@ func (l *LegacyLoader) skipSection() { } // loadIDs reads the BlockOffsets block and then scans MID/RID/Pos triplets. -func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Table, []uint64, error) { +func (l *LegacyLoader) loadIDs(info *common.Info) (seqids.Table, []uint64, error) { var buf []byte data, _, err := l.reader.ReadIndexBlock(l.blockIndex, buf) @@ -94,9 +94,8 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab l.blockIndex++ table := seqids.Table{ - StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index - IDsTotal: offsets.IDsTotal, - IDBlocksTotal: uint32(len(offsets.Offsets)), + StartBlockIndex: l.blockIndex, // absolute index of first MID block in .index + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } for { @@ -111,7 +110,7 @@ func (l *LegacyLoader) loadIDs(fracVersion config.BinaryDataVersion) (seqids.Tab } mid := seq.MID(h.GetExt1()) - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(h.GetExt1()) } @@ -184,10 +183,9 @@ func (l *Loader) Load(blocksData *sealed.BlocksData, info *common.Info, readers if err != nil { logger.Fatal("load offsets error", zap.Error(err)) } - blocksData.BlocksOffsets = blockOffsets.Offsets - blocksData.IDsTable = l.loadIDsTable(readers.ID, blockOffsets.IDsTotal, info.BinaryDataVer) + blocksData.IDsTable = l.loadIDsTable(readers.ID, info) blocksData.LIDsTable, err = l.loadLIDsTable(readers.LID) if err != nil { logger.Fatal("load lids error", zap.Error(err)) @@ -227,10 +225,10 @@ func (l *Loader) loadBlocksOffsets(r storage.IndexReader) (sealed.BlockOffsets, // loadIDsTable scans block headers in the .id file to build seqids.Table. // Blocks are stored as (MIDs, RIDs, Pos) triplets; we only need MIDs headers. -func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersion config.BinaryDataVersion) seqids.Table { +func (l *Loader) loadIDsTable(r storage.IndexReader, info *common.Info) seqids.Table { table := seqids.Table{ StartBlockIndex: 0, - IDsTotal: idsTotal, + IDsTotal: info.DocsTotal + 1, // Increment by one for [seq.SystemID] } blocksCount, err := r.BlocksCount() @@ -248,7 +246,7 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio } var mid seq.MID - if fracVersion < config.BinaryDataV2 { + if info.BinaryDataVer < config.BinaryDataV2 { mid = seq.MillisToMID(header.GetExt1()) } else { mid = seq.MID(header.GetExt1()) @@ -258,8 +256,6 @@ func (l *Loader) loadIDsTable(r storage.IndexReader, idsTotal uint32, fracVersio MID: mid, RID: seq.RID(header.GetExt2()), }) - - table.IDBlocksTotal++ } return table diff --git a/frac/sealed_source.go b/frac/sealed_source.go new file mode 100644 index 00000000..a36ca34d --- /dev/null +++ b/frac/sealed_source.go @@ -0,0 +1,159 @@ +package frac + +import ( + "iter" + "slices" + + "github.com/ozontech/seq-db/frac/common" + "github.com/ozontech/seq-db/frac/sealed/lids" + "github.com/ozontech/seq-db/frac/sealed/seqids" + "github.com/ozontech/seq-db/frac/sealed/token" + "github.com/ozontech/seq-db/indexwriter" + "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" +) + +type DocBlockLocation = util.Pair[[]byte, uint64] + +// SealedSource implements [indexwriter.Source] for a sealed fraction. +// Used as input to [compaction.MergeSource] when compacting multiple fractions. +type SealedSource struct { + f *Sealed + + idsProvider *seqids.Provider + lidsLoader *lids.Loader + + tokenBlockLoader *token.BlockLoader + tokenTableLoader *token.TableLoader +} + +func NewSealedSource(f *Sealed) *SealedSource { + f.init(true) + return &SealedSource{ + f: f, + idsProvider: seqids.NewProvider( + &f.idReader, + f.indexCache.MIDs, + f.indexCache.RIDs, + f.indexCache.Params, + &f.blocksData.IDsTable, + f.info.BinaryDataVer, + ), + lidsLoader: lids.NewLoader(f.Info().BinaryDataVer, &f.lidReader, f.indexCache.LIDs), + tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, f.info.BinaryDataVer, &f.tokenReader, f.indexCache.Tokens), + tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, &f.tokenReader, f.indexCache.TokenTable), + } +} + +func (s *SealedSource) Info() *common.Info { + return s.f.info +} + +func (s *SealedSource) BlockOffsets() []uint64 { + return s.f.blocksData.BlocksOffsets +} + +func (s *SealedSource) ID() iter.Seq2[indexwriter.DocLocation, error] { + return func(yield func(indexwriter.DocLocation, error) bool) { + for lid := uint32(0); lid < s.f.blocksData.IDsTable.IDsTotal; lid++ { + mid, err := s.idsProvider.MID(seq.LID(lid)) + if err != nil { + yield(indexwriter.DocLocation{}, err) + return + } + + rid, err := s.idsProvider.RID(seq.LID(lid)) + if err != nil { + yield(indexwriter.DocLocation{}, err) + return + } + + pos, err := s.idsProvider.DocPos(seq.LID(lid)) + if err != nil { + yield(indexwriter.DocLocation{}, err) + return + } + + if !yield(indexwriter.DocLocation{First: seq.ID{MID: mid, RID: rid}, Second: pos}, nil) { + return + } + } + } +} + +func (s *SealedSource) TokenTriplet() iter.Seq2[string, iter.Seq2[indexwriter.TokenPosting, error]] { + tokenTable := s.tokenTableLoader.Load() + + fields := make([]string, 0, len(tokenTable)) + for field := range tokenTable { + fields = append(fields, field) + } + + slices.Sort(fields) + return func(yield func(string, iter.Seq2[indexwriter.TokenPosting, error]) bool) { + for _, field := range fields { + if !yield(field, s.postingsForField(field)) { + return + } + } + } +} + +func (s *SealedSource) postingsForField(field string) iter.Seq2[indexwriter.TokenPosting, error] { + lidsTable := s.f.blocksData.LIDsTable + tokenTable := s.tokenTableLoader.Load() + + var lidsBuf []uint32 + return func(yield func(indexwriter.TokenPosting, error) bool) { + for _, entry := range tokenTable[field].Entries { + block := s.tokenBlockLoader.Load(entry.BlockIndex) + + for tid := entry.StartTID; tid < entry.StartTID+entry.ValCount; tid++ { + lidsBuf = lidsBuf[:0] + + tokenVal := block.GetToken(entry.GetIndexInTokensBlock(tid)) + firstBlock := lidsTable.GetFirstBlockIndexForTID(tid) + lastBlock := lidsTable.GetLastBlockIndexForTID(tid) + + for bi := firstBlock; bi <= lastBlock; bi++ { + lidBlock, err := s.lidsLoader.GetLIDsBlock(bi) + if err != nil { + yield(indexwriter.TokenPosting{}, err) + return + } + + chunkIdx := lidsTable.GetChunkIndex(bi, tid) + lidsBuf = append(lidsBuf, lidBlock.LIDs[lidBlock.Offsets[chunkIdx]:lidBlock.Offsets[chunkIdx+1]]...) + } + + if !yield(indexwriter.TokenPosting{First: tokenVal, Second: lidsBuf}, nil) { + return + } + } + } + } +} + +func (s *SealedSource) DocBlock() iter.Seq2[DocBlockLocation, error] { + return func(yield func(DocBlockLocation, error) bool) { + // We do not want to cache payload of DocBlock because + // it will just pollute cache and cause unnecessary evictions. + r := storage.NewDocBlocksReader(s.f.readLimiter, s.f.docsFile) + + for _, offset := range s.f.blocksData.BlocksOffsets { + // Read DocBlock payload (including its header) but do not decompress it. + // Caller of [SealedSource.DocBlock] will decide whether it requires decompressed data. + payload, _, err := r.ReadDocBlock(int64(offset)) + if err != nil { + yield(DocBlockLocation{}, err) + return + } + + loc := DocBlockLocation{First: payload, Second: offset} + if !yield(loc, nil) { + return + } + } + } +} diff --git a/fracmanager/config.go b/fracmanager/config.go index e295aada..e909efb2 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -69,6 +69,9 @@ func FillConfigWithDefault(config *Config) *Config { if config.SealParams.TokenTableZstdLevel == 0 { config.SealParams.TokenTableZstdLevel = zstdDefaultLevel } + if config.SealParams.TokenFreqThreshold == 0 { + config.SealParams.TokenFreqThreshold = consts.DefaultTokenFreqThreshold + } if config.ReplayWorkers == 0 { config.ReplayWorkers = consts.DefaultReplayWorkers } diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 569c5cec..81960502 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -79,11 +79,11 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk wg.Wait() // finalize appender to prevent new writes - appender := lc.registry.Appender() - if err := appender.Finalize(); err != nil { + appender := lc.registry.appender() + if err := appender.finalize(); err != nil { logger.Fatal("shutdown fraction freezing error", zap.Error(err)) } - appender.WaitWriteIdle() + appender.waitWriteIdle() stopIdx() @@ -98,16 +98,50 @@ func New(ctx context.Context, cfg *Config, s3cli *s3.Client, skipMaskProvider sk return &fm, stop, nil } +type CompactionSnapshot struct { + claimed []*refCountedSealed +} + +func (cs *CompactionSnapshot) Fractions() []*frac.Sealed { + result := make([]*frac.Sealed, len(cs.claimed)) + for i, f := range cs.claimed { + result[i] = f.Sealed + } + return result +} + +func (cs *CompactionSnapshot) Destroy() { + for _, f := range cs.claimed { + f.Destroy() + } +} + +func (fm *FracManager) SealedFractionsSnapshot() []*frac.Sealed { + return fm.lc.registry.sealedSnapshot() +} + +func (fm *FracManager) ClaimForCompaction(names []string) (*CompactionSnapshot, error) { + claimed, err := fm.lc.registry.claimForCompaction(names) + if err != nil { + return nil, err + } + return &CompactionSnapshot{claimed: claimed}, nil +} + +func (fm *FracManager) SubstituteWithSealed(produced *frac.Sealed, snapshot *CompactionSnapshot) { + fm.lc.registry.substituteWithSealed(produced, snapshot.claimed...) +} + func (fm *FracManager) AcquireFraction(name string) (frac.Fraction, func(), bool) { - return fm.lc.registry.AcquireOneFraction(name) + return fm.lc.registry.acquireOneFraction(name) } func (fm *FracManager) AcquireFractions() (List, func()) { - return fm.lc.registry.AcquireAllFractions() + return fm.lc.registry.acquireAllFractions() } func (fm *FracManager) Oldest() uint64 { - return fm.lc.registry.OldestTotal() + return fm.lc.registry.oldestTotal() } func (fm *FracManager) Flags() *StateManager { @@ -123,7 +157,7 @@ func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas return ctx.Err() default: // Try to append data to the currently active fraction - err := fm.lc.registry.Appender().Append(docs, metas) + err := fm.lc.registry.appender().append(docs, metas) if err != nil { logger.Info("append fail", zap.Error(err)) if err == ErrFractionNotWritable { @@ -169,7 +203,7 @@ func startStatsWorker(ctx context.Context, cfg *Config, reg *fractionRegistry, w logger.Info("stats loop is started") // Run stats collection every 10 seconds util.RunEvery(ctx.Done(), time.Second*10, func() { - stats := reg.Stats() + stats := reg.statistics() stats.Log() // Log statistics stats.SetMetrics() // Update Prometheus metrics diff --git a/fracmanager/fracmanager_for_tests.go b/fracmanager/fracmanager_for_tests.go index c4ec1cad..39349289 100644 --- a/fracmanager/fracmanager_for_tests.go +++ b/fracmanager/fracmanager_for_tests.go @@ -3,7 +3,7 @@ package fracmanager import "sync" func (fm *FracManager) WaitIdleForTests() { - fm.lc.registry.Appender().WaitWriteIdle() + fm.lc.registry.appender().waitWriteIdle() } func (fm *FracManager) SealForcedForTests() { diff --git a/fracmanager/fracmanager_test.go b/fracmanager/fracmanager_test.go index 4c372754..4a25deae 100644 --- a/fracmanager/fracmanager_test.go +++ b/fracmanager/fracmanager_test.go @@ -66,7 +66,7 @@ func TestSealingOnShutdown(t *testing.T) { cfg, fm, stop := setupFracManager(t, cfg) appendDocsToFracManager(t, fm, 10) - activeName := fm.lc.registry.all.fractions[0].Info().Name() + activeName := fm.lc.registry.snapshot.fractions[0].Info().Name() stop() @@ -74,7 +74,7 @@ func TestSealingOnShutdown(t *testing.T) { cfg.MinSealFracSize = 1 // to ensure that the frac will be sealed on shutdown cfg, fm, stop = setupFracManager(t, cfg) - allFractions := fm.lc.registry.all.fractions + allFractions := fm.lc.registry.snapshot.fractions assert.Equal(t, 1, len(allFractions), "should have one fraction") assert.Equal(t, activeName, allFractions[0].Info().Name(), "fraction should have the same name") _, ok := allFractions[0].(*syncAppender) @@ -84,7 +84,7 @@ func TestSealingOnShutdown(t *testing.T) { // third start _, fm, stop = setupFracManager(t, cfg) - allFractions = fm.lc.registry.all.fractions + allFractions = fm.lc.registry.snapshot.fractions assert.Equal(t, 2, len(allFractions), "should have 2 fraction: new active and old sealed") _, ok = allFractions[0].(*refCountedSealed) assert.True(t, ok, "first fraction should be sealed") diff --git a/fracmanager/fracs_stats.go b/fracmanager/fracs_stats.go index c70bbd37..ee255543 100644 --- a/fracmanager/fracs_stats.go +++ b/fracmanager/fracs_stats.go @@ -76,6 +76,7 @@ type registryStats struct { active fracsStats // Statistics for active fraction sealing fracsStats // Statistics for fractions in the sealing process sealed fracsStats // Statistics for fractions on sealed disk + compacting fracsStats // Statistics for fractions participating in compaction offloading fracsStats // Statistics for fractions in the offloading process remotes fracsStats // Statistics for fractions in remote storage } @@ -84,6 +85,7 @@ func (s *registryStats) Log() { s.active.Log("active") s.sealing.Log("sealing") s.sealed.Log("sealed") + s.compacting.Log("compacting") s.offloading.Log("offloading") s.remotes.Log("remotes") } @@ -92,10 +94,11 @@ func (s *registryStats) SetMetrics() { s.active.SetMetrics(dataSizeTotal, "active") s.sealing.SetMetrics(dataSizeTotal, "sealing") s.sealed.SetMetrics(dataSizeTotal, "sealed") + s.compacting.SetMetrics(dataSizeTotal, "compacting") s.offloading.SetMetrics(dataSizeTotal, "offloading") s.remotes.SetMetrics(dataSizeTotal, "remotes") } func (s registryStats) TotalSizeOnDiskLocal() uint64 { - return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk + return s.sealing.totalSizeOnDisk + s.sealed.totalSizeOnDisk + s.compacting.totalSizeOnDisk } diff --git a/fracmanager/fraction_provider.go b/fracmanager/fraction_provider.go index 54b10979..0445d8cc 100644 --- a/fracmanager/fraction_provider.go +++ b/fracmanager/fraction_provider.go @@ -14,9 +14,9 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/node" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/storage/s3" "github.com/ozontech/seq-db/util" diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index b0667c04..c0112383 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -21,16 +21,17 @@ type fractionRegistry struct { sealing map[string]*syncAppender // fractions being sealed (0-5 typical) sealed PartitionedCollection[*refCountedSealed] // local sealed fractions (can be thousands) + compacting map[string]*refCountedSealed // fractions participating in compaction offloading PartitionedCollection[*refCountedSealed] // fractions being offloaded (0-5 typical) remotes PartitionedCollection[*refCountedRemote] // offloaded fractions (can be thousands) stats registryStats // size statistics for monitoring muAppender sync.RWMutex - appender *syncAppender // currently active writable fraction + sappender *syncAppender // currently active writable fraction - muAll sync.RWMutex - all fractionsSnapshot // all fractions + muSnapshot sync.RWMutex + snapshot fractionsSnapshot // all fractions } // NewFractionRegistry creates and initializes a new fraction registry instance. @@ -51,10 +52,11 @@ func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []* } reg := fractionRegistry{ - appender: &syncAppender{refCountedActive: refCountedActive{Active: active}}, + sappender: &syncAppender{refCountedActive: refCountedActive{Active: active}}, sealing: map[string]*syncAppender{}, sealed: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return creationTime(rcs) }), + compacting: map[string]*refCountedSealed{}, offloading: NewPartitionedCollection(func(rcs *refCountedSealed) uint64 { return lastDocTime(rcs) }), remotes: NewPartitionedCollection(func(rcr *refCountedRemote) uint64 { return lastDocTime(rcr) }), } @@ -76,51 +78,51 @@ func NewFractionRegistry(active *frac.Active, sealed []*frac.Sealed, remotes []* return ®, nil } -// Appender returns the currently active writable fraction. -func (r *fractionRegistry) Appender() *syncAppender { +// appender returns the currently active writable fraction. +func (r *fractionRegistry) appender() *syncAppender { r.muAppender.RLock() defer r.muAppender.RUnlock() - return r.appender + return r.sappender } -func (r *fractionRegistry) AcquireOneFraction(name string) (frac.Fraction, func(), bool) { - r.muAll.RLock() - defer r.muAll.RUnlock() +func (r *fractionRegistry) acquireOneFraction(name string) (frac.Fraction, func(), bool) { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() - return r.all.AcquireOne(name) + return r.snapshot.AcquireOne(name) } -// AcquireAllFractions returns a read-only view of all fractions -func (r *fractionRegistry) AcquireAllFractions() ([]frac.Fraction, func()) { - r.muAll.RLock() - defer r.muAll.RUnlock() +// acquireAllFractions returns a read-only view of all fractions +func (r *fractionRegistry) acquireAllFractions() ([]frac.Fraction, func()) { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() - return r.all.AcquireAll() + return r.snapshot.AcquireAll() } -// Stats returns current size statistics of the registry. -func (r *fractionRegistry) Stats() registryStats { +// statistics returns current size statistics of the registry. +func (r *fractionRegistry) statistics() registryStats { r.mu.RLock() s := r.stats - i := r.appender.Info() + i := r.sappender.Info() r.mu.RUnlock() s.active.Set(i) return s } -// OldestTotal returns the creation time of the oldest fraction in the registry. -func (r *fractionRegistry) OldestTotal() uint64 { - r.muAll.RLock() - defer r.muAll.RUnlock() - return r.all.oldestTotal +// oldestTotal returns the creation time of the oldest fraction in the registry. +func (r *fractionRegistry) oldestTotal() uint64 { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() + return r.snapshot.oldestTotal } -// OldestLocal returns the creation time of the oldest local fraction in the registry. -func (r *fractionRegistry) OldestLocal() uint64 { - r.muAll.RLock() - defer r.muAll.RUnlock() - return r.all.oldestLocal +// oldestLocal returns the creation time of the oldest local fraction in the registry. +func (r *fractionRegistry) oldestLocal() uint64 { + r.muSnapshot.RLock() + defer r.muSnapshot.RUnlock() + return r.snapshot.oldestLocal } type activeProvider interface { @@ -131,39 +133,39 @@ func (r *fractionRegistry) setAppender(appender *syncAppender) { r.muAppender.Lock() defer r.muAppender.Unlock() - r.appender = appender + r.sappender = appender - r.muAll.Lock() - defer r.muAll.Unlock() + r.muSnapshot.Lock() + defer r.muSnapshot.Unlock() - r.all.AddActive(appender) + r.snapshot.AddActive(appender) } -// RotateIfFull completes the current active fraction and starts a new one. +// rotateIfFull completes the current active fraction and starts a new one. // Moves previous active fraction to sealing queue. // Should be called when the current active fraction reaches size limit and needs to be rotated -func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*refCountedActive, func(), error) { +func (r *fractionRegistry) rotateIfFull(maxSize uint64, ap activeProvider) (*refCountedActive, func(), error) { r.mu.Lock() defer r.mu.Unlock() - if r.appender.Info().DocsOnDisk <= maxSize { + if r.sappender.Info().DocsOnDisk <= maxSize { return nil, nil, nil } - old := r.appender + old := r.sappender r.sealing[old.Info().Name()] = old r.setAppender(&syncAppender{refCountedActive: refCountedActive{Active: ap.CreateActive()}}) - if err := old.Finalize(); err != nil { + if err := old.finalize(); err != nil { return nil, nil, err } curInfo := old.Info() r.stats.sealing.Add(curInfo) - r.appender.Suspend(old.Suspended()) + r.sappender.suspend(old.isSuspended()) wg := sync.WaitGroup{} wg.Add(1) @@ -172,7 +174,7 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*ref go func() { defer wg.Done() - old.WaitWriteIdle() // can be long enough + old.waitWriteIdle() // can be long enough finalInfo := old.Info() r.mu.Lock() @@ -187,11 +189,11 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, ap activeProvider) (*ref return &old.refCountedActive, wg.Wait, nil } -func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { +func (r *fractionRegistry) suspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() - suspended := r.appender.Suspended() + suspended := r.sappender.isSuspended() if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { if !suspended { @@ -199,7 +201,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "sealing queue size exceeded"), zap.Uint64("limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.appender.Suspend(true) + r.sappender.suspend(true) } return } @@ -212,7 +214,7 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.String("reason", "occupied space limit exceeded"), zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) - r.appender.Suspend(true) + r.sappender.suspend(true) } return } @@ -223,20 +225,21 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), zap.Uint64("sealing_queue_size_limit", maxQueue), zap.Int("queue_size", r.stats.sealing.count)) - r.appender.Suspend(false) + r.sappender.suspend(false) } } func (r *fractionRegistry) diskUsage() uint64 { - return r.appender.Info().FullSize() + + return r.sappender.Info().FullSize() + r.stats.sealed.totalSizeOnDisk + r.stats.sealing.totalSizeOnDisk + + r.stats.compacting.totalSizeOnDisk + r.stats.offloading.totalSizeOnDisk } -// EvictLocalForDelete removes oldest local fractions to free disk space. +// evictLocalForDelete removes oldest local fractions to free disk space. // Returns evicted fractions or error if insufficient space is released. -func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { +func (r *fractionRegistry) evictLocalForDelete(sizeLimit uint64) (evicted []*refCountedSealed, err error) { r.mu.Lock() defer r.mu.Unlock() @@ -249,9 +252,9 @@ func (r *fractionRegistry) EvictLocalForDelete(sizeLimit uint64) (evicted []*ref return evicted, nil } -// EvictLocalForOffload removes oldest local fractions to moves it to offloading queue. +// evictLocalForOffload removes oldest local fractions to moves it to offloading queue. // Returns evicted fractions or error if insufficient space is released. -func (r *fractionRegistry) EvictLocalForOffload(sizeLimit uint64) ([]*refCountedSealed, error) { +func (r *fractionRegistry) evictLocalForOffload(sizeLimit uint64) ([]*refCountedSealed, error) { r.mu.Lock() defer r.mu.Unlock() @@ -272,16 +275,17 @@ func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, er var releasingSize uint64 // calculate total used disk space - totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.appender.Info().FullSize() - - evicted := []*refCountedSealed{} + totalUsedSize := r.stats.TotalSizeOnDiskLocal() + r.sappender.Info().FullSize() + var evicted []*refCountedSealed for r.sealed.Len() > 0 && totalUsedSize-releasingSize > sizeLimit { for _, s := range r.sealed.GetByPartition(r.sealed.MinPartition()) { info := s.Info() releasingSize += info.FullSize() + r.stats.sealed.Sub(info) r.sealed.Del(info.Name()) + evicted = append(evicted, s) } } @@ -296,10 +300,10 @@ func (r *fractionRegistry) evictLocal(sizeLimit uint64) ([]*refCountedSealed, er return evicted, nil } -// EvictRemote removes oldest remote fractions based on retention policy. +// evictRemote removes oldest remote fractions based on retention policy. // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. -func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRemote { +func (r *fractionRegistry) evictRemote(retention time.Duration) []*refCountedRemote { if retention == 0 { return nil } @@ -322,9 +326,9 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*refCountedRem return evicted } -// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. +// evictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. // Used when offloading queue grows too large due to slow remote storage performance. -func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { +func (r *fractionRegistry) evictOverflowed(sizeLimit uint64) (evicted []*refCountedSealed) { if sizeLimit == 0 { return nil } @@ -355,23 +359,43 @@ loop: return evicted } -// PromoteToSealed moves fractions from sealing to local queue when sealing completes. -func (r *fractionRegistry) PromoteToSealed(active *refCountedActive, sealed *frac.Sealed) { +// promoteToSealed moves fractions from sealing to local queue when sealing completes. +func (r *fractionRegistry) promoteToSealed(active *refCountedActive, sealed ...*frac.Sealed) { r.mu.Lock() defer r.mu.Unlock() - r.sealed.Add(sealed.Info().Name(), &refCountedSealed{Sealed: sealed}) - r.stats.sealed.Add(sealed.Info()) - r.stats.sealing.Sub(active.Info()) + for _, f := range sealed { + info := f.Info() + r.sealed.Add(info.Name(), &refCountedSealed{Sealed: f}) + r.stats.sealed.Add(info) + } + r.stats.sealing.Sub(active.Info()) delete(r.sealing, active.Info().Name()) r.rebuildSnapshot() } -// PromoteToRemote moves fractions from offloading to remote queue when offloading completes. +func (r *fractionRegistry) substituteWithSealed(produced *frac.Sealed, consumed ...*refCountedSealed) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, f := range consumed { + info := f.Info() + r.stats.compacting.Sub(info) + delete(r.compacting, info.Name()) + } + + info := produced.Info() + r.stats.sealed.Add(info) + r.sealed.Add(info.Name(), &refCountedSealed{Sealed: produced}) + + r.rebuildSnapshot() +} + +// promoteToRemote moves fractions from offloading to remote queue when offloading completes. // Special case: handles fractions that don't require offloading (remote == nil). -func (r *fractionRegistry) PromoteToRemote(sealed *refCountedSealed, remote *frac.Remote) { +func (r *fractionRegistry) promoteToRemote(sealed *refCountedSealed, remote *frac.Remote) { r.mu.Lock() defer r.mu.Unlock() @@ -380,14 +404,60 @@ func (r *fractionRegistry) PromoteToRemote(sealed *refCountedSealed, remote *fra r.stats.remotes.Add(remote.Info()) } - r.stats.offloading.Sub(sealed.Info()) r.offloading.Del(sealed.Info().Name()) + r.stats.offloading.Sub(sealed.Info()) + + r.rebuildSnapshot() +} + +func (r *fractionRegistry) sealedSnapshot() []*frac.Sealed { + r.mu.RLock() + defer r.mu.RUnlock() + + result := make([]*frac.Sealed, 0, r.sealed.Len()) + for s := range r.sealed.All() { + result = append(result, s.Sealed) + } + + return result +} + +func (r *fractionRegistry) claimForCompaction(names []string) ([]*refCountedSealed, error) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, name := range names { + // NOTE(dkharms): If offloading pressure is high on the oldest fractions, + // compaction may repeatedly fail to claim them and get into livelock. + if _, ok := r.sealed.Get(name); !ok { + return nil, fmt.Errorf( + "fraction %q is not available for compaction", + name, + ) + } + } + + claimed := make([]*refCountedSealed, 0, len(names)) + for _, name := range names { + s, _ := r.sealed.Get(name) + + r.sealed.Del(name) + r.stats.sealed.Sub(s.Info()) + + r.compacting[name] = s + r.stats.compacting.Add(s.Info()) + + claimed = append(claimed, s) + } + r.rebuildSnapshot() + return claimed, nil } // rebuildSnapshot reconstructs the all fractions list func (r *fractionRegistry) rebuildSnapshot() { - capacity := r.remotes.Len() + r.offloading.Len() + r.sealed.Len() + len(r.sealing) + 1 + capacity := r.remotes.Len() + r.offloading.Len() + + r.sealed.Len() + len(r.compacting) + len(r.sealing) + 1 // allocate extra capacity to accommodate appender rotation that may occur during snapshot lifetime all := newFractionsSnapshot(capacity + 1) @@ -404,13 +474,18 @@ func (r *fractionRegistry) rebuildSnapshot() { all.AddSealed(s) } + for _, c := range r.compacting { + all.AddSealed(c) + } + for _, a := range r.sealing { all.AddActive(a) } - all.AddActive(r.appender) + all.AddActive(r.sappender) + + r.muSnapshot.Lock() + defer r.muSnapshot.Unlock() - r.muAll.Lock() - defer r.muAll.Unlock() - r.all = all + r.snapshot = all } diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 24025c23..e98c5871 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -42,7 +42,7 @@ func newLifecycleManager( // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { - lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) + lc.registry.suspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) lc.rotate(cfg.FracSize, wg) if cfg.OffloadingEnabled { @@ -68,7 +68,7 @@ func (lc *lifecycleManager) SyncInfoCache() { // rotate checks if active fraction needs rotation based on size limit. // Creates new active fraction and starts sealing the previous one. func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { - active, waitBeforeSealing, err := lc.registry.RotateIfFull(maxSize, lc.provider) + active, waitBeforeSealing, err := lc.registry.rotateIfFull(maxSize, lc.provider) if err != nil { logger.Fatal("active fraction rotation error", zap.Error(err)) } @@ -89,7 +89,7 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { } lc.infoCache.Add(sealed.Info()) - lc.registry.PromoteToSealed(active, sealed) + lc.registry.promoteToSealed(active, sealed) active.Destroy() }() } @@ -97,7 +97,7 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { - toOffload, err := lc.registry.EvictLocalForOffload(sizeLimit) + toOffload, err := lc.registry.evictLocalForOffload(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } @@ -108,7 +108,7 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, remote := lc.offloadWithRetry(ctx, frac.Sealed, retryDelay) - lc.registry.PromoteToRemote(frac, remote) + lc.registry.promoteToRemote(frac, remote) if remote == nil { lc.infoCache.Remove(frac.Info().Name()) @@ -181,7 +181,7 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { - toDelete := lc.registry.EvictRemote(retention) + toDelete := lc.registry.evictRemote(retention) wg.Add(len(toDelete)) for _, remote := range toDelete { go func() { @@ -194,10 +194,11 @@ func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGr // cleanLocal deletes outdated local fractions when offloading is disabled. func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { - toDelete, err := lc.registry.EvictLocalForDelete(sizeLimit) + toDelete, err := lc.registry.evictLocalForDelete(sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } + if len(toDelete) > 0 && !lc.flags.IsCapacityExceeded() { if err := lc.flags.setCapacityExceeded(true); err != nil { logger.Fatal("can't set capacity_exceeded flag", zap.Error(err)) @@ -217,14 +218,14 @@ func (lc *lifecycleManager) cleanLocal(sizeLimit uint64, wg *sync.WaitGroup) { // updateOldestMetric updates the prometheus metric with oldest fraction timestamp. func (lc *lifecycleManager) updateOldestMetric() { - oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds()) - oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds()) + oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.oldestTotal()) * time.Millisecond).Seconds()) + oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.oldestLocal()) * time.Millisecond).Seconds()) } // removeOverflowed removes fractions from offloading queue that exceed size limit // Stops ongoing offloading tasks and cleans up both local and remote resources. func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { - evicted := lc.registry.EvictOverflowed(sizeLimit) + evicted := lc.registry.evictOverflowed(sizeLimit) for _, sealed := range evicted { wg.Add(1) go func() { diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index cb9ab1e0..bebc2c1f 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -38,7 +38,7 @@ func TestFracInfoCache(t *testing.T) { defer tearDown() fillRotateAndCheck := func(names map[string]struct{}) { - appender := lc.registry.Appender() + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} @@ -56,13 +56,13 @@ func TestFracInfoCache(t *testing.T) { for range 10 { fillRotateAndCheck(first) } - halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + halfSize := lc.registry.statistics().TotalSizeOnDiskLocal() second := map[string]struct{}{} for range 10 { fillRotateAndCheck(second) } - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total-halfSize, &wg) @@ -86,7 +86,7 @@ func TestCapacityExceeded(t *testing.T) { const fracsCount = 10 fillAndRotate := func() { - appender := lc.registry.Appender() + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} @@ -102,19 +102,19 @@ func TestCapacityExceeded(t *testing.T) { } assert.False(t, lc.flags.IsCapacityExceeded(), "there should be no deletions and the flag is false") - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.cleanLocal(total, &wg) wg.Wait() - assert.Equal(t, fracsCount, lc.registry.Stats().sealed.count, "as much as was added, so much should be") + assert.Equal(t, fracsCount, lc.registry.statistics().sealed.count, "as much as was added, so much should be") assert.False(t, lc.flags.IsCapacityExceeded(), "there should still be no deletions, and the flag is false") lc.cleanLocal(total-1, &wg) wg.Wait() - assert.Equal(t, fracsCount-1, lc.registry.Stats().sealed.count, "expect one less") + assert.Equal(t, fracsCount-1, lc.registry.statistics().sealed.count, "expect one less") assert.True(t, lc.flags.IsCapacityExceeded(), "the flag must be true now") } @@ -124,30 +124,30 @@ func TestOldestMetrics(t *testing.T) { const fracsCount = 10 fillAndRotate := func() { - appender := lc.registry.Appender() + appender := lc.registry.appender() appendDocsToActive(t, appender.Active, 10+rand.Intn(10)) wg := sync.WaitGroup{} lc.rotate(0, &wg) wg.Wait() } - firstFracTime := lc.registry.Appender().Info().CreationTime + firstFracTime := lc.registry.appender().Info().CreationTime for range fracsCount { fillAndRotate() } // Check state after initial rotations - assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should point to the very first fraction when all data is local") - assert.Equal(t, firstFracTime, lc.registry.OldestLocal(), "should point to the first fraction when nothing is offloaded") + assert.Equal(t, firstFracTime, lc.registry.oldestTotal(), "should point to the very first fraction when all data is local") + assert.Equal(t, firstFracTime, lc.registry.oldestLocal(), "should point to the first fraction when nothing is offloaded") - halfSize := lc.registry.Stats().TotalSizeOnDiskLocal() + halfSize := lc.registry.statistics().TotalSizeOnDiskLocal() - halfwayFracTime := lc.registry.Appender().Info().CreationTime + halfwayFracTime := lc.registry.appender().Info().CreationTime for range fracsCount { fillAndRotate() } - total := lc.registry.Stats().TotalSizeOnDiskLocal() + total := lc.registry.statistics().TotalSizeOnDiskLocal() wg := sync.WaitGroup{} lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) @@ -155,8 +155,8 @@ func TestOldestMetrics(t *testing.T) { // Check state after offloading assert.NotEqual(t, firstFracTime, halfwayFracTime, "expect different creation times") - assert.Equal(t, firstFracTime, lc.registry.OldestTotal(), "should still reference the first fraction after offload") - assert.Equal(t, halfwayFracTime, lc.registry.OldestLocal(), "should point to the oldest remaining local fraction after offload") + assert.Equal(t, firstFracTime, lc.registry.oldestTotal(), "should still reference the first fraction after offload") + assert.Equal(t, halfwayFracTime, lc.registry.oldestLocal(), "should point to the oldest remaining local fraction after offload") } func TestPendingDestroy(t *testing.T) { @@ -170,19 +170,19 @@ func TestPendingDestroy(t *testing.T) { // appending docs to `fracsCount` fractions where the last is active and the rest are sealed wg := sync.WaitGroup{} for range fracsCount - 1 { - appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + appendDocsToActive(t, lc.registry.appender().Active, docsPerFrac) lc.rotate(0, &wg) } - appendDocsToActive(t, lc.registry.Appender().Active, docsPerFrac) + appendDocsToActive(t, lc.registry.appender().Active, docsPerFrac) // wait sealing complete wg.Wait() // take all fracs to search - fractions1, release1 := lc.registry.AcquireAllFractions() + fractions1, release1 := lc.registry.acquireAllFractions() // delete all sealing fracs - lc.cleanLocal(lc.registry.Appender().Info().FullSize(), &wg) + lc.cleanLocal(lc.registry.appender().Info().FullSize(), &wg) var ( beforeRelease time.Time @@ -220,7 +220,7 @@ func TestPendingDestroy(t *testing.T) { cleanup.Wait() assert.Less(t, beforeRelease, afterCleanup, "we expect cleanup to happen after release") - fractions2, release2 := lc.registry.AcquireAllFractions() + fractions2, release2 := lc.registry.acquireAllFractions() assert.Len(t, fractions2, 1, "only one active fraction should remain") singleName := fractions2[0].Info().Name() diff --git a/fracmanager/sealer_test.go b/fracmanager/sealer_test.go index f85c3f8f..51c16b6b 100644 --- a/fracmanager/sealer_test.go +++ b/fracmanager/sealer_test.go @@ -19,8 +19,8 @@ import ( "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/frac/sealed/sealing" "github.com/ozontech/seq-db/indexer" + "github.com/ozontech/seq-db/sealing" "github.com/ozontech/seq-db/seq" testscommon "github.com/ozontech/seq-db/tests/common" ) diff --git a/fracmanager/sync_appender.go b/fracmanager/sync_appender.go index 76cf4ee0..1acb15a3 100644 --- a/fracmanager/sync_appender.go +++ b/fracmanager/sync_appender.go @@ -26,8 +26,8 @@ type syncAppender struct { suspended bool // Temporarily suspended for writes } -// Append adds documents to the active fraction -func (a *syncAppender) Append(docs, meta []byte) error { +// append adds documents to the active fraction +func (a *syncAppender) append(docs, meta []byte) error { a.mu.RLock() if a.finalized { a.mu.RUnlock() @@ -43,22 +43,22 @@ func (a *syncAppender) Append(docs, meta []byte) error { return a.refCountedActive.Append(docs, meta, &a.wg) } -func (a *syncAppender) Suspended() bool { +func (a *syncAppender) isSuspended() bool { a.mu.Lock() defer a.mu.Unlock() return a.suspended } -func (a *syncAppender) Suspend(value bool) { +func (a *syncAppender) suspend(value bool) { a.mu.Lock() a.suspended = value a.mu.Unlock() } -// WaitWriteIdle waits for all pending write operations to complete +// waitWriteIdle waits for all pending write operations to complete // Used before sealing to ensure data consistency. -func (a *syncAppender) WaitWriteIdle() { +func (a *syncAppender) waitWriteIdle() { start := time.Now() logger.Info("waiting fraction to stop write...", zap.String("name", a.BaseFileName)) a.wg.Wait() @@ -70,8 +70,8 @@ func (a *syncAppender) WaitWriteIdle() { ) } -// Finalize marks the fraction as read-only and prevents new writes from starting after finalize. -func (a *syncAppender) Finalize() error { +// finalize marks the fraction as read-only and prevents new writes from starting after finalize. +func (a *syncAppender) finalize() error { a.mu.Lock() if a.finalized { a.mu.Unlock() diff --git a/frac/sealed/sealing/blocks_builder.go b/indexwriter/blocks.go similarity index 52% rename from frac/sealed/sealing/blocks_builder.go rename to indexwriter/blocks.go index 3c6ce1b0..636f8fe9 100644 --- a/frac/sealed/sealing/blocks_builder.go +++ b/indexwriter/blocks.go @@ -1,64 +1,58 @@ -package sealing +package indexwriter import ( "encoding/binary" "iter" + "math" "unsafe" - "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/seqids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/util" ) -type ( - TokenBlock = util.Pair[tokensSealBlock, []token.FieldTable] -) +type tokenFieldBlock = util.Pair[unpackedTokenBlock, []token.FieldTable] -// tokensExt represents the token ID range contained in a block. -type tokensExt struct { +// tokenExt represents the token ID range contained in a block. +type tokenExt struct { minTID uint32 // First token ID in the block maxTID uint32 // Last token ID in the block } -// tokensSealBlock represents a sealed block containing token data with metadata. -type tokensSealBlock struct { - ext tokensExt // Tokens block metadata for registry marking +// unpackedTokenBlock represents a sealed block containing token data with metadata. +type unpackedTokenBlock struct { + ext tokenExt // Tokens block metadata for registry marking payload token.Block // Actual token data payload } -// lidsExt represents the range and continuation status of LID blocks. -type lidsExt struct { +// lidExt represents the range and continuation status of LID blocks. +type lidExt struct { minTID uint32 // First token ID in the LID block maxTID uint32 // Last token ID in the LID block isContinued bool // Whether LID sequence continues in next block } -// lidsSealBlock represents a sealed block containing LID (Local ID) data. -type lidsSealBlock struct { - ext lidsExt // LIDs block metadata for registry marking +// unpackedLIDBlock represents a sealed block containing LID (Local ID) data. +type unpackedLIDBlock struct { + ext lidExt // LIDs block metadata for registry marking payload lids.Block // LID data payload } -// idsSealBlock represents a sealed block containing various identifier types. -type idsSealBlock struct { +// unpackedIDBlock represents a sealed block containing various identifier types. +type unpackedIDBlock struct { mids seqids.BlockMIDs rids seqids.BlockRIDs params seqids.BlockParams } -// blocksBuilder constructs sealed blocks from various data sources. -// Provides error tracking and consistency validation during block construction. -type blocksBuilder struct{} - -func (bb *blocksBuilder) BuildTokenBlocks( +func tokenBlock( it iter.Seq2[string, iter.Seq2[TokenPosting, error]], - accumulate func([]uint32) error, blockCapacity int, -) iter.Seq2[TokenBlock, error] { - return func(yield func(TokenBlock, error) bool) { + accumulate func([]uint32) error, blockCapacity int, tokenFreqThreshold int, +) iter.Seq2[tokenFieldBlock, error] { + return func(yield func(tokenFieldBlock, error) bool) { var ( - block tokensSealBlock + block unpackedTokenBlock blockIdx uint32 blockSize int ) @@ -87,13 +81,15 @@ func (bb *blocksBuilder) BuildTokenBlocks( emitFieldEntry() block.ext.maxTID = currentTID - pair := TokenBlock{First: block, Second: pendingTable} + pair := tokenFieldBlock{First: block, Second: pendingTable} if !yield(pair, nil) { return false } block.payload.Payload = block.payload.Payload[:0] block.payload.Offsets = block.payload.Offsets[:0] + block.payload.FreqIndexes = block.payload.FreqIndexes[:0] + block.payload.Freqs = block.payload.Freqs[:0] block.ext.minTID = currentTID + 1 blockIdx++ @@ -106,15 +102,15 @@ func (bb *blocksBuilder) BuildTokenBlocks( } block.ext.minTID = 1 - for field, tokenIterator := range it { + for field, tokIt := range it { emitFieldEntry() fieldName = field fieldEntryStartTID = currentTID + 1 - for pair, err := range tokenIterator { + for pair, err := range tokIt { if err != nil { - yield(TokenBlock{}, err) + yield(tokenFieldBlock{}, err) return } @@ -127,12 +123,21 @@ func (bb *blocksBuilder) BuildTokenBlocks( } } + tokenIndex := uint32(len(block.payload.Offsets)) block.payload.Offsets = append(block.payload.Offsets, uint32(len(block.payload.Payload))) block.payload.Payload = binary.LittleEndian.AppendUint32(block.payload.Payload, uint32(len(tok))) block.payload.Payload = append(block.payload.Payload, tok...) + if len(tlids) >= tokenFreqThreshold { + if tokenIndex > math.MaxUint16 { + panic("unsupported token block size") + } + block.payload.FreqIndexes = append(block.payload.FreqIndexes, uint16(tokenIndex)) + block.payload.Freqs = append(block.payload.Freqs, uint32(len(tlids))) + } + if err := accumulate(tlids); err != nil { - yield(TokenBlock{}, err) + yield(tokenFieldBlock{}, err) return } @@ -149,7 +154,7 @@ func (bb *blocksBuilder) BuildTokenBlocks( func newTokenTableEntry( entryStartTID, entryEndTID uint32, - blockIndex uint32, block tokensSealBlock, + blockIndex uint32, block unpackedTokenBlock, ) *token.TableEntry { // Convert global TIDs to block-local indices firstIndex := entryStartTID - block.ext.minTID @@ -169,15 +174,15 @@ func newTokenTableEntry( } } -// seqBlockID accumulates scalar (ID, position) pairs into sealed ID blocks. +// idBlock accumulates scalar (ID, position) pairs into sealed ID blocks. // A new block is yielded every `blockCapacity` IDs. -func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[idsSealBlock, error] { - return func(yield func(idsSealBlock, error) bool) { - var block idsSealBlock +func idBlock(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[unpackedIDBlock, error] { + return func(yield func(unpackedIDBlock, error) bool) { + var block unpackedIDBlock for pair, err := range ids { if err != nil { - yield(idsSealBlock{}, err) + yield(unpackedIDBlock{}, err) return } @@ -203,87 +208,24 @@ func seqBlockID(ids iter.Seq2[DocLocation, error], blockCapacity int) iter.Seq2[ } } -type lidAccumulator struct { - blockCapacity int - onBlock func(lidsSealBlock) error - - currentTID uint32 - currentBlock lidsSealBlock - - isEndOfToken bool - isContinued bool -} - -func newLIDAccumulator( - blockCapacity int, - onBlock func(lidsSealBlock) error, -) *lidAccumulator { - if blockCapacity == 0 { - blockCapacity = consts.DefaultLIDBlockCap - } - a := &lidAccumulator{ - blockCapacity: blockCapacity, - onBlock: onBlock, +// collapseOrderedFieldsTables merges FieldTables with the same field name. +// Assumes input is sorted by Field. +func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { + if len(src) == 0 { + return nil } - a.currentBlock.ext.minTID = 1 - a.currentBlock.payload = lids.Block{ - LIDs: make([]uint32, 0, blockCapacity), - Offsets: []uint32{0}, - } - - return a -} - -// Add processes LIDs of one token (must be called in TID order). -// -// For each block that fills up, `onBlock` is called immediately -// before the backing arrays are reset, so `onBlock` may read the -// block data but must not retain references to it. -func (a *lidAccumulator) Add(lidsbuf []uint32) error { - a.currentTID++ - - for _, lid := range lidsbuf { - if len(a.currentBlock.payload.LIDs) == a.blockCapacity { - if err := a.onBlock(a.finalizeBlock()); err != nil { - return err - } - - a.currentBlock.ext.minTID = a.currentTID - a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] - a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] + current := src[0] + var dst []token.FieldTable + for _, ft := range src[1:] { + if current.Field == ft.Field { + current.Entries = append(current.Entries, ft.Entries...) + continue } - a.isEndOfToken = false - a.currentBlock.ext.maxTID = a.currentTID - a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) - } - - a.isEndOfToken = true - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) - - return nil -} - -func (a *lidAccumulator) Finalize() error { - return a.onBlock(a.finalizeBlock()) -} - -func (a *lidAccumulator) finalizeBlock() lidsSealBlock { - if !a.isEndOfToken { - a.currentBlock.payload.Offsets = append( - a.currentBlock.payload.Offsets, - uint32(len(a.currentBlock.payload.LIDs)), - ) + dst = append(dst, current) + current = ft } - result := a.currentBlock - result.payload.IsLastLID = a.isEndOfToken - result.ext.isContinued = a.isContinued - - a.isContinued = !a.isEndOfToken - return result + return append(dst, current) } diff --git a/frac/sealed/sealing/blocks_builder_test.go b/indexwriter/blocks_test.go similarity index 79% rename from frac/sealed/sealing/blocks_builder_test.go rename to indexwriter/blocks_test.go index d6bca144..c49f1d97 100644 --- a/frac/sealed/sealing/blocks_builder_test.go +++ b/indexwriter/blocks_test.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "iter" @@ -6,28 +6,22 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" - "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed/lids" "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" ) -var _ Source = (*mockSource)(nil) - type mockSource struct { - info common.Info - tokens [][]byte - fields []string - fieldMaxTIDs []uint32 - ids []seq.ID - pos []seq.DocPos - tokenLIDs [][]uint32 - blocksOffsets []uint64 + tokens [][]byte + fields []string + fieldMaxTIDs []uint32 + ids []seq.ID + pos []seq.DocPos + tokenLIDs [][]uint32 } -func (m *mockSource) Info() *common.Info { return &m.info } - func (m *mockSource) TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] { return func(yield func(string, iter.Seq2[TokenPosting, error]) bool) { start := 0 @@ -48,8 +42,7 @@ func (m *mockSource) tokensForField(start, end int) iter.Seq2[TokenPosting, erro if j < len(m.tokenLIDs) { lidsbuf = m.tokenLIDs[j] } - pair := TokenPosting{First: m.tokens[j], Second: lidsbuf} - if !yield(pair, nil) { + if !yield(TokenPosting{First: m.tokens[j], Second: lidsbuf}, nil) { return } } @@ -66,7 +59,40 @@ func (m *mockSource) ID() iter.Seq2[DocLocation, error] { } } -func (m *mockSource) BlockOffsets() []uint64 { return m.blocksOffsets } +func TestBlocksBuilder_BuildTokenBlocksWithFreq(t *testing.T) { + const ( + blockSize = 1024 + tokenFreqThreshold = 50 + ) + manyLids := make([]uint32, tokenFreqThreshold) + for i := range manyLids { + manyLids[i] = uint32(i + 1) + } + + src := mockSource{ + tokens: [][]byte{ + []byte("rare"), + []byte("common"), + }, + fields: []string{"f1"}, + fieldMaxTIDs: []uint32{2}, + tokenLIDs: [][]uint32{ + {1, 2, 3}, + manyLids, + }, + } + + var blocks []unpackedTokenBlock + for pair, err := range tokenBlock(src.TokenTriplet(), func([]uint32) error { return nil }, blockSize, tokenFreqThreshold) { + assert.NoError(t, err) + blocks = append(blocks, pair.First) + } + + require.Len(t, blocks, 1) + + assert.Equal(t, uint32(0), blocks[0].payload.GetFreq(0)) + assert.Equal(t, uint32(tokenFreqThreshold), blocks[0].payload.GetFreq(1)) +} func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { src := mockSource{ @@ -114,10 +140,10 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { const blockSize = 24 const lidBlockCap = 3 - var lidBlocks []lidsSealBlock + var lidBlocks []unpackedLIDBlock lidAccumulator := newLIDAccumulator( lidBlockCap, - func(block lidsSealBlock) error { + func(block unpackedLIDBlock) error { block.payload.LIDs = slices.Clone(block.payload.LIDs) block.payload.Offsets = slices.Clone(block.payload.Offsets) lidBlocks = append(lidBlocks, block) @@ -125,13 +151,11 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { }, ) - var bb blocksBuilder - tokenBlocks := bb.BuildTokenBlocks( + tokenBlocksIter := tokenBlock( src.TokenTriplet(), - func(lids []uint32) error { - return lidAccumulator.Add(lids) - }, + lidAccumulator.add, blockSize, + 50, ) // In our test case, each token is 4 bytes long. Also for each token we use uint32 to encode the length. @@ -142,7 +166,7 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { blockIndex := 0 allFieldsTables := []token.FieldTable{} - for pair, err := range tokenBlocks { + for pair, err := range tokenBlocksIter { assert.NoError(t, err) block, fieldsTables := pair.First, pair.Second assert.Equal(t, expectedSizes[blockIndex], block.payload.Len()) @@ -249,31 +273,31 @@ func TestBlocksBuilder_BuildTokenBlocks(t *testing.T) { }, } assert.Equal(t, actualTokenTable.FieldsTables, expectedTokenTable.FieldsTables) - assert.NoError(t, lidAccumulator.Finalize()) + assert.NoError(t, lidAccumulator.finalize()) - expectedLIDBlocks := []lidsSealBlock{ + expectedLIDBlocks := []unpackedLIDBlock{ { - ext: lidsExt{minTID: 1, maxTID: 1, isContinued: false}, + ext: lidExt{minTID: 1, maxTID: 1, isContinued: false}, payload: lids.Block{LIDs: []uint32{10, 20, 30}, Offsets: []uint32{0, 3}, IsLastLID: false}, }, { - ext: lidsExt{minTID: 1, maxTID: 3, isContinued: true}, + ext: lidExt{minTID: 1, maxTID: 3, isContinued: true}, payload: lids.Block{LIDs: []uint32{40, 2, 3}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 4, maxTID: 6, isContinued: false}, + ext: lidExt{minTID: 4, maxTID: 6, isContinued: false}, payload: lids.Block{LIDs: []uint32{4, 5, 6}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 7, maxTID: 9, isContinued: false}, + ext: lidExt{minTID: 7, maxTID: 9, isContinued: false}, payload: lids.Block{LIDs: []uint32{7, 8, 9}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 10, maxTID: 12, isContinued: false}, + ext: lidExt{minTID: 10, maxTID: 12, isContinued: false}, payload: lids.Block{LIDs: []uint32{10, 11, 12}, Offsets: []uint32{0, 1, 2, 3}, IsLastLID: true}, }, { - ext: lidsExt{minTID: 13, maxTID: 14, isContinued: false}, + ext: lidExt{minTID: 13, maxTID: 14, isContinued: false}, payload: lids.Block{LIDs: []uint32{13, 14}, Offsets: []uint32{0, 1, 2}, IsLastLID: true}, }, } @@ -313,7 +337,7 @@ func TestBlocksBuilder_IDsBlocks(t *testing.T) { i := 0 ids := []seq.ID{} pos := []seq.DocPos{} - for block, err := range seqBlockID(src.ID(), 3) { + for block, err := range idBlock(src.ID(), 3) { assert.NoError(t, err) assert.Equal(t, expectedSizes[i], len(block.mids.Values)) diff --git a/frac/sealed/sealing/index.go b/indexwriter/index.go similarity index 69% rename from frac/sealed/sealing/index.go rename to indexwriter/index.go index e14b36b9..56bfa33e 100644 --- a/frac/sealed/sealing/index.go +++ b/indexwriter/index.go @@ -1,7 +1,8 @@ -package sealing +package indexwriter import ( "io" + "iter" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" @@ -11,9 +12,34 @@ import ( "github.com/ozontech/seq-db/frac/sealed/token" "github.com/ozontech/seq-db/seq" "github.com/ozontech/seq-db/storage" + "github.com/ozontech/seq-db/util" "github.com/ozontech/seq-db/zstd" ) +type ( + DocLocation = util.Pair[seq.ID, seq.DocPos] + TokenPosting = util.Pair[[]byte, []uint32] +) + +// Source defines the data required to write all index files for a fraction. +type Source interface { + // Info returns metadata describing this source. + Info() *common.Info + + // ID returns an iterator over stored document identifiers paired with + // their positions, in descending [seq.ID] order. + ID() iter.Seq2[DocLocation, error] + + // BlockOffsets returns byte offsets to each document block + // within this source's `.docs` file. + BlockOffsets() []uint64 + + // TokenTriplet iterates over fields in lexicographic order. + // For each field, it yields tokens (lexicographically sorted) + // paired with the local document ID list for that token. + TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] +} + // indexBlock is one compressed (or not) block with its registry metadata. type indexBlock struct { codec storage.Codec @@ -23,11 +49,11 @@ type indexBlock struct { ext2 uint64 } -func (i indexBlock) Bin(pos int64) (storage.IndexBlockHeader, []byte) { +func (i indexBlock) bin(pos int64) (storage.IndexBlockHeader, []byte) { return storage.NewIndexBlockHeader(pos, i.ext1, i.ext2, uint32(len(i.payload)), i.rawLen, i.codec), i.payload } -type IndexSealer struct { +type IndexWriter struct { params common.SealParams buf1 []byte @@ -40,8 +66,8 @@ type IndexSealer struct { tokenTable token.Table } -func NewIndexSealer(params common.SealParams) *IndexSealer { - return &IndexSealer{ +func New(params common.SealParams) *IndexWriter { + return &IndexWriter{ params: params, buf1: make([]byte, 0, consts.RegularBlockSize), buf2: make([]byte, 0, consts.RegularBlockSize), @@ -50,31 +76,27 @@ func NewIndexSealer(params common.SealParams) *IndexSealer { } } -func (s *IndexSealer) LIDsTable() lids.Table { +func (s *IndexWriter) LIDsTable() lids.Table { return s.lidsTable } -func (s *IndexSealer) TokenTable() token.Table { +func (s *IndexWriter) TokenTable() token.Table { return s.tokenTable } -func (s *IndexSealer) IDsTable() seqids.Table { +func (s *IndexWriter) IDsTable() seqids.Table { return s.idsTable } // WriteOffsetsFile writes the .offsets file containing a single BlockOffsets block. -func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err } defer w.release() - offsets := sealed.BlockOffsets{ - IDsTotal: src.Info().DocsTotal + 1, - Offsets: src.BlockOffsets(), - } - + offsets := sealed.BlockOffsets{Offsets: src.BlockOffsets()} if err := w.writeBlock(blockTypeOffset, s.packBlocksOffsetsBlock(offsets)); err != nil { return err } @@ -82,14 +104,14 @@ func (s *IndexSealer) WriteOffsetsFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteIDFile(ws io.WriteSeeker, src Source) error { w, err := newWriter(ws) if err != nil { return err } defer w.release() - for block, err := range seqBlockID(src.ID(), consts.IDsPerBlock) { + for block, err := range idBlock(src.ID(), consts.IDsPerBlock) { if err != nil { return err } @@ -110,7 +132,7 @@ func (s *IndexSealer) WriteIDFile(ws io.WriteSeeker, src Source) error { return w.finalize() } -func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { +func (s *IndexWriter) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) error { tw, err := newWriter(tws) if err != nil { return err @@ -123,19 +145,15 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err } defer lw.release() - var ( - bb blocksBuilder - allFieldsTables []token.FieldTable - ) - lidAccumulator := newLIDAccumulator( s.params.LIDBlockSize, - func(block lidsSealBlock) error { + func(block unpackedLIDBlock) error { return lw.writeBlock(blockTypeLID, s.packLIDsBlock(block)) }, ) - for pair, err := range bb.BuildTokenBlocks(src.TokenTriplet(), lidAccumulator.Add, consts.RegularBlockSize) { + var allFieldsTables []token.FieldTable + for pair, err := range tokenBlock(src.TokenTriplet(), lidAccumulator.add, consts.RegularBlockSize, s.params.TokenFreqThreshold) { if err != nil { return err } @@ -154,15 +172,15 @@ func (s *IndexSealer) WriteTokenTriplet(tws, lws io.WriteSeeker, src Source) err return s.finalizeTokenFile(tw, allFieldsTables) } -func (s *IndexSealer) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { - if err := lidAccumulator.Finalize(); err != nil { +func (s *IndexWriter) finalizeLIDFile(w *writer, lidAccumulator *lidAccumulator) error { + if err := lidAccumulator.finalize(); err != nil { return err } return w.finalize() } -func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { +func (s *IndexWriter) finalizeTokenFile(w *writer, allFieldsTables []token.FieldTable) error { // Emit section separator. if err := w.writeEmptyBlock(); err != nil { return err @@ -176,39 +194,17 @@ func (s *IndexSealer) finalizeTokenFile(w *writer, allFieldsTables []token.Field return w.finalize() } -func (s *IndexSealer) WriteInfoFile(ws io.Writer, src Source) error { +func (s *IndexWriter) WriteInfoFile(ws io.Writer, src Source) error { block := sealed.BlockInfo{Info: src.Info()} _, err := ws.Write(s.packInfoBlock(block).payload) return err } -// collapseOrderedFieldsTables merges FieldTables with the same field name. -// Assumes input is sorted by Field. -func collapseOrderedFieldsTables(src []token.FieldTable) []token.FieldTable { - if len(src) == 0 { - return nil - } - - current := src[0] - var dst []token.FieldTable - for _, ft := range src[1:] { - if current.Field == ft.Field { - current.Entries = append(current.Entries, ft.Entries...) - continue - } - - dst = append(dst, current) - current = ft - } - - return append(dst, current) -} - func newIndexBlock(raw []byte) indexBlock { return indexBlock{codec: storage.CodecNo, rawLen: uint32(len(raw)), payload: raw} } -func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { +func (s *IndexWriter) newIndexBlockZSTD(raw []byte, level int) indexBlock { s.buf2 = zstd.CompressLevel(raw, s.buf2[:0], level) if len(s.buf2) < len(raw) { return indexBlock{codec: storage.CodecZSTD, rawLen: uint32(len(raw)), payload: s.buf2} @@ -217,14 +213,15 @@ func (s *IndexSealer) newIndexBlockZSTD(raw []byte, level int) indexBlock { } // packInfoBlock packs fraction information into an index block. -func (s *IndexSealer) packInfoBlock(block sealed.BlockInfo) indexBlock { +func (s *IndexWriter) packInfoBlock(block sealed.BlockInfo) indexBlock { + s.idsTable.IDsTotal = block.Info.DocsTotal + 1 // Increment by one for [seq.SystemID] s.buf1 = block.Pack(s.buf1[:0]) return newIndexBlock(s.buf1) // Info block is typically small, no compression } // packTokenBlock packs token data into a compressed index block. -func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { - s.buf1 = block.payload.Pack(s.buf1[:0]) // Pack token data +func (s *IndexWriter) packTokenBlock(block unpackedTokenBlock) indexBlock { + s.buf1 = block.payload.Pack(s.buf1[:0], s.buf32[:0]) // Pack token data b := s.newIndexBlockZSTD(s.buf1, s.params.TokenListZstdLevel) // Store TID range in extended metadata b.ext1 = uint64(block.ext.maxTID)<<32 | uint64(block.ext.minTID) @@ -232,7 +229,7 @@ func (s *IndexSealer) packTokenBlock(block tokensSealBlock) indexBlock { } // packTokenTableBlock packs the token table into a compressed index block. -func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { +func (s *IndexWriter) packTokenTableBlock(tokenTableBlock token.TableBlock) indexBlock { s.tokenTable = token.TableFromBlocks([]token.TableBlock{tokenTableBlock}) // Store for PreloadedData // Packing block @@ -241,11 +238,7 @@ func (s *IndexSealer) packTokenTableBlock(tokenTableBlock token.TableBlock) inde } // packBlocksOffsetsBlock packs document block offsets into a compressed index block. -func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { - // Update IDs table for PreloadedData - s.idsTable.IDsTotal = block.IDsTotal // Total number of IDs - s.idsTable.IDBlocksTotal = uint32(len(block.Offsets)) // Number of ID blocks - +func (s *IndexWriter) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlock { // Packing block s.buf1 = block.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.DocsPositionsZstdLevel) @@ -253,7 +246,7 @@ func (s *IndexSealer) packBlocksOffsetsBlock(block sealed.BlockOffsets) indexBlo } // packMIDsBlock packs MIDs into a compressed index block. -func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packMIDsBlock(block unpackedIDBlock) indexBlock { // Get the last ID in the block (smallest due to descending order) last := len(block.mids.Values) - 1 @@ -276,14 +269,14 @@ func (s *IndexSealer) packMIDsBlock(block idsSealBlock) indexBlock { } // packRIDsBlock packs RIDs into a compressed index block. -func (s *IndexSealer) packRIDsBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packRIDsBlock(block unpackedIDBlock) indexBlock { s.buf1 = block.rids.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b } // packPosBlock packs document positions into a compressed index block. -func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { +func (s *IndexWriter) packPosBlock(block unpackedIDBlock) indexBlock { s.buf1 = block.params.Pack(s.buf1[:0]) b := s.newIndexBlockZSTD(s.buf1, s.params.IDsZstdLevel) return b @@ -291,7 +284,7 @@ func (s *IndexSealer) packPosBlock(block idsSealBlock) indexBlock { // packLIDsBlock packs Local IDs (LIDs) into a compressed index block. // Also updates LIDs table for preloaded data access. -func (s *IndexSealer) packLIDsBlock(block lidsSealBlock) indexBlock { +func (s *IndexWriter) packLIDsBlock(block unpackedLIDBlock) indexBlock { var ext1 uint64 if block.ext.isContinued { // todo: Legacy continuation flag ext1 = 1 diff --git a/indexwriter/lid_accumulator.go b/indexwriter/lid_accumulator.go new file mode 100644 index 00000000..311311ef --- /dev/null +++ b/indexwriter/lid_accumulator.go @@ -0,0 +1,85 @@ +package indexwriter + +import "github.com/ozontech/seq-db/frac/sealed/lids" + +type lidAccumulator struct { + blockCapacity int + onBlock func(unpackedLIDBlock) error + + currentTID uint32 + currentBlock unpackedLIDBlock + + isEndOfToken bool + isContinued bool +} + +func newLIDAccumulator( + blockCapacity int, + onBlock func(unpackedLIDBlock) error, +) *lidAccumulator { + a := &lidAccumulator{ + blockCapacity: blockCapacity, + onBlock: onBlock, + } + + a.currentBlock.ext.minTID = 1 + a.currentBlock.payload = lids.Block{ + LIDs: make([]uint32, 0, blockCapacity), + Offsets: []uint32{0}, + } + + return a +} + +// add processes LIDs of one token (must be called in TID order). +// +// For each block that fills up, `onBlock` is called immediately +// before the backing arrays are reset, so `onBlock` may read the +// block data but must not retain references to it. +func (a *lidAccumulator) add(lidsbuf []uint32) error { + a.currentTID++ + + for _, lid := range lidsbuf { + if len(a.currentBlock.payload.LIDs) == a.blockCapacity { + if err := a.onBlock(a.finalizeBlock()); err != nil { + return err + } + + a.currentBlock.ext.minTID = a.currentTID + a.currentBlock.payload.LIDs = a.currentBlock.payload.LIDs[:0] + a.currentBlock.payload.Offsets = a.currentBlock.payload.Offsets[:1] + } + + a.isEndOfToken = false + a.currentBlock.ext.maxTID = a.currentTID + a.currentBlock.payload.LIDs = append(a.currentBlock.payload.LIDs, lid) + } + + a.isEndOfToken = true + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + + return nil +} + +func (a *lidAccumulator) finalize() error { + return a.onBlock(a.finalizeBlock()) +} + +func (a *lidAccumulator) finalizeBlock() unpackedLIDBlock { + if !a.isEndOfToken { + a.currentBlock.payload.Offsets = append( + a.currentBlock.payload.Offsets, + uint32(len(a.currentBlock.payload.LIDs)), + ) + } + + result := a.currentBlock + result.payload.IsLastLID = a.isEndOfToken + result.ext.isContinued = a.isContinued + + a.isContinued = !a.isEndOfToken + return result +} diff --git a/frac/sealed/sealing/writer.go b/indexwriter/writer.go similarity index 96% rename from frac/sealed/sealing/writer.go rename to indexwriter/writer.go index 1a147e4e..7746c1db 100644 --- a/frac/sealed/sealing/writer.go +++ b/indexwriter/writer.go @@ -1,4 +1,4 @@ -package sealing +package indexwriter import ( "bytes" @@ -73,7 +73,7 @@ func newWriter(ws io.WriteSeeker) (*writer, error) { } func (w *writer) writeBlock(btype string, block indexBlock) error { - header, payload := block.Bin(int64(w.pos)) + header, payload := block.bin(int64(w.pos)) if _, err := w.wpayload.Write(payload); err != nil { return err } @@ -92,7 +92,7 @@ func (w *writer) writeBlock(btype string, block indexBlock) error { } func (w *writer) writeEmptyBlock() error { - header, _ := indexBlock{}.Bin(int64(w.pos)) + header, _ := indexBlock{}.bin(int64(w.pos)) w.wheader.Write(header) return nil } diff --git a/packer/delta_bitpacker.go b/packer/delta_bitpacker.go index 36fd68df..7185d23e 100644 --- a/packer/delta_bitpacker.go +++ b/packer/delta_bitpacker.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "unsafe" "github.com/ronanh/intcomp" @@ -13,6 +14,12 @@ const ( sizeOfUint32 = int(unsafe.Sizeof(uint32(0))) ) +var uint32Pool = sync.Pool{ + New: func() any { + return make([]uint32, 0, 16*1024) + }, +} + // CompressDeltaBitpackUint32 works on top of intcomp library. intcomp can only compress slices which are multiple of 128, but // this function supports slices of any length. Residual part is always less than 128 numbers and is not delta encoded, // since we know the number of blocks with length non-multiple of 128 is very low. @@ -36,6 +43,19 @@ func CompressDeltaBitpackUint32(dst []byte, values, buf []uint32) []byte { return dst } +// CompressDeltaBitpackUint16 uses a temporary buffer to copy and cast values from uint16 to uint32 so it's a bit slower than CompressDeltaBitpackUint32. +func CompressDeltaBitpackUint16(dst []byte, values []uint16, buf []uint32) []byte { + uint32Values, _ := uint32Pool.Get().([]uint32) + uint32Values = uint32Values[:0] + + for _, i := range values { + uint32Values = append(uint32Values, uint32(i)) + } + dst = CompressDeltaBitpackUint32(dst, uint32Values, buf) + uint32Pool.Put(uint32Values) + return dst +} + func DecompressDeltaBitpackUint32(data []byte, buf, compressed []uint32) ([]byte, []uint32, error) { if len(data) < sizeOfUint32 { return nil, nil, fmt.Errorf("not enough data. slice len %d", len(data)) @@ -81,6 +101,26 @@ func DecompressDeltaBitpackUint32(data []byte, buf, compressed []uint32) ([]byte return data, buf, nil } +// DecompressDeltaBitpackUint16 works on top of DecompressDeltaBitpackUint32 so it's a bit slower +func DecompressDeltaBitpackUint16(data []byte, buf []uint16, compressed []uint32) ([]byte, []uint16, error) { + uint32Values, _ := uint32Pool.Get().([]uint32) + uint32Values = uint32Values[:0] + + var ( + values []uint32 + err error + ) + + data, values, err = DecompressDeltaBitpackUint32(data, uint32Values, compressed) + + for _, i := range values { + buf = append(buf, uint16(i)) + } + uint32Pool.Put(uint32Values) + + return data, buf, err +} + // CompressDeltaBitpackUint64 works on top of intcomp library. intcomp can only compress uint64 slices which are multiple of 256, but // this function supports slices of any length. Residual part is always less than 256 uint64 numbers and is not delta encoded, // since we know the number of blocks with length non-multiple of 256 is very low. diff --git a/packer/delta_bitpacker_test.go b/packer/delta_bitpacker_test.go index 049c658c..9334e8dc 100644 --- a/packer/delta_bitpacker_test.go +++ b/packer/delta_bitpacker_test.go @@ -7,6 +7,68 @@ import ( "github.com/stretchr/testify/require" ) +func TestCompressDeltaBitpackUint16(t *testing.T) { + testCases := []struct { + name string + values []uint16 + }{ + { + name: "empty", + values: []uint16{}, + }, + { + name: "small_single_value", + values: []uint16{1}, + }, + { + name: "small_few_values", + values: []uint16{1, 4, 7, 8, 10}, + }, + { + name: "small_127_values", + values: generateUint16(127), + }, + { + name: "small_128", + values: generateUint16(128), + }, + { + name: "small_129", + values: generateUint16(129), + }, + { + name: "midium_4k", + values: generateUint16(4096), + }, + { + name: "midium_4k_more", + values: generateUint16(4105), + }, + { + name: "midium_64k", + values: generateUint16(64 * 1024), + }, + { + name: "midium_64k_more", + values: generateUint16(64*1024 + 34), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + compressed := CompressDeltaBitpackUint16([]byte{}, tc.values, []uint32{}) + tmp := make([]uint32, 0, len(compressed)/sizeOfUint32) + _, decompressed, err := DecompressDeltaBitpackUint16(compressed, []uint16{}, tmp) + require.NoError(t, err) + if len(tc.values) > 0 { + require.Equal(t, tc.values, decompressed) + } else { + require.Equal(t, 0, len(decompressed)) + } + }) + } +} + func TestCompressDeltaBitpackUint32(t *testing.T) { testCases := []struct { name string @@ -131,6 +193,16 @@ func TestCompressDeltaBitpackUint64(t *testing.T) { } } +func generateUint16(n int) []uint16 { + v := make([]uint16, n) + last := uint16(100) + for i := range v { + v[i] = last + last += uint16(1 + rand.Intn(5)) + } + return v +} + func generateUint32(n int) []uint32 { v := make([]uint32, n) last := uint32(100) diff --git a/frac/sealed/sealing/sealer.go b/sealing/sealer.go similarity index 61% rename from frac/sealed/sealing/sealer.go rename to sealing/sealer.go index 57863d82..0c21ffc4 100644 --- a/frac/sealed/sealing/sealer.go +++ b/sealing/sealer.go @@ -2,41 +2,19 @@ package sealing import ( "errors" - "iter" "os" "path/filepath" "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac/common" "github.com/ozontech/seq-db/frac/sealed" - "github.com/ozontech/seq-db/seq" + "github.com/ozontech/seq-db/indexwriter" "github.com/ozontech/seq-db/util" ) -type ( - DocLocation = util.Pair[seq.ID, seq.DocPos] - TokenPosting = util.Pair[[]byte, []uint32] -) - -// Source interface defines the contract for data sources that can be sealed. -// Provides access to all necessary data components for index creation -type Source interface { - // Info returns metadata describing this source. - Info() *common.Info - - // ID returns an iterator over stored document identifiers paired with - // their positions, in descending [seq.ID] order. - ID() iter.Seq2[DocLocation, error] - - // BlockOffsets returns byte offsets to each document block - // within this source's `.docs` file. - BlockOffsets() []uint64 - - // TokenTriplet iterates over fields in lexicographic order. - // For each field, it yields tokens (lexicographically sorted) - // paired with the local document ID list for that token. - TokenTriplet() iter.Seq2[string, iter.Seq2[TokenPosting, error]] -} +// Source defines the contract for data sources that can be sealed. +// Provides access to all necessary data components for index creation. +type Source = indexwriter.Source // Seal writes five index files (.info, .token, .offsets, .id, .lid) for the fraction // and returns PreloadedData for fast initialization of the sealed fraction. @@ -47,12 +25,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return nil, errors.New("sealing of an empty active fraction is not supported") } - sealer := NewIndexSealer(params) - + w := indexwriter.New(params) if err := createAndWrite( info.Path+consts.OffsetsTmpFileSuffix, info.Path+consts.OffsetsFileSuffix, - func(f *os.File) error { return sealer.WriteOffsetsFile(f, src) }, + func(f *os.File) error { return w.WriteOffsetsFile(f, src) }, ); err != nil { return nil, err } @@ -60,7 +37,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.IDTmpFileSuffix, info.Path+consts.IDFileSuffix, - func(f *os.File) error { return sealer.WriteIDFile(f, src) }, + func(f *os.File) error { return w.WriteIDFile(f, src) }, ); err != nil { return nil, err } @@ -68,7 +45,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWriteBoth( info.Path+consts.TokenTmpFileSuffix, info.Path+consts.TokenFileSuffix, info.Path+consts.LIDTmpFileSuffix, info.Path+consts.LIDFileSuffix, - func(tokenF, lidF *os.File) error { return sealer.WriteTokenTriplet(tokenF, lidF, src) }, + func(tokenF, lidF *os.File) error { return w.WriteTokenTriplet(tokenF, lidF, src) }, ); err != nil { return nil, err } @@ -76,7 +53,7 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { if err := createAndWrite( info.Path+consts.InfoTmpFileSuffix, info.Path+consts.InfoFileSuffix, - func(f *os.File) error { return sealer.WriteInfoFile(f, src) }, + func(f *os.File) error { return w.WriteInfoFile(f, src) }, ); err != nil { return nil, err } @@ -100,13 +77,13 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { } info.IndexOnDisk = totalSize - lidsTable := sealer.LIDsTable() + lidsTable := w.LIDsTable() preloaded := &sealed.PreloadedData{ Info: info, - TokenTable: sealer.TokenTable(), + TokenTable: w.TokenTable(), BlocksData: sealed.BlocksData{ - IDsTable: sealer.IDsTable(), + IDsTable: w.IDsTable(), LIDsTable: &lidsTable, BlocksOffsets: src.BlockOffsets(), }, @@ -123,10 +100,7 @@ func syncAndClose(f *os.File) error { return f.Close() } -func createAndWrite( - tmp, final string, - write func(*os.File) error, -) error { +func createAndWrite(tmp, final string, write func(*os.File) error) error { f, err := os.Create(tmp) if err != nil { return err @@ -140,16 +114,16 @@ func createAndWrite( } func createAndWriteBoth( - tmpa, finala, - tmpb, finalb string, + atmp, afinal, + btmp, bfinal string, write func(*os.File, *os.File) error, ) error { - a, err := os.Create(tmpa) + a, err := os.Create(atmp) if err != nil { return err } - b, err := os.Create(tmpb) + b, err := os.Create(btmp) if err != nil { a.Close() return err @@ -160,9 +134,9 @@ func createAndWriteBoth( return err } - if err := os.Rename(tmpa, finala); err != nil { + if err := os.Rename(atmp, afinal); err != nil { return err } - return os.Rename(tmpb, finalb) + return os.Rename(btmp, bfinal) } diff --git a/seq/seq.go b/seq/seq.go index adae4265..d3557a16 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -11,9 +11,13 @@ import ( ) var ( - SystemMID MID = math.MaxUint64 - SystemRID RID = math.MaxUint64 - SystemID ID = ID{SystemMID, SystemRID} + SystemMID MID = math.MaxUint64 + SystemRID RID = math.MaxUint64 + + SystemID ID = ID{SystemMID, SystemRID} + MinID ID = ID{0, 0} + MaxID ID = SystemID + SystemDocPos DocPos = DocPos(0) ) diff --git a/util/size.go b/util/size.go index 048b6405..4f22b845 100644 --- a/util/size.go +++ b/util/size.go @@ -4,6 +4,7 @@ import "unsafe" const ( SizeOfString = int(unsafe.Sizeof("")) + SizeOfUint16 = int(unsafe.Sizeof(uint16(0))) SizeOfUint32 = int(unsafe.Sizeof(uint32(0))) SizeOfUint64 = int(unsafe.Sizeof(uint64(0))) SizeOfPointer = int(unsafe.Sizeof(int(0)))