...

Source file src/github.com/klauspost/compress/s2/index_test.go

Documentation: github.com/klauspost/compress/s2

     1  package s2_test
     2  
     3  import (
     4  	"bytes"
     5  	"encoding/hex"
     6  	"fmt"
     7  	"io"
     8  	"math/rand"
     9  	"os"
    10  	"sync"
    11  	"testing"
    12  
    13  	"github.com/klauspost/compress/s2"
    14  )
    15  
// ExampleIndex_Load demonstrates loading a stored stream index and using it
// to start decompression at an uncompressed offset of a stream whose input
// cannot be seeked.
func ExampleIndex_Load() {
	// Helper so errors in the example can simply panic.
	fatalErr := func(err error) {
		if err != nil {
			panic(err)
		}
	}

	// Create a test corpus
	tmp := make([]byte, 5<<20)
	rng := rand.New(rand.NewSource(0xbeefcafe))
	rng.Read(tmp)
	// Make it compressible...
	for i, v := range tmp {
		tmp[i] = '0' + v&3
	}
	// Compress it...
	var buf bytes.Buffer
	// We use smaller blocks just for the example...
	enc := s2.NewWriter(&buf, s2.WriterBlockSize(100<<10))
	err := enc.EncodeBuffer(tmp)
	fatalErr(err)

	// Close and get index...
	idxBytes, err := enc.CloseIndex()
	fatalErr(err)

	// This is our compressed stream...
	compressed := buf.Bytes()

	var once sync.Once
	for wantOffset := int64(0); wantOffset < int64(len(tmp)); wantOffset += 555555 {
		// Let's assume we want to read from uncompressed offset 'wantOffset'
		// and we cannot seek in input, but we have the index.
		want := tmp[wantOffset:]

		// Load the index.
		var index s2.Index
		_, err = index.Load(idxBytes)
		fatalErr(err)

		// Find offset in file:
		compressedOffset, uncompressedOffset, err := index.Find(wantOffset)
		fatalErr(err)

		// Offset the input to the compressed offset.
		// Notice how we do not provide any bytes before the offset.
		input := io.Reader(bytes.NewBuffer(compressed[compressedOffset:]))
		if _, ok := input.(io.Seeker); !ok {
			// Notice how the input cannot be seeked...
			once.Do(func() {
				fmt.Println("Input does not support seeking...")
			})
		} else {
			panic("did you implement seeking on bytes.Buffer?")
		}

		// When creating the decoder we must specify that it should not
		// expect a stream identifier at the beginning of the frame.
		dec := s2.NewReader(input, s2.ReaderIgnoreStreamIdentifier())

		// We now have a reader, but it will start outputting at uncompressedOffset,
		// and not the actual offset we want, so skip forward to that.
		toSkip := wantOffset - uncompressedOffset
		err = dec.Skip(toSkip)
		fatalErr(err)

		// Read the rest of the stream...
		got, err := io.ReadAll(dec)
		fatalErr(err)
		if bytes.Equal(got, want) {
			fmt.Println("Successfully skipped forward to", wantOffset)
		} else {
			fmt.Println("Failed to skip forward to", wantOffset)
		}
	}
	// OUTPUT:
	//Input does not support seeking...
	//Successfully skipped forward to 0
	//Successfully skipped forward to 555555
	//Successfully skipped forward to 1111110
	//Successfully skipped forward to 1666665
	//Successfully skipped forward to 2222220
	//Successfully skipped forward to 2777775
	//Successfully skipped forward to 3333330
	//Successfully skipped forward to 3888885
	//Successfully skipped forward to 4444440
	//Successfully skipped forward to 4999995
}
   104  
   105  func TestSeeking(t *testing.T) {
   106  	compressed := bytes.Buffer{}
   107  
   108  	// Use small blocks so there are plenty of them.
   109  	enc := s2.NewWriter(&compressed, s2.WriterBlockSize(16<<10))
   110  	var nElems = 1_000_000
   111  	var testSizes = []int{100, 1_000, 10_000, 20_000, 100_000, 200_000, 400_000}
   112  	if testing.Short() {
   113  		nElems = 100_000
   114  		testSizes = []int{100, 1_000, 10_000, 20_000}
   115  	}
   116  	testSizes = append(testSizes, nElems-1)
   117  	//24 bytes per item plus \n = 25 bytes per record
   118  	for i := 0; i < nElems; i++ {
   119  		fmt.Fprintf(enc, "Item %019d\n", i)
   120  	}
   121  
   122  	index, err := enc.CloseIndex()
   123  	if err != nil {
   124  		t.Fatal(err)
   125  	}
   126  
   127  	// Test trimming
   128  	slim := s2.RemoveIndexHeaders(index)
   129  	if slim == nil {
   130  		t.Error("Removing headers failed")
   131  	}
   132  	restored := s2.RestoreIndexHeaders(slim)
   133  	if !bytes.Equal(restored, index) {
   134  		t.Errorf("want %s, got %s", hex.EncodeToString(index), hex.EncodeToString(restored))
   135  	}
   136  	t.Logf("Saved %d bytes", len(index)-len(slim))
   137  
   138  	for _, skip := range testSizes {
   139  		t.Run(fmt.Sprintf("noSeekSkip=%d", skip), func(t *testing.T) {
   140  			dec := s2.NewReader(io.NopCloser(bytes.NewReader(compressed.Bytes())))
   141  			seeker, err := dec.ReadSeeker(false, nil)
   142  			if err != nil {
   143  				t.Fatal(err)
   144  			}
   145  			buf := make([]byte, 25)
   146  			for rec := 0; rec < nElems; rec += skip {
   147  				offset := int64(rec * 25)
   148  				//t.Logf("Reading record %d", rec)
   149  				_, err := seeker.Seek(offset, io.SeekStart)
   150  				if err != nil {
   151  					t.Fatalf("Failed to seek: %v", err)
   152  				}
   153  				_, err = io.ReadFull(dec, buf)
   154  				if err != nil {
   155  					t.Fatalf("Failed to seek: %v", err)
   156  				}
   157  				expected := fmt.Sprintf("Item %019d\n", rec)
   158  				if string(buf) != expected {
   159  					t.Fatalf("Expected %q, got %q", expected, buf)
   160  				}
   161  			}
   162  		})
   163  		t.Run(fmt.Sprintf("seekSkip=%d", skip), func(t *testing.T) {
   164  			dec := s2.NewReader(io.ReadSeeker(bytes.NewReader(compressed.Bytes())))
   165  			seeker, err := dec.ReadSeeker(false, nil)
   166  			if err != nil {
   167  				t.Fatal(err)
   168  			}
   169  			buf := make([]byte, 25)
   170  			for rec := 0; rec < nElems; rec += skip {
   171  				offset := int64(rec * 25)
   172  				//t.Logf("Reading record %d", rec)
   173  				_, err := seeker.Seek(offset, io.SeekStart)
   174  				if err != nil {
   175  					t.Fatalf("Failed to seek: %v", err)
   176  				}
   177  				_, err = io.ReadFull(dec, buf)
   178  				if err != nil {
   179  					t.Fatalf("Failed to seek: %v", err)
   180  				}
   181  				expected := fmt.Sprintf("Item %019d\n", rec)
   182  				if string(buf) != expected {
   183  					t.Fatalf("Expected %q, got %q", expected, buf)
   184  				}
   185  			}
   186  		})
   187  		t.Run(fmt.Sprintf("noSeekIndexSkip=%d", skip), func(t *testing.T) {
   188  			dec := s2.NewReader(io.NopCloser(bytes.NewReader(compressed.Bytes())))
   189  			seeker, err := dec.ReadSeeker(false, index)
   190  			if err != nil {
   191  				t.Fatal(err)
   192  			}
   193  			buf := make([]byte, 25)
   194  			for rec := 0; rec < nElems; rec += skip {
   195  				offset := int64(rec * 25)
   196  				//t.Logf("Reading record %d", rec)
   197  				_, err := seeker.Seek(offset, io.SeekStart)
   198  				if err != nil {
   199  					t.Fatalf("Failed to seek: %v", err)
   200  				}
   201  				_, err = io.ReadFull(dec, buf)
   202  				if err != nil {
   203  					t.Fatalf("Failed to seek: %v", err)
   204  				}
   205  				expected := fmt.Sprintf("Item %019d\n", rec)
   206  				if string(buf) != expected {
   207  					t.Fatalf("Expected %q, got %q", expected, buf)
   208  				}
   209  			}
   210  		})
   211  		t.Run(fmt.Sprintf("seekIndexSkip=%d", skip), func(t *testing.T) {
   212  			dec := s2.NewReader(io.ReadSeeker(bytes.NewReader(compressed.Bytes())))
   213  
   214  			seeker, err := dec.ReadSeeker(false, index)
   215  			if err != nil {
   216  				t.Fatal(err)
   217  			}
   218  			buf := make([]byte, 25)
   219  			for rec := 0; rec < nElems; rec += skip {
   220  				offset := int64(rec * 25)
   221  				//t.Logf("Reading record %d", rec)
   222  				_, err := seeker.Seek(offset, io.SeekStart)
   223  				if err != nil {
   224  					t.Fatalf("Failed to seek: %v", err)
   225  				}
   226  				_, err = io.ReadFull(dec, buf)
   227  				if err != nil {
   228  					t.Fatalf("Failed to seek: %v", err)
   229  				}
   230  				expected := fmt.Sprintf("Item %019d\n", rec)
   231  				if string(buf) != expected {
   232  					t.Fatalf("Expected %q, got %q", expected, buf)
   233  				}
   234  			}
   235  		})
   236  	}
   237  	// Test seek current
   238  	t.Run("seekCurrent", func(t *testing.T) {
   239  		dec := s2.NewReader(io.ReadSeeker(bytes.NewReader(compressed.Bytes())))
   240  
   241  		seeker, err := dec.ReadSeeker(true, index)
   242  		if err != nil {
   243  			t.Fatal(err)
   244  		}
   245  		buf := make([]byte, 25)
   246  		rng := rand.New(rand.NewSource(0))
   247  		var currentOff int64
   248  		for i := 0; i < nElems/10; i++ {
   249  			rec := rng.Intn(nElems)
   250  			offset := int64(rec * 25)
   251  			//t.Logf("Reading record %d", rec)
   252  			absOff, err := seeker.Seek(offset-currentOff, io.SeekCurrent)
   253  			if err != nil {
   254  				t.Fatalf("Failed to seek: %v", err)
   255  			}
   256  			if absOff != offset {
   257  				t.Fatalf("Unexpected seek offset: want %v, got %v", offset, absOff)
   258  			}
   259  			_, err = io.ReadFull(dec, buf)
   260  			if err != nil {
   261  				t.Fatalf("Failed to seek: %v", err)
   262  			}
   263  			expected := fmt.Sprintf("Item %019d\n", rec)
   264  			if string(buf) != expected {
   265  				t.Fatalf("Expected %q, got %q", expected, buf)
   266  			}
   267  			// Adjust offset
   268  			currentOff = offset + int64(len(buf))
   269  		}
   270  	})
   271  	// Test ReadAt
   272  	t.Run("ReadAt", func(t *testing.T) {
   273  		dec := s2.NewReader(io.ReadSeeker(bytes.NewReader(compressed.Bytes())))
   274  
   275  		seeker, err := dec.ReadSeeker(true, index)
   276  		if err != nil {
   277  			t.Fatal(err)
   278  		}
   279  		buf := make([]byte, 25)
   280  		rng := rand.New(rand.NewSource(0))
   281  		for i := 0; i < nElems/10; i++ {
   282  			rec := rng.Intn(nElems)
   283  			offset := int64(rec * 25)
   284  			n, err := seeker.ReadAt(buf, offset)
   285  			if err != nil {
   286  				t.Fatalf("Failed to seek: %v", err)
   287  			}
   288  			if n != len(buf) {
   289  				t.Fatalf("Unexpected read length: want %v, got %v", len(buf), n)
   290  			}
   291  			expected := fmt.Sprintf("Item %019d\n", rec)
   292  			if string(buf) != expected {
   293  				t.Fatalf("Expected %q, got %q", expected, buf)
   294  			}
   295  		}
   296  	})
   297  }
   298  
   299  // ExampleIndexStream shows an example of indexing a stream
   300  // and indexing it after it has been written.
   301  // The index can either be appended.
   302  func ExampleIndexStream() {
   303  	fatalErr := func(err error) {
   304  		if err != nil {
   305  			panic(err)
   306  		}
   307  	}
   308  
   309  	// Create a test stream without index
   310  	var streamName = ""
   311  	tmp := make([]byte, 5<<20)
   312  	{
   313  		rng := rand.New(rand.NewSource(0xbeefcafe))
   314  		rng.Read(tmp)
   315  		// Make it compressible...
   316  		for i, v := range tmp {
   317  			tmp[i] = '0' + v&3
   318  		}
   319  		// Compress it...
   320  		output, err := os.CreateTemp("", "IndexStream")
   321  		streamName = output.Name()
   322  		fatalErr(err)
   323  
   324  		// We use smaller blocks just for the example...
   325  		enc := s2.NewWriter(output, s2.WriterSnappyCompat())
   326  		err = enc.EncodeBuffer(tmp)
   327  		fatalErr(err)
   328  
   329  		// Close and get index...
   330  		err = enc.Close()
   331  		fatalErr(err)
   332  		err = output.Close()
   333  		fatalErr(err)
   334  	}
   335  
   336  	// Open our compressed stream without an index...
   337  	stream, err := os.Open(streamName)
   338  	fatalErr(err)
   339  	defer stream.Close()
   340  
   341  	var indexInput = io.Reader(stream)
   342  	var indexOutput io.Writer
   343  	var indexedName string
   344  
   345  	// Should index be combined with stream by appending?
   346  	// This could also be done by appending to an os.File
   347  	// If not it will be written to a separate file.
   348  	const combineOutput = false
   349  
   350  	// Function to easier use defer.
   351  	func() {
   352  		if combineOutput {
   353  			output, err := os.CreateTemp("", "IndexStream-Combined")
   354  			fatalErr(err)
   355  			defer func() {
   356  				fatalErr(output.Close())
   357  				if false {
   358  					fi, err := os.Stat(output.Name())
   359  					fatalErr(err)
   360  					fmt.Println("Combined:", fi.Size(), "bytes")
   361  				} else {
   362  					fmt.Println("Index saved")
   363  				}
   364  			}()
   365  
   366  			// Everything read from stream will also be written to output.
   367  			indexedName = output.Name()
   368  			indexInput = io.TeeReader(stream, output)
   369  			indexOutput = output
   370  		} else {
   371  			output, err := os.CreateTemp("", "IndexStream-Index")
   372  			fatalErr(err)
   373  			defer func() {
   374  				fatalErr(output.Close())
   375  				fi, err := os.Stat(output.Name())
   376  				fatalErr(err)
   377  				if false {
   378  					fmt.Println("Index:", fi.Size(), "bytes")
   379  				} else {
   380  					fmt.Println("Index saved")
   381  				}
   382  			}()
   383  			indexedName = output.Name()
   384  			indexOutput = output
   385  		}
   386  
   387  		// Index the input
   388  		idx, err := s2.IndexStream(indexInput)
   389  		fatalErr(err)
   390  
   391  		// Write the index
   392  		_, err = indexOutput.Write(idx)
   393  		fatalErr(err)
   394  	}()
   395  
   396  	if combineOutput {
   397  		// Read from combined stream only.
   398  		stream, err := os.Open(indexedName)
   399  		fatalErr(err)
   400  		defer stream.Close()
   401  		// Create a reader with the input.
   402  		// We assert that the stream is an io.ReadSeeker.
   403  		r := s2.NewReader(io.ReadSeeker(stream))
   404  
   405  		// Request a ReadSeeker with random access.
   406  		// This will load the index from the stream.
   407  		rs, err := r.ReadSeeker(true, nil)
   408  		fatalErr(err)
   409  
   410  		_, err = rs.Seek(-10, io.SeekEnd)
   411  		fatalErr(err)
   412  
   413  		b, err := io.ReadAll(rs)
   414  		fatalErr(err)
   415  		if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
   416  			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
   417  		}
   418  		fmt.Println("last 10 bytes read")
   419  
   420  		_, err = rs.Seek(10, io.SeekStart)
   421  		fatalErr(err)
   422  		_, err = io.ReadFull(rs, b)
   423  		fatalErr(err)
   424  		if want := tmp[10:20]; !bytes.Equal(b, want) {
   425  			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
   426  		}
   427  		fmt.Println("10 bytes at offset 10 read")
   428  	} else {
   429  		// Read from separate stream and index.
   430  		stream, err := os.Open(streamName)
   431  		fatalErr(err)
   432  		defer stream.Close()
   433  		// Create a reader with the input.
   434  		// We assert that the stream is an io.ReadSeeker.
   435  		r := s2.NewReader(io.ReadSeeker(stream))
   436  
   437  		// Read the separate index.
   438  		index, err := os.ReadFile(indexedName)
   439  		fatalErr(err)
   440  
   441  		// Request a ReadSeeker with random access.
   442  		// The provided index will be used.
   443  		rs, err := r.ReadSeeker(true, index)
   444  		fatalErr(err)
   445  
   446  		_, err = rs.Seek(-10, io.SeekEnd)
   447  		fatalErr(err)
   448  
   449  		b, err := io.ReadAll(rs)
   450  		fatalErr(err)
   451  		if want := tmp[len(tmp)-10:]; !bytes.Equal(b, want) {
   452  			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
   453  		}
   454  		fmt.Println("last 10 bytes read")
   455  
   456  		_, err = rs.Seek(10, io.SeekStart)
   457  		fatalErr(err)
   458  		_, err = io.ReadFull(rs, b)
   459  		fatalErr(err)
   460  		if want := tmp[10:20]; !bytes.Equal(b, want) {
   461  			fatalErr(fmt.Errorf("wanted %v, got %v", want, b))
   462  		}
   463  		fmt.Println("10 bytes at offset 10 read")
   464  	}
   465  
   466  	// OUTPUT:
   467  	// Index saved
   468  	// last 10 bytes read
   469  	// 10 bytes at offset 10 read
   470  }
   471  

View as plain text