Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/demo/src/app/app.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<a mat-list-item routerLink="/immutable-state">withImmutableState</a>
<a mat-list-item routerLink="/feature-factory">withFeatureFactory</a>
<a mat-list-item routerLink="/conditional">withConditional</a>
<a mat-list-item routerLink="/mutation">withMutation</a>
</mat-nav-list>
</mat-drawer>
<mat-drawer-content>
Expand Down
Empty file.
13 changes: 13 additions & 0 deletions apps/demo/src/app/counter-mutation/counter-mutation.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<h1>withMutations</h1>

<div class="counter">{{ counter() }}</div>

<ul>
<li>isPending: {{ isPending() }}</li>
<li>Status: {{ status() }}</li>
<li>Error: {{ error() | json }}</li>
</ul>

<div>
<button (click)="increment()">Increment</button>
</div>
22 changes: 22 additions & 0 deletions apps/demo/src/app/counter-mutation/counter-mutation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { CommonModule } from '@angular/common';
import { Component, inject } from '@angular/core';
import { CounterStore } from './counter.store';

@Component({
selector: 'demo-counter-mutation',
imports: [CommonModule],
templateUrl: './counter-mutation.html',
styleUrl: './counter-mutation.css',
})
export class CounterMutation {
private store = inject(CounterStore);

protected counter = this.store.counter;
protected error = this.store.incrementError;
protected isPending = this.store.incrementIsPending;
protected status = this.store.incrementStatus;

increment() {
this.store.increment({ value: 1 });
}
}
53 changes: 53 additions & 0 deletions apps/demo/src/app/counter-mutation/counter.store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {
concatOp,
rxMutation,
withMutations,
} from '@angular-architects/ngrx-toolkit';
import { patchState, signalStore, withState } from '@ngrx/signals';
import { delay, Observable } from 'rxjs';

export type Params = {
value: number;
};

export const CounterStore = signalStore(
{ providedIn: 'root' },
withState({ counter: 0 }),
withMutations((store) => ({
increment: rxMutation({
operation: (params: Params) => {
return calcSum(store.counter(), params.value);
},
operator: concatOp,
onSuccess: (result) => {
console.log('result', result);
patchState(store, { counter: result });
},
onError: (error) => {
console.error('Error occurred:', error);
},
}),
})),
);

let error = false;

function createSumObservable(a: number, b: number): Observable<number> {
return new Observable<number>((subscriber) => {
const result = a + b;

if ((result === 7 || result === 13) && !error) {
subscriber.error({ message: 'error due to bad luck!', result });
error = true;
} else {
subscriber.next(result);
error = false;
}
subscriber.complete();
});
}

function calcSum(a: number, b: number): Observable<number> {
// return of(a + b);
return createSumObservable(a, b).pipe(delay(500));
}
7 changes: 7 additions & 0 deletions apps/demo/src/app/lazy-routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,11 @@ export const lazyRoutes: Route[] = [
(m) => m.ConditionalSettingComponent,
),
},
{
path: 'mutation',
loadComponent: () =>
import('./counter-mutation/counter-mutation').then(
(m) => m.CounterMutation,
),
},
];
10 changes: 10 additions & 0 deletions libs/ngrx-toolkit/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,14 @@ export {
} from './lib/storage-sync/with-storage-sync';
export { emptyFeature, withConditional } from './lib/with-conditional';
export { withFeatureFactory } from './lib/with-feature-factory';

export * from './lib/rx-mutation';
export * from './lib/with-mutations';
export { mapToResource, withResource } from './lib/with-resource';

export {
concatOp,
exhaustOp,
mergeOp,
switchOp,
} from './lib/flattening-operator';
42 changes: 42 additions & 0 deletions libs/ngrx-toolkit/src/lib/flattening-operator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import {
concatMap,
exhaustMap,
mergeMap,
ObservableInput,
ObservedValueOf,
OperatorFunction,
switchMap,
} from 'rxjs';

export type RxJsFlatteningOperator = <T, O extends ObservableInput<unknown>>(
project: (value: T, index: number) => O,
) => OperatorFunction<T, ObservedValueOf<O>>;

/**
* A wrapper for an RxJS flattening operator.
* This wrapper informs about whether the operator has exhaust semantics or not.
*/
export type FlatteningOperator = {
rxJsOperator: RxJsFlatteningOperator;
exhaustSemantics: boolean;
};

export const switchOp: FlatteningOperator = {
rxJsOperator: switchMap,
exhaustSemantics: false,
};

export const mergeOp: FlatteningOperator = {
rxJsOperator: mergeMap,
exhaustSemantics: false,
};

export const concatOp: FlatteningOperator = {
rxJsOperator: concatMap,
exhaustSemantics: false,
};

export const exhaustOp: FlatteningOperator = {
rxJsOperator: exhaustMap,
exhaustSemantics: true,
};
172 changes: 172 additions & 0 deletions libs/ngrx-toolkit/src/lib/rx-mutation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
import { computed, DestroyRef, inject, Injector, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import {
catchError,
defer,
EMPTY,
finalize,
Observable,
Subject,
tap,
} from 'rxjs';

import { concatOp, FlatteningOperator } from './flattening-operator';
import { Mutation, MutationResult, MutationStatus } from './with-mutations';

export type Func<P, R> = (params: P) => R;

export interface RxMutationOptions<P, R> {
operation: Func<P, Observable<R>>;
onSuccess?: (result: R, params: P) => void;
onError?: (error: unknown, params: P) => void;
operator?: FlatteningOperator;
injector?: Injector;
}

/**
* Creates a mutation that leverages RxJS.
*
* For each mutation the following options can be defined:
* - `operation`: A function that defines the mutation logic. It returns an Observable.
* - `onSuccess`: A callback that is called when the mutation is successful.
* - `onError`: A callback that is called when the mutation fails.
* - `operator`: An optional wrapper of an RxJS flattening operator. By default `concat` sematics are used.
* - `injector`: An optional Angular injector to use for dependency injection.
*
* The `operation` is the only mandatory option.
*
* ```typescript
* export type Params = {
* value: number;
* };
*
* export const CounterStore = signalStore(
* { providedIn: 'root' },
* withState({ counter: 0 }),
* withMutations((store) => ({
* increment: rxMutation({
* operation: (params: Params) => {
* return calcSum(store.counter(), params.value);
* },
* operator: concatOp,
* onSuccess: (result) => {
* console.log('result', result);
* patchState(store, { counter: result });
* },
* onError: (error) => {
* console.error('Error occurred:', error);
* },
* }),
* })),
* );
*
* function calcSum(a: number, b: number): Observable<number> {
* return of(a + b);
* }
* ```
*
* @param options
* @returns
*/
export function rxMutation<P, R>(
options: RxMutationOptions<P, R>,
): Mutation<P, R> {
const inputSubject = new Subject<{
param: P;
resolve: (result: MutationResult<R>) => void;
}>();
const flatteningOp = options.operator ?? concatOp;

const destroyRef = options.injector?.get(DestroyRef) ?? inject(DestroyRef);

const callCount = signal(0);
const errorSignal = signal<unknown>(undefined);
const idle = signal(true);
const isPending = computed(() => callCount() > 0);

const status = computed<MutationStatus>(() => {
if (idle()) {
return 'idle';
}
if (callCount() > 0) {
return 'pending';
}
if (errorSignal()) {
return 'error';
}
return 'success';
});

const initialInnerStatus: MutationStatus = 'idle';
let innerStatus: MutationStatus = initialInnerStatus;
let lastResult: R;

inputSubject
.pipe(
flatteningOp.rxJsOperator((input) =>
defer(() => {
callCount.update((c) => c + 1);
idle.set(false);
return options.operation(input.param).pipe(
tap((result: R) => {
options.onSuccess?.(result, input.param);
innerStatus = 'success';
errorSignal.set(undefined);
lastResult = result;
}),
catchError((error: unknown) => {
options.onError?.(error, input.param);
errorSignal.set(error);
innerStatus = 'error';
return EMPTY;
}),
finalize(() => {
callCount.update((c) => c - 1);

if (innerStatus === 'success') {
input.resolve({
status: 'success',
value: lastResult,
});
} else if (innerStatus === 'error') {
input.resolve({
status: 'error',
error: errorSignal(),
});
} else {
input.resolve({
status: 'aborted',
});
}

innerStatus = initialInnerStatus;
}),
);
}),
),
takeUntilDestroyed(destroyRef),
)
.subscribe();

const mutationFn = (param: P) => {
return new Promise<MutationResult<R>>((resolve) => {
if (callCount() > 0 && flatteningOp.exhaustSemantics) {
resolve({
status: 'aborted',
});
} else {
inputSubject.next({
param,
resolve,
});
}
});
};

const mutation = mutationFn as Mutation<P, R>;
mutation.status = status;
mutation.isPending = isPending;
mutation.error = errorSignal;

return mutation;
}
Loading