import shutil
import logging
from concurrent import futures
from datetime import datetime, date, time

import pandas as pd
from tqdm import tqdm
from dateutil.relativedelta import relativedelta, FR

logger = logging.getLogger(__name__)

AWS_BASE_URL = 'https://numerai-signals-public-data.s3-us-west-2.amazonaws.com'
SIGNALS_UNIVERSE = f'{AWS_BASE_URL}/latest_universe.csv'
SIGNALS_TICKER_MAP = f'{AWS_BASE_URL}/signals_ticker_map_w_bbg.csv'
SIGNALS_TARGETS = f'{AWS_BASE_URL}/signals_train_val_bbg.csv'


def get_tickers():
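    """Load the Yahoo/Bloomberg ticker map used by Numerai Signals.

    Drops entries without a Yahoo ticker and raises if either ticker column
    contains duplicates.
    """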
    ticker_map = pd.read_csv(SIGNALS_TICKER_MAP)
    ticker_map = ticker_map.dropna(subset=['yahoo'])
    logger.info(f'Number of eligible tickers: {ticker_map.shape[0]}')

    if ticker_map['yahoo'].duplicated().any():
        raise Exception(
            f'Found {ticker_map["yahoo"].duplicated().sum()} duplicated'
            ' yahoo tickers'
        )

    if ticker_map['bloomberg_ticker'].duplicated().any():
        raise Exception(
            f'Found {ticker_map["bloomberg_ticker"].duplicated().sum()} duplicated'
            ' bloomberg_ticker tickers'
        )

    return ticker_map


def get_ticker_data(db_dir):
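    """Load previously downloaded price data from the parquet files in `db_dir`.

    Returns an empty, correctly typed DataFrame when no parquet file exists yet.
    """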
    ticker_data = pd.DataFrame({
        'bloomberg_ticker': pd.Series([], dtype='str'),
        'date': pd.Series([], dtype='datetime64[ns]')
    })
    if len(list(db_dir.rglob('*.parquet'))) > 0:
        ticker_data = pd.read_parquet(db_dir)

    logger.info(f'Retrieving data for {ticker_data.bloomberg_ticker.unique().shape[0]} '
                'tickers from the database')

    return ticker_data


def get_ticker_missing(ticker_data, ticker_map, last_friday=None):
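    """Return the tickers whose price history needs to be (re)downloaded.

    Tickers with no stored data get a start date of 2002-12-01; tickers whose
    most recent date is older than `last_friday` but within the last 52 weeks
    are fetched from the day after their last stored date.
    """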
    # compute the default at call time so a long-running process does not keep a stale date
    if last_friday is None:
        last_friday = datetime.today() - relativedelta(weekday=FR(-1))

    tickers_available_data = ticker_data.groupby('bloomberg_ticker').agg({'date': ['max', 'min']})
    tickers_available_data.columns = ['date_max', 'date_min']

    eligible_tickers_available_data = ticker_map.merge(
        tickers_available_data.reset_index(),
        on='bloomberg_ticker',
        how='left'
    )

    # tickers with no stored data at all: fetch their full history
    ticker_not_found = eligible_tickers_available_data.loc[
        eligible_tickers_available_data.date_max.isna(), ['bloomberg_ticker', 'yahoo']
    ]
    ticker_not_found['start'] = '2002-12-01'

    # tickers whose latest stored date is stale but no older than 52 weeks: fetch the gap
    last_friday_52 = last_friday - relativedelta(weeks=52)
    tickers_outdated = eligible_tickers_available_data.loc[
        (
            (eligible_tickers_available_data.date_max < last_friday.strftime('%Y-%m-%d')) &
            (eligible_tickers_available_data.date_max > last_friday_52.strftime('%Y-%m-%d'))
        ),
        ['bloomberg_ticker', 'yahoo', 'date_max']
    ]
    tickers_outdated['start'] = (
        tickers_outdated['date_max'] + pd.DateOffset(days=1)
    ).dt.strftime('%Y-%m-%d')
    tickers_outdated.drop(columns=['date_max'], inplace=True)

    return pd.concat(
        [ticker_not_found, tickers_outdated]
    )


def get_data(
    db_dir,
    features_generators=None,
    last_friday=None,
    target='target'
):
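    """Build the train, test and live datasets for Numerai Signals.

    Applies each feature generator to the stored price data, merges the
    Numerai targets on (date, bloomberg_ticker) and returns
    (train_data, test_data, live_data, feature_names).
    """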
    # resolve defaults at call time: avoid a mutable default argument and a
    # last_friday that would otherwise be frozen at import time
    if features_generators is None:
        features_generators = []
    if last_friday is None:
        last_friday = datetime.today() - relativedelta(weekday=FR(-1))

    ticker_data = get_ticker_data(db_dir)

    # restrict to the current Numerai Signals universe
    ticker_universe = pd.read_csv(SIGNALS_UNIVERSE)
    ticker_data = ticker_data[ticker_data.bloomberg_ticker.isin(ticker_universe['bloomberg_ticker'])]

    targets = pd.read_csv(SIGNALS_TARGETS)
    targets['date'] = pd.to_datetime(
        targets['friday_date'],
        format='%Y%m%d'
    )
    # preserve the original target column as 'target_6d' and expose the requested column as 'target'
    targets['target_6d'] = targets['target']
    targets['target'] = targets[target]

    feature_names = []
    for features_generator in features_generators:
        ticker_data, feature_names_aux = features_generator.generate_features(ticker_data)
        feature_names.extend(feature_names_aux)

    # merge our feature data with the Numerai targets
    ml_data = pd.merge(
        ticker_data, targets,
        on=['date', 'bloomberg_ticker'],
        how='left'
    )

    logger.info(f'Found {ml_data.target.isna().sum()} rows without a target, filling with 0.5')
    ml_data['target'] = ml_data['target'].fillna(0.5)

    # index on the date column
    ml_data = ml_data.set_index('date')

    # for training and testing we want clean, complete data only
    ml_data = ml_data.dropna(subset=feature_names)
    # keep only Fridays
    ml_data = ml_data[ml_data.index.weekday == 4]
    # keep only eras (dates) with more than 50 observations
    ml_data = ml_data[ml_data.index.value_counts() > 50]

    # train/test split based on Numerai's data_type column
    train_data = ml_data[ml_data['data_type'] == 'train']
    test_data = ml_data[ml_data['data_type'] == 'validation']

    # generate live data from the most recent Friday
    date_string = last_friday.strftime('%Y-%m-%d')
    live_data = ticker_data[ticker_data.date == date_string].copy()

    # also take data from the day before, for markets that were closed on Friday
    last_thursday = last_friday - relativedelta(days=1)
    thursday_date_string = last_thursday.strftime('%Y-%m-%d')
    thursday_data = ticker_data[ticker_data.date == thursday_date_string]

    # only select tickers that aren't already present in live_data
    thursday_data = thursday_data[
        ~thursday_data.bloomberg_ticker.isin(live_data.bloomberg_ticker.values)
    ].copy()

    live_data = pd.concat([live_data, thursday_data])
    live_data = live_data.set_index('date')

    return train_data, test_data, live_data, feature_names


def download_tickers(download_data_fn, tickers, start):
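    """Download data for `tickers` in parallel, from `start` (YYYY-MM-DD) to today.

    `download_data_fn` must accept ticker, start_epoch and end_epoch keyword
    arguments and return a (ticker, DataFrame) tuple.
    """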
    start_epoch = int(datetime.strptime(start, '%Y-%m-%d').timestamp())
    end_epoch = int(datetime.combine(date.today(), time()).timestamp())

    pbar = tqdm(
        total=len(tickers),
        unit='tickers'
    )

    dfs = {}
    with futures.ThreadPoolExecutor() as executor:
        _futures = []
        for ticker in tickers:
            _futures.append(
                executor.submit(download_data_fn, ticker=ticker, start_epoch=start_epoch, end_epoch=end_epoch)
            )

        for future in futures.as_completed(_futures):
            pbar.update(1)
            ticker, data = future.result()
            dfs[ticker] = data

    pbar.close()

    return pd.concat(dfs)


def download_data(db_dir, download_data_fn, recreate=False):
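    """Update the local parquet database in `db_dir`.

    Determines which tickers are missing or outdated, downloads them with
    `download_data_fn`, maps Yahoo tickers back to Bloomberg tickers and writes
    the result to a new parquet file.
    """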
    if recreate:
        logger.warning(f'Removing dataset {db_dir} to recreate it')
        shutil.rmtree(db_dir, ignore_errors=True)

    db_dir.mkdir(exist_ok=True)

    ticker_data = get_ticker_data(db_dir)
    ticker_map = get_tickers()
    ticker_missing = get_ticker_missing(ticker_data, ticker_map)

    n_ticker_missing = ticker_missing.shape[0]
    if n_ticker_missing <= 0:
        logger.info('Dataset up to date')
        return

    logger.info(f'Downloading missing data for {n_ticker_missing} tickers')

    # group the missing tickers by start date so each group is downloaded in one batch
    ticker_missing_grouped = ticker_missing.groupby('start').apply(
        lambda x: ' '.join(x.yahoo.astype(str))
    )
    concat_dfs = []
    for start_date, tickers in ticker_missing_grouped.items():
        temp_df = download_tickers(download_data_fn, tickers.split(' '), start=start_date)

        # Yahoo Finance can return the previous day in some situations (e.g. Friday in Tel Aviv markets)
        temp_df = temp_df[temp_df.date >= start_date]
        if temp_df.empty:
            continue

        temp_df['created_at'] = datetime.now()
        temp_df['volume'] = temp_df['volume'].astype('float64')
        # map Yahoo tickers back to Bloomberg tickers
        temp_df['bloomberg_ticker'] = temp_df['bloomberg_ticker'].map(
            dict(zip(ticker_map['yahoo'], ticker_map['bloomberg_ticker'])))

        concat_dfs.append(temp_df)

    if len(concat_dfs) == 0:
        logger.info('Dataset up to date')
        return

    df = pd.concat(concat_dfs)
    n_ticker_data = df.bloomberg_ticker.unique().shape[0]
    if n_ticker_data <= 0:
        logger.info('Dataset up to date')
        return

    logger.info(f'Storing data for {n_ticker_data} tickers')
    df.to_parquet(db_dir / f'{datetime.utcnow().timestamp()}.parquet', index=False)
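

# Example usage (illustrative sketch): `fetch_yahoo` stands in for a user-supplied
# download function with the signature expected by download_tickers, i.e. it takes
# ticker, start_epoch and end_epoch keyword arguments and returns (ticker, DataFrame).
#
#   from pathlib import Path
#
#   db_dir = Path('signals_db')
#   download_data(db_dir, fetch_yahoo, recreate=False)
#   train_data, test_data, live_data, feature_names = get_data(db_dir)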