This project uses Polygon's live websocket stream and The Guardian's API to showcase real-time stock prices alongside the news, to watch for signals and possible causation.
Real-Time Dashboard - delayed by 15 minutes since I didn't want to pay to upgrade my Polygon subscription
- Introduction
- Technology Choices
- Initial Data Investigations - Polygon Websocket
- Initial Data Investigations - The Guardian API
- Metrics
- Website
- Kafka
- Flink
- InfluxDB
- Grafana
- Deployment
- Thoughts
- Conclusion
This portfolio project is designed to showcase my ability to learn new streaming technologies. I had limited knowledge of almost every technology chosen for this project. Each section is commentary on the project, including my learnings along the way. I wanted to understand, in real time, how the stock market can be emotionally driven by news articles that may impact the perception of certain stocks. Overall, I had a lot of fun. Enjoy!
From a skillset perspective I am proficient in SQL and Python, which led me to choose these tools:
- Python: easiest way to access Polygon's real-time websocket and The Guardian's API to get the data and send it to Kafka as a buffer.
- Kafka: a buffer that temporarily stores the results of the streams. It's the de-facto message broker for real-time pipelines. I checked out RabbitMQ, but Kafka is the better fit for real-time.
- Flink: the de-facto tool for streaming with minimal latency. I had already worked with Spark, so I wanted to expand my skillset here. Utilized pyflink.
- Iceberg: an open table format that gracefully handles streaming appends. Works great with open-source tooling and long-term storage.
- AWS: I used AWS for my last project and really enjoyed the ease of use and UI. So here we are again.
- Flask: I am proficient in Flask web development and I wanted a web page so that I could easily adjust the stream inputs.
- InfluxDB: I originally wanted to use Prometheus, but Prometheus is pull-based; it scrapes metrics endpoints rather than accepting inserts. I needed a time-series database that supported inserts from Flink, so I decided on the popular InfluxDB.
EDIT: InfluxDB ended up not having an official Flink connector, but it was still my best option for real-time and worked well with Grafana.
- Grafana: a free dashboard tool designed for real-time data. I had already used Prometheus before, so it made sense to gain proficiency in Grafana since the two work so well together.
- Docker: a great way to reproduce environments and easily create a development network using docker-compose.
I am already subscribed to some Polygon endpoints, and that subscription includes access to their 15-minute-delayed websocket. Polygon is a great data resource that I was excited to use again.
Feed: delayed.Polygon.io
Results
EquityAgg(
event_type="AM",
symbol="WMT",
volume=49893,
accumulated_volume=7266036,
official_open_price=96.77,
vwap=99.2916,
open=99.235,
close=99.38,
high=99.38,
low=99.233614,
aggregate_vwap=98.1625,
average_size=80,
start_timestamp=1738601460000,
end_timestamp=1738601520000,
otc=None,
)
Initial thoughts: given the start and end timestamps of the aggregation, I can use the end timestamp as a watermark. I can also see whether the price went up or down during the period. Each aggregation covers a 1-minute window. I noticed that the decimal places can run pretty long.
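For context, here is a minimal sketch of how subscribing to these per-minute aggregates looks with Polygon's Python websocket client (the API key and ticker are placeholders, not my production setup):

from typing import List

from polygon import WebSocketClient
from polygon.websocket.models import Feed, Market, WebSocketMessage

def handle_msg(msgs: List[WebSocketMessage]):
    # Each message is an EquityAgg like the example above
    for m in msgs:
        print(m)

client = WebSocketClient(
    api_key="YOUR_POLYGON_KEY",   # placeholder
    feed=Feed.Delayed,            # the 15-minute delayed feed
    market=Market.Stocks,
    subscriptions=["AM.WMT"],     # AM.<ticker> = per-minute aggregates
)
client.run(handle_msg=handle_msg)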
| Field | Type | Description |
| --- | --- | --- |
| symbol | STRING NOT NULL | Symbol ticker of the company |
| volume | INTEGER NOT NULL | Volume traded within the 1 minute interval |
| accumulated_volume | INTEGER NOT NULL | Accumulated volume for the day |
| official_open_price | FLOAT NOT NULL | Open stock price of the day |
| vwap | FLOAT NOT NULL | Volume Weighted Average Price |
| open | FLOAT NOT NULL | Open stock price of the 1 minute interval |
| close | FLOAT NOT NULL | Close stock price of the 1 minute interval |
| high | FLOAT NOT NULL | High stock price of the 1 minute interval |
| low | FLOAT NOT NULL | Low stock price of the 1 minute interval |
| aggregate_vwap | FLOAT NOT NULL | Volume Weighted Average Price of the 1 minute interval |
| average_size | INTEGER NOT NULL | Average trade size of the 1 minute interval |
| start_timestamp | INTEGER NOT NULL | Epoch timestamp marking the start of the 1 minute interval |
| end_timestamp | INTEGER NOT NULL | Epoch timestamp marking the end of the 1 minute interval |
| otc | BOOLEAN NULL | Whether the ticker is OTC or not; sometimes came in as None |
The Guardian is a well-respected news outlet with dedicated US coverage. It offers a free API key for development/non-profit purposes. I've had my eye on The Guardian ever since they helped Edward Snowden get the word out about the NSA's surveillance activities. You can check out their platform here.
URL: https://content.Guardianapis.com/search
Results
{
"response": {
"status": "ok",
"userTier": "developer",
"total": 28465,
"startIndex": 1,
"pageSize": 1,
"currentPage": 1,
"pages": 28465,
"orderBy": "relevance",
"results": [
{
"id": "global-development/2024/dec/22/exclusive-photographs-reveal-first-glimpse-of-uncontacted-amazon-community-massaco",
"type": "article",
"sectionId": "global-development",
"sectionName": "Global development",
"webPublicationDate": "2024-12-22T05:00:36Z",
"webTitle": "Photographs reveal first glimpse of uncontacted Amazon community",
"webUrl": "https://www.theGuardian.com/global-development/2024/dec/22/exclusive-photographs-reveal-first-glimpse-of-uncontacted-amazon-community-massaco",
"apiUrl": "https://content.Guardianapis.com/global-development/2024/dec/22/exclusive-photographs-reveal-first-glimpse-of-uncontacted-amazon-community-massaco",
"isHosted": False,
"pillarId": "pillar/news",
"pillarName": "News"
}
]
}
}
Initial thoughts: webTitle and webPublicationDate seem to be the most important data points, and status is important to check as well. I also have to be careful about search terms; I wanted the ecommerce Amazon, not the jungle. It looks like there is a sectionName that can be filtered to US activity only.
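A rough sketch of the kind of request I was experimenting with (the API key, query, and dates are placeholders):

import requests

GUARDIAN_URL = "https://content.guardianapis.com/search"

params = {
    "q": '"Amazon" AND "stock"',      # quoting terms helps avoid the jungle
    "api-key": "YOUR_GUARDIAN_KEY",   # placeholder developer key
    "order-by": "newest",
    "page-size": 10,
    "from-date": "2025-01-01",        # placeholder date
}

resp = requests.get(GUARDIAN_URL, params=params, timeout=10)
data = resp.json()["response"]
print(data["status"], data["total"])
for article in data["results"]:
    print(article["webPublicationDate"], article["webTitle"])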
| Field | Type | Description |
| --- | --- | --- |
| id | STRING NOT NULL | Domain endpoint; ID of the article |
| type | STRING NOT NULL | Type of published material, ex. Article, Blog |
| sectionId | STRING NOT NULL | ID of section |
| sectionName | STRING NOT NULL | Proper name of section, low level category |
| webPublicationDate | STRING NOT NULL | Datetime in UTC when article was published/updated. Converted to INT NOT NULL epoch timestamp for InfluxDB. |
| webTitle | STRING NOT NULL | Title of the article |
| webUrl | STRING NOT NULL | URL link to the article on Guardian.com, html content |
| apiUrl | STRING NOT NULL | Programmatic address, very similar to URL link, raw content |
| isHosted | BOOLEAN NOT NULL | Whether the article is hosted by the Guardian or not |
| pillarId | STRING NOT NULL | ID of pillar |
| pillarName | STRING NOT NULL | Proper name of pillar, high level category |
| search | STRING NOT NULL | The text input for the API call to query results. Used as the Kafka key. |
The main metrics come from the Polygon API, essentially the average of the open, close, high, and low prices, so I can see stock performance in real time. In the future I could use a watermark in Flink and calculate the percentage change over a 5-15 minute interval to look for trade signals.
I decided early on that I wanted to create a web page to easily change the Polygon and Guardian query inputs. I ended up with a simple Flask home page.
Flask Website UI to control the Stream Tracking
One of the main issues I ran into with the Flask home page was that submitting a form tied up the main Python thread with the websocket, so the page never reloaded. The simple solution was to run the websocket in a background thread so the page loads properly after a form submission.
import threading

from flask import current_app

def start_websocket(self, ws):
    # Run the websocket on a background thread so the form POST can return
    def run_ws():
        with self.app.app_context():
            current_app.logger.debug("Starting Stream...")
            ws.run(handle_msg=self._handle_msg)

    threading.Thread(target=run_ws).start()
    with self.app.app_context():
        current_app.logger.debug("Stream Started")
One of the challenges with The Guardian's API is that it only allows searching articles by date. That means I could continuously query the API (micro-batching), but I would keep getting old results. I used the webPublicationDate timestamp field in the results to ensure only new articles were added to the Kafka topic, by maintaining a watermark timestamp and only keeping articles published after it.
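A simplified sketch of that watermark logic (the function name and shapes here are illustrative, not the exact code):

from datetime import datetime, timezone
from typing import Dict, List, Tuple

def filter_new_articles(
    articles: List[Dict], watermark: datetime
) -> Tuple[List[Dict], datetime]:
    """Keep only articles published after the watermark, then advance it."""
    new_articles = []
    for article in articles:
        published = datetime.strptime(
            article["webPublicationDate"], "%Y-%m-%dT%H:%M:%SZ"
        ).replace(tzinfo=timezone.utc)
        if published > watermark:
            new_articles.append(article)
            watermark = max(watermark, published)
    return new_articles, watermark

# Each micro-batch: query the API, keep only unseen articles, produce those to Kafka,
# and carry the advanced watermark into the next poll.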
Data quality checks can be difficult with streaming data because most checks have to happen at the row level. I decided to create a Marshmallow schema for each data source to ensure the schema and data types are checked. The second check for each source, that fields are not missing or set to None, is also handled by Marshmallow. I found a package called marshmallow_dataclass that allows a Marshmallow schema to be created from a dataclass, which made it super easy. See below:
from dataclasses import asdict, dataclass
from typing import Optional

from marshmallow import ValidationError
from marshmallow_dataclass import class_schema

@dataclass
class EquityAgg:
    event_type: str
    symbol: str
    volume: int
    accumulated_volume: int
    official_open_price: float
    vwap: float
    open: float
    close: float
    high: float
    low: float
    aggregate_vwap: float
    average_size: int
    start_timestamp: int
    end_timestamp: int
    otc: Optional[str] = None  # Make optional/nullable in schema

    def __post_init__(self):
        # Validate every instance against the generated schema on creation
        schema = EquityAggSchema()
        errors = schema.validate(asdict(self))
        if errors:
            raise ValidationError(f"Validation errors: {errors}")

EquityAggSchema = class_schema(EquityAgg)
Ended up writing a lot of unit tests around this to ensure the checks were working!
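A couple of those tests, roughly paraphrased (the import path for the dataclass is illustrative):

import pytest
from marshmallow import ValidationError

from models import EquityAgg  # illustrative module path for the dataclass above

def test_valid_equity_agg_passes():
    # A well-formed record should construct without raising
    EquityAgg(
        event_type="AM", symbol="WMT", volume=49893, accumulated_volume=7266036,
        official_open_price=96.77, vwap=99.2916, open=99.235, close=99.38,
        high=99.38, low=99.233614, aggregate_vwap=98.1625, average_size=80,
        start_timestamp=1738601460000, end_timestamp=1738601520000,
    )

def test_bad_volume_type_raises():
    # A string where an integer is expected should fail validation in __post_init__
    with pytest.raises(ValidationError):
        EquityAgg(
            event_type="AM", symbol="WMT", volume="not-a-number", accumulated_volume=1,
            official_open_price=1.0, vwap=1.0, open=1.0, close=1.0,
            high=1.0, low=1.0, aggregate_vwap=1.0, average_size=1,
            start_timestamp=0, end_timestamp=60000,
        )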
The Kafka UI was very helpful in seeing the status of the cluster, topics, partitions, and the messages themselves while developing. I used the free UI developed by provectuslabs.
A Kafka cluster is traditionally managed by ZooKeeper, but Kafka recently came out with KRaft mode, which allows a number of Kafka brokers to act as controllers. One of these brokers is elected as the leader, which coordinates the cluster and ensures proper failover.
Kafka topics are used to logically split up the data that Kafka stores. I created two topics, stock-prices and news-articles, to keep my two data sources stored separately.
Kafka partitioning allows a topic to be split up for parallelism and scalability. Each message can carry an optional key, which determines the partition it lands in within the topic. I assigned the stock symbol or the search term as the key to ensure the data was logically divided across partitions. This also allowed me to grab the key during Flink processing to properly tag the time-series data.
I ended up having one Flink job to process the data from the Kafka Topics and insert it into InfluxDB. In retrospect, it makes sense to split up the tasks into multiple Flink jobs to minimize points of failure and unnecessary dependencies.
Flink Tables are a higher-level abstraction that lets Flink treat a data stream like a table and enables the use of Flink SQL. Tables are easiest to work with when official connectors exist for both ends; when the source and sink both have official connectors, a simple SQL statement like the one below is all you need:
INSERT INTO {table_sink}
SELECT
column_a,
column_b,
...
FROM {table_source}
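For the Kafka side, where an official connector does exist, registering a source table via pyflink looks roughly like this (column list trimmed and broker address illustrative):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Register the Kafka topic as a Flink table using the official Kafka SQL connector
t_env.execute_sql("""
    CREATE TABLE stock_prices_source (
        symbol STRING,
        `open` DOUBLE,
        `close` DOUBLE,
        high DOUBLE,
        low DOUBLE,
        end_timestamp BIGINT
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'stock-prices',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-stock-consumer',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")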
Flink DataStreams are a lower-level abstraction that allows for more fine-tuning and complex operations. DataStreams have to be used when Flink does not have an official connector, which, unfortunately, I found out was the case for InfluxDB. I ended up using the DataStream API to create a custom sink for InfluxDB. To keep things simple I still defined Tables for my sources, which I then converted to a DataStream to use my custom sink/insert. An interesting annoyance: when converting to a DataStream you have to declare the DataStream's schema as well, even though it is converted from a Table. So I had to declare the schema twice, in two different formats, to use the custom sink.
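Roughly, the Table-to-DataStream conversion looked like the sketch below (it assumes the source table from the previous sketch; the real job plugs in the custom InfluxDB ProcessFunction shown further down):

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Assumes the stock_prices_source table from the DDL sketch above is registered
table = t_env.from_path("stock_prices_source")

# The row type has to be declared a second time, even though the Table already has a schema
row_type = Types.ROW_NAMED(
    ["symbol", "open", "close", "high", "low", "end_timestamp"],
    [Types.STRING(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.LONG()],
)
stream = t_env.to_append_stream(table, row_type)

# In the real job this goes through the custom InfluxDB sink (a ProcessFunction):
# stream.process(InfluxDBPointSink(measurement="stock_prices"))  # illustrative class name
stream.print()  # stand-in sink for the sketch
env.execute("stock-prices-to-influxdb")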
Debugging Flink proved quite challenging because I couldn't simply run it locally. Due to its distributed processing framework, a cluster needed to be spun up and the Flink job submitted in order to test it. A lot of logging was required to pinpoint issues. The logging statements show up in the jobmanager if they deal with SQL syntax or startup issues, and in the taskmanager if they deal with the data processing. These can be viewed in Docker or in the Flink UI.
InfluxDB ingests a Point, which has a time in Unix nanoseconds plus any number of tags and fields. It is important to make sure numbers are properly cast so that they can be visualized appropriately. I developed some code to transform a Flink Row into an InfluxDB Point:
import logging

from influxdb_client import Point
from pyflink.common import Row
from pyflink.datastream import ProcessFunction

logger = logging.getLogger(__name__)

def process_element(self, value, context: "ProcessFunction.Context"):
    if isinstance(value, Row):
        row_dict = value.as_dict()
        p = Point(self.measurement)
        for k, v in row_dict.items():
            if k == "event_timestamp":
                p.time(
                    int(v.timestamp() * 1_000_000_000)
                )  # Ensure nanosecond precision for InfluxDB
            elif k in {"symbol", "search"}:
                # The Kafka key becomes the indexed InfluxDB tag
                p.tag("key", str(v))
                logger.info(f"Kafka Key is: {str(v)}")
            elif isinstance(v, (int, float)):
                p.field(k, float(v))
            else:
                p.field(k, str(v))
Unfortunately InfluxDB did not have an official Flink connector, which meant I had to create my own. Doing so greatly expanded my knowledge of InfluxDB and of using its Python SDK to build a custom sink against its API.
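Inside that custom sink, the actual write boiled down to something like this with InfluxDB's official Python client (URL, token, org, and the example Point are placeholders):

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(
    url="http://localhost:8086",   # placeholder
    token="YOUR_INFLUX_TOKEN",     # placeholder
    org="my-org",                  # placeholder
)
write_api = client.write_api(write_options=SYNCHRONOUS)

# In the Flink job, the Point comes from process_element above
p = (
    Point("stock_prices")
    .tag("key", "WMT")
    .field("close", 99.38)
    .time(1738601520000000000)  # nanosecond epoch
)
write_api.write(bucket="events", record=p)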
I learned quite a lot about time-series databases and how important it is to know the difference between tags and fields in InfluxDB. Tags are indexed and fields are not. Tags allow filtering, such as filtering to a specific stock ticker, while fields are the additional data attached to a point, similar to dimension attributes. Given that InfluxDB is time-series, most field values need to be numeric since they're used to analyze metrics over time.
Time-series databases, at least InfluxDB, use an unusual schema of essentially three columns: timestamp, field, and value. It is essentially a SQL table unpivoted, which I thought was a really interesting way to simplify data management. It is always important to know how your data is stored!
Grafana was pretty intuitive to use: you hook up a data source and then write queries in whatever language that source speaks. Grafana's InfluxDB data source also provides v.timeRangeStart and v.timeRangeStop variables, so the query range stays dynamic and in line with the dashboard's time picker. I ended up writing a simple InfluxDB Flux query to view the stock prices:
fieldList = ["high", "low"]
from(bucket: "events")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "stock_prices")
|> filter(fn: (r) => contains(value: r._field, set: fieldList))
To view the articles I had to add what are called annotations to the Grafana graph, effectively marking important events on the stock graph; they show up as the vertical dotted lines. I used the query below to "annotate" events onto the time-series graph:
from(bucket: "events")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r._measurement == "news_articles")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> keep(columns: ["_time", "webTitle", "webUrl"])
I also ended up adding a table below the graph so it is easy to quickly see all the articles that came out and give the URLs for more deep-dives.
For deployment, the plan was to push Kafka and Flink off onto Confluent. The producer/website would run on an AWS EC2 instance and send messages to Confluent/Kafka, where they would be processed through serverless Flink and then inserted back into InfluxDB on the same EC2 instance. I could then view the Grafana dashboard over SSH by forwarding the Grafana port to my local machine.
It was a pretty simple transition from docker-compose to the cloud. Here are a couple of things that needed to be addressed:
- Exposing the right ports on the EC2 server so the producer could send the Kafka messages appropriately and the Flink job could insert back into InfluxDB
- Becoming familiar with the Confluent UI
- Using a custom sink became much more ambiguous since Confluent abstracts a lot of Flink away from the user
For streaming, I looked at AWS's managed services for Flink and Kafka, but even the smallest tiers cost way too much for a simple project. I ended up going with Confluent since its serverless offering made things so easy. They also offered a $400 credit in the first month of usage! Score.
While messing with Confluent, I found that Kafka Connect has an InfluxDB connector. I ended up pivoting away from Flink due to the simplicity of Kafka Connect. This required some restructuring of the messages in the Kafka topics, but it was well worth it to simplify the process. It took a lot of work to get Flink working locally, and having to create my own custom sink as a DataStream made me wonder how I would implement it on Confluent using Flink SQL. Luckily I didn't have to work that out.
There were some initial errors with the records, and I found that on an error the sink connector creates a DLQ topic (Dead Letter Queue) that holds the failed records. It was cool to see the thought-out reliability of making sure no data was lost.
I found that Confluent supports stream lineage, which was really cool for seeing where records were coming from and where they were going.
One of the requirements for this project (I made it during Zach Wilson's paid bootcamp) was working in the cloud, so I provided pictures as proof.
I ended up using SSH to connect to my EC2 instance and forwarding the ports to use the InfluxDB, input website, and Grafana UIs. Sample command below:
ssh -i <key_path> -L 3000:localhost:3000 -L 8001:localhost:8001 -L 8086:localhost:8086 <user>@<ip_address>
One aspect of this project that I would do differently is separating the Flink processing from the website, moving it into its own folder to isolate dependencies. I would also split the Flink job into multiple jobs, so that the producers and consumers are separated and can scale as needed.
Some more modifications I would make if I had the time:
- Physically store the watermark/timestamp for the Guardian API to prevent duplicate data upon shutdown mid-day
- I would create an Iceberg table to store the stream's appends separately. The historical data could be very valuable since InfluxDB trims data older than 48 hours. Probably use S3, create the table using pyiceberg, then a simple Flink query could do the append (see the sketch after this list).
- Find a scalable way to setup alerts in Grafana to trigger if some stock prices go above or below a certain point
- Work on observability around the pipeline due to so many failure points
- Look into the Guardian blogs and see how to filter them out. It looks like the publication date is really the update date.
- I would add in schema registry for the topics. Given I already validate the records before sending them to Kafka using dataclasses and marshmallow, I didn't see it as a necessity for the initial project.
- A cool idea I had is to grab the text of the articles and try and determine sentiment. Possibly displaying a sentiment score in the articles table on the dashboard.
- I'm not sure but I think Pydantic could replace my dataclass/marshmallow schema data quality checks. Need to look further into it.
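For the Iceberg idea mentioned above, a hypothetical starting point with pyiceberg might look like this (catalog settings, bucket, and columns are all made up; I haven't built this yet):

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import DoubleType, LongType, NestedField, StringType

# Hypothetical Glue catalog backed by an S3 warehouse
catalog = load_catalog(
    "default",
    **{"type": "glue", "warehouse": "s3://my-stream-archive/warehouse"},
)

schema = Schema(
    NestedField(field_id=1, name="symbol", field_type=StringType(), required=True),
    NestedField(field_id=2, name="close", field_type=DoubleType(), required=True),
    NestedField(field_id=3, name="end_timestamp", field_type=LongType(), required=True),
)

# A Flink job (or Kafka Connect sink) would then append the stream into this table
catalog.create_table("stream_archive.stock_prices", schema=schema)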
Developing a streaming solution is incredibly complex, but highly satisfying when it works. I learned a lot while working on this project; having little to no exposure to Kafka, Flink, InfluxDB, and Grafana forced me to really challenge myself to keep the project moving forward. Overall I learned a ton and realized Kafka itself has tools, such as Kafka Connect, that can replace Flink in simple cases. Cheers!