1 package main
2
3 import (
4 "bufio"
5 "bytes"
6 "errors"
7 "flag"
8 "fmt"
9 "io"
10 "log"
11 "net/http"
12 "os"
13 "os/signal"
14 "runtime"
15 "runtime/debug"
16 "runtime/pprof"
17 "runtime/trace"
18 "strconv"
19 "strings"
20 "sync"
21 "time"
22 "unicode"
23
24 "github.com/klauspost/compress/s2"
25 "github.com/klauspost/compress/s2/cmd/internal/filepathx"
26 "github.com/klauspost/compress/s2/cmd/internal/readahead"
27 )
28
// Command-line flags. They are registered with the default flag set when the
// package is initialized and hold their final values after flag.Parse runs in
// main.
var (
	faster    = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
	slower    = flag.Bool("slower", false, "Compress more, but a lot slower")
	snappy    = flag.Bool("snappy", false, "Generate Snappy compatible output stream")
	recomp    = flag.Bool("recomp", false, "Recompress Snappy or S2 input")
	cpu       = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
	blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
	block     = flag.Bool("block", false, "Compress as a single block. Will load content into memory.")
	safe      = flag.Bool("safe", false, "Do not overwrite output files")
	index     = flag.Bool("index", true, "Add seek index")
	padding   = flag.String("pad", "1", "Pad size to a multiple of this value, Examples: 500, 64K, 256K, 1M, 4M, etc")
	stdout    = flag.Bool("c", false, "Write all output to stdout. Multiple input files will be concatenated")
	out       = flag.String("o", "", "Write output to another file. Single input file only")
	remove    = flag.Bool("rm", false, "Delete source file(s) after successful compression")
	quiet     = flag.Bool("q", false, "Don't write any output to terminal, except errors")
	bench     = flag.Int("bench", 0, "Run benchmark n times. No output will be written")
	verify    = flag.Bool("verify", false, "Verify written files")
	help      = flag.Bool("help", false, "Display help")

	// Destination paths for optional profiles. The flags that would set them
	// are only registered inside a disabled branch at the top of main, so
	// these stay empty unless that branch is enabled.
	cpuprofile, memprofile, traceprofile string

	// Build metadata printed in the usage banner.
	// NOTE(review): presumably overridden at link time (e.g. -ldflags -X); confirm against the build scripts.
	version = "(dev)"
	date    = "(unknown)"
)
53
// File extensions appended to the input name to form the default output name.
const (
	s2Ext     = ".s2" // default extension
	snappyExt = ".sz" // used instead of s2Ext when -snappy is set
)
58
59 func main() {
60 if false {
61 flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
62 flag.StringVar(&memprofile, "memprofile", "", "write mem profile to file")
63 flag.StringVar(&traceprofile, "traceprofile", "", "write trace profile to file")
64 }
65 flag.Parse()
66 sz, err := toSize(*blockSize)
67 exitErr(err)
68 pad, err := toSize(*padding)
69 exitErr(err)
70
71 args := flag.Args()
72 if len(args) == 0 || *help || (*slower && *faster) {
73 _, _ = fmt.Fprintf(os.Stderr, "s2 compress v%v, built at %v.\n\n", version, date)
74 _, _ = fmt.Fprintf(os.Stderr, "Copyright (c) 2011 The Snappy-Go Authors. All rights reserved.\n"+
75 "Copyright (c) 2019+ Klaus Post. All rights reserved.\n\n")
76 _, _ = fmt.Fprintln(os.Stderr, `Usage: s2c [options] file1 file2
77
78 Compresses all files supplied as input separately.
79 Output files are written as 'filename.ext`+s2Ext+`' or 'filename.ext`+snappyExt+`'.
80 By default output files will be overwritten.
81 Use - as the only file name to read from stdin and write to stdout.
82
83 Wildcards are accepted: testdir/*.txt will compress all files in testdir ending with .txt
84 Directories can be wildcards as well. testdir/*/*.txt will match testdir/subdir/b.txt
85
86 File names beginning with 'http://' and 'https://' will be downloaded and compressed.
87 Only http response code 200 is accepted.
88
89 Options:`)
90 flag.PrintDefaults()
91 os.Exit(0)
92 }
93 opts := []s2.WriterOption{s2.WriterBlockSize(sz), s2.WriterConcurrency(*cpu), s2.WriterPadding(pad)}
94 if *index {
95 opts = append(opts, s2.WriterAddIndex())
96 }
97 if !*faster {
98 opts = append(opts, s2.WriterBetterCompression())
99 }
100 if *slower {
101 opts = append(opts, s2.WriterBestCompression())
102 }
103 if *snappy {
104 opts = append(opts, s2.WriterSnappyCompat())
105 }
106 wr := s2.NewWriter(nil, opts...)
107
108
109 if len(args) == 1 && args[0] == "-" {
110
111
112 signal.Notify(make(chan os.Signal, 1), os.Interrupt)
113 if len(*out) == 0 {
114 wr.Reset(os.Stdout)
115 } else {
116 if *safe {
117 _, err := os.Stat(*out)
118 if !os.IsNotExist(err) {
119 exitErr(errors.New("destination file exists"))
120 }
121 }
122 dstFile, err := os.OpenFile(*out, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, os.ModePerm)
123 exitErr(err)
124 defer dstFile.Close()
125 bw := bufio.NewWriterSize(dstFile, sz*2)
126 defer bw.Flush()
127 wr.Reset(bw)
128 }
129 _, err = wr.ReadFrom(os.Stdin)
130 printErr(err)
131 printErr(wr.Close())
132 return
133 }
134 var files []string
135
136 for _, pattern := range args {
137 if isHTTP(pattern) {
138 files = append(files, pattern)
139 continue
140 }
141 found, err := filepathx.Glob(pattern)
142 exitErr(err)
143 if len(found) == 0 {
144 exitErr(fmt.Errorf("unable to find file %v", pattern))
145 }
146 files = append(files, found...)
147 }
148 if cpuprofile != "" {
149 f, err := os.Create(cpuprofile)
150 if err != nil {
151 log.Fatal(err)
152 }
153 pprof.StartCPUProfile(f)
154 defer pprof.StopCPUProfile()
155 }
156 if memprofile != "" {
157 f, err := os.Create(memprofile)
158 if err != nil {
159 log.Fatal(err)
160 }
161 defer f.Close()
162 defer pprof.WriteHeapProfile(f)
163 }
164 if traceprofile != "" {
165 f, err := os.Create(traceprofile)
166 if err != nil {
167 log.Fatal(err)
168 }
169 defer f.Close()
170 err = trace.Start(f)
171 if err != nil {
172 log.Fatal(err)
173 }
174 defer trace.Stop()
175 }
176
177 *quiet = *quiet || *stdout
178 if *bench > 0 {
179 debug.SetGCPercent(10)
180 dec := s2.NewReader(nil)
181 for _, filename := range files {
182 if *block {
183 func() {
184 if !*quiet {
185 fmt.Print("Reading ", filename, "...")
186 }
187
188 file, size, _ := openFile(filename)
189 b := make([]byte, size)
190 _, err = io.ReadFull(file, b)
191 exitErr(err)
192 file.Close()
193 for i := 0; i < *bench; i++ {
194 if !*quiet {
195 fmt.Print("\nCompressing...")
196 }
197 start := time.Now()
198 var compressed []byte
199 switch {
200 case *faster:
201 if *snappy {
202 compressed = s2.EncodeSnappy(nil, b)
203 break
204 }
205 compressed = s2.Encode(nil, b)
206 case *slower:
207 if *snappy {
208 compressed = s2.EncodeSnappyBest(nil, b)
209 break
210 }
211 compressed = s2.EncodeBest(nil, b)
212 default:
213 if *snappy {
214 compressed = s2.EncodeSnappyBetter(nil, b)
215 break
216 }
217 compressed = s2.EncodeBetter(nil, b)
218 }
219 exitErr(err)
220 err = wr.Close()
221 exitErr(err)
222 if !*quiet {
223 input := len(b)
224 elapsed := time.Since(start)
225 mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
226 pct := float64(len(compressed)) * 100 / float64(input)
227 ms := elapsed.Round(time.Millisecond)
228 fmt.Printf(" %d -> %d [%.02f%%]; %v, %.01fMB/s", input, len(compressed), pct, ms, mbpersec)
229 }
230 if *verify {
231 if !*quiet {
232 fmt.Print("\nDecompressing.")
233 }
234 decomp := make([]byte, 0, len(b))
235 start := time.Now()
236 decomp, err = s2.Decode(decomp, compressed)
237 exitErr(err)
238 if len(decomp) != len(b) {
239 exitErr(fmt.Errorf("unexpected size, want %d, got %d", len(b), len(decomp)))
240 }
241 if !*quiet {
242 input := len(b)
243 elapsed := time.Since(start)
244 mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
245 pct := float64(input) * 100 / float64(len(compressed))
246 ms := elapsed.Round(time.Millisecond)
247 fmt.Printf(" %d -> %d [%.02f%%]; %v, %.01fMB/s", len(compressed), len(decomp), pct, ms, mbpersec)
248 }
249 if !bytes.Equal(decomp, b) {
250 exitErr(fmt.Errorf("decompresed data mismatch"))
251 }
252 if !*quiet {
253 fmt.Print("... Verified ok.")
254 }
255 }
256 }
257 if !*quiet {
258 fmt.Println("")
259 }
260 wr.Close()
261 }()
262 continue
263 }
264 func() {
265 if !*quiet {
266 fmt.Print("Reading ", filename, "...")
267 }
268
269 file, size, _ := openFile(filename)
270 b := make([]byte, size)
271 _, err = io.ReadFull(file, b)
272 exitErr(err)
273 file.Close()
274 var buf *bytes.Buffer
275 for i := 0; i < *bench; i++ {
276 w := io.Discard
277
278 if *verify {
279 if buf == nil {
280 buf = bytes.NewBuffer(make([]byte, 0, len(b)+(len(b)>>8)))
281 }
282 buf.Reset()
283 w = buf
284 }
285 wc := wCounter{out: w}
286 if !*quiet {
287 fmt.Print("\nCompressing...")
288 }
289 wr.Reset(&wc)
290 start := time.Now()
291 err := wr.EncodeBuffer(b)
292 exitErr(err)
293 err = wr.Close()
294 exitErr(err)
295 if !*quiet {
296 input := len(b)
297 elapsed := time.Since(start)
298 mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
299 pct := float64(wc.n) * 100 / float64(input)
300 ms := elapsed.Round(time.Millisecond)
301 fmt.Printf(" %d -> %d [%.02f%%]; %v, %.01fMB/s", input, wc.n, pct, ms, mbpersec)
302 }
303 if *verify {
304 if !*quiet {
305 fmt.Print("\nDecompressing.")
306 }
307 start := time.Now()
308 dec.Reset(buf)
309 n, err := dec.DecodeConcurrent(io.Discard, *cpu)
310 exitErr(err)
311 if int(n) != len(b) {
312 exitErr(fmt.Errorf("unexpected size, want %d, got %d", len(b), n))
313 }
314 if !*quiet {
315 input := len(b)
316 elapsed := time.Since(start)
317 mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
318 pct := float64(input) * 100 / float64(wc.n)
319 ms := elapsed.Round(time.Millisecond)
320 fmt.Printf(" %d -> %d [%.02f%%]; %v, %.01fMB/s", wc.n, n, pct, ms, mbpersec)
321 }
322 dec.Reset(nil)
323 }
324 }
325 if !*quiet {
326 fmt.Println("")
327 }
328 wr.Close()
329 }()
330 }
331 os.Exit(0)
332 }
333 ext := s2Ext
334 if *snappy {
335 ext = snappyExt
336 }
337 if *block {
338 ext += ".block"
339 }
340 if *out != "" && len(files) > 1 {
341 exitErr(errors.New("-out parameter can only be used with one input"))
342 }
343 for _, filename := range files {
344 if *block {
345 if *recomp {
346 exitErr(errors.New("cannot recompress blocks (yet)"))
347 }
348 func() {
349 var closeOnce sync.Once
350 dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
351 if *out != "" {
352 dstFilename = *out
353 }
354 if !*quiet {
355 fmt.Print("Compressing ", filename, " -> ", dstFilename)
356 }
357
358 file, _, mode := openFile(filename)
359 exitErr(err)
360 defer closeOnce.Do(func() { file.Close() })
361 inBytes, err := io.ReadAll(file)
362 exitErr(err)
363
364 var out io.Writer
365 switch {
366 case *stdout:
367 out = os.Stdout
368 default:
369 if *safe {
370 _, err := os.Stat(dstFilename)
371 if !os.IsNotExist(err) {
372 exitErr(errors.New("destination file exists"))
373 }
374 }
375 dstFile, err := os.OpenFile(dstFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
376 exitErr(err)
377 defer dstFile.Close()
378 out = dstFile
379 }
380 start := time.Now()
381 var compressed []byte
382 switch {
383 case *faster:
384 if *snappy {
385 compressed = s2.EncodeSnappy(nil, inBytes)
386 break
387 }
388 compressed = s2.Encode(nil, inBytes)
389 case *slower:
390 if *snappy {
391 compressed = s2.EncodeSnappyBest(nil, inBytes)
392 break
393 }
394 compressed = s2.EncodeBest(nil, inBytes)
395 default:
396 if *snappy {
397 compressed = s2.EncodeSnappyBetter(nil, inBytes)
398 break
399 }
400 compressed = s2.EncodeBetter(nil, inBytes)
401 }
402 _, err = out.Write(compressed)
403 exitErr(err)
404 if !*quiet {
405 elapsed := time.Since(start)
406 mbpersec := (float64(len(inBytes)) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
407 pct := float64(len(compressed)) * 100 / float64(len(inBytes))
408 fmt.Printf(" %d -> %d [%.02f%%]; %.01fMB/s\n", len(inBytes), len(compressed), pct, mbpersec)
409 }
410 if *verify {
411 got, err := s2.Decode(make([]byte, 0, len(inBytes)), compressed)
412 exitErr(err)
413 if !bytes.Equal(got, inBytes) {
414 exitErr(fmt.Errorf("decoded content mismatch"))
415 }
416 if !*quiet {
417 fmt.Print("... Verified ok.")
418 }
419 }
420 if *remove {
421 closeOnce.Do(func() {
422 file.Close()
423 if !*quiet {
424 fmt.Println("Removing", filename)
425 }
426 err := os.Remove(filename)
427 exitErr(err)
428 })
429 }
430 }()
431 continue
432 }
433 func() {
434 var closeOnce sync.Once
435 outFileBase := filename
436 if *recomp {
437 switch {
438 case strings.HasSuffix(outFileBase, s2Ext):
439 outFileBase = strings.TrimSuffix(outFileBase, s2Ext)
440 case strings.HasSuffix(outFileBase, snappyExt):
441 outFileBase = strings.TrimSuffix(outFileBase, snappyExt)
442 case strings.HasSuffix(outFileBase, ".snappy"):
443 outFileBase = strings.TrimSuffix(outFileBase, ".snappy")
444 }
445 }
446 dstFilename := cleanFileName(fmt.Sprintf("%s%s", outFileBase, ext))
447 if *out != "" {
448 dstFilename = *out
449 }
450 if !*quiet {
451 fmt.Print("Compressing ", filename, " -> ", dstFilename)
452 }
453
454 if dstFilename == filename && !*stdout {
455 if *remove {
456 exitErr(errors.New("cannot remove when input = output"))
457 }
458 renameDst := dstFilename
459 dstFilename = fmt.Sprintf(".tmp-%s%s", time.Now().Format("2006-01-02T15-04-05Z07"), ext)
460 defer func() {
461 exitErr(os.Rename(dstFilename, renameDst))
462 }()
463 }
464
465
466 file, _, mode := openFile(filename)
467 exitErr(err)
468 defer closeOnce.Do(func() { file.Close() })
469 src, err := readahead.NewReaderSize(file, *cpu+1, 1<<20)
470 exitErr(err)
471 defer src.Close()
472 var rc = &rCounter{
473 in: src,
474 }
475 if !*quiet {
476
477 src = rc
478 }
479 if *recomp {
480 dec := s2.NewReader(src)
481 pr, pw := io.Pipe()
482 go func() {
483 _, err := dec.DecodeConcurrent(pw, *cpu)
484 pw.CloseWithError(err)
485 }()
486 src = pr
487 }
488
489 var out io.Writer
490 switch {
491 case *stdout:
492 out = os.Stdout
493 default:
494 if *safe {
495 _, err := os.Stat(dstFilename)
496 if !os.IsNotExist(err) {
497 exitErr(errors.New("destination file exists"))
498 }
499 }
500 dstFile, err := os.OpenFile(dstFilename, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, mode)
501 exitErr(err)
502 defer dstFile.Close()
503 bw := bufio.NewWriterSize(dstFile, int(sz)*2)
504 defer bw.Flush()
505 out = bw
506 }
507 out, errFn := verifyTo(out)
508 wc := wCounter{out: out}
509 wr.Reset(&wc)
510 defer wr.Close()
511 start := time.Now()
512 _, err = wr.ReadFrom(src)
513 exitErr(err)
514 err = wr.Close()
515 exitErr(err)
516 if !*quiet {
517 input := rc.n
518 elapsed := time.Since(start)
519 mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
520 pct := float64(wc.n) * 100 / float64(input)
521 fmt.Printf(" %d -> %d [%.02f%%]; %.01fMB/s\n", input, wc.n, pct, mbpersec)
522 }
523 exitErr(errFn())
524 if *remove {
525 closeOnce.Do(func() {
526 file.Close()
527 if !*quiet {
528 fmt.Println("Removing", filename)
529 }
530 err := os.Remove(filename)
531 exitErr(err)
532 })
533 }
534 }()
535 }
536 }
537
// isHTTP reports whether name starts with an http or https URL scheme.
// The comparison is case-sensitive.
func isHTTP(name string) bool {
	for _, scheme := range [...]string{"http://", "https://"} {
		if strings.HasPrefix(name, scheme) {
			return true
		}
	}
	return false
}
541
542 func openFile(name string) (rc io.ReadCloser, size int64, mode os.FileMode) {
543 if isHTTP(name) {
544 resp, err := http.Get(name)
545 exitErr(err)
546 if resp.StatusCode != http.StatusOK {
547 exitErr(fmt.Errorf("unexpected response status code %v, want OK", resp.Status))
548 }
549 return resp.Body, resp.ContentLength, os.ModePerm
550 }
551 file, err := os.Open(name)
552 exitErr(err)
553 st, err := file.Stat()
554 exitErr(err)
555 return file, st.Size(), st.Mode()
556 }
557
558 func cleanFileName(s string) string {
559 if isHTTP(s) {
560 s = strings.TrimPrefix(s, "http://")
561 s = strings.TrimPrefix(s, "https://")
562 s = strings.Map(func(r rune) rune {
563 switch r {
564 case '\\', '/', '*', '?', ':', '|', '<', '>', '~':
565 return '_'
566 }
567 if r < 20 {
568 return '_'
569 }
570 return r
571 }, s)
572 }
573 return s
574 }
575
576 func verifyTo(w io.Writer) (io.Writer, func() error) {
577 if !*verify {
578 return w, func() error {
579 return nil
580 }
581 }
582 pr, pw := io.Pipe()
583 writer := io.MultiWriter(w, pw)
584 var wg sync.WaitGroup
585 var err error
586 wg.Add(1)
587 go func() {
588 defer wg.Done()
589 r := s2.NewReader(pr)
590 _, err = r.DecodeConcurrent(io.Discard, *cpu)
591 pr.CloseWithError(fmt.Errorf("verify: %w", err))
592 }()
593 return writer, func() error {
594 pw.Close()
595 wg.Wait()
596 if err == nil {
597 if !*quiet {
598 fmt.Print("... Verified ok.")
599 }
600 }
601 return err
602 }
603 }
604
// printErr writes err to standard error, prefixed with "ERROR:" on a new
// line. A nil err is ignored. Unlike exitErr it does not stop the program.
func printErr(err error) {
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, "\nERROR:", err.Error())
}
610
// exitErr is the fatal counterpart of printErr: when err is non-nil it writes
// it to standard error and terminates the process with exit status 2. A nil
// err is ignored. Because os.Exit is used, deferred calls in the callers do
// not run.
func exitErr(err error) {
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, "\nERROR:", err.Error())
	os.Exit(2)
}
617
618
// toSize parses a human-readable size such as "500", "64K", "1MB" or "4MiB"
// and returns the number of bytes.
//
// The input is case-insensitive. Surrounding whitespace and whitespace
// between the number and the unit are ignored. Recognized units are B (or no
// unit), K/KB/KiB for multiples of 1024 and M/MB/MiB for multiples of
// 1024*1024.
//
// An error is returned when the number cannot be parsed (the strconv error is
// wrapped), when the unit is unknown, when the value is negative, or when the
// result does not fit in an int.
func toSize(size string) (int, error) {
	size = strings.ToUpper(strings.TrimSpace(size))
	firstLetter := strings.IndexFunc(size, unicode.IsLetter)
	if firstLetter == -1 {
		firstLetter = len(size)
	}

	bytesString, multiple := strings.TrimSpace(size[:firstLetter]), size[firstLetter:]
	sz, err := strconv.Atoi(bytesString)
	if err != nil {
		return 0, fmt.Errorf("unable to parse size: %w", err)
	}
	if sz < 0 {
		return 0, fmt.Errorf("size must not be negative: %d", sz)
	}

	var shift uint
	switch multiple {
	case "M", "MB", "MIB":
		shift = 20
	case "K", "KB", "KIB":
		shift = 10
	case "B", "":
		// Plain bytes, no scaling.
	default:
		return 0, fmt.Errorf("unknown size suffix: %v", multiple)
	}

	// Shifting back must give the original value; otherwise bits were lost
	// and the result overflowed.
	scaled := sz << shift
	if scaled>>shift != sz {
		return 0, fmt.Errorf("size too large: %d%s", sz, multiple)
	}
	return scaled, nil
}
643
// wCounter is an io.Writer that passes data through to out and keeps a
// running total of the bytes out reported as written.
type wCounter struct {
	n   int       // total bytes written so far
	out io.Writer // destination of every Write
}

// Write hands p to the wrapped writer and adds the byte count it reports to
// the total, including when it reports an error as well. Count and error are
// returned to the caller unchanged.
func (w *wCounter) Write(p []byte) (n int, err error) {
	written, werr := w.out.Write(p)
	w.n += written
	return written, werr
}
655
// rCounter is an io.Reader that passes reads through to in and keeps a
// running total of the bytes in delivered.
type rCounter struct {
	n  int64     // total bytes read so far
	in io.Reader // source of every Read
}

// Read fills p from the wrapped reader and adds the byte count it reports to
// the total, including when it reports an error as well. Count and error are
// returned to the caller unchanged.
func (w *rCounter) Read(p []byte) (n int, err error) {
	got, rerr := w.in.Read(p)
	w.n += int64(got)
	return got, rerr
}

// Close does nothing and always returns nil; the wrapped reader is not
// closed by it.
func (w *rCounter) Close() (err error) {
	return nil
}
670
View as plain text