Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial import #1

Merged
merged 1 commit into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
# `inputs` holds the glob patterns of the files the formatter processes:
# the root-level mix.exs / .formatter.exs plus every .ex/.exs file under
# config/, lib/ and test/.
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
74 changes: 74 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Continuous integration workflow: checks formatting, unused dependencies,
# compilation (warnings are errors) and the test suite on every pull request
# and on pushes to master.
name: CI

on:
  pull_request:
  push:
    branches:
      - master

jobs:
  # Reads the Erlang/OTP and Elixir versions pinned in .tool-versions and
  # exposes them as job outputs so the test job installs the same toolchain.
  extract_versions:
    name: Extract info from .tool-versions
    runs-on: ubuntu-latest
    outputs:
      elixir-version: ${{ steps.set-versions.outputs.elixir_version }}
      otp-version: ${{ steps.set-versions.outputs.otp_version }}
    steps:
      - name: Checkout .tool-versions file
        uses: actions/checkout@v4
        with:
          # Only the version file is needed for this job.
          sparse-checkout: |
            .tool-versions
          sparse-checkout-cone-mode: false

      - name: Set Elixir and OTP versions as output
        id: set-versions
        run: |
          # The elixir entry looks like "1.17.2-otp-27"; keep only the part
          # before the first "-".
          elixir_version=$(grep -h elixir .tool-versions | awk '{ print $2 }' | awk -F - '{print $1}')
          otp_version=$(grep -h erlang .tool-versions | awk '{ print $2 }')
          echo "elixir_version=$elixir_version" >> "$GITHUB_OUTPUT"
          echo "otp_version=$otp_version" >> "$GITHUB_OUTPUT"

  test:
    # No build matrix is defined for this job, so the versions are taken from
    # the extract_versions outputs instead of the (empty) matrix context.
    name: Test on OTP ${{ needs.extract_versions.outputs.otp-version }} / Elixir ${{ needs.extract_versions.outputs.elixir-version }}
    runs-on: ubuntu-latest
    needs: extract_versions
    env:
      otp-version: ${{ needs.extract_versions.outputs.otp-version }}
      elixir-version: ${{ needs.extract_versions.outputs.elixir-version }}

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Elixir
        uses: erlef/setup-beam@v1
        with:
          otp-version: ${{ env.otp-version }}
          elixir-version: ${{ env.elixir-version }}

      - name: Cache deps
        id: cache-deps
        uses: actions/cache@v4
        env:
          cache-name: cache-elixir-deps
        with:
          path: deps
          # Keyed on mix.lock so the cache is refreshed when dependencies change.
          key: ${{ runner.os }}-mix-${{ env.cache-name }}-${{ hashFiles('**/mix.lock') }}
          restore-keys: |
            ${{ runner.os }}-mix-${{ env.cache-name }}-

      - name: Fetch dependencies
        run: mix deps.get

      - name: Check formatting
        run: mix format --check-formatted

      - name: Check for unused dependencies
        run: mix deps.get && mix deps.unlock --check-unused

      - name: Compile
        run: mix compile --warnings-as-errors

      - name: Test
        run: mix test
25 changes: 25 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
broadway_sqs-*.tar

# Editor tooling artifacts (presumably the ElixirLS language server's
# working directory - confirm).
.elixir_ls
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
erlang 27.0.1
elixir 1.17.2-otp-27
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Changelog

## v0.1.0 (2024-09-27)

* Initial release
56 changes: 55 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,55 @@
# broadway_sqs_aws_elixir
# BroadwaySQS

An AWS SQS connector for [Broadway](https://github.com/dashbitco/broadway).

This is substantially a copy of [https://hexdocs.pm/broadway_sqs](https://hexdocs.pm/broadway_sqs),
but adapted to the [https://hex.pm/packages/aws](https://hex.pm/packages/aws) AWS client.

An upstream PR was offered to [https://hexdocs.pm/broadway_sqs](https://hexdocs.pm/broadway_sqs),
but was rejected due to the desire to support only one client. Rather than use a fork of that
repository, we decided to make a new repository so that we can independently tag releases.

## Installation

Add `:broadway_sqs` to the list of dependencies in `mix.exs` along with the HTTP
client of your choice (defaults to `:hackney`):

```elixir
def deps do
[
{:broadway_sqs, github: "thoroai/broadway_sqs_aws_elixir", ref: "0.1.0"}
]
end
```

## Usage

Configure Broadway with one or more producers using `BroadwaySQS.Producer`:

```elixir
Broadway.start_link(MyBroadway,
name: MyBroadway,
producer: [
module: {BroadwaySQS.Producer,
queue_url: "https://sqs.amazonaws.com/1234567890/queue"
}
]
)
```

## License

Copyright 2019 Plataformatec\
Copyright 2020 Dashbit

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
203 changes: 203 additions & 0 deletions lib/broadway_sqs/aws_elixir_client.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
if Code.ensure_loaded?(AWS) do
defmodule BroadwaySQS.AwsElixirClient do
@moduledoc """
Alternative SQS client used by `BroadwaySQS.Producer` to communicate with AWS
SQS service.

This client uses the `AWS.SQS` library and implements the
`BroadwaySQS.SQSClient` and `Broadway.Acknowledger` behaviours which define
callbacks for receiving and acknowledging messages.
"""

alias Broadway.{Message, Acknowledger}
require Logger

@behaviour BroadwaySQS.SQSClient
@behaviour Acknowledger

@max_num_messages_allowed_by_aws 10

@aws_sqs_client_args [:access_key_id, :secret_access_key, :token, :region]

# Converts the producer options into a map and seeds it with `:ack_ref`,
# taken from the Broadway topology name (`opts[:broadway][:name]`).
# An `:ack_ref` already present in `opts` takes precedence over the seed.
@impl true
def init(opts) do
  ack_ref = opts[:broadway][:name]

  {:ok, Enum.into(opts, %{ack_ref: ack_ref})}
end

# Requests up to `demand` messages from the configured queue and returns them
# as a list of `Broadway.Message` structs (an empty list on failure, as
# decided by `wrap_received_messages/2`).
@impl true
def receive_messages(demand, opts) do
  request = build_receive_messages_opts(opts, demand)
  client = build_client(opts.config)

  client
  |> AWS.SQS.receive_message(request)
  |> wrap_received_messages(opts)
end

# Deletes from the queue every message whose effective `:on_success` /
# `:on_failure` setting is `:ack`. Successful messages are handled before
# failed ones, and deletions are sent in batches no larger than the SQS limit.
@impl Acknowledger
def ack(ack_ref, successful, failed) do
  ack_options = :persistent_term.get(ack_ref)

  to_delete =
    for {batch, option} <- [{successful, :on_success}, {failed, :on_failure}],
        message <- batch,
        ack?(message, ack_options, option),
        do: message

  to_delete
  |> Enum.chunk_every(@max_num_messages_allowed_by_aws)
  |> Enum.each(&delete_messages(&1, ack_options))
end

# Returns true when the message should be deleted for the given outcome
# (`:on_success` or `:on_failure`). A per-message setting stored in the
# acknowledger data wins over the producer-wide `ack_options`.
defp ack?(message, ack_options, option) do
  {_module, _ack_ref, per_message_options} = message.acknowledger

  case per_message_options[option] || Map.fetch!(ack_options, option) do
    :ack -> true
    _other -> false
  end
end

# Merges per-message acknowledgement `options` into the message's existing
# `ack_data`; entries in `options` override entries already present.
@impl Acknowledger
def configure(_ack_ref, ack_data, options) do
  {:ok, Enum.into(options, ack_data)}
end

# Deletes one batch of messages from the queue.
#
# `messages` is a list of `Broadway.Message` structs produced by this client
# and `ack_options` is the producer option map (it must contain `:config` and
# `:queue_url`). Returns the raw result of `AWS.SQS.delete_message_batch/2`.
#
# Failures are logged instead of being silently discarded: both request-level
# errors and individual entries reported under "Failed" in an otherwise
# successful response. Messages that could not be deleted stay on the queue.
defp delete_messages(messages, ack_options) do
  delete_message_batch_request =
    build_delete_message_opts(ack_options, messages)

  result =
    ack_options.config
    |> build_client()
    |> AWS.SQS.delete_message_batch(delete_message_batch_request)

  case result do
    {:ok, %{"Failed" => [_ | _] = failed_entries}, _response} ->
      Logger.error(
        "Unable to delete #{length(failed_entries)} message(s) from AWS queue " <>
          "#{ack_options.queue_url}. Reason: #{inspect(failed_entries)}"
      )

    {:error, reason} ->
      Logger.error(
        "Unable to delete messages from AWS queue #{ack_options.queue_url}. " <>
          "Reason: #{inspect(reason)}"
      )

    _success ->
      :ok
  end

  result
end

# Turns the result of `AWS.SQS.receive_message/2` into a list of
# `Broadway.Message` structs.
#
#   * a successful response carrying a list under "Messages" yields one
#     `Broadway.Message` per entry: the body becomes `data`, the remaining
#     fields become `metadata`;
#   * any other successful response yields `[]`;
#   * an error is logged and yields `[]`, so a failed poll does not crash the
#     caller.
defp wrap_received_messages({:ok, %{"Messages" => raw_messages}, _}, %{ack_ref: ack_ref})
     when is_list(raw_messages) do
  Enum.map(raw_messages, fn raw_message ->
    message = build_message(raw_message)

    %Message{
      data: message.body,
      metadata: Map.delete(message, :body),
      acknowledger: build_acknowledger(message, ack_ref)
    }
  end)
end

defp wrap_received_messages({:ok, %{}, _}, %{ack_ref: _ack_ref}), do: []

defp wrap_received_messages(
       {:error,
        {:unexpected_response, %{body: body, headers: _headers, status_code: _status_code}}},
       %{queue_url: queue_url}
     ) do
  reason = extract_error_reason(body)

  Logger.error(
    "Unable to fetch events from AWS queue #{queue_url}. Reason: #{inspect(reason)}"
  )

  []
end

defp wrap_received_messages({:error, reason}, %{queue_url: queue_url}) do
  Logger.error(
    "Unable to fetch events from AWS queue #{queue_url}. Reason: #{inspect(reason)}"
  )

  []
end

# Extracts a human-readable reason from an error response body.
#
# The documentation specifies that the key will be `message`, but `Message`
# has been observed in error responses, so both are accepted. When the body
# is not a JSON object, or carries neither key, the decoded payload or the
# raw body is returned, so logging an error can never raise.
defp extract_error_reason(body) when is_binary(body) do
  case Jason.decode(body) do
    {:ok, %{} = decoded} -> decoded["message"] || decoded["Message"] || decoded
    _not_a_json_object -> body
  end
end

defp extract_error_reason(body), do: body

# Builds the `{module, ack_ref, ack_data}` acknowledger tuple for a message.
# The ack data carries the receipt (message id and receipt handle).
defp build_acknowledger(message, ack_ref) do
  ack_data = %{
    receipt: %{id: message.message_id, receipt_handle: message.receipt_handle}
  }

  {__MODULE__, ack_ref, ack_data}
end

# Builds the request map for a receive-message call. At most `demand`
# messages are requested, and parameters whose value is nil or false are
# left out of the request.
defp build_receive_messages_opts(opts, demand) do
  batch_size = min(demand, opts[:max_number_of_messages])

  params = [
    {"QueueUrl", opts.queue_url},
    {"MaxNumberOfMessages", batch_size},
    {"WaitTimeSeconds", opts[:wait_time_seconds]},
    {"VisibilityTimeout", opts[:visibility_timeout]},
    {"AttributeNames", opts[:attribute_names]},
    {"MessageAttributeNames", opts[:message_attribute_names]}
  ]

  for {name, value} <- params, value, into: %{}, do: {name, value}
end

# Returns the receipt stored in the third element of the message's
# acknowledger tuple.
defp extract_message_receipt(message) do
  {_module, _ack_ref, ack_data} = message.acknowledger
  %{receipt: receipt} = ack_data

  receipt
end

# Converts a raw message (string keys) into a map with atom keys.
# Fields that are absent, nil or false in the raw message are omitted.
defp build_message(raw_message) do
  fields = [
    attributes: "Attributes",
    body: "Body",
    md5_of_body: "MD5OfBody",
    md5_of_message_attributes: "MD5OfMessageAttributes",
    message_attributes: "MessageAttributes",
    message_id: "MessageId",
    receipt_handle: "ReceiptHandle"
  ]

  for {key, aws_name} <- fields, value = raw_message[aws_name], into: %{} do
    {key, value}
  end
end

# Builds the request map for a batch delete: the queue URL plus one entry
# (id and receipt handle) per message.
defp build_delete_message_opts(opts, messages) do
  entries =
    Enum.map(messages, fn message ->
      message
      |> extract_message_receipt()
      |> build_delete_message_batch_request_entry()
    end)

  %{"QueueUrl" => opts.queue_url, "Entries" => entries}
end

# Maps a receipt (`:id`, `:receipt_handle`) to a batch-delete request entry
# with the string keys "Id" and "ReceiptHandle".
defp build_delete_message_batch_request_entry(receipt) do
  entry_id = receipt.id
  handle = receipt.receipt_handle

  %{"Id" => entry_id, "ReceiptHandle" => handle}
end

# Builds the AWS client used for a request.
#
# Credentials discovered through `get_sanitized_aws_credentials/0` act as
# defaults and are overridden by the entries in `opts`. `:endpoint` and
# `:http_client` are applied through the dedicated `AWS.Client` setters when
# present; every remaining option is copied onto the client as-is.
defp build_client(opts) do
  options =
    get_sanitized_aws_credentials()
    |> Keyword.merge(opts)
    |> Map.new()

  base_client =
    AWS.Client.create(
      options[:access_key_id],
      options[:secret_access_key],
      options[:token],
      options[:region]
    )

  configured_client =
    base_client
    |> put_endpoint(options[:endpoint])
    |> put_http_client(options[:http_client])

  extra_fields = Map.drop(options, @aws_sqs_client_args ++ [:endpoint, :http_client])

  Map.merge(configured_client, extra_fields)
end

# Applies a custom endpoint to the client; a nil endpoint leaves the client
# untouched.
defp put_endpoint(client, endpoint) do
  if is_nil(endpoint) do
    client
  else
    AWS.Client.put_endpoint(client, endpoint)
  end
end

# Applies a custom HTTP client to the AWS client; a nil value leaves the
# client untouched.
defp put_http_client(client, http_client) do
  if is_nil(http_client) do
    client
  else
    AWS.Client.put_http_client(client, http_client)
  end
end

# Returns the credentials reported by `:aws_credentials` as a keyword list
# restricted to the keys listed in `@aws_sqs_client_args`, or an empty list
# when no credentials are available (`:undefined`).
#
# Only the whitelisted keys are kept, so any other field of the credentials
# map is never forwarded to the client. The previous version also stored the
# token under `:session_token`, but that key was dropped again by the
# whitelist, so it had no effect and has been removed.
defp get_sanitized_aws_credentials do
  case :aws_credentials.get_credentials() do
    :undefined ->
      []

    creds ->
      creds
      |> Map.take(@aws_sqs_client_args)
      |> Map.to_list()
  end
end
end
end
Loading