using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;
using Confluent.Kafka;
using MarketDataProcessor;
using MySqlConnector;

namespace MarketDataProcessorGUI
{
    /// <summary>Simple pair of integer coordinates.</summary>
    public struct Point
    {
        public int x;
        public int y;
    }

    /// <summary>
    /// Form that replays a tape through <see cref="TapeProcessor"/>, queues every
    /// dumped trade in memory and drains the queue into a database in batches,
    /// while periodically refreshing progress labels.
    /// </summary>
    /// <remarks>
    /// The MySQL/MariaDB worker reads its connection string from the app setting
    /// <c>DB_BROKER_INTEGRATIONS_MYSQL</c>; credentials must not live in source.
    /// </remarks>
    public partial class frmTapeImport : Form
    {
        /// <summary>Producer is throttled while the queue holds more trades than this.</summary>
        private const int MaxQueuedTrades = 1000000;

        /// <summary>Pause between queue-size checks while the producer is throttled.</summary>
        private const int QueueFullPollMs = 250;

        /// <summary>How long a worker waits for a trade before treating the queue as idle.</summary>
        private const int DequeueTimeoutMs = 1000;

        /// <summary>Interval between progress-label refreshes.</summary>
        private const int MonitorIntervalMs = 2500;

        /// <summary>Rows per call to the SQL Server stored procedure.</summary>
        private const int SqlServerBatchSize = 1000;

        /// <summary>Rows per generated MySQL INSERT statement.</summary>
        private const int MySqlBatchSize = 10000;

        /// <summary>App setting that holds the MySQL/MariaDB connection string.</summary>
        private const string MySqlConnectionSettingKey = "DB_BROKER_INTEGRATIONS_MYSQL";

        /// <summary>Column list and VALUES keyword that prefix every generated bulk insert.</summary>
        private const string MySqlInsertPrefix = "INSERT INTO trade(type,symbol,time,price_flags,trade_condition,condition_flags,tick_volume,total_volume,price,last,open,high,low,listed_exchange) VALUES ";

        // NOTE(review): the element type is assumed to be string - the generic argument
        // was not visible in the reviewed copy and the field is unused here; confirm.
        private List<string> symbols = new List<string>();
        private long userData;
        private TapeProcessor _tapeProcessor;

        // volatile: written on the UI thread, polled by the worker and monitor loops.
        private volatile bool _running = false;

        private BlockingCollection<Trade> _bcqTrades = new BlockingCollection<Trade>(new ConcurrentQueue<Trade>());

        // The counters below are shared across threads; access them through Interlocked only.
        private long _queuedTradesCount;
        private long _loggedTradesCount;
        private long _lastUpdateTime;

        public frmTapeImport()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Starts the import: one database worker, the tape processor and the progress monitor.
        /// </summary>
        private void button1_Click(object sender, EventArgs e)
        {
            listBox1.Items.Insert(0, $"{DateTime.Now:G} - Running...{Environment.NewLine}");

            // Read the control on the UI thread; the tape itself is started on a pool thread.
            string tapePath = textBox1.Text.Trim();

            // Must be set before any loop is started, otherwise a worker can observe
            // "false" on its first check and exit immediately.
            _running = true;

            // More workers (tradeWorkerMySql or tradeWorker) can be started here to drain
            // the queue in parallel.
            Task.Run(() => tradeWorkerMySql());

            Task.Run(() =>
            {
                try
                {
                    _tapeProcessor = new TapeProcessor();
                    _tapeProcessor.OnEvent(onEvent);
                    _tapeProcessor.OnTradeDump(onTrade);
                    _tapeProcessor.StartTape(tapePath, true, "");
                }
                catch (Exception ex)
                {
                    ReportStatus($"Tape processor failed: {ex.Message}");
                }
            });

            Task.Run(() => monitor());
        }

        /// <summary>
        /// Trade callback registered with the tape processor: applies back-pressure while
        /// the queue is over <see cref="MaxQueuedTrades"/>, then enqueues the trade.
        /// </summary>
        /// <param name="trade">Trade dumped by the tape processor.</param>
        private void onTrade(Trade trade)
        {
            while (_bcqTrades.Count > MaxQueuedTrades)
            {
                ReportStatus($"Waiting for Queue to Catch Up... {trade.Time}");
                Thread.Sleep(QueueFullPollMs);
            }

            _bcqTrades.TryAdd(trade);
            Interlocked.Increment(ref _queuedTradesCount);
        }

        /// <summary>
        /// Event callback registered with the tape processor; marshals the update to the UI thread.
        /// </summary>
        /// <param name="type">Kind of event raised.</param>
        /// <param name="value">Event payload; cast to <see cref="DateTime"/> for <c>TapeTime</c>.</param>
        private void onEvent(TapeProcessorEvents type, object value)
        {
            PostToUi(() =>
            {
                switch (type)
                {
                    case TapeProcessorEvents.Status:
                    case TapeProcessorEvents.TapeOpened:
                    case TapeProcessorEvents.TapeComplete:
                        listBox1.Items.Insert(0, $"{DateTime.Now:G} - {value}{Environment.NewLine}");
                        break;

                    case TapeProcessorEvents.TapeTime:
                        lblTapeTime.Text = $"{(DateTime)value:G}";
                        break;
                }
            });
        }

        /// <summary>
        /// Refreshes the progress labels every <see cref="MonitorIntervalMs"/> ms while running.
        /// </summary>
        private async Task monitor()
        {
            while (_running)
            {
                try
                {
                    PostToUi(() =>
                    {
                        lblQueuedTrades.Text = $"{Interlocked.Read(ref _queuedTradesCount):n0}";
                        lblLoggedCount.Text = $"{Interlocked.Read(ref _loggedTradesCount):n0}";
                        lblQueueCount.Text = $"{_bcqTrades.Count:n0}";
                        label5.Text = $"{Interlocked.Read(ref _lastUpdateTime)}";
                    });
                }
                catch (Exception ex)
                {
                    // Progress display is best effort; keep the loop alive.
                    Debug.WriteLine(ex);
                }

                await Task.Delay(MonitorIntervalMs);
            }
        }

        /// <summary>
        /// Drains the trade queue into SQL Server by passing batches of rows as the
        /// <c>@Trades</c> parameter of the <c>Trades_X_Add</c> stored procedure.
        /// </summary>
        /// <remarks>
        /// A batch is written when it reaches <see cref="SqlServerBatchSize"/> rows, when the
        /// queue has been idle for <see cref="DequeueTimeoutMs"/> ms, and when the loop ends.
        /// </remarks>
        private async Task tradeWorker()
        {
            try
            {
                using (var conn = new SqlConnection(ConfigurationManager.AppSettings["DB_BROKER_INTEGRATIONS"]))
                using (DataTable trades = CreateTradesTable())
                {
                    await conn.OpenAsync();

                    using (var cmd = new SqlCommand("Trades_X_Add", conn))
                    {
                        cmd.CommandType = CommandType.StoredProcedure;
                        cmd.Parameters.AddWithValue("@Trades", trades);
                        cmd.Prepare();

                        while (_running)
                        {
                            try
                            {
                                if (_bcqTrades.TryTake(out Trade trade, DequeueTimeoutMs))
                                {
                                    AppendTradeRow(trades, trade);
                                    if (trades.Rows.Count < SqlServerBatchSize)
                                    {
                                        continue;
                                    }
                                }
                                else if (trades.Rows.Count == 0)
                                {
                                    continue;
                                }

                                // Either the batch is full or the queue went idle with rows pending.
                                await FlushSqlServerBatchAsync(cmd, trades);
                            }
                            catch (Exception ex)
                            {
                                ReportStatus($"SQL Server trade worker error: {ex.Message}");
                            }
                        }

                        if (trades.Rows.Count > 0)
                        {
                            await FlushSqlServerBatchAsync(cmd, trades);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                ReportStatus($"SQL Server trade worker stopped: {ex.Message}");
            }
        }

        /// <summary>Builds the empty table whose column order matches the rows sent as <c>@Trades</c>.</summary>
        private static DataTable CreateTradesTable()
        {
            var table = new DataTable();
            table.Columns.Add("Type", typeof(string));
            table.Columns.Add("Symbol", typeof(string));
            table.Columns.Add("Time", typeof(DateTime));
            table.Columns.Add("PriceFlags", typeof(int));
            table.Columns.Add("TradeCondition", typeof(int));
            table.Columns.Add("ConditionFlags", typeof(int));
            table.Columns.Add("VolumeType", typeof(int));
            table.Columns.Add("BATECode", typeof(int));
            table.Columns.Add("Size", typeof(int));
            table.Columns.Add("TickVolume", typeof(int));
            table.Columns.Add("TotalVolume", typeof(long));
            table.Columns.Add("Tick", typeof(decimal));
            table.Columns.Add("Price", typeof(decimal));
            table.Columns.Add("Open", typeof(decimal));
            table.Columns.Add("High", typeof(decimal));
            table.Columns.Add("Low", typeof(decimal));
            table.Columns.Add("Last", typeof(decimal));
            table.Columns.Add("ListedExchange", typeof(int));
            table.Columns.Add("ReportingExchange", typeof(int));
            table.Columns.Add("Filtered", typeof(bool));
            return table;
        }

        /// <summary>Appends one trade as a new row of the batch table.</summary>
        private static void AppendTradeRow(DataTable trades, Trade trade)
        {
            DataRow row = trades.Rows.Add();
            row["Type"] = trade.Type;
            row["Symbol"] = trade.Symbol;
            row["Time"] = trade.Time;
            row["PriceFlags"] = trade.PriceFlags;
            row["TradeCondition"] = trade.TradeCondition;
            row["ConditionFlags"] = trade.ConditionFlags;
            row["VolumeType"] = trade.VolumeType;
            row["BATECode"] = trade.BATECode;
            row["Size"] = trade.Size;
            row["TickVolume"] = trade.TickVolume;
            row["TotalVolume"] = trade.TotalVolume;
            row["Tick"] = trade.Tick;
            row["Price"] = trade.Price;
            row["Open"] = trade.Open;
            row["High"] = trade.High;
            row["Low"] = trade.Low;
            row["Last"] = trade.Last;
            row["ListedExchange"] = trade.ListedExchange;
            row["ReportingExchange"] = trade.ReportingExchange;
            row["Filtered"] = trade.Filtered;
        }

        /// <summary>
        /// Sends the pending rows to SQL Server and empties the batch. A failed batch is
        /// reported and dropped so the buffer cannot grow without bound.
        /// </summary>
        private async Task FlushSqlServerBatchAsync(SqlCommand cmd, DataTable trades)
        {
            int batchCount = trades.Rows.Count;
            try
            {
                cmd.Parameters["@Trades"].Value = trades;
                await cmd.ExecuteNonQueryAsync();

                // Count trades as logged only once they have actually been written.
                Interlocked.Add(ref _loggedTradesCount, batchCount);
            }
            catch (Exception ex)
            {
                ReportStatus($"Dropped batch of {batchCount:n0} trades (SQL Server): {ex.Message}");
            }
            finally
            {
                trades.Rows.Clear();
            }
        }

        /// <summary>
        /// Drains the trade queue into MySQL/MariaDB by building a multi-row INSERT statement
        /// and passing it as the <c>trades</c> parameter of the <c>trades_bulk_insert</c>
        /// stored procedure.
        /// </summary>
        /// <remarks>
        /// The connection string comes from the app setting named by
        /// <see cref="MySqlConnectionSettingKey"/>; the worker reports and exits when it is missing.
        /// A batch is written when it reaches <see cref="MySqlBatchSize"/> rows, when the queue
        /// has been idle for <see cref="DequeueTimeoutMs"/> ms, and when the loop ends.
        /// </remarks>
        private async Task tradeWorkerMySql()
        {
            try
            {
                string connectionString = ConfigurationManager.AppSettings[MySqlConnectionSettingKey];
                if (string.IsNullOrWhiteSpace(connectionString))
                {
                    ReportStatus($"MySQL trade worker not started: app setting '{MySqlConnectionSettingKey}' is missing.");
                    return;
                }

                using (var conn = new MySqlConnection(connectionString))
                {
                    await conn.OpenAsync();

                    using (var cmd = new MySqlCommand("trades_bulk_insert", conn))
                    {
                        cmd.CommandType = CommandType.StoredProcedure;
                        cmd.Parameters.AddWithValue("trades", string.Empty);
                        cmd.Prepare();

                        var trades = new List<string>(MySqlBatchSize);

                        while (_running)
                        {
                            try
                            {
                                if (_bcqTrades.TryTake(out Trade trade, DequeueTimeoutMs))
                                {
                                    trades.Add(FormatTradeValues(trade));
                                    if (trades.Count < MySqlBatchSize)
                                    {
                                        continue;
                                    }
                                }
                                else if (trades.Count == 0)
                                {
                                    continue;
                                }

                                // Either the batch is full or the queue went idle with rows pending.
                                await FlushMySqlBatchAsync(cmd, trades);
                            }
                            catch (Exception ex)
                            {
                                ReportStatus($"MySQL trade worker error: {ex.Message}");
                            }
                        }

                        if (trades.Count > 0)
                        {
                            await FlushMySqlBatchAsync(cmd, trades);
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                ReportStatus($"MySQL trade worker stopped: {ex.Message}");
            }
        }

        /// <summary>Renders one trade as a parenthesised SQL VALUES tuple.</summary>
        private static string FormatTradeValues(Trade trade)
        {
            // NOTE(review): Time is still rendered with the current culture, exactly as
            // before; confirm that format is what the target column expects.
            string tuple = string.Join(",",
                SqlText(trade.Type),
                SqlText(trade.Symbol),
                SqlText(trade.Time),
                SqlNumber(trade.PriceFlags),
                SqlNumber(trade.TradeCondition),
                SqlNumber(trade.ConditionFlags),
                SqlNumber(trade.TickVolume),
                SqlNumber(trade.TotalVolume),
                SqlNumber(trade.Price),
                SqlNumber(trade.Last),
                SqlNumber(trade.Open),
                SqlNumber(trade.High),
                SqlNumber(trade.Low),
                SqlText(trade.ListedExchangeStr));

            return "(" + tuple + ")";
        }

        /// <summary>
        /// Renders a value as a single-quoted SQL string literal, escaping backslashes and
        /// single quotes so the value cannot terminate the literal.
        /// </summary>
        private static string SqlText(object value)
        {
            string text = Convert.ToString(value) ?? string.Empty;
            return "'" + text.Replace("\\", "\\\\").Replace("'", "''") + "'";
        }

        /// <summary>
        /// Renders a numeric value with the invariant culture so the decimal separator is
        /// always '.', whatever the machine's regional settings are.
        /// </summary>
        private static string SqlNumber(object value)
        {
            return Convert.ToString(value, CultureInfo.InvariantCulture) ?? string.Empty;
        }

        /// <summary>
        /// Sends the pending tuples to MySQL as one INSERT, records how long it took and
        /// empties the batch. A failed batch is reported and dropped so the buffer cannot
        /// grow without bound.
        /// </summary>
        private async Task FlushMySqlBatchAsync(MySqlCommand cmd, List<string> trades)
        {
            int batchCount = trades.Count;
            Stopwatch sw = Stopwatch.StartNew();
            try
            {
                cmd.Parameters["trades"].Value = MySqlInsertPrefix + string.Join(",", trades);
                await cmd.ExecuteNonQueryAsync();
                sw.Stop();

                Interlocked.Exchange(ref _lastUpdateTime, sw.ElapsedMilliseconds);

                // Count trades as logged only once they have actually been written.
                Interlocked.Add(ref _loggedTradesCount, batchCount);
            }
            catch (Exception ex)
            {
                ReportStatus($"Dropped batch of {batchCount:n0} trades (MySQL): {ex.Message}");
            }
            finally
            {
                trades.Clear();
            }
        }

        /// <summary>Inserts a timestamped line at the top of the status list, from any thread.</summary>
        private void ReportStatus(string message)
        {
            PostToUi(() => listBox1.Items.Insert(0, $"{DateTime.Now:G} - {message}{Environment.NewLine}"));
        }

        /// <summary>
        /// Queues <paramref name="action"/> on the UI thread; does nothing when the form has
        /// no window handle (not yet shown, or already closed).
        /// </summary>
        private void PostToUi(Action action)
        {
            if (IsDisposed || !IsHandleCreated)
            {
                return;
            }

            try
            {
                BeginInvoke(action);
            }
            catch (InvalidOperationException)
            {
                // The form was closed between the check above and the call.
            }
        }

        private void label6_Click(object sender, EventArgs e)
        {
        }

        private void label5_Click(object sender, EventArgs e)
        {
        }

        private void button2_Click(object sender, EventArgs e)
        {
        }
    }
}

//private async void tradeWorker()
//{
//    MySqlConnection conn = new MySqlConnection(ConfigurationManager.AppSettings["DB_BROKER_INTEGRATIONS"]);
//    await conn.OpenAsync();
//    MySqlCommand cmd = new MySqlCommand("trade_add", conn);
//    cmd.CommandType = CommandType.StoredProcedure;
//    cmd.Prepare();
//    cmd.Parameters.AddWithValue("Type", "");
//    cmd.Parameters.AddWithValue("Symbol", "");
//    cmd.Parameters.AddWithValue("Time", "1970-01-01 00:00:00.001");
//    cmd.Parameters.AddWithValue("PriceFlags", 1);
//    cmd.Parameters.AddWithValue("TradeCondition", 1);
//    cmd.Parameters.AddWithValue("ConditionFlags", 1);
//    cmd.Parameters.AddWithValue("VolumeType", 1);
//    cmd.Parameters.AddWithValue("BATECode", 1);
//    cmd.Parameters.AddWithValue("Size", 1);
//    cmd.Parameters.AddWithValue("TickVolume", 1);
//    cmd.Parameters.AddWithValue("TotalVolume", 1);
//    cmd.Parameters.AddWithValue("Tick", 1.01);
//    cmd.Parameters.AddWithValue("Price", 1.01);
//    cmd.Parameters.AddWithValue("Open", 1.01);
//    cmd.Parameters.AddWithValue("High", 1.01);
//    cmd.Parameters.AddWithValue("Low", 1.01);
//    cmd.Parameters.AddWithValue("Last", 1.01);
//    cmd.Parameters.AddWithValue("ListedExchange", 1);
//    cmd.Parameters.AddWithValue("ReportingExchange", 1);
//    while (_running)
//    {
//        try
//        {
//            if (_bcqTrades.TryTake(out Trade trade, 1000))
//            {
//                cmd.Parameters["Type"].Value = trade.Type;
//                cmd.Parameters["Symbol"].Value = trade.Symbol;
//                cmd.Parameters["Time"].Value = trade.Time;
//                cmd.Parameters["PriceFlags"].Value = trade.PriceFlags;
//                cmd.Parameters["TradeCondition"].Value = trade.TradeCondition;
//                cmd.Parameters["ConditionFlags"].Value = trade.ConditionFlags;
//                cmd.Parameters["VolumeType"].Value = trade.VolumeType;
//                cmd.Parameters["BATECode"].Value = trade.BATECode;
//                cmd.Parameters["Size"].Value = trade.Size;
//                cmd.Parameters["TickVolume"].Value = trade.TickVolume;
// cmd.Parameters["TotalVolume"].Value = trade.TotalVolume; // cmd.Parameters["Tick"].Value = trade.Tick; // cmd.Parameters["Price"].Value = trade.Price; // cmd.Parameters["Open"].Value = trade.Open; // cmd.Parameters["High"].Value = trade.High; // cmd.Parameters["Low"].Value = trade.Low; // cmd.Parameters["Last"].Value = trade.Last; // cmd.Parameters["ListedExchange"].Value = trade.ListedExchange; // cmd.Parameters["ReportingExchange"].Value = trade.ReportingExchange; // await cmd.ExecuteNonQueryAsync(); // Interlocked.Increment(ref _loggedTradesCount); // } // } // catch (Exception ex) // { // var e = ex.Message; // } // }