1 package migrate 2 3 import ( 4 "bufio" 5 "fmt" 6 "io" 7 "time" 8 ) 9 10 // DefaultBufferSize sets the in memory buffer size (in Bytes) for every 11 // pre-read migration (see DefaultPrefetchMigrations). 12 var DefaultBufferSize = uint(100000) 13 14 // Migration holds information about a migration. 15 // It is initially created from data coming from the source and then 16 // used when run against the database. 17 type Migration struct { 18 // Identifier can be any string to help identifying 19 // the migration in the source. 20 Identifier string 21 22 // Version is the version of this migration. 23 Version uint 24 25 // TargetVersion is the migration version after this migration 26 // has been applied to the database. 27 // Can be -1, implying that this is a NilVersion. 28 TargetVersion int 29 30 // Body holds an io.ReadCloser to the source. 31 Body io.ReadCloser 32 33 // BufferedBody holds an buffered io.Reader to the underlying Body. 34 BufferedBody io.Reader 35 36 // BufferSize defaults to DefaultBufferSize 37 BufferSize uint 38 39 // bufferWriter holds an io.WriteCloser and pipes to BufferBody. 40 // It's an *Closer for flow control. 41 bufferWriter io.WriteCloser 42 43 // Scheduled is the time when the migration was scheduled/ queued. 44 Scheduled time.Time 45 46 // StartedBuffering is the time when buffering of the migration source started. 47 StartedBuffering time.Time 48 49 // FinishedBuffering is the time when buffering of the migration source finished. 50 FinishedBuffering time.Time 51 52 // FinishedReading is the time when the migration source is fully read. 53 FinishedReading time.Time 54 55 // BytesRead holds the number of Bytes read from the migration source. 56 BytesRead int64 57 } 58 59 // NewMigration returns a new Migration and sets the body, identifier, 60 // version and targetVersion. Body can be nil, which turns this migration 61 // into a "NilMigration". If no identifier is provided, it will default to "<empty>". 62 // targetVersion can be -1, implying it is a NilVersion. 63 // 64 // What is a NilMigration? 65 // Usually each migration version coming from source is expected to have an 66 // Up and Down migration. This is not a hard requirement though, leading to 67 // a situation where only the Up or Down migration is present. So let's say 68 // the user wants to migrate up to a version that doesn't have the actual Up 69 // migration, in that case we still want to apply the version, but with an empty 70 // body. We are calling that a NilMigration, a migration with an empty body. 71 // 72 // What is a NilVersion? 73 // NilVersion is a const(-1). When running down migrations and we are at the 74 // last down migration, there is no next down migration, the targetVersion should 75 // be nil. Nil in this case is represented by -1 (because type int). 76 func NewMigration(body io.ReadCloser, identifier string, 77 version uint, targetVersion int) (*Migration, error) { 78 tnow := time.Now() 79 m := &Migration{ 80 Identifier: identifier, 81 Version: version, 82 TargetVersion: targetVersion, 83 Scheduled: tnow, 84 } 85 86 if body == nil { 87 if len(identifier) == 0 { 88 m.Identifier = "<empty>" 89 } 90 91 m.StartedBuffering = tnow 92 m.FinishedBuffering = tnow 93 m.FinishedReading = tnow 94 return m, nil 95 } 96 97 br, bw := io.Pipe() 98 m.Body = body // want to simulate low latency? newSlowReader(body) 99 m.BufferSize = DefaultBufferSize 100 m.BufferedBody = br 101 m.bufferWriter = bw 102 return m, nil 103 } 104 105 // String implements string.Stringer and is used in tests. 106 func (m *Migration) String() string { 107 return fmt.Sprintf("%v [%v=>%v]", m.Identifier, m.Version, m.TargetVersion) 108 } 109 110 // LogString returns a string describing this migration to humans. 111 func (m *Migration) LogString() string { 112 directionStr := "u" 113 if m.TargetVersion < int(m.Version) { 114 directionStr = "d" 115 } 116 return fmt.Sprintf("%v/%v %v", m.Version, directionStr, m.Identifier) 117 } 118 119 // Buffer buffers Body up to BufferSize. 120 // Calling this function blocks. Call with goroutine. 121 func (m *Migration) Buffer() error { 122 if m.Body == nil { 123 return nil 124 } 125 126 m.StartedBuffering = time.Now() 127 128 b := bufio.NewReaderSize(m.Body, int(m.BufferSize)) 129 130 // start reading from body, peek won't move the read pointer though 131 // poor man's solution? 132 if _, err := b.Peek(int(m.BufferSize)); err != nil && err != io.EOF { 133 return err 134 } 135 136 m.FinishedBuffering = time.Now() 137 138 // write to bufferWriter, this will block until 139 // something starts reading from m.Buffer 140 n, err := b.WriteTo(m.bufferWriter) 141 if err != nil { 142 return err 143 } 144 145 m.FinishedReading = time.Now() 146 m.BytesRead = n 147 148 // close bufferWriter so Buffer knows that there is no 149 // more data coming 150 if err := m.bufferWriter.Close(); err != nil { 151 return err 152 } 153 154 // it's safe to close the Body too 155 if err := m.Body.Close(); err != nil { 156 return err 157 } 158 159 return nil 160 } 161