Hoptimator

Intro

Hoptimator gives you a SQL interface to a Kubernetes cluster. You can install databases, query tables, create views, and deploy data pipelines using just SQL.

To install a database, use kubectl:

 $ kubectl apply -f my-database.yaml

(create database is coming soon!)

Then use Hoptimator DDL to create a materialized view:

 > create materialized view my.foo as select * from ads.page_views;

Views created via DDL show up in Kubernetes as views:

 $ kubectl get views
 NAME     SCHEMA  VIEW  SQL
 my-foo   MY      FOO   SELECT *...

Materialized views result in pipelines:

 $ kubectl get pipelines
 NAME     SQL               STATUS
 my-foo   INSERT INTO...    Ready.

Quickstart

Hoptimator requires a Kubernetes cluster. To connect from outside a Kubernetes cluster, make sure your kubectl is properly configured.

The setup below will install two local demo DBs, ads and profiles.

  $ make install            # build and install SQL CLI
  $ make deploy deploy-demo # install demo DB CRDs and K8s objects
  $ ./hoptimator            # start the SQL CLI
  > !intro

Set up dev environment

The setup below will create a dev environment with various resources within Kubernetes.

  $ make install                                                    # build and install SQL CLI
  $ make deploy-dev-environment                                     # start all local dev setups
  $ kubectl port-forward -n kafka svc/one-kafka-external-bootstrap 9092 &   # forward external Kafka port for use by SQL CLI
  $ ./hoptimator                                                    # start the SQL CLI
  > !intro

Commands like deploy-kafka, deploy-venice, and deploy-flink can also be run individually to deploy specific components in isolation.

Kafka

To produce/consume Kafka data, use the following commands:

  $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1
  $ kubectl run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.45.0-kafka-3.9.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server one-kafka-bootstrap.kafka.svc.cluster.local:9094 --topic existing-topic-1 --from-beginning

Flink

  $ kubectl get pods
  NAME                                              READY   STATUS    RESTARTS      AGE
  basic-session-deployment-7b94b98b6b-d6jt5         1/1     Running   0             43s

Once the Flink deployment pod has STATUS 'Running', you can forward port 8081 and connect to http://localhost:8081/ to access the Flink dashboard.

  $ kubectl port-forward svc/basic-session-deployment-rest 8081 &

See the Flink SQL Gateway Documentation for sample ad hoc queries through Flink.

To push a Flink job directly to the Flink deployment created above, kubectl apply the following YAML:

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: test-flink-session-job
    spec:
      deploymentName: basic-session-deployment
      job:
        entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
        args:
          - CREATE TABLE IF NOT EXISTS `datagen-table` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='datagen', 'number-of-rows'='10');
          - CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'topic'='existing-topic-1', 'value.format'='json');
          - INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `datagen-table`;
        jarURI: file:///opt/hoptimator-flink-runner.jar
        parallelism: 1
        upgradeMode: stateless
        state: running

The SQL CLI

The ./hoptimator script launches the sqlline SQL CLI pre-configured to connect to jdbc:hoptimator://. The CLI includes some additional commands. See !intro.

The JDBC Driver

To use Hoptimator from Java code, or from anything that supports JDBC, use the jdbc:hoptimator:// JDBC driver.
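
For example, a minimal sketch of querying one of the demo tables over JDBC (assuming the demo DBs from the Quickstart are installed and the driver jar is on the classpath; error handling omitted):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class HoptimatorJdbcExample {
      public static void main(String[] args) throws Exception {
        // The driver is assumed to register itself with DriverManager when on the classpath.
        try (Connection conn = DriverManager.getConnection("jdbc:hoptimator://");
             Statement stmt = conn.createStatement();
             // ads.page_views is one of the demo tables installed by `make deploy-demo`.
             ResultSet rs = stmt.executeQuery("SELECT * FROM ads.page_views")) {
          while (rs.next()) {
            System.out.println(rs.getString(1));
          }
        }
      }
    }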

The Operator

hoptimator-operator turns materialized views into real data pipelines. The name operator comes from the Kubernetes Operator pattern. PipelineOperatorApp is the intended entry point for a running application that listens to and reconciles the resources created in Kubernetes by the K8s Deployers. See hoptimator-operator-deployment.yaml for a K8s pod deployment of the operator.

Extending Hoptimator

Hoptimator is extensible via hoptimator-api, which provides hooks for deploying, validating, and configuring the elements of a pipeline, including external objects (e.g. Kafka topics) and data plane connectors (e.g. Flink connectors). To deploy a source or sink, implement Deployer<Source>. To deploy a job, implement Deployer<Job>.
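
As a rough sketch, a custom source deployer might look something like the following. The package and method names here are illustrative assumptions only; consult the Deployer interface in hoptimator-api for the actual contract.

    // Hypothetical sketch only: the imports and method names below are assumptions;
    // the real Deployer contract in hoptimator-api may differ.
    import com.linkedin.hoptimator.Deployer;
    import com.linkedin.hoptimator.Source;

    public class MyTopicDeployer implements Deployer<Source> {

      @Override
      public void create(Source source) {
        // Provision the external object backing this source or sink,
        // e.g. create a topic in your messaging system.
      }

      @Override
      public void delete(Source source) {
        // Tear down the external object when the pipeline is removed.
      }
    }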

In addition, the k8s catalog is itself highly extensible via TableTemplates and JobTemplates. Generally, you can get Hoptimator to do what you want without writing any new code.

Table Templates

TableTemplates let you specify how sources and sinks should be included in a pipeline. For example, see kafkadb.yaml.

In this case, any tables within kafka-database will get deployed as KafkaTopics and use kafka connectors.

Job Templates

JobTemplates are similar, but can embed SQL. For example, see flink-template.yaml.

In this case, any jobs created with this template will get deployed as FlinkSessionJobs within the flink namespace.

Configuration

The {{ }} sections you see in the templates are variable placeholders that will be filled in by the Deployer. See Template.java for how to specify templates.

While Deployers are extensible, the primary deployers today target Kubernetes. These deployers, K8sSourceDeployer (for table-templates) and K8sJobDeployer (for job-templates), provide a few default template variables that you can choose to include in your templates:

K8sSourceDeployer: name, database, schema, table, pipeline

K8sJobDeployer: name, database, schema, table, pipeline, sql, flinksql, flinkconfigs

However, it is often the case that you want to pass additional information to the templates during Source or Job creation. There are two mechanisms to achieve this:

ConfigProvider

The ConfigProvider interface allows you to load additional configuration information that will be used during Source or Job creation.

K8sConfigProvider is the default implementation used by the K8s deployers. It can read configuration information from a Kubernetes configmap, hoptimator-configmap. See hoptimator-configmap.yaml for information on how to use configmaps.

Configmaps are meant to be used for static configuration information applicable to the namespace hoptimator-configmap belongs to.

Hints

Users may want to provide additional information for their Job or Sink creation at runtime. This can be done by adding hints as JDBC properties.

Hints are key-value pairs separated by an equals sign. Multiple hints are separated by a comma.

For example, to specify the number of Kafka partitions and the Flink parallelism, you could add the following hints to the connection URL:

jdbc:hoptimator://hints=kafka.partitions=4,flink.parallelism=2

These fields can then be added to templates as {{kafka.partitions}} or {{flink.parallelism}} where applicable.
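
For instance, a connection carrying those hints could be opened like this (a minimal sketch; the hints simply ride along on the JDBC URL shown above):

    import java.sql.Connection;
    import java.sql.DriverManager;

    public class HintedConnectionExample {
      public static void main(String[] args) throws Exception {
        // Pipelines planned on this connection pick up the hinted values wherever
        // the templates reference {{kafka.partitions}} or {{flink.parallelism}}.
        try (Connection conn = DriverManager.getConnection(
            "jdbc:hoptimator://hints=kafka.partitions=4,flink.parallelism=2")) {
          conn.createStatement().execute(
              "CREATE MATERIALIZED VIEW my.foo AS SELECT * FROM ads.page_views");
        }
      }
    }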

Note that hints are simply recommendations; if the planner plans a different pipeline, they will be ignored.