Skip to content

Commit 492e8ab

Browse files
committed
GH-6375: Harmonize package structures
Fixes: #6375 * Move respective classes to the `inbound`, `outbound` or `support` packages to make the whole picture for the API more concise and clean The firs commit is for `stream` module
1 parent 964dd9f commit 492e8ab

21 files changed

+737
-405
lines changed

spring-integration-stream/src/main/java/org/springframework/integration/stream/ByteStreamReadingMessageSource.java

Lines changed: 6 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,7 @@
1616

1717
package org.springframework.integration.stream;
1818

19-
import java.io.BufferedInputStream;
20-
import java.io.IOException;
2119
import java.io.InputStream;
22-
import java.util.concurrent.locks.Lock;
23-
import java.util.concurrent.locks.ReentrantLock;
24-
25-
import org.jspecify.annotations.Nullable;
26-
27-
import org.springframework.integration.endpoint.AbstractMessageSource;
28-
import org.springframework.messaging.MessagingException;
2920

3021
/**
3122
* A pollable source for receiving bytes from an {@link InputStream}.
@@ -34,77 +25,19 @@
3425
* @author Artem Bilan
3526
* @author Christian Tzolov
3627
* @author Ngoc Nhan
28+
*
29+
* @deprecated since 7.0 in favor of {@link org.springframework.integration.stream.inbound.ByteStreamReadingMessageSource}
3730
*/
38-
public class ByteStreamReadingMessageSource extends AbstractMessageSource<byte[]> {
39-
40-
private final Lock lock = new ReentrantLock();
41-
42-
private final BufferedInputStream stream;
43-
44-
private int bytesPerMessage = 1024; // NOSONAR magic number
45-
46-
private boolean shouldTruncate = true;
31+
@Deprecated(forRemoval = true, since = "7.0")
32+
public class ByteStreamReadingMessageSource
33+
extends org.springframework.integration.stream.inbound.ByteStreamReadingMessageSource {
4734

4835
public ByteStreamReadingMessageSource(InputStream stream) {
4936
this(stream, -1);
5037
}
5138

5239
public ByteStreamReadingMessageSource(InputStream stream, int bufferSize) {
53-
if (stream instanceof BufferedInputStream bufferedInputStream) {
54-
this.stream = bufferedInputStream;
55-
}
56-
else if (bufferSize > 0) {
57-
this.stream = new BufferedInputStream(stream, bufferSize);
58-
}
59-
else {
60-
this.stream = new BufferedInputStream(stream);
61-
}
62-
}
63-
64-
public void setBytesPerMessage(int bytesPerMessage) {
65-
this.bytesPerMessage = bytesPerMessage;
66-
}
67-
68-
public void setShouldTruncate(boolean shouldTruncate) {
69-
this.shouldTruncate = shouldTruncate;
70-
}
71-
72-
@Override
73-
public String getComponentType() {
74-
return "stream:stdin-channel-adapter(byte)";
75-
}
76-
77-
@Override
78-
protected byte @Nullable [] doReceive() {
79-
try {
80-
byte[] bytes;
81-
int bytesRead = 0;
82-
this.lock.lock();
83-
try {
84-
if (this.stream.available() == 0) {
85-
return null;
86-
}
87-
bytes = new byte[this.bytesPerMessage];
88-
bytesRead = this.stream.read(bytes, 0, bytes.length);
89-
}
90-
finally {
91-
this.lock.unlock();
92-
}
93-
if (bytesRead <= 0) {
94-
return null;
95-
}
96-
if (!this.shouldTruncate) {
97-
return bytes;
98-
}
99-
else {
100-
byte[] result = new byte[bytesRead];
101-
System.arraycopy(bytes, 0, result, 0, result.length);
102-
return result;
103-
}
104-
}
105-
catch (IOException e) {
106-
throw new MessagingException("IO failure occurred in adapter", e);
107-
}
40+
super(stream, bufferSize);
10841
}
10942

11043
}

spring-integration-stream/src/main/java/org/springframework/integration/stream/ByteStreamWritingMessageHandler.java

Lines changed: 6 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -16,63 +16,28 @@
1616

1717
package org.springframework.integration.stream;
1818

19-
import java.io.BufferedOutputStream;
20-
import java.io.IOException;
2119
import java.io.OutputStream;
2220

23-
import org.springframework.integration.handler.AbstractMessageHandler;
24-
import org.springframework.messaging.Message;
25-
import org.springframework.messaging.MessagingException;
26-
2721
/**
2822
* A {@link org.springframework.messaging.MessageHandler} that writes a byte array to an
2923
* {@link OutputStream}.
3024
*
3125
* @author Mark Fisher
3226
* @author Gary Russell
3327
* @author Ngoc Nhan
28+
*
29+
* @deprecated since 7.0 in favor of {@link org.springframework.integration.stream.outbound.ByteStreamWritingMessageHandler}
3430
*/
35-
public class ByteStreamWritingMessageHandler extends AbstractMessageHandler {
36-
37-
private final BufferedOutputStream stream;
31+
@Deprecated(forRemoval = true, since = "7.0")
32+
public class ByteStreamWritingMessageHandler extends
33+
org.springframework.integration.stream.outbound.ByteStreamWritingMessageHandler {
3834

3935
public ByteStreamWritingMessageHandler(OutputStream stream) {
4036
this(stream, -1);
4137
}
4238

4339
public ByteStreamWritingMessageHandler(OutputStream stream, int bufferSize) {
44-
if (bufferSize > 0) {
45-
this.stream = new BufferedOutputStream(stream, bufferSize);
46-
}
47-
else {
48-
this.stream = new BufferedOutputStream(stream);
49-
}
50-
}
51-
52-
@Override
53-
public String getComponentType() {
54-
return "stream:outbound-channel-adapter(byte)";
55-
}
56-
57-
@Override
58-
protected void handleMessageInternal(Message<?> message) {
59-
Object payload = message.getPayload();
60-
try {
61-
if (payload instanceof String string) {
62-
this.stream.write(string.getBytes());
63-
}
64-
else if (payload instanceof byte[] bytes) {
65-
this.stream.write(bytes);
66-
}
67-
else {
68-
throw new MessagingException(this.getClass().getSimpleName() +
69-
" only supports byte array and String-based messages");
70-
}
71-
this.stream.flush();
72-
}
73-
catch (IOException e) {
74-
throw new MessagingException("IO failure occurred in target", e);
75-
}
40+
super(stream, bufferSize);
7641
}
7742

7843
}

spring-integration-stream/src/main/java/org/springframework/integration/stream/CharacterStreamReadingMessageSource.java

Lines changed: 6 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,9 @@
1717
package org.springframework.integration.stream;
1818

1919
import java.io.BufferedReader;
20-
import java.io.IOException;
2120
import java.io.InputStreamReader;
2221
import java.io.Reader;
2322
import java.io.UnsupportedEncodingException;
24-
import java.util.concurrent.locks.Lock;
25-
import java.util.concurrent.locks.ReentrantLock;
26-
27-
import org.jspecify.annotations.Nullable;
28-
29-
import org.springframework.context.ApplicationEventPublisher;
30-
import org.springframework.context.ApplicationEventPublisherAware;
31-
import org.springframework.integration.endpoint.AbstractMessageSource;
32-
import org.springframework.messaging.MessagingException;
33-
import org.springframework.util.Assert;
3423

3524
/**
3625
* A pollable source for {@link Reader Readers}.
@@ -40,17 +29,12 @@
4029
* @author Artem Bilan
4130
* @author Christian Tzolov
4231
* @author Ngoc Nhan
32+
*
33+
* @deprecated since 7.0 in favor of {@link org.springframework.integration.stream.inbound.CharacterStreamReadingMessageSource}
4334
*/
44-
public class CharacterStreamReadingMessageSource extends AbstractMessageSource<String>
45-
implements ApplicationEventPublisherAware {
46-
47-
private final Lock lock = new ReentrantLock();
48-
49-
private final BufferedReader reader;
50-
51-
private final boolean blockToDetectEOF;
52-
53-
private @Nullable ApplicationEventPublisher applicationEventPublisher;
35+
@Deprecated(forRemoval = true, since = "7.0")
36+
public class CharacterStreamReadingMessageSource
37+
extends org.springframework.integration.stream.inbound.CharacterStreamReadingMessageSource {
5438

5539
/**
5640
* Construct an instance with the provider reader.
@@ -94,50 +78,7 @@ public CharacterStreamReadingMessageSource(Reader reader, int bufferSize) {
9478
* @since 5.0
9579
*/
9680
public CharacterStreamReadingMessageSource(Reader reader, int bufferSize, boolean blockToDetectEOF) {
97-
Assert.notNull(reader, "reader must not be null");
98-
if (reader instanceof BufferedReader bufferedReader) {
99-
this.reader = bufferedReader;
100-
}
101-
else if (bufferSize > 0) {
102-
this.reader = new BufferedReader(reader, bufferSize);
103-
}
104-
else {
105-
this.reader = new BufferedReader(reader);
106-
}
107-
this.blockToDetectEOF = blockToDetectEOF;
108-
}
109-
110-
@Override
111-
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
112-
this.applicationEventPublisher = applicationEventPublisher;
113-
}
114-
115-
@Override
116-
public String getComponentType() {
117-
return "stream:stdin-channel-adapter(character)";
118-
}
119-
120-
@Override
121-
public @Nullable String doReceive() {
122-
try {
123-
this.lock.lock();
124-
try {
125-
if (!this.blockToDetectEOF && !this.reader.ready()) {
126-
return null;
127-
}
128-
String line = this.reader.readLine();
129-
if (line == null && this.applicationEventPublisher != null) {
130-
this.applicationEventPublisher.publishEvent(new StreamClosedEvent(this));
131-
}
132-
return line;
133-
}
134-
finally {
135-
this.lock.unlock();
136-
}
137-
}
138-
catch (IOException e) {
139-
throw new MessagingException("IO failure occurred in adapter", e);
140-
}
81+
super(reader, bufferSize, blockToDetectEOF);
14182
}
14283

14384
/**

0 commit comments

Comments
 (0)