Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/database/interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Reference, DataSnapshot, ThenableReference, Query } from '@firebase/database-types';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';

export type FirebaseOperation = string | Reference | DataSnapshot;

Expand Down
8 changes: 4 additions & 4 deletions src/database/list/audit-trail.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, auditTrail, ChildEvent } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { skip } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -41,13 +41,13 @@ describe('auditTrail', () => {
app.delete().then(done, done.fail);
});

function prepareAuditTrail(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
const { events, skip } = opts;
function prepareAuditTrail(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
const { events, skipnumber } = opts;
const aref = createRef(rando());
aref.set(batch);
const changes = auditTrail(aref, events);
return {
changes: changes.skip(skip),
changes: changes.pipe(skip(skipnumber)),
ref: aref
};
}
Expand Down
68 changes: 36 additions & 32 deletions src/database/list/audit-trail.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { DatabaseQuery, ChildEvent, DatabaseSnapshot, AngularFireAction, SnapshotAction } from '../interfaces';
import { stateChanges } from './state-changes';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { DataSnapshot } from '@firebase/database-types';
import { fromRef } from '../observable/fromRef';
import { AngularFireDatabase } from '../database';

import 'rxjs/add/operator/skipWhile';
import 'rxjs/add/operator/withLatestFrom';
import 'rxjs/add/operator/map';
import { skipWhile, withLatestFrom, map, scan } from 'rxjs/operators';

export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDatabase) {
return (events?: ChildEvent[]) => afDatabase.scheduler.keepUnstableUntilFirst(
Expand All @@ -19,7 +17,9 @@ export function createAuditTrail(query: DatabaseQuery, afDatabase: AngularFireDa

export function auditTrail(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
const auditTrail$ = stateChanges(query, events)
.scan((current, action) => [...current, action], []);
.pipe(
scan<SnapshotAction>((current, action) => [...current, action], [])
);
return waitForLoaded(query, auditTrail$);
}

Expand All @@ -33,36 +33,40 @@ function loadedData(query: DatabaseQuery): Observable<LoadedMetadata> {
// known dataset. This will allow us to know what key to
// emit the "whole" array at when listening for child events.
return fromRef(query, 'value')
.map(data => {
// Store the last key in the data set
let lastKeyToLoad;
// Loop through loaded dataset to find the last key
data.payload.forEach(child => {
lastKeyToLoad = child.key; return false;
});
// return data set and the current last key loaded
return { data, lastKeyToLoad };
});
.pipe(
map(data => {
// Store the last key in the data set
let lastKeyToLoad;
// Loop through loaded dataset to find the last key
data.payload.forEach(child => {
lastKeyToLoad = child.key; return false;
});
// return data set and the current last key loaded
return { data, lastKeyToLoad };
})
);
}

function waitForLoaded(query: DatabaseQuery, action$: Observable<SnapshotAction[]>) {
const loaded$ = loadedData(query);
return loaded$
.withLatestFrom(action$)
// Get the latest values from the "loaded" and "child" datasets
// We can use both datasets to form an array of the latest values.
.map(([loaded, actions]) => {
// Store the last key in the data set
let lastKeyToLoad = loaded.lastKeyToLoad;
// Store all child keys loaded at this point
const loadedKeys = actions.map(snap => snap.key);
return { actions, lastKeyToLoad, loadedKeys }
})
// This is the magical part, only emit when the last load key
// in the dataset has been loaded by a child event. At this point
// we can assume the dataset is "whole".
.skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1)
// Pluck off the meta data because the user only cares
// to iterate through the snapshots
.map(meta => meta.actions);
.pipe(
withLatestFrom(action$),
// Get the latest values from the "loaded" and "child" datasets
// We can use both datasets to form an array of the latest values.
map(([loaded, actions]) => {
// Store the last key in the data set
let lastKeyToLoad = loaded.lastKeyToLoad;
// Store all child keys loaded at this point
const loadedKeys = actions.map(snap => snap.key);
return { actions, lastKeyToLoad, loadedKeys }
}),
// This is the magical part, only emit when the last load key
// in the dataset has been loaded by a child event. At this point
// we can assume the dataset is "whole".
skipWhile(meta => meta.loadedKeys.indexOf(meta.lastKeyToLoad) === -1),
// Pluck off the meta data because the user only cares
// to iterate through the snapshots
map(meta => meta.actions)
);
}
18 changes: 9 additions & 9 deletions src/database/list/changes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, listChanges } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { skip, take } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -46,7 +46,7 @@ describe('listChanges', () => {
it('should stream value at first', (done) => {
const someRef = ref(rando());
const obs = listChanges(someRef, ['child_added']);
const sub = obs.take(1).subscribe(changes => {
const sub = obs.pipe(take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -56,7 +56,7 @@ describe('listChanges', () => {
it('should process a new child_added event', done => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[3]).toEqual({ name: 'anotha one' });
}).add(done);
Expand All @@ -67,7 +67,7 @@ describe('listChanges', () => {
it('should stream in order events', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.take(1).subscribe(changes => {
const sub = obs.pipe(take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('one');
expect(names[1]).toEqual('two');
Expand All @@ -79,7 +79,7 @@ describe('listChanges', () => {
it('should stream in order events w/child_added', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name'), ['child_added']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('anotha one');
expect(names[1]).toEqual('one');
Expand All @@ -93,7 +93,7 @@ describe('listChanges', () => {
it('should stream events filtering', (done) => {
const aref = ref(rando());
const obs = listChanges(aref.orderByChild('name').equalTo('zero'), ['child_added']);
obs.skip(1).take(1).subscribe(changes => {
obs.pipe(skip(1),take(1)).subscribe(changes => {
const names = changes.map(change => change.payload.val().name);
expect(names[0]).toEqual('zero');
expect(names[1]).toEqual('zero');
Expand All @@ -105,7 +105,7 @@ describe('listChanges', () => {
it('should process a new child_removed event', done => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_removed']);
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data.length).toEqual(items.length - 1);
}).add(done);
Expand All @@ -118,7 +118,7 @@ describe('listChanges', () => {
it('should process a new child_changed event', (done) => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_changed'])
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
expect(data[1].name).toEqual('lol');
}).add(done);
Expand All @@ -131,7 +131,7 @@ describe('listChanges', () => {
it('should process a new child_moved event', (done) => {
const aref = ref(rando());
const obs = listChanges(aref, ['child_added','child_moved'])
const sub = obs.skip(1).take(1).subscribe(changes => {
const sub = obs.pipe(skip(1),take(1)).subscribe(changes => {
const data = changes.map(change => change.payload.val());
// We moved the first item to the last item, so we check that
// the new result is now the last result
Expand Down
24 changes: 10 additions & 14 deletions src/database/list/changes.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
import { fromRef } from '../observable/fromRef';
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { DatabaseQuery, ChildEvent, AngularFireAction, SnapshotAction } from '../interfaces';
import { isNil } from '../utils';

import 'rxjs/add/operator/scan';
import 'rxjs/add/observable/merge';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/delay';
import 'rxjs/add/operator/distinctUntilChanged';
import { switchMap, distinctUntilChanged, scan } from 'rxjs/operators';

export function listChanges<T>(ref: DatabaseQuery, events: ChildEvent[]): Observable<SnapshotAction[]> {
return fromRef(ref, 'value', 'once').switchMap(snapshotAction => {
const childEvent$ = [Observable.of(snapshotAction)];
events.forEach(event => childEvent$.push(fromRef(ref, event)));
return Observable.merge(...childEvent$).scan(buildView, [])
})
.distinctUntilChanged();
return fromRef(ref, 'value', 'once').pipe(
switchMap(snapshotAction => {
const childEvent$ = [Observable.of(snapshotAction)];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import of and merge and use

events.forEach(event => childEvent$.push(fromRef(ref, event)));
return Observable.merge(...childEvent$).pipe(scan(buildView, []))
}),
distinctUntilChanged()
);
}

function positionFor(changes: SnapshotAction[], key) {
Expand Down
5 changes: 4 additions & 1 deletion src/database/list/create-reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createAuditTrail } from './audit-trail';
import { createDataOperationMethod } from './data-operation';
import { createRemoveMethod } from './remove';
import { AngularFireDatabase } from '../database';
import { map } from 'rxjs/operators';

export function createListReference<T>(query: DatabaseQuery, afDatabase: AngularFireDatabase): AngularFireList<T> {
return {
Expand All @@ -29,7 +30,9 @@ export function createListReference<T>(query: DatabaseQuery, afDatabase: Angular
afDatabase.scheduler.runOutsideAngular(
snapshotChanges$
)
).map(actions => actions.map(a => a.payload.val()));
).pipe(
map(actions => actions.map(a => a.payload.val()))
);
}
}
}
30 changes: 15 additions & 15 deletions src/database/list/snapshot-changes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, snapshotChanges, ChildEvent } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { BehaviorSubject } from 'rxjs';
import { skip, take, switchMap } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -42,12 +42,12 @@ describe('snapshotChanges', () => {
app.delete().then(done, done.fail);
});

function prepareSnapshotChanges(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
const { events, skip } = opts;
function prepareSnapshotChanges(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
const { events, skipnumber } = opts;
const aref = createRef(rando());
const snapChanges = snapshotChanges(aref, events);
return {
snapChanges: snapChanges.skip(skip),
snapChanges: snapChanges.pipe(skip(skipnumber)),
ref: aref
};
}
Expand All @@ -64,7 +64,7 @@ describe('snapshotChanges', () => {
it('should handle multiple subscriptions (hot)', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges();
const sub = snapChanges.subscribe(() => {}).add(done);
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(sub);
Expand All @@ -73,8 +73,8 @@ describe('snapshotChanges', () => {

it('should handle multiple subscriptions (warm)', done => {
const { snapChanges, ref } = prepareSnapshotChanges();
snapChanges.take(1).subscribe(() => {}).add(() => {
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(() => {}).add(() => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -83,8 +83,8 @@ describe('snapshotChanges', () => {
});

it('should listen to only child_added events', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skip: 0 });
snapChanges.take(1).subscribe(actions => {
const { snapChanges, ref } = prepareSnapshotChanges({ events: ['child_added'], skipnumber: 0 });
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());
expect(data).toEqual(items);
}).add(done);
Expand All @@ -94,10 +94,10 @@ describe('snapshotChanges', () => {
it('should listen to only child_added, child_changed events', (done) => {
const { snapChanges, ref } = prepareSnapshotChanges({
events: ['child_added', 'child_changed'],
skip: 1
skipnumber: 1
});
const name = 'ligatures';
snapChanges.take(1).subscribe(actions => {
snapChanges.pipe(take(1)).subscribe(actions => {
const data = actions.map(a => a.payload!.val());;
const copy = [...items];
copy[0].name = name;
Expand All @@ -112,7 +112,7 @@ describe('snapshotChanges', () => {
it('should handle empty sets', done => {
const aref = createRef(rando());
aref.set({});
snapshotChanges(aref).take(1).subscribe(data => {
snapshotChanges(aref).pipe(take(1)).subscribe(data => {
expect(data.length).toEqual(0);
}).add(done);
});
Expand All @@ -124,10 +124,10 @@ describe('snapshotChanges', () => {
let namefilter$ = new BehaviorSubject<number|null>(null);
const aref = createRef(rando());
aref.set(batch);
namefilter$.switchMap(name => {
namefilter$.pipe(switchMap(name => {
const filteredRef = name ? aref.child('name').equalTo(name) : aref
return snapshotChanges(filteredRef);
}).take(2).subscribe(data => {
}),take(2)).subscribe(data => {
count = count + 1;
// the first time should all be 'added'
if(count === 1) {
Expand Down
3 changes: 1 addition & 2 deletions src/database/list/snapshot-changes.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Observable } from 'rxjs/Observable';
import { Observable } from 'rxjs';
import { listChanges } from './changes';
import { DatabaseQuery, ChildEvent, SnapshotAction } from '../interfaces';
import { validateEventsArray } from './utils';
import 'rxjs/add/operator/map';

export function snapshotChanges(query: DatabaseQuery, events?: ChildEvent[]): Observable<SnapshotAction[]> {
events = validateEventsArray(events);
Expand Down
10 changes: 5 additions & 5 deletions src/database/list/state-changes.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { FirebaseApp, FirebaseAppConfig, AngularFireModule } from 'angularfire2'
import { AngularFireDatabase, AngularFireDatabaseModule, stateChanges, ChildEvent } from 'angularfire2/database';
import { TestBed, inject } from '@angular/core/testing';
import { COMMON_CONFIG } from '../test-config';
import 'rxjs/add/operator/skip';
import { skip } from 'rxjs/operators';

// generate random string to test fidelity of naming
const rando = () => (Math.random() + 1).toString(36).substring(7);
Expand Down Expand Up @@ -41,20 +41,20 @@ describe('stateChanges', () => {
app.delete().then(done, done.fail);
});

function prepareStateChanges(opts: { events?: ChildEvent[], skip: number } = { skip: 0 }) {
const { events, skip } = opts;
function prepareStateChanges(opts: { events?: ChildEvent[], skipnumber: number } = { skipnumber: 0 }) {
const { events, skipnumber } = opts;
const aref = createRef(rando());
aref.set(batch);
const changes = stateChanges(aref, events);
return {
changes: changes.skip(skip),
changes: changes.pipe(skip(skipnumber)),
ref: aref
};
}

it('should listen to all events by default', (done) => {

const { changes } = prepareStateChanges({ skip: 2 });
const { changes } = prepareStateChanges({ skipnumber: 2 });
changes.subscribe(action => {
expect(action.key).toEqual('2');
expect(action.payload.val()).toEqual(items[items.length - 1]);
Expand Down
Loading