using System; using System.Net.Http; using System.Threading.Tasks; using System.Linq; using System.Threading; using TradeIdeasCommon.Services.Persistence; using System.Collections.Generic; using System.Collections.Specialized; using TradeIdeasCommon.Models; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using MarketDataSnapshot; using System.Diagnostics; using System.Runtime.InteropServices; using BrokersServices; using AlpacaService.Models; /// /// This is a wrapper around Alpaca's REST and WebSocket API /// Alpaca offers a C# SDK, however, the package has many 3rd party dependencies /// that seem excessive. This is a simple enough API to implement and maintain /// on our own. /// namespace AlpacaService { public class AlpacaUserSession : IUserSession, IDisposable { private IPortfolioDataSubscriber _portfolioDataSubscriber; private readonly int _brokerId; private readonly BrokerIntegration _brokerIntegration; private readonly DatabaseService _databaseService; private readonly MarketDataSnapshotService _snapshotService; private readonly IUserSessionCallbacks _userSessionCallbacks; private readonly ILogger _logger; private AlpacaOAuthSession _oAuthSession; private BrokerUserSession _brokerUserSession; private LogHeader _logHeader = new LogHeader(); private List _brokerAccounts; private bool _streaming; private bool _portfolioLoaded; private List _phantomOrders = new List(); private Task _taskPortfolioWorker; private Task _taskExecutionWorker; private int _clientOrderId; private SemaphoreSlimX _semaphoreSlim = new SemaphoreSlimX(1, 1); private readonly int SEMAPHORE_WAIT_TIME = 60000; private readonly int RESET_EVENT_WAIT_TIME = 20000; private readonly int PORTFOLIO_WORKER_INTERVAL = 1500; private readonly int EXECUTION_WORKER_INTERVAL = 1000; private System.Timers.Timer _streamingReconnectPendingTimer; private const string ALPACA_API_VERSION = "v2"; private ClientWebSocketWrapper _ws; public AlpacaUserSession(Brokers brokerid, ILoggerFactory loggerFactory, ApiEnvironment environment, BrokerIntegration brokerIntegration, DatabaseService databaseService, MarketDataSnapshotService snapshotService, IUserSessionCallbacks userSessionCallbacks) { _brokerId = (int)brokerid; _logger = loggerFactory.CreateLogger(); _brokerIntegration = brokerIntegration; _databaseService = databaseService; _snapshotService = snapshotService; _userSessionCallbacks = userSessionCallbacks; _oAuthSession = new AlpacaOAuthSession(loggerFactory, environment, _brokerIntegration.client_id, _brokerIntegration.client_secret, Notify); } #region Service Methods public BrokerUserSession GetBrokerUserSession() { return new BrokerUserSession() { TargetInstance = _brokerUserSession.TargetInstance, Connected = IsUserConnected, LastConnectedWhen = _brokerUserSession.LastConnectedWhen, BrokerId = _brokerUserSession.BrokerId, BrokerUserId = _brokerUserSession.BrokerUserId, UserName = _brokerUserSession.UserName, UserSessionId = _brokerUserSession.UserSessionId, NextOrderId = _brokerUserSession.NextOrderId, LocalOrders = _brokerAccounts.SelectMany(a => a.Orders.Where(o => o.Monitor())).ToList().Count(), TokenExpirationWhen = _brokerUserSession.TokenExpirationWhen, CreatedWhen = _brokerUserSession.CreatedWhen, UpdatedWhen = _brokerUserSession.UpdatedWhen, BrokerUserSessionId = _brokerUserSession.BrokerUserSessionId, StatusTypeId = _brokerUserSession.StatusTypeId, StatusType = _brokerUserSession.StatusType, BrokerUserSessionCreatedWhen = _brokerUserSession.BrokerUserSessionCreatedWhen, BrokerUserSessionUpdatedWhen = _brokerUserSession.BrokerUserSessionUpdatedWhen }; } public IReadOnlyCollection GetObfuscatedUserPortfolio() { var brokerAccounts = JsonConvert.DeserializeObject>(JsonConvert.SerializeObject(_brokerAccounts)); brokerAccounts.ForEach(ba => { ba.account_identifier = ba.account_identifier[0..^4] + "****"; ba.account_key = ba.account_key[0..^4] + "****"; }); return brokerAccounts.AsReadOnly(); } public async Task Connect(BrokerConnectRequest connectRequest) { var connectResult = new BrokerConnectResult(); try { if (!string.IsNullOrEmpty(connectRequest.verifierCode)) { return await ConnectWithVerifierCode(connectRequest); } else if (!string.IsNullOrEmpty(connectRequest.tokens)) { return await ConnectWithTokens(connectRequest); } var statusTypeId = StatusTypes.NotSet; var brokerUserResult = await _databaseService.GetBrokerUser(_brokerId, connectRequest.userName); var brokerUserSessionResult = await _databaseService.AddBrokerUserSession(connectRequest.targetInstance, connectRequest.userSessionId, brokerUserResult.Obj.BrokerUserId); _brokerUserSession = brokerUserSessionResult.Obj; UpdateLogHeader(); //if (connectRequest.reAuthenticate || string.IsNullOrEmpty(_brokerUserSession.AccessToken) || !_oAuthSession.Renew(_brokerUserSession.AccessToken, _brokerUserSession.AccessTokenSecret)) //{ // connectResult.Set(connectRequest.targetInstance, // false, // _brokerUserSession.BrokerUserSessionId, // false, // "Authentication with ETrade required.", // _oAuthSession.GetRequestToken()); // _brokerUserSession.AccessToken = _oAuthSession.RequestToken; // _brokerUserSession.AccessTokenSecret = _oAuthSession.RequestTokenSecret; // statusTypeId = StatusTypes.AuthenticationWithBrokerRequired; // UpdateLogHeader(); //} //else //{ // _brokerUserSession.AccessToken = _oAuthSession.AccessToken; // _brokerUserSession.AccessTokenSecret = _oAuthSession.AccessTokenSecret; // _brokerUserSession.LastConnectedWhen = DateTime.Now; // statusTypeId = StatusTypes.AuthorizationRenewed; // connectResult.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected."); //} //Temporary until we switch to App OAuth Workflow _brokerUserSession.AccessToken = _oAuthSession.AccessToken; _brokerUserSession.AccessTokenSecret = _oAuthSession.AccessTokenSecret; _brokerUserSession.LastConnectedWhen = DateTime.Now; statusTypeId = StatusTypes.AuthorizationRenewed; connectResult.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected."); _brokerUserSession.StatusTypeId = (int)statusTypeId; var result = await _databaseService.UpdateBrokerUserSession(connectRequest.targetInstance, _brokerUserSession.BrokerUserId, _brokerUserSession.BrokerUserSessionId, _brokerUserSession.StatusTypeId, _brokerUserSession.AccessToken, _brokerUserSession.AccessTokenSecret, DateTime.Now.AddMinutes(15)); } catch (Exception ex) { LogError(ex.Message, ex); connectResult.Set(string.Empty, false, 0, false, "Please Try Again Later."); } return connectResult; } public async Task ConnectWithVerifierCode(BrokerConnectRequest connectRequest) { var connectResult = new BrokerConnectResult(); try { var statusTypeId = StatusTypes.AuthenticationWithBrokerRequired; var brokerUserSessionResult = await _databaseService.GetBrokerUserSession(connectRequest.userSessionId,_brokerId, null, (int)statusTypeId); _brokerUserSession = brokerUserSessionResult.Obj; UpdateLogHeader(); if (_oAuthSession.Authorize(connectRequest.verifierCode, _brokerUserSession.AccessToken, _brokerUserSession.AccessTokenSecret)) { statusTypeId = StatusTypes.Authorized; _brokerUserSession.LastConnectedWhen = DateTime.Now; connectResult.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected."); } else { statusTypeId = StatusTypes.Unauthorized; connectResult.Set(connectRequest.targetInstance, false, _brokerUserSession.BrokerUserSessionId, true, "Please Try Again Later."); } _brokerUserSession.StatusTypeId = (int)statusTypeId; var result = await _databaseService.UpdateBrokerUserSession(connectRequest.targetInstance, _brokerUserSession.BrokerUserId, _brokerUserSession.BrokerUserSessionId, _brokerUserSession.StatusTypeId, _oAuthSession.AccessToken, _oAuthSession.AccessTokenSecret, DateTime.Now.AddMinutes(15)); } catch (Exception ex) { LogError(ex.Message, ex); connectResult.Set(string.Empty, false, 0, false, "Please Try Again Later."); } return connectResult; } public async Task ConnectWithTokens(BrokerConnectRequest connectRequest) { var connectResult = new BrokerConnectResult(); try { var statusTypeId = StatusTypes.AuthenticationWithBrokerRequired; var brokerUserSessionResult = await _databaseService.GetBrokerUserSession(connectRequest.userSessionId, _brokerId, null, (int)statusTypeId); _brokerUserSession = brokerUserSessionResult.Obj; UpdateLogHeader(); _oAuthSession.SetTokens(connectRequest.tokens); statusTypeId = StatusTypes.Authorized; _brokerUserSession.StatusTypeId = (int)statusTypeId; var result = await _databaseService.UpdateBrokerUserSession(connectRequest.targetInstance, _brokerUserSession.BrokerUserId, _brokerUserSession.BrokerUserSessionId, _brokerUserSession.StatusTypeId, _oAuthSession.AccessToken, _oAuthSession.AccessTokenSecret, DateTime.Now.AddMinutes(15)); connectResult.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected."); } catch (Exception ex) { LogError(ex.Message, ex); connectResult.Set(string.Empty, false, 0, false, "Please Try Again Later."); } return connectResult; } public async Task Disconnect() { var connectResult = new BrokerConnectResult(); try { await UpdateBrokerUserSessionStatus(StatusTypes.UserDisconnected); // _brokerUserSession.StatusTypeId = (int)StatusTypes.UserDisconnecting; connectResult.Set(_brokerUserSession.TargetInstance, false, _brokerUserSession.BrokerUserSessionId, false, "Disconnection successful", string.Empty); } catch (Exception ex) { LogError(ex.Message, ex); connectResult.Set(string.Empty, false, 0, false, "Please Try Again Later."); } return connectResult; } public async Task ReConnect(BrokerConnectRequest connectRequest) { var connectResult = new BrokerConnectResult(); try { await UpdateBrokerUserSessionStatus(StatusTypes.UserReconnected); _brokerUserSession.LastConnectedWhen = DateTime.Now; connectResult.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Re-Connected to your User Session"); } catch (Exception ex) { LogError(ex.Message, ex); connectResult.Set(string.Empty, false, 0, false, "Please Try Again Later."); } return connectResult; } public void ConnectStreaming(IPortfolioDataSubscriber portfolioDataSubscriber) { ClearPendingDisconnectTimer(); _portfolioDataSubscriber = portfolioDataSubscriber; //On Streaming Reconnects it's Important that we don't change the status until the new _portfolioDataSubscriber is assigned if ((StatusTypes)_brokerUserSession.StatusTypeId == StatusTypes.StreamingReconnectPending) { Task.Run(async () => await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnected)).GetAwaiter().GetResult(); } _portfolioDataSubscriber.OnMessage(JsonConvert.SerializeObject(new ConnectionMsg("Successfully Connected to your ETrade User Session."))); if (!_streaming) { _streaming = Task.Run(async () => { return await ConnectAlpacaStreaming(); }).GetAwaiter().GetResult(); _taskExecutionWorker = Task.Run(ExecutionWorker); } } private async Task ConnectAlpacaStreaming() { try { _ws = ClientWebSocketWrapper.Create($"wss://{_oAuthSession.ApiHost}/stream"); _ws.OnMessage(OnWebSocketMessage); _ws.OnDisconnect(OnWebSocketDisconnect); await _ws.Connect(); var auth = new ActionMessage() { action = "authenticate", data = new AuthKeys() { key_id = _oAuthSession.AccessToken, secret_key = _oAuthSession.AccessTokenSecret } }; await _ws.SendMessageAsync(JsonConvert.SerializeObject(auth)); return true; } catch (Exception ex) { LogError(ex.Message,ex); return false; } } private void OnWebSocketMessage(string message, ClientWebSocketWrapper cws) { var json = JObject.Parse(message); var result = json.TryGetValue("stream", out var messageType); switch (messageType.ToString()) { case "authorization": var authMessage = json.ToObject >(); if( authMessage.data.status == "authorized") { var listen = new ActionMessage() { action = "listen", data = new Streams() { streams = new List() { "trade_updates" } } }; Task.Run(async () => await _ws.SendMessageAsync(JsonConvert.SerializeObject(listen))); } break; case "listening": break; case "trade_updates": var streamMessage = json.ToObject>(); Task.Run(async () => await ProcessStreamingTradeUpdate(streamMessage.data)); break; default: break; } Debug.WriteLine(message); } private async Task ProcessStreamingTradeUpdate(OrderStream orderStream) { try { if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) LogWarn("Could not enter Semaphore from ProcessStreamingTradeUpdate()"); Debug.WriteLine($"{TimeZoneNow:HH:mm:ss.fff}:{orderStream.Event}:{orderStream.order.symbol}:{orderStream.order.status}"); var account = _brokerAccounts[0]; var order = CreateBrokerOrder(account.account_identifier, orderStream.order); var orderIndex = account.GetOrderIndex(orderStream.order.id); if (orderIndex == -1) { account.Orders.Add(order); await _databaseService.AddOrder(order); } else { //Most likely out of order streaming events: EG, new after filled if (account.Orders[orderIndex].Filled()) return; else { account.Orders[orderIndex] = order; await _databaseService.UpdateOrder(order); } } _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, GetOrderAction(order.orderStatus), order))); //Filled or Partially Filled Orders result in Account/Position Updates if (orderStream.Event == "fill" || orderStream.Event == "partial_fill") _ = Task.Run(async () => await RefreshAccount(account, order.symbol)); } catch (Exception) { throw; } finally { _semaphoreSlim.Release(true); } } private void OnWebSocketDisconnect(ClientWebSocketWrapper cwsw) { } private void ClearPendingDisconnectTimer() { if (_streamingReconnectPendingTimer != null) { _streamingReconnectPendingTimer.Enabled = false; _streamingReconnectPendingTimer.Stop(); _streamingReconnectPendingTimer.Dispose(); _streamingReconnectPendingTimer = null; } } public async Task DisconnectStreaming() { //When a user disconnects from the ETrade Service a check is first made to see if there //are any existing loacal orders to monitor in their portfolio //if so, we keep the user session running so local orders are still evaluated for their trigger status //if the user session is left running and there are no longer any orders to monitor while the user is disconnected //the user session will be terminated try { //If _disconnecting = true, the the user has just sent a request to Disconnect form their ETrade user sesssion //Otherwise it is most likely the WebSocket Connection disconnected because of a users iternet issue //or they abruptuly terminated their application or their system crashed if ((StatusTypes)_brokerUserSession.StatusTypeId != StatusTypes.UserDisconnected) { await UpdateBrokerUserSessionStatus(StatusTypes.StreamingDisconnected); await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnectPending); _streamingReconnectPendingTimer = new System.Timers.Timer(60000); //Callback When the Timer Expires _streamingReconnectPendingTimer.Elapsed += async (s, e) => { await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnectPendingTimeout); await UpdateBrokerUserSessionStatus(StatusTypes.UserDisconnected); ClearPendingDisconnectTimer(); }; _streamingReconnectPendingTimer.Start(); } else { await UpdateBrokerUserSessionStatus(StatusTypes.StreamingDisconnected); } } catch (Exception ex) { LogError(ex.Message, ex); } _portfolioDataSubscriber = null; return true; } private async Task UpdateBrokerUserSessionStatus(StatusTypes statusType) { _brokerUserSession.StatusTypeId = (int)statusType; await _databaseService.UpdateBrokerUserSessionStatus(_brokerUserSession.TargetInstance, _brokerUserSession.BrokerUserId, _brokerUserSession.BrokerUserSessionId, _brokerUserSession.StatusTypeId); return; } private async Task EndSession(StatusTypes statusType) { _streaming = false; _taskPortfolioWorker.Wait(); _taskExecutionWorker.Wait(); await UpdateBrokerUserSessionStatus(statusType); _portfolioDataSubscriber = null; _userSessionCallbacks?.OnSessionEnded(_brokerUserSession.UserName); } public string QuoteMode(int quoteMode) { switch (quoteMode) { case 0: return "Realtime"; case 1: return "Delayed"; case 2: return "Closing"; case 3: return "AHT Realtime"; case 4: return "AHT Before Open"; case 5: return "AHT Closing"; default: return "None"; } } public async Task SaveLinkedAccountSettings(string linkedAccounts) { try { var result = await _databaseService.UpdateBrokerUserLinkedAccounts(_brokerUserSession.BrokerUserId, linkedAccounts); if (result.IsSuccess) _brokerUserSession.LinkedAccounts = linkedAccounts; return result; } catch (Exception ex) { LogError(ex.Message, ex); return ServiceResult.GeneralError(ex); } } public async Task>> GetUserPortfolioAllAccounts() { var serviceResult = new ServiceResult>(); //try //{ // var accountsResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, "/v1/accounts/list"); // var brokerAccounts = new List(); // if (string.IsNullOrEmpty(_brokerUserSession.LinkedAccounts)) // { // _brokerUserSession.LinkedAccounts = string.Join(",", accountsResponse.Data.AccountListResponse.Accounts.Account.Select(a => a.accountId).ToList()); // await _databaseService.UpdateBrokerUserLinkedAccounts(_brokerUserSession.BrokerUserId, _brokerUserSession.LinkedAccounts); // } // foreach (var account in accountsResponse.Data.AccountListResponse.Accounts.Account) // { // var ba = new BrokerAccount() // { // account_identifier = account.accountId, // account_key = account.accountIdKey, // account_name = account.accountName, // account_type = account.accountType, // account_institution_type = account.institutionType, // account_status = account.accountStatus, // nickname = account.accountDesc, // account_description = account.accountDesc, // linked = _brokerUserSession.LinkedAccounts.Contains(account.accountId), // }; // var accountBalanceResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/balance?instType={account.institutionType}&realTimeNAV=true"); // if (!accountBalanceResponse.IsSuccess) // continue; // try // { // var balanceResponse = accountBalanceResponse.Data.BalanceResponse; // ba.account_mode = balanceResponse.accountMode; // ba.quote_mode = QuoteMode(balanceResponse.quoteMode); // ba.day_trader_status = balanceResponse.dayTraderStatus; // ba.account_value = balanceResponse.Computed.RealTimeValues.totalAccountValue; // ba.available_funds = balanceResponse.Computed.cashAvailableForWithdrawal; // ba.buying_power = balanceResponse.Computed.cashAvailableForInvestment; // ba.buying_power_in_use = balanceResponse.Computed.fundsWithheldFromPurchasePower; // } // catch (Exception) { } // var accountPortfolioResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/portfolio?view=QUICK&totalsRequired=true&lotsRequired=true"); // if (!accountPortfolioResponse.IsSuccess) // continue; // var accountPortfolios = accountPortfolioResponse.Data.PortfolioResponse; // ba.realized_profit_loss = accountPortfolios.Totals.todaysGainLoss; // ba.unrealized_profit_loss = accountPortfolios.Totals.totalGainLoss; // foreach (var portfolio in accountPortfolios.AccountPortfolio ?? Enumerable.Empty()) // { // var positionLotsTasks = new List>>(); // foreach (var position in portfolio.Position) // { // var bp = new BrokerPosition() // { // id = 0, // broker_id = _brokerId, // broker_account_identifier = account.accountId, // broker_position_id = position.positionId, // symbol = position.Product.symbol, // shares = position.quantity, // average_open_price = position.costPerShare, // last_trade = position.Quick.lastTrade, // realized_profit_loss = position.RealizedProfitLoss // }; // ba.AddPosition(bp); // } // } // var ordersTimeFrame = TimeZoneNow; // //Get Cached Orders // var orders = await _databaseService.GetOrders(_brokerUserSession.UserName, _brokerId, account.accountId, "OPEN", null, null); // if (orders.Obj != null && orders.Obj.Count() > 0) // { // var localOrders = orders.Obj.ToList(); // localOrders?.RemoveAll(lo => (lo.localExecutionType == "time-cancel" && TimeZoneNow > lo.goodTillTime) || // (lo.localExecutionType == "time-exit" && TimeZoneNow > lo.goodAfterTime)); // } // foreach (var order in orders.Obj) // { // if (order.IsLocal) // { // ba.AddOrder(order); // } // } // brokerAccounts.Add(ba); // } // serviceResult.Set(brokerAccounts.AsReadOnly(), ServiceResultStatuses.Success); //} //catch (Exception ex) //{ // serviceResult.Set(ServiceResultStatuses.ServerError, "Server Error"); // LogError(ex.Message, ex); //} return serviceResult; } public async Task>> GetUserPortfolioLinkedAccounts() { var serviceResult = new ServiceResult>(); try { if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { LogWarn("Could not enter Semaphore from GetUserPortfolioLinkedAccounts()"); } var accountResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/account"); var account = accountResult.Data; _brokerAccounts = new List(); if (string.IsNullOrEmpty(_brokerUserSession.LinkedAccounts)) { _brokerUserSession.LinkedAccounts = account.account_number; await _databaseService.UpdateBrokerUserLinkedAccounts(_brokerUserSession.BrokerUserId, _brokerUserSession.LinkedAccounts); } var brokerAccount = new BrokerAccount() { account_identifier = account.account_number, account_key = account.id, account_name = account.account_number, account_type = account.sma, account_institution_type = account.sma, account_status = account.status, nickname = account.account_number, account_description = account.account_number, linked = _brokerUserSession.LinkedAccounts.Contains(account.account_number), account_mode = "N/A", quote_mode = "Realtime", day_trader_status = "N/A", account_value = account.portfolio_value ?? 0, available_funds = account.cash ?? 0, buying_power = account.buying_power ?? 0, buying_power_in_use = account.daytrading_buying_power ?? 0 }; var positionsResult = await _oAuthSession.SendRequestAsync>(HttpMethod.Get, $"/{ALPACA_API_VERSION}/positions"); if (positionsResult.Data != null && positionsResult.Data.Count > 0) { var positions = positionsResult.Data; foreach( var position in positions) { var brokerPosition = new BrokerPosition() { id = 0, broker_id = _brokerId, broker_account_identifier = account.account_number, broker_position_id = position.asset_id, symbol = position.symbol, shares = position.qty ?? 0, average_open_price = position.avg_entry_price ?? 0, last_trade = position.lastday_price ?? 0, realized_profit_loss = position.unrealized_pl }; Debug.WriteLine($"Position-{position.symbol}:{position.asset_id}"); brokerAccount.AddPosition(brokerPosition); } } var ordersTimeFrame = TimeZoneNow; var ordersResult = await _oAuthSession.SendRequestAsync>(HttpMethod.Get, $"/{ALPACA_API_VERSION}/orders?status=all"); if (ordersResult.Data != null && ordersResult.Data.Count > 0) { ordersResult.Data.RemoveAll(o => o.status != "OPEN" & o.created_at.Value.Date < TimeZoneNow.Date); foreach(var order in ordersResult.Data) { var brokerOrder = CreateBrokerOrder(account.account_number, order); Debug.WriteLine($"Order-{order.symbol}:{order.asset_id}"); brokerAccount.AddOrder(brokerOrder); } } // //Get Todays Orders // var ordersResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/orders?count=100&fromDate={ordersTimeFrame:MMddyyyy}&toDate={ordersTimeFrame:MMddyyyy}", "application/json"); // if (ordersResponse.Data != null) // { // foreach (var order in ordersResponse.Data.OrdersResponse.Order) // { // var orderDetail = order.OrderDetail.First(); // var instrument = orderDetail.Instrument.First(); // if (orderDetail.status == "CANCELED" || orderDetail.status == "EXPIRED" || orderDetail.status == "REJECTED") // { // var orderDetailsResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/orders/{order.orderId}", "application/json"); // if (orderDetailsResponse.IsSuccess) // order.Events = orderDetailsResponse.Data.OrdersResponse.Order[0].Events; // } // ba.AddOrder(CreateBrokerOrder(ba.account_identifier, order)); // } // } // //Get All Open Orders, and discard Todays Open Orders retrieved in the previous orders request // var openOrdersResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/orders?status=OPEN"); // if (openOrdersResponse.IsSuccess && openOrdersResponse.Data != null) // { // foreach (var order in openOrdersResponse.Data.OrdersResponse.Order) // { // if (ba.Orders.Count == 0 || !ba.Orders.Any(o => o.brokerOrderId == order.orderId)) // { // ba.AddOrder(CreateBrokerOrder(ba.account_identifier, order)); // } // } // } // //Get Cached Orders // var orders = await _databaseService.GetOrders(_brokerUserSession.UserName, _brokerId, account.accountId, "OPEN", null, null); // if (orders.Obj != null && orders.Obj.Count() > 0) // { // var localOrders = orders.Obj.ToList(); // localOrders?.RemoveAll(lo => (lo.localExecutionType == "time-cancel" && TimeZoneNow > lo.goodTillTime) || // (lo.localExecutionType == "time-exit" && TimeZoneNow > lo.goodAfterTime)); // } // foreach (var order in orders.Obj) // { // if (order.IsLocal) // { // ba.AddOrder(order); // } // else // { // var oi = ba.GetOrderIndex(order.brokerOrderId); // if (oi > -1) // { // ba.Orders[oi].id = order.id; // ba.Orders[oi].brokerParentOrderId = order.brokerParentOrderId; // ba.Orders[oi].goodTillTime = order.goodTillTime; // ba.Orders[oi].ocaGroup = order.ocaGroup; // ba.Orders[oi].strategyName = order.strategyName; // } // } // } // foreach (var bp in ba.Positions ?? Enumerable.Empty()) // { // var order = ba.GetOrder(bp.opening_order_id); // if (null == order && !string.IsNullOrEmpty(bp.opening_order_id)) // { // var openingOrderResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.accountIdKey}/orders/{bp.opening_order_id}"); // var orderExecutedTime = openingOrderResponse.Data.OrdersResponse.Order.First().OrderDetail.First().executedTime; // bp.entry_when = FromUnixTimeMilliseconds(orderExecutedTime); // } // else // { // bp.entry_when = order.executedWhen.Value; // } // } _brokerAccounts.Add(brokerAccount); // } serviceResult.Set(_brokerAccounts.AsReadOnly(), ServiceResultStatuses.Success); _portfolioLoaded = true; } catch (Exception ex) { serviceResult.Set(ServiceResultStatuses.ServerError, "Server Error"); LogError(ex.Message, ex); } finally { _semaphoreSlim.Release(true); } return serviceResult; } public async Task PlaceOrdersWorker(BrokerOrderRequest brokerOrderRequest) { try { switch (brokerOrderRequest.Macro) { case "DOUBLE_POSITION": await DoublePosition(brokerOrderRequest); break; case "HALF_POSITION": await HalfPosition(brokerOrderRequest); break; case "FLATTEN_POSITION": await FlattenPosition(brokerOrderRequest); break; case "ENTER_POSITION": await EnterPosition(brokerOrderRequest); break; case "REVERSE_POSITION": await ReversePosition(brokerOrderRequest); break; default: { var order = brokerOrderRequest.Orders[0]; var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var orderActions = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0); var orders = new List(); foreach (var o in orderActions) { orders.Add(CopyBrokerOrder(order, o.Item1, o.Item2, string.Empty)); } await PlaceOrders(orders); break; } } } catch (Exception ex) { LogError(ex.Message, ex); } } public async Task ModifyOrdersWorker(BrokerOrderRequest brokerOrderRequest) { try { switch (brokerOrderRequest.Macro) { case "NONE": foreach (var order in brokerOrderRequest.Orders) { var existingOrder = GetOrder(order.brokerAccountIdentifier, order.brokerOrderId); if (existingOrder.IsLocal) { await ModifyLocalOrder(order); } else { ConfigureBrokerOrder(order); existingOrder.replacing = true; await ReplaceOrder(order, existingOrder); } } break; } } catch (Exception ex) { LogError(ex.Message, ex); } } public async Task CancelOrdersWorker(BrokerOrderRequest brokerOrderRequest) { try { var ordersToCancel = new List(); ordersToCancel.AddRange(brokerOrderRequest.Orders); foreach (var order in brokerOrderRequest.Orders) { ordersToCancel.AddRange(GetGroupOrders(order)); } await CancelOrders(ordersToCancel); } catch (Exception ex) { LogError(ex.Message, ex); } } private async Task PlaceOrders(List orders) { try { var enteredSemaphore = false; foreach (var order in orders) { //order.manualResetEvent.Reset(); if (await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) enteredSemaphore = true; await PlaceOrder(order, false); _semaphoreSlim.Release(enteredSemaphore); //order.manualResetEvent.WaitOne(RESET_EVENT_WAIT_TIME); } } catch (Exception ex) { LogError(ex.Message, ex); } } private async Task PlaceOrder(BrokerOrder order, bool forceLocal) { try { var account = GetAccount(order.brokerAccountIdentifier); if( !order.convertFromLocal) order.localOrderId = await GetNextOrderIdAsync(); order.brokerId = _brokerId; order.tiUser = _brokerUserSession.UserName; order.isEntry = order.orderAction == "BUY" || order.orderAction == "SELL_SHORT"; order.createdWhen = TimeZoneNow; order.updatedWhen = TimeZoneNow; //Local Orders if ( order.goodAfterTime.HasValue) { order.brokerOrderId = order.localOrderId; order.isEntry = false; order.localExecutionType = "time-exit"; if (string.IsNullOrEmpty(order.orderStatus)) order.orderStatus = "OPEN"; await AddOrder(account, order); _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, GetOrderAction(order.orderStatus), order))); return true; } //Alpaca Orders if (order.goodTillTime.HasValue) order.localExecutionType = "time-cancel"; if (!order.convertFromLocal) await AddOrder(account, order); var alpacaOrder = CreateAlpacaOrder(order); var alpacaOrderResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Post, $"/{ALPACA_API_VERSION}/orders",alpacaOrder); if (alpacaOrderResponse.IsSuccess) { order.brokerOrderId = alpacaOrderResponse.Data.id; order.orderStatus = ConvertOrderStatus(alpacaOrderResponse.Data.status); order.updatedWhen = alpacaOrderResponse.Data.updated_at.Value; } else { order.localOrderId = await GetNextOrderIdAsync(); order.brokerOrderId = order.localOrderId; order.orderStatus = "REJECTED"; order.errorMessage = alpacaOrderResponse.Message; } await _databaseService.UpdateOrder(order); Debug.WriteLine($"{"N/A"}:{order.symbol}:{order.orderStatus}"); _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(alpacaOrderResponse.Message, TimeZoneNow, GetOrderAction(order.orderStatus), order))); } catch (Exception ex) { LogError(ex.Message, ex); } return true; } private async Task CancelOrders(List orders) { try { foreach (var orderToCancel in orders) { var account = GetAccount(orderToCancel.brokerAccountIdentifier); var order = account.GetOrder(orderToCancel.brokerOrderId); if (order == null) continue; if (order.IsLocal) { order.comments = "Cancel Requested"; order.orderStatus = "CANCELLED"; var updateOrderResult = await _databaseService.UpdateOrder(order); if (updateOrderResult.IsSuccess) _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Canceled", TimeZoneNow, order.orderStatus, order))); order.manualResetEvent.Set(); } else { var cancelOrderResult = await _oAuthSession.SendRequestAsync(HttpMethod.Delete, $"/{ALPACA_API_VERSION}/orders/{order.brokerOrderId}"); if (!cancelOrderResult.IsSuccess) { var updateOrderResult = await _databaseService.ErrorOrder(cancelOrderResult.Message, order); _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(cancelOrderResult.Message, TimeZoneNow, "ERROR", order))); } } } } catch (Exception ex) { LogError(ex.Message, ex); } } private async Task ModifyLocalOrder(BrokerOrder order) { try { var account = GetAccount(order.brokerAccountIdentifier); var brokerOrderIdToReplace = string.IsNullOrEmpty(order.replacingBrokerOrderId) ? order.brokerOrderId : order.replacingBrokerOrderId; var oi = account.GetOrderIndex(brokerOrderIdToReplace); if (oi < 0) return; account.Orders[oi].replacing = order.replacing; //Local Orders if (account.Orders[oi].IsLocal) { account.Orders[oi].orderedQuantity = order.orderedQuantity; switch (account.Orders[oi].localExecutionType) { case "time-exit": account.Orders[oi].goodAfterTime = order.goodAfterTime; break; case "time-cancel": account.Orders[oi].goodTillTime = order.goodTillTime; break; case "price-exit": if (order.stopPrice.HasValue) account.Orders[oi].stopPrice = order.stopPrice; if (order.limitPrice.HasValue) account.Orders[oi].limitPrice = order.limitPrice; break; } var updateOrderResult = await _databaseService.UpdateOrder(account.Orders[oi]); _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", account.Orders[oi]))); return; } } catch (Exception ex) { LogError(ex.Message, ex); } } private async Task DoublePosition(BrokerOrderRequest brokerOrderRequest) { //Double the position size, then double any pending exit orders; both local and etrade orders var enteredSemaphore = false; try { var order = brokerOrderRequest.Orders[0]; var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction == "BUY" ? "SELL" : "BUY"); var localExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList(); var eTradeExitOrders = pendingExitOrders.Where(o => !o.IsLocal).ToList(); if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { order.orderStatus = "ERROR"; _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); return; } else enteredSemaphore = true; var orderAction = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0).FirstOrDefault(); order = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty); order.waitStatus = "EXECUTED"; order.manualResetEvent.Reset(); await PlaceOrder(order, false); _semaphoreSlim.Release(enteredSemaphore); order.manualResetEvent.WaitOne(RESET_EVENT_WAIT_TIME); if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) LogWarn($"Could not enter Semaphore from DoublePosition()"); else enteredSemaphore = true; foreach (var pendingExitOrder in localExitOrders) { pendingExitOrder.orderedQuantity = pendingExitOrder.orderedQuantity * 2; await ModifyLocalOrder(pendingExitOrder); } if (eTradeExitOrders.Count > 0) { foreach (var eTradeExitOrder in eTradeExitOrders) { var newOrder = CopyBrokerOrder(eTradeExitOrder, eTradeExitOrder.orderAction, eTradeExitOrder.orderedQuantity * 2, "NEW"); newOrder.replacing = true; eTradeExitOrder.replacing = true; await ReplaceOrder(newOrder, eTradeExitOrder); } } } catch (Exception ex) { LogError(ex.Message, ex); } finally { _semaphoreSlim.Release(enteredSemaphore); } } private async Task HalfPosition(BrokerOrderRequest brokerOrderRequest) { // 1/2 the pending exit orders, then 1/2 the position size // we must first wait for the replacement of the ETrade Exit orders, // so we don't get an over bought/sold rejection when selling to reduce the position by 1/2 var enteredSemaphore = false; try { var order = brokerOrderRequest.Orders[0]; if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { order.orderStatus = "ERROR"; _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); return; } else enteredSemaphore = true; var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction); var eTradeExitOrders = pendingExitOrders.Where(o => !o.IsLocal).OrderBy(o => o.orderedQuantity).ToList(); var allLocalExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList(); if (eTradeExitOrders.Count > 0) { var waitHandles = new List(); var existingPositionShares = position?.shares; var newPositionShares = (int)(Math.Abs(existingPositionShares.Value) - order.orderedQuantity); var newExitOrderQuantity = 0; int orderCounter = 0; foreach (var eTradeExitOrder in eTradeExitOrders) { orderCounter++; var cancelOrder = false; int newOrderedQuantity = 0; if (newExitOrderQuantity == newPositionShares) { cancelOrder = true; } else { //If we are processing the last order make sure we make up any missing shares if (orderCounter == eTradeExitOrders.Count) newOrderedQuantity = newPositionShares - newExitOrderQuantity; else newOrderedQuantity = Math.Max(1, GetShares((int)eTradeExitOrder.orderedQuantity.Value, -50)); } eTradeExitOrder.manualResetEvent.Reset(); //These are the local exit orders in the same oca group as the ETrade exit order var groupExitOrders = allLocalExitOrders.Where(o => o.brokerParentOrderId == eTradeExitOrder.brokerParentOrderId || o.ocaGroup == eTradeExitOrder.ocaGroup).ToList(); if (cancelOrder) { //Add ETrade exit order to the list of orders to cancel; groupExitOrders.Add(eTradeExitOrder); await CancelOrders(groupExitOrders); waitHandles.Add(eTradeExitOrder.manualResetEvent); } else { newExitOrderQuantity += newOrderedQuantity; //Skip Adjusting Orders with change in quantity if (newOrderedQuantity == eTradeExitOrder.orderedQuantity) continue; var newOrder = CopyBrokerOrder(eTradeExitOrder, eTradeExitOrder.orderAction, newOrderedQuantity, "NEW"); newOrder.replacing = true; eTradeExitOrder.replacing = true; newOrder.manualResetEvent.Reset(); await ReplaceOrder(newOrder, eTradeExitOrder); waitHandles.Add(newOrder.manualResetEvent); waitHandles.Add(eTradeExitOrder.manualResetEvent); foreach (var pendingExitOrder in groupExitOrders) { pendingExitOrder.orderedQuantity = newOrderedQuantity; await ModifyLocalOrder(pendingExitOrder); } } } _semaphoreSlim.Release(enteredSemaphore); WaitHandle.WaitAll(waitHandles.ToArray(), RESET_EVENT_WAIT_TIME); } else { //If there is a possibility that we can have exit orders that are not part of an OCA group then we'll need to add logic to cancel them independently if (allLocalExitOrders.Count > 0) { //foreach (var pendingExitOrder in localExitOrders) //{ // pendingExitOrder.orderedQuantity = order.orderedQuantity; // await ModifyLocalOrder(pendingExitOrder); //} } _semaphoreSlim.Release(enteredSemaphore); } if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) LogWarn($"Could not enter Semaphore from HalfPosition()"); else enteredSemaphore = true; var orderAction = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0).FirstOrDefault(); order = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty); await PlaceOrder(order, false); } catch (Exception ex) { LogError(ex.Message, ex); } finally { _semaphoreSlim.Release(enteredSemaphore); } } private async Task FlattenPosition(BrokerOrderRequest brokerOrderRequest) { var enteredSemaphore = false; try { var order = brokerOrderRequest.Orders[0]; #region Option 1 //Option 1 - Cancel ETrade Exit Order First, and other Local Orders, then send Market Order to Close var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction); if (pendingExitOrders != null && pendingExitOrders.Count > 0) { if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { order.orderStatus = "ERROR"; _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); return; } else enteredSemaphore = true; //Allows suspending further order processing until an order status cange is detected by the PortfolioWorker() pendingExitOrders.ForEach(peo => { peo.manualResetEvent.Reset(); peo.replacing = true; }); await CancelOrders(pendingExitOrders); //We Cannot send the close order request for a flatten until the pending exit orders are canceled //otherwise ETrade will return an overbought or oversold exception //CancelOrders will Reset the Event on each order //When the order status change is detected in the PortfolioWorker, the event will be Set() _semaphoreSlim.Release(enteredSemaphore); WaitHandle.WaitAll(pendingExitOrders.Select(o => o.manualResetEvent).ToArray(), 15000); } if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) LogWarn($"Could not enter Semaphore from FlattenPosition()"); else enteredSemaphore = true; var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var orderAction = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0).FirstOrDefault(); var newOrder = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty); await PlaceOrder(newOrder, false); #endregion #region Option 2 //Option 2 - Cancel Local Exit Orders First, //Replace ETrade Exit Order with a Market Order to close //Or Send Market Order if no ETrade Exit Order Exists //Option 2, also works for button_limit_out by using a Limit Order //if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) //{ // order.orderStatus = "ERROR"; // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); // return; //} //else // enteredSemaphore = true; //var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction); //var localExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList(); //var eTradeExitOrders = pendingExitOrders.Where(o => !o.IsLocal).ToList(); //if (localExitOrders != null && localExitOrders.Count > 0) // await CancelOrders(localExitOrders); //order.replacing = true; //if (eTradeExitOrders.Count > 0) //{ // var waitHandles = new List(); // foreach (var eTradeExitOrder in eTradeExitOrders) // { // eTradeExitOrder.replacing = true; // ConfigureBrokerOrder(order); // await ReplaceOrder(order, eTradeExitOrder); // waitHandles.Add(order.manualResetEvent); // waitHandles.Add(eTradeExitOrder.manualResetEvent); // } // _semaphoreSlim.Release(enteredSemaphore); // WaitHandle.WaitAll(waitHandles.ToArray(), RESET_EVENT_WAIT_TIME); //} //else //await PlaceOrder(order, false); #endregion } catch (Exception ex) { LogError(ex.Message, ex); } finally { _semaphoreSlim.Release(enteredSemaphore); } } private async Task ReversePosition(BrokerOrderRequest brokerOrderRequest) { var enteredSemaphore = false; try { var order = brokerOrderRequest.Orders[0]; //Cancel ETrade Exit Order First, and other Local Orders, then send Orders to Reverse the Position var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction); if (pendingExitOrders != null && pendingExitOrders.Count > 0) { if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { order.orderStatus = "ERROR"; _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); return; } else enteredSemaphore = true; //Allows suspending further order processing until an order status cange is detected by the PortfolioWorker() pendingExitOrders.ForEach(peo => { peo.manualResetEvent.Reset(); peo.replacing = true; }); await CancelOrders(pendingExitOrders); //We Cannot send the orders for a reverse until the pending exit orders are canceled //otherwise ETrade will return an overbought or oversold exception //CancelOrders will Reset the Event on each order //When the order status change is detected in the PortfolioWorker, the event will be Set() _semaphoreSlim.Release(enteredSemaphore); WaitHandle.WaitAll(pendingExitOrders.Select(o => o.manualResetEvent).ToArray(), 15000); } var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var orderActions = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0); var orders = new List(); foreach (var o in orderActions) { orders.Add(CopyBrokerOrder(order, o.Item1, o.Item2, string.Empty)); } await PlaceOrders(orders); } catch (Exception ex) { _semaphoreSlim.Release(enteredSemaphore); LogError(ex.Message, ex); } } private async Task EnterPosition(BrokerOrderRequest brokerOrderRequest) { var enteredSemaphore = false; try { var order = brokerOrderRequest.Orders[0]; order.manualResetEvent.Reset(); if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { order.orderStatus = "ERROR"; _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order))); return; } else enteredSemaphore = true; //This is the entry order //If the Entry order is rejected then end processing var account = GetAccount(order.brokerAccountIdentifier); var position = account.GetPosition(order.symbol); var orderAction = GetOrderActions(order.orderAction == "BUY", position == null ? 0 : position.shares, order.orderedQuantity ?? 0).FirstOrDefault(); order = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty); var result = await PlaceOrder(order, false); if (!result) return; //These are the exit orders foreach (var exitOrder in brokerOrderRequest.Orders.Skip(1)) { orderAction = GetOrderActions(exitOrder.orderAction == "BUY", position == null ? 0 : position.shares, exitOrder.orderedQuantity ?? 0).FirstOrDefault(); exitOrder.brokerParentOrderId = order.brokerOrderId; exitOrder.ocaGroup = order.brokerOrderId; exitOrder.orderStatus = "NEW"; await PlaceOrder(exitOrder, true); } } catch (Exception ex) { LogError(ex.Message, ex); } finally { _semaphoreSlim.Release(enteredSemaphore); } } private async Task ReplaceOrder(BrokerOrder newOrder, BrokerOrder orderToReplace) { //try //{ // var account = GetAccount(orderToReplace.brokerAccountIdentifier); // var previewOrderRequest = MakePreviewOrderRequest(newOrder); // var previewOrderResponse = await _oAuthSession // .SendRequestAsync(HttpMethod.Put, // $"/v1/accounts/{account.account_key}/orders/{orderToReplace.brokerOrderId}/change/preview", // new PreviewOrderRequestWrapper(previewOrderRequest)); // if (!previewOrderResponse.IsSuccess) // { // newOrder.createdWhen = TimeZoneNow; // newOrder.updatedWhen = TimeZoneNow; // newOrder.orderStatus = "REJECTED"; // newOrder.errorMessage = previewOrderResponse.Message; // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(previewOrderResponse.Message, TimeZoneNow, newOrder.orderStatus, newOrder))); // return; // } // var newOrderRequest = new PlaceOrderRequest() // { // clientOrderId = previewOrderRequest.clientOrderId, // orderType = previewOrderRequest.orderType, // Order = previewOrderRequest.Order, // PreviewIds = previewOrderResponse.Data.PreviewOrderResponse.PreviewIds // }; // try // { // var orderResponse = await _oAuthSession // .SendRequestAsync(HttpMethod.Put, // $"/v1/accounts/{account.account_key}/orders/{orderToReplace.brokerOrderId}/change/place", // new PlaceOrderRequestWrapper(newOrderRequest)); // if (orderResponse.IsSuccess) // { // var eOrder = orderResponse.Data.PlaceOrderResponse.GetOrder(); // newOrder.brokerOrderId = eOrder.orderId; // newOrder.brokerParentOrderId = orderToReplace.brokerParentOrderId; // //This is for local order executions converted to a Market Order // //Revisit // if (newOrder.convertFromLocal) // { // var oi = account.GetOrderIndex(newOrder.brokerOrderId); // account.Orders[oi] = newOrder; // await _databaseService.UpdateOrder(account.Orders[oi]); // } // else // { // await AddOrder(account, newOrder); // } // } // } // catch (Exception ex) // { // LogError(ex.Message, ex); // } //} //catch (Exception ex) //{ // LogError(ex.Message, ex); //} } //private async Task ActivateChildOrders(BrokerAccount account, BrokerOrder order) //{ // try // { // var childOrders = account.Orders.Where(o => o.brokerParentOrderId == order.brokerOrderId).ToList(); // if (childOrders.Count == 0) return; // await RefreshAccount(account); // var eTradeOrderActivated = false; // //Trailing Stop Order has highest priority for converting from local to ETrade Order // var trailingStopOrder = childOrders.FirstOrDefault(o => o.priceType == "TRAILING_STOP_CNST"); // if (trailingStopOrder != null) // { // trailingStopOrder.orderStatus = "NEW"; // trailingStopOrder.replacing = true; // trailingStopOrder.convertFromLocal = true; // trailingStopOrder.localExecutionType = null; // await PlaceOrder(trailingStopOrder, false); // eTradeOrderActivated = true; // } // //Stop Order has 2nd priority for converting from local to ETrade Order // var stopOrder = childOrders.FirstOrDefault(o => o.priceType == "STOP"); // if (stopOrder != null) // { // if (eTradeOrderActivated) // { // stopOrder.orderStatus = "OPEN"; // var updateOrderResult = await _databaseService.UpdateOrder(stopOrder); // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", stopOrder))); // } // else // { // stopOrder.orderStatus = "NEW"; // stopOrder.replacing = true; // stopOrder.convertFromLocal = true; // stopOrder.localExecutionType = null; // await PlaceOrder(stopOrder, false); // eTradeOrderActivated = true; // } // } // //Limit Order has 3rd priority for converting from local to ETrade Order // var limitOrder = childOrders.FirstOrDefault(o => o.priceType == "LIMIT"); // if (limitOrder != null) // { // if (eTradeOrderActivated) // { // limitOrder.orderStatus = "OPEN"; // var updateOrderResult = await _databaseService.UpdateOrder(limitOrder); // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", limitOrder))); // } // else // { // limitOrder.orderStatus = "NEW"; // limitOrder.replacing = true; // limitOrder.convertFromLocal = true; // limitOrder.localExecutionType = null; // await PlaceOrder(limitOrder, false); // eTradeOrderActivated = true; // } // } // //Time Stop Not Supported by ETrade // var marketOrder = childOrders.FirstOrDefault(o => o.priceType == "MARKET" && o.localExecutionType == "time-exit"); // if (marketOrder != null) // { // marketOrder.orderStatus = "OPEN"; // var updateOrderResult = await _databaseService.UpdateOrder(marketOrder); // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", marketOrder))); // } // } // catch (Exception ex) // { // LogError(ex.Message, ex); // } //} #endregion private BrokerOrder CreateBrokerOrder(string accountId, AlpacaOrder order) { return new BrokerOrder() { brokerId = _brokerId, tiUser = _brokerUserSession.UserName, brokerAccountIdentifier = accountId, positionId = order.asset_id, brokerOrderId = order.id, // brokerOrderNumber = order.id, orderType = order.order_class, securityType = order.asset_class, symbol = order.symbol, orderAction = order.side.ToUpper(), isEntry = true, orderedQuantity = order.qty, filledQuantity = order.filled_qty, averageExecutionPrice = order.filled_avg_price, estimatedCommission = 0, orderStatus = ConvertOrderStatus(order.status), priceType = order.type, stopPrice = order.stop_price, limitPrice = order.limit_price, trailPrice = order.trail_price, offsetType = "percent", offsetValue = order.trail_percent, term = GetBrokerTimeInForce(order.time_in_force), createdWhen = order.created_at.Value, updatedWhen = order.updated_at.Value, executedWhen = order.filled_at }; } private AlpacaOrder CreateAlpacaOrder(BrokerOrder order) { var alpacaOrder = new AlpacaOrder() { client_order_id = order.localOrderId, order_class = order.orderType, type = order.priceType, symbol = order.symbol, time_in_force = GetAlpacaTimeInForce(order.term), limit_price = order.limitPrice, stop_price = order.stopPrice, side = order.orderAction.ToLower(), qty = order.orderedQuantity, extended_hours = order.outsideRegularTradingHours }; return alpacaOrder; } private string GetAlpacaTimeInForce(string timeInForce, string orderAction = "") { if (orderAction == "SELL_SHORT") return "day"; switch (timeInForce.ToUpper().Replace(" ", "_")) { case "GTC": return "gtc"; case "GDA": return "day"; default: return "day"; } } private string GetBrokerTimeInForce(string duration) { switch (duration.ToUpper().Replace(" ", "_")) { case "GTC": return "GTC"; case "DAY": return "GDA"; case "FOK": return "FOK"; default: return "GDA"; } } private string GetOrderAction(string alpacaOrderStatus) { switch (alpacaOrderStatus.ToUpper().Replace(" ", "_")) { case "ACCEPTED": return "ACCEPTED"; case "NEW": case "PENDING_NEW": return "NEW"; case "OPEN": return "OPEN"; case "ORDER_MODIFIED": return "ORDER_MODIFIED"; case "PENDING_CANCEL": return "PENDING_CANCEL"; case "CANCELED": case "CANCELLED": case "EXPIRED": return "CANCELLED"; case "FILL": case "FILLED": case "EXECUTED": return "EXECUTED"; case "REJECTED": return "REJECTED"; case "ERROR": return "ERROR"; default: return "ERROR"; } } private string ConvertOrderStatus(string alpacaOrderStatus) { return alpacaOrderStatus.ToUpper().Replace(" ", "_"); } private int GetShares(int shares, double percentage) { return Math.Abs((int)Math.Round(shares * percentage / 100, 0)); } private string jcso(object data) { return JsonConvert.SerializeObject(data); } private bool ShouldPlaceLocal(BrokerAccount account, BrokerOrder order) { if (order.goodAfterTime.HasValue) return true; if (order.isEntry ?? false) return false; var quantity = account.Orders.Where(o => !o.IsLocal && o.brokerOrderId != order.brokerOrderId && o.symbol == order.symbol && (o.orderStatus == "OPEN" || o.orderStatus == "NEW") && o.orderAction == order.orderAction && (o.priceType == "LIMIT" || o.priceType == "STOP" || o.priceType == "TRAILING_STOP_CNST")).Sum(o => o.orderedQuantity); var position = account.GetPosition(order.symbol); var positionQuantity = position == null ? 0 : position.shares; return (quantity + order.orderedQuantity) > Math.Abs(positionQuantity); } private string ParseOrderId(string value) { var pos = value.IndexOf('.'); return pos > -1 ? value.Substring(0, pos) : value; } private BrokerAccount GetAccount(string accountId) { return _brokerAccounts.FirstOrDefault(a => a.account_identifier == accountId); } //private List GetOrders(BrokerAccount account) //{ // //This function retrives a new order snapshot that is used to detect new orders and/or order changes // //when compared to the prior snapshot. // //ETrades order api date filters only apply to the order creation date, not the executed or updated dates // //Therefore is is most efficient to check for the absense of orders in subsequent snapshots to detect // //the order status changes on orders with a previous status of OPEN. // DateTime ordersTimeFrameEnd = TimeZoneNow.Date; // DateTime ordersTimeFrameStart = ordersTimeFrameEnd.AddDays(-1); // DateTime oldest = DateTime.MinValue; // List orders = new List(); // var fetchTasks = new List>>(); // try // { // //Get the orders that have an OPEN status and that were Placed prior to the current day // var pastOpenOrders = account.GetOrders(false, "OPEN,CANCEL_REQUESTED", TimeZoneNow); // if (pastOpenOrders != null && pastOpenOrders.Count > 0) // { // //1. Retrieve all Open Orders within the date range determinded by the prior open orders snapshot // oldest = pastOpenOrders.Select(o => o.createdWhen).Min(); // fetchTasks.Add( // _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders?status=OPEN&fromDate={oldest:MMddyyyy}&toDate={ordersTimeFrameEnd.AddDays(-1):MMddyyyy}") // ); // } // //2. Get Today's orders // fetchTasks.Add( // _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders?count=100&fromDate={ordersTimeFrameStart:MMddyyyy}&toDate={ordersTimeFrameEnd:MMddyyyy}") // ); // Task.WaitAll(fetchTasks.ToArray()); // for (int i = 0; i < fetchTasks.Count; i++) // { // if (fetchTasks[i].Result.IsSuccess && fetchTasks[i].Result.Data != null) // { // var taskOrders = fetchTasks[i].Result.Data.OrdersResponse.Order; // if (i == (fetchTasks.Count - 1)) // { // //Add all orders from 2. that aren't in pastOpenOrders // var notInPastOrders = taskOrders.Where(o => FromUnixTimeMilliseconds(o.OrderDetail[0].placedTime).Date == ordersTimeFrameEnd && !pastOpenOrders.Any(po => po.brokerOrderId == o.orderId)).ToList(); // if (notInPastOrders.Count > 0) // orders.AddRange(notInPastOrders); // //Check for any Status Changes // var statusChangedOrders = taskOrders.Where(o => pastOpenOrders.Any(po => po.brokerOrderId == o.orderId && po.orderStatus != o.OrderDetail[0].status)).ToList(); // if (statusChangedOrders.Count > 0) // orders.AddRange(statusChangedOrders); // } // else // { // //3. Detect the OPEN orders missing from pastOpenOrders, we need to get their order details from ETrade to get the new status // var ordersChanged = pastOpenOrders.Where(o => !taskOrders.Any(oc => oc.orderId == o.brokerOrderId)); // if (ordersChanged != null && ordersChanged.Count() > 0) // { // var detailsTasks = new List>>(); // foreach (var order in ordersChanged) // { // detailsTasks.Add( // _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders/{order.brokerOrderId}") // ); // } // Task.WaitAll(detailsTasks.ToArray()); // foreach (var task in detailsTasks) // { // if (task.Result.IsSuccess && task.Result.Data != null) // orders.AddRange(task.Result.Data.OrdersResponse.Order); // } // } // } // } // } // } // catch (Exception ex) // { // LogError(ex.Message, ex); // } // return orders; //} //private async Task UpdateBrokerOrder(BrokerOrder brokerOrder, Order refreshedOrder) //{ // try // { // var orderDetail = refreshedOrder.OrderDetail.First(); // var instrument = orderDetail.Instrument.First(); // brokerOrder.filledQuantity = instrument.filledQuantity; // brokerOrder.averageExecutionPrice = instrument.averageExecutionPrice; // brokerOrder.estimatedCommission = instrument.estimatedCommission; // brokerOrder.replaceOrderId = orderDetail.replacedByOrderId; // brokerOrder.orderStatus = orderDetail.status; // brokerOrder.stopPrice = orderDetail.stopPrice == 0 ? null : orderDetail.stopPrice; // brokerOrder.limitPrice = orderDetail.limitPrice == 0 ? null : orderDetail.limitPrice; // brokerOrder.offsetType = orderDetail.offsetType; // brokerOrder.offsetValue = orderDetail.offsetValue; // brokerOrder.term = orderDetail.orderTerm; // brokerOrder.createdWhen = FromUnixTimeMilliseconds(orderDetail.placedTime); // brokerOrder.updatedWhen = FromUnixTimeMilliseconds(refreshedOrder.Events == null ? orderDetail.placedTime : refreshedOrder.Events.Event.Last().dateTime); // brokerOrder.executedWhen = orderDetail.executedTime.HasValue ? FromUnixTimeMilliseconds(orderDetail.executedTime) : (DateTime?)null; // await _databaseService.UpdateOrder(brokerOrder); // return true; // } // catch (Exception) // { // return false; // } //} //private BrokerOrder CreateBrokerOrder(string accountId, Order order) //{ // var orderDetail = order.OrderDetail.First(); // var instrument = orderDetail.Instrument.First(); // return new BrokerOrder() // { // brokerId = _brokerId, // tiUser = _brokerUserSession.UserName, // brokerAccountIdentifier = accountId, // brokerOrderId = order.orderId, // brokerOrderNumber = orderDetail.orderNumber, // orderType = order.orderType, // securityType = instrument.Product.securityType, // symbol = instrument.Product.symbol, // orderAction = instrument.orderAction, // isEntry = instrument.orderAction == "BUY" || instrument.orderAction == "SELL_SHORT", // orderedQuantity = instrument.orderedQuantity, // filledQuantity = instrument.filledQuantity, // averageExecutionPrice = instrument.averageExecutionPrice, // estimatedCommission = instrument.estimatedCommission, // orderStatus = orderDetail.status, // priceType = orderDetail.priceType, // stopPrice = orderDetail.stopPrice, // limitPrice = orderDetail.limitPrice, // trailPrice = orderDetail.trailPrice, // offsetType = orderDetail.offsetType, // offsetValue = orderDetail.offsetValue, // term = orderDetail.orderTerm, // createdWhen = FromUnixTimeMilliseconds(orderDetail.placedTime), // updatedWhen = FromUnixTimeMilliseconds(order.Events == null ? orderDetail.placedTime : order.Events.Event.Last().dateTime), // executedWhen = orderDetail.executedTime.HasValue ? FromUnixTimeMilliseconds(orderDetail.executedTime) : (DateTime?)null, // }; //} private BrokerOrder CopyBrokerOrder(BrokerOrder order, string action, decimal? orderedQuantity, string orderStatus) { return new BrokerOrder() { tiUser = order.tiUser, brokerId = _brokerId, brokerParentOrderId = order.brokerParentOrderId, isEntry = action == "BUY" || action == "SELL_SHORT", createdWhen = TimeZoneNow, updatedWhen = TimeZoneNow, brokerAccountIdentifier = order.brokerAccountIdentifier, goodAfterTime = order.goodAfterTime, goodTillTime = order.goodTillTime, securityType = order.securityType, orderType = order.orderType, term = order.term, priceType = order.priceType, limitPrice = order.limitPrice, stopPrice = order.stopPrice, orderAction = action, orderedQuantity = orderedQuantity.HasValue ? orderedQuantity.Value : order.orderedQuantity, symbol = order.symbol, ocaGroup = order.ocaGroup, strategyName = order.strategyName, orderStatus = orderStatus }; } private void ConfigureBrokerOrder(BrokerOrder order) { order.brokerId = _brokerId; order.tiUser = _brokerUserSession.UserName; order.isEntry = order.orderAction == "BUY" || order.orderAction == "SELL_SHORT"; order.createdWhen = TimeZoneNow; order.updatedWhen = TimeZoneNow; order.orderStatus = "NEW"; } private async Task AddOrder(BrokerAccount account, BrokerOrder order) { var addOrderResult = await _databaseService.AddOrder(order); if (addOrderResult.IsSuccess) order.id = addOrderResult.Obj.id; return account.AddOrder(order); } private List GetPendingExitOrders(string accountIdentifier, string symbol, string orderAction) { var pendingExitOrders = new List(); var account = GetAccount(accountIdentifier); pendingExitOrders.AddRange(account.Orders.Where(o => o.symbol == symbol && o.orderStatus != "CANCELLED" && o.orderStatus != "EXECUTED" && o.orderStatus != "REJECTED" && o.orderAction.Contains(orderAction))); return pendingExitOrders; } private List GetGroupOrders(BrokerOrder brokerOrder) { var groupOrdersToCancel = new List(); var account = GetAccount(brokerOrder.brokerAccountIdentifier); var order = account.GetOrder(brokerOrder.brokerOrderId); if (order != null) { if (string.IsNullOrEmpty(order.ocaGroup)) { groupOrdersToCancel.AddRange(account.Orders.Where(o => o.orderStatus != "CANCELLED" && o.orderStatus != "EXECUTED" && o.orderStatus != "REJECTED" && o.brokerParentOrderId == order.brokerOrderId)); } else { groupOrdersToCancel.AddRange(account.Orders.Where(o => o.brokerOrderId != order.brokerOrderId && o.orderStatus != "CANCELLED" && o.orderStatus != "EXECUTED" && o.orderStatus != "REJECTED" && o.ocaGroup == order.ocaGroup && o.orderAction == order.orderAction)); } } return groupOrdersToCancel; } private List> GetOrderActions(bool buy, decimal positionQuantity, decimal orderQuantity) { var orderActions = new List>(); if (positionQuantity <= 0 && !buy) { orderActions.Add(Tuple.Create("SELL_SHORT", orderQuantity)); } else if (positionQuantity > 0 && !buy) { var remainder = positionQuantity - orderQuantity; if (remainder >= 0) orderActions.Add(Tuple.Create("SELL", orderQuantity)); else { orderActions.Add(Tuple.Create("SELL", Math.Abs(positionQuantity))); orderActions.Add(Tuple.Create("SELL_SHORT", Math.Abs(remainder))); } } else if (positionQuantity >= 0 && buy) { orderActions.Add(Tuple.Create("BUY", orderQuantity)); } else if (positionQuantity < 0 && buy) { var remainder = positionQuantity + orderQuantity; if (remainder <= 0) orderActions.Add(Tuple.Create("BUY_TO_COVER", orderQuantity)); else { orderActions.Add(Tuple.Create("BUY_TO_COVER", Math.Abs(positionQuantity))); orderActions.Add(Tuple.Create("BUY", Math.Abs(remainder))); } } return orderActions; } private BrokerOrder GetOrder(string brokerAccountIdentifier, string brokerOrderId) { var account = GetAccount(brokerAccountIdentifier); return account.GetOrder(brokerOrderId); } private BrokerOrder GetExitOrder(BrokerOrder brokerOrder) { var exitOrders = new List(); var account = GetAccount(brokerOrder.brokerAccountIdentifier); var order = account.GetOrder(brokerOrder.brokerOrderId); if (order != null) { exitOrders = account.Orders.Where(o => o.orderStatus == "OPEN" && (o.priceType == "STOP" || o.priceType == "LIMIT" || o.priceType == "STOP_LIMIT") && !o.IsLocal && o.symbol == order.symbol && o.orderAction == order.orderAction && o.orderedQuantity == order.orderedQuantity).ToList(); } return exitOrders.FirstOrDefault(); } private async Task CancelGroupOrders(BrokerOrder order) { var ordersToCancel = new List(); ordersToCancel.AddRange(GetGroupOrders(order)); if (ordersToCancel.Count > 0) await CancelOrders(ordersToCancel); } private DateTime TimeZoneNow { get { return DateTime.UtcNow; //return TimeZoneInfo // .ConvertTimeFromUtc( // DateTime.UtcNow, // _timeZoneInfo); } } private DateTime FromUnixTimeMilliseconds(long? time) { return DateTimeOffset.FromUnixTimeMilliseconds(time ?? 0).UtcDateTime; } private void UpdateLogHeader() { _logHeader.sessionId = _brokerUserSession.UserSessionId.ToString(); _logHeader.broker = _brokerIntegration.broker; _logHeader.userName = _brokerUserSession.UserName; //_logHeader.appid = _brokerUserSession.ap ToString(); //_logHeader.version = _brokerUserSession . application_version; _oAuthSession.UpdateLogHeader(_logHeader); } private void LogInfo(string message, object data = null) { //May Need Lock _logHeader.message = message; _logHeader.data = data; var j = JsonConvert.SerializeObject(_logHeader); j = j.Replace(@"\", ""); _logger.LogInformation($"{JsonConvert.SerializeObject(_logHeader)}"); } private void LogWarn(string message, object data = null) { //May Need Lock _logHeader.message = message; _logHeader.data = data; _logger.LogWarning("{Message}", JsonConvert.SerializeObject(_logHeader)); } private void LogError(string message, Exception ex) { //May Need Lock _logHeader.message = message; _logHeader.data = null; _logger.LogError(ex, "{Message}", JsonConvert.SerializeObject(_logHeader)); } private void Notify(NotificationType notificationType, object data) { try { switch (notificationType) { case NotificationType.ExpiredToken: break; } } catch (Exception) { } } private async Task GetNextOrderIdAsync() { var nextOrderIdResult = await _databaseService.GetBrokerUserNextOrderId(_brokerUserSession.BrokerUserId); return $"L{nextOrderIdResult.Obj}"; } private string NewClientOrderId() { _clientOrderId++; return $"{DateTime.Now:yyyyHHmm}{_clientOrderId}"; } private bool IsUserConnected { get { return !(_portfolioDataSubscriber == null); } } #region Workers private async Task RefreshAccount(BrokerAccount brokerAccount, string symbol) { try { if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME)) { LogWarn("Could not enter Semaphore from GetUserPortfolioLinkedAccounts()"); } var accountResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/account"); var alpacaAccount = accountResult.Data; brokerAccount.account_value = alpacaAccount.portfolio_value ?? 0; brokerAccount.available_funds = alpacaAccount.cash ?? 0; brokerAccount.buying_power = alpacaAccount.buying_power ?? 0; brokerAccount.buying_power_in_use = alpacaAccount.daytrading_buying_power ?? 0; var positionsResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/positions/{symbol.Replace("/","")}"); if (positionsResult.Data != null ) { var position = positionsResult.Data; var positionIndex = brokerAccount.GetPositionIndex(symbol); if (positionIndex < 0) { brokerAccount.AddPosition( new BrokerPosition() { id = 0, broker_id = _brokerId, broker_account_identifier = brokerAccount.account_identifier, broker_position_id = position.asset_id, symbol = position.symbol, shares = position.qty ?? 0, average_open_price = position.avg_entry_price ?? 0, last_trade = position.lastday_price ?? 0, realized_profit_loss = position.unrealized_pl } ); } else { brokerAccount.Positions[positionIndex].average_open_price = position.avg_entry_price; brokerAccount.Positions[positionIndex].shares = position.qty ?? 0; } } _portfolioDataSubscriber?.OnMessage(jcso(new AccountMessage("Account Update", TimeZoneNow, "Update", brokerAccount))); } catch (Exception ex) { LogError(ex.Message, ex); return false; } finally { _semaphoreSlim.Release(true); } return true; } private async Task ExecutionWorker() { try { while (_streaming) { await Task.Delay(EXECUTION_WORKER_INTERVAL); if (!_portfolioLoaded || _brokerAccounts == null || _brokerAccounts.Count == 0) continue; var ordersToMonitor = _brokerAccounts.SelectMany(a => a.Orders.Where(o => o.Monitor())).ToList(); //If the user is not connected and there are no more orders to monitor and !StreamingReconnectPending //Then we can end the sesssion if (!IsUserConnected && ordersToMonitor.Count == 0 && (StatusTypes)_brokerUserSession.StatusTypeId != StatusTypes.StreamingReconnectPending) { _streaming = false; _ = Task.Run(async () => await EndSession(StatusTypes.SessionEnded)); } foreach (var order in ordersToMonitor) { try { var triggered = false; if (order.localExecutionType == "time-cancel" && TimeZoneNow > order.goodTillTime) { await CancelOrders(new List() { order }); } else if (order.localExecutionType == "time-exit" && TimeZoneNow > order.goodAfterTime) { triggered = true; } if (triggered) { var exitOrder = GetExitOrder(order); order.replacing = false; order.convertFromLocal = true; order.priceType = "market"; order.orderStatus = "WORKING"; order.localExecutionType = null; order.stopPrice = null; order.limitPrice = null; order.goodAfterTime = null; if (exitOrder != null) { await ReplaceOrder(order, exitOrder); } else await PlaceOrders(new List() { order }); } } catch (Exception ex) { LogError(ex.Message, ex); } } } } catch (Exception ex) { LogError(ex.Message, ex); } } private bool IsOrderTriggered(BrokerOrder order, decimal last) { var trigger = false; switch (order.orderAction) { case "BUY": case "BUY_TO_COVER": switch (order.priceType) { case "LIMIT": if (last <= order.limitPrice) { trigger = true; } break; case "STOP": if (last >= order.stopPrice) { trigger = true; } break; } break; case "SELL": case "SELL_SHORT": switch (order.priceType) { case "LIMIT": if (last >= order.limitPrice) { trigger = true; } break; case "STOP": if (last <= order.stopPrice) { trigger = true; } break; } break; } return trigger; } #endregion public void Dispose() { } } }