using System;
using System.Net.Http;
using System.Threading.Tasks;
using System.Linq;
using System.Threading;
using TradeIdeasCommon.Services.Persistence;
using System.Collections.Generic;
using System.Collections.Specialized;
using TradeIdeasCommon.Models;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using MarketDataSnapshot;
using System.Diagnostics;
using System.Runtime.InteropServices;
using BrokersServices;
using AlpacaService.Models;
///
/// This is a wrapper around Alpaca's REST and WebSocket API
/// Alpaca offers a C# SDK, however, the package has many 3rd party dependencies
/// that seem excessive. This is a simple enough API to implement and maintain
/// on our own.
///
namespace AlpacaService
{
public class AlpacaUserSession : IUserSession, IDisposable
{
// Receiver of portfolio/order messages; assigned in ConnectStreaming and cleared when streaming/session ends.
private IPortfolioDataSubscriber _portfolioDataSubscriber;
// Numeric broker id (cast from the Brokers value passed to the constructor).
private readonly int _brokerId;
// Supplies the client id/secret handed to the OAuth session.
private readonly BrokerIntegration _brokerIntegration;
private readonly DatabaseService _databaseService;
private readonly MarketDataSnapshotService _snapshotService;
// Notified via OnSessionEnded when the session is torn down.
private readonly IUserSessionCallbacks _userSessionCallbacks;
private readonly ILogger _logger;
private AlpacaOAuthSession _oAuthSession;
// Persisted session record; populated by the Connect* methods.
private BrokerUserSession _brokerUserSession;
private LogHeader _logHeader = new LogHeader();
// In-memory portfolio; NOT initialized here - only assigned by GetUserPortfolioLinkedAccounts.
private List _brokerAccounts;
// True once ConnectAlpacaStreaming has succeeded; reset by EndSession.
private bool _streaming;
// Set after GetUserPortfolioLinkedAccounts completes successfully.
private bool _portfolioLoaded;
private List _phantomOrders = new List();
private Task _taskPortfolioWorker;
private Task _taskExecutionWorker;
private int _clientOrderId;
// Single-entry gate used to serialize portfolio loads, order placement and streaming updates.
private SemaphoreSlimX _semaphoreSlim = new SemaphoreSlimX(1, 1);
// Timeouts/intervals below are presumably in milliseconds - TODO confirm against their consumers.
private readonly int SEMAPHORE_WAIT_TIME = 60000;
private readonly int RESET_EVENT_WAIT_TIME = 20000;
private readonly int PORTFOLIO_WORKER_INTERVAL = 1500;
private readonly int EXECUTION_WORKER_INTERVAL = 1000;
// Grace-period timer created by DisconnectStreaming and cancelled by ClearPendingDisconnectTimer.
private System.Timers.Timer _streamingReconnectPendingTimer;
// Path prefix for every Alpaca REST call made by this class.
private const string ALPACA_API_VERSION = "v2";
// Streaming socket created by ConnectAlpacaStreaming.
private ClientWebSocketWrapper _ws;
/// <summary>
/// Builds a user session for one broker: stores the injected services and
/// creates the OAuth session from the integration's client credentials.
/// </summary>
/// <param name="brokerid">Broker this session belongs to; stored as its integer value.</param>
/// <param name="loggerFactory">Factory used for this class's logger and passed on to the OAuth session.</param>
/// <param name="environment">API environment forwarded to the OAuth session.</param>
/// <param name="brokerIntegration">Source of the client id and client secret.</param>
/// <param name="databaseService">Persistence service for sessions and orders.</param>
/// <param name="snapshotService">Market data snapshot service.</param>
/// <param name="userSessionCallbacks">Callbacks raised on session lifecycle events.</param>
public AlpacaUserSession(Brokers brokerid,
    ILoggerFactory loggerFactory,
    ApiEnvironment environment,
    BrokerIntegration brokerIntegration,
    DatabaseService databaseService,
    MarketDataSnapshotService snapshotService,
    IUserSessionCallbacks userSessionCallbacks)
{
    // Injected collaborators.
    _userSessionCallbacks = userSessionCallbacks;
    _snapshotService = snapshotService;
    _databaseService = databaseService;
    _brokerIntegration = brokerIntegration;

    // Derived state.
    _brokerId = (int)brokerid;
    _logger = loggerFactory.CreateLogger();

    // Notify is handed to the OAuth session as its callback.
    _oAuthSession = new AlpacaOAuthSession(
        loggerFactory,
        environment,
        brokerIntegration.client_id,
        brokerIntegration.client_secret,
        Notify);
}
#region Service Methods
/// <summary>
/// Returns a snapshot copy of the current broker user session, with the live
/// connection flag and the number of orders that still need monitoring.
/// </summary>
/// <returns>A new BrokerUserSession populated from the stored session record.</returns>
public BrokerUserSession GetBrokerUserSession()
{
    var session = _brokerUserSession;

    // _brokerAccounts is only assigned once the portfolio has been loaded, so a
    // status query made before that must report zero instead of throwing.
    var monitoredOrders = _brokerAccounts?
        .SelectMany(a => a.Orders.Where(o => o.Monitor()))
        .Count() ?? 0;

    return new BrokerUserSession()
    {
        TargetInstance = session.TargetInstance,
        Connected = IsUserConnected,
        LastConnectedWhen = session.LastConnectedWhen,
        BrokerId = session.BrokerId,
        BrokerUserId = session.BrokerUserId,
        UserName = session.UserName,
        UserSessionId = session.UserSessionId,
        NextOrderId = session.NextOrderId,
        LocalOrders = monitoredOrders,
        TokenExpirationWhen = session.TokenExpirationWhen,
        CreatedWhen = session.CreatedWhen,
        UpdatedWhen = session.UpdatedWhen,
        BrokerUserSessionId = session.BrokerUserSessionId,
        StatusTypeId = session.StatusTypeId,
        StatusType = session.StatusType,
        BrokerUserSessionCreatedWhen = session.BrokerUserSessionCreatedWhen,
        BrokerUserSessionUpdatedWhen = session.BrokerUserSessionUpdatedWhen
    };
}
/// <summary>
/// Returns a copy of the loaded broker accounts in which the last four
/// characters of each account identifier and account key are replaced by "****".
/// </summary>
/// <returns>A read-only view over the masked copies; the live accounts are not modified.</returns>
public IReadOnlyCollection GetObfuscatedUserPortfolio()
{
    // Values that are null or shorter than four characters are masked entirely
    // instead of throwing from the range slice.
    string MaskTail(string value)
    {
        return string.IsNullOrEmpty(value) || value.Length < 4
            ? "****"
            : value[0..^4] + "****";
    }

    // Serialize/deserialize round trip produces independent copies to mutate.
    var brokerAccounts = JsonConvert.DeserializeObject>(JsonConvert.SerializeObject(_brokerAccounts));
    brokerAccounts.ForEach(ba =>
    {
        ba.account_identifier = MaskTail(ba.account_identifier);
        ba.account_key = MaskTail(ba.account_key);
    });
    return brokerAccounts.AsReadOnly();
}
/// <summary>
/// Entry point for connecting a user. Requests carrying a verifier code or
/// tokens are delegated to the matching flow; otherwise a new broker user
/// session is created and stamped with the OAuth session's current tokens.
/// </summary>
/// <param name="connectRequest">Connection request (user, target instance, optional verifier code / tokens).</param>
/// <returns>The connect result; on any exception a generic "try again" failure result.</returns>
public async Task Connect(BrokerConnectRequest connectRequest)
{
    var outcome = new BrokerConnectResult();
    try
    {
        if (!string.IsNullOrEmpty(connectRequest.verifierCode))
        {
            return await ConnectWithVerifierCode(connectRequest);
        }

        if (!string.IsNullOrEmpty(connectRequest.tokens))
        {
            return await ConnectWithTokens(connectRequest);
        }

        var brokerUser = await _databaseService.GetBrokerUser(_brokerId, connectRequest.userName);
        var createdSession = await _databaseService.AddBrokerUserSession(
            connectRequest.targetInstance,
            connectRequest.userSessionId,
            brokerUser.Obj.BrokerUserId);
        _brokerUserSession = createdSession.Obj;
        UpdateLogHeader();

        // Temporary until we switch to App OAuth Workflow: the renew / re-authenticate
        // branch is disabled, so the session is always treated as renewed.
        _brokerUserSession.AccessToken = _oAuthSession.AccessToken;
        _brokerUserSession.AccessTokenSecret = _oAuthSession.AccessTokenSecret;
        _brokerUserSession.LastConnectedWhen = DateTime.Now;
        outcome.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected.");
        _brokerUserSession.StatusTypeId = (int)StatusTypes.AuthorizationRenewed;

        // Last argument is presumably the token expiration time - TODO confirm.
        await _databaseService.UpdateBrokerUserSession(
            connectRequest.targetInstance,
            _brokerUserSession.BrokerUserId,
            _brokerUserSession.BrokerUserSessionId,
            _brokerUserSession.StatusTypeId,
            _brokerUserSession.AccessToken,
            _brokerUserSession.AccessTokenSecret,
            DateTime.Now.AddMinutes(15));
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        outcome.Set(string.Empty, false, 0, false, "Please Try Again Later.");
    }
    return outcome;
}
/// <summary>
/// Completes a pending authentication using the verifier code from the request.
/// Loads the session awaiting broker authentication, asks the OAuth session to
/// authorize, and persists the resulting status and tokens.
/// </summary>
/// <param name="connectRequest">Request carrying the verifier code and session identifiers.</param>
/// <returns>The connect result; on any exception a generic "try again" failure result.</returns>
public async Task ConnectWithVerifierCode(BrokerConnectRequest connectRequest)
{
    var outcome = new BrokerConnectResult();
    try
    {
        var pendingSession = await _databaseService.GetBrokerUserSession(
            connectRequest.userSessionId,
            _brokerId,
            null,
            (int)StatusTypes.AuthenticationWithBrokerRequired);
        _brokerUserSession = pendingSession.Obj;
        UpdateLogHeader();

        var authorized = _oAuthSession.Authorize(
            connectRequest.verifierCode,
            _brokerUserSession.AccessToken,
            _brokerUserSession.AccessTokenSecret);

        if (authorized)
        {
            _brokerUserSession.LastConnectedWhen = DateTime.Now;
            outcome.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected.");
        }
        else
        {
            outcome.Set(connectRequest.targetInstance, false, _brokerUserSession.BrokerUserSessionId, true, "Please Try Again Later.");
        }

        var finalStatus = authorized ? StatusTypes.Authorized : StatusTypes.Unauthorized;
        _brokerUserSession.StatusTypeId = (int)finalStatus;

        // The status is persisted for both outcomes, together with the OAuth session's tokens.
        await _databaseService.UpdateBrokerUserSession(
            connectRequest.targetInstance,
            _brokerUserSession.BrokerUserId,
            _brokerUserSession.BrokerUserSessionId,
            _brokerUserSession.StatusTypeId,
            _oAuthSession.AccessToken,
            _oAuthSession.AccessTokenSecret,
            DateTime.Now.AddMinutes(15));
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        outcome.Set(string.Empty, false, 0, false, "Please Try Again Later.");
    }
    return outcome;
}
/// <summary>
/// Connects using tokens supplied directly in the request: loads the session
/// awaiting broker authentication, installs the tokens on the OAuth session,
/// and persists the session as authorized.
/// </summary>
/// <param name="connectRequest">Request carrying the tokens and session identifiers.</param>
/// <returns>The connect result; on any exception a generic "try again" failure result.</returns>
public async Task ConnectWithTokens(BrokerConnectRequest connectRequest)
{
    var outcome = new BrokerConnectResult();
    try
    {
        var pendingSession = await _databaseService.GetBrokerUserSession(
            connectRequest.userSessionId,
            _brokerId,
            null,
            (int)StatusTypes.AuthenticationWithBrokerRequired);
        _brokerUserSession = pendingSession.Obj;
        UpdateLogHeader();

        _oAuthSession.SetTokens(connectRequest.tokens);
        _brokerUserSession.StatusTypeId = (int)StatusTypes.Authorized;

        await _databaseService.UpdateBrokerUserSession(
            connectRequest.targetInstance,
            _brokerUserSession.BrokerUserId,
            _brokerUserSession.BrokerUserSessionId,
            _brokerUserSession.StatusTypeId,
            _oAuthSession.AccessToken,
            _oAuthSession.AccessTokenSecret,
            DateTime.Now.AddMinutes(15));

        // Success is only reported once the session update has gone through.
        outcome.Set(connectRequest.targetInstance, true, _brokerUserSession.BrokerUserSessionId, false, "Successfully Connected.");
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        outcome.Set(string.Empty, false, 0, false, "Please Try Again Later.");
    }
    return outcome;
}
/// <summary>
/// Marks the session as disconnected by the user and reports the outcome.
/// </summary>
/// <returns>A result flagged as not connected on success; a generic failure result if the status update throws.</returns>
public async Task Disconnect()
{
    var outcome = new BrokerConnectResult();
    try
    {
        await UpdateBrokerUserSessionStatus(StatusTypes.UserDisconnected);

        var session = _brokerUserSession;
        outcome.Set(session.TargetInstance, false, session.BrokerUserSessionId, false, "Disconnection successful", string.Empty);
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        outcome.Set(string.Empty, false, 0, false, "Please Try Again Later.");
    }
    return outcome;
}
/// <summary>
/// Re-attaches a user to the existing session: persists the reconnected
/// status and refreshes the last-connected timestamp.
/// </summary>
/// <param name="connectRequest">Request whose target instance is echoed in the result.</param>
/// <returns>A connected result on success; a generic failure result if the status update throws.</returns>
public async Task ReConnect(BrokerConnectRequest connectRequest)
{
    var outcome = new BrokerConnectResult();
    try
    {
        await UpdateBrokerUserSessionStatus(StatusTypes.UserReconnected);
        _brokerUserSession.LastConnectedWhen = DateTime.Now;

        outcome.Set(
            connectRequest.targetInstance,
            true,
            _brokerUserSession.BrokerUserSessionId,
            false,
            "Successfully Re-Connected to your User Session");
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        outcome.Set(string.Empty, false, 0, false, "Please Try Again Later.");
    }
    return outcome;
}
/// <summary>
/// Attaches a subscriber for portfolio messages and, if streaming has not been
/// started yet, opens the Alpaca stream and starts the execution worker.
/// Blocks the caller while the asynchronous steps complete.
/// </summary>
/// <param name="portfolioDataSubscriber">Subscriber that will receive connection and order messages.</param>
public void ConnectStreaming(IPortfolioDataSubscriber portfolioDataSubscriber)
{
    ClearPendingDisconnectTimer();
    _portfolioDataSubscriber = portfolioDataSubscriber;

    // On streaming reconnects the status must not change until the new
    // subscriber has been assigned (done just above).
    var reconnectWasPending =
        (StatusTypes)_brokerUserSession.StatusTypeId == StatusTypes.StreamingReconnectPending;
    if (reconnectWasPending)
    {
        Task.Run(async () => await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnected))
            .GetAwaiter()
            .GetResult();
    }

    var greeting = new ConnectionMsg("Successfully Connected to your ETrade User Session.");
    _portfolioDataSubscriber.OnMessage(JsonConvert.SerializeObject(greeting));

    if (_streaming)
    {
        return;
    }

    _streaming = Task.Run(async () => await ConnectAlpacaStreaming())
        .GetAwaiter()
        .GetResult();
    // The worker is started regardless of whether the stream connected.
    _taskExecutionWorker = Task.Run(ExecutionWorker);
}
/// <summary>
/// Opens the WebSocket to the Alpaca stream endpoint, registers the message
/// and disconnect callbacks, and sends the authenticate action with the
/// current OAuth tokens.
/// </summary>
/// <returns>true when the socket connected and the authenticate message was sent; false if anything threw.</returns>
private async Task ConnectAlpacaStreaming()
{
    try
    {
        var streamUrl = $"wss://{_oAuthSession.ApiHost}/stream";
        _ws = ClientWebSocketWrapper.Create(streamUrl);
        _ws.OnMessage(OnWebSocketMessage);
        _ws.OnDisconnect(OnWebSocketDisconnect);
        await _ws.Connect();

        var credentials = new AuthKeys()
        {
            key_id = _oAuthSession.AccessToken,
            secret_key = _oAuthSession.AccessTokenSecret
        };
        var auth = new ActionMessage()
        {
            action = "authenticate",
            data = credentials
        };
        var authJson = JsonConvert.SerializeObject(auth);
        await _ws.SendMessageAsync(authJson);
        return true;
    }
    catch (Exception ex)
    {
        LogError(ex.Message,ex);
        return false;
    }
}
/// <summary>
/// Handles one text frame from the Alpaca stream. Frames are routed on their
/// "stream" field: a successful authorization triggers a listen request for
/// trade updates, and trade updates are processed on a background task.
/// </summary>
/// <param name="message">Raw JSON text of the frame.</param>
/// <param name="cws">Socket wrapper that delivered the frame (unused; replies go through _ws).</param>
private void OnWebSocketMessage(string message, ClientWebSocketWrapper cws)
{
    var json = JObject.Parse(message);

    // A frame without a "stream" field cannot be routed; trace it and stop
    // rather than dereferencing a null token.
    if (!json.TryGetValue("stream", out var messageType) || messageType == null)
    {
        Debug.WriteLine(message);
        return;
    }

    switch (messageType.ToString())
    {
        case "authorization":
            var authMessage = json.ToObject >();
            if( authMessage.data.status == "authorized")
            {
                // Once authorized, subscribe to order/trade events.
                var listen = new ActionMessage()
                {
                    action = "listen",
                    data = new Streams()
                    {
                        streams = new List() { "trade_updates" }
                    }
                };
                Task.Run(async () => await _ws.SendMessageAsync(JsonConvert.SerializeObject(listen)));
            }
            break;
        case "listening":
            break;
        case "trade_updates":
            var streamMessage = json.ToObject>();
            // Fire-and-forget so the socket callback is not held up by processing.
            Task.Run(async () => await ProcessStreamingTradeUpdate(streamMessage.data));
            break;
        default:
            break;
    }
    Debug.WriteLine(message);
}
/// <summary>
/// Applies one streamed trade update to the first loaded account: adds the
/// order if it is unknown, otherwise replaces it unless the stored order is
/// already filled, persists the change, notifies the subscriber, and on
/// fills kicks off an account refresh.
/// </summary>
/// <param name="orderStream">Streamed event with its order payload.</param>
private async Task ProcessStreamingTradeUpdate(OrderStream orderStream)
{
    // Tracks whether the gate was actually acquired so the finally block does
    // not release a slot that is held by someone else after a timed-out wait.
    // NOTE(review): assumes Release(bool) only releases when passed true, as
    // PlaceOrders relies on - confirm against SemaphoreSlimX.
    var enteredSemaphore = false;
    try
    {
        enteredSemaphore = await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME);
        if (!enteredSemaphore)
            LogWarn("Could not enter Semaphore from ProcessStreamingTradeUpdate()");
        Debug.WriteLine($"{TimeZoneNow:HH:mm:ss.fff}:{orderStream.Event}:{orderStream.order.symbol}:{orderStream.order.status}");
        var account = _brokerAccounts[0];
        var order = CreateBrokerOrder(account.account_identifier, orderStream.order);
        var orderIndex = account.GetOrderIndex(orderStream.order.id);
        if (orderIndex == -1)
        {
            account.Orders.Add(order);
            await _databaseService.AddOrder(order);
        }
        else
        {
            //Most likely out of order streaming events: EG, new after filled
            if (account.Orders[orderIndex].Filled())
                return;

            account.Orders[orderIndex] = order;
            await _databaseService.UpdateOrder(order);
        }
        _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, GetOrderAction(order.orderStatus), order)));
        //Filled or Partially Filled Orders result in Account/Position Updates
        if (orderStream.Event == "fill" || orderStream.Event == "partial_fill")
            _ = Task.Run(async () => await RefreshAccount(account, order.symbol));
    }
    catch (Exception ex)
    {
        // Log before rethrowing: the streaming callback starts this method
        // fire-and-forget, so the exception would otherwise go unnoticed.
        LogError(ex.Message, ex);
        throw;
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
// Disconnect callback registered on the streaming socket in ConnectAlpacaStreaming.
// Currently a no-op.
// NOTE(review): nothing here resets _streaming or attempts a reconnect, so a dropped
// socket appears to go unhandled - confirm whether that is dealt with elsewhere.
private void OnWebSocketDisconnect(ClientWebSocketWrapper cwsw)
{
}
/// <summary>
/// Stops and disposes the reconnect-pending timer, if one exists, and clears the field.
/// </summary>
private void ClearPendingDisconnectTimer()
{
    var pendingTimer = _streamingReconnectPendingTimer;
    if (pendingTimer == null)
    {
        return;
    }

    pendingTimer.Enabled = false;
    pendingTimer.Stop();
    pendingTimer.Dispose();
    _streamingReconnectPendingTimer = null;
}
/// <summary>
/// Handles loss of the subscriber's streaming connection. If the user has not
/// explicitly disconnected, the session moves to "reconnect pending" and a
/// 60 second grace timer is started; when it expires the session is marked as
/// timed out and then user-disconnected. If the user already disconnected,
/// only the streaming-disconnected status is recorded.
/// </summary>
/// <returns>Always true; failures are logged.</returns>
public async Task DisconnectStreaming()
{
    try
    {
        // A status other than UserDisconnected means the stream dropped without an
        // explicit disconnect request (network issue, client crash, ...), so the
        // user gets a grace period to reconnect.
        if ((StatusTypes)_brokerUserSession.StatusTypeId != StatusTypes.UserDisconnected)
        {
            await UpdateBrokerUserSessionStatus(StatusTypes.StreamingDisconnected);
            await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnectPending);

            // Dispose any timer left over from an earlier disconnect so it is not
            // leaked and cannot fire alongside the new one.
            ClearPendingDisconnectTimer();

            // One-shot: the timeout transition must run once, not every interval.
            var graceTimer = new System.Timers.Timer(60000) { AutoReset = false };
            graceTimer.Elapsed += async (s, e) =>
            {
                // async void handler: an escaping exception would be unobserved,
                // so it is logged here instead.
                try
                {
                    await UpdateBrokerUserSessionStatus(StatusTypes.StreamingReconnectPendingTimeout);
                    await UpdateBrokerUserSessionStatus(StatusTypes.UserDisconnected);
                }
                catch (Exception ex)
                {
                    LogError(ex.Message, ex);
                }
                finally
                {
                    ClearPendingDisconnectTimer();
                }
            };
            _streamingReconnectPendingTimer = graceTimer;
            graceTimer.Start();
        }
        else
        {
            await UpdateBrokerUserSessionStatus(StatusTypes.StreamingDisconnected);
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    _portfolioDataSubscriber = null;
    return true;
}
/// <summary>
/// Sets the in-memory session status and persists it through the database service.
/// </summary>
/// <param name="statusType">New status for the broker user session.</param>
private async Task UpdateBrokerUserSessionStatus(StatusTypes statusType)
{
    // The in-memory status is updated first, then the stored value is persisted.
    _brokerUserSession.StatusTypeId = (int)statusType;

    await _databaseService.UpdateBrokerUserSessionStatus(
        _brokerUserSession.TargetInstance,
        _brokerUserSession.BrokerUserId,
        _brokerUserSession.BrokerUserSessionId,
        _brokerUserSession.StatusTypeId);
}
/// <summary>
/// Shuts the session down: clears the streaming flag, waits for the worker
/// tasks, persists the final status, drops the subscriber and raises the
/// session-ended callback.
/// </summary>
/// <param name="statusType">Final status to persist for the session.</param>
private async Task EndSession(StatusTypes statusType)
{
    _streaming = false;

    // Await instead of blocking with Wait(), and tolerate workers that were never started.
    if (_taskPortfolioWorker != null)
    {
        await _taskPortfolioWorker;
    }
    if (_taskExecutionWorker != null)
    {
        await _taskExecutionWorker;
    }

    await UpdateBrokerUserSessionStatus(statusType);
    _portfolioDataSubscriber = null;
    _userSessionCallbacks?.OnSessionEnded(_brokerUserSession.UserName);
}
/// <summary>
/// Maps a numeric quote mode to its display label.
/// </summary>
/// <param name="quoteMode">Quote mode code (0-5 are recognized).</param>
/// <returns>The label for the code, or "None" for any other value.</returns>
public string QuoteMode(int quoteMode) => quoteMode switch
{
    0 => "Realtime",
    1 => "Delayed",
    2 => "Closing",
    3 => "AHT Realtime",
    4 => "AHT Before Open",
    5 => "AHT Closing",
    _ => "None",
};
/// <summary>
/// Persists the user's linked-accounts setting and, when the update succeeds,
/// mirrors it onto the in-memory session.
/// </summary>
/// <param name="linkedAccounts">Linked accounts value to store.</param>
/// <returns>The database service result, or a general-error result if an exception occurs.</returns>
public async Task SaveLinkedAccountSettings(string linkedAccounts)
{
    try
    {
        var updateResult = await _databaseService.UpdateBrokerUserLinkedAccounts(
            _brokerUserSession.BrokerUserId,
            linkedAccounts);

        // Only adopt the new value once it has been stored.
        if (updateResult.IsSuccess)
        {
            _brokerUserSession.LinkedAccounts = linkedAccounts;
        }

        return updateResult;
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        return ServiceResult.GeneralError(ex);
    }
}
/// <summary>
/// Placeholder for loading every account of the user. Currently returns a
/// freshly constructed, unpopulated service result.
/// </summary>
/// <remarks>
/// The previous body was a commented-out implementation written against the
/// ETrade "/v1/accounts" endpoints (account list, balances, portfolio
/// positions and cached local orders). It has not been ported to Alpaca;
/// GetUserPortfolioLinkedAccounts is the method that actually loads data.
/// </remarks>
/// <returns>A new service result on which neither data nor status has been set.</returns>
public async Task>> GetUserPortfolioAllAccounts()
{
    return new ServiceResult>();
}
/// <summary>
/// Loads the user's Alpaca account, positions and orders, rebuilds the
/// in-memory account list from them and marks the portfolio as loaded.
/// If no linked accounts are stored yet, the account number is saved as the
/// linked account.
/// </summary>
/// <returns>
/// A service result holding a read-only view of the loaded accounts, or a
/// "Server Error" result if anything throws.
/// </returns>
public async Task>> GetUserPortfolioLinkedAccounts()
{
    var serviceResult = new ServiceResult>();
    // Remember whether the gate was acquired so the finally block does not
    // release a slot this call never took after a timed-out wait.
    // NOTE(review): assumes Release(bool) only releases when passed true, as
    // PlaceOrders relies on - confirm against SemaphoreSlimX.
    var enteredSemaphore = false;
    try
    {
        enteredSemaphore = await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME);
        if (!enteredSemaphore)
        {
            LogWarn("Could not enter Semaphore from GetUserPortfolioLinkedAccounts()");
        }
        var accountResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/account");
        var account = accountResult.Data;
        _brokerAccounts = new List();
        if (string.IsNullOrEmpty(_brokerUserSession.LinkedAccounts))
        {
            _brokerUserSession.LinkedAccounts = account.account_number;
            await _databaseService.UpdateBrokerUserLinkedAccounts(_brokerUserSession.BrokerUserId, _brokerUserSession.LinkedAccounts);
        }
        var brokerAccount = new BrokerAccount()
        {
            account_identifier = account.account_number,
            account_key = account.id,
            account_name = account.account_number,
            account_type = account.sma,
            account_institution_type = account.sma,
            account_status = account.status,
            nickname = account.account_number,
            account_description = account.account_number,
            linked = _brokerUserSession.LinkedAccounts.Contains(account.account_number),
            account_mode = "N/A",
            quote_mode = "Realtime",
            day_trader_status = "N/A",
            account_value = account.portfolio_value ?? 0,
            available_funds = account.cash ?? 0,
            buying_power = account.buying_power ?? 0,
            // NOTE(review): "in use" is populated from the day-trading buying power - confirm intended.
            buying_power_in_use = account.daytrading_buying_power ?? 0
        };
        var positionsResult = await _oAuthSession.SendRequestAsync>(HttpMethod.Get, $"/{ALPACA_API_VERSION}/positions");
        if (positionsResult.Data != null && positionsResult.Data.Count > 0)
        {
            var positions = positionsResult.Data;
            foreach( var position in positions)
            {
                var brokerPosition = new BrokerPosition()
                {
                    id = 0,
                    broker_id = _brokerId,
                    broker_account_identifier = account.account_number,
                    broker_position_id = position.asset_id,
                    symbol = position.symbol,
                    shares = position.qty ?? 0,
                    average_open_price = position.avg_entry_price ?? 0,
                    // NOTE(review): last_trade comes from lastday_price and realized P/L
                    // from unrealized_pl - both mappings look suspicious; confirm.
                    last_trade = position.lastday_price ?? 0,
                    realized_profit_loss = position.unrealized_pl
                };
                Debug.WriteLine($"Position-{position.symbol}:{position.asset_id}");
                brokerAccount.AddPosition(brokerPosition);
            }
        }
        var today = TimeZoneNow.Date;
        var ordersResult = await _oAuthSession.SendRequestAsync>(HttpMethod.Get, $"/{ALPACA_API_VERSION}/orders?status=all");
        if (ordersResult.Data != null && ordersResult.Data.Count > 0)
        {
            // Drop non-open orders created before today. Short-circuit evaluation and
            // the HasValue check keep an order without a creation timestamp from
            // throwing and failing the whole portfolio load.
            // NOTE(review): the raw broker status is compared with "OPEN" here, while
            // PlaceOrder passes the status through ConvertOrderStatus first - confirm
            // that the raw value can ever equal "OPEN".
            ordersResult.Data.RemoveAll(o => o.status != "OPEN"
                                             && o.created_at.HasValue
                                             && o.created_at.Value.Date < today);
            foreach(var order in ordersResult.Data)
            {
                var brokerOrder = CreateBrokerOrder(account.account_number, order);
                Debug.WriteLine($"Order-{order.symbol}:{order.asset_id}");
                brokerAccount.AddOrder(brokerOrder);
            }
        }
        // TODO: steps from the earlier (ETrade-based, previously commented-out) flow that
        // have not been ported: fetching event details for canceled/expired/rejected
        // orders, merging cached orders from the database (local orders and their
        // parent/group/strategy fields), and deriving position entry times from the
        // opening order.
        _brokerAccounts.Add(brokerAccount);
        serviceResult.Set(_brokerAccounts.AsReadOnly(), ServiceResultStatuses.Success);
        _portfolioLoaded = true;
    }
    catch (Exception ex)
    {
        serviceResult.Set(ServiceResultStatuses.ServerError, "Server Error");
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
    return serviceResult;
}
/// <summary>
/// Dispatches an order request. Known macros are routed to their dedicated
/// handlers; any other request is expanded from its first order into one or
/// more orders (based on the current position) and placed.
/// </summary>
/// <param name="brokerOrderRequest">Request with a macro name and the orders to act on.</param>
public async Task PlaceOrdersWorker(BrokerOrderRequest brokerOrderRequest)
{
    try
    {
        switch (brokerOrderRequest.Macro)
        {
            case "DOUBLE_POSITION":
                await DoublePosition(brokerOrderRequest);
                break;
            case "HALF_POSITION":
                await HalfPosition(brokerOrderRequest);
                break;
            case "FLATTEN_POSITION":
                await FlattenPosition(brokerOrderRequest);
                break;
            case "ENTER_POSITION":
                await EnterPosition(brokerOrderRequest);
                break;
            case "REVERSE_POSITION":
                await ReversePosition(brokerOrderRequest);
                break;
            default:
                {
                    // Plain order: only the first order of the request is used as the template.
                    var template = brokerOrderRequest.Orders[0];
                    var position = GetAccount(template.brokerAccountIdentifier).GetPosition(template.symbol);
                    var actions = GetOrderActions(
                        template.orderAction == "BUY",
                        position == null ? 0 : position.shares,
                        template.orderedQuantity ?? 0);

                    // One copy of the template per resulting action.
                    var expanded = actions
                        .Select(a => CopyBrokerOrder(template, a.Item1, a.Item2, string.Empty))
                        .ToList();
                    await PlaceOrders(expanded);
                    break;
                }
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Applies modifications for each order in the request when the macro is
/// "NONE": local orders are modified in place, broker orders are replaced.
/// Requests with any other macro are ignored.
/// </summary>
/// <param name="brokerOrderRequest">Request with the macro name and the modified orders.</param>
public async Task ModifyOrdersWorker(BrokerOrderRequest brokerOrderRequest)
{
    try
    {
        if (brokerOrderRequest.Macro != "NONE")
        {
            return;
        }

        foreach (var order in brokerOrderRequest.Orders)
        {
            var existingOrder = GetOrder(order.brokerAccountIdentifier, order.brokerOrderId);

            // An unknown order is skipped (as CancelOrders does) so one stale id
            // does not abort the remaining modifications.
            if (existingOrder == null)
            {
                LogWarn($"ModifyOrdersWorker(): order {order.brokerOrderId} not found; skipping.");
                continue;
            }

            if (existingOrder.IsLocal)
            {
                await ModifyLocalOrder(order);
            }
            else
            {
                ConfigureBrokerOrder(order);
                // Flag the current order as being replaced before issuing the replacement.
                existingOrder.replacing = true;
                await ReplaceOrder(order, existingOrder);
            }
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Cancels every order in the request together with the orders of each
/// requested order's group.
/// </summary>
/// <param name="brokerOrderRequest">Request listing the orders to cancel.</param>
public async Task CancelOrdersWorker(BrokerOrderRequest brokerOrderRequest)
{
    try
    {
        // Requested orders first, followed by each order's group members in request order.
        var requested = brokerOrderRequest.Orders;
        var ordersToCancel = requested
            .Concat(requested.SelectMany(o => GetGroupOrders(o)))
            .ToList();

        await CancelOrders(ordersToCancel);
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Places the given orders one at a time, taking the session gate around each
/// placement. An order is still placed if the gate cannot be acquired within
/// the wait time.
/// </summary>
/// <param name="orders">Orders to place, in order.</param>
private async Task PlaceOrders(List orders)
{
    try
    {
        foreach (var order in orders)
        {
            // Tracked per order: a flag carried over from an earlier iteration would
            // release the gate after a timed-out wait that never acquired it.
            var enteredSemaphore = false;
            try
            {
                enteredSemaphore = await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME);
                if (!enteredSemaphore)
                {
                    LogWarn("Could not enter Semaphore from PlaceOrders()");
                }
                await PlaceOrder(order, false);
            }
            finally
            {
                // Released in finally so an unexpected exception cannot leave the gate held.
                _semaphoreSlim.Release(enteredSemaphore);
            }
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Places a single order. Orders with a good-after time are kept as local
/// "time-exit" orders and never sent to the broker; all others are submitted
/// to Alpaca, then persisted and announced to the subscriber with the
/// resulting status.
/// </summary>
/// <param name="order">Order to place; its identifiers, status and timestamps are updated in place.</param>
/// <param name="forceLocal">Not used by this method.</param>
/// <returns>Always true; failures are logged and, for broker rejections, recorded on the order.</returns>
private async Task PlaceOrder(BrokerOrder order, bool forceLocal)
{
    try
    {
        var account = GetAccount(order.brokerAccountIdentifier);
        if( !order.convertFromLocal)
            order.localOrderId = await GetNextOrderIdAsync();
        order.brokerId = _brokerId;
        order.tiUser = _brokerUserSession.UserName;
        order.isEntry = order.orderAction == "BUY" || order.orderAction == "SELL_SHORT";
        order.createdWhen = TimeZoneNow;
        order.updatedWhen = TimeZoneNow;

        //Local Orders
        if ( order.goodAfterTime.HasValue)
        {
            order.brokerOrderId = order.localOrderId;
            order.isEntry = false;
            order.localExecutionType = "time-exit";
            if (string.IsNullOrEmpty(order.orderStatus))
                order.orderStatus = "OPEN";
            await AddOrder(account, order);
            _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, GetOrderAction(order.orderStatus), order)));
            return true;
        }

        //Alpaca Orders
        if (order.goodTillTime.HasValue)
            order.localExecutionType = "time-cancel";
        if (!order.convertFromLocal)
            await AddOrder(account, order);
        var alpacaOrder = CreateAlpacaOrder(order);
        var alpacaOrderResponse = await _oAuthSession.SendRequestAsync(HttpMethod.Post, $"/{ALPACA_API_VERSION}/orders",alpacaOrder);
        if (alpacaOrderResponse.IsSuccess)
        {
            order.brokerOrderId = alpacaOrderResponse.Data.id;
            order.orderStatus = ConvertOrderStatus(alpacaOrderResponse.Data.status);
            // updated_at is nullable; keep the local timestamp when it is missing
            // so an accepted order is still persisted and announced below.
            order.updatedWhen = alpacaOrderResponse.Data.updated_at ?? order.updatedWhen;
        }
        else
        {
            // A rejected order gets a fresh local id, which also serves as its broker order id.
            order.localOrderId = await GetNextOrderIdAsync();
            order.brokerOrderId = order.localOrderId;
            order.orderStatus = "REJECTED";
            order.errorMessage = alpacaOrderResponse.Message;
        }
        await _databaseService.UpdateOrder(order);
        Debug.WriteLine($"{"N/A"}:{order.symbol}:{order.orderStatus}");
        _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(alpacaOrderResponse.Message, TimeZoneNow, GetOrderAction(order.orderStatus), order)));
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    return true;
}
/// <summary>
/// Cancels each given order. Local orders are marked cancelled, persisted and
/// have their reset event signalled; broker orders are cancelled through the
/// Alpaca API, with failures recorded and reported to the subscriber.
/// Orders that are not found on their account are skipped.
/// </summary>
/// <param name="orders">Orders to cancel.</param>
private async Task CancelOrders(List orders)
{
    try
    {
        foreach (var requested in orders)
        {
            var account = GetAccount(requested.brokerAccountIdentifier);
            var order = account.GetOrder(requested.brokerOrderId);
            if (order == null)
            {
                continue;
            }

            if (!order.IsLocal)
            {
                // Broker order: only a failed cancel is recorded and reported here.
                var brokerCancel = await _oAuthSession.SendRequestAsync(HttpMethod.Delete, $"/{ALPACA_API_VERSION}/orders/{order.brokerOrderId}");
                if (!brokerCancel.IsSuccess)
                {
                    await _databaseService.ErrorOrder(brokerCancel.Message, order);
                    _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(brokerCancel.Message, TimeZoneNow, "ERROR", order)));
                }
                continue;
            }

            // Local order: cancel in place and persist.
            order.comments = "Cancel Requested";
            order.orderStatus = "CANCELLED";
            var saved = await _databaseService.UpdateOrder(order);
            if (saved.IsSuccess)
            {
                _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Canceled", TimeZoneNow, order.orderStatus, order)));
            }
            order.manualResetEvent.Set();
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Applies a modification to a locally held order: copies the replacing flag,
/// and for local orders updates the quantity plus the fields relevant to the
/// order's local execution type, then persists and announces the change.
/// Does nothing if the target order is not found.
/// </summary>
/// <param name="order">Order carrying the new values; the target is identified by its replacing id, or its own broker order id when that is empty.</param>
private async Task ModifyLocalOrder(BrokerOrder order)
{
    try
    {
        var account = GetAccount(order.brokerAccountIdentifier);
        var brokerOrderIdToReplace = string.IsNullOrEmpty(order.replacingBrokerOrderId) ? order.brokerOrderId : order.replacingBrokerOrderId;
        var oi = account.GetOrderIndex(brokerOrderIdToReplace);
        if (oi < 0) return;

        // Resolve the target once: re-indexing the list after the awaited database
        // call could pick up a different order if the list changed meanwhile.
        var target = account.Orders[oi];
        target.replacing = order.replacing;

        // Only local orders are modified here.
        if (!target.IsLocal)
        {
            return;
        }

        target.orderedQuantity = order.orderedQuantity;
        switch (target.localExecutionType)
        {
            case "time-exit":
                target.goodAfterTime = order.goodAfterTime;
                break;
            case "time-cancel":
                target.goodTillTime = order.goodTillTime;
                break;
            case "price-exit":
                // Prices are only overwritten when the request supplies them.
                if (order.stopPrice.HasValue)
                    target.stopPrice = order.stopPrice;
                if (order.limitPrice.HasValue)
                    target.limitPrice = order.limitPrice;
                break;
        }

        await _databaseService.UpdateOrder(target);
        _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", target)));
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Doubles a position: places the order that adds to the position, waits for that order's reset
/// event, then doubles the quantity of every pending exit order (local orders are modified in
/// place, broker-side orders are replaced).
/// </summary>
/// <param name="brokerOrderRequest">Request whose first order describes the addition.</param>
private async Task DoublePosition(BrokerOrderRequest brokerOrderRequest)
{
    // Tracks whether this call currently holds _semaphoreSlim. It must be cleared after every
    // release, otherwise the finally block would release a semaphore this call no longer owns.
    var enteredSemaphore = false;
    try
    {
        var order = brokerOrderRequest.Orders[0];
        var account = GetAccount(order.brokerAccountIdentifier);
        var position = account.GetPosition(order.symbol);
        // Exit orders are on the opposite side of the order that adds to the position.
        var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction == "BUY" ? "SELL" : "BUY");
        var localExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList();
        var brokerExitOrders = pendingExitOrders.Where(o => !o.IsLocal).ToList();
        if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
        {
            order.orderStatus = "ERROR";
            _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order)));
            return;
        }
        enteredSemaphore = true;
        var orderAction = GetOrderActions(order.orderAction == "BUY",
            position == null ? 0 : position.shares,
            order.orderedQuantity ?? 0).FirstOrDefault();
        order = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty);
        order.waitStatus = "EXECUTED";
        order.manualResetEvent.Reset();
        await PlaceOrder(order, false);
        // Release while waiting so that other work needing the semaphore can proceed.
        _semaphoreSlim.Release(enteredSemaphore);
        enteredSemaphore = false;
        // NOTE(review): this is a blocking wait inside an async method; it holds the calling
        // thread for up to RESET_EVENT_WAIT_TIME ms.
        order.manualResetEvent.WaitOne(RESET_EVENT_WAIT_TIME);
        if (await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
            enteredSemaphore = true;
        else
            LogWarn($"Could not enter Semaphore from DoublePosition()");
        foreach (var pendingExitOrder in localExitOrders)
        {
            pendingExitOrder.orderedQuantity = pendingExitOrder.orderedQuantity * 2;
            await ModifyLocalOrder(pendingExitOrder);
        }
        foreach (var brokerExitOrder in brokerExitOrders)
        {
            var newOrder = CopyBrokerOrder(brokerExitOrder, brokerExitOrder.orderAction, brokerExitOrder.orderedQuantity * 2, "NEW");
            newOrder.replacing = true;
            brokerExitOrder.replacing = true;
            await ReplaceOrder(newOrder, brokerExitOrder);
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
/// <summary>
/// Reduces a position. The pending broker-side exit orders are first shrunk (or cancelled) so
/// that together they cover only the shares that will remain, the method waits for the affected
/// orders' reset events, and only then is the order that reduces the position placed.
/// </summary>
/// <remarks>
/// The exit orders are adjusted first so that the reducing order is not rejected as
/// over bought/sold.
/// </remarks>
/// <param name="brokerOrderRequest">Request whose first order describes the reduction.</param>
private async Task HalfPosition(BrokerOrderRequest brokerOrderRequest)
{
    // Tracks whether this call currently holds _semaphoreSlim. It must be cleared after every
    // release, otherwise the finally block would release a semaphore this call no longer owns.
    var enteredSemaphore = false;
    try
    {
        var order = brokerOrderRequest.Orders[0];
        if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
        {
            order.orderStatus = "ERROR";
            _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order)));
            return;
        }
        enteredSemaphore = true;
        var account = GetAccount(order.brokerAccountIdentifier);
        var position = account.GetPosition(order.symbol);
        var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction);
        // Smallest broker-side exit orders are processed first.
        var eTradeExitOrders = pendingExitOrders.Where(o => !o.IsLocal).OrderBy(o => o.orderedQuantity).ToList();
        var allLocalExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList();
        if (eTradeExitOrders.Count > 0)
        {
            var waitHandles = new List<WaitHandle>();
            var existingPositionShares = position?.shares;
            var newPositionShares = (int)(Math.Abs(existingPositionShares.Value) - order.orderedQuantity);
            var newExitOrderQuantity = 0;
            int orderCounter = 0;
            foreach (var eTradeExitOrder in eTradeExitOrders)
            {
                orderCounter++;
                var cancelOrder = false;
                int newOrderedQuantity = 0;
                if (newExitOrderQuantity == newPositionShares)
                {
                    // The remaining position is already fully covered; this exit order is surplus.
                    cancelOrder = true;
                }
                else
                {
                    //If we are processing the last order make sure we make up any missing shares
                    if (orderCounter == eTradeExitOrders.Count)
                        newOrderedQuantity = newPositionShares - newExitOrderQuantity;
                    else
                        newOrderedQuantity = Math.Max(1, GetShares((int)eTradeExitOrder.orderedQuantity.Value, -50));
                }
                eTradeExitOrder.manualResetEvent.Reset();
                //These are the local exit orders in the same oca group as the broker-side exit order
                var groupExitOrders = allLocalExitOrders.Where(o => o.brokerParentOrderId == eTradeExitOrder.brokerParentOrderId
                    || o.ocaGroup == eTradeExitOrder.ocaGroup).ToList();
                if (cancelOrder)
                {
                    //Add the broker-side exit order to the list of orders to cancel
                    groupExitOrders.Add(eTradeExitOrder);
                    await CancelOrders(groupExitOrders);
                    waitHandles.Add(eTradeExitOrder.manualResetEvent);
                }
                else
                {
                    newExitOrderQuantity += newOrderedQuantity;
                    //Skip adjusting orders with no change in quantity
                    if (newOrderedQuantity == eTradeExitOrder.orderedQuantity) continue;
                    var newOrder = CopyBrokerOrder(eTradeExitOrder, eTradeExitOrder.orderAction, newOrderedQuantity, "NEW");
                    newOrder.replacing = true;
                    eTradeExitOrder.replacing = true;
                    newOrder.manualResetEvent.Reset();
                    await ReplaceOrder(newOrder, eTradeExitOrder);
                    waitHandles.Add(newOrder.manualResetEvent);
                    waitHandles.Add(eTradeExitOrder.manualResetEvent);
                    foreach (var pendingExitOrder in groupExitOrders)
                    {
                        pendingExitOrder.orderedQuantity = newOrderedQuantity;
                        await ModifyLocalOrder(pendingExitOrder);
                    }
                }
            }
            _semaphoreSlim.Release(enteredSemaphore);
            enteredSemaphore = false;
            // WaitHandle.WaitAll throws on an empty array, which would skip the reducing order
            // below; there is nothing to wait for when no exit order was changed.
            if (waitHandles.Count > 0)
                WaitHandle.WaitAll(waitHandles.ToArray(), RESET_EVENT_WAIT_TIME);
        }
        else
        {
            //If there is a possibility that we can have exit orders that are not part of an OCA group then we'll need to add logic to cancel them independently
            //foreach (var pendingExitOrder in localExitOrders)
            //{
            //    pendingExitOrder.orderedQuantity = order.orderedQuantity;
            //    await ModifyLocalOrder(pendingExitOrder);
            //}
            _semaphoreSlim.Release(enteredSemaphore);
            enteredSemaphore = false;
        }
        if (await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
            enteredSemaphore = true;
        else
            LogWarn($"Could not enter Semaphore from HalfPosition()");
        var orderAction = GetOrderActions(order.orderAction == "BUY",
            position == null ? 0 : position.shares,
            order.orderedQuantity ?? 0).FirstOrDefault();
        order = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty);
        await PlaceOrder(order, false);
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
/// <summary>
/// Flattens a position: cancels every pending exit order for the symbol, waits (up to 15 seconds)
/// for the cancelled orders' reset events, then places the order that closes the position.
/// </summary>
/// <param name="brokerOrderRequest">Request whose first order describes the close.</param>
private async Task FlattenPosition(BrokerOrderRequest brokerOrderRequest)
{
    // Tracks whether this call currently holds _semaphoreSlim. It must be cleared after every
    // release, otherwise the finally block would release a semaphore this call no longer owns.
    var enteredSemaphore = false;
    try
    {
        var order = brokerOrderRequest.Orders[0];
        #region Option 1
        //Option 1 - Cancel the broker-side exit order first, and other local orders, then send a market order to close
        var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction);
        if (pendingExitOrders != null && pendingExitOrders.Count > 0)
        {
            if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
            {
                order.orderStatus = "ERROR";
                _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order)));
                return;
            }
            enteredSemaphore = true;
            //Allows suspending further order processing until an order status change is detected by the PortfolioWorker()
            pendingExitOrders.ForEach(peo => { peo.manualResetEvent.Reset(); peo.replacing = true; });
            await CancelOrders(pendingExitOrders);
            //We cannot send the close order request for a flatten until the pending exit orders are canceled,
            //otherwise the broker will return an overbought or oversold exception.
            //The events were Reset above; per the original note, they are Set() when the order
            //status change is detected in the PortfolioWorker.
            _semaphoreSlim.Release(enteredSemaphore);
            enteredSemaphore = false;
            WaitHandle.WaitAll(pendingExitOrders.Select(o => o.manualResetEvent).ToArray(), 15000);
        }
        if (await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
            enteredSemaphore = true;
        else
            LogWarn($"Could not enter Semaphore from FlattenPosition()");
        var account = GetAccount(order.brokerAccountIdentifier);
        var position = account.GetPosition(order.symbol);
        var orderAction = GetOrderActions(order.orderAction == "BUY",
            position == null ? 0 : position.shares,
            order.orderedQuantity ?? 0).FirstOrDefault();
        var newOrder = CopyBrokerOrder(order, orderAction.Item1, orderAction.Item2, string.Empty);
        await PlaceOrder(newOrder, false);
        #endregion
        #region Option 2
        //Option 2 - Cancel Local Exit Orders First,
        //Replace ETrade Exit Order with a Market Order to close
        //Or Send Market Order if no ETrade Exit Order Exists
        //Option 2, also works for button_limit_out by using a Limit Order
        //if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
        //{
        // order.orderStatus = "ERROR";
        // _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order)));
        // return;
        //}
        //else
        // enteredSemaphore = true;
        //var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction);
        //var localExitOrders = pendingExitOrders.Where(o => o.IsLocal).ToList();
        //var eTradeExitOrders = pendingExitOrders.Where(o => !o.IsLocal).ToList();
        //if (localExitOrders != null && localExitOrders.Count > 0)
        // await CancelOrders(localExitOrders);
        //order.replacing = true;
        //if (eTradeExitOrders.Count > 0)
        //{
        // var waitHandles = new List();
        // foreach (var eTradeExitOrder in eTradeExitOrders)
        // {
        // eTradeExitOrder.replacing = true;
        // ConfigureBrokerOrder(order);
        // await ReplaceOrder(order, eTradeExitOrder);
        // waitHandles.Add(order.manualResetEvent);
        // waitHandles.Add(eTradeExitOrder.manualResetEvent);
        // }
        // _semaphoreSlim.Release(enteredSemaphore);
        // WaitHandle.WaitAll(waitHandles.ToArray(), RESET_EVENT_WAIT_TIME);
        //}
        //else
        //await PlaceOrder(order, false);
        #endregion
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
/// <summary>
/// Reverses a position: cancels every pending exit order for the symbol, waits (up to 15 seconds)
/// for the cancelled orders' reset events, then submits the order(s) produced by
/// <c>GetOrderActions</c> through <c>PlaceOrders</c>.
/// </summary>
/// <remarks>
/// Unlike the sibling position methods, the semaphore is not re-acquired before the orders are
/// placed. NOTE(review): presumably <c>PlaceOrders</c> does its own locking — confirm.
/// </remarks>
/// <param name="brokerOrderRequest">Request whose first order describes the reversal.</param>
private async Task ReversePosition(BrokerOrderRequest brokerOrderRequest)
{
    // Tracks whether this call currently holds _semaphoreSlim. It must be cleared after the
    // release below, otherwise a later failure would release the semaphore a second time.
    var enteredSemaphore = false;
    try
    {
        var order = brokerOrderRequest.Orders[0];
        //Cancel the broker-side exit order first, and other local orders, then send orders to reverse the position
        var pendingExitOrders = GetPendingExitOrders(order.brokerAccountIdentifier, order.symbol, order.orderAction);
        if (pendingExitOrders != null && pendingExitOrders.Count > 0)
        {
            if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
            {
                order.orderStatus = "ERROR";
                _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, order.orderStatus, order)));
                return;
            }
            enteredSemaphore = true;
            //Allows suspending further order processing until an order status change is detected by the PortfolioWorker()
            pendingExitOrders.ForEach(peo => { peo.manualResetEvent.Reset(); peo.replacing = true; });
            await CancelOrders(pendingExitOrders);
            //We cannot send the orders for a reverse until the pending exit orders are canceled,
            //otherwise the broker will return an overbought or oversold exception.
            //The events were Reset above; per the original note, they are Set() when the order
            //status change is detected in the PortfolioWorker.
            _semaphoreSlim.Release(enteredSemaphore);
            enteredSemaphore = false;
            WaitHandle.WaitAll(pendingExitOrders.Select(o => o.manualResetEvent).ToArray(), 15000);
        }
        var account = GetAccount(order.brokerAccountIdentifier);
        var position = account.GetPosition(order.symbol);
        var orderActions = GetOrderActions(order.orderAction == "BUY",
            position == null ? 0 : position.shares,
            order.orderedQuantity ?? 0);
        // One order per action: closing the existing position and, if needed, opening the opposite one.
        var orders = orderActions
            .Select(o => CopyBrokerOrder(order, o.Item1, o.Item2, string.Empty))
            .ToList();
        await PlaceOrders(orders);
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
/// <summary>
/// Opens a position: places the first order of the request as the entry order and, when that
/// succeeds, places every remaining order of the request as an exit order tied to the entry
/// (the entry's broker order id becomes both the parent id and the OCA group).
/// </summary>
/// <param name="brokerOrderRequest">Request holding the entry order followed by its exit orders.</param>
private async Task EnterPosition(BrokerOrderRequest brokerOrderRequest)
{
    var enteredSemaphore = false;
    try
    {
        var requestedEntry = brokerOrderRequest.Orders[0];
        requestedEntry.manualResetEvent.Reset();
        enteredSemaphore = await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME);
        if (!enteredSemaphore)
        {
            requestedEntry.orderStatus = "ERROR";
            _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, requestedEntry.orderStatus, requestedEntry)));
            return;
        }

        // Entry order: processing ends here if it is rejected.
        var account = GetAccount(requestedEntry.brokerAccountIdentifier);
        var position = account.GetPosition(requestedEntry.symbol);
        var entryAction = GetOrderActions(requestedEntry.orderAction == "BUY",
            position == null ? 0 : position.shares,
            requestedEntry.orderedQuantity ?? 0).FirstOrDefault();
        var entryOrder = CopyBrokerOrder(requestedEntry, entryAction.Item1, entryAction.Item2, string.Empty);
        var entryPlaced = await PlaceOrder(entryOrder, false);
        if (!entryPlaced)
        {
            return;
        }

        // Exit orders: everything after the first order in the request.
        foreach (var exitOrder in brokerOrderRequest.Orders.Skip(1))
        {
            // NOTE(review): the action computed here is never applied to the exit order — confirm
            // whether it was meant to set exitOrder.orderAction / orderedQuantity.
            entryAction = GetOrderActions(exitOrder.orderAction == "BUY",
                position == null ? 0 : position.shares,
                exitOrder.orderedQuantity ?? 0).FirstOrDefault();
            exitOrder.brokerParentOrderId = entryOrder.brokerOrderId;
            exitOrder.ocaGroup = entryOrder.brokerOrderId;
            exitOrder.orderStatus = "NEW";
            await PlaceOrder(exitOrder, true);
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
    finally
    {
        _semaphoreSlim.Release(enteredSemaphore);
    }
}
/// <summary>
/// Intended to replace <paramref name="orderToReplace"/> with <paramref name="newOrder"/>.
/// Currently a no-op: the whole body is commented out, so no request is sent and neither order
/// is modified.
/// </summary>
/// <remarks>
/// The commented-out code targets "/v1/accounts/{account_key}/orders/{id}/change/preview" and
/// ".../change/place" with preview/place request wrappers.
/// NOTE(review): this looks like an implementation carried over from another broker integration
/// and kept as a reference — confirm before porting it to the Alpaca API.
/// </remarks>
private async Task ReplaceOrder(BrokerOrder newOrder, BrokerOrder orderToReplace)
{
//try
//{
// var account = GetAccount(orderToReplace.brokerAccountIdentifier);
// var previewOrderRequest = MakePreviewOrderRequest(newOrder);
// var previewOrderResponse = await _oAuthSession
// .SendRequestAsync(HttpMethod.Put,
// $"/v1/accounts/{account.account_key}/orders/{orderToReplace.brokerOrderId}/change/preview",
// new PreviewOrderRequestWrapper(previewOrderRequest));
// if (!previewOrderResponse.IsSuccess)
// {
// newOrder.createdWhen = TimeZoneNow;
// newOrder.updatedWhen = TimeZoneNow;
// newOrder.orderStatus = "REJECTED";
// newOrder.errorMessage = previewOrderResponse.Message;
// _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage(previewOrderResponse.Message, TimeZoneNow, newOrder.orderStatus, newOrder)));
// return;
// }
// var newOrderRequest = new PlaceOrderRequest()
// {
// clientOrderId = previewOrderRequest.clientOrderId,
// orderType = previewOrderRequest.orderType,
// Order = previewOrderRequest.Order,
// PreviewIds = previewOrderResponse.Data.PreviewOrderResponse.PreviewIds
// };
// try
// {
// var orderResponse = await _oAuthSession
// .SendRequestAsync(HttpMethod.Put,
// $"/v1/accounts/{account.account_key}/orders/{orderToReplace.brokerOrderId}/change/place",
// new PlaceOrderRequestWrapper(newOrderRequest));
// if (orderResponse.IsSuccess)
// {
// var eOrder = orderResponse.Data.PlaceOrderResponse.GetOrder();
// newOrder.brokerOrderId = eOrder.orderId;
// newOrder.brokerParentOrderId = orderToReplace.brokerParentOrderId;
// //This is for local order executions converted to a Market Order
// //Revisit
// if (newOrder.convertFromLocal)
// {
// var oi = account.GetOrderIndex(newOrder.brokerOrderId);
// account.Orders[oi] = newOrder;
// await _databaseService.UpdateOrder(account.Orders[oi]);
// }
// else
// {
// await AddOrder(account, newOrder);
// }
// }
// }
// catch (Exception ex)
// {
// LogError(ex.Message, ex);
// }
//}
//catch (Exception ex)
//{
// LogError(ex.Message, ex);
//}
}
//private async Task ActivateChildOrders(BrokerAccount account, BrokerOrder order)
//{
// try
// {
// var childOrders = account.Orders.Where(o => o.brokerParentOrderId == order.brokerOrderId).ToList();
// if (childOrders.Count == 0) return;
// await RefreshAccount(account);
// var eTradeOrderActivated = false;
// //Trailing Stop Order has highest priority for converting from local to ETrade Order
// var trailingStopOrder = childOrders.FirstOrDefault(o => o.priceType == "TRAILING_STOP_CNST");
// if (trailingStopOrder != null)
// {
// trailingStopOrder.orderStatus = "NEW";
// trailingStopOrder.replacing = true;
// trailingStopOrder.convertFromLocal = true;
// trailingStopOrder.localExecutionType = null;
// await PlaceOrder(trailingStopOrder, false);
// eTradeOrderActivated = true;
// }
// //Stop Order has 2nd priority for converting from local to ETrade Order
// var stopOrder = childOrders.FirstOrDefault(o => o.priceType == "STOP");
// if (stopOrder != null)
// {
// if (eTradeOrderActivated)
// {
// stopOrder.orderStatus = "OPEN";
// var updateOrderResult = await _databaseService.UpdateOrder(stopOrder);
// _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", stopOrder)));
// }
// else
// {
// stopOrder.orderStatus = "NEW";
// stopOrder.replacing = true;
// stopOrder.convertFromLocal = true;
// stopOrder.localExecutionType = null;
// await PlaceOrder(stopOrder, false);
// eTradeOrderActivated = true;
// }
// }
// //Limit Order has 3rd priority for converting from local to ETrade Order
// var limitOrder = childOrders.FirstOrDefault(o => o.priceType == "LIMIT");
// if (limitOrder != null)
// {
// if (eTradeOrderActivated)
// {
// limitOrder.orderStatus = "OPEN";
// var updateOrderResult = await _databaseService.UpdateOrder(limitOrder);
// _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", limitOrder)));
// }
// else
// {
// limitOrder.orderStatus = "NEW";
// limitOrder.replacing = true;
// limitOrder.convertFromLocal = true;
// limitOrder.localExecutionType = null;
// await PlaceOrder(limitOrder, false);
// eTradeOrderActivated = true;
// }
// }
// //Time Stop Not Supported by ETrade
// var marketOrder = childOrders.FirstOrDefault(o => o.priceType == "MARKET" && o.localExecutionType == "time-exit");
// if (marketOrder != null)
// {
// marketOrder.orderStatus = "OPEN";
// var updateOrderResult = await _databaseService.UpdateOrder(marketOrder);
// _portfolioDataSubscriber?.OnMessage(jcso(new OrderMessage("Order Update", TimeZoneNow, "ORDER_MODIFIED", marketOrder)));
// }
// }
// catch (Exception ex)
// {
// LogError(ex.Message, ex);
// }
//}
#endregion
/// <summary>
/// Builds a <c>BrokerOrder</c> for the given account from an order returned by Alpaca.
/// </summary>
/// <param name="accountId">Stored as the order's broker account identifier.</param>
/// <param name="order">Alpaca order to convert; its created/updated timestamps must be set.</param>
/// <returns>A new broker order populated from <paramref name="order"/>.</returns>
private BrokerOrder CreateBrokerOrder(string accountId, AlpacaOrder order)
{
    var brokerOrder = new BrokerOrder();

    // Ownership and identifiers.
    brokerOrder.brokerId = _brokerId;
    brokerOrder.tiUser = _brokerUserSession.UserName;
    brokerOrder.brokerAccountIdentifier = accountId;
    brokerOrder.positionId = order.asset_id;
    brokerOrder.brokerOrderId = order.id;
    // brokerOrderNumber = order.id,

    // Instrument and side.
    brokerOrder.orderType = order.order_class;
    brokerOrder.securityType = order.asset_class;
    brokerOrder.symbol = order.symbol;
    brokerOrder.orderAction = order.side.ToUpper();
    brokerOrder.isEntry = true; // always flagged as an entry here

    // Quantities, fills and status.
    brokerOrder.orderedQuantity = order.qty;
    brokerOrder.filledQuantity = order.filled_qty;
    brokerOrder.averageExecutionPrice = order.filled_avg_price;
    brokerOrder.estimatedCommission = 0;
    brokerOrder.orderStatus = ConvertOrderStatus(order.status);

    // Pricing.
    brokerOrder.priceType = order.type;
    brokerOrder.stopPrice = order.stop_price;
    brokerOrder.limitPrice = order.limit_price;
    brokerOrder.trailPrice = order.trail_price;
    brokerOrder.offsetType = "percent";
    brokerOrder.offsetValue = order.trail_percent;

    // Duration and timestamps.
    brokerOrder.term = GetBrokerTimeInForce(order.time_in_force);
    brokerOrder.createdWhen = order.created_at.Value;
    brokerOrder.updatedWhen = order.updated_at.Value;
    brokerOrder.executedWhen = order.filled_at;

    return brokerOrder;
}
/// <summary>
/// Builds the Alpaca order payload for a broker order.
/// </summary>
/// <param name="order">Broker order to convert; its <c>orderAction</c> must not be null.</param>
/// <returns>A new <c>AlpacaOrder</c> populated from <paramref name="order"/>.</returns>
private AlpacaOrder CreateAlpacaOrder(BrokerOrder order)
{
    var request = new AlpacaOrder();
    request.client_order_id = order.localOrderId;
    request.order_class = order.orderType;
    request.type = order.priceType;
    request.symbol = order.symbol;
    // NOTE(review): the order action is not passed on, so the SELL_SHORT rule in
    // GetAlpacaTimeInForce is never applied from here — confirm this is intended.
    request.time_in_force = GetAlpacaTimeInForce(order.term);
    request.limit_price = order.limitPrice;
    request.stop_price = order.stopPrice;
    // The side is the broker order action lower-cased as-is.
    request.side = order.orderAction.ToLower();
    request.qty = order.orderedQuantity;
    request.extended_hours = order.outsideRegularTradingHours;
    return request;
}
/// <summary>
/// Maps a broker order term to the Alpaca <c>time_in_force</c> value.
/// </summary>
/// <param name="timeInForce">
/// Broker term such as "GTC" or "GDA"; matched case-insensitively with spaces treated as
/// underscores. Null or unrecognised values map to "day".
/// </param>
/// <param name="orderAction">When "SELL_SHORT", the result is always "day".</param>
/// <returns>"gtc" for a GTC term, otherwise "day".</returns>
private string GetAlpacaTimeInForce(string timeInForce, string orderAction = "")
{
    if (orderAction == "SELL_SHORT")
        return "day";
    // Invariant casing keeps the mapping independent of the machine's culture; a missing term
    // falls through to the default instead of throwing.
    switch ((timeInForce ?? string.Empty).ToUpperInvariant().Replace(" ", "_"))
    {
        case "GTC":
            return "gtc";
        case "GDA":
            return "day";
        default:
            return "day";
    }
}
/// <summary>
/// Maps an Alpaca <c>time_in_force</c> value to the broker order term.
/// </summary>
/// <param name="duration">
/// Alpaca value such as "gtc", "day" or "fok"; matched case-insensitively with spaces treated as
/// underscores. Null or unrecognised values map to "GDA".
/// </param>
/// <returns>"GTC", "FOK" or "GDA".</returns>
private string GetBrokerTimeInForce(string duration)
{
    // Invariant casing keeps the mapping independent of the machine's culture; a missing value
    // falls through to the default instead of throwing.
    switch ((duration ?? string.Empty).ToUpperInvariant().Replace(" ", "_"))
    {
        case "GTC":
            return "GTC";
        case "DAY":
            return "GDA";
        case "FOK":
            return "FOK";
        default:
            return "GDA";
    }
}
/// <summary>
/// Maps an order status to the action name used in outgoing order messages.
/// </summary>
/// <param name="alpacaOrderStatus">
/// Status text; matched case-insensitively with spaces treated as underscores.
/// </param>
/// <returns>
/// One of ACCEPTED, NEW, OPEN, ORDER_MODIFIED, PENDING_CANCEL, CANCELLED, EXECUTED, REJECTED or
/// ERROR. Null and unrecognised statuses map to ERROR.
/// </returns>
private string GetOrderAction(string alpacaOrderStatus)
{
    // Invariant casing is required here: under a Turkish culture ToUpper() turns "i" into a
    // dotted capital, so statuses such as "filled" or "expired" would fall through to ERROR.
    switch ((alpacaOrderStatus ?? string.Empty).ToUpperInvariant().Replace(" ", "_"))
    {
        case "ACCEPTED":
            return "ACCEPTED";
        case "NEW":
        case "PENDING_NEW":
            return "NEW";
        case "OPEN":
            return "OPEN";
        case "ORDER_MODIFIED":
            return "ORDER_MODIFIED";
        case "PENDING_CANCEL":
            return "PENDING_CANCEL";
        case "CANCELED":
        case "CANCELLED":
        case "EXPIRED":
            return "CANCELLED";
        case "FILL":
        case "FILLED":
        case "EXECUTED":
            return "EXECUTED";
        case "REJECTED":
            return "REJECTED";
        case "ERROR":
            return "ERROR";
        default:
            return "ERROR";
    }
}
/// <summary>
/// Normalises an Alpaca order status: upper-cases it and replaces spaces with underscores.
/// </summary>
/// <param name="alpacaOrderStatus">Status text as returned by Alpaca; must not be null.</param>
/// <returns>The normalised status, e.g. "partially filled" becomes "PARTIALLY_FILLED".</returns>
private string ConvertOrderStatus(string alpacaOrderStatus)
{
    // Invariant casing: the result is compared against fixed status names, so it must not
    // depend on the machine's culture (Turkish casing would alter statuses containing "i").
    return alpacaOrderStatus.ToUpperInvariant().Replace(" ", "_");
}
/// <summary>
/// Returns the given percentage of a share count as a non-negative whole number.
/// </summary>
/// <param name="shares">Share count the percentage is taken of.</param>
/// <param name="percentage">Percentage to apply; the sign is ignored in the result.</param>
/// <returns>The absolute value of <c>shares * percentage / 100</c>, rounded to the nearest integer.</returns>
private int GetShares(int shares, double percentage)
{
    double portion = shares * percentage / 100;
    // Math.Round uses its default midpoint handling (to even), e.g. 2.5 -> 2 and 3.5 -> 4.
    int wholeShares = (int)Math.Round(portion, 0);
    return Math.Abs(wholeShares);
}
/// <summary>
/// Shorthand for serialising an object to JSON with <c>JsonConvert.SerializeObject</c>.
/// </summary>
/// <param name="data">Object to serialise.</param>
/// <returns>The JSON text.</returns>
private string jcso(object data) => JsonConvert.SerializeObject(data);
/// <summary>
/// Decides whether an order is kept as a local order instead of being sent to the broker.
/// </summary>
/// <param name="account">Account whose working orders and position are inspected.</param>
/// <param name="order">Order being evaluated.</param>
/// <returns>
/// True when the order has a good-after time. False for entry orders. Otherwise true only when
/// the quantity of the working broker-side LIMIT/STOP/TRAILING_STOP_CNST orders on the same
/// symbol and side, plus this order's quantity, exceeds the absolute size of the position.
/// </returns>
private bool ShouldPlaceLocal(BrokerAccount account, BrokerOrder order)
{
    if (order.goodAfterTime.HasValue)
    {
        return true;
    }
    if (order.isEntry == true)
    {
        return false;
    }

    // Quantity already committed to other working broker-side exit orders.
    var committedQuantity = account.Orders
        .Where(o => !o.IsLocal && o.brokerOrderId != order.brokerOrderId)
        .Where(o => o.symbol == order.symbol)
        .Where(o => o.orderStatus == "OPEN" || o.orderStatus == "NEW")
        .Where(o => o.orderAction == order.orderAction)
        .Where(o => o.priceType == "LIMIT" || o.priceType == "STOP" || o.priceType == "TRAILING_STOP_CNST")
        .Sum(o => o.orderedQuantity);

    var heldPosition = account.GetPosition(order.symbol);
    var heldShares = heldPosition == null ? 0 : heldPosition.shares;
    return (committedQuantity + order.orderedQuantity) > Math.Abs(heldShares);
}
/// <summary>
/// Returns the part of an order id that precedes the first '.' character.
/// </summary>
/// <param name="value">Order id text; must not be null.</param>
/// <returns>The text before the first dot, or the whole value when it contains no dot.</returns>
private string ParseOrderId(string value)
{
    int dotIndex = value.IndexOf('.');
    if (dotIndex < 0)
    {
        return value;
    }
    return value.Substring(0, dotIndex);
}
/// <summary>
/// Looks up a loaded broker account by its account identifier.
/// </summary>
/// <param name="accountId">Identifier compared against each account's <c>account_identifier</c>.</param>
/// <returns>The first matching account, or null when none matches.</returns>
private BrokerAccount GetAccount(string accountId)
{
    return _brokerAccounts
        .Where(candidate => candidate.account_identifier == accountId)
        .FirstOrDefault();
}
//private List GetOrders(BrokerAccount account)
//{
// //This function retrieves a new order snapshot that is used to detect new orders and/or order changes
// //when compared to the prior snapshot.
// //ETrade's order API date filters only apply to the order creation date, not the executed or updated dates.
// //Therefore it is most efficient to check for the absence of orders in subsequent snapshots to detect
// //the order status changes on orders with a previous status of OPEN.
// DateTime ordersTimeFrameEnd = TimeZoneNow.Date;
// DateTime ordersTimeFrameStart = ordersTimeFrameEnd.AddDays(-1);
// DateTime oldest = DateTime.MinValue;
// List orders = new List();
// var fetchTasks = new List>>();
// try
// {
// //Get the orders that have an OPEN status and that were Placed prior to the current day
// var pastOpenOrders = account.GetOrders(false, "OPEN,CANCEL_REQUESTED", TimeZoneNow);
// if (pastOpenOrders != null && pastOpenOrders.Count > 0)
// {
// //1. Retrieve all Open Orders within the date range determinded by the prior open orders snapshot
// oldest = pastOpenOrders.Select(o => o.createdWhen).Min();
// fetchTasks.Add(
// _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders?status=OPEN&fromDate={oldest:MMddyyyy}&toDate={ordersTimeFrameEnd.AddDays(-1):MMddyyyy}")
// );
// }
// //2. Get Today's orders
// fetchTasks.Add(
// _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders?count=100&fromDate={ordersTimeFrameStart:MMddyyyy}&toDate={ordersTimeFrameEnd:MMddyyyy}")
// );
// Task.WaitAll(fetchTasks.ToArray());
// for (int i = 0; i < fetchTasks.Count; i++)
// {
// if (fetchTasks[i].Result.IsSuccess && fetchTasks[i].Result.Data != null)
// {
// var taskOrders = fetchTasks[i].Result.Data.OrdersResponse.Order;
// if (i == (fetchTasks.Count - 1))
// {
// //Add all orders from 2. that aren't in pastOpenOrders
// var notInPastOrders = taskOrders.Where(o => FromUnixTimeMilliseconds(o.OrderDetail[0].placedTime).Date == ordersTimeFrameEnd && !pastOpenOrders.Any(po => po.brokerOrderId == o.orderId)).ToList();
// if (notInPastOrders.Count > 0)
// orders.AddRange(notInPastOrders);
// //Check for any Status Changes
// var statusChangedOrders = taskOrders.Where(o => pastOpenOrders.Any(po => po.brokerOrderId == o.orderId && po.orderStatus != o.OrderDetail[0].status)).ToList();
// if (statusChangedOrders.Count > 0)
// orders.AddRange(statusChangedOrders);
// }
// else
// {
// //3. Detect the OPEN orders missing from pastOpenOrders, we need to get their order details from ETrade to get the new status
// var ordersChanged = pastOpenOrders.Where(o => !taskOrders.Any(oc => oc.orderId == o.brokerOrderId));
// if (ordersChanged != null && ordersChanged.Count() > 0)
// {
// var detailsTasks = new List>>();
// foreach (var order in ordersChanged)
// {
// detailsTasks.Add(
// _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/v1/accounts/{account.account_key}/orders/{order.brokerOrderId}")
// );
// }
// Task.WaitAll(detailsTasks.ToArray());
// foreach (var task in detailsTasks)
// {
// if (task.Result.IsSuccess && task.Result.Data != null)
// orders.AddRange(task.Result.Data.OrdersResponse.Order);
// }
// }
// }
// }
// }
// }
// catch (Exception ex)
// {
// LogError(ex.Message, ex);
// }
// return orders;
//}
//private async Task UpdateBrokerOrder(BrokerOrder brokerOrder, Order refreshedOrder)
//{
// try
// {
// var orderDetail = refreshedOrder.OrderDetail.First();
// var instrument = orderDetail.Instrument.First();
// brokerOrder.filledQuantity = instrument.filledQuantity;
// brokerOrder.averageExecutionPrice = instrument.averageExecutionPrice;
// brokerOrder.estimatedCommission = instrument.estimatedCommission;
// brokerOrder.replaceOrderId = orderDetail.replacedByOrderId;
// brokerOrder.orderStatus = orderDetail.status;
// brokerOrder.stopPrice = orderDetail.stopPrice == 0 ? null : orderDetail.stopPrice;
// brokerOrder.limitPrice = orderDetail.limitPrice == 0 ? null : orderDetail.limitPrice;
// brokerOrder.offsetType = orderDetail.offsetType;
// brokerOrder.offsetValue = orderDetail.offsetValue;
// brokerOrder.term = orderDetail.orderTerm;
// brokerOrder.createdWhen = FromUnixTimeMilliseconds(orderDetail.placedTime);
// brokerOrder.updatedWhen = FromUnixTimeMilliseconds(refreshedOrder.Events == null ? orderDetail.placedTime : refreshedOrder.Events.Event.Last().dateTime);
// brokerOrder.executedWhen = orderDetail.executedTime.HasValue ? FromUnixTimeMilliseconds(orderDetail.executedTime) : (DateTime?)null;
// await _databaseService.UpdateOrder(brokerOrder);
// return true;
// }
// catch (Exception)
// {
// return false;
// }
//}
//private BrokerOrder CreateBrokerOrder(string accountId, Order order)
//{
// var orderDetail = order.OrderDetail.First();
// var instrument = orderDetail.Instrument.First();
// return new BrokerOrder()
// {
// brokerId = _brokerId,
// tiUser = _brokerUserSession.UserName,
// brokerAccountIdentifier = accountId,
// brokerOrderId = order.orderId,
// brokerOrderNumber = orderDetail.orderNumber,
// orderType = order.orderType,
// securityType = instrument.Product.securityType,
// symbol = instrument.Product.symbol,
// orderAction = instrument.orderAction,
// isEntry = instrument.orderAction == "BUY" || instrument.orderAction == "SELL_SHORT",
// orderedQuantity = instrument.orderedQuantity,
// filledQuantity = instrument.filledQuantity,
// averageExecutionPrice = instrument.averageExecutionPrice,
// estimatedCommission = instrument.estimatedCommission,
// orderStatus = orderDetail.status,
// priceType = orderDetail.priceType,
// stopPrice = orderDetail.stopPrice,
// limitPrice = orderDetail.limitPrice,
// trailPrice = orderDetail.trailPrice,
// offsetType = orderDetail.offsetType,
// offsetValue = orderDetail.offsetValue,
// term = orderDetail.orderTerm,
// createdWhen = FromUnixTimeMilliseconds(orderDetail.placedTime),
// updatedWhen = FromUnixTimeMilliseconds(order.Events == null ? orderDetail.placedTime : order.Events.Event.Last().dateTime),
// executedWhen = orderDetail.executedTime.HasValue ? FromUnixTimeMilliseconds(orderDetail.executedTime) : (DateTime?)null,
// };
//}
/// <summary>
/// Creates a new broker order based on an existing one, with a new action, quantity and status.
/// Only the fields assigned below are carried over; the copy is stamped with the current time.
/// </summary>
/// <param name="order">Order to copy from.</param>
/// <param name="action">Order action for the copy; BUY and SELL_SHORT mark it as an entry.</param>
/// <param name="orderedQuantity">Quantity for the copy; when null the source quantity is kept.</param>
/// <param name="orderStatus">Status assigned to the copy.</param>
/// <returns>The new order.</returns>
private BrokerOrder CopyBrokerOrder(BrokerOrder order, string action, decimal? orderedQuantity, string orderStatus)
{
    var copy = new BrokerOrder();
    copy.tiUser = order.tiUser;
    copy.brokerId = _brokerId;
    copy.brokerParentOrderId = order.brokerParentOrderId;
    copy.isEntry = action == "BUY" || action == "SELL_SHORT";
    copy.createdWhen = TimeZoneNow;
    copy.updatedWhen = TimeZoneNow;
    copy.brokerAccountIdentifier = order.brokerAccountIdentifier;
    copy.goodAfterTime = order.goodAfterTime;
    copy.goodTillTime = order.goodTillTime;
    copy.securityType = order.securityType;
    copy.orderType = order.orderType;
    copy.term = order.term;
    copy.priceType = order.priceType;
    copy.limitPrice = order.limitPrice;
    copy.stopPrice = order.stopPrice;
    copy.orderAction = action;
    // An explicit quantity wins; otherwise fall back to the source order's quantity.
    copy.orderedQuantity = orderedQuantity ?? order.orderedQuantity;
    copy.symbol = order.symbol;
    copy.ocaGroup = order.ocaGroup;
    copy.strategyName = order.strategyName;
    copy.orderStatus = orderStatus;
    return copy;
}
/// <summary>
/// Stamps an order with this session's broker id and user, marks it as an entry when its action
/// is BUY or SELL_SHORT, sets both timestamps to the current time and resets its status to NEW.
/// </summary>
/// <param name="order">Order to update in place.</param>
private void ConfigureBrokerOrder(BrokerOrder order)
{
    order.brokerId = _brokerId;
    order.tiUser = _brokerUserSession.UserName;

    var action = order.orderAction;
    bool opensPosition = action == "BUY" || action == "SELL_SHORT";
    order.isEntry = opensPosition;

    order.createdWhen = TimeZoneNow;
    order.updatedWhen = TimeZoneNow;
    order.orderStatus = "NEW";
}
// Persists a new order through the database service and then adds it to the in-memory account.
// Returns whatever account.AddOrder returns for the order.
private async Task AddOrder(BrokerAccount account, BrokerOrder order)
{
var addOrderResult = await _databaseService.AddOrder(order);
// Copy the id of the persisted object onto the in-memory order only when the insert succeeded.
if (addOrderResult.IsSuccess)
order.id = addOrderResult.Obj.id;
// The order is added to the account whether or not the database call succeeded.
return account.AddOrder(order);
}
/// <summary>
/// Returns the orders of an account for a symbol that are still pending, i.e. whose status is
/// not CANCELLED, EXECUTED or REJECTED, and whose order action contains the given text.
/// </summary>
/// <param name="accountIdentifier">Identifier of the account to search.</param>
/// <param name="symbol">Symbol the orders must match exactly.</param>
/// <param name="orderAction">
/// Text the order action must contain; a substring match, so "SELL" also matches "SELL_SHORT".
/// </param>
/// <returns>A new list of matching orders; empty when the account is unknown.</returns>
private List<BrokerOrder> GetPendingExitOrders(string accountIdentifier, string symbol, string orderAction)
{
    var account = GetAccount(accountIdentifier);
    if (account == null)
    {
        return new List<BrokerOrder>();
    }
    return account.Orders
        .Where(o =>
            o.symbol == symbol
            && o.orderStatus != "CANCELLED"
            && o.orderStatus != "EXECUTED"
            && o.orderStatus != "REJECTED"
            // Orders without an action cannot match and must not throw.
            && o.orderAction != null
            && o.orderAction.Contains(orderAction))
        .ToList();
}
/// <summary>
/// Returns the still-active orders (status not CANCELLED, EXECUTED or REJECTED) that belong to
/// the same group as the given order.
/// </summary>
/// <remarks>
/// When the stored order has no OCA group, its group is its child orders (orders whose parent id
/// is this order's id). Otherwise the group is every other order with the same OCA group and the
/// same order action.
/// </remarks>
/// <param name="brokerOrder">Order whose account and broker order id identify the stored order.</param>
/// <returns>A new list of group orders; empty when the account or order cannot be found.</returns>
private List<BrokerOrder> GetGroupOrders(BrokerOrder brokerOrder)
{
    var groupOrdersToCancel = new List<BrokerOrder>();
    var account = GetAccount(brokerOrder.brokerAccountIdentifier);
    var order = account?.GetOrder(brokerOrder.brokerOrderId);
    if (order == null)
    {
        return groupOrdersToCancel;
    }

    // Shared filter: only orders that have not reached a final status.
    var activeOrders = account.Orders.Where(o =>
        o.orderStatus != "CANCELLED"
        && o.orderStatus != "EXECUTED"
        && o.orderStatus != "REJECTED");

    if (string.IsNullOrEmpty(order.ocaGroup))
    {
        groupOrdersToCancel.AddRange(activeOrders.Where(o =>
            o.brokerParentOrderId == order.brokerOrderId));
    }
    else
    {
        groupOrdersToCancel.AddRange(activeOrders.Where(o =>
            o.brokerOrderId != order.brokerOrderId
            && o.ocaGroup == order.ocaGroup
            && o.orderAction == order.orderAction));
    }
    return groupOrdersToCancel;
}
/// <summary>
/// Translates a buy or sell of a given quantity into the order action(s) required for the
/// current position.
/// </summary>
/// <param name="buy">True for a buy, false for a sell.</param>
/// <param name="positionQuantity">Signed position size: positive is long, negative is short.</param>
/// <param name="orderQuantity">Quantity to buy or sell.</param>
/// <returns>
/// One or two (action, quantity) pairs. A sell from a long position yields SELL, plus SELL_SHORT
/// for any quantity beyond the position. A buy against a short position yields BUY_TO_COVER,
/// plus BUY for any quantity beyond the position. With no opposing position the result is a
/// single BUY or SELL_SHORT.
/// </returns>
private List<Tuple<string, decimal>> GetOrderActions(bool buy, decimal positionQuantity, decimal orderQuantity)
{
    var orderActions = new List<Tuple<string, decimal>>();
    if (buy)
    {
        if (positionQuantity >= 0)
        {
            orderActions.Add(Tuple.Create("BUY", orderQuantity));
        }
        else
        {
            var remainder = positionQuantity + orderQuantity;
            if (remainder <= 0)
            {
                orderActions.Add(Tuple.Create("BUY_TO_COVER", orderQuantity));
            }
            else
            {
                // Cover the whole short position, then buy the excess as a new long.
                orderActions.Add(Tuple.Create("BUY_TO_COVER", Math.Abs(positionQuantity)));
                orderActions.Add(Tuple.Create("BUY", Math.Abs(remainder)));
            }
        }
    }
    else
    {
        if (positionQuantity <= 0)
        {
            orderActions.Add(Tuple.Create("SELL_SHORT", orderQuantity));
        }
        else
        {
            var remainder = positionQuantity - orderQuantity;
            if (remainder >= 0)
            {
                orderActions.Add(Tuple.Create("SELL", orderQuantity));
            }
            else
            {
                // Sell the whole long position, then short the excess.
                orderActions.Add(Tuple.Create("SELL", Math.Abs(positionQuantity)));
                orderActions.Add(Tuple.Create("SELL_SHORT", Math.Abs(remainder)));
            }
        }
    }
    return orderActions;
}
/// <summary>
/// Looks up an order by broker order id within the account with the given identifier.
/// </summary>
/// <param name="brokerAccountIdentifier">Identifier of the account holding the order.</param>
/// <param name="brokerOrderId">Broker order id to look up.</param>
/// <returns>Whatever the account's <c>GetOrder</c> returns for the id.</returns>
private BrokerOrder GetOrder(string brokerAccountIdentifier, string brokerOrderId)
    => GetAccount(brokerAccountIdentifier).GetOrder(brokerOrderId);
/// <summary>
/// Finds the first OPEN, non-local STOP / LIMIT / STOP_LIMIT order in the same account that matches
/// <paramref name="brokerOrder"/> on symbol, order action and ordered quantity.
/// </summary>
/// <param name="brokerOrder">Order supplying the account identifier and broker order id for the lookup.</param>
/// <returns>The first matching order, or <c>null</c> when the order itself is not found or nothing matches.</returns>
private BrokerOrder GetExitOrder(BrokerOrder brokerOrder)
{
    var owningAccount = GetAccount(brokerOrder.brokerAccountIdentifier);
    var reference = owningAccount.GetOrder(brokerOrder.brokerOrderId);
    if (reference == null)
    {
        return null;
    }

    return owningAccount.Orders.FirstOrDefault(candidate =>
        candidate.orderStatus == "OPEN"
        && (candidate.priceType == "STOP" || candidate.priceType == "LIMIT" || candidate.priceType == "STOP_LIMIT")
        && !candidate.IsLocal
        && candidate.symbol == reference.symbol
        && candidate.orderAction == reference.orderAction
        && candidate.orderedQuantity == reference.orderedQuantity);
}
/// <summary>
/// Cancels every order that <c>GetGroupOrders</c> reports as grouped with <paramref name="order"/>.
/// Does nothing when the group is empty.
/// </summary>
/// <param name="order">Order whose group is to be cancelled.</param>
private async Task CancelGroupOrders(BrokerOrder order)
{
    var grouped = GetGroupOrders(order);
    if (grouped.Count == 0)
    {
        return;
    }

    await CancelOrders(grouped);
}
/// <summary>
/// Current time as used by this session's time comparisons. Always UTC; no time-zone conversion is applied.
/// </summary>
private DateTime TimeZoneNow => DateTime.UtcNow;
/// <summary>
/// Converts a Unix timestamp in milliseconds to a UTC <see cref="DateTime"/>.
/// </summary>
/// <param name="time">Milliseconds since 1970-01-01T00:00:00Z; <c>null</c> is treated as 0.</param>
/// <returns>The corresponding UTC date and time (the Unix epoch for <c>null</c>).</returns>
private DateTime FromUnixTimeMilliseconds(long? time)
{
    var milliseconds = time.GetValueOrDefault();
    var offset = DateTimeOffset.FromUnixTimeMilliseconds(milliseconds);
    return offset.UtcDateTime;
}
/// <summary>
/// Copies the session id, broker and user name into the shared log header and then hands the
/// header to the OAuth session so it can update its own copy.
/// </summary>
private void UpdateLogHeader()
{
    var session = _brokerUserSession;

    _logHeader.sessionId = session.UserSessionId.ToString();
    _logHeader.broker = _brokerIntegration.broker;
    _logHeader.userName = session.UserName;
    // TODO: application id and version are not yet copied into the header.

    _oAuthSession.UpdateLogHeader(_logHeader);
}
/// <summary>
/// Writes an information-level entry consisting of the shared log header serialised as JSON.
/// </summary>
/// <remarks>
/// The header is shared mutable state: <c>message</c> and <c>data</c> are overwritten on every call and
/// no synchronisation is performed here.
/// </remarks>
/// <param name="message">Text stored in the header's <c>message</c> field.</param>
/// <param name="data">Optional payload stored in the header's <c>data</c> field.</param>
private void LogInfo(string message, object data = null)
{
    //May Need Lock
    _logHeader.message = message;
    _logHeader.data = data;
    // Serialise once and pass the JSON as an argument, not as the template, so the logger never
    // treats the JSON braces as placeholders. Same shape as LogWarn / LogError.
    _logger.LogInformation("{Message}", JsonConvert.SerializeObject(_logHeader));
}
/// <summary>
/// Writes a warning-level entry consisting of the shared log header serialised as JSON.
/// </summary>
/// <param name="message">Text stored in the header's <c>message</c> field.</param>
/// <param name="data">Optional payload stored in the header's <c>data</c> field.</param>
private void LogWarn(string message, object data = null)
{
    // The header is shared and is mutated here without any locking.
    _logHeader.message = message;
    _logHeader.data = data;

    var serializedHeader = JsonConvert.SerializeObject(_logHeader);
    _logger.LogWarning("{Message}", serializedHeader);
}
/// <summary>
/// Writes an error-level entry carrying <paramref name="ex"/> and the shared log header serialised as JSON.
/// The header's <c>data</c> field is cleared first.
/// </summary>
/// <param name="message">Text stored in the header's <c>message</c> field.</param>
/// <param name="ex">Exception attached to the log entry.</param>
private void LogError(string message, Exception ex)
{
    // The header is shared and is mutated here without any locking.
    _logHeader.message = message;
    _logHeader.data = null;

    var serializedHeader = JsonConvert.SerializeObject(_logHeader);
    _logger.LogError(ex, "{Message}", serializedHeader);
}
/// <summary>
/// Dispatch point for session notifications. At present no notification type triggers any action:
/// <c>ExpiredToken</c> is recognised but handled as a no-op.
/// </summary>
/// <param name="notificationType">Kind of notification being raised.</param>
/// <param name="data">Payload accompanying the notification; currently unused.</param>
private void Notify(NotificationType notificationType, object data)
{
    try
    {
        switch (notificationType)
        {
            case NotificationType.ExpiredToken:
                break;
        }
    }
    catch (Exception ex)
    {
        // Notifications stay best-effort (never rethrown), but a failure is recorded, not dropped.
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Asks the database service for this broker user's next order id and returns it prefixed with "L".
/// </summary>
/// <returns>The letter "L" followed by the value of the result's <c>Obj</c>.</returns>
private async Task GetNextOrderIdAsync()
{
    var brokerUserId = _brokerUserSession.BrokerUserId;
    var idResult = await _databaseService.GetBrokerUserNextOrderId(brokerUserId);
    return string.Format("L{0}", idResult.Obj);
}
/// <summary>
/// Produces a new client order id: the local time formatted as <c>yyyyHHmm</c> followed by the
/// incremented per-session counter.
/// </summary>
/// <remarks>
/// NOTE(review): the stamp is year + hour + minute, with no month or day — confirm this is intended.
/// The counter increment is not atomic.
/// </remarks>
/// <returns>The generated client order id.</returns>
private string NewClientOrderId()
{
    var sequence = ++_clientOrderId;
    return string.Format("{0:yyyyHHmm}{1}", DateTime.Now, sequence);
}
/// <summary>
/// <c>true</c> while a portfolio data subscriber is attached to this session.
/// </summary>
private bool IsUserConnected => _portfolioDataSubscriber != null;
#region Workers
/// <summary>
/// Re-reads the Alpaca account balances and the position for <paramref name="symbol"/>, copies them onto
/// <paramref name="brokerAccount"/>, and sends an "Account Update" message to the portfolio subscriber
/// when one is attached.
/// </summary>
/// <param name="brokerAccount">Account object that is updated in place.</param>
/// <param name="symbol">Symbol whose position is refreshed; any '/' is removed before it is put in the request path.</param>
/// <returns>
/// <c>true</c> when the refresh ran to completion; <c>false</c> when any exception was thrown
/// (the exception is logged, not rethrown).
/// </returns>
private async Task RefreshAccount(BrokerAccount brokerAccount, string symbol)
{
    try
    {
        // A timed-out wait is only logged; the refresh still proceeds.
        // NOTE(review): the finally block releases the semaphore even after a timed-out wait —
        // confirm SemaphoreSlimX.Release(true) is meant to tolerate that.
        if (!await _semaphoreSlim.WaitAsync(SEMAPHORE_WAIT_TIME))
        {
            LogWarn("Could not enter Semaphore from RefreshAccount()");
        }

        // Account-level balances.
        var accountResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/account");
        var alpacaAccount = accountResult.Data;
        brokerAccount.account_value = alpacaAccount.portfolio_value ?? 0;
        brokerAccount.available_funds = alpacaAccount.cash ?? 0;
        brokerAccount.buying_power = alpacaAccount.buying_power ?? 0;
        brokerAccount.buying_power_in_use = alpacaAccount.daytrading_buying_power ?? 0;

        // Position for the requested symbol.
        var positionsResult = await _oAuthSession.SendRequestAsync(HttpMethod.Get, $"/{ALPACA_API_VERSION}/positions/{symbol.Replace("/","")}");
        if (positionsResult.Data != null)
        {
            var position = positionsResult.Data;
            var positionIndex = brokerAccount.GetPositionIndex(symbol);
            if (positionIndex < 0)
            {
                // Not tracked yet: add a new position.
                brokerAccount.AddPosition(
                    new BrokerPosition()
                    {
                        id = 0,
                        broker_id = _brokerId,
                        broker_account_identifier = brokerAccount.account_identifier,
                        broker_position_id = position.asset_id,
                        symbol = position.symbol,
                        shares = position.qty ?? 0,
                        average_open_price = position.avg_entry_price ?? 0,
                        last_trade = position.lastday_price ?? 0,
                        // NOTE(review): the realized field is fed from unrealized_pl — confirm this mapping.
                        realized_profit_loss = position.unrealized_pl
                    }
                );
            }
            else
            {
                // Already tracked: refresh price and size, defaulting missing values to 0 as on the add path.
                brokerAccount.Positions[positionIndex].average_open_price = position.avg_entry_price ?? 0;
                brokerAccount.Positions[positionIndex].shares = position.qty ?? 0;
            }
        }

        _portfolioDataSubscriber?.OnMessage(jcso(new AccountMessage("Account Update",
            TimeZoneNow,
            "Update",
            brokerAccount)));
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
        return false;
    }
    finally
    {
        _semaphoreSlim.Release(true);
    }
    return true;
}
/// <summary>
/// Background loop that, while streaming is on, wakes every <c>EXECUTION_WORKER_INTERVAL</c> milliseconds
/// and acts on locally monitored orders whose time condition has passed.
/// </summary>
/// <remarks>
/// Per pass, for every order whose <c>Monitor()</c> returns true:
/// a "time-cancel" order past its <c>goodTillTime</c> is cancelled; a "time-exit" order past its
/// <c>goodAfterTime</c> is turned into a market order and either replaces the matching exit order
/// (see <c>GetExitOrder</c>) or is placed as a new order.
/// When no user is connected, nothing is left to monitor and no streaming reconnect is pending, the
/// loop stops and the session is ended on a separate task.
/// Exceptions are logged and never propagate out of this method.
/// </remarks>
private async Task ExecutionWorker()
{
    try
    {
        while (_streaming)
        {
            await Task.Delay(EXECUTION_WORKER_INTERVAL);

            // Nothing to do until the portfolio and at least one account are available.
            var accountsReady = _portfolioLoaded && _brokerAccounts != null && _brokerAccounts.Count != 0;
            if (!accountsReady)
            {
                continue;
            }

            var monitored = _brokerAccounts
                .SelectMany(a => a.Orders.Where(o => o.Monitor()))
                .ToList();

            // If the user is not connected, there are no more orders to monitor and no streaming
            // reconnect is pending, the session can be ended.
            if (!IsUserConnected && monitored.Count == 0 && (StatusTypes)_brokerUserSession.StatusTypeId != StatusTypes.StreamingReconnectPending)
            {
                _streaming = false;
                _ = Task.Run(async () => await EndSession(StatusTypes.SessionEnded));
            }

            foreach (var order in monitored)
            {
                try
                {
                    if (order.localExecutionType == "time-cancel" && TimeZoneNow > order.goodTillTime)
                    {
                        await CancelOrders(new List() { order });
                        continue;
                    }

                    var exitDue = order.localExecutionType == "time-exit" && TimeZoneNow > order.goodAfterTime;
                    if (!exitDue)
                    {
                        continue;
                    }

                    // Look up the exit order before the local order is rewritten below.
                    var exitOrder = GetExitOrder(order);

                    // Turn the local time-exit order into a plain market order.
                    order.replacing = false;
                    order.convertFromLocal = true;
                    order.priceType = "market";
                    order.orderStatus = "WORKING";
                    order.localExecutionType = null;
                    order.stopPrice = null;
                    order.limitPrice = null;
                    order.goodAfterTime = null;

                    if (exitOrder == null)
                    {
                        await PlaceOrders(new List() { order });
                    }
                    else
                    {
                        await ReplaceOrder(order, exitOrder);
                    }
                }
                catch (Exception ex)
                {
                    // One failing order must not stop the remaining orders in this pass.
                    LogError(ex.Message, ex);
                }
            }
        }
    }
    catch (Exception ex)
    {
        LogError(ex.Message, ex);
    }
}
/// <summary>
/// Decides whether <paramref name="last"/> satisfies the price condition of a LIMIT or STOP order.
/// </summary>
/// <remarks>
/// Buy side (BUY, BUY_TO_COVER): LIMIT triggers at or below the limit price, STOP at or above the stop price.
/// Sell side (SELL, SELL_SHORT): LIMIT triggers at or above the limit price, STOP at or below the stop price.
/// Any other order action or price type never triggers.
/// </remarks>
/// <param name="order">Order supplying the action, price type and limit/stop prices.</param>
/// <param name="last">Price to test against the order.</param>
/// <returns><c>true</c> when the order's price condition is met; otherwise <c>false</c>.</returns>
private bool IsOrderTriggered(BrokerOrder order, decimal last)
{
    var action = order.orderAction;
    var buySide = action == "BUY" || action == "BUY_TO_COVER";
    var sellSide = action == "SELL" || action == "SELL_SHORT";
    if (!buySide && !sellSide)
    {
        return false;
    }

    if (order.priceType == "LIMIT")
    {
        return buySide
            ? last <= order.limitPrice
            : last >= order.limitPrice;
    }

    if (order.priceType == "STOP")
    {
        return buySide
            ? last >= order.stopPrice
            : last <= order.stopPrice;
    }

    return false;
}
#endregion
/// <summary>
/// <see cref="IDisposable"/> implementation. Currently a no-op: nothing is released here.
/// </summary>
/// <remarks>
/// NOTE(review): the class holds a timer, a semaphore wrapper and a web-socket wrapper — confirm whether
/// they are cleaned up elsewhere or should be released here.
/// </remarks>
public void Dispose()
{
    // Intentionally empty.
}
}
}