Skip to content

Commit bea27b8

Browse files
authored
Support OpenSSL Provider with default Netty allocator (opensearch-project#5460)
Signed-off-by: Andriy Redko <andriy.redko@aiven.io> Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
1 parent ce25dec commit bea27b8

File tree

5 files changed

+150
-4
lines changed

5 files changed

+150
-4
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
7979
- Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827))
8080
- Fixed compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944))
8181
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))
82+
- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460))
8283

8384
### Security
8485

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.transport;
10+
11+
import io.netty.channel.socket.InternetProtocolFamily;
12+
import io.netty.channel.socket.nio.NioServerSocketChannel;
13+
import io.netty.util.internal.SocketUtils;
14+
import io.netty.util.internal.logging.InternalLogger;
15+
import io.netty.util.internal.logging.InternalLoggerFactory;
16+
17+
import java.nio.channels.ServerSocketChannel;
18+
import java.nio.channels.SocketChannel;
19+
import java.nio.channels.spi.SelectorProvider;
20+
import java.util.List;
21+
22+
public class Netty4NioServerSocketChannel extends NioServerSocketChannel {
23+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(Netty4NioServerSocketChannel.class);
24+
25+
public Netty4NioServerSocketChannel() {
26+
super();
27+
}
28+
29+
public Netty4NioServerSocketChannel(SelectorProvider provider) {
30+
super(provider);
31+
}
32+
33+
public Netty4NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
34+
super(provider, family);
35+
}
36+
37+
public Netty4NioServerSocketChannel(ServerSocketChannel channel) {
38+
super(channel);
39+
}
40+
41+
@Override
42+
protected int doReadMessages(List<Object> buf) throws Exception {
43+
SocketChannel ch = SocketUtils.accept(javaChannel());
44+
45+
try {
46+
if (ch != null) {
47+
buf.add(new Netty4NioSocketChannel(this, ch));
48+
return 1;
49+
}
50+
} catch (Throwable t) {
51+
logger.warn("Failed to create a new channel from an accepted socket.", t);
52+
53+
try {
54+
ch.close();
55+
} catch (Throwable t2) {
56+
logger.warn("Failed to close a socket.", t2);
57+
}
58+
}
59+
60+
return 0;
61+
}
62+
}

modules/transport-netty4/src/main/java/org/opensearch/transport/NettyAllocator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import io.netty.buffer.UnpooledByteBufAllocator;
4040
import io.netty.channel.Channel;
4141
import io.netty.channel.ServerChannel;
42-
import io.netty.channel.socket.nio.NioServerSocketChannel;
4342
import org.apache.logging.log4j.LogManager;
4443
import org.apache.logging.log4j.Logger;
4544
import org.opensearch.common.Booleans;
@@ -181,7 +180,7 @@ public static Class<? extends ServerChannel> getServerChannelType() {
181180
if (ALLOCATOR instanceof NoDirectBuffers) {
182181
return CopyBytesServerSocketChannel.class;
183182
} else {
184-
return NioServerSocketChannel.class;
183+
return Netty4NioServerSocketChannel.class;
185184
}
186185
}
187186

server/src/main/java/org/opensearch/common/bytes/BytesReference.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,13 @@ static BytesReference fromByteBuffers(ByteBuffer[] buffers) {
122122
* Returns BytesReference composed of the provided ByteBuffer.
123123
*/
124124
static BytesReference fromByteBuffer(ByteBuffer buffer) {
125-
assert buffer.hasArray();
126-
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
125+
if (buffer.hasArray()) {
126+
return new BytesArray(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
127+
} else {
128+
final byte[] array = new byte[buffer.remaining()];
129+
buffer.asReadOnlyBuffer().get(array, 0, buffer.remaining());
130+
return new BytesArray(array);
131+
}
127132
}
128133

129134
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.bytes;
10+
11+
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
12+
13+
import org.hamcrest.Matchers;
14+
15+
import java.io.IOException;
16+
import java.nio.ByteBuffer;
17+
import java.util.Arrays;
18+
import java.util.Collection;
19+
import java.util.function.Function;
20+
21+
public class ByteBuffersBytesReferenceTests extends AbstractBytesReferenceTestCase {
22+
@ParametersFactory
23+
public static Collection<Object[]> allocator() {
24+
return Arrays.asList(
25+
new Object[] { (Function<Integer, ByteBuffer>) ByteBuffer::allocateDirect },
26+
new Object[] { (Function<Integer, ByteBuffer>) ByteBuffer::allocate }
27+
);
28+
}
29+
30+
private final Function<Integer, ByteBuffer> allocator;
31+
32+
public ByteBuffersBytesReferenceTests(Function<Integer, ByteBuffer> allocator) {
33+
this.allocator = allocator;
34+
}
35+
36+
@Override
37+
protected BytesReference newBytesReference(int length) throws IOException {
38+
return newBytesReference(length, randomInt(length));
39+
}
40+
41+
@Override
42+
protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException {
43+
return newBytesReference(length, 0);
44+
}
45+
46+
private BytesReference newBytesReference(int length, int offset) throws IOException {
47+
// we know bytes stream output always creates a paged bytes reference, we use it to create randomized content
48+
final ByteBuffer buffer = allocator.apply(length + offset);
49+
for (int i = 0; i < length + offset; i++) {
50+
buffer.put((byte) random().nextInt(1 << 8));
51+
}
52+
assertEquals(length + offset, buffer.limit());
53+
buffer.flip().position(offset);
54+
55+
BytesReference ref = BytesReference.fromByteBuffer(buffer);
56+
assertEquals(length, ref.length());
57+
assertTrue(ref instanceof BytesArray);
58+
assertThat(ref.length(), Matchers.equalTo(length));
59+
return ref;
60+
}
61+
62+
public void testArray() throws IOException {
63+
int[] sizes = { 0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2, 5)) };
64+
65+
for (int i = 0; i < sizes.length; i++) {
66+
BytesArray pbr = (BytesArray) newBytesReference(sizes[i]);
67+
byte[] array = pbr.array();
68+
assertNotNull(array);
69+
assertEquals(sizes[i], array.length - pbr.offset());
70+
assertSame(array, pbr.array());
71+
}
72+
}
73+
74+
public void testArrayOffset() throws IOException {
75+
int length = randomInt(PAGE_SIZE * randomIntBetween(2, 5));
76+
BytesArray pbr = (BytesArray) newBytesReferenceWithOffsetOfZero(length);
77+
assertEquals(0, pbr.offset());
78+
}
79+
}

0 commit comments

Comments
 (0)