Tdl 26656 transform state (#212)
* translate state to stream name then repo

* update streams.py to use new state format

* update unit tests

* test fixes so far

* fix bug in bookmarks test

* test updates

* fix start date

* fix event data generation for pagination test, update doc strings in sync

* comment cleanup

* update comment

* version bump and changelog update

* version bump

* remove reference to qcdi

* Add tests to translate multiple streams

* fix test to not access full table stream key in bookmark

---------

Co-authored-by: Scott Nakano <scott.nakano@qlik.com>
Co-authored-by: Andy Lu <andy.lu@qlik.com>
3 people authored Jan 13, 2025
1 parent 6a683c4 commit 6aa5d3e
Showing 14 changed files with 274 additions and 216 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,8 @@
# Changelog

# 3.1.0
* Transform state to allow stream level resets [#212](https://github.com/singer-io/tap-github/pull/212)

# 3.0.1
* Remove URI format of `/payload/issue/labels/url` field from `events` stream [#205](https://github.com/singer-io/tap-github/pull/205)

@@ -184,4 +187,4 @@
* [#9](https://github.com/singer-io/tap-github/pull/9)

## 0.3.0
* Adds support for retrieving pull requests, assignees and collaborars [#8](https://github.com/singer-io/tap-github/pull/8)
* Adds support for retrieving pull requests, assignees and collaborars [#8](https://github.com/singer-io/tap-github/pull/8)
2 changes: 1 addition & 1 deletion setup.py
@@ -3,7 +3,7 @@
from setuptools import setup, find_packages

setup(name='tap-github',
version='3.0.1',
version='3.1.0',
description='Singer.io tap for extracting data from the GitHub API',
author='Stitch',
url='http://singer.io',
20 changes: 10 additions & 10 deletions tap_github/streams.py
@@ -9,7 +9,7 @@ def get_bookmark(state, repo, stream_name, bookmark_key, start_date):
"""
Return bookmark value if available in the state otherwise return start date
"""
repo_stream_dict = bookmarks.get_bookmark(state, repo, stream_name)
repo_stream_dict = bookmarks.get_bookmark(state, stream_name, repo)
if repo_stream_dict:
return repo_stream_dict.get(bookmark_key)

@@ -119,7 +119,7 @@ def write_bookmarks(self, stream, selected_streams, bookmark_value, repo_path, s

# If the stream is selected, write the bookmark.
if stream in selected_streams:
singer.write_bookmark(state, repo_path, stream_obj.tap_stream_id, {"since": bookmark_value})
singer.write_bookmark(state, stream_obj.tap_stream_id, repo_path, {"since": bookmark_value})

# For each child, write the bookmark if it is selected.
for child in stream_obj.children:
@@ -205,14 +205,14 @@ def add_fields_at_1st_level(self, record, parent_record = None):

class FullTableStream(Stream):
def sync_endpoint(self,
client,
state,
catalog,
repo_path,
start_date,
selected_stream_ids,
stream_to_sync
):
client,
state,
catalog,
repo_path,
start_date,
selected_stream_ids,
stream_to_sync
):
"""
A common function to sync full table streams.
"""
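The `streams.py` changes above swap the argument order of the bookmark helpers so state is keyed by stream name first and repository second. Below is a minimal sketch of the resulting state shape, assuming the standard `singer-python` bookmark helpers (`write_bookmark(state, tap_stream_id, key, value)` nests `key` under `tap_stream_id`); the repo path and timestamp are made-up examples, not values from this PR.

```python
# Illustrative sketch (not part of the tap): shows the state shape the
# reordered bookmark calls produce when the stream name is passed first
# and the repository path second.
from singer import bookmarks

state = {}

# New ordering: stream name first, then repository (example values).
bookmarks.write_bookmark(state, "commits", "singer-io/tap-github",
                         {"since": "2022-06-22T13:32:42Z"})

assert state == {
    "bookmarks": {
        "commits": {
            "singer-io/tap-github": {"since": "2022-06-22T13:32:42Z"}
        }
    }
}

# Reading it back mirrors the updated call in streams.py:
# bookmarks.get_bookmark(state, stream_name, repo)
print(bookmarks.get_bookmark(state, "commits", "singer-io/tap-github"))
```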
69 changes: 37 additions & 32 deletions tap_github/sync.py
@@ -65,18 +65,9 @@ def get_ordered_repos(state, repositories):

def translate_state(state, catalog, repositories):
'''
This tap used to only support a single repository, in which case the
the state took the shape of:
{
"bookmarks": {
"commits": {
"since": "2018-11-14T13:21:20.700360Z"
}
}
}
The tap now supports multiple repos, so this function should be called
at the beginning of each run to ensure the state is translated to the
new format:
The tap supports multiple repositories. Previously, the state format
for bookmarks included stream keys nested under each repository, as
shown below:
{
"bookmarks": {
"singer-io/tap-adwords": {
@@ -91,41 +82,55 @@ def translate_state(state, catalog, repositories):
}
}
}
The stream keys must be the second key after bookmarks in order for
standardized table-level resets to function correctly. This function
should be called at the start of each run to ensure that the state
is properly converted to the new format:
{
"bookmarks": {
"commits" : {
"singer-io/tap-adwords": {
"since": "2018-11-14T13:21:20.700360Z"
},
"singer-io/tap-salesforce": {
"since": "2018-11-14T13:21:20.700360Z"
}
},
"issues" : {
"singer-io/tap-adwords": {
"since": "2018-11-14T13:21:20.700360Z"
},
"singer-io/tap-salesforce": {
"since": "2018-11-14T13:21:20.700360Z"
}
}
}
}
'''
nested_dict = lambda: collections.defaultdict(nested_dict)
new_state = nested_dict()

# Collect keys(repo_name for update state or stream_name for older state) from state available in the `bookmarks``
# Collect keys(stream_name for update state or repo_name for older state) from state available in the `bookmarks``
previous_state_keys = state.get('bookmarks', {}).keys()
# Collect stream names from the catalog
stream_names = [stream['tap_stream_id'] for stream in catalog['streams']]

for key in previous_state_keys:
# Loop through each key of `bookmarks` available in the previous state.

# Case 1:
# Older connections `bookmarks` contain stream names so check if it is the stream name or not.
# If the previous state's key is found in the stream name list then continue to check other keys. Because we want
# to migrate each stream's bookmark into the repo name as mentioned below:
# Example: {`bookmarks`: {`stream_a`: `bookmark_a`}} to {`bookmarks`: {`repo_a`: {`stream_a`: `bookmark_a`}}}

# Case 2:
# Check if the key is available in the list of currently selected repo's list or not. Newer format `bookmarks` contain repo names.
# Return the state if the previous state's key is not found in the repo name list or stream name list.

# If the state contains a bookmark for `repo_a` and `repo_b` and the user deselects these both repos and adds another repo
# then in that case this function was returning an empty state. Now this change will return the existing state instead of the empty state.
if key not in stream_names and key not in repositories:
# Return the existing state if all repos from the previous state are deselected(not found) in the current sync.
return state
for inner_key in state['bookmarks'][key].keys():
if inner_key not in stream_names and inner_key not in repositories:
# Return the existing state if all repos from the previous state are deselected(not found) in the current sync.
return state

for stream in catalog['streams']:
stream_name = stream['tap_stream_id']
for repo in repositories:
if bookmarks.get_bookmark(state, repo, stream_name):
if bookmarks.get_bookmark(state, stream_name, repo):
return state
if bookmarks.get_bookmark(state, stream_name, 'since'):
new_state['bookmarks'][repo][stream_name]['since'] = bookmarks.get_bookmark(state, stream_name, 'since')
if bookmarks.get_bookmark(state, repo, stream_name):
new_state['bookmarks'][stream_name][repo] = bookmarks.get_bookmark(state, repo, stream_name)

return new_state

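The new `translate_state` docstring describes re-keying bookmarks from repo → stream to stream → repo. The standalone sketch below illustrates just that pivot with plain dicts; it is not the tap's `translate_state` (which also handles the legacy single-repo format, deselected repos, and already-translated state), and `pivot_bookmarks` is a hypothetical helper using the repo names and timestamp from the docstring example.

```python
# Simplified illustration (not the tap's translate_state): pivots an older
# repo-first bookmark layout into the stream-first layout described above.
import collections

old_state = {
    "bookmarks": {
        "singer-io/tap-adwords": {
            "commits": {"since": "2018-11-14T13:21:20.700360Z"},
            "issues": {"since": "2018-11-14T13:21:20.700360Z"},
        }
    }
}

def pivot_bookmarks(state):
    """Re-key bookmarks from repo -> stream to stream -> repo."""
    nested_dict = lambda: collections.defaultdict(nested_dict)
    new_state = nested_dict()
    for repo, streams in state.get("bookmarks", {}).items():
        for stream_name, bookmark in streams.items():
            new_state["bookmarks"][stream_name][repo] = bookmark
    return new_state

new_state = pivot_bookmarks(old_state)
assert new_state["bookmarks"]["commits"]["singer-io/tap-adwords"] == {
    "since": "2018-11-14T13:21:20.700360Z"
}
```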
1 change: 0 additions & 1 deletion tests/test_github_all_fields.py
Expand Up @@ -58,7 +58,6 @@
},
'issues': {
'body_text',
'closed_by',
'body_html'
},
'releases': {
36 changes: 19 additions & 17 deletions tests/test_github_bookmarks.py
@@ -1,6 +1,7 @@
import datetime
import dateutil.parser
import pytz
import copy

from tap_tester import runner, menagerie, connections

@@ -27,21 +28,23 @@ def calculated_states_by_stream(self, current_state, synced_records, replication
timedelta_by_stream["commits"] = [7, 0, 0]

repo = self.get_properties().get('repository')
#stream_to_calculated_state = {repo: {stream: "" for stream in current_state['bookmarks'][repo].keys()}}
stream_to_calculated_state = copy.deepcopy(current_state)['bookmarks']

stream_to_calculated_state = {repo: {stream: "" for stream in current_state['bookmarks'][repo].keys()}}
for stream, state in current_state['bookmarks'][repo].items():
state_key, state_value = next(iter(state.keys())), next(iter(state.values()))
state_as_datetime = dateutil.parser.parse(state_value)
for stream in current_state['bookmarks'].keys():
for repo, state in current_state['bookmarks'][stream].items():
state_key, state_value = next(iter(state.keys())), next(iter(state.values()))
state_as_datetime = dateutil.parser.parse(state_value)

days, hours, minutes = timedelta_by_stream[stream]
days, hours, minutes = timedelta_by_stream[stream]

start_date_as_datetime = dateutil.parser.parse(start_date)
calculated_state_as_datetime = start_date_as_datetime + datetime.timedelta(days=days, hours=hours, minutes=minutes)
start_date_as_datetime = dateutil.parser.parse(start_date)
calculated_state_as_datetime = start_date_as_datetime + datetime.timedelta(days=days, hours=hours, minutes=minutes)

state_format = '%Y-%m-%dT%H:%M:%SZ'
calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format)
state_format = '%Y-%m-%dT%H:%M:%SZ'
calculated_state_formatted = datetime.datetime.strftime(calculated_state_as_datetime, state_format)

stream_to_calculated_state[repo][stream] = {state_key: calculated_state_formatted}
stream_to_calculated_state[stream][repo] = {state_key: calculated_state_formatted}

return stream_to_calculated_state

@@ -69,7 +72,6 @@ def test_run(self):
##########################################################################
### First Sync
##########################################################################

conn_id = connections.ensure_connection(self, original_properties=True)

# Run in check mode
@@ -94,8 +96,8 @@
first_sync_records,
expected_replication_keys,
first_sync_start_date)
for repo, new_state in simulated_states.items():
new_states['bookmarks'][repo] = new_state
for stream, new_state in simulated_states.items():
new_states['bookmarks'][stream] = new_state
menagerie.set_state(conn_id, new_states)

##########################################################################
@@ -125,20 +127,21 @@ def test_run(self):
second_sync_messages = [record.get('data') for record in
second_sync_records.get(stream, {'messages': []}).get('messages')
if record.get('action') == 'upsert']
first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {}).get(repo, {stream: None}).get(stream)
second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {}).get(repo, {stream: None}).get(stream)
first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {}).get(stream, {repo: None}).get(repo)
second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {}).get(stream, {repo: None}).get(repo)


if expected_replication_method == self.INCREMENTAL:
# Collect information specific to incremental streams from syncs 1 & 2
replication_key = next(iter(expected_replication_keys[stream]))

first_bookmark_value = first_bookmark_key_value.get('since')
second_bookmark_value = second_bookmark_key_value.get('since')

first_bookmark_value_ts = self.dt_to_ts(first_bookmark_value, self.BOOKMARK_FORMAT)
second_bookmark_value_ts = self.dt_to_ts(second_bookmark_value, self.BOOKMARK_FORMAT)

simulated_bookmark_value = self.dt_to_ts(new_states['bookmarks'][repo][stream]['since'], self.BOOKMARK_FORMAT)
simulated_bookmark_value = self.dt_to_ts(new_states['bookmarks'][stream][repo]['since'], self.BOOKMARK_FORMAT)

# Verify the first sync sets a bookmark of the expected form
self.assertIsNotNone(first_bookmark_key_value)
@@ -159,7 +162,6 @@ def test_run(self):
for record in first_sync_messages:
# Verify the first sync bookmark value is the max replication key value for a given stream
replication_key_value = self.dt_to_ts(record.get(replication_key), replication_key_format)

self.assertLessEqual(
replication_key_value, first_bookmark_value_ts,
msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced."
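Under the new layout, `calculated_states_by_stream` returns simulated bookmarks keyed by stream name first and repository second, which the test then writes back via `menagerie.set_state`. A hypothetical example of that shape (stream names, repo, and timestamps are invented for illustration):

```python
# Hypothetical shape (values invented) of what calculated_states_by_stream
# returns after this change: stream name first, repository second.
simulated_states = {
    "commits": {"singer-io/tap-github": {"since": "2022-07-06T00:00:00Z"}},
    "issues": {"singer-io/tap-github": {"since": "2022-07-13T00:00:00Z"}},
}

# The test merges these under 'bookmarks' before calling menagerie.set_state,
# mirroring the loop in test_run above.
new_states = {"bookmarks": {}}
for stream, new_state in simulated_states.items():
    new_states["bookmarks"][stream] = new_state
```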
63 changes: 32 additions & 31 deletions tests/test_github_interrupted_sync.py
Expand Up @@ -28,7 +28,7 @@ def test_run(self):
expected_replication_methods = self.expected_replication_method()
expected_replication_keys = self.expected_bookmark_keys()
repo_key = "_sdc_repository"

start_date = self.dt_to_ts(self.get_properties().get("start_date"), self.BOOKMARK_FORMAT)

# Run a discovery job
@@ -53,24 +53,26 @@
"currently_syncing": "pull_requests",
"currently_syncing_repo": "singer-io/test-repo",
"bookmarks": {
"singer-io/singer-python": {
"issues": {
"issues": {
"singer-io/singer-python": {
"since": "2022-06-22T13:32:42Z"
},
"pull_requests": {
"since": "2022-06-22T13:32:42Z"
"singer-io/test-repo": {
"since": "2022-07-13T09:21:19Z"
},
"issue_events": {
"since": "2022-06-22T13:32:42Z"
}
},
"singer-io/test-repo": {
"issues": {
"since": "2022-07-13T09:21:19Z"
"pull_requests": {
"singer-io/singer-python": {
"since": "2022-06-22T13:32:42Z"
},
"pull_requests": {
"singer-io/test-repo": {
"since": "2022-06-30T05:33:24Z"
}
},
},
"issue_events": {
"singer-io/singer-python": {
"since": "2022-06-22T13:32:42Z"
},
}
}
}
@@ -98,23 +100,18 @@ def test_run(self):
# (This is what the value would have been without an interruption and proves resuming succeeds)
self.assertDictEqual(final_state, full_sync_state)

for repository in self.get_properties().get("repository").split():
with self.subTest(repository=repository):

full_sync_bookmark = full_sync_state["bookmarks"][repository]
final_bookmark = final_state["bookmarks"][repository]
interrupted_repo_bookmark = interrupted_state["bookmarks"][repository]

for stream in streams_to_test:
with self.subTest(stream=stream):

for stream in streams_to_test:
with self.subTest(stream=stream):
for repository in self.get_properties().get("repository").split():
with self.subTest(repository=repository):

# Expected values
expected_replication_method = expected_replication_methods[stream]
expected_primary_keys = list(self.expected_primary_keys()[stream])

# Gather results
full_records = [message['data'] for message in
full_sync_records.get(stream, {}).get('messages', [])
full_sync_records.get(stream, {}).get('messages', [])
if message['data'][repo_key] == repository]
full_record_count = len(full_records)

@@ -124,11 +121,15 @@
interrupted_record_count = len(interrupted_records)

if expected_replication_method == self.INCREMENTAL:
full_sync_bookmark = full_sync_state["bookmarks"][stream]
final_bookmark = final_state["bookmarks"][stream]
interrupted_repo_bookmark = interrupted_state["bookmarks"][stream]

expected_replication_key = next(iter(expected_replication_keys[stream]))
if stream in interrupted_repo_bookmark.keys():
interrupted_bookmark = self.dt_to_ts(interrupted_repo_bookmark[stream]["since"], self.BOOKMARK_FORMAT)

if repository in interrupted_repo_bookmark.keys():
interrupted_bookmark = self.dt_to_ts(interrupted_repo_bookmark[repository]["since"], self.BOOKMARK_FORMAT)

if stream == interrupted_state['currently_syncing'] and repository == interrupted_state['currently_syncing_repo']:

for record in interrupted_records:
@@ -147,7 +148,7 @@

if (rec_time >= interrupted_bookmark):
full_records_after_interrupted_bookmark += 1

self.assertEqual(full_records_after_interrupted_bookmark, len(interrupted_records), \
msg="Expected {} records in each sync".format(full_records_after_interrupted_bookmark))
else:
@@ -163,8 +164,8 @@ def test_run(self):
self.assertIn(record, interrupted_records, msg='Record missing from resuming sync.' )
else:
# Verify full table streams do not save bookmarked values at the conclusion of a successful sync
self.assertNotIn(stream, full_sync_bookmark.keys())
self.assertNotIn(stream, final_bookmark.keys())
self.assertNotIn(stream, full_sync_state["bookmarks"].keys())
self.assertNotIn(stream, final_state["bookmarks"].keys())

# Verify first and second sync have the same records
self.assertEqual(full_record_count, interrupted_record_count)