Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
fix for #31 - partitioned tables are now properly streamed
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-kolda committed Mar 26, 2018
1 parent 3279e13 commit 228f8f0
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 12 deletions.
26 changes: 20 additions & 6 deletions gcp_census/bigquery/transformers/table_metadata_v0_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,28 @@ def transform(self):
'numBytes': table['numBytes'],
'numLongTermBytes': table['numLongTermBytes'],
'numRows': table['numRows'],
'timePartitioning': table[
'timePartitioning'] if 'timePartitioning'
in table else None,
'timePartitioning': self.__copy_time_partitioning(table),
'type': table['type'],
'json': json.dumps(table),
'description': table['description']
if 'description' in table else None,
'labels': [{'key': key, 'value': value} for key, value
in table['labels'].iteritems()]
if 'labels' in table else []
'labels': self.__copy_labels(table)
}

@staticmethod
def __copy_labels(table):
    """Convert the table's ``labels`` mapping into a list of records.

    Args:
        table: mapping describing a table; may hold a ``labels`` mapping
            of label key to label value.

    Returns:
        A list of ``{'key': ..., 'value': ...}`` dicts, one per label, in
        the iteration order of the ``labels`` mapping. An empty list is
        returned when ``labels`` is absent, ``None`` or empty.
    """
    labels = table['labels'] if 'labels' in table else None
    if not labels:
        # Missing, null and empty label mappings all mean "no labels".
        return []
    # items() is available on both Python 2 and Python 3 dicts, whereas
    # iteritems() exists on Python 2 only.
    return [{'key': key, 'value': value} for key, value in labels.items()]

@staticmethod
def __copy_time_partitioning(table):
    """Copy the whitelisted ``timePartitioning`` settings of a table.

    Only the ``expirationMs`` and ``type`` keys are copied; every other
    key of ``timePartitioning`` (for example ``field``) is left out.

    Args:
        table: mapping describing a table; may hold a ``timePartitioning``
            mapping.

    Returns:
        A new dict with whichever of ``expirationMs`` and ``type`` are
        present, or ``None`` when ``timePartitioning`` is absent or null.
    """
    if 'timePartitioning' not in table:
        return None
    tp_dict = table['timePartitioning']
    if tp_dict is None:
        # An explicit null would otherwise fail on the membership test.
        return None
    return {copied_key: tp_dict[copied_key]
            for copied_key in ('expirationMs', 'type')
            if copied_key in tp_dict}
26 changes: 20 additions & 6 deletions gcp_census/bigquery/transformers/table_metadata_v1_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,28 @@ def transform(self):
'numBytes': table['numBytes'],
'numLongTermBytes': table['numLongTermBytes'],
'numRows': table['numRows'],
'timePartitioning': table[
'timePartitioning'] if 'timePartitioning'
in table else None,
'timePartitioning': self.__copy_time_partitioning(table),
'type': table['type'],
'json': json.dumps(table),
'description': table['description']
if 'description' in table else None,
'labels': [{'key': key, 'value': value} for key, value
in table['labels'].iteritems()]
if 'labels' in table else []
'labels': self.__copy_labels(table)
}

@staticmethod
def __copy_labels(table):
    """Convert the table's ``labels`` mapping into a list of records.

    Args:
        table: mapping describing a table; may hold a ``labels`` mapping
            of label key to label value.

    Returns:
        A list of ``{'key': ..., 'value': ...}`` dicts, one per label, in
        the iteration order of the ``labels`` mapping. An empty list is
        returned when ``labels`` is absent, ``None`` or empty.
    """
    labels = table['labels'] if 'labels' in table else None
    if not labels:
        # Missing, null and empty label mappings all mean "no labels".
        return []
    # items() is available on both Python 2 and Python 3 dicts, whereas
    # iteritems() exists on Python 2 only.
    return [{'key': key, 'value': value} for key, value in labels.items()]

@staticmethod
def __copy_time_partitioning(table):
    """Copy the whitelisted ``timePartitioning`` settings of a table.

    Only the ``expirationMs`` and ``type`` keys are copied; every other
    key of ``timePartitioning`` (for example ``field``) is left out.

    Args:
        table: mapping describing a table; may hold a ``timePartitioning``
            mapping.

    Returns:
        A new dict with whichever of ``expirationMs`` and ``type`` are
        present, or ``None`` when ``timePartitioning`` is absent or null.
    """
    if 'timePartitioning' not in table:
        return None
    tp_dict = table['timePartitioning']
    if tp_dict is None:
        # An explicit null would otherwise fail on the membership test.
        return None
    return {copied_key: tp_dict[copied_key]
            for copied_key in ('expirationMs', 'type')
            if copied_key in tp_dict}
33 changes: 33 additions & 0 deletions tests/bigquery_transformer_partition_metadata_v1_0_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import test_utils

from gcp_census.bigquery.bigquery_table_metadata import BigQueryTableMetadata
from gcp_census.bigquery.transformers.partition_metadata_v1_0 import \
PartitionMetadataV1_0
Expand Down Expand Up @@ -33,3 +35,34 @@ def test_transforming_table_without_labels(self):
# then
self.assertEqual('account_1_0_0', data['tableId'])
self.assertEqual('20150603', data['partitionId'])

def test_should_ignore_timepartitioning_field(self):
    # The transformed 'timePartitioning' must keep 'type' and
    # 'expirationMs' and must not carry over the 'field' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
        'expirationMs': '259200000',
        'field': 'transaction_date'
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = PartitionMetadataV1_0(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertEqual('259200000', time_partitioning['expirationMs'])
    self.assertFalse('field' in time_partitioning)

def test_should_parse_timepartitioning_without_expiration_ms(self):
    # When the input has no 'expirationMs', the transformed
    # 'timePartitioning' keeps 'type' and has no 'expirationMs' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = PartitionMetadataV1_0(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertFalse('expirationMs' in time_partitioning)
33 changes: 33 additions & 0 deletions tests/bigquery_transformer_table_metadata_v0_1_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import test_utils

from gcp_census.bigquery.bigquery_table_metadata import BigQueryTableMetadata
from gcp_census.bigquery.transformers.table_metadata_v0_1 import \
TableMetadataV0_1
Expand Down Expand Up @@ -67,3 +69,34 @@ def test_transforming_table_with_labels(self):
self.assertEqual(3, len(data['labels']))
self.assertEqual(data['labels'][0]['key'], 'key3')
self.assertEqual(data['labels'][0]['value'], 'value3')

def test_should_ignore_timepartitioning_field(self):
    # The transformed 'timePartitioning' must keep 'type' and
    # 'expirationMs' and must not carry over the 'field' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
        'expirationMs': '259200000',
        'field': 'transaction_date'
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = TableMetadataV0_1(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertEqual('259200000', time_partitioning['expirationMs'])
    self.assertFalse('field' in time_partitioning)

def test_should_parse_timepartitioning_without_expiration_ms(self):
    # When the input has no 'expirationMs', the transformed
    # 'timePartitioning' keeps 'type' and has no 'expirationMs' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = TableMetadataV0_1(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertFalse('expirationMs' in time_partitioning)
33 changes: 33 additions & 0 deletions tests/bigquery_transformer_table_metadata_v1_0_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import unittest

import test_utils

from gcp_census.bigquery.bigquery_table_metadata import BigQueryTableMetadata
from gcp_census.bigquery.transformers.table_metadata_v1_0 import \
TableMetadataV1_0
Expand Down Expand Up @@ -33,3 +35,34 @@ def test_transforming_table_without_labels(self):
# then
self.assertEqual('account_1_0_0_20150603', data['tableId'])
self.assertEqual(0, len(data['labels']))

def test_should_ignore_timepartitioning_field(self):
    # The transformed 'timePartitioning' must keep 'type' and
    # 'expirationMs' and must not carry over the 'field' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
        'expirationMs': '259200000',
        'field': 'transaction_date'
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = TableMetadataV1_0(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertEqual('259200000', time_partitioning['expirationMs'])
    self.assertFalse('field' in time_partitioning)

def test_should_parse_timepartitioning_without_expiration_ms(self):
    # When the input has no 'expirationMs', the transformed
    # 'timePartitioning' keeps 'type' and has no 'expirationMs' key.

    # given
    partitioned_table = test_utils.create_minimal_table_dict()
    partitioned_table['timePartitioning'] = {
        'type': 'DAY',
    }
    metadata = BigQueryTableMetadata(partitioned_table)

    # when
    transformed = TableMetadataV1_0(metadata).transform()

    # then
    time_partitioning = transformed['timePartitioning']
    self.assertEqual('DAY', time_partitioning['type'])
    self.assertFalse('expirationMs' in time_partitioning)
28 changes: 28 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,31 @@ def content(filename):
def get_body_from_http_request(call):
    """Decode the JSON payload stored at ``call[1][2]``.

    Args:
        call: nested indexable whose ``[1][2]`` element is the serialized
            request body (presumably a recorded mock call -- verify
            against callers).

    Returns:
        The parsed JSON value, or ``None`` when the payload is falsy
        (for example ``None`` or an empty string).
    """
    raw_body = call[1][2]
    if not raw_body:
        return None
    return json.loads(raw_body)


def create_minimal_table_dict():
    """Build a fresh, minimal table description for use in tests.

    A new dict (with new nested containers) is created on every call, so
    callers may mutate the result freely.

    Returns:
        A dict with the table reference, a two-column schema, size and row
        counters, creation/modification timestamps and the table type.
    """
    table_reference = {
        'projectId': 'dev-manager',
        'datasetId': 'crm_raw',
        'tableId': 'account_1_0_0$20150603'
    }
    schema_fields = [
        {'name': 'transaction_id', 'type': 'INTEGER'},
        {'name': 'transaction_date', 'type': 'DATE'},
    ]
    # Keys are listed in the same order as before so that serializing the
    # dict yields the same key order.
    return {
        'tableReference': table_reference,
        'schema': {'fields': schema_fields},
        'numBytes': '421940',
        'numLongTermBytes': '421940',
        'numRows': '1445',
        'creationTime': '1468414660572',
        'lastModifiedTime': '1468414660572',
        'type': 'TABLE',
    }

0 comments on commit 228f8f0

Please sign in to comment.