Skip to content

(database) Update to rxjs pipeable operators #1622

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/database/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Reference, DataSnapshot, ThenableReference, Query } from '@firebase/database-types';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';

export type FirebaseOperation = string | Reference | DataSnapshot;

Expand Down
8 changes: 4 additions & 4 deletions src/database/list/audit-trail.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, auditTrail, ChildEvent } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { skip } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -41,13 +41,13 @@ describe('auditTrail', () => {
app.delete().then(done, done.fail);
});

function prepareAuditTrail(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
const { events, skip } = opts;
function prepareAuditTrail(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
const { events, skipnumber } = opts;
const aref = createRef(rando());
aref.set(batch);
const changes = auditTrail(aref, events);
return {
changes: changes.skip(skip),
changes: changes.pipe(skip(skipnumber)),
ref: aref
};
}
Expand Down
68 changes: 36 additions & 32 deletions src/database/list/audit-trail.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { DatabaseQuery, ChildEvent, DatabaseSnapshot, AngularFireAction, SnapshotAction } from '../interfaces';
import { stateChanges } from './state-changes';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { DataSnapshot } from '@firebase/database-types';
import { fromRef } from '../observable/fromRef';
import { AngularFireDatabase } from '../database';

import 'rxjs/add/operator/skipWhile';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
import { skipWhile, withLatestFrom, map, scan } from 'rxjs/operators';

export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDatabase) {
return (events?: ChildEvent[]) => afDatabase.scheduler.keepUnstableUntilFirst(
Expand All @@ -19,7 +17,9 @@ export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDa

export function auditTrail(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
const auditTrail$ = stateChanges(query, events)
.scan((current, action) => [...current, action], []);
.pipe(
scan<SnapshotAction>((current, action) => [...current, action], [])
);
return waitForLoaded(query, auditTrail$);
}

Expand All @@ -33,36 +33,40 @@ function loadedData(query: DatabaseQuery): Observable<LoadedMetadata> {
// known dataset. This will allow us to know what key to
// emit the "whole" array at when listening for child events.
return fromRef(query, 'value')
.map(data => {
// Store the last key in the data set
let lastKeyToLoad;
// Loop through loaded dataset to find the last key
data.payload.forEach(child => {
lastKeyToLoad = child.key; return false;
});
// return data set and the current last key loaded
return { data, lastKeyToLoad };
});
.pipe(
map(data => {
// Store the last key in the data set
let lastKeyToLoad;
// Loop through loaded dataset to find the last key
data.payload.forEach(child => {
lastKeyToLoad = child.key; return false;
});
// return data set and the current last key loaded
return { data, lastKeyToLoad };
})
);
}

function waitForLoaded(query: DatabaseQuery, action$: Observable<SnapshotAction[]>) {
const loaded$ = loadedData(query);
return loaded$
.withLatestFrom(action$)
// Get the latest values from the "loaded" and "child" datasets
// We can use both datasets to form an array of the latest values.
.map(([loaded, actions]) => {
// Store the last key in the data set
let lastKeyToLoad = loaded.lastKeyToLoad;
// Store all child keys loaded at this point
const loadedKeys = actions.map(snap => snap.key);
return { actions, lastKeyToLoad, loadedKeys }
})
// This is the magical part, only emit when the last load key
// in the dataset has been loaded by a child event. At this point
// we can assume the dataset is "whole".
.skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1)
// Pluck off the meta data because the user only cares
// to iterate through the snapshots
.map(meta => meta.actions);
.pipe(
withLatestFrom(action$),
// Get the latest values from the "loaded" and "child" datasets
// We can use both datasets to form an array of the latest values.
map(([loaded, actions]) => {
// Store the last key in the data set
let lastKeyToLoad = loaded.lastKeyToLoad;
// Store all child keys loaded at this point
const loadedKeys = actions.map(snap => snap.key);
return { actions, lastKeyToLoad, loadedKeys }
}),
// This is the magical part, only emit when the last load key
// in the dataset has been loaded by a child event. At this point
// we can assume the dataset is "whole".
skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1),
// Pluck off the meta data because the user only cares
// to iterate through the snapshots
map(meta => meta.actions)
);
}
18 changes: 9 additions & 9 deletions src/database/list/changes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, listChanges } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { skip, take } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -46,7 +46,7 @@ describe('listChanges', () => {
it('should stream value at first', (done) => {
const someRef = ref(rando());
const obs = listChanges(someRef, ['child_added']);
const sub = obs.take(1).subscribe(changes => {
const sub = obs.pipe(take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -56,7 +56,7 @@ describe('listChanges', () => {
it('should process a new child_added event', done => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[3]).toEqual({ name: 'anotha one' });
}).add(done);
Expand All @@ -67,7 +67,7 @@ describe('listChanges', () => {
it('should stream in order events', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.take(1).subscribe(changes => {
const sub = obs.pipe(take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('one');
expect(names[1]).toEqual('two');
Expand All @@ -79,7 +79,7 @@ describe('listChanges', () => {
it('should stream in order events w/child_added', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('anotha one');
expect(names[1]).toEqual('one');
Expand All @@ -93,7 +93,7 @@ describe('listChanges', () => {
it('should stream events filtering', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name').equalTo('zero'), ['child_added']);
obs.skip(1).take(1).subscribe(changes => {
obs.pipe(skip(1),take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('zero');
expect(names[1]).toEqual('zero');
Expand All @@ -105,7 +105,7 @@ describe('listChanges', () => {
it('should process a new child_removed event', done => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_removed']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data.length).toEqual(items.length - 1);
}).add(done);
Expand All @@ -118,7 +118,7 @@ describe('listChanges', () => {
it('should process a new child_changed event', (done) => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_changed'])
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[1].name).toEqual('lol');
}).add(done);
Expand All @@ -131,7 +131,7 @@ describe('listChanges', () => {
it('should process a new child_moved event', (done) => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_moved'])
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
// We moved the first item to the last item, so we check that
// the new result is now the last result
Expand Down
27 changes: 13 additions & 14 deletions src/database/list/changes.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import { fromRef } from '../observable/fromRef';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { of } from 'rxjs/observable/of';
import { merge } from 'rxjs/observable/merge';

import { DatabaseQuery, ChildEvent, AngularFireAction, SnapshotAction } from '../interfaces';
import { isNil } from '../utils';

import 'rxjs/add/operator/scan';
import 'rxjs/add/observable/merge';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/distinctUntilChanged';
import { switchMap, distinctUntilChanged, scan } from 'rxjs/operators';

export function listChanges<T>(ref: DatabaseQuery, events: ChildEvent[]): Observable<SnapshotAction[]> {
return fromRef(ref, 'value', 'once').switchMap(snapshotAction => {
const childEvent$ = [Observable.of(snapshotAction)];
events.forEach(event => childEvent$.push(fromRef(ref, event)));
return Observable.merge(...childEvent$).scan(buildView, [])
})
.distinctUntilChanged();
return fromRef(ref, 'value', 'once').pipe(
switchMap(snapshotAction => {
const childEvent$ = [of(snapshotAction)];
events.forEach(event => childEvent$.push(fromRef(ref, event)));
return merge(...childEvent$).pipe(scan(buildView, []))
}),
distinctUntilChanged()
);
}

function positionFor(changes: SnapshotAction[], key) {
Expand Down
5 changes: 4 additions & 1 deletion src/database/list/create-reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createAuditTrail } from './audit-trail';
import { createDataOperationMethod } from './data-operation';
import { createRemoveMethod } from './remove';
import { AngularFireDatabase } from '../database';
import { map } from 'rxjs/operators';

export function createListReference<T>(query: DatabaseQuery, afDatabase: AngularFireDatabase): AngularFireList<T> {
return {
Expand All @@ -29,7 +30,9 @@ export function createListReference<T>(query: DatabaseQuery, afDatabase: Angular
afDatabase.scheduler.runOutsideAngular(
snapshotChanges$
)
).map(actions => actions.map(a => a.payload.val()));
).pipe(
map(actions => actions.map(a => a.payload.val()))
);
}
}
}
30 changes: 15 additions & 15 deletions src/database/list/snapshot-changes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, snapshotChanges, ChildEvent } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { BehaviorSubject } from 'rxjs';
import { skip, take, switchMap } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -42,12 +42,12 @@ describe('snapshotChanges', () => {
app.delete().then(done, done.fail);
});

function prepareSnapshotChanges(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
const { events, skip } = opts;
function prepareSnapshotChanges(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
const { events, skipnumber } = opts;
const aref = createRef(rando());
const snapChanges = snapshotChanges(aref, events);
return {
snapChanges: snapChanges.skip(skip),
snapChanges: snapChanges.pipe(skip(skipnumber)),
ref: aref
};
}
Expand All @@ -64,7 +64,7 @@ describe('snapshotChanges', () => {
it('should handle multiple subscriptions (hot)', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges();
const sub = snapChanges.subscribe(() => {}).add(done);
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(sub);
Expand All @@ -73,8 +73,8 @@ describe('snapshotChanges', () => {

it('should handle multiple subscriptions (warm)', done => {
const { snapChanges, ref } = prepareSnapshotChanges();
snapChanges.take(1).subscribe(() => {}).add(() => {
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(() => {}).add(() => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -83,8 +83,8 @@ describe('snapshotChanges', () => {
});

it('should listen to only child_added events', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skip: 0 });
snapChanges.take(1).subscribe(actions => {
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skipnumber: 0 });
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -94,10 +94,10 @@ describe('snapshotChanges', () => {
it('should listen to only child_added, child_changed events', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges({
events: ['child_added', 'child_changed'],
skip: 1
skipnumber: 1
});
const name = 'ligatures';
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());;
const copy = [...items];
copy[0].name = name;
Expand All @@ -112,7 +112,7 @@ describe('snapshotChanges', () => {
it('should handle empty sets', done => {
const aref = createRef(rando());
aref.set({});
snapshotChanges(aref).take(1).subscribe(data => {
snapshotChanges(aref).pipe(take(1)).subscribe(data => {
expect(data.length).toEqual(0);
}).add(done);
});
Expand All @@ -124,10 +124,10 @@ describe('snapshotChanges', () => {
let namefilter$ = new BehaviorSubject<number|null>(null);
const aref = createRef(rando());
aref.set(batch);
namefilter$.switchMap(name => {
namefilter$.pipe(switchMap(name => {
const filteredRef = name ? aref.child('name').equalTo(name) : aref
return snapshotChanges(filteredRef);
}).take(2).subscribe(data => {
}),take(2)).subscribe(data => {
count = count + 1;
// the first time should all be 'added'
if(count === 1) {
Expand Down
3 changes: 1 addition & 2 deletions src/database/list/snapshot-changes.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { listChanges } from './changes';
import { DatabaseQuery, ChildEvent, SnapshotAction } from '../interfaces';
import { validateEventsArray } from './utils';
import 'rxjs/add/operator/map';

export function snapshotChanges(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
events = validateEventsArray(events);
Expand Down
Loading