zipline icon indicating copy to clipboard operation
zipline copied to clipboard

After successfully ingesting local CSV data, Zipline's run_algorithm execution threw: "Value Error: At least one valid asset id is required."

Open jrfeibelman opened this issue 4 years ago • 4 comments

Dear Zipline Maintainers,

I am attempting to ingest my own csv stock data that I have already retrieved and formatted properly. I followed the tutorial that was posted here, making only some minor changes.

I was able to ingest the data successfully, but upon calling Zipline's run_algorithm method, it crashed with a Value Error as described below.

Environment

  • Operating System: OSX 10.14
  • Python Version: 3.5.10
  • Python Bitness: 64
  • How did you install Zipline: pip
  • Python packages: alembic==1.4.3 appnope==0.1.2 argon2-cffi==20.1.0 attrs==20.3.0 backcall==0.2.0 bcolz==1.2.1 bleach==3.3.0 Bottleneck==1.3.2 certifi==2020.12.5 cffi==1.14.5 chardet==4.0.0 click==7.1.2 cycler==0.10.0 Cython==0.29.22 decorator==4.4.2 defusedxml==0.7.1 empyrical==0.5.5 entrypoints==0.3 h5py==2.10.0 idna==2.10 importlib-metadata==2.1.1 intervaltree==3.1.0 ipykernel==5.5.0 ipython==7.9.0 ipython-genutils==0.2.0 ipywidgets==7.6.3 iso3166==1.0.1 iso4217==1.6.20180829 jedi==0.17.2 Jinja2==2.11.3 jsonschema==3.2.0 jupyter==1.0.0 jupyter-client==6.1.11 jupyter-console==6.1.0 jupyter-core==4.6.3 kiwisolver==1.1.0 Logbook==1.5.3 lru-dict==1.1.7 lxml==4.6.2 Mako==1.1.4 MarkupSafe==1.1.1 matplotlib==3.0.3 mistune==0.8.4 multipledispatch==0.6.0 nbconvert==5.6.1 nbformat==5.1.2 networkx==1.11 notebook==6.2.0 numexpr==2.7.3 numpy==1.18.5 packaging==20.9 pandas==0.22.0 pandas-datareader==0.8.1 pandocfilters==1.4.3 parso==0.7.1 patsy==0.5.1 pexpect==4.8.0 pickleshare==0.7.5 prometheus-client==0.9.0 prompt-toolkit==2.0.10 ptyprocess==0.7.0 pycparser==2.20 Pygments==2.8.1 pyparsing==2.4.7 pyrsistent==0.17.3 python-dateutil==2.8.1 python-editor==1.0.4 python-interface==1.6.0 pytz==2021.1 pyzmq==20.0.0 qtconsole==5.0.0 QtPy==1.9.0 requests==2.25.1 scipy==1.4.1 Send2Trash==1.5.0 six==1.15.0 sortedcontainers==2.3.0 SQLAlchemy==1.3.23 statsmodels==0.11.1 tables==3.6.1 terminado==0.8.3 testpath==0.4.4 toolz==0.11.1 tornado==6.1 trading-calendars==2.1.1 traitlets==4.3.3 urllib3==1.26.3 wcwidth==0.2.5 webencodings==0.5.1 widgetsnbextension==3.5.1 zipline==1.4.1 zipp==1.2.0

My Code

Note: I am creating and ingesting bundle 'sp500_data', which only includes 3 NASDAQ stocks right now just for debugging purposes (AMZN, TSLA, GOOG).

extension.py

import pandas as pd
from zipline.data.bundles import register, sp500_stock_data

start_session = pd.Timestamp('2019 06 03', tz='utc')
end_session = pd.Timestamp('2021 03 16', tz='utc')
register(
 'sp500_data',
 sp500_stock_data.sp500_data,
 calendar_name='NYSE',
 start_session=start_session,
 end_session=end_session
)

sp500_stock_data.py

import pandas as pd
import os
from trading_calendars import get_calendar

path = os.path.dirname(os.getcwd()) + "/Desktop/stockalyzer/data/SP500"

def sp500_data(environ,asset_db_writer,minute_bar_writer,daily_bar_writer,adjustment_writer,calendar,start_session,end_session,cache,show_progress,output_dir):
    # Function to ingest S&P500 data into zipline

    symbols = [f[:-4] for f in os.listdir(path) if f[0] != '.']
    if not symbols:
        raise ValueError("No symbols found in folder.")
    
    # Prepare an empty DataFrame for dividends
    divs = pd.DataFrame(columns=['sid','amount','ex_date','record_date','declared_date','pay_date'])
    
    # Prepare an empty DataFrame for splits
    splits = pd.DataFrame(columns=['sid','ratio','effective_date'])
    
    # Prepare an empty DataFrame for metadata
    metadata = pd.DataFrame(columns=('start_date','end_date','auto_close_date','symbol','exchange'))
    
    # Check valid trading dates, according to the selected exchange calendar
    sessions = calendar.sessions_in_range(start_session, end_session)
    
    # Get data for all stocks and write to Zipline
    daily_bar_writer.write(process_stocks(symbols, sessions, metadata, divs))
    
    # Write the metadata
    asset_db_writer.write(equities=metadata)
    
    # Write splits and dividends
    adjustment_writer.write(splits=splits,dividends=divs)
    
def process_stocks(symbols, sessions, metadata, divs):
    # Generator function to iterate stocks & build historical data, metadata, & dividend data
    for sid, symbol in enumerate(symbols): # ERROR?
        print('Loading {}...'.format(symbol))
   
        # Read the stock data from csv file.
        df = pd.read_csv('{}/{}.csv'.format(path, symbol), index_col=[0], parse_dates=[0])
        # filter data for valide trading calendar dates
        # print(len(df))
        # df = df[df.index.isin(sessions)] # ERROR: Results in dataframe of size 0
        # print(len(df))
        start_date = df.index[0]
        end_date = df.index[-1]
        # Synch to the official exchange calendar
        # df = df.reindex(sessions.tz_localize(None))[start_date:end_date] # ERROR: Results ?????
        # Forward fill missing data
        df.fillna(method='ffill', inplace=True)

        # Drop remaining NaN
        df.dropna(inplace=True) 

        # The auto_close date is the day after the last trade.
        ac_date = end_date + pd.Timedelta(days=1)

        # Add a row to the metadata DataFrame. Don’t forget to add an exchange field.
        metadata.loc[sid] = start_date, end_date, ac_date, symbol, 'NYSE'
        # If there’s dividend data, add that to the dividend DataFrame
        if 'dividend' in df.columns:
            # Slice off the days with dividends
            tmp = df[df['dividend'] != 0.0]['dividend']
            div = pd.DataFrame(data=tmp.index.tolist(), columns=['ex_date'])

            # Provide empty columns as we don’t have this data for now
            div['record_date'] = pd.NaT
            div['declared_date'] = pd.NaT
            div['pay_date'] = pd.NaT 

            # Store the dividends and set the Security ID
            div['amount'] = tmp.tolist()
            div['sid'] = sid

            # Start numbering at where we left off last time
            ind = pd.Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
            div.set_index(ind, inplace=True)

            # Append this stock’s dividends to the list of all dividends
            divs = divs.append(div) 
        
    yield sid, df

main.py

%load_ext zipline
%matplotlib inline

import zipline as zp
import pandas as pd
from datetime import datetime
import sys
import os
from zipline import run_algorithm
from zipline.api import order_target_percent, order_target, symbol, sid
from datetime import datetime
import pytz
import matplotlib.pyplot as plt
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

tickers = ["AMZN", "TSLA", "GOOG"]

def initialize(context):
    context.stocks = [symbol(t) for t in tickers]
    context.i = 0
    
def handle_data(context, data):
    # Skip first 100 days to get full windows
    context.i += 1
    if context.i < 100:
        return
    # Compute averages
    # data.history() has to be called with the same params
    # from above and returns a pandas dataframe.
    
    for stock in context.stocks:
        short_mavg = data.history(stock, 'price', bar_count=50, frequency="1d").mean()
        long_mavg = data.history(stock, 'price', bar_count=100, frequency="1d").mean()
        # Trading logic
        if short_mavg > long_mavg:
            # order_target orders as many shares as needed to
            # achieve the desired number of shares.
            order_target(stock, 10)
        elif short_mavg < long_mavg:
            order_target(stock, 0)
    
def analyze(context):
    fig = plt.figure(figsize=(12, 8))
    
    # First chart
    ax = fig.add_subplot(311)
    ax.set_title('Strategy Results')
    ax.plot(perf['portfolio_value'], linestyle='-', 
                label='Equity Curve', linewidth=1.0)
    ax.legend()
    ax.grid(False)
    
    # Second chart
    ax = fig.add_subplot(312)
    ax.plot(perf['gross_leverage'], 
            label='Exposure', linestyle='-', linewidth=1.0)
    ax.legend()
    ax.grid(True)
    # Third chart
    ax = fig.add_subplot(313)
    ax.plot(perf['returns'], label='Returns', linestyle='-.', linewidth=1.0)
    ax.legend()
    ax.grid(True)
    plt.savefig('strategy',dpi=400)
    
# Fire off the backtest
results = run_algorithm(
    start=pd.Timestamp(sm.start_date, tz=pytz.utc), 
    end=pd.Timestamp(sm.end_date, tz=pytz.utc), 
    initialize=initialize, 
    analyze=analyze,
    handle_data=handle_data, 
    capital_base=10000, 
    data_frequency = 'daily', 
    bundle='sp500_data' 
)

Files

I have 3 csv files: AMZN.csv, GOOG.csv, and TSLA.csv, which are saved in the same path used by sp500_stock_data.py (".../Desktop/stockalyzer/data/SP500"). Here are the first few lines of AMZN.csv:

date,high,low,open,close,volume 2019-06-03 05:00:00,1766.29,1672.0,1760.01,1692.69,9098708 2019-06-04 05:00:00,1730.82,1680.89,1699.24,1729.56,5679121 2019-06-05 05:00:00,1752.0,1715.2514,1749.6,1738.5,4239782

Data ingesting

I ran zipline ingest -b sp500_data in terminal, which ran successfully. I verified this work by checking the output of zipline bundles.

Description of Issue

The last line of code in main.py that runs the zipline algo throws a "ValueError: At least one valid asset id is required." I was expecting my code to execute and for me to be able to view backtesting results, but instead it crashed with the error message. The whole error is shown below:

ValueError Traceback (most recent call last) in ----> 1 perf = zp.run_algorithm(start=pd.Timestamp(sm.start_date, tz=pytz.utc), end=pd.Timestamp(sm.end_date, tz=pytz.utc), initialize=initialize, capital_base=100000, handle_data=handle_data, bundle='sp500_data')

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/run_algo.py in run_algorithm(start, end, initialize, capital_base, handle_data, before_trading_start, analyze, data_frequency, bundle, bundle_timestamp, trading_calendar, metrics_set, benchmark_returns, default_extension, extensions, strict_extensions, environ, blotter) 407 environ=environ, 408 blotter=blotter, --> 409 benchmark_spec=benchmark_spec, 410 ) 411

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/run_algo.py in _run(handle_data, initialize, before_trading_start, analyze, algofile, algotext, defines, data_frequency, capital_base, bundle, bundle_timestamp, start, end, output, trading_calendar, print_algo, metrics_set, local_namespace, environ, blotter, benchmark_spec) 214 } if algotext is None else { 215 'algo_filename': getattr(algofile, 'name', ''), --> 216 'script': algotext, 217 } 218 ).run()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/algorithm.py in run(self, data_portal) 641 try: 642 perfs = [] --> 643 for perf in self.get_generator(): 644 perfs.append(perf) 645

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/gens/tradesimulation.py in transform(self) 203 for dt, action in self.clock: 204 if action == BAR: --> 205 for capital_change_packet in every_bar(dt): 206 yield capital_change_packet 207 elif action == SESSION_START:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/gens/tradesimulation.py in every_bar(dt_to_use, current_data, handle_data) 131 metrics_tracker.process_commission(commission) 132 --> 133 handle_data(algo, current_data, dt_to_use) 134 135 # grab any new orders from the blotter, then clear the list.

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/events.py in handle_data(self, context, data, dt) 216 context, 217 data, --> 218 dt, 219 ) 220

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/events.py in handle_data(self, context, data, dt) 235 """ 236 if self.rule.should_trigger(dt): --> 237 self.callback(context, data) 238 239

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/algorithm.py in handle_data(self, data) 452 def handle_data(self, data): 453 if self._handle_data: --> 454 self._handle_data(self, data) 455 456 def analyze(self, perf):

in handle_data(context, data) 16 17 for stock in context.stocks: ---> 18 short_mavg = data.history(stock, 'price', bar_count=50, frequency="1d").mean() 19 long_mavg = data.history(stock, 'price', bar_count=100, frequency="1d").mean() 20 # Trading logic

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/_protocol.pyx in zipline._protocol.check_parameters.call.assert_keywords_and_call()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/_protocol.pyx in zipline._protocol.BarData.history()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in get_history_window(self, assets, end_dt, bar_count, frequency, field, data_frequency, ffill) 962 if field == "price": 963 df = self._get_history_daily_window(assets, end_dt, bar_count, --> 964 "close", data_frequency) 965 else: 966 df = self._get_history_daily_window(assets, end_dt, bar_count,

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in _get_history_daily_window(self, assets, end_dt, bar_count, field_to_use, data_frequency) 804 805 data = self._get_history_daily_window_data( --> 806 assets, days_for_window, end_dt, field_to_use, data_frequency 807 ) 808 return pd.DataFrame(

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in _get_history_daily_window_data(self, assets, days_for_window, end_dt, field_to_use, data_frequency) 827 field_to_use, 828 days_for_window, --> 829 extra_slot=False 830 ) 831 else:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in _get_daily_window_data(self, assets, field, days_in_window, extra_slot) 1115 days_in_window, 1116 field, -> 1117 extra_slot) 1118 if extra_slot: 1119 return_array[:len(return_array) - 1, :] = data

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/history_loader.py in history(self, assets, dts, field, is_perspective_after) 547 dts, 548 field, --> 549 is_perspective_after) 550 end_ix = self._calendar.searchsorted(dts[-1]) 551

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/history_loader.py in _ensure_sliding_windows(self, assets, dts, field, is_perspective_after) 429 adj_dts = prefetch_dts 430 prefetch_len = len(prefetch_dts) --> 431 array = self._array(prefetch_dts, needed_assets, field) 432 433 if field == 'sid':

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/history_loader.py in _array(self, dts, assets, field) 571 dts[0], 572 dts[-1], --> 573 assets, 574 )[0] 575

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/dispatch_bar_reader.py in load_raw_arrays(self, fields, start_dt, end_dt, sids) 118 end_dt, 119 sid_groups[t]) --> 120 for t in asset_types if sid_groups[t]} 121 122 results = []

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/dispatch_bar_reader.py in (.0) 118 end_dt, 119 sid_groups[t]) --> 120 for t in asset_types if sid_groups[t]} 121 122 results = []

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/bcolz_daily_bars.py in load_raw_arrays(self, columns, start_date, end_date, assets) 577 start_idx, 578 end_idx, --> 579 assets, 580 ) 581 read_all = len(assets) > self._read_all_threshold

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/bcolz_daily_bars.py in _compute_slices(self, start_idx, end_idx, assets) 567 start_idx, 568 end_idx, --> 569 assets, 570 ) 571

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/_equities.pyx in zipline.data._equities._compute_row_slices()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/_equities.pyx in zipline.data._equities._compute_row_slices()

ValueError: At least one valid asset id is required.

What steps have you taken to resolve this already?

To debug, I deleted everything in my handle_data function and replaced it with the single line "order_target(symbol("TSLA"),10)". This gave me a brand new error "KeyError: Equity(1 [TSLA])" The whole error is shown here:

KeyError Traceback (most recent call last) in 59 capital_base=10000, 60 data_frequency = 'daily', ---> 61 bundle='sp500_data' 62 )

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/run_algo.py in run_algorithm(start, end, initialize, capital_base, handle_data, before_trading_start, analyze, data_frequency, bundle, bundle_timestamp, trading_calendar, metrics_set, benchmark_returns, default_extension, extensions, strict_extensions, environ, blotter) 407 environ=environ, 408 blotter=blotter, --> 409 benchmark_spec=benchmark_spec, 410 ) 411

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/utils/run_algo.py in _run(handle_data, initialize, before_trading_start, analyze, algofile, algotext, defines, data_frequency, capital_base, bundle, bundle_timestamp, start, end, output, trading_calendar, print_algo, metrics_set, local_namespace, environ, blotter, benchmark_spec) 214 } if algotext is None else { 215 'algo_filename': getattr(algofile, 'name', ''), --> 216 'script': algotext, 217 } 218 ).run()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/algorithm.py in run(self, data_portal) 641 try: 642 perfs = [] --> 643 for perf in self.get_generator(): 644 perfs.append(perf) 645

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/gens/tradesimulation.py in transform(self) 203 for dt, action in self.clock: 204 if action == BAR: --> 205 for capital_change_packet in every_bar(dt): 206 yield capital_change_packet 207 elif action == SESSION_START:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/gens/tradesimulation.py in every_bar(dt_to_use, current_data, handle_data) 117 # placed in the last bar 118 new_transactions, new_commissions, closed_orders =
--> 119 blotter.get_transactions(current_data) 120 121 blotter.prune_orders(closed_orders)

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/finance/blotter/simulation_blotter.py in get_transactions(self, bar_data) 343 344 for order, txn in
--> 345 slippage.simulate(bar_data, asset, asset_orders): 346 commission = self.commission_models[type(asset)] 347 additional_commission = commission.calculate(order, txn)

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/finance/slippage.py in simulate(self, data, asset, orders_for_asset) 164 def simulate(self, data, asset, orders_for_asset): 165 self._volume_for_bar = 0 --> 166 volume = data.current(asset, "volume") 167 168 if volume == 0:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/_protocol.pyx in zipline._protocol.check_parameters.call.assert_keywords_and_call()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/_protocol.pyx in zipline._protocol.BarData.current()

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in get_spot_value(self, assets, field, dt, data_frequency) 522 field, 523 dt, --> 524 data_frequency, 525 ) 526 else:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in _get_single_asset_value(self, session_label, asset, field, dt, data_frequency) 459 else: 460 return self._get_daily_spot_value( --> 461 asset, field, session_label, 462 ) 463 else:

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/data_portal.py in _get_daily_spot_value(self, asset, column, dt) 746 # don't forward fill 747 try: --> 748 return reader.get_value(asset, dt, column) 749 except NoDataOnDate: 750 return np.nan

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/dispatch_bar_reader.py in get_value(self, sid, dt, field) 95 asset = self._asset_finder.retrieve_asset(sid) 96 r = self._readers[type(asset)] ---> 97 return r.get_value(asset, dt, field) 98 99 def get_last_traded_dt(self, asset, dt):

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/bcolz_daily_bars.py in get_value(self, sid, dt, field) 696 0. 697 """ --> 698 ix = self.sid_day_index(sid, dt) 699 price = self._spot_col(field)[ix] 700 if field != 'volume':

~/.pyenv/versions/3.5.10/envs/venv3.5/lib/python3.5/site-packages/zipline/data/bcolz_daily_bars.py in sid_day_index(self, sid, day) 664 raise NoDataOnDate("day={0} is outside of calendar={1}".format( 665 day, self.sessions)) --> 666 offset = day_loc - self._calendar_offsets[sid] 667 if offset < 0: 668 raise NoDataBeforeDate(

KeyError: Equity(1 [TSLA])

Has any one experienced a similar error or knows how to fix this? Any advice would be greatly appreciated!

Sincerely, J

jrfeibelman avatar Mar 16 '21 20:03 jrfeibelman

maybe you can use DataPortal to see whether you have ingest the data correct.

yinjin1314 avatar Mar 21 '21 06:03 yinjin1314

Hi,

I followed the same tutorial to ingest data download from Yahoo finance. And got into the same error:

ValueError: At least one valid asset id is required.

I was able to solve it by creating an altogether new bundle. I did use some parts of the code from the old bundle. My new bundle looks like below:

import os
import sys

from logbook import Logger, StreamHandler
from numpy import empty
from pandas import DataFrame, read_csv, Index, Timedelta, NaT
from trading_calendars import register_calendar_alias

from zipline.utils.cli import maybe_show_progress

from . import core as bundles

handler = StreamHandler(sys.stdout, format_string=" | {record.message}")
logger = Logger(__name__)
logger.handlers.append(handler)


def yahoo_nse_data(tframes=None, csvdir=None):
    
    return NSEDIRBundle(tframes, csvdir).ingest


class NSEDIRBundle:
    """
    Wrapper class to call csvdir_bundle with provided
    list of time frames and a path to the csvdir directory
    """

    def __init__(self, tframes=None, csvdir=None):
        self.tframes = tframes
        self.csvdir = csvdir

    def ingest(self,
               environ,
               asset_db_writer,
               minute_bar_writer,
               daily_bar_writer,
               adjustment_writer,
               calendar,
               start_session,
               end_session,
               cache,
               show_progress,
               output_dir):

        nsedir_bundle(environ,
                      asset_db_writer,
                      minute_bar_writer,
                      daily_bar_writer,
                      adjustment_writer,
                      calendar,
                      start_session,
                      end_session,
                      cache,
                      show_progress,
                      output_dir,
                      self.tframes,
                      self.csvdir)



def nsedir_bundle(environ,
                  asset_db_writer,
                  minute_bar_writer,
                  daily_bar_writer,
                  adjustment_writer,
                  calendar,
                  start_session,
                  end_session,
                  cache,
                  show_progress,
                  output_dir,
                  tframes=None,
                  csvdir=None):
    """
    Build a zipline data bundle from the directory with csv files.
    """
    if not csvdir:
        csvdir = environ.get('CSVDIR')
        if not csvdir:
            raise ValueError("CSVDIR environment variable is not set")

    if not os.path.isdir(csvdir):
        raise ValueError("%s is not a directory" % csvdir)

    if not tframes:
        tframes = set(["daily", "minute"]).intersection(os.listdir(csvdir))

        if not tframes:
            raise ValueError("'daily' and 'minute' directories "
                             "not found in '%s'" % csvdir)

    divs_splits = {'divs': DataFrame(columns=['sid', 'amount',
                                              'ex_date', 'record_date',
                                              'declared_date', 'pay_date']),
                   'splits': DataFrame(columns=['sid', 'ratio',
                                                'effective_date'])}
    for tframe in tframes:
        ddir = os.path.join(csvdir, tframe)

        symbols = sorted(item.split('.csv')[0]
                         for item in os.listdir(ddir)
                         if '.csv' in item)
        if not symbols:
            raise ValueError("no <symbol>.csv* files found in %s" % ddir)

        dtype = [('start_date', 'datetime64[ns]'),
                 ('end_date', 'datetime64[ns]'),
                 ('auto_close_date', 'datetime64[ns]'),
                 ('symbol', 'object')]
        metadata = DataFrame(empty(len(symbols), dtype=dtype))
        
        sessions = calendar.sessions_in_range(start_session, end_session)

        if tframe == 'minute':
            writer = minute_bar_writer
        else:
            writer = daily_bar_writer

        writer.write(_pricing_iter(ddir, symbols, metadata, sessions,
                     divs_splits, show_progress),
                     show_progress=show_progress)

        # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
        # register "CSVDIR" to resolve to the NYSE calendar, because these
        # are all equities and thus can use the NYSE calendar.
        metadata['exchange'] = "XBOM"

        asset_db_writer.write(equities=metadata)

        divs_splits['divs']['sid'] = divs_splits['divs']['sid'].astype(int)
        divs_splits['splits']['sid'] = divs_splits['splits']['sid'].astype(int)
        adjustment_writer.write(splits=divs_splits['splits'],
                                dividends=divs_splits['divs'])


def _pricing_iter(csvdir, symbols, metadata, sessions, divs_splits, show_progress):
    with maybe_show_progress(symbols, show_progress,
                             label='Loading custom pricing data: ') as it:
        
        # files = [f[:-4] for f in os.listdir(csvdir)]
        
        files = os.listdir(csvdir)
        
        for sid, symbol in enumerate(it):
            logger.debug('%s: sid %s' % (symbol, sid))

            try:
                fname = [fname for fname in files
                         if '%s.csv' % symbol in fname][0]
            except IndexError:
                raise ValueError("%s.csv file is not in %s" % (symbol, csvdir))

            dfr = read_csv(os.path.join(csvdir, fname),
                           parse_dates=[0],
                           infer_datetime_format=True,
                           index_col=0).sort_index()
            
            # Remove Adjusted Close price
            dfr.drop(columns=['Adj Close'], inplace=True)
            
            # Rename columns
            dfr.rename(columns={'Open': 'open',
                           'Close': 'close',
                           'High': 'high',
                           'Low': 'low',
                           'Volume': 'volume'},
                      inplace=True)
            
            dfr.rename_axis('date', inplace=True)
            
            # Remove duplicates
            dfr = dfr[~dfr.index.duplicated()]
            
            dfr = dfr[dfr.index.isin(sessions)]

            start_date = dfr.index[0]
            end_date = dfr.index[-1]
            
            dfr = dfr.reindex(sessions.tz_localize(None))[start_date:end_date]
            
            dfr.fillna(method='ffill', inplace=True)
            
            dfr.dropna(inplace=True)

            # The auto_close date is the day after the last trade.
            ac_date = end_date + Timedelta(days=1)
            metadata.iloc[sid] = start_date, end_date, ac_date, symbol

            if 'split' in dfr.columns:
                tmp = 1. / dfr[dfr['split'] != 1.0]['split']
                split = DataFrame(data=tmp.index.tolist(),
                                  columns=['effective_date'])
                split['ratio'] = tmp.tolist()
                split['sid'] = sid

                splits = divs_splits['splits']
                index = Index(range(splits.shape[0],
                                    splits.shape[0] + split.shape[0]))
                split.set_index(index, inplace=True)
                divs_splits['splits'] = splits.append(split)

            if 'dividend' in dfr.columns:
                # ex_date   amount  sid record_date declared_date pay_date
                tmp = dfr[dfr['dividend'] != 0.0]['dividend']
                div = DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
                div['record_date'] = NaT
                div['declared_date'] = NaT
                div['pay_date'] = NaT
                div['amount'] = tmp.tolist()
                div['sid'] = sid

                divs = divs_splits['divs']
                ind = Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
                div.set_index(ind, inplace=True)
                divs_splits['divs'] = divs.append(div)

            yield sid, dfr

# register_calendar_alias("CSVDIR", "XBOM")

parmarjay avatar Apr 21 '21 07:04 parmarjay

I have tried to use zipline locally for minute data, but I get the same error, has anyone solved this?

vinceperkins avatar Jul 31 '22 23:07 vinceperkins

I got the "ValueError: At least one valid asset id is required" error when running a pipeline due to having no daily data in my bundle. I ingested a separate bundle and merged the daily data with the same dates as the minute data.

vinceperkins avatar Aug 04 '22 14:08 vinceperkins