This directory provides samples related to using custom events with Business Automation Insights. The samples demonstrate how to:
- Register an Avro schema.
- Send events that conform to the registered schema to Business Automation Insights, by using either the Confluent or the IBM Automation Foundation Avro APIs.
Before sending events, you must configure Business Automation Insights to process custom events. For more information about such configuration, see the IBM Knowledge Center.
- How to build the samples
- Prerequisites
- Confluent
- IBM Automation Foundation
- Tests
You build the samples with the ./gradlew clean jar
command. This command produces the
build/libs/bai-event-emitter-samples.jar
file, a fat JAR that contains several Java main entry points.
These entry points are used by the launch scripts.
You need:
- IBM SDK, Java Technology Edition, Version 8
- A .properties file for Kafka security configuration, such as KafkaAvroProducer.properties (an illustrative sketch of typical content follows this list).
- A Kafka installation: either
- Confluent Kafka 5.4.1, which comes with IBM Business Automation Insights for a server
- IBM Automation Foundation (IAF) Kafka, which comes with IBM Business Automation Insights for Kubernetes
- An Avro schema as an .avsc file (see the schema example below)
- An event to send, such as a file in JSON format (.json file) that corresponds to the Avro schema
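The exact content of the Kafka security properties file depends on your Kafka installation. As an illustration only (every value below is a placeholder, not a value shipped with the samples), such a file typically defines standard Kafka client security settings, which could also be set programmatically:
// Illustration only: typical Kafka client security settings that a file such as
// KafkaAvroProducer.properties might define. All values below are placeholders.
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "<kafka-host>:<kafka-port>");
kafkaProps.put("security.protocol", "SASL_SSL");
kafkaProps.put("sasl.mechanism", "PLAIN"); // or SCRAM-SHA-512, depending on the broker
kafkaProps.put("sasl.jaas.config",
    "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"<kafka_user>\" password=\"<kafka_password>\";");
kafkaProps.put("ssl.truststore.location", "<path-to-truststore>");
kafkaProps.put("ssl.truststore.password", "<truststore-password>");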
One benefit of using the docker-compose packaging of IBM Business Automation Insights is that this deployment mode is lightweight because of its single-server architecture. Another benefit is that it comes with an embedded Confluent Kafka distribution, which you can use as is or replace with your own. Be aware, however, that if you scale a single-server deployment, you must restart the platform. The Kubernetes deployment mode, on the other hand, is more complex to install but offers inherent scalability and high availability.
When you use IBM Business Automation Insights in single server deployment mode, you can run the Confluent Kafka and Schema Registry containers. This is how you run this sample with Confluent.
For Business Automation Insights to support your custom events, you need to configure how the events are routed from
Kafka to Elasticsearch and, optionally, to HDFS, by modifying the config/flink/event-processor-config.yml
configuration file, as explained
here.
Here is an example of the event-processor-config.yml
file:
##
## Configurations file to define flows of events in the Flink Event Processor Job.
##
## Syntax:
## The overall configuration is defined as an array of configurations:
## configuration:
## [{configuration1}, {configuration2}, ...]
##
## Each configuration contains:
## A single Kafka topic
## A single Elasticsearch index bound to the topic
## An optional HDFS bucket bound to the topic
##
## Example:
## configurations:
## - kafka-topic: <topic name 1>
## elasticsearch-index: <Elasticsearch index name 1>
## hdfs-bucket: <hdfs://hdfs-hostname/bucket-name 1>
## - kafka-topic: <topic name 2>
## elasticsearch-index: <Elasticsearch index name 2>
---
configurations:
- kafka-topic: generic-schema-topic
elasticsearch-index: generic-schema-index
- kafka-topic: another-schema-ToPiC
elasticsearch-index: another-schema-idx
In this example, Business Automation Insights forwards events from the generic-schema-topic
Kafka topic to the
generic-schema-index
Elasticsearch index.
As a best practice, this sample uses the same value for both the Kafka topic and the Elasticsearch index parameters.
A schema is a structure in JSON format that is generally stored in a .avsc
file. Here is an example:
{
"name": "generic",
"type": "record",
"namespace": "com.ibm.bai",
"fields": [
{
"name": "timestamp",
"logicalType": "timestamp-millis",
"type": "long"
},
{
"name": "order",
"type": "string"
},
{
"name": "total_price",
"type": "int"
},
{
"name": "products",
"type": {
"type": "array",
"items": {
"name": "products_record",
"type": "record",
"fields": [
{
"name": "product_id",
"type": "string",
"ibm.automation.identifier": true
},
{
"name": "description",
"type": [
"string",
"null"
]
},
{
"name": "quantity",
"type": "int"
}
]
}
}
}
]
}
NOTE: the "ibm.automation.identifier": true
field attribute is optional and specific to IBM Business Automation
Insights.
This attribute allows the user to create an identifier that can be used later for creating some monitoring sources.
For more information, see the following documentation pages:
- Business Automation Insights for server documentation
- Business Automation Insights for Kubernetes documentation
You can also find this schema here.
The Avro schema must be registered in an Avro registry. The schema is used to validate and encode events.
You do not need to access the schema registry directly, because the management service registers the schema for you.
For more information about the schema specification, see the Avro schema specification.
The following event complies with the Avro schema that is presented in the previous section:
{
"timestamp": 27,
"order": "3d478-36e",
"total_price": 500,
"products": [
{
"product_id": "First product",
"description": null,
"quantity": 1
},
{
"product_id": "Second product",
"description": {
"string": "Fragile"
},
"quantity": 2
}
]
}
Note that the optional description field follows the Avro JSON encoding for unions: a non-null value is written as {"string": "Fragile"} and a null value as null.
You can find the corresponding file here.
- Start Business Automation Insights for a server.
- Edit the confluent.config file.
- Set the topic name:
TOPIC=generic-schema
- Set the Kafka credentials:
KAFKA_USERNAME=<kafka_user>
andKAFKA_PASSWORD=<kafka_password>
- Set the management service credentials:
MANAGEMENT_USERNAME=<mngt_user>
andMANAGEMENT_PASSWORD=<mngt_password>
- Set the management service URL:
MANAGEMENT_URL=https://localhost:6898
- Set the Kafka registry URL:
REGISTRY_URL=https://localhost:8084
- Set the event source file:
EVENT=src/test/resources/avro-sample-event.json
- Set the schema source file:
SCHEMA=src/test/resources/confluent/generic/generic-schema.avsc
- Set the path to a Kafka security properties file:
KAFKA_SECURITY_PROPERTIES=src/test/resources/confluent/KafkaAvroProducer.properties
- Edit the Kafka security properties file, example: KafkaAvroProducer.properties.
- Compile the sample:
./gradlew clean jar
- Run the sample:
bin/run-confluent-sample
The output is similar to this example output.
- To verify that the event sent with the sample is processed by Business Automation Insights, check that the
Elasticsearch index generic-schema is created, for example:
curl -X GET -u <admin_user>:<admin_password> -k https://localhost:"$ELASTICSEARCH_PORT"/_cat/indices/generic-schema?h=index,docs.count\&v
KAFKA_SECURITY_PROPERTIES is src/test/resources/confluent/KafkaAvroProducer.properties
MANAGEMENT_URL is https://localhost:6898
SCHEMA is src/test/resources/confluent/generic/generic-schema.avsc
KAFKA_USERNAME is admin
KAFKA_PASSWORD is **********
MANAGEMENT_USERNAME is admin
MANAGEMENT_PASSWORD is **********
TOPIC is generic-schema
EVENT is src/test/resources/confluent/generic/event-4-generic.json
REGISTRY_URL is https://localhost:8084
Schema src/test/resources/confluent/generic/generic-schema.avsc successfully registered
Sent event: {
"timestamp": 27,
"order": "3d478-36e",
"total_price": 500,
"products": [
{
"product_id": "First product",
"description": null,
"quantity": 1
},
{
"product_id": "Second product",
"description": {
"string": "Fragile"
},
"quantity": 2
}
]
}
Listening for messages on Kafka topic 'generic-schema'
Waiting for message ...
Received a message: offset: 0, partition: 26, key: null, value: {"timestamp": 27,
"order": "3d478-36e", "total_price": 500,
"products": [{"product_id": "First product", "description": null, "quantity": 1}, {"product_id": "Second product",
"description": "Fragile", "quantity": 2}]}
1 message(s) found
Sending an event requires four steps: retrieve a schema, ensure that the schema is registered, convert the event to binary, and send this binary payload.
You can retrieve a schema either from a schema registry or by reading it from a .avsc file.
The samples do not retrieve the schema from the schema registry, because the schema definition is already known locally.
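Because the samples read the schema from a local .avsc file, the retrieval step can be as simple as the following sketch (the file path is the one used in the Confluent sample configuration; this is an illustration, not a copy of the sample code):
// Parse the local .avsc file into an Avro Schema object
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(new File("src/test/resources/confluent/generic/generic-schema.avsc"));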
You upload a schema to the Confluent Avro registry through the management service.
The management service
needs an Elasticsearch index name, a schema name, and the schema itself as a JSON string.
This schema name defines both the subject used to retrieve a schema from the registry and the Kafka topic used to
send an event.
Because the topic is bound to an Elasticsearch index, a good naming convention is to use the same
value for both the topic name and the Elasticsearch index name.
The relationship between the subject and the topic is described here:
What is a topic versus a schema versus a subject?.
This sample uses the
Confluent default name strategy
where the topic name is the name that was used at schema registration time. The subject name is deduced from the topic
name by appending the -value
suffix to it.
The code of the sample uses the management service REST API in the sendSchema
method of the
ManagementServiceClient class.
// Content of the src/test/resources/confluent/generic/generic-schema.avsc file
String jsonSchema = ...;
String managementRootUrl = ...;
String managementUser = ...;
String managementPwd = ...;
String topicName = ...;
ManagementServiceClient mngtService = new ManagementServiceClient(managementRootUrl, managementUser, managementPwd);
if (mngtService.initialize()) {
// ensure the schema is registered
String response = mngtService.sendSchema(jsonSchema, topicName, topicName + "-value");
if(mngtService.validateSchemaRegistration(response)) ...
}
Schema schema = ...;
String jsonEvent = ...;
// Decode the JSON event against the Avro schema
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
DecoderFactory decoderFactory = new DecoderFactory();
Decoder decoder = decoderFactory.jsonDecoder(schema, jsonEvent);
Object encodedMessage = reader.read(null, decoder);
// A top-level string schema decodes to a Utf8 value; convert it to a Java String
if (schema.getType().equals(Schema.Type.STRING)) {
  encodedMessage = ((Utf8) encodedMessage).toString();
}
return encodedMessage;
You can find this code in KafkaAvroProducerCommon in the sample code.
// Read the Kafka security properties and create a producer (see the "Prerequisites" section)
Properties kafkaProps = ...;
final KafkaProducer<String, Object> producer = new KafkaProducer<>(kafkaProps);
Object binaryEvent = ...;
String topic = ...;
ProducerRecord<String, Object> record =
new ProducerRecord<>(topic, null, null, binaryEvent, Collections.emptyList());
producer.send(record);
You can find this code in ConfluentProducer.
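Note that, in addition to the security settings, the producer properties must declare the serializers and the schema registry URL so that the record value is Avro-serialized. The following is only an illustrative sketch of what these entries typically look like for a Confluent Avro producer; it is not an excerpt from the sample, and only the registry URL matches the REGISTRY_URL value used earlier:
// Sketch only: serializer and registry settings typically required by a Confluent Avro producer,
// in addition to the security settings read from the Kafka properties file
Properties kafkaProps = new Properties();
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
kafkaProps.put("schema.registry.url", "https://localhost:8084");
// With KafkaAvroSerializer, the record value is serialized against the schema
// registered under the "<topic>-value" subject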
The IBM Business Automation Insights Kubernetes deployment mode allows you to run the IBM Automation Foundation Kafka implementation and its registry.
For this deployment mode to support your custom events, you need to configure how the events are routed from Kafka to Elasticsearch, as explained here.
This is the easiest way to run this sample with IBM Automation Foundation.
A schema is a structure in JSON format that is generally stored in a .avsc
file. Here is an example:
{
"name": "generic",
"type": "record",
"namespace": "com.ibm.bai",
"fields": [
{
"name": "order",
"type": "string"
},
{
"name": "total_price",
"type": "int"
},
{
"name": "products",
"type": {
"type": "array",
"items": {
"name": "products_record",
"type": "record",
"fields": [
{
"name": "product_id",
"type": "string",
"ibm.automation.identifier": true
},
{
"name": "description",
"type": [
"string",
"null"
]
},
{
"name": "quantity",
"type": "int"
}
]
}
}
}
]
}
NOTE: the "ibm.automation.identifier": true
field attribute is optional and specific to IBM Business Automation
Insights.
This attribute allows the user to create an identifier that can be used later for creating some monitoring sources.
For more information, see the following documentation pages:
- Business Automation Insights for server related documentation
- Business Automation Insights for Kubernetes related documentation
You can also find this schema here.
The Avro schema must be registered in an Avro registry. It is used to validate and encode events. As detailed in the Confluent chapter, the management service client handles this registration. For more information, see the Avro schema specification.
The following event complies with the Avro schema that is presented in the previous section:
{
"order": "13d478-36e",
"total_price": 500,
"products": [
{
"product_id": "First product",
"description": null,
"quantity": 1
},
{
"product_id": "Second product",
"description": {
"string": "Fragile"
},
"quantity": 2
}
]
}
You can find the corresponding file here.
- Ensure that IBM Business Automation Insights is installed and IBM Automation Foundation is configured.
- Edit the iaf.config file.
- Set the topic name (important note: the topic name must start with
icp4ba-bai
):
TOPIC=icp4ba-bai-ingress4samples-test
- Set the event source file:
EVENT=src/test/resources/iaf/generic-event.json
- Set the Kafka username of your Business Automation Insights installation. The documentation
provides a way to retrieve this information.
KAFKA_USERNAME=kafka_user
- Set the corresponding Kafka password of your Business Automation Insights installation:
KAFKA_PASSWORD=aPassw0rd
- Set the path to a Kafka client security properties file:
KAFKA_SECURITY_PROPERTIES=src/test/resources/iaf/kafkaProducer.properties
- Set the path to the schema definition file:
SCHEMA=src/test/resources/iaf/generic-v1.0.0.avsc
- Set the Business Automation Insights management service URL. The
host of the URL is the Management route.
MANAGEMENT_URL=https://bai-management-bai.apps.my-domain.com
- Set the management service registered username. The documentation
provides a way to obtain this information.
MANAGEMENT_USERNAME=adminUser
- Set the management service user password:
MANAGEMENT_PASSWORD=user-password
- Edit the Kafka client properties file, for example: kafkaProducer.properties.
- Compile the sample:
./gradlew clean jar
- Run the sample:
bin/run-iaf-sample
This produces an output similar to this example output.
- Check that the Elasticsearch index
icp4ba-bai-ingress4samples-test
(corresponding to the above topic) is created in the IBM Automation Foundation environment:
export ES_USER=admin
export ES_PASSWORD=passw0rd
export ES_CLIENT_POD=$(kubectl get pods -o custom-columns=Name:.metadata.name --no-headers --selector=chart=ibm-dba-ek,role=client)
export ES_URL=https://localhost:9200
kubectl exec -it ${ES_CLIENT_POD} -- curl -u ${ES_USER}:${ES_PASSWORD} -k ${ES_URL}/_cat/indices/icp4ba-bai-ingress4samples-test?h=index,docs.count\&v
KAFKA_USERNAME is icp4ba-kafka-auth
KAFKA_PASSWORD is **********
KAFKA_SECURITY_PROPERTIES is src/test/resources/iaf/KafkaProducer.properties
TOPIC is icp4ba-bai-testkit-custom-event
EVENT is src/test/resources/iaf/generic-event.json
SCHEMA is src/test/resources/iaf/generic-v1.0.0.avsc
MANAGEMENT_URL is https://management.bai.apps.bai-ocp-test.cp.fyre.ibm.com
MANAGEMENT_USERNAME is admin
MANAGEMENT_PASSWORD is **********
Using default value of "*********" for property name "sasl.jaas.config"
topics: [icp4ba-bai-1, icp4ba-bai-2, icp4ba-bai-ingress4samples-test]
topic 'icp4ba-bai-ingress4samples-test' already exist, not creating it
configuring topic 'icp4ba-bai-ingress4samples-test'
adding 120s message retention duration
Schema src/test/resources/iaf/generic-v1.0.0.avsc successfully registered
Sent event: {
"order": "13d478-36e",
"total_price": 500,
"products": [
{
"product_id": "First product",
"description": null,
"quantity": 1
},
{
"product_id": "Second product",
"description": {
"string": "Fragile"
},
"quantity": 2
}
]
}
Kafka consumer listening for messages on topic 'icp4ba-bai-ingress4samples-test'
Received a message: offset: 0, partition: 11, key: null, value: {"order": "13d478-36e", "total_price": 500, "products":
[{"product_id": "First product", "description": null, "quantity": 1}, {"product_id": "Second product", "description":
"Fragile", "quantity": 2}]}
Found 1 message(s).
You send an event in three steps: ensure that the schema is registered in the IBM Automation Foundation registry, convert the event to binary, and send this binary payload.
Follow the procedure described in the Java code to register a schema section.
Convert the event JSON payload to binary representation by using a schema with IBM Automation Foundation
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
DecoderFactory decoderFactory = DecoderFactory.get();
Decoder decoder = decoderFactory.jsonDecoder(schema, jsonEvent);
Object encodedMessage = reader.read(null, decoder);
if (schema.getType().equals(Schema.Type.STRING)) {
  encodedMessage = ((Utf8) encodedMessage).toString();
}
You can find this code in KafkaAvroProducerCommon in the sample code.
Properties props = ....;
// Get a new Generic KafkaProducer
KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
// Read in the local schema file
Schema.Parser schemaDefinitionParser = new Schema.Parser();
Schema schema = schemaDefinitionParser.parse(new File(schemaPath));
// the string event is expected to conform to the schema
Object eventObject = jsonToAvro(event, schema);
// Prepare the record
ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, eventObject);
// Send the record to Kafka
producer.send(producerRecord);
You can find this kind of code in IAFProducer.java.
For development work, you can directly run event emission, for both supported platforms, from your preferred IDE.
The test source code contains a single class that allows you to test:
- event emission for IBM Automation Foundation
- event emission for Confluent
- schema registration
- sending an event by using the JAR (after it is built and available in the build/libs directory)
The launch arguments are read from the properties files
src/test/resources/tests/variables4Confluent.properties or
src/test/resources/tests/variables4IAF.properties.
These properties files must contain exactly the same property names as the
confluent.config and iaf.config configuration files, respectively, which are used
when the executable JAR file of the samples is started.
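As an illustration only (this is not the test class itself), you can check which values a test run picks up by loading one of these properties files; the keys shown are the same as in confluent.config:
// Illustration only: load the test variables like any other properties file
Properties vars = new Properties();
try (InputStream in = new FileInputStream("src/test/resources/tests/variables4Confluent.properties")) {
  vars.load(in);
}
System.out.println("TOPIC = " + vars.getProperty("TOPIC"));
System.out.println("EVENT = " + vars.getProperty("EVENT"));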