Skip to content
Open
14 changes: 13 additions & 1 deletion .seqbench/baseline.env
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
GOGC=100

SEQDB_STORAGE_FRAC_SIZE=16MiB
SEQDB_STORAGE_FRAC_SIZE=1MiB
SEQDB_STORAGE_TOTAL_SIZE=10GiB

SEQDB_COMPACTION_ENABLED=true
SEQDB_COMPACTION_WORKERS=4
SEQDB_COMPACTION_TIME_WINDOW=1h
SEQDB_COMPACTION_TICK_INTERVAL=1s

SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4
SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8
SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB

SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5
SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5

SEQDB_LIMITS_QUERY_RATE=1024
SEQDB_LIMITS_SEARCH_REQUESTS=1024
SEQDB_LIMITS_BULK_REQUESTS=128
Expand Down
14 changes: 13 additions & 1 deletion .seqbench/comparison.env
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
GOGC=100

SEQDB_STORAGE_FRAC_SIZE=16MiB
SEQDB_STORAGE_FRAC_SIZE=1MiB
SEQDB_STORAGE_TOTAL_SIZE=10GiB

SEQDB_COMPACTION_ENABLED=true
SEQDB_COMPACTION_WORKERS=4
SEQDB_COMPACTION_TIME_WINDOW=1h
SEQDB_COMPACTION_TICK_INTERVAL=1s

SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4
SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8
SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB

SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5
SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5

SEQDB_LIMITS_QUERY_RATE=1024
SEQDB_LIMITS_SEARCH_REQUESTS=1024
SEQDB_LIMITS_BULK_REQUESTS=128
Expand Down
14 changes: 13 additions & 1 deletion .seqbench/continuous.env
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,21 @@ GOGC=100

SEQDB_RESOURCES_SKIP_FSYNC=true

SEQDB_STORAGE_FRAC_SIZE=16MiB
SEQDB_STORAGE_FRAC_SIZE=1MiB
SEQDB_STORAGE_TOTAL_SIZE=10GiB

SEQDB_COMPACTION_ENABLED=true
SEQDB_COMPACTION_WORKERS=4
SEQDB_COMPACTION_TIME_WINDOW=1h
SEQDB_COMPACTION_TICK_INTERVAL=1s

SEQDB_COMPACTION_STCS_MERGE_TRIGGER=4
SEQDB_COMPACTION_STCS_MERGE_FAN_IN=8
SEQDB_COMPACTION_STCS_MERGE_FAN_OUT_SIZE=256MiB

SEQDB_COMPACTION_STCS_BUCKET_LOWERBOUND=0.5
SEQDB_COMPACTION_STCS_BUCKET_UPPERBOUND=1.5

SEQDB_LIMITS_QUERY_RATE=1024
SEQDB_LIMITS_SEARCH_REQUESTS=1024
SEQDB_LIMITS_BULK_REQUESTS=128
Expand Down
17 changes: 13 additions & 4 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,19 @@ func analyzeIndex(
logger.Fatal("error unpacking lids block", zap.Error(err))
}

last := len(block.Offsets) - 2
for i := 0; i <= last; i++ {
tokenLIDs = append(tokenLIDs, block.LIDs[block.Offsets[i]:block.Offsets[i+1]]...)
if i < last || block.IsLastLID { // the end of token lids
listsCount := block.GetCount()
for i := 0; i < listsCount; i++ {
lidsBatch := block.GetLIDs(i)
iter := lidsBatch.Iter()
for {
lid, ok := iter.Next()
if !ok {
break
}
tokenLIDs = append(tokenLIDs, lid)
}

if i < listsCount || block.IsLastLID() { // the end of token lids
lidsTotal += len(tokenLIDs)
lidsLens[tid] = len(tokenLIDs)
lidsUniq[getLIDsHash(tokenLIDs)] = len(tokenLIDs)
Expand Down
17 changes: 17 additions & 0 deletions cmd/seq-db/seq-db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ozontech/seq-db/asyncsearcher"
"github.com/ozontech/seq-db/buildinfo"
"github.com/ozontech/seq-db/compaction"
"github.com/ozontech/seq-db/config"
"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
Expand Down Expand Up @@ -272,6 +273,7 @@ func startStore(
TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel,
DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel,
DocBlockSize: int(cfg.DocsSorting.DocBlockSize),
LidsBitmapThreshold: cfg.Sealing.Lids.BitmapThreshold,
},
Fraction: frac.Config{
Search: frac.SearchConfig{
Expand All @@ -289,6 +291,7 @@ func startStore(
OffloadingRetention: cfg.Offloading.Retention,
OffloadingRetryDelay: cfg.Offloading.RetryDelay,
OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100),
CompactionEnabled: cfg.Compaction.Enabled,
},
API: storeapi.APIConfig{
StoreMode: configMode,
Expand Down Expand Up @@ -324,6 +327,20 @@ func startStore(
Workers: cfg.SkipMaskManager.Workers,
CacheSizeLimit: uint64(cfg.SkipMaskManager.CacheSize),
},
Compaction: compaction.Config{
Enabled: cfg.Compaction.Enabled,

MergeTrigger: cfg.Compaction.STCS.MergeTrigger,
MergeFanIn: cfg.Compaction.STCS.MergeFanIn,
MergeFanOutSize: uint64(cfg.Compaction.STCS.MergeFanOutSize),

BucketLowerbound: cfg.Compaction.STCS.BucketLowerbound,
BucketUpperbound: cfg.Compaction.STCS.BucketUpperbound,

Workers: cfg.Compaction.Workers,
TimeWindow: cfg.Compaction.TimeWindow,
TickInterval: cfg.Compaction.TickInterval,
},
}

s3cli := initS3Client(cfg)
Expand Down
76 changes: 76 additions & 0 deletions compaction/executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package compaction

import (
"sync"
"time"

"go.uber.org/zap"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/logger"
)

type Executor struct {
params common.SealParams

workers int
wg sync.WaitGroup

p *planner
}

func NewExecutor(workers int, params common.SealParams, p *planner) *Executor {
e := Executor{params: params, workers: workers, p: p}
e.init()
return &e
}

func (e *Executor) Stop() {
e.p.stop()
e.wg.Wait()
}

func (e *Executor) init() {
for range e.workers {
e.wg.Go(func() {
for t := range e.p.tasks {
start := time.Now()

result, err := e.compact(t)
compactionDurationSeconds.
WithLabelValues(t.bucketSize).
Observe(time.Since(start).Seconds())

t.onComplete(result, err)
}
})
}
}

func (e *Executor) compact(t task) (*sealed.PreloadedData, error) {
var (
names []string
srcs []Source
)

for _, f := range t.snapshot.Fractions() {
names = append(names, f.Info().Name())
srcs = append(srcs, frac.NewSealedSource(f))

compactionBytesTotal.
WithLabelValues(t.bucketSize).
Add(float64(f.Info().IndexOnDisk))
}

logger.Info(
"compacting fractions",
zap.Time("bin", t.bin),
zap.Strings("names", names),
zap.String("bucket_size", t.bucketSize),
)

preloaded, err := Merge(t.filename, e.params, srcs...)
return preloaded, err
}
162 changes: 162 additions & 0 deletions compaction/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package compaction

import (
"errors"
"os"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac/common"
"github.com/ozontech/seq-db/frac/sealed"
"github.com/ozontech/seq-db/indexwriter"
)

func Merge(filename string, params common.SealParams, srcs ...Source) (*sealed.PreloadedData, error) {
w := indexwriter.New(params)
src := NewMergeSource(filename, srcs)

if err := createAndWrite(
filename+consts.OffsetsTmpFileSuffix,
filename+consts.OffsetsFileSuffix,
func(f *os.File) error { return w.WriteOffsetsFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.IDTmpFileSuffix,
filename+consts.IDFileSuffix,
func(f *os.File) error { return w.WriteIDFile(f, src) },
); err != nil {
return nil, err
}

if err := createAndWriteBoth(
filename+consts.TokenTmpFileSuffix,
filename+consts.TokenFileSuffix,
filename+consts.LIDTmpFileSuffix,
filename+consts.LIDFileSuffix,
func(tf, lf *os.File) error { return w.WriteTokenTriplet(tf, lf, src) },
); err != nil {
return nil, err
}

if err := createAndWrite(
filename+consts.InfoTmpFileSuffix,
filename+consts.InfoFileSuffix,
func(f *os.File) error { return w.WriteInfoFile(f, src) },
); err != nil {
return nil, err
}

if err := mergeDocs(filename, srcs...); err != nil {
return nil, err
}

info := src.Info()
info.IndexOnDisk = 0

for _, suffix := range []string{
consts.InfoFileSuffix,
consts.TokenFileSuffix,
consts.OffsetsFileSuffix,
consts.IDFileSuffix,
consts.LIDFileSuffix,
} {
st, err := os.Stat(info.Path + suffix)
if err != nil {
return nil, err
}
info.IndexOnDisk += uint64(st.Size())
}

lidsTable := w.LIDsTable()
preloaded := &sealed.PreloadedData{
Info: info,
TokenTable: w.TokenTable(),
BlocksData: sealed.BlocksData{
LIDsTable: &lidsTable,
IDsTable: w.IDsTable(),
BlocksOffsets: src.BlockOffsets(),
},
}

return preloaded, nil
}

func mergeDocs(filename string, srcs ...Source) error {
return createAndWrite(
filename+consts.DocsTmpFileSuffix,
filename+consts.DocsFileSuffix,
func(f *os.File) error {
var docsSize uint64
for _, src := range srcs {
for loc, err := range src.DocBlock() {
if err != nil {
return err
}

payload, offset := loc.First, loc.Second
if _, err := f.WriteAt(payload, int64(offset+docsSize)); err != nil {
return err
}
}

docsSize += src.Info().DocsOnDisk
}

return nil
},
)
}

func syncAndClose(f *os.File) error {
if err := f.Sync(); err != nil {
f.Close()
return err
}
return f.Close()
}

func createAndWrite(
tmp, final string,
write func(*os.File) error,
) error {
f, err := os.Create(tmp)
if err != nil {
return err
}

if err := errors.Join(write(f), syncAndClose(f)); err != nil {
return err
}

return os.Rename(tmp, final)
}

func createAndWriteBoth(
atmp, afinal,
btmp, bfinal string,
write func(*os.File, *os.File) error,
) error {
a, err := os.Create(atmp)
if err != nil {
return err
}

b, err := os.Create(btmp)
if err != nil {
a.Close()
return err
}

writeErr := write(a, b)
if err := errors.Join(writeErr, syncAndClose(a), syncAndClose(b)); err != nil {
return err
}

if err := os.Rename(atmp, afinal); err != nil {
return err
}

return os.Rename(btmp, bfinal)
}
Loading