Skip to content

Commit dca31a1

Browse files
committed
Implement record path filter. (#385)
1 parent fdf1832 commit dca31a1

File tree

5 files changed

+282
-3
lines changed

5 files changed

+282
-3
lines changed

metafacture-mangling/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,10 @@ dependencies {
2323
testImplementation 'junit:junit:4.12'
2424
testImplementation 'org.mockito:mockito-core:2.5.5'
2525
}
26+
27+
test {
28+
testLogging {
29+
showStandardStreams = true
30+
exceptionFormat = 'full'
31+
}
32+
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2021 hbz NRW
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.mangling;
17+
18+
import org.metafacture.framework.FluxCommand;
19+
import org.metafacture.framework.StreamReceiver;
20+
import org.metafacture.framework.annotations.In;
21+
import org.metafacture.framework.annotations.Out;
22+
import org.metafacture.framework.helpers.DefaultStreamPipe;
23+
24+
@In(StreamReceiver.class)
25+
@Out(StreamReceiver.class)
26+
@FluxCommand("filter-records-by-path")
27+
public final class RecordPathFilter extends DefaultStreamPipe<StreamReceiver> {
28+
29+
public static final String ROOT_PATH = "";
30+
31+
private final EntityPathTracker entityPathTracker = new EntityPathTracker();
32+
33+
private String match;
34+
private String path;
35+
36+
public RecordPathFilter() {
37+
this(ROOT_PATH);
38+
}
39+
40+
public RecordPathFilter(final String path) {
41+
super();
42+
setPath(path);
43+
}
44+
45+
public void setPath(final String path) {
46+
this.path = path;
47+
}
48+
49+
public String getPath() {
50+
return path;
51+
}
52+
53+
@Override
54+
public void startRecord(final String identifier) {
55+
assert !isClosed();
56+
57+
getReceiver().startRecord(identifier);
58+
entityPathTracker.startRecord(identifier);
59+
}
60+
61+
@Override
62+
public void endRecord() {
63+
assert !isClosed();
64+
65+
getReceiver().endRecord();
66+
entityPathTracker.endRecord();
67+
}
68+
69+
@Override
70+
public void startEntity(final String name) {
71+
if (path.equals(entityPathTracker.getCurrentPathWith(name))) {
72+
match = path;
73+
}
74+
else if (matching()) {
75+
getReceiver().startEntity(name);
76+
}
77+
else {
78+
entityPathTracker.startEntity(name);
79+
}
80+
}
81+
82+
@Override
83+
public void endEntity() {
84+
if (path.equals(match)) {
85+
match = null;
86+
}
87+
else if (matching()) {
88+
getReceiver().endEntity();
89+
}
90+
else {
91+
entityPathTracker.endEntity();
92+
}
93+
}
94+
95+
@Override
96+
public void literal(final String name, final String value) {
97+
if (matching()) {
98+
getReceiver().literal(name, value);
99+
}
100+
}
101+
102+
@Override
103+
public void onResetStream() {
104+
entityPathTracker.resetStream();
105+
}
106+
107+
@Override
108+
public void onCloseStream() {
109+
entityPathTracker.closeStream();
110+
}
111+
112+
private boolean matching() {
113+
return match != null || path.equals(ROOT_PATH);
114+
}
115+
116+
}

metafacture-mangling/src/main/resources/flux-commands.properties

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515
#
16+
change-id org.metafacture.mangling.RecordIdChanger
17+
discard-events org.metafacture.mangling.StreamEventDiscarder
1618
filter-duplicate-objects org.metafacture.mangling.DuplicateObjectFilter
1719
filter-null-values org.metafacture.mangling.NullFilter
20+
filter-records-by-path org.metafacture.mangling.RecordPathFilter
21+
flatten org.metafacture.mangling.StreamFlattener
1822
literal-to-object org.metafacture.mangling.LiteralToObject
1923
object-to-literal org.metafacture.mangling.ObjectToLiteral
20-
change-id org.metafacture.mangling.RecordIdChanger
2124
record-to-entity org.metafacture.mangling.RecordToEntity
22-
discard-events org.metafacture.mangling.StreamEventDiscarder
23-
flatten org.metafacture.mangling.StreamFlattener
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2021 hbz NRW
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.mangling;
17+
18+
import org.junit.Rule;
19+
import org.junit.Test;
20+
import org.metafacture.framework.StreamReceiver;
21+
import org.mockito.Mock;
22+
import org.mockito.junit.MockitoJUnit;
23+
import org.mockito.junit.MockitoRule;
24+
25+
import java.util.function.Consumer;
26+
import java.util.function.Supplier;
27+
28+
public final class RecordPathFilterTest {
29+
30+
@Rule
31+
public final MockitoRule mockitoRule = MockitoJUnit.rule();
32+
33+
@Mock
34+
private StreamReceiver receiver;
35+
36+
@Test
37+
public void shouldProcessRootPath() {
38+
assertFilter(
39+
i -> {
40+
i.startRecord("1");
41+
i.startEntity("data");
42+
i.literal("lit", "record 1");
43+
i.endEntity();
44+
i.endRecord();
45+
i.startRecord("2");
46+
i.startEntity("data");
47+
i.literal("lit", "record 2");
48+
i.endEntity();
49+
i.endRecord();
50+
},
51+
o -> {
52+
o.get().startRecord("1");
53+
o.get().startEntity("data");
54+
o.get().literal("lit", "record 1");
55+
o.get().endEntity();
56+
o.get().endRecord();
57+
o.get().startRecord("2");
58+
o.get().startEntity("data");
59+
o.get().literal("lit", "record 2");
60+
o.get().endEntity();
61+
o.get().endRecord();
62+
}
63+
);
64+
}
65+
66+
@Test
67+
public void shouldProcessSimplePath() {
68+
assertFilter(
69+
i -> {
70+
i.setPath("data");
71+
72+
i.startRecord("1");
73+
i.startEntity("data");
74+
i.literal("lit", "record 1");
75+
i.endEntity();
76+
i.endRecord();
77+
i.startRecord("2");
78+
i.startEntity("junk");
79+
i.literal("lit", "skipped");
80+
i.endEntity();
81+
i.startEntity("data");
82+
i.literal("lit", "record 2");
83+
i.endEntity();
84+
i.endRecord();
85+
},
86+
o -> {
87+
o.get().startRecord("1");
88+
o.get().literal("lit", "record 1");
89+
o.get().endRecord();
90+
o.get().startRecord("2");
91+
o.get().literal("lit", "record 2");
92+
o.get().endRecord();
93+
}
94+
);
95+
}
96+
97+
private void assertFilter(final Consumer<RecordPathFilter> in, final Consumer<Supplier<StreamReceiver>> out) {
98+
final RecordPathFilter recordPathFilter = new RecordPathFilter();
99+
recordPathFilter.setReceiver(receiver);
100+
101+
TestHelpers.assertProcess(receiver, () -> in.accept(recordPathFilter), out);
102+
}
103+
104+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2021 hbz NRW
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.metafacture.mangling;
17+
18+
import org.metafacture.framework.StreamReceiver;
19+
import org.mockito.InOrder;
20+
import org.mockito.Mockito;
21+
import org.mockito.exceptions.base.MockitoAssertionError;
22+
23+
import java.util.function.BiConsumer;
24+
import java.util.function.Consumer;
25+
import java.util.function.IntFunction;
26+
import java.util.function.Supplier;
27+
28+
public final class TestHelpers {
29+
30+
public static void assertProcess(final StreamReceiver receiver, final Runnable process, final Consumer<Supplier<StreamReceiver>> out) {
31+
assertProcess(receiver, process, (s, f) -> out.accept(s));
32+
}
33+
34+
public static void assertProcess(final StreamReceiver receiver, final Runnable process, final BiConsumer<Supplier<StreamReceiver>, IntFunction<StreamReceiver>> out) {
35+
final InOrder ordered = Mockito.inOrder(receiver);
36+
37+
process.run();
38+
39+
try {
40+
out.accept(() -> ordered.verify(receiver), i -> ordered.verify(receiver, Mockito.times(i)));
41+
42+
ordered.verifyNoMoreInteractions();
43+
Mockito.verifyNoMoreInteractions(receiver);
44+
}
45+
catch (final MockitoAssertionError e) {
46+
System.out.println(Mockito.mockingDetails(receiver).printInvocations());
47+
throw e;
48+
}
49+
}
50+
51+
}

0 commit comments

Comments
 (0)