Skip to content
This repository was archived by the owner on Sep 11, 2020. It is now read-only.

Commit 9d4279f

Browse files
committed
plumbing: packfile/scanner, readability/performance improvements, zlib pooling
Signed-off-by: Arran Walker <[email protected]>
1 parent e5268e9 commit 9d4279f

File tree

3 files changed

+143
-105
lines changed

3 files changed

+143
-105
lines changed

plumbing/format/packfile/common.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package packfile
22

33
import (
44
"bytes"
5+
"compress/zlib"
56
"io"
67
"sync"
78

@@ -66,3 +67,12 @@ var bufPool = sync.Pool{
6667
return bytes.NewBuffer(nil)
6768
},
6869
}
70+
71+
var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}
72+
73+
var zlibReaderPool = sync.Pool{
74+
New: func() interface{} {
75+
r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes))
76+
return r
77+
},
78+
}

plumbing/format/packfile/scanner.go

Lines changed: 84 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ type ObjectHeader struct {
3939
}
4040

4141
type Scanner struct {
42-
r reader
43-
zr readerResetter
42+
r *scannerReader
4443
crc hash.Hash32
4544

4645
// pendingObject is used to detect if an object has been read, or still
@@ -56,19 +55,27 @@ type Scanner struct {
5655
// NewScanner returns a new Scanner based on a reader, if the given reader
5756
// implements io.ReadSeeker the Scanner will be also Seekable
5857
func NewScanner(r io.Reader) *Scanner {
59-
seeker, ok := r.(io.ReadSeeker)
60-
if !ok {
61-
seeker = &trackableReader{Reader: r}
62-
}
58+
_, ok := r.(io.ReadSeeker)
6359

6460
crc := crc32.NewIEEE()
6561
return &Scanner{
66-
r: newTeeReader(newByteReadSeeker(seeker), crc),
62+
r: newScannerReader(r, crc),
6763
crc: crc,
6864
IsSeekable: ok,
6965
}
7066
}
7167

68+
func (s *Scanner) Reset(r io.Reader) {
69+
_, ok := r.(io.ReadSeeker)
70+
71+
s.r.Reset(r)
72+
s.crc.Reset()
73+
s.IsSeekable = ok
74+
s.pendingObject = nil
75+
s.version = 0
76+
s.objects = 0
77+
}
78+
7279
// Header reads the whole packfile header (signature, version and object count).
7380
// It returns the version and the object count and performs checks on the
7481
// validity of the signature and the version fields.
@@ -182,8 +189,7 @@ func (s *Scanner) NextObjectHeader() (*ObjectHeader, error) {
182189
// nextObjectHeader returns the ObjectHeader for the next object in the reader
183190
// without the Offset field
184191
func (s *Scanner) nextObjectHeader() (*ObjectHeader, error) {
185-
defer s.Flush()
186-
192+
s.r.Flush()
187193
s.crc.Reset()
188194

189195
h := &ObjectHeader{}
@@ -304,35 +310,29 @@ func (s *Scanner) readLength(first byte) (int64, error) {
304310
// NextObject writes the content of the next object into the reader, returns
305311
// the number of bytes written, the CRC32 of the content and an error, if any
306312
func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err error) {
307-
defer s.crc.Reset()
308-
309313
s.pendingObject = nil
310314
written, err = s.copyObject(w)
311-
s.Flush()
315+
316+
s.r.Flush()
312317
crc32 = s.crc.Sum32()
318+
s.crc.Reset()
319+
313320
return
314321
}
315322

316323
// ReadRegularObject reads and write a non-deltified object
317324
// from it zlib stream in an object entry in the packfile.
318325
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
319-
if s.zr == nil {
320-
var zr io.ReadCloser
321-
zr, err = zlib.NewReader(s.r)
322-
if err != nil {
323-
return 0, fmt.Errorf("zlib initialization error: %s", err)
324-
}
326+
zr := zlibReaderPool.Get().(io.ReadCloser)
327+
defer zlibReaderPool.Put(zr)
325328

326-
s.zr = zr.(readerResetter)
327-
} else {
328-
if err = s.zr.Reset(s.r, nil); err != nil {
329-
return 0, fmt.Errorf("zlib reset error: %s", err)
330-
}
329+
if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
330+
return 0, fmt.Errorf("zlib reset error: %s", err)
331331
}
332332

333-
defer ioutil.CheckClose(s.zr, &err)
333+
defer ioutil.CheckClose(zr, &err)
334334
buf := byteSlicePool.Get().([]byte)
335-
n, err = io.CopyBuffer(w, s.zr, buf)
335+
n, err = io.CopyBuffer(w, zr, buf)
336336
byteSlicePool.Put(buf)
337337
return
338338
}
@@ -378,110 +378,89 @@ func (s *Scanner) Close() error {
378378
return err
379379
}
380380

381-
// Flush finishes writing the buffer to crc hasher in case we are using
382-
// a teeReader. Otherwise it is a no-op.
381+
// Flush is a no-op (deprecated)
383382
func (s *Scanner) Flush() error {
384-
tee, ok := s.r.(*teeReader)
385-
if ok {
386-
return tee.Flush()
387-
}
388383
return nil
389384
}
390385

391-
type trackableReader struct {
392-
count int64
393-
io.Reader
386+
// scannerReader has the following characteristics:
387+
// - Provides an io.SeekReader impl for bufio.Reader, when the underlying
388+
// reader supports it.
389+
// - Keeps track of the current read position, for when the underlying reader
390+
// isn't an io.SeekReader, but we still want to know the current offset.
391+
// - Writes to the hash writer what it reads, with the aid of a smaller buffer.
392+
// The buffer helps avoid a performance penality for performing small writes
393+
// to the crc32 hash writer.
394+
type scannerReader struct {
395+
reader io.Reader
396+
crc io.Writer
397+
rbuf *bufio.Reader
398+
wbuf *bufio.Writer
399+
offset int64
394400
}
395401

396-
// Read reads up to len(p) bytes into p.
397-
func (r *trackableReader) Read(p []byte) (n int, err error) {
398-
n, err = r.Reader.Read(p)
399-
r.count += int64(n)
400-
401-
return
402-
}
403-
404-
// Seek only supports io.SeekCurrent, any other operation fails
405-
func (r *trackableReader) Seek(offset int64, whence int) (int64, error) {
406-
if whence != io.SeekCurrent {
407-
return -1, ErrSeekNotSupported
402+
func newScannerReader(r io.Reader, h io.Writer) *scannerReader {
403+
sr := &scannerReader{
404+
rbuf: bufio.NewReader(nil),
405+
wbuf: bufio.NewWriterSize(nil, 64),
406+
crc: h,
408407
}
408+
sr.Reset(r)
409409

410-
return r.count, nil
410+
return sr
411411
}
412412

413-
func newByteReadSeeker(r io.ReadSeeker) *bufferedSeeker {
414-
return &bufferedSeeker{
415-
r: r,
416-
Reader: *bufio.NewReader(r),
417-
}
418-
}
413+
func (r *scannerReader) Reset(reader io.Reader) {
414+
r.reader = reader
415+
r.rbuf.Reset(r.reader)
416+
r.wbuf.Reset(r.crc)
419417

420-
type bufferedSeeker struct {
421-
r io.ReadSeeker
422-
bufio.Reader
423-
}
424-
425-
func (r *bufferedSeeker) Seek(offset int64, whence int) (int64, error) {
426-
if whence == io.SeekCurrent && offset == 0 {
427-
current, err := r.r.Seek(offset, whence)
428-
if err != nil {
429-
return current, err
430-
}
431-
432-
return current - int64(r.Buffered()), nil
418+
r.offset = 0
419+
if seeker, ok := r.reader.(io.ReadSeeker); ok {
420+
r.offset, _ = seeker.Seek(0, io.SeekCurrent)
433421
}
434-
435-
defer r.Reader.Reset(r.r)
436-
return r.r.Seek(offset, whence)
437422
}
438423

439-
type readerResetter interface {
440-
io.ReadCloser
441-
zlib.Resetter
442-
}
424+
func (r *scannerReader) Read(p []byte) (n int, err error) {
425+
n, err = r.rbuf.Read(p)
443426

444-
type reader interface {
445-
io.Reader
446-
io.ByteReader
447-
io.Seeker
427+
r.offset += int64(n)
428+
if _, err := r.wbuf.Write(p[:n]); err != nil {
429+
return n, err
430+
}
431+
return
448432
}
449433

450-
type teeReader struct {
451-
reader
452-
w hash.Hash32
453-
bufWriter *bufio.Writer
434+
func (r *scannerReader) ReadByte() (b byte, err error) {
435+
b, err = r.rbuf.ReadByte()
436+
if err == nil {
437+
r.offset++
438+
return b, r.wbuf.WriteByte(b)
439+
}
440+
return
454441
}
455442

456-
func newTeeReader(r reader, h hash.Hash32) *teeReader {
457-
return &teeReader{
458-
reader: r,
459-
w: h,
460-
bufWriter: bufio.NewWriter(h),
461-
}
443+
func (r *scannerReader) Flush() error {
444+
return r.wbuf.Flush()
462445
}
463446

464-
func (r *teeReader) Read(p []byte) (n int, err error) {
465-
r.Flush()
447+
// Seek seeks to a location. If the underlying reader is not an io.ReadSeeker,
448+
// then only whence=io.SeekCurrent is supported, any other operation fails.
449+
func (r *scannerReader) Seek(offset int64, whence int) (int64, error) {
450+
var err error
466451

467-
n, err = r.reader.Read(p)
468-
if n > 0 {
469-
if n, err := r.w.Write(p[:n]); err != nil {
470-
return n, err
452+
if seeker, ok := r.reader.(io.ReadSeeker); !ok {
453+
if whence != io.SeekCurrent || offset != 0 {
454+
return -1, ErrSeekNotSupported
455+
}
456+
} else {
457+
if whence == io.SeekCurrent && offset == 0 {
458+
return r.offset, nil
471459
}
472-
}
473-
return
474-
}
475460

476-
func (r *teeReader) ReadByte() (b byte, err error) {
477-
b, err = r.reader.ReadByte()
478-
if err == nil {
479-
return b, r.bufWriter.WriteByte(b)
461+
r.offset, err = seeker.Seek(offset, whence)
462+
r.rbuf.Reset(r.reader)
480463
}
481464

482-
return
483-
}
484-
485-
func (r *teeReader) Flush() (err error) {
486-
return r.bufWriter.Flush()
465+
return r.offset, err
487466
}

plumbing/format/packfile/scanner_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,55 @@ func (s *ScannerSuite) TestSeekObjectHeaderNonSeekable(c *C) {
135135
c.Assert(err, Equals, ErrSeekNotSupported)
136136
}
137137

138+
func (s *ScannerSuite) TestReaderReset(c *C) {
139+
r := fixtures.Basic().One().Packfile()
140+
p := NewScanner(r)
141+
142+
version, objects, err := p.Header()
143+
c.Assert(version, Equals, VersionSupported)
144+
c.Assert(objects, Equals, uint32(31))
145+
146+
h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
147+
c.Assert(err, IsNil)
148+
c.Assert(h, DeepEquals, &expectedHeadersOFS[0])
149+
150+
p.Reset(r)
151+
c.Assert(p.pendingObject, IsNil)
152+
c.Assert(p.version, Equals, uint32(0))
153+
c.Assert(p.objects, Equals, uint32(0))
154+
c.Assert(p.r.reader, Equals, r)
155+
c.Assert(p.r.offset > expectedHeadersOFS[0].Offset, Equals, true)
156+
157+
p.Reset(bytes.NewReader(nil))
158+
c.Assert(p.r.offset, Equals, int64(0))
159+
}
160+
161+
func (s *ScannerSuite) TestReaderResetSeeks(c *C) {
162+
r := fixtures.Basic().One().Packfile()
163+
164+
// seekable
165+
p := NewScanner(r)
166+
c.Assert(p.IsSeekable, Equals, true)
167+
h, err := p.SeekObjectHeader(expectedHeadersOFS[0].Offset)
168+
c.Assert(err, IsNil)
169+
c.Assert(h, DeepEquals, &expectedHeadersOFS[0])
170+
171+
// reset with seekable
172+
p.Reset(r)
173+
c.Assert(p.IsSeekable, Equals, true)
174+
h, err = p.SeekObjectHeader(expectedHeadersOFS[1].Offset)
175+
c.Assert(err, IsNil)
176+
c.Assert(h, DeepEquals, &expectedHeadersOFS[1])
177+
178+
// reset with non-seekable
179+
f := fixtures.Basic().ByTag("ref-delta").One()
180+
p.Reset(io.MultiReader(f.Packfile()))
181+
c.Assert(p.IsSeekable, Equals, false)
182+
183+
_, err = p.SeekObjectHeader(expectedHeadersOFS[4].Offset)
184+
c.Assert(err, Equals, ErrSeekNotSupported)
185+
}
186+
138187
var expectedHeadersOFS = []ObjectHeader{
139188
{Type: plumbing.CommitObject, Offset: 12, Length: 254},
140189
{Type: plumbing.OFSDeltaObject, Offset: 186, Length: 93, OffsetReference: 12},

0 commit comments

Comments
 (0)