diff --git a/replication/changesets.go b/replication/changesets.go deleted file mode 100644 index 4480aa5..0000000 --- a/replication/changesets.go +++ /dev/null @@ -1,214 +0,0 @@ -package replication - -import ( - "bytes" - "compress/gzip" - "context" - "fmt" - "io" - "io/ioutil" - "net/http" - "strconv" - - "github.com/onXmaps/osm" - "github.com/onXmaps/osm/osmxml" -) - -// ChangesetSeqNum indicates the sequence of the changeset replication found here: -// https://planet.osm.org/replication/changesets/ -type ChangesetSeqNum uint64 - -// String returns 'changeset/%d'. -func (n ChangesetSeqNum) String() string { - return fmt.Sprintf("changeset/%d", n) -} - -// Dir returns the directory of this data on planet osm. -func (n ChangesetSeqNum) Dir() string { - return "changesets" -} - -// Uint64 returns the seq num as a uint64 type. -func (n ChangesetSeqNum) Uint64() uint64 { - return uint64(n) -} - -// CurrentChangesetState returns the current state of the changeset replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func CurrentChangesetState(ctx context.Context) (ChangesetSeqNum, *State, error) { - return DefaultDatasource.CurrentChangesetState(ctx) -} - -// CurrentChangesetState returns the current state of the changeset replication. -func (ds *Datasource) CurrentChangesetState(ctx context.Context) (ChangesetSeqNum, *State, error) { - s, err := ds.fetchChangesetState(ctx, 0) - if err != nil { - return 0, nil, err - } - - return ChangesetSeqNum(s.SeqNum), s, err -} - -// ChangesetState returns the state for the given changeset replication. -// There are no state files before 2007990. In that case a 404 error is returned. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func ChangesetState(ctx context.Context, n ChangesetSeqNum) (*State, error) { - return DefaultDatasource.ChangesetState(ctx, n) -} - -// ChangesetState returns the state for the given changeset replication. -// There are no state files before 2007990. In that case a 404 error is returned. -func (ds *Datasource) ChangesetState(ctx context.Context, n ChangesetSeqNum) (*State, error) { - return ds.fetchChangesetState(ctx, n) -} - -func (ds *Datasource) fetchChangesetState(ctx context.Context, n ChangesetSeqNum) (*State, error) { - var url string - if n.Uint64() != 0 { - url = ds.baseChangesetURL(n) + ".state.txt" - } else { - url = fmt.Sprintf("%s/replication/%s/state.yaml", ds.baseURL(), n.Dir()) - } - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - - resp, err := ds.client().Do(req) - if err != nil { - return nil, err - } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - return nil, &UnexpectedStatusCodeError{ - Code: resp.StatusCode, - URL: url, - } - } - - data, err := ioutil.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - s, err := decodeChangesetState(data) - if err != nil { - return nil, err - } - - // starting at 2008004 the changeset sequence number in the state file is - // one less than the name of the file. This is a consistent mistake. - // The correctly paired state and data files have the same name. The number - // in the state file is the one that is off. - if n == 0 { - s.SeqNum++ - } else { - s.SeqNum = uint64(n) - } - - return s, nil -} - -func decodeChangesetState(data []byte) (*State, error) { - // example - // --- - // last_run: 2016-07-02 22:46:01.422137422 +00:00 (or Z) - // sequence: 1912325 - - lines := bytes.Split(data, []byte("\n")) - - parts := bytes.Split(lines[1], []byte(":")) - timeString := string(bytes.TrimSpace(bytes.Join(parts[1:], []byte(":")))) - - t, err := decodeTime(timeString) - if err != nil { - return nil, err - } - - parts = bytes.Split(lines[2], []byte(":")) - n, err := strconv.ParseUint(string(bytes.TrimSpace(parts[1])), 10, 64) - if err != nil { - return nil, err - } - - return &State{ - SeqNum: n, - Timestamp: t, - }, nil -} - -// Changesets returns the complete list of changesets for the given replication sequence. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func Changesets(ctx context.Context, n ChangesetSeqNum) (osm.Changesets, error) { - return DefaultDatasource.Changesets(ctx, n) -} - -// Changesets returns the complete list of changesets in for the given replication sequence. -func (ds *Datasource) Changesets(ctx context.Context, n ChangesetSeqNum) (osm.Changesets, error) { - r, err := ds.changesetReader(ctx, n) - if err != nil { - return nil, err - } - defer r.Close() - - return changesetDecoder(ctx, r) -} - -func changesetDecoder(ctx context.Context, r io.Reader) (osm.Changesets, error) { - gzReader, err := gzip.NewReader(r) - if err != nil { - return nil, err - } - defer gzReader.Close() - - var changesets []*osm.Changeset - scanner := osmxml.New(ctx, gzReader) - for scanner.Scan() { - o := scanner.Object() - c, ok := o.(*osm.Changeset) - if !ok { - return nil, fmt.Errorf("osm replication: object not a changeset: %[1]T: %[1]v", o) - } - changesets = append(changesets, c) - } - - return changesets, scanner.Err() -} - -// changesetReader will return a ReadCloser with the data from the changeset. -// It will be gzip compressed, so the caller must decompress. -// It is the caller's responsibility to call Close on the Reader when done. -func (ds *Datasource) changesetReader(ctx context.Context, n ChangesetSeqNum) (io.ReadCloser, error) { - url := ds.baseChangesetURL(n) + ".osm.gz" - req, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - - resp, err := ds.client().Do(req.WithContext(ctx)) - if err != nil { - return nil, err - } - - if resp.StatusCode != 200 { - resp.Body.Close() - return nil, &UnexpectedStatusCodeError{ - Code: resp.StatusCode, - URL: url, - } - } - - return resp.Body, nil -} - -func (ds *Datasource) baseChangesetURL(cn ChangesetSeqNum) string { - n := cn.Uint64() - return fmt.Sprintf("%s/replication/%s/%03d/%03d/%03d", - ds.baseURL(), - cn.Dir(), - n/1000000, - (n%1000000)/1000, - n%1000) -} diff --git a/replication/changesets_test.go b/replication/changesets_test.go deleted file mode 100644 index 8ce1716..0000000 --- a/replication/changesets_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package replication - -import ( - "bytes" - "compress/gzip" - "context" - "testing" - "time" -) - -func TestDecodeChangesetState(t *testing.T) { - data := []byte(`--- -last_run: 2016-07-02 22:46:01.422137422 Z -sequence: 1912325 -`) - - state, err := decodeChangesetState(data) - if v := ChangesetSeqNum(state.SeqNum); v != 1912325 { - t.Errorf("incorrect sequence number, got %v", v) - } - - if !state.Timestamp.Equal(time.Date(2016, 7, 2, 22, 46, 1, 422137422, time.UTC)) { - t.Errorf("incorrect time, got %v", state.Timestamp) - } - - if err != nil { - t.Errorf("got error: %v", err) - } -} - -func TestChangesetDecoder(t *testing.T) { - ctx := context.Background() - - buf := bytes.NewBuffer(nil) - w := gzip.NewWriter(buf) - w.Write([]byte(` - - - - - - - - - - - - - - -`)) - w.Close() - - changesets, err := changesetDecoder(ctx, buf) - if err != nil { - t.Fatalf("decode error: %v", err) - } - - if len(changesets) != 2 { - t.Errorf("incorrect number of changes: %d", len(changesets)) - } -} - -func TestBaseChangesetURL(t *testing.T) { - url := DefaultDatasource.baseChangesetURL(123456789) - if url != "https://planet.osm.org/replication/changesets/123/456/789" { - t.Errorf("incorrect url, got %v", url) - } -} diff --git a/replication/datasource.go b/replication/datasource.go index d32b9f4..90d103c 100644 --- a/replication/datasource.go +++ b/replication/datasource.go @@ -6,8 +6,8 @@ import ( "time" ) -// BaseURL defines the default planet server to hit. -const BaseURL = "https://planet.osm.org" +// BaseURL defines the default replication server to hit. +const BaseURL = "https://planet.osm.org/replication/minute" // Datasource defines context around replication data requests. type Datasource struct { @@ -17,36 +17,26 @@ type Datasource struct { // DefaultDatasource is the Datasource used by the package level convenience functions. var DefaultDatasource = &Datasource{ + BaseURL: BaseURL, Client: &http.Client{ Timeout: 30 * time.Minute, }, } // NewDatasource creates a Datasource using the given client. -func NewDatasource(client *http.Client) *Datasource { +func NewDatasource(baseURL string, client *http.Client) *Datasource { return &Datasource{ - Client: client, + BaseURL: baseURL, + Client: client, } } func (ds Datasource) baseURL() string { - if ds.BaseURL != "" { - return ds.BaseURL - } - - return BaseURL + return ds.BaseURL } func (ds Datasource) client() *http.Client { - if ds.Client != nil { - return ds.Client - } - - if DefaultDatasource.Client != nil { - return DefaultDatasource.Client - } - - return http.DefaultClient + return ds.Client } // UnexpectedStatusCodeError is return for a non 200 or 404 status code. diff --git a/replication/interval.go b/replication/interval.go index c10d7cb..3a9e64c 100644 --- a/replication/interval.go +++ b/replication/interval.go @@ -14,22 +14,6 @@ import ( "github.com/onXmaps/osm" ) -var _ SeqNum = MinuteSeqNum(0) -var _ SeqNum = HourSeqNum(0) -var _ SeqNum = DaySeqNum(0) - -// MinuteSeqStart is the beginning of valid minutely sequence data. -// The few before look to be way more than a minute. -// A quick looks says about 75, 57, 17 for 1, 2, 3 respectively. -const MinuteSeqStart = MinuteSeqNum(4) - -// HourSeqStart is the beginning of valid hour sequence data. -// Without deep inspection it looks like 1-10 are from July 2013. -const HourSeqStart = HourSeqNum(11) - -// DaySeqStart is the beginning of valid day sequence data. -const DaySeqStart = DaySeqNum(1) - // State returns information about the current replication state. type State struct { SeqNum uint64 `json:"seq_num"` @@ -38,174 +22,44 @@ type State struct { TxnMaxQueried int `json:"txn_max_queries,omitempty"` } -// SeqNum is an interface type that includes MinuteSeqNum, -// HourSeqNum and DaySeqNum. This is an experiment to implement -// a sum type, a type that can be one of several things only. -type SeqNum interface { - fmt.Stringer - Dir() string - Uint64() uint64 - private() -} - -func (n MinuteSeqNum) private() {} -func (n HourSeqNum) private() {} -func (n DaySeqNum) private() {} - -var _ = SeqNum(MinuteSeqNum(0)).private // for the linters - -// MinuteSeqNum indicates the sequence of the minutely diff replication found here: -// http://planet.osm.org/replication/minute -type MinuteSeqNum uint64 - -// String returns 'minute/%d'. -func (n MinuteSeqNum) String() string { - return fmt.Sprintf("minute/%d", n) -} - -// Dir returns the directory of this data on planet osm. -func (n MinuteSeqNum) Dir() string { - return "minute" -} - -// Uint64 returns the seq num as a uint64 type. -func (n MinuteSeqNum) Uint64() uint64 { - return uint64(n) -} - -// HourSeqNum indicates the sequence of the hourly diff replication found here: -// http://planet.osm.org/replication/hour -type HourSeqNum uint64 - -// String returns 'hour/%d'. -func (n HourSeqNum) String() string { - return fmt.Sprintf("hour/%d", n) -} - -// Dir returns the directory of this data on planet osm. -func (n HourSeqNum) Dir() string { - return "hour" -} - -// Uint64 returns the seq num as a uint64 type. -func (n HourSeqNum) Uint64() uint64 { - return uint64(n) -} - -// DaySeqNum indicates the sequence of the daily diff replication found here: -// http://planet.osm.org/replication/day -type DaySeqNum uint64 - -// String returns 'day/%d'. -func (n DaySeqNum) String() string { - return fmt.Sprintf("day/%d", n) -} - -// Dir returns the directory of this data on planet osm. -func (n DaySeqNum) Dir() string { - return "day" -} - -// Uint64 returns the seq num as a uint64 type. -func (n DaySeqNum) Uint64() uint64 { - return uint64(n) -} - -// CurrentMinuteState returns the current state of the minutely replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func CurrentMinuteState(ctx context.Context) (MinuteSeqNum, *State, error) { - return DefaultDatasource.CurrentMinuteState(ctx) -} - -// CurrentMinuteState returns the current state of the minutely replication. -func (ds *Datasource) CurrentMinuteState(ctx context.Context) (MinuteSeqNum, *State, error) { - s, err := ds.MinuteState(ctx, 0) +// CurrentState returns the current state of the replication. +func (ds *Datasource) CurrentState(ctx context.Context) (uint64, *State, error) { + s, err := ds.State(ctx, 0) if err != nil { return 0, nil, err } - return MinuteSeqNum(s.SeqNum), s, err + return s.SeqNum, s, err } -// MinuteState returns the state of the given minutely replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func MinuteState(ctx context.Context, n MinuteSeqNum) (*State, error) { - return DefaultDatasource.MinuteState(ctx, n) +// CurrentState returns the current state of the replication. +func CurrentState(ctx context.Context) (uint64, *State, error) { + return DefaultDatasource.CurrentState(ctx) } -// MinuteState returns the state of the given minutely replication. -func (ds *Datasource) MinuteState(ctx context.Context, n MinuteSeqNum) (*State, error) { - return ds.fetchState(ctx, n) +// State returns the state of the given replication. +func (ds *Datasource) State(ctx context.Context, seqNum uint64) (*State, error) { + return ds.fetchState(ctx, seqNum) } -// CurrentHourState returns the current state of the hourly replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func CurrentHourState(ctx context.Context) (HourSeqNum, *State, error) { - return DefaultDatasource.CurrentHourState(ctx) +func GetState(ctx context.Context, seqNum uint64) (*State, error) { + return DefaultDatasource.State(ctx, seqNum) } -// CurrentHourState returns the current state of the hourly replication. -func (ds *Datasource) CurrentHourState(ctx context.Context) (HourSeqNum, *State, error) { - s, err := ds.HourState(ctx, 0) - if err != nil { - return 0, nil, err - } - - return HourSeqNum(s.SeqNum), s, err -} - -// HourState returns the state of the given hourly replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func HourState(ctx context.Context, n HourSeqNum) (*State, error) { - return DefaultDatasource.HourState(ctx, n) -} - -// HourState returns the state of the given hourly replication. -func (ds *Datasource) HourState(ctx context.Context, n HourSeqNum) (*State, error) { - return ds.fetchState(ctx, n) -} - -// CurrentDayState returns the current state of the daily replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func CurrentDayState(ctx context.Context) (DaySeqNum, *State, error) { - return DefaultDatasource.CurrentDayState(ctx) -} - -// CurrentDayState returns the current state of the daily replication. -func (ds *Datasource) CurrentDayState(ctx context.Context) (DaySeqNum, *State, error) { - s, err := ds.DayState(ctx, 0) - if err != nil { - return 0, nil, err - } - - return DaySeqNum(s.SeqNum), s, err -} - -// DayState returns the state of the given daily replication. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func DayState(ctx context.Context, n DaySeqNum) (*State, error) { - return DefaultDatasource.DayState(ctx, n) -} - -// DayState returns the state of the given daily replication. -func (ds *Datasource) DayState(ctx context.Context, n DaySeqNum) (*State, error) { - return ds.fetchState(ctx, n) -} - -func (ds *Datasource) fetchState(ctx context.Context, n SeqNum) (*State, error) { +func (ds *Datasource) fetchState(ctx context.Context, seqNum uint64) (*State, error) { var url string - if n.Uint64() != 0 { - url = ds.baseSeqURL(n) + ".state.txt" + if seqNum != 0 { + url = ds.baseSeqURL(seqNum) + ".state.txt" } else { - url = fmt.Sprintf("%s/%s/state.txt", ds.baseURL(), n.Dir()) + url = fmt.Sprintf("%s/state.txt", ds.baseURL()) } - req, err := http.NewRequest(http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { return nil, err } - resp, err := ds.client().Do(req.WithContext(ctx)) + resp, err := ds.client().Do(req) if err != nil { return nil, err } @@ -276,37 +130,14 @@ func decodeIntervalState(data []byte) (*State, error) { return state, nil } -// Minute returns the change diff for a given minute. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func Minute(ctx context.Context, n MinuteSeqNum) (*osm.Change, error) { - return DefaultDatasource.Minute(ctx, n) -} - -// Minute returns the change diff for a given minute. -func (ds *Datasource) Minute(ctx context.Context, n MinuteSeqNum) (*osm.Change, error) { - return ds.fetchIntervalData(ctx, ds.changeURL(n)) -} - -// Hour returns the change diff for a given hour. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func Hour(ctx context.Context, n HourSeqNum) (*osm.Change, error) { - return DefaultDatasource.Hour(ctx, n) -} - -// Hour returns the change diff for a given hour. -func (ds *Datasource) Hour(ctx context.Context, n HourSeqNum) (*osm.Change, error) { - return ds.fetchIntervalData(ctx, ds.changeURL(n)) -} - -// Day returns the change diff for a given day. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func Day(ctx context.Context, n DaySeqNum) (*osm.Change, error) { - return DefaultDatasource.Day(ctx, n) +// GetDiff returns the change diff for the given replication sequence number. +func (ds *Datasource) GetDiff(ctx context.Context, seqNum uint64) (*osm.Change, error) { + return ds.fetchIntervalData(ctx, ds.changeURL(seqNum)) } -// Day returns the change diff for a given day. -func (ds *Datasource) Day(ctx context.Context, n DaySeqNum) (*osm.Change, error) { - return ds.fetchIntervalData(ctx, ds.changeURL(n)) +// GetDiff returns the change diff for the given replication sequence number. +func GetDiff(ctx context.Context, seqNum uint64) (*osm.Change, error) { + return DefaultDatasource.GetDiff(ctx, seqNum) } func (ds *Datasource) fetchIntervalData(ctx context.Context, url string) (*osm.Change, error) { @@ -339,16 +170,14 @@ func (ds *Datasource) fetchIntervalData(ctx context.Context, url string) (*osm.C return change, err } -func (ds *Datasource) changeURL(n SeqNum) string { - return ds.baseSeqURL(n) + ".osc.gz" +func (ds *Datasource) changeURL(seqNum uint64) string { + return ds.baseSeqURL(seqNum) + ".osc.gz" } -func (ds *Datasource) baseSeqURL(sn SeqNum) string { - n := sn.Uint64() - return fmt.Sprintf("%s/%s/%03d/%03d/%03d", +func (ds *Datasource) baseSeqURL(seqNum uint64) string { + return fmt.Sprintf("%s/%03d/%03d/%03d", ds.baseURL(), - sn.Dir(), - n/1000000, - (n%1000000)/1000, - n%1000) + seqNum/1000000, + (seqNum%1000000)/1000, + seqNum%1000) } diff --git a/replication/interval_test.go b/replication/interval_test.go index c9402ea..be7ef57 100644 --- a/replication/interval_test.go +++ b/replication/interval_test.go @@ -16,7 +16,7 @@ txnActiveList=836441203 `) state, err := decodeIntervalState(data) - if v := MinuteSeqNum(state.SeqNum); v != 2010594 { + if v := state.SeqNum; v != 2010594 { t.Errorf("incorrect id, got %v", v) } diff --git a/replication/live_test.go b/replication/live_test.go index 97ab488..3805b44 100644 --- a/replication/live_test.go +++ b/replication/live_test.go @@ -16,17 +16,7 @@ func TestCurrentState(t *testing.T) { liveOnly(t) ctx := context.Background() - _, _, err := CurrentMinuteState(ctx) - if err != nil { - t.Fatalf("request error: %v", err) - } - - _, _, err = CurrentHourState(ctx) - if err != nil { - t.Fatalf("request error: %v", err) - } - - _, _, err = CurrentDayState(ctx) + _, _, err := CurrentState(ctx) if err != nil { t.Fatalf("request error: %v", err) } @@ -36,73 +26,8 @@ func TestDownloadChanges(t *testing.T) { liveOnly(t) ctx := context.Background() - _, err := Minute(ctx, 10) - if err != nil { - t.Fatalf("request error: %v", err) - } - - _, err = Hour(ctx, 10) - if err != nil { - t.Fatalf("request error: %v", err) - } - - _, err = Day(ctx, 1) - if err != nil { - t.Fatalf("request error: %v", err) - } -} - -func TestCurrentChangesetState(t *testing.T) { - liveOnly(t) - - ctx := context.Background() - _, _, err := CurrentChangesetState(ctx) - if err != nil { - t.Fatalf("request error: %v", err) - } -} - -func TestChangesets(t *testing.T) { - liveOnly(t) - - ctx := context.Background() - sets, err := Changesets(ctx, 100) - if err != nil { - t.Fatalf("request error: %v", err) - } - - if l := len(sets); l != 12 { - t.Errorf("incorrect number of changesets: %v", l) - } -} - -func TestChangesetState(t *testing.T) { - liveOnly(t) - - ctx := context.Background() - state, err := ChangesetState(ctx, 5001990) - if err != nil { - t.Fatalf("request error: %v", err) - } - - if state.SeqNum != 5001990 { - t.Errorf("incorrect state: %+v", state) - } - - // current state - n, state, err := CurrentChangesetState(ctx) + _, err := GetDiff(ctx, 10) if err != nil { t.Fatalf("request error: %v", err) } - - changes, err := Changesets(ctx, n) - if err != nil { - t.Fatalf("request error: %v", err) - } - - for _, c := range changes { - if c.CreatedAt.After(state.Timestamp) { - t.Errorf("data is after the state file?") - } - } } diff --git a/replication/search.go b/replication/search.go index c5abcd9..5ad60cf 100644 --- a/replication/search.go +++ b/replication/search.go @@ -7,12 +7,7 @@ import ( // the valid minimum state number on planet.osm.org const ( - minMinute = 1 // up to 2012-09-12T08:15:45Z - minHour = 1 // up to 2013-07-14T12:00:00Z - minDay = 1 // up to 2012-09-13T00:00:00Z - - // There are changes before this, but no state. - minChangeset = 2007990 // 2016-09-07 10:45:02.148547780 Z + minSeqNum = 1 // up to 2012-09-12T08:15:45Z ) type stater struct { @@ -21,127 +16,7 @@ type stater struct { State func(context.Context, uint64) (*State, error) } -// MinuteStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func MinuteStateAt(ctx context.Context, timestamp time.Time) (MinuteSeqNum, *State, error) { - return DefaultDatasource.MinuteStateAt(ctx, timestamp) -} - -// MinuteStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -func (ds *Datasource) MinuteStateAt(ctx context.Context, timestamp time.Time) (MinuteSeqNum, *State, error) { - s := &stater{ - Min: minMinute, - Current: func(ctx context.Context) (*State, error) { - _, s, err := ds.CurrentMinuteState(ctx) - return s, err - }, - State: func(ctx context.Context, n uint64) (*State, error) { - return ds.MinuteState(ctx, MinuteSeqNum(n)) - }, - } - state, err := searchTimestamp(ctx, s, timestamp) - if err != nil { - return 0, nil, err - } - - return MinuteSeqNum(state.SeqNum), state, nil -} - -// HourStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func HourStateAt(ctx context.Context, timestamp time.Time) (HourSeqNum, *State, error) { - return DefaultDatasource.HourStateAt(ctx, timestamp) -} - -// HourStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -func (ds *Datasource) HourStateAt(ctx context.Context, timestamp time.Time) (HourSeqNum, *State, error) { - s := &stater{ - Min: minHour, - Current: func(ctx context.Context) (*State, error) { - _, s, err := ds.CurrentHourState(ctx) - return s, err - }, - State: func(ctx context.Context, n uint64) (*State, error) { - return ds.HourState(ctx, HourSeqNum(n)) - }, - } - state, err := searchTimestamp(ctx, s, timestamp) - if err != nil { - return 0, nil, err - } - - return HourSeqNum(state.SeqNum), state, nil -} - -// DayStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func DayStateAt(ctx context.Context, timestamp time.Time) (DaySeqNum, *State, error) { - return DefaultDatasource.DayStateAt(ctx, timestamp) -} - -// DayStateAt will return the replication state/sequence number that contains -// data for the given timestamp. This would be the first replication state written -// after the timestamp. If the timestamp is after all current replication state -// the most recent will be returned. The caller can check for this case using -// state.Before(givenTimestamp). -// -// This call can do 20+ requests to the binary search the replication states. -// Use sparingly or use a mirror. -func (ds *Datasource) DayStateAt(ctx context.Context, timestamp time.Time) (DaySeqNum, *State, error) { - s := &stater{ - Min: minDay, - Current: func(ctx context.Context) (*State, error) { - _, s, err := ds.CurrentDayState(ctx) - return s, err - }, - State: func(ctx context.Context, n uint64) (*State, error) { - return ds.DayState(ctx, DaySeqNum(n)) - }, - } - state, err := searchTimestamp(ctx, s, timestamp) - if err != nil { - return 0, nil, err - } - - return DaySeqNum(state.SeqNum), state, nil -} - -// ChangesetStateAt will return the replication state/sequence number that contains +// StateAt will return the replication state/sequence number that contains // data for the given timestamp. This would be the first replication state written // after the timestamp. If the timestamp is after all current replication state // the most recent will be returned. The caller can check for this case using @@ -149,12 +24,11 @@ func (ds *Datasource) DayStateAt(ctx context.Context, timestamp time.Time) (DayS // // This call can do 20+ requests to the binary search the replication states. // Use sparingly or use a mirror. -// Delegates to the DefaultDatasource and uses its http.Client to make the request. -func ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum, *State, error) { - return DefaultDatasource.ChangesetStateAt(ctx, timestamp) +func StateAt(ctx context.Context, timestamp time.Time) (uint64, *State, error) { + return DefaultDatasource.StateAt(ctx, timestamp) } -// ChangesetStateAt will return the replication state/sequence number that contains +// StateAt will return the replication state/sequence number that contains // data for the given timestamp. This would be the first replication state written // after the timestamp. If the timestamp is after all current replication state // the most recent will be returned. The caller can check for this case using @@ -162,15 +36,15 @@ func ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum // // This call can do 20+ requests to the binary search the replication states. // Use sparingly or use a mirror. -func (ds *Datasource) ChangesetStateAt(ctx context.Context, timestamp time.Time) (ChangesetSeqNum, *State, error) { +func (ds *Datasource) StateAt(ctx context.Context, timestamp time.Time) (uint64, *State, error) { s := &stater{ - Min: minDay, + Min: minSeqNum, Current: func(ctx context.Context) (*State, error) { - _, s, err := ds.CurrentChangesetState(ctx) + _, s, err := ds.CurrentState(ctx) return s, err }, - State: func(ctx context.Context, n uint64) (*State, error) { - return ds.ChangesetState(ctx, ChangesetSeqNum(n)) + State: func(ctx context.Context, seqNum uint64) (*State, error) { + return ds.State(ctx, seqNum) }, } state, err := searchTimestamp(ctx, s, timestamp) @@ -178,7 +52,7 @@ func (ds *Datasource) ChangesetStateAt(ctx context.Context, timestamp time.Time) return 0, nil, err } - return ChangesetSeqNum(state.SeqNum), state, nil + return state.SeqNum, state, nil } func searchTimestamp(ctx context.Context, s *stater, timestamp time.Time) (*State, error) { diff --git a/replication/search_test.go b/replication/search_test.go index 422a993..327eaaa 100644 --- a/replication/search_test.go +++ b/replication/search_test.go @@ -143,7 +143,7 @@ func TestFindBound(t *testing.T) { } } -func TestMinuteStateAt(t *testing.T) { +func TestStateAt(t *testing.T) { liveOnly(t) ctx := context.Background() @@ -159,13 +159,13 @@ func TestMinuteStateAt(t *testing.T) { timestamp := base.Add(time.Duration(secs) * time.Second) t.Logf("n: %d timestamp: %v", i, timestamp) - _, state, err := MinuteStateAt(ctx, timestamp) + _, state, err := StateAt(ctx, timestamp) if err != nil { t.Fatalf("failed to get state: %v", err) } if timestamp.After(state.Timestamp) { - _, current, err := CurrentMinuteState(ctx) + _, current, err := CurrentState(ctx) if err != nil { t.Fatalf("could not get current state: %v", err) } @@ -178,71 +178,14 @@ func TestMinuteStateAt(t *testing.T) { continue } - if state.SeqNum == minMinute { + if state.SeqNum == minSeqNum { // timestamp was before the first state continue } // state's timestamp is after what we want // get previous to make sure it before what we want - previous, err := MinuteState(ctx, MinuteSeqNum(state.SeqNum-1)) - if err != nil { - t.Fatalf("could not get previous state: %v", err) - } - - if !previous.Timestamp.Before(timestamp) { - t.Logf("prev: %+v", previous) - t.Logf("state: %+v", state) - t.Fatalf("previus state not before timestamp") - } - - t.Logf("found: %+v", state) - } -} - -func TestChangesetStateAt(t *testing.T) { - liveOnly(t) - - ctx := context.Background() - - base := time.Date(2016, 9, 15, 0, 0, 0, 0, time.UTC) - now := time.Date(2022, 9, 1, 0, 0, 0, 0, time.UTC) - diff := int64(now.Sub(base)/time.Second + 10) - - r := rand.New(rand.NewSource(42)) - - for i := 0; i < 10; i++ { - secs := r.Int63n(diff) - timestamp := base.Add(time.Duration(secs) * time.Second) - - t.Logf("n: %d timestamp: %v", i, timestamp) - _, state, err := ChangesetStateAt(ctx, timestamp) - if err != nil { - t.Fatalf("failed to get state: %v", err) - } - - if timestamp.After(state.Timestamp) { - _, current, err := CurrentChangesetState(ctx) - if err != nil { - t.Fatalf("could not get current state: %v", err) - } - - if current.SeqNum != state.SeqNum { - t.Logf("state: %+v", state) - t.Fatalf("if timstamp is before, it must be the current timestamp") - } - - continue - } - - if state.SeqNum == minChangeset { - // timestamp was before the first state - continue - } - - // state's timestamp is after what we want - // get previous to make sure it before what we want - previous, err := ChangesetState(ctx, ChangesetSeqNum(state.SeqNum-1)) + previous, err := GetState(ctx, state.SeqNum-1) if err != nil { t.Fatalf("could not get previous state: %v", err) }