Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/distribution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ func getReader(path string) (storage.IndexReader, *os.File) {
if err != nil {
panic(err)
}
return storage.NewIndexReader(readLimiter, f.Name(), f, c), f
readerProvider := storage.NewReaderProvider(readLimiter, f.Name(), f, c)
reader, err := readerProvider.GetReader()
if err != nil {
panic(err)
}
return reader, f
}

func readBlock(reader storage.IndexReader, blockIndex uint32) ([]byte, error) {
Expand Down
13 changes: 11 additions & 2 deletions cmd/index_analyzer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,17 @@ func analyzeIndex(
defer tokenFile.Close()
defer lidFile.Close()

tokenReader := storage.NewIndexReader(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
lidReader := storage.NewIndexReader(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)
tokenReaderProvider := storage.NewReaderProvider(rl, tokenFile.Name(), tokenFile, indexCache.TokenRegistry)
tokenReader, err := tokenReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating tokenReader", zap.String("file", tokenFile.Name()), zap.Error(err))
}

lidReaderPorvider := storage.NewReaderProvider(rl, lidFile.Name(), lidFile, indexCache.LIDRegistry)
lidReader, err := lidReaderPorvider.GetReader()
if err != nil {
logger.Fatal("error creating lidReader", zap.String("file", lidFile.Name()), zap.Error(err))
}

// --- Info ---
var blockIndex uint32
Expand Down
101 changes: 73 additions & 28 deletions frac/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type Remote struct {
docsReader storage.DocsReader

// IsLegacy is true for fractions that use the old single .index file format.
IsLegacy bool
legacyFile storage.ImmutableFile
legacyReader storage.IndexReader
IsLegacy bool
legacyFile storage.ImmutableFile
legacyReaderProvider *storage.ReaderProvider

// Per-section index files and their readers (new split format only).
infoFile storage.ImmutableFile
Expand All @@ -55,10 +55,10 @@ type Remote struct {
idFile storage.ImmutableFile
lidFile storage.ImmutableFile

tokenReader storage.IndexReader
offsetsReader storage.IndexReader
idReader storage.IndexReader
lidReader storage.IndexReader
tokenReaderProvider *storage.ReaderProvider
offsetsReaderProvider *storage.ReaderProvider
idReaderProvider *storage.ReaderProvider
lidReaderProvider *storage.ReaderProvider

indexCache *IndexCache

Expand Down Expand Up @@ -171,14 +171,34 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
return nil, err
}

tokenReader := &f.tokenReader
lidReader := &f.lidReader
idReader := &f.idReader
var (
tokenReader storage.IndexReader
lidReader storage.IndexReader
idReader storage.IndexReader
)

if f.IsLegacy {
tokenReader = &f.legacyReader
lidReader = &f.legacyReader
idReader = &f.legacyReader
legacyReader, err := f.legacyReaderProvider.GetReader()
if err != nil {
return nil, err
}
tokenReader = legacyReader
lidReader = legacyReader
idReader = legacyReader
} else {
var err error
tokenReader, err = f.tokenReaderProvider.GetReader()
if err != nil {
return nil, err
}
lidReader, err = f.lidReaderProvider.GetReader()
if err != nil {
return nil, err
}
idReader, err = f.idReaderProvider.GetReader()
if err != nil {
return nil, err
}
}

return &sealedDataProvider{
Expand All @@ -190,13 +210,13 @@ func (f *Remote) createDataProvider(ctx context.Context) (*sealedDataProvider, e
docsReader: &f.docsReader,
blocksOffsets: f.blocksData.BlocksOffsets,
lidsTable: f.blocksData.LIDsTable,
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, tokenReader, f.indexCache.TokenTable),
lidsLoader: lids.NewLoader(f.info.BinaryDataVer, &lidReader, f.indexCache.LIDs),
tokenBlockLoader: token.NewBlockLoader(f.BaseFileName, &tokenReader, f.indexCache.Tokens),
tokenTableLoader: token.NewTableLoader(f.BaseFileName, f.IsLegacy, &tokenReader, f.indexCache.TokenTable),

idsTable: &f.blocksData.IDsTable,
idsProvider: seqids.NewProvider(
idReader,
&idReader,
f.indexCache.MIDs,
f.indexCache.RIDs,
f.indexCache.Params,
Expand Down Expand Up @@ -259,7 +279,11 @@ func (f *Remote) loadInfo() error {
if err := f.openInfoLegacy(); err != nil {
return err
}
if f.info, err = loadInfoLegacy(f.legacyReader); err != nil {
legacyReader, err := f.legacyReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating legacyReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
if f.info, err = loadInfoLegacy(legacyReader); err != nil {
logger.Fatal("error loading Info", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
return nil
Expand Down Expand Up @@ -292,16 +316,37 @@ func (f *Remote) init() error {
}

if f.IsLegacy {
(&LegacyLoader{}).Load(&f.blocksData, f.info, f.legacyReader)
legacyReader, err := f.legacyReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating legacyReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
(&LegacyLoader{}).Load(&f.blocksData, f.info, legacyReader)
f.isInited = true
return nil
}

tokenReader, err := f.tokenReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating tokenReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
offsetsReader, err := f.offsetsReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating offsetsReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
idReader, err := f.idReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating idReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}
lidReader, err := f.lidReaderProvider.GetReader()
if err != nil {
logger.Fatal("error creating lidReader", zap.String("fraction", f.BaseFileName), zap.Error(err))
}

(&Loader{}).Load(&f.blocksData, f.info, IndexReaders{
Token: f.tokenReader,
Offsets: f.offsetsReader,
ID: f.idReader,
LID: f.lidReader,
Token: tokenReader,
Offsets: offsetsReader,
ID: idReader,
LID: lidReader,
})

f.isInited = true
Expand All @@ -315,7 +360,7 @@ func (f *Remote) openInfoLegacy() error {

return f.openRemoteFile(consts.IndexFileSuffix, func(file storage.ImmutableFile) {
f.legacyFile = file
f.legacyReader = storage.NewIndexReader(
f.legacyReaderProvider = storage.NewReaderProvider(
f.readLimiter, file.Name(),
file, f.indexCache.LegacyRegistry,
)
Expand Down Expand Up @@ -349,7 +394,7 @@ func (f *Remote) openIndex() error {
consts.TokenFileSuffix,
func(file storage.ImmutableFile) {
f.tokenFile = file
f.tokenReader = storage.NewIndexReader(
f.tokenReaderProvider = storage.NewReaderProvider(
f.readLimiter, file.Name(),
file, f.indexCache.TokenRegistry,
)
Expand All @@ -364,7 +409,7 @@ func (f *Remote) openIndex() error {
consts.OffsetsFileSuffix,
func(file storage.ImmutableFile) {
f.offsetsFile = file
f.offsetsReader = storage.NewIndexReader(
f.offsetsReaderProvider = storage.NewReaderProvider(
f.readLimiter, file.Name(),
file, f.indexCache.OffsetsRegistry,
)
Expand All @@ -379,7 +424,7 @@ func (f *Remote) openIndex() error {
consts.IDFileSuffix,
func(file storage.ImmutableFile) {
f.idFile = file
f.idReader = storage.NewIndexReader(
f.idReaderProvider = storage.NewReaderProvider(
f.readLimiter, file.Name(),
file, f.indexCache.IDRegistry,
)
Expand All @@ -394,7 +439,7 @@ func (f *Remote) openIndex() error {
consts.LIDFileSuffix,
func(file storage.ImmutableFile) {
f.lidFile = file
f.lidReader = storage.NewIndexReader(
f.lidReaderProvider = storage.NewReaderProvider(
f.readLimiter, file.Name(),
file, f.indexCache.LIDRegistry,
)
Expand Down
Loading
Loading