Commit 3816b1b1 authored by Johannes Bleher's avatar Johannes Bleher
Browse files

Initial Commit

parents
Loading
Loading
Loading
Loading
+63 −0
Original line number Diff line number Diff line
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# 1. PREAMBLE ----
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
## a. CLEAN WORKSPACE AND LOAD LIBRARIES ----
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Start from an empty global environment so repeated runs are reproducible.
rm(list=ls())
# Keep character columns as character (pre-R-4.0 default was factor).
options(stringsAsFactors = FALSE)

#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
## b. FUNCTIONS AND OTHER USEFUL DEFINITIONS ----
#+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

# Negated %in%: TRUE for left-hand elements NOT contained in the right-hand
# vector. The two operators are equivalent aliases of each other.
`%!in%` <- Negate(`%in%`)
`%ni%` <- Negate(`%in%`)

# RPostgreSQL: DBI-compatible PostgreSQL driver.
# data.table: fast in-memory tables (setDT / := used below).
# sqldf: run SQL against data frames (loaded for interactive use; not
# referenced in the visible part of this script).
library("RPostgreSQL")
library("data.table")
library("sqldf")

# Connection settings for the project PostgreSQL server.
# NOTE(review): the credential values below are placeholders, but the
# surrounding comments suggest real admin credentials were meant to be
# pasted in. Never commit real credentials to version control -- read them
# from the environment instead, e.g. Sys.getenv("AIDAHO_DB_PWD").
dsn_database <- "aidaho"   # Specify the name of your Database
dsn_hostname <- "193.196.53.49"  # localhost = 127.0.0.1
dsn_port <- "8001"                # Specify your port number. e.g. 98939
#ADMIN USER is: 
dsn_uid <- "<The secret admin user>"         # Specify your username. e.g. "admin"
# ADMIN PASSWORD is: 
dsn_pwd <- "<The secret admin password>"        # Specify your password. e.g. "xxx"


# Data directory: ./DATA under the current working directory.
path <- getwd()
datapath <- paste0(path,"/DATA/")

# Open the PostgreSQL connection. Abort the script on failure: the original
# handler only printed a message and let execution continue, so every later
# statement failed with a confusing "object 'connect' not found" error.
connect <- tryCatch({
    drv <- dbDriver("PostgreSQL")
    print("Connecting to Database…")
    conn <- dbConnect(drv,
                      dbname = dsn_database,
                      host = dsn_hostname,
                      port = dsn_port,
                      user = dsn_uid,
                      password = dsn_pwd)
    print("Database Connected!")
    conn
},
error=function(cond) {
    stop("Unable to connect to Database: ", conditionMessage(cond))
}
)

# Sanity check: show the server version. dbGetQuery fetches and frees the
# result set in one step (dbSendQuery without dbClearResult leaks results).
print(dbGetQuery(connect, "SELECT version();"))


# Load one day of IEX DEEP trade reports exported by IEX2CSV.py.
trade_report <- read.csv(paste0(datapath,"data_feed_20220124_20220124_IEXTP1_DEEP1.0_trade_report.csv"))
setDT(trade_report)
# Drop the last row (trailing/footer record of the export).
trade_report <- trade_report[-nrow(trade_report),]
# Parse the ISO timestamps as UTC.
trade_report <- trade_report[,timestamp := as.POSIXct(timestamp,tz="UTC"),]

# Upload into schema "iex", table "trade_reports", then add the primary key.
dbWriteTable(connect,c('iex','trade_reports'),trade_report,row.names=FALSE)
# DDL returns no rows: release the result instead of calling dbFetch on it.
res <- dbSendQuery(connect,'ALTER TABLE iex.trade_reports ADD CONSTRAINT primary_key PRIMARY KEY (ordinal,timestamp,symbol);')
dbClearResult(res)
dbDisconnect(connect)
 No newline at end of file

CODE/IEX2CSV.py

0 → 100644
+511 −0
Original line number Diff line number Diff line

#---------------------------------------------------
#     +++  Convert an IEX file to CSV files +++
#---------------------------------------------------
import argparse
import gzip
import logging
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, IO, List, Mapping, Optional

from iex_parser import Parser, DEEP_1_0, TOPS_1_5, TOPS_1_6

# Matches input capture filenames such as:
#   data_feeds_20200305_20200305_IEXTP1_DEEP1.0.pcap.gz
# Named groups: start_date / end_date (YYYYMMDD), protocol (e.g. IEXTP1),
# feed (DEEP or TOPS) and version (e.g. "1.0").
FILENAME_REGEX = re.compile(
    r"data_feeds_(?P<start_date>\d{8})_(?P<end_date>\d{8})_(?P<protocol>[^_]+)_(?P<feed>(DEEP|TOPS))(?P<version>\d+\.\d+)\.pcap\.gz"
)


# Field formatters: each renders one message field as CSV text.
# (The original file defined every one of these functions twice, verbatim;
# the duplicates are removed here.)

def int_to_str(value: int) -> str:
    """Render an integer field as its decimal string."""
    return str(value)


def decimal_to_str(value: int) -> str:
    """Render a decimal/price field as its plain string representation."""
    return str(value)


def bytes_to_str(value: bytes) -> str:
    """Decode a bytes field and wrap it in double quotes for CSV."""
    return '"' + value.decode() + '"'


def timestamp_to_str(value: datetime) -> str:
    """Render a message timestamp in ISO 8601 format."""
    return value.isoformat()


def datetime_to_str(value: datetime) -> str:
    """Render a datetime field in ISO 8601 format."""
    return value.isoformat()


# Every message carries an ordinal and a timestamp; the remaining columns
# depend on the message type.
_COMMON_COLUMNS: Mapping[str, Callable[[Any], str]] = {
    'ordinal': int_to_str,
    'timestamp': timestamp_to_str,
}

# For each message type, an ordered mapping of CSV column name -> formatter
# used to render that field. Key order defines the CSV column order.
FILE_FORMATS: Mapping[str, Mapping[str, Callable[[Any], str]]] = {
    'security_directive': {
        **_COMMON_COLUMNS,
        'flags': int_to_str,
        'symbol': bytes_to_str,
        'round_lot_size': int_to_str,
        'adjusted_poc_close': decimal_to_str,
        'luld_tier': int_to_str,
    },
    'trading_status': {
        **_COMMON_COLUMNS,
        'status': bytes_to_str,
        'symbol': bytes_to_str,
        'reason': bytes_to_str,
    },
    'retail_liquidity_indicator': {
        **_COMMON_COLUMNS,
        'indicator': bytes_to_str,
        'symbol': bytes_to_str,
    },
    'operational_halt': {
        **_COMMON_COLUMNS,
        'halt_status': bytes_to_str,
        'symbol': bytes_to_str,
    },
    'short_sale_price_test_status': {
        **_COMMON_COLUMNS,
        'status': int_to_str,
        'symbol': bytes_to_str,
        'detail': bytes_to_str,
    },
    'quote_update': {
        **_COMMON_COLUMNS,
        'flags': int_to_str,
        'symbol': bytes_to_str,
        'bid_size': int_to_str,
        'bid_price': decimal_to_str,
        'ask_size': int_to_str,
        'ask_price': decimal_to_str,
    },
    'trade_report': {
        **_COMMON_COLUMNS,
        'flags': int_to_str,
        'symbol': bytes_to_str,
        'size': int_to_str,
        'price': decimal_to_str,
        'trade_id': int_to_str,
    },
    'official_price': {
        **_COMMON_COLUMNS,
        'price_type': bytes_to_str,
        'symbol': bytes_to_str,
        'price': decimal_to_str,
    },
    'trade_break': {
        **_COMMON_COLUMNS,
        'flags': int_to_str,
        'symbol': bytes_to_str,
        'size': int_to_str,
        'price': decimal_to_str,
        'trade_id': int_to_str,
    },
    'auction_information': {
        **_COMMON_COLUMNS,
        'auction_type': bytes_to_str,
        'symbol': bytes_to_str,
        'paired_shares': int_to_str,
        'reference_price': decimal_to_str,
        'indicative_clearing_price': decimal_to_str,
        'imbalance_shares': int_to_str,
        'imbalance_side': bytes_to_str,
        'extension_number': int_to_str,
        'scheduled_auction_time': datetime_to_str,
        'auction_book_clearing_price': decimal_to_str,
        'collar_reference_price': decimal_to_str,
        'lower_auction_collar_price': decimal_to_str,
        'upper_auction_collar_price': decimal_to_str,
    },
    'price_level_update': {
        **_COMMON_COLUMNS,
        'side': bytes_to_str,
        'flags': int_to_str,
        'symbol': bytes_to_str,
        'size': int_to_str,
        'price': decimal_to_str,
    },
    'security_event': {
        **_COMMON_COLUMNS,
        'security_event': bytes_to_str,
        'symbol': bytes_to_str,
    },
    'system_event': {
        **_COMMON_COLUMNS,
        'event': bytes_to_str,
    },
}



def _convert_tops_1_5(
        filename: Path,
        output_folder: Path,
        tickers: List[bytes],
        is_silent: bool,
        is_timestamp_ordinal,
        start_date: datetime,
        end_date: datetime,
        protocol: str,
        feed: str,
        version: str,
        feed_def: str
) -> None:
    """Convert a TOPS 1.5 pcap capture to per-message-type gzipped CSVs.

    One output file per message type is written into *output_folder*, named
    data_feed_<start>_<end>_<protocol>_<feed><version>_<type>.csv.gz.
    Every message gets an 'ordinal' column; when *is_timestamp_ordinal* is
    true the ordinal restarts whenever the timestamp changes.  When
    *tickers* is non-empty, messages for other symbols are skipped.
    Progress is printed to stderr every 1000 messages unless *is_silent*.
    """
    # Local import keeps this function self-contained.
    from contextlib import ExitStack

    root_filename = f'data_feed_{start_date:%Y%m%d}_{end_date:%Y%m%d}_{protocol}_{feed}{version}_'
    # TOPS 1.5 emits only these message types.
    message_types = ('quote_update', 'trade_report', 'trade_break')

    file_ptr_map: Dict[str, IO[Any]] = {}
    ordinal = 0
    previous_timestamp: Optional[datetime] = None
    # ExitStack replaces the original nested with-blocks: the Parser is
    # entered first (so it is closed last), then one gzip file per type.
    with ExitStack() as stack:
        reader = stack.enter_context(Parser(str(filename), feed_def))
        for message_type in message_types:
            out_path = output_folder / (root_filename + message_type + '.csv.gz')
            out_file = stack.enter_context(gzip.open(out_path, "wt"))
            file_ptr_map[message_type] = out_file
            # CSV header row: the column names of this message type.
            print(",".join(FILE_FORMATS[message_type].keys()), file=out_file)

        for message in reader:
            if is_timestamp_ordinal and previous_timestamp != message['timestamp']:
                ordinal = 0
            ordinal += 1
            message['ordinal'] = ordinal
            previous_timestamp = message['timestamp']

            if not is_silent and ordinal % 1000 == 0:
                print(f"{message['timestamp'].isoformat()} ({ordinal})",
                      file=sys.stderr)

            # Optional ticker filter.
            if tickers and 'symbol' in message and message['symbol'] not in tickers:
                if not is_silent:
                    print(f"Skipping {message['symbol']}", file=sys.stderr)
                continue

            formats = FILE_FORMATS[message['type']]
            row = [fmt(message[name]) for name, fmt in formats.items()]
            print(",".join(row), file=file_ptr_map[message['type']])


def _convert_deep_1_0_and_tops_1_6(
        filename: Path,
        output_folder: Path,
        tickers: List[bytes],
        is_silent: bool,
        is_timestamp_ordinal,
        start_date: datetime,
        end_date: datetime,
        protocol: str,
        feed: str,
        version: str,
        feed_def: str
) -> None:
    """Convert a DEEP 1.0 or TOPS 1.6 pcap capture to gzipped CSV files.

    One output file per message type is written into *output_folder*, named
    data_feed_<start>_<end>_<protocol>_<feed><version>_<type>.csv.gz.
    Every message gets an 'ordinal' column; when *is_timestamp_ordinal* is
    true the ordinal restarts whenever the timestamp changes.  When
    *tickers* is non-empty, messages for other symbols are skipped.
    Progress is printed to stderr every 1000 messages unless *is_silent*.
    """
    # Local import keeps this function self-contained.
    from contextlib import ExitStack

    root_filename = f'data_feed_{start_date:%Y%m%d}_{end_date:%Y%m%d}_{protocol}_{feed}{version}_'

    # Message types emitted by these feeds, in the order the output files
    # were opened by the original (13-level nested with-block) code.
    message_types = (
        'security_directive',
        'trading_status',
        'retail_liquidity_indicator',
        'operational_halt',
        'short_sale_price_test_status',
        'quote_update',
        'trade_report',
        'official_price',
        'trade_break',
        'auction_information',
        'price_level_update',
        'security_event',
        'system_event',
    )
    # Historical quirk kept for backward compatibility: 'security_directive'
    # messages go to a file named "security_directory".
    file_stems = {'security_directive': 'security_directory'}

    file_ptr_map: Dict[str, IO[Any]] = {}
    ordinal = 0
    previous_timestamp: Optional[datetime] = None
    # ExitStack flattens the deeply nested with-blocks: the Parser is
    # entered first (so it is closed last), then one gzip file per type.
    with ExitStack() as stack:
        reader = stack.enter_context(Parser(str(filename), feed_def))
        for message_type in message_types:
            stem = file_stems.get(message_type, message_type)
            out_path = output_folder / (root_filename + stem + '.csv.gz')
            out_file = stack.enter_context(gzip.open(out_path, "wt"))
            file_ptr_map[message_type] = out_file
            # CSV header row: the column names of this message type.
            print(",".join(FILE_FORMATS[message_type].keys()), file=out_file)

        for message in reader:
            if is_timestamp_ordinal and previous_timestamp != message['timestamp']:
                ordinal = 0
            ordinal += 1
            message['ordinal'] = ordinal
            previous_timestamp = message['timestamp']

            if not is_silent and ordinal % 1000 == 0:
                print(f"{message['timestamp'].isoformat()} ({ordinal})",
                      file=sys.stderr)

            # Optional ticker filter.
            if tickers and 'symbol' in message and message['symbol'] not in tickers:
                if not is_silent:
                    print(f"Skipping {message['symbol']}", file=sys.stderr)
                continue

            formats = FILE_FORMATS[message['type']]
            row = [fmt(message[name]) for name, fmt in formats.items()]
            print(",".join(row), file=file_ptr_map[message['type']])


def convert(
        filename: Path,
        output_folder: Path,
        tickers: List[bytes],
        is_silent: bool,
        is_timestamp_ordinal
) -> None:
    """Dispatch an IEX pcap file to the converter for its feed/version.

    The feed metadata (dates, protocol, feed, version) is parsed from the
    file name; a ValueError is raised for unrecognized names or for
    feed/version combinations with no converter.
    """
    match = FILENAME_REGEX.match(filename.name)
    if match is None:
        raise ValueError('Unable to process filename')

    parts = match.groupdict()
    start_date = datetime.strptime(parts['start_date'], '%Y%m%d')
    end_date = datetime.strptime(parts['end_date'], '%Y%m%d')
    protocol: str = parts['protocol']
    feed: str = parts['feed']
    version: str = parts['version']

    # (feed, version) -> (converter function, iex_parser feed definition).
    dispatch = {
        ('DEEP', '1.0'): (_convert_deep_1_0_and_tops_1_6, DEEP_1_0),
        ('TOPS', '1.5'): (_convert_tops_1_5, TOPS_1_5),
        ('TOPS', '1.6'): (_convert_deep_1_0_and_tops_1_6, TOPS_1_6),
    }
    entry = dispatch.get((feed, version))
    if entry is None:
        raise ValueError(f'Unknown protocol "{feed}" "{version}"')

    converter, feed_def = entry
    converter(
        filename,
        output_folder,
        tickers,
        is_silent,
        is_timestamp_ordinal,
        start_date,
        end_date,
        protocol,
        feed,
        version,
        feed_def
    )


def parse_args(args):
    """Parse the command line arguments for the IEX-to-CSV converter.

    Note: this function needs the ``argparse`` module, which the original
    file never imported (calling it raised NameError); the import is added
    at the top of the file.

    Parameters
    ----------
    args : list[str]
        Argument vector, e.g. ``sys.argv[1:]``.

    Returns
    -------
    argparse.Namespace
        With attributes: input_filename, output_folder, tickers (list of
        ``-t`` values), is_silent, is_timestamp_ordinal, is_verbose.
    """
    parser = argparse.ArgumentParser("IEX to csv")
    parser.add_argument(
        '-i', '--input-file',
        help='Input filename',
        action='store',
        dest='input_filename')
    parser.add_argument(
        '-o', '--output-folder',
        help='Output folder',
        action='store',
        dest='output_folder',
        default='.')
    parser.add_argument(
        '-t', '--ticker',
        help='Add a ticker to record',
        action='append',
        dest='tickers',
        default=[])
    parser.add_argument(
        '-s', '--silent',
        help='Suppress progress report',
        action='store_true',
        dest='is_silent',
        default=False)
    parser.add_argument(
        '-c', '--timestamp-ordinal',
        help='Reset the ordinal when the timestamp changes',
        action='store_true',
        dest='is_timestamp_ordinal',
        default=False)
    parser.add_argument(
        '-v', '--verbose',
        help='Verbose',
        action='store_true',
        dest='is_verbose',
        default=False)
    return parser.parse_args(args)

CODE/iex_data.py

0 → 100644
+0 −0

File added.

Preview size limit exceeded, changes collapsed.