diff --git a/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/CommunityIdProcessor.java b/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/CommunityIdProcessor.java new file mode 100644 index 0000000000000..87fbca662463e --- /dev/null +++ b/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/CommunityIdProcessor.java @@ -0,0 +1,562 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ingest; + +import org.elasticsearch.common.network.InetAddresses; +import org.elasticsearch.ingest.AbstractProcessor; +import org.elasticsearch.ingest.ConfigurationUtils; +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.Processor; + +import java.math.BigInteger; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty; + +public final class CommunityIdProcessor extends AbstractProcessor { + + public static final String TYPE = "community_id"; + + private final String sourceIpField; + private final String sourcePortField; + private final String destinationIpField; + private final String destinationPortField; + private final String ianaNumberField; + private final String transportField; + private final String icmpTypeField; + private final String icmpCodeField; + private final String targetField; + private final MessageDigest messageDigest; + private final byte[] seed; + private final boolean ignoreMissing; + + CommunityIdProcessor( + String tag, + String description, + String sourceIpField, + String sourcePortField, + String destinationIpField, + String destinationPortField, + String ianaNumberField, + String transportField, + String icmpTypeField, + String icmpCodeField, + String targetField, + MessageDigest messageDigest, + byte[] seed, + boolean ignoreMissing + ) { + super(tag, description); + this.sourceIpField = sourceIpField; + this.sourcePortField = sourcePortField; + this.destinationIpField = destinationIpField; + this.destinationPortField = destinationPortField; + this.ianaNumberField = ianaNumberField; + this.transportField = transportField; + this.icmpTypeField = icmpTypeField; + this.icmpCodeField = icmpCodeField; + this.targetField = targetField; + this.messageDigest = messageDigest; + this.seed = seed; + this.ignoreMissing = ignoreMissing; + } + + public String getSourceIpField() { + return sourceIpField; + } + + public String getSourcePortField() { + return sourcePortField; + } + + public String getDestinationIpField() { + return destinationIpField; + } + + public String getDestinationPortField() { + return destinationPortField; + } + + public String getIanaNumberField() { + return ianaNumberField; + } + + public String getTransportField() { + return transportField; + } + + public String getIcmpTypeField() { + return icmpTypeField; + } + + public String getIcmpCodeField() { + return icmpCodeField; + } + + public String getTargetField() { + return targetField; + } + + public MessageDigest getMessageDigest() { + return messageDigest; + } + + public byte[] getSeed() { + return seed; + } + + public boolean getIgnoreMissing() { + return ignoreMissing; + } + + @Override + public IngestDocument execute(IngestDocument ingestDocument) throws Exception { + Flow flow = buildFlow(ingestDocument); + if (flow == null) { + if (ignoreMissing) { + return ingestDocument; + } else { + throw new IllegalArgumentException("unable to construct flow from document"); + } + } + + ingestDocument.setFieldValue(targetField, flow.toCommunityId(messageDigest, seed)); + return ingestDocument; + } + + private Flow buildFlow(IngestDocument d) { + String sourceIpAddrString = d.getFieldValue(sourceIpField, String.class, ignoreMissing); + if (sourceIpAddrString == null) { + return null; + } + + String destIpAddrString = d.getFieldValue(destinationIpField, String.class, ignoreMissing); + if (destIpAddrString == null) { + return null; + } + + Flow flow = new Flow(); + flow.source = InetAddresses.forString(sourceIpAddrString); + flow.destination = InetAddresses.forString(destIpAddrString); + + Object protocol = d.getFieldValue(ianaNumberField, Object.class, true); + if (protocol == null) { + protocol = d.getFieldValue(transportField, Object.class, ignoreMissing); + if (protocol == null) { + return null; + } + } + flow.protocol = Transport.fromObject(protocol); + + switch (flow.protocol) { + case Tcp: + case Udp: + case Sctp: + flow.sourcePort = parseIntFromObjectOrString(d.getFieldValue(sourcePortField, Object.class, ignoreMissing), "source port"); + if (flow.sourcePort == 0) { + throw new IllegalArgumentException("invalid source port [0]"); + } + + flow.destinationPort = parseIntFromObjectOrString( + d.getFieldValue(destinationPortField, Object.class, ignoreMissing), + "destination port" + ); + if (flow.destinationPort == 0) { + throw new IllegalArgumentException("invalid destination port [0]"); + } + break; + case Icmp: + case IcmpIpV6: + // tolerate missing or invalid ICMP types and codes + flow.icmpType = parseIntFromObjectOrString(d.getFieldValue(icmpTypeField, Object.class, true), "icmp type"); + flow.icmpCode = parseIntFromObjectOrString(d.getFieldValue(icmpCodeField, Object.class, true), "icmp code"); + break; + } + + return flow; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Converts an integer in the range of an unsigned 16-bit integer to a big-endian byte pair + */ + static byte[] toUint16(int num) { + if (num < 0 || num > 65535) { + throw new IllegalStateException("number [" + num + "] must be a value between 0 and 65535"); + } + return new byte[] { (byte) (num >> 8), (byte) num }; + } + + /** + * Attempts to coerce an object to an integer + */ + static int parseIntFromObjectOrString(Object o, String fieldName) { + if (o == null) { + return 0; + } else if (o instanceof Number) { + return (int) o; + } else if (o instanceof String) { + try { + return Integer.parseInt((String) o); + } catch (NumberFormatException e) { + // fall through to IllegalArgumentException below + } + } + throw new IllegalArgumentException("unable to parse " + fieldName + " [" + o + "]"); + } + + public static final class Factory implements Processor.Factory { + + static final String DEFAULT_SOURCE_IP = "source.ip"; + static final String DEFAULT_SOURCE_PORT = "source.port"; + static final String DEFAULT_DEST_IP = "destination.ip"; + static final String DEFAULT_DEST_PORT = "destination.port"; + static final String DEFAULT_IANA_NUMBER = "network.iana_number"; + static final String DEFAULT_TRANSPORT = "network.transport"; + static final String DEFAULT_ICMP_TYPE = "icmp.type"; + static final String DEFAULT_ICMP_CODE = "icmp.code"; + static final String DEFAULT_TARGET = "network.community_id"; + + @Override + public CommunityIdProcessor create( + Map registry, + String processorTag, + String description, + Map config + ) throws Exception { + String sourceIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_ip", DEFAULT_SOURCE_IP); + String sourcePortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "source_port", DEFAULT_SOURCE_PORT); + String destIpField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_ip", DEFAULT_DEST_IP); + String destPortField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "destination_port", DEFAULT_DEST_PORT); + String ianaNumberField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "iana_number", DEFAULT_IANA_NUMBER); + String transportField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "transport", DEFAULT_TRANSPORT); + String icmpTypeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_type", DEFAULT_ICMP_TYPE); + String icmpCodeField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "icmp_code", DEFAULT_ICMP_CODE); + String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "target_field", DEFAULT_TARGET); + int seedInt = ConfigurationUtils.readIntProperty(TYPE, processorTag, config, "seed", 0); + if (seedInt < 0 || seedInt > 65535) { + throw newConfigurationException(TYPE, processorTag, "seed", "must be a value between 0 and 65535"); + } + MessageDigest messageDigest; + try { + messageDigest = MessageDigest.getInstance("SHA-1"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("unable to obtain SHA-1 hasher", e); + } + + boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", true); + return new CommunityIdProcessor( + processorTag, + description, + sourceIpField, + sourcePortField, + destIpField, + destPortField, + ianaNumberField, + transportField, + icmpTypeField, + icmpCodeField, + targetField, + messageDigest, + toUint16(seedInt), + ignoreMissing + ); + } + } + + /** + * Represents flow data per https://github.com/corelight/community-id-spec + */ + public static final class Flow { + + private static final List TRANSPORTS_WITH_PORTS = List.of( + Transport.Tcp, + Transport.Udp, + Transport.Sctp, + Transport.Icmp, + Transport.IcmpIpV6 + ); + + InetAddress source; + InetAddress destination; + Transport protocol; + int sourcePort; + int destinationPort; + int icmpType; + int icmpCode; + + /** + * @return true iff the source address/port is numerically less than the destination address/port as described + * at https://github.com/corelight/community-id-spec + */ + boolean isOrdered() { + int result = new BigInteger(1, source.getAddress()).compareTo(new BigInteger(1, destination.getAddress())); + return result < 0 || (result == 0 && sourcePort < destinationPort); + } + + byte[] toBytes() { + boolean hasPort = TRANSPORTS_WITH_PORTS.contains(protocol); + int len = source.getAddress().length + destination.getAddress().length + 2 + (hasPort ? 4 : 0); + ByteBuffer bb = ByteBuffer.allocate(len); + + boolean isOneWay = false; + if (protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) { + // ICMP protocols populate port fields with ICMP data + Integer equivalent = IcmpType.codeEquivalent(icmpType, protocol == Transport.IcmpIpV6); + isOneWay = equivalent == null; + sourcePort = icmpType; + destinationPort = equivalent == null ? icmpCode : equivalent; + } + + boolean keepOrder = isOrdered() || ((protocol == Transport.Icmp || protocol == Transport.IcmpIpV6) && isOneWay); + bb.put(keepOrder ? source.getAddress() : destination.getAddress()); + bb.put(keepOrder ? destination.getAddress() : source.getAddress()); + bb.put(toUint16(protocol.getTransportNumber() << 8)); + + if (hasPort) { + bb.put(keepOrder ? toUint16(sourcePort) : toUint16(destinationPort)); + bb.put(keepOrder ? toUint16(destinationPort) : toUint16(sourcePort)); + } + + return bb.array(); + } + + String toCommunityId(MessageDigest md, byte[] seed) { + md.update(seed); + byte[] encodedBytes = Base64.getEncoder().encode(md.digest(toBytes())); + return "1:" + new String(encodedBytes, StandardCharsets.UTF_8); + } + } + + public enum Transport { + Icmp(1), + Igmp(2), + Tcp(6), + Udp(17), + Gre(47), + IcmpIpV6(58), + Eigrp(88), + Ospf(89), + Pim(103), + Sctp(132); + + private final int transportNumber; + + private static final Map TRANSPORT_NAMES; + + static { + TRANSPORT_NAMES = new HashMap<>(); + TRANSPORT_NAMES.put("icmp", Icmp); + TRANSPORT_NAMES.put("igmp", Igmp); + TRANSPORT_NAMES.put("tcp", Tcp); + TRANSPORT_NAMES.put("udp", Udp); + TRANSPORT_NAMES.put("gre", Gre); + TRANSPORT_NAMES.put("ipv6-icmp", IcmpIpV6); + TRANSPORT_NAMES.put("icmpv6", IcmpIpV6); + TRANSPORT_NAMES.put("eigrp", Eigrp); + TRANSPORT_NAMES.put("ospf", Ospf); + TRANSPORT_NAMES.put("pim", Pim); + TRANSPORT_NAMES.put("sctp", Sctp); + } + + Transport(int transportNumber) { + this.transportNumber = transportNumber; + } + + public int getTransportNumber() { + return transportNumber; + } + + public static Transport fromNumber(int transportNumber) { + switch (transportNumber) { + case 1: + return Icmp; + case 2: + return Igmp; + case 6: + return Tcp; + case 17: + return Udp; + case 47: + return Gre; + case 58: + return IcmpIpV6; + case 88: + return Eigrp; + case 89: + return Ospf; + case 103: + return Pim; + case 132: + return Sctp; + default: + throw new IllegalArgumentException("unknown transport protocol number [" + transportNumber + "]"); + } + } + + public static Transport fromObject(Object o) { + if (o instanceof Number) { + return fromNumber(((Number) o).intValue()); + } else if (o instanceof String) { + String protocolStr = (String) o; + + // check if matches protocol name + if (TRANSPORT_NAMES.containsKey(protocolStr.toLowerCase(Locale.ROOT))) { + return TRANSPORT_NAMES.get(protocolStr.toLowerCase(Locale.ROOT)); + } + + // check if convertible to protocol number + try { + int protocolNumber = Integer.parseInt(protocolStr); + return fromNumber(protocolNumber); + } catch (NumberFormatException e) { + // fall through to IllegalArgumentException + } + + throw new IllegalArgumentException("could not convert string [" + protocolStr + "] to transport protocol"); + } else { + throw new IllegalArgumentException( + "could not convert value of type [" + o.getClass().getName() + "] to transport protocol" + ); + } + } + } + + public enum IcmpType { + EchoReply(0), + EchoRequest(8), + RouterAdvertisement(9), + RouterSolicitation(10), + TimestampRequest(13), + TimestampReply(14), + InfoRequest(15), + InfoReply(16), + AddressMaskRequest(17), + AddressMaskReply(18), + V6EchoRequest(128), + V6EchoReply(129), + V6RouterSolicitation(133), + V6RouterAdvertisement(134), + V6NeighborSolicitation(135), + V6NeighborAdvertisement(136), + V6MLDv1MulticastListenerQueryMessage(130), + V6MLDv1MulticastListenerReportMessage(131), + V6WhoAreYouRequest(139), + V6WhoAreYouReply(140), + V6HomeAddressDiscoveryRequest(144), + V6HomeAddressDiscoveryResponse(145); + + private static final Map ICMP_V4_CODE_EQUIVALENTS; + private static final Map ICMP_V6_CODE_EQUIVALENTS; + + static { + ICMP_V4_CODE_EQUIVALENTS = new HashMap<>(); + ICMP_V4_CODE_EQUIVALENTS.put(EchoRequest.getType(), EchoReply.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(EchoReply.getType(), EchoRequest.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(TimestampRequest.getType(), TimestampReply.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(TimestampReply.getType(), TimestampRequest.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(InfoRequest.getType(), InfoReply.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(RouterSolicitation.getType(), RouterAdvertisement.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(RouterAdvertisement.getType(), RouterSolicitation.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskRequest.getType(), AddressMaskReply.getType()); + ICMP_V4_CODE_EQUIVALENTS.put(AddressMaskReply.getType(), AddressMaskRequest.getType()); + + ICMP_V6_CODE_EQUIVALENTS = new HashMap<>(); + ICMP_V6_CODE_EQUIVALENTS.put(V6EchoRequest.getType(), V6EchoReply.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6EchoReply.getType(), V6EchoRequest.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6RouterSolicitation.getType(), V6RouterAdvertisement.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6RouterAdvertisement.getType(), V6RouterSolicitation.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborAdvertisement.getType(), V6NeighborSolicitation.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6NeighborSolicitation.getType(), V6NeighborAdvertisement.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6MLDv1MulticastListenerQueryMessage.getType(), V6MLDv1MulticastListenerReportMessage.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouRequest.getType(), V6WhoAreYouReply.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6WhoAreYouReply.getType(), V6WhoAreYouRequest.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryRequest.getType(), V6HomeAddressDiscoveryResponse.getType()); + ICMP_V6_CODE_EQUIVALENTS.put(V6HomeAddressDiscoveryResponse.getType(), V6HomeAddressDiscoveryRequest.getType()); + } + + private final int type; + + IcmpType(int type) { + this.type = type; + } + + public int getType() { + return type; + } + + public static IcmpType fromNumber(int type) { + switch (type) { + case 0: + return EchoReply; + case 8: + return EchoRequest; + case 9: + return RouterAdvertisement; + case 10: + return RouterSolicitation; + case 13: + return TimestampRequest; + case 14: + return TimestampReply; + case 15: + return InfoRequest; + case 16: + return InfoReply; + case 17: + return AddressMaskRequest; + case 18: + return AddressMaskReply; + case 128: + return V6EchoRequest; + case 129: + return V6EchoReply; + case 133: + return V6RouterSolicitation; + case 134: + return V6RouterAdvertisement; + case 135: + return V6NeighborSolicitation; + case 136: + return V6NeighborAdvertisement; + case 130: + return V6MLDv1MulticastListenerQueryMessage; + case 131: + return V6MLDv1MulticastListenerReportMessage; + case 139: + return V6WhoAreYouRequest; + case 140: + return V6WhoAreYouReply; + case 144: + return V6HomeAddressDiscoveryRequest; + case 145: + return V6HomeAddressDiscoveryResponse; + default: + // don't fail if the type is unknown + return EchoReply; + } + } + + public static Integer codeEquivalent(int icmpType, boolean isIpV6) { + return isIpV6 ? ICMP_V6_CODE_EQUIVALENTS.get(icmpType) : ICMP_V4_CODE_EQUIVALENTS.get(icmpType); + } + } +} diff --git a/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java b/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java index 9f673b34d5119..4fbee2788b942 100644 --- a/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java +++ b/x-pack/plugin/ingest/src/main/java/org/elasticsearch/xpack/ingest/IngestPlugin.java @@ -15,6 +15,11 @@ public class IngestPlugin extends Plugin implements org.elasticsearch.plugins.In @Override public Map getProcessors(Processor.Parameters parameters) { - return Map.of(UriPartsProcessor.TYPE, new UriPartsProcessor.Factory()); + return Map.of( + UriPartsProcessor.TYPE, + new UriPartsProcessor.Factory(), + CommunityIdProcessor.TYPE, + new CommunityIdProcessor.Factory() + ); } } diff --git a/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorFactoryTests.java b/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorFactoryTests.java new file mode 100644 index 0000000000000..9ece600581007 --- /dev/null +++ b/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorFactoryTests.java @@ -0,0 +1,121 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ingest; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_IP; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_PORT; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_IANA_NUMBER; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_CODE; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_TYPE; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_IP; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_PORT; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TARGET; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TRANSPORT; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.toUint16; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; + +public class CommunityIdProcessorFactoryTests extends ESTestCase { + + private CommunityIdProcessor.Factory factory; + + @Before + public void init() { + factory = new CommunityIdProcessor.Factory(); + } + + public void testCreate() throws Exception { + Map config = new HashMap<>(); + + String sourceIpField = randomAlphaOfLength(6); + config.put("source_ip", sourceIpField); + String sourcePortField = randomAlphaOfLength(6); + config.put("source_port", sourcePortField); + String destIpField = randomAlphaOfLength(6); + config.put("destination_ip", destIpField); + String destPortField = randomAlphaOfLength(6); + config.put("destination_port", destPortField); + String ianaNumberField = randomAlphaOfLength(6); + config.put("iana_number", ianaNumberField); + String transportField = randomAlphaOfLength(6); + config.put("transport", transportField); + String icmpTypeField = randomAlphaOfLength(6); + config.put("icmp_type", icmpTypeField); + String icmpCodeField = randomAlphaOfLength(6); + config.put("icmp_code", icmpCodeField); + String targetField = randomAlphaOfLength(6); + config.put("target_field", targetField); + int seedInt = randomIntBetween(0, 65535); + config.put("seed", Integer.toString(seedInt)); + boolean ignoreMissing = randomBoolean(); + config.put("ignore_missing", ignoreMissing); + + String processorTag = randomAlphaOfLength(10); + CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config); + assertThat(communityIdProcessor.getTag(), equalTo(processorTag)); + assertThat(communityIdProcessor.getSourceIpField(), equalTo(sourceIpField)); + assertThat(communityIdProcessor.getSourcePortField(), equalTo(sourcePortField)); + assertThat(communityIdProcessor.getDestinationIpField(), equalTo(destIpField)); + assertThat(communityIdProcessor.getDestinationPortField(), equalTo(destPortField)); + assertThat(communityIdProcessor.getIanaNumberField(), equalTo(ianaNumberField)); + assertThat(communityIdProcessor.getTransportField(), equalTo(transportField)); + assertThat(communityIdProcessor.getIcmpTypeField(), equalTo(icmpTypeField)); + assertThat(communityIdProcessor.getIcmpCodeField(), equalTo(icmpCodeField)); + assertThat(communityIdProcessor.getTargetField(), equalTo(targetField)); + assertThat(communityIdProcessor.getSeed(), equalTo(toUint16(seedInt))); + assertThat(communityIdProcessor.getIgnoreMissing(), equalTo(ignoreMissing)); + } + + public void testSeed() throws Exception { + Map config = new HashMap<>(); + String processorTag = randomAlphaOfLength(10); + + // negative seeds are rejected + int tooSmallSeed = randomIntBetween(Integer.MIN_VALUE, -1); + config.put("seed", Integer.toString(tooSmallSeed)); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> factory.create(null, processorTag, null, config)); + assertThat(e.getMessage(), containsString("must be a value between 0 and 65535")); + + // seeds >= 2^16 are rejected + int tooBigSeed = randomIntBetween(65536, Integer.MAX_VALUE); + config.put("seed", Integer.toString(tooBigSeed)); + e = expectThrows(ElasticsearchException.class, () -> factory.create(null, processorTag, null, config)); + assertThat(e.getMessage(), containsString("must be a value between 0 and 65535")); + + // seeds between 0 and 2^16-1 are accepted + int justRightSeed = randomIntBetween(0, 65535); + byte[] expectedSeed = new byte[] { (byte) (justRightSeed >> 8), (byte) justRightSeed }; + config.put("seed", Integer.toString(justRightSeed)); + CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config); + assertThat(communityIdProcessor.getSeed(), equalTo(expectedSeed)); + } + + public void testRequiredFields() throws Exception { + HashMap config = new HashMap<>(); + String processorTag = randomAlphaOfLength(10); + CommunityIdProcessor communityIdProcessor = factory.create(null, processorTag, null, config); + assertThat(communityIdProcessor.getTag(), equalTo(processorTag)); + assertThat(communityIdProcessor.getSourceIpField(), equalTo(DEFAULT_SOURCE_IP)); + assertThat(communityIdProcessor.getSourcePortField(), equalTo(DEFAULT_SOURCE_PORT)); + assertThat(communityIdProcessor.getDestinationIpField(), equalTo(DEFAULT_DEST_IP)); + assertThat(communityIdProcessor.getDestinationPortField(), equalTo(DEFAULT_DEST_PORT)); + assertThat(communityIdProcessor.getIanaNumberField(), equalTo(DEFAULT_IANA_NUMBER)); + assertThat(communityIdProcessor.getTransportField(), equalTo(DEFAULT_TRANSPORT)); + assertThat(communityIdProcessor.getIcmpTypeField(), equalTo(DEFAULT_ICMP_TYPE)); + assertThat(communityIdProcessor.getIcmpCodeField(), equalTo(DEFAULT_ICMP_CODE)); + assertThat(communityIdProcessor.getTargetField(), equalTo(DEFAULT_TARGET)); + assertThat(communityIdProcessor.getSeed(), equalTo(toUint16(0))); + assertThat(communityIdProcessor.getIgnoreMissing(), equalTo(true)); + } +} diff --git a/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorTests.java b/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorTests.java new file mode 100644 index 0000000000000..26dccf098c231 --- /dev/null +++ b/x-pack/plugin/ingest/src/test/java/org/elasticsearch/xpack/ingest/CommunityIdProcessorTests.java @@ -0,0 +1,316 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ingest; + +import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.security.MessageDigest; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_IP; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_DEST_PORT; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_IANA_NUMBER; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_CODE; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_ICMP_TYPE; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_IP; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_SOURCE_PORT; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TARGET; +import static org.elasticsearch.xpack.ingest.CommunityIdProcessor.Factory.DEFAULT_TRANSPORT; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +public class CommunityIdProcessorTests extends ESTestCase { + + // NOTE: all test methods beginning with "testBeats" are intended to duplicate the unit tests for the Beats + // community_id processor (see Github link below) to ensure that this processor produces the same values. To + // the extent possible, these tests should be kept in sync. + // + // https://github.com/elastic/beats/blob/master/libbeat/processors/communityid/communityid_test.go + + private Map event; + private MessageDigest messageDigest; + + @Before + public void setup() throws Exception { + messageDigest = MessageDigest.getInstance("SHA-1"); + event = buildEvent(); + } + + private Map buildEvent() { + event = new HashMap<>(); + var source = new HashMap(); + source.put("ip", "128.232.110.120"); + source.put("port", 34855); + event.put("source", source); + var destination = new HashMap(); + destination.put("ip", "66.35.250.204"); + destination.put("port", 80); + event.put("destination", destination); + var network = new HashMap(); + network.put("transport", "TCP"); + event.put("network", network); + return event; + } + + public void testBeatsValid() throws Exception { + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + } + + public void testBeatsSeed() throws Exception { + testCommunityIdProcessor(event, 123, "1:hTSGlFQnR58UCk+NfKRZzA32dPg="); + } + + public void testBeatsInvalidSourceIp() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.put("ip", 2162716280L); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null)); + assertThat(e.getMessage(), containsString("field [source.ip] of type [java.lang.Long] cannot be cast to [java.lang.String]")); + } + + public void testBeatsInvalidSourcePort() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.put("port", 0); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null)); + assertThat(e.getMessage(), containsString("invalid source port")); + } + + public void testBeatsInvalidDestinationIp() throws Exception { + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + String invalidIp = "308.111.1.2.3"; + destination.put("ip", invalidIp); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null)); + assertThat(e.getMessage(), containsString("'" + invalidIp + "' is not an IP string literal")); + } + + public void testBeatsInvalidDestinationPort() throws Exception { + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.put("port", null); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null)); + assertThat(e.getMessage(), containsString("invalid destination port [0]")); + } + + public void testBeatsUnknownProtocol() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "xyz"); + IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> testCommunityIdProcessor(event, null)); + assertThat(e.getMessage(), containsString("could not convert string [xyz] to transport protocol")); + } + + public void testBeatsIcmp() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "icmp"); + var icmp = new HashMap(); + icmp.put("type", 3); + icmp.put("code", 3); + event.put("icmp", icmp); + testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE="); + } + + public void testBeatsIcmpWithoutTypeOrCode() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "icmp"); + testCommunityIdProcessor(event, "1:PAE85ZfR4SbNXl5URZwWYyDehwU="); + } + + public void testBeatsIgmp() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "igmp"); + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.remove("port"); + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.remove("port"); + testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI="); + } + + public void testBeatsProtocolNumberAsString() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.remove("port"); + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.remove("port"); + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "2"); + testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI="); + } + + public void testBeatsProtocolNumber() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.remove("port"); + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.remove("port"); + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", 2); + testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI="); + } + + public void testBeatsIanaNumber() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.remove("transport"); + network.put("iana_number", CommunityIdProcessor.Transport.Tcp.getTransportNumber()); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + } + + public void testIpv6() throws Exception { + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.put("ip", "2001:0db8:85a3:0000:0000:8a2e:0370:7334"); + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.put("ip", "2001:0:9d38:6ab8:1c48:3a1c:a95a:b1c2"); + testCommunityIdProcessor(event, "1:YC1+javPJ2LpK5xVyw1udfT83Qs="); + } + + public void testIcmpWithCodeEquivalent() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.put("transport", "icmp"); + var icmp = new HashMap(); + icmp.put("type", 10); + icmp.put("code", 3); + event.put("icmp", icmp); + testCommunityIdProcessor(event, "1:L8wnzpmRHIESLqLBy+zTqW3Pmqs="); + } + + public void testStringAndNumber() throws Exception { + // iana + event = buildEvent(); + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.remove("transport"); + network.put("iana_number", CommunityIdProcessor.Transport.Tcp.getTransportNumber()); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + network.put("iana_number", Integer.toString(CommunityIdProcessor.Transport.Tcp.getTransportNumber())); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + // protocol number + event = buildEvent(); + @SuppressWarnings("unchecked") + var source = (Map) event.get("source"); + source.remove("port"); + @SuppressWarnings("unchecked") + var destination = (Map) event.get("destination"); + destination.remove("port"); + @SuppressWarnings("unchecked") + var network2 = (Map) event.get("network"); + network2.put("transport", 2); + testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI="); + + network2.put("transport", "2"); + testCommunityIdProcessor(event, "1:D3t8Q1aFA6Ev0A/AO4i9PnU3AeI="); + + // source port + event = buildEvent(); + @SuppressWarnings("unchecked") + var source2 = (Map) event.get("source"); + source2.put("port", 34855); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + source2.put("port", "34855"); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + // dest port + event = buildEvent(); + @SuppressWarnings("unchecked") + var dest2 = (Map) event.get("destination"); + dest2.put("port", 80); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + dest2.put("port", "80"); + testCommunityIdProcessor(event, "1:LQU9qZlK+B5F3KDmev6m5PMibrg="); + + // icmp type and code + event = buildEvent(); + @SuppressWarnings("unchecked") + var network3 = (Map) event.get("network"); + network3.put("transport", "icmp"); + var icmp = new HashMap(); + icmp.put("type", 3); + icmp.put("code", 3); + event.put("icmp", icmp); + testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE="); + + icmp = new HashMap(); + icmp.put("type", "3"); + icmp.put("code", "3"); + event.put("icmp", icmp); + testCommunityIdProcessor(event, "1:KF3iG9XD24nhlSy4r1TcYIr5mfE="); + } + + public void testIgnoreMissing() throws Exception { + @SuppressWarnings("unchecked") + var network = (Map) event.get("network"); + network.remove("transport"); + testCommunityIdProcessor(event, 0, null, true); + } + + private void testCommunityIdProcessor(Map source, String expectedHash) throws Exception { + testCommunityIdProcessor(source, 0, expectedHash); + } + + private void testCommunityIdProcessor(Map source, int seed, String expectedHash) throws Exception { + testCommunityIdProcessor(source, seed, expectedHash, false); + } + + private void testCommunityIdProcessor(Map source, int seed, String expectedHash, boolean ignoreMissing) + throws Exception { + + var processor = new CommunityIdProcessor( + null, + null, + DEFAULT_SOURCE_IP, + DEFAULT_SOURCE_PORT, + DEFAULT_DEST_IP, + DEFAULT_DEST_PORT, + DEFAULT_IANA_NUMBER, + DEFAULT_TRANSPORT, + DEFAULT_ICMP_TYPE, + DEFAULT_ICMP_CODE, + DEFAULT_TARGET, + messageDigest, + CommunityIdProcessor.toUint16(seed), + ignoreMissing + ); + + IngestDocument input = new IngestDocument(source, Map.of()); + IngestDocument output = processor.execute(input); + + String hash = output.getFieldValue(DEFAULT_TARGET, String.class, ignoreMissing); + assertThat(hash, equalTo(expectedHash)); + } + + public void testTransportEnum() { + for (CommunityIdProcessor.Transport t : CommunityIdProcessor.Transport.values()) { + assertThat(CommunityIdProcessor.Transport.fromNumber(t.getTransportNumber()), equalTo(t)); + } + } + + public void testIcmpTypeEnum() { + for (CommunityIdProcessor.IcmpType i : CommunityIdProcessor.IcmpType.values()) { + assertThat(CommunityIdProcessor.IcmpType.fromNumber(i.getType()), equalTo(i)); + } + } +}