diff --git a/README.md b/README.md index c2807f8..601923a 100644 --- a/README.md +++ b/README.md @@ -14,16 +14,20 @@ pip install smartapi-python ```python # package import statement -from smartapi.smartConnect import SmartConnect +from smartapi import SmartConnect #or from smartapi.smartConnect import SmartConnect +#import smartapi.smartExceptions(for smartExceptions) #create object of call -obj=SmartConnect() +obj=SmartConnect(api_key="your api key") #login api call -data = obj.generateSession("D88311","Angel@444") +data = obj.generateSession("Your Client ID","Your Password") refreshToken= data['data']['refreshToken'] +#fetch the feedtoken +feedToken=obj.getfeedToken() + #fetch User Profile userProfile= obj.getProfile(refreshToken) #place order @@ -46,16 +50,81 @@ try: print("The order id is: {}".format(orderId)) except Exception as e: print("Order placement failed: {}".format(e.message)) +#gtt rule creation +try: + gttCreateParams={ + "tradingsymbol" : "SBIN-EQ", + "symboltoken" : "3045", + "exchange" : "NSE", + "producttype" : "MARGIN", + "transactiontype" : "BUY", + "price" : 100000, + "qty" : 10, + "disclosedqty": 10, + "triggerprice" : 200000, + "timeperiod" : 365 + } + rule_id=gtt.gttCreateRule(gttCreateParams) + print("The GTT rule id is: {}".format(rule_id)) +except Exception as e: + print("GTT Rule creation failed: {}".format(e.message)) + +#gtt rule list +try: + status=["FORALL"] #should be a list + page=1 + count=10 + lists=smartApi.gttLists(status,page,count) +except Exception as e: + print("GTT Rule List failed: {}".format(e.message)) +#Historic api +try: + historicParam={ + "exchange": "NSE", + "symboltoken": "3045", + "interval": "MINUTE", + "fromdate": "2021-02-08 09:00", + "todate": "2021-02-08 09:16" + } + smartApi.getCandleData(historicParam) +except Exception as e: + print("Historic Api failed: {}".format(e.message)) #logout try: - logout=obj.terminateSession('D88311') + logout=obj.terminateSession('Your Client Id') print("Logout Successfull") except Exception as e: print("Logout failed: {}".format(e.message)) -License -MIT + +## WebSocket +from smartapi import WebSocket +FEED_TOKEN= "your feed token" +CLIENT_CODE="your client Id" +token="channel you want the information of" #"nse_cm|2885&nse_cm|1594&nse_cm|11536" +task="task" #"mw"|"sfi"|"dp" +ss = WebSocket(FEED_TOKEN, CLIENT_CODE) + +def on_tick(ws, tick): + print("Ticks: {}".format(tick)) + +def on_connect(ws, response): + ws.websocket_connection() # Websocket connection + ws.send_request(token,task) + +def on_close(ws, code, reason): + ws.stop() + +# Assign the callbacks. +ss.on_ticks = on_tick +ss.on_connect = on_connect +ss.on_close = on_close + +ss.connect( ) + + + diff --git a/SmartApi/__init__.py b/SmartApi/__init__.py index fd40910..4ddea18 100644 --- a/SmartApi/__init__.py +++ b/SmartApi/__init__.py @@ -1,3 +1,9 @@ +from __future__ import unicode_literals,absolute_import + +from smartapi.smartConnect import SmartConnect +from smartapi.webSocket import WebSocket + +__all__ = ["SmartConnect","WebSocket"] diff --git a/SmartApi/__pycache__/__init__.cpython-36.pyc b/SmartApi/__pycache__/__init__.cpython-36.pyc index d9cb643..f619741 100644 Binary files a/SmartApi/__pycache__/__init__.cpython-36.pyc and b/SmartApi/__pycache__/__init__.cpython-36.pyc differ diff --git a/SmartApi/__pycache__/smartConnect.cpython-36.pyc b/SmartApi/__pycache__/smartConnect.cpython-36.pyc index 3f5e480..7a7be23 100644 Binary files a/SmartApi/__pycache__/smartConnect.cpython-36.pyc and b/SmartApi/__pycache__/smartConnect.cpython-36.pyc differ diff --git a/SmartApi/__pycache__/smartExceptions.cpython-36.pyc b/SmartApi/__pycache__/smartExceptions.cpython-36.pyc index 8f9b5aa..0abc8e0 100644 Binary files a/SmartApi/__pycache__/smartExceptions.cpython-36.pyc and b/SmartApi/__pycache__/smartExceptions.cpython-36.pyc differ diff --git a/SmartApi/__pycache__/smartSocket.cpython-36.pyc b/SmartApi/__pycache__/smartSocket.cpython-36.pyc new file mode 100644 index 0000000..99c5ce7 Binary files /dev/null and b/SmartApi/__pycache__/smartSocket.cpython-36.pyc differ diff --git a/SmartApi/__pycache__/socketTP.cpython-36.pyc b/SmartApi/__pycache__/socketTP.cpython-36.pyc index 5217bd2..5b56185 100644 Binary files a/SmartApi/__pycache__/socketTP.cpython-36.pyc and b/SmartApi/__pycache__/socketTP.cpython-36.pyc differ diff --git a/SmartApi/__pycache__/version.cpython-36.pyc b/SmartApi/__pycache__/version.cpython-36.pyc index 289b8a7..5c18fcb 100644 Binary files a/SmartApi/__pycache__/version.cpython-36.pyc and b/SmartApi/__pycache__/version.cpython-36.pyc differ diff --git a/SmartApi/smartConnect.py b/SmartApi/smartConnect.py index 78d3d60..b1de3ad 100644 --- a/SmartApi/smartConnect.py +++ b/SmartApi/smartConnect.py @@ -1,7 +1,5 @@ from six.moves.urllib.parse import urljoin import sys - -sys.path.append('c:\AngelSmartApi\SmartApi') import csv import json import dateutil.parser @@ -13,58 +11,19 @@ from requests import get import re, uuid import socket - +import platform from smartapi.version import __version__, __title__ log = logging.getLogger(__name__) - +#user_sys=platform.system() +#print("the system",user_sys) class SmartConnect(object): - _rootUrl = "https://openapisuat.angelbroking.com" - _loginUrl = "https://openapisuat.angelbroking.com/rest/auth/angelbroking/user/v1/loginByPassword" - _login_url ="https://smartapi.angelbroking.com/login" + #_rootUrl = "https://openapisuat.angelbroking.com" + _rootUrl="https://apiconnect.angelbroking.com" #prod endpoint + #_login_url ="https://smartapi.angelbroking.com/login" + _login_url="https://smartapi.angelbroking.com/publisher-login" #prod endpoint _default_timeout = 7 # In seconds - # Products - PRODUCT_MIS = "MIS" - - PRODUCT_CNC = "CNC" - PRODUCT_NRML = "NRML" - PRODUCT_CO = "CO" - PRODUCT_BO = "BO" - - # Order types - ORDER_TYPE_MARKET = "MARKET" - ORDER_TYPE_LIMIT = "LIMIT" - ORDER_TYPE_SLM = "SL-M" - ORDER_TYPE_SL = "SL" - - # Varities - VARIETY_REGULAR = "regular" - VARIETY_BO = "bo" - VARIETY_CO = "co" - VARIETY_AMO = "amo" - - # Transaction type - TRANSACTION_TYPE_BUY = "BUY" - TRANSACTION_TYPE_SELL = "SELL" - - # Validity - VALIDITY_DAY = "DAY" - VALIDITY_IOC = "IOC" - - # Exchanges - EXCHANGE_NSE = "NSE" - EXCHANGE_BSE = "BSE" - EXCHANGE_NFO = "NFO" - EXCHANGE_CDS = "CDS" - EXCHANGE_BFO = "BFO" - EXCHANGE_MCX = "MCX" - EXCHANGE_NCDEX="NCDEX" - - # Status constants - STATUS_COMPLETE = "COMPLETE" - STATUS_REJECTED = "REJECTED" - STATUS_CANCELLED = "CANCELLED" _routes = { "api.login":"/rest/auth/angelbroking/user/v1/loginByPassword", @@ -83,20 +42,59 @@ class SmartConnect(object): "api.rms.limit": "/rest/secure/angelbroking/user/v1/getRMS", "api.holding": "/rest/secure/angelbroking/portfolio/v1/getHolding", "api.position": "/rest/secure/angelbroking/order/v1/getPosition", - "api.convert.position": "/rest/secure/angelbroking/order/v1/convertPosition" + "api.convert.position": "/rest/secure/angelbroking/order/v1/convertPosition", + + "api.gtt.create":"/gtt-service/rest/secure/angelbroking/gtt/v1/createRule", + "api.gtt.modify":"/gtt-service/rest/secure/angelbroking/gtt/v1/modifyRule", + "api.gtt.cancel":"/gtt-service/rest/secure/angelbroking/gtt/v1/cancelRule", + "api.gtt.details":"/rest/secure/angelbroking/gtt/v1/ruleDetails", +<<<<<<< HEAD + "api.gtt.list":"/rest/secure/angelbroking/gtt/v1/ruleList" +======= + "api.gtt.list":"/rest/secure/angelbroking/gtt/v1/ruleList", + + "api.candle.data":"/rest/secure/angelbroking/historical/v1/getCandleData" +>>>>>>> 7cd96b4aadd572e456013fa73f9e89421f958414 } - def __init__(self, api_key=None, access_token=None, refresh_token=None, userId=None, root=None, debug=False, timeout=None, proxies=None, pool=None, disable_ssl=False): + + try: + clientPublicIp= " " + get('https://api.ipify.org').text + if " " in clientPublicIp: + clientPublicIp=clientPublicIp.replace(" ","") + hostname = socket.gethostname() + clientLocalIp=socket.gethostbyname(hostname) + except Exception as e: + print("Exception while retriving IP Address,using local host IP address",e) + finally: + clientPublicIp="106.193.147.98" + clientLocalIp="127.0.0.1" + clientMacAddress=':'.join(re.findall('..', '%012x' % uuid.getnode())) + accept = "application/json" + userType = "USER" + sourceID = "WEB" + + + def __init__(self, api_key=None, access_token=None, refresh_token=None,feed_token=None, userId=None, root=None, debug=False, timeout=None, proxies=None, pool=None, disable_ssl=False,accept=None,userType=None,sourceID=None,Authorization=None,clientPublicIP=None,clientMacAddress=None,clientLocalIP=None,privateKey=None): self.debug = debug self.api_key = api_key self.session_expiry_hook = None self.disable_ssl = disable_ssl self.access_token = access_token self.refresh_token = refresh_token + self.feed_token = feed_token self.userId = userId self.proxies = proxies if proxies else {} - self.root = root or self._loginUrl + self.root = root or self._rootUrl self.timeout = timeout or self._default_timeout + self.Authorization= None + self.clientLocalIP=self.clientLocalIp + self.clientPublicIP=self.clientPublicIp + self.clientMacAddress=self.clientMacAddress + self.privateKey=api_key + self.accept=self.accept + self.userType=self.userType + self.sourceID=self.sourceID if pool: self.reqsession = requests.Session() @@ -108,6 +106,17 @@ def __init__(self, api_key=None, access_token=None, refresh_token=None, userId=N # disable requests SSL warning requests.packages.urllib3.disable_warnings() + def requestHeaders(self): + return{ + "Content-type":self.accept, + "X-ClientLocalIP": self.clientLocalIp, + "X-ClientPublicIP": self.clientPublicIp, + "X-MACAddress": self.clientMacAddress, + "Accept": self.accept, + "X-PrivateKey": self.privateKey, + "X-UserType": self.userType, + "X-SourceID": self.sourceID + } def setSessionExpiryHook(self, method): if not callable(method): @@ -115,7 +124,7 @@ def setSessionExpiryHook(self, method): self.session_expiry_hook = method def getUserId(): - return userId; + return userId def setUserId(self,id): self.userId=id @@ -127,6 +136,14 @@ def setAccessToken(self, access_token): def setRefreshToken(self, refresh_token): self.refresh_token = refresh_token + + def setFeedToken(self,feedToken): + + self.feed_token=feedToken + + def getfeedToken(self): + return self.feed_token + def login_url(self): """Get the remote login url to which a user should be redirected to initiate the login flow.""" @@ -137,33 +154,12 @@ def _request(self, route, method, parameters=None): params = parameters.copy() if parameters else {} uri =self._routes[route].format(**params) - print(uri) url = urljoin(self.root, uri) - print(url) - hostname = socket.gethostname() - clientLocalIP=socket.gethostbyname(hostname) - clientPublicIP=get('https://api.ipify.org').text - macAddress = ':'.join(re.findall('..', '%012x' % uuid.getnode())) - privateKey = "test" - accept = "application/json" - userType = "USER" - sourceID = "WEB" + # Custom headers - headers = { - #"X-SmartApi-Version": "", - #"User-Agent": self._user_agent() - "Content-type":accept, - "X-ClientLocalIP": clientLocalIP, - "X-ClientPublicIP": clientPublicIP, - "X-MACAddress": macAddress, - "Accept": accept, - "X-PrivateKey": privateKey, - "X-UserType": userType, - "X-SourceID": sourceID - } + headers = self.requestHeaders() - #if self.api_key and self.access_token: if self.access_token: # set authorization header @@ -183,7 +179,7 @@ def _request(self, route, method, parameters=None): allow_redirects=True, timeout=self.timeout, proxies=self.proxies) - print("The Response Content",r.content) + except Exception as e: raise e @@ -234,22 +230,26 @@ def generateSession(self,clientCode,password): params={"clientcode":clientCode,"password":password} loginResultObject=self._postRequest("api.login",params) - jwtToken=loginResultObject['data']['jwtToken'] - self.setAccessToken(jwtToken) - refreshToken=loginResultObject['data']['refreshToken'] - self.setRefreshToken(refreshToken) - user=self.getProfile(refreshToken) - - id=user['data']['clientcode'] - #id='D88311' - print(id) - - self.setUserId(id) - user['data']['jwtToken']="Bearer "+jwtToken - user['data']['refreshToken']=refreshToken - print("USER",user) - return user - + + if loginResultObject['status']==True: + jwtToken=loginResultObject['data']['jwtToken'] + self.setAccessToken(jwtToken) + refreshToken=loginResultObject['data']['refreshToken'] + feedToken=loginResultObject['data']['feedToken'] + self.setRefreshToken(refreshToken) + self.setFeedToken(feedToken) + user=self.getProfile(refreshToken) + + id=user['data']['clientcode'] + #id='D88311' + self.setUserId(id) + user['data']['jwtToken']="Bearer "+jwtToken + user['data']['refreshToken']=refreshToken + + + return user + else: + return loginResultObject def terminateSession(self,clientCode): logoutResponseObject=self._postRequest("api.logout",{"clientcode":clientCode}) return logoutResponseObject @@ -257,36 +257,34 @@ def terminateSession(self,clientCode): def generateToken(self,refresh_token): response=self._postRequest('api.token',{"refreshToken":refresh_token}) jwtToken=response['data']['jwtToken'] + feedToken=response['data']['feedToken'] + self.setFeedToken(feedToken) self.setAccessToken(jwtToken) + return response def renewAccessToken(self): - - # h = hashlib.sha256(self.api_key.encode("utf-8") + refresh_token.encode("utf-8") + access_token.encode("utf-8")) - # checksum = h.hexdigest() - response =self._postRequest('api.refresh', { "jwtToken": self.access_token, "refreshToken": self.refresh_token, - #"checksum": checksum + }) tokenSet={} if "jwtToken" in response: tokenSet['jwtToken']=response['data']['jwtToken'] - tokenSet['clientcode']=self.userId + tokenSet['clientcode']=self. userId tokenSet['refreshToken']=response['data']["refreshToken"] return tokenSet def getProfile(self,refreshToken): user=self._getRequest("api.user.profile",{"refreshToken":refreshToken}) - print("USER PROFILE",user) return user def placeOrder(self,orderparams): - #params = {"exchange":orderparams.exchange,"symbolToken":orderparams.symboltoken,"transactionType":orderparams.transactionType,"quantity":orderparams.quantity,"price":orderparams.price,"productType":orderparams.producttype,"orderType":orderparams.ordertype,"duration":orderparams.duration,"variety":orderparams.variety,"tradingSymbol":orderparams.tradingsymbol,"triggerPrice":orderparams.trigger_price,"squareoff":orderparams.squareoff,"stoploss":orderparams.stoploss,"trailingStoploss":orderparams.trailing_stoploss,"tag":orderparams.tag} + params=orderparams for k in list(params.keys()): @@ -294,7 +292,7 @@ def placeOrder(self,orderparams): del(params[k]) orderResponse= self._postRequest("api.order.place", params)['data']['orderid'] - + return orderResponse def modifyOrder(self,orderparams): @@ -305,8 +303,6 @@ def modifyOrder(self,orderparams): del(params[k]) orderResponse= self._postRequest("api.order.modify", params) - #order=Order(orderResponse) - #order['orderId']=orderResponse['data']['orderid'] return orderResponse def cancelOrder(self, order_id,variety): @@ -352,6 +348,73 @@ def convertPosition(self,positionParams): return convertPositionResponse + def gttCreateRule(self,createRuleParams): + params=createRuleParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + + createGttRuleResponse=self._postRequest("api.gtt.create",params) +<<<<<<< HEAD + print(createGttRuleResponse) +======= + #print(createGttRuleResponse) +>>>>>>> 7cd96b4aadd572e456013fa73f9e89421f958414 + return createGttRuleResponse['data']['id'] + + def gttModifyRule(self,modifyRuleParams): + params=modifyRuleParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + modifyGttRuleResponse=self._postRequest("api.gtt.modify",params) +<<<<<<< HEAD + print(modifyGttRuleResponse) +======= + #print(modifyGttRuleResponse) +>>>>>>> 7cd96b4aadd572e456013fa73f9e89421f958414 + return modifyGttRuleResponse['data']['id'] + + def gttCancelRule(self,gttCancelParams): + params=gttCancelParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + + #print(params) + cancelGttRuleResponse=self._postRequest("api.gtt.cancel",params) + #print(cancelGttRuleResponse) + return cancelGttRuleResponse + + def gttDetails(self,id): + params={ + "id":id + } + gttDetailsResponse=self._postRequest("api.gtt.details",params) + return gttDetailsResponse + + def gttLists(self,status,page,count): + if type(status)== list: + params={ + "status":status, + "page":page, + "count":count + } + gttListResponse=self._postRequest("api.gtt.list",params) + #print(gttListResponse) + return gttListResponse + else: + message="The status param is entered as" +str(type(status))+". Please enter status param as a list i.e., status=['CANCELLED']" + return message + + def getCandleData(self,historicDataParams): + params=historicDataParams + for k in list(params.keys()): + if params[k] is None: + del(params[k]) + getCandleDataResponse=self._postRequest("api.candle.data",historicDataParams) + return getCandleDataResponse + def _user_agent(self): return (__title__ + "-python/").capitalize() + __version__ diff --git a/SmartApi/smartSocket.py b/SmartApi/smartSocket.py deleted file mode 100644 index cf32595..0000000 --- a/SmartApi/smartSocket.py +++ /dev/null @@ -1,80 +0,0 @@ -# Import WebSocket client library -from websocket import create_connection -# Connect to WebSocket API and subscribe to trade feed for XBT/USD and XRP/USD -import _thread -import time -import ssl -import six -import json -import base64 -import binascii -import re -import struct - -url="wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream" -FEED_TOKEN='1096783294' -CLIENT_CODE='S212741' -MODE_FULL = "full" -MODE_QUOTE = "quote" -MODE_LTP = "ltp" -EXCHANGE_MAP = { - "nse": 1, - "nfo": 2, - "cds": 3, - "bse": 4, - "bfo": 5, - "bsecds": 6, - "mcx": 7, - "mcxsx": 8, - "indices": 9 -} -import websocket -try: - import thread -except ImportError: - import _thread as thread -import time - -def on_message(ws, message): - print("Printing the Messsage",message) - #print(message) - # data= binascii.a2b_uu(message) - # print(data) - # base64_message =binascii.a2b_base64(message) - # temp=bytes(message,encoding='ascii') - # print("temp",temp) - # resp=temp.decode() - resp=base64.b64decode(message) - print("response",resp) - ws.close() - -def on_error(ws, error): - print(error) - -def on_close(ws): - print("### closed ###") - -def on_open(ws): - def run(*args): - _req='{"task":"cn","channel":"","token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}'; - ws.send(_req) - strwatchlistscrips = "nse_cm|2885&nse_cm|1594&nse_cm|11536"; - _req = '{"task":"cn","channel":"'+strwatchlistscrips+'","token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}'; - ws.send(_req) - - - thread.start_new_thread(run, ()) - - -if __name__ == "__main__": - websocket.enableTrace(True) - ws = websocket.WebSocketApp(url, - on_message = on_message, - on_error = on_error, - on_close = on_close) - ws.on_open = on_open - - ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) - ws.close() - - diff --git a/SmartApi/smartWebsocket.py b/SmartApi/smartWebsocket.py new file mode 100644 index 0000000..8894a80 --- /dev/null +++ b/SmartApi/smartWebsocket.py @@ -0,0 +1,455 @@ + +import six +import sys +import time +import json +import struct +import logging +import threading +import base64 +import zlib +from datetime import datetime +from twisted.internet import reactor, ssl +from twisted.python import log as twisted_log +from twisted.internet.protocol import ReconnectingClientFactory +from autobahn.twisted.websocket import WebSocketClientProtocol, \ + WebSocketClientFactory, connectWS + +log = logging.getLogger(__name__) + +class SmartSocketClientProtocol(WebSocketClientProtocol): + + def __init__(self, *args, **kwargs): + super(SmartSocketClientProtocol,self).__init__(*args,**kwargs) + + def onConnect(self, response): # noqa + """Called when WebSocket server connection was established""" + self.factory.ws = self + + if self.factory.on_connect: + self.factory.on_connect(self, response) + + def onOpen(self): + if self.factory.on_open: + self.factory.on_open(self) + + + + def onMessage(self, payload, is_binary): # noqa + """Called when text or binary message is received.""" + if self.factory.on_message: + self.factory.on_message(self, payload, is_binary) + + + def onClose(self, was_clean, code, reason): # noqa + """Called when connection is closed.""" + if not was_clean: + if self.factory.on_error: + self.factory.on_error(self, code, reason) + + if self.factory.on_close: + self.factory.on_close(self, code, reason) + + +class SmartSocketClientFactory(WebSocketClientFactory,ReconnectingClientFactory): + protocol = SmartSocketClientProtocol + + maxDelay = 5 + maxRetries = 10 + + _last_connection_time = None + + def __init__(self, *args, **kwargs): + """Initialize with default callback method values.""" + self.debug = False + self.ws = None + self.on_open = None + self.on_error = None + self.on_close = None + self.on_message = None + self.on_connect = None + self.on_reconnect = None + self.on_noreconnect = None + + + super(SmartSocketClientFactory, self).__init__(*args, **kwargs) + + def startedConnecting(self, connector): # noqa + """On connecting start or reconnection.""" + if not self._last_connection_time and self.debug: + log.debug("Start WebSocket connection.") + + self._last_connection_time = time.time() + + def clientConnectionFailed(self, connector, reason): # noqa + """On connection failure (When connect request fails)""" + if self.retries > 0: + print("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay)))) + + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def clientConnectionLost(self, connector, reason): # noqa + """On connection lost (When ongoing connection got disconnected).""" + if self.retries > 0: + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def send_noreconnect(self): + """Callback `no_reconnect` if max retries are exhausted.""" + if self.maxRetries is not None and (self.retries > self.maxRetries): + if self.debug: + log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries)) + + if self.on_noreconnect: + self.on_noreconnect() + +class SimpleWebSocket(object): + EXCHANGE_MAP = { + "nse": 1, + "nfo": 2, + "cds": 3, + "bse": 4, + "bfo": 5, + "bsecds": 6, + "mcx": 7, + "mcxsx": 8, + "indices": 9 + } + # Default connection timeout + CONNECT_TIMEOUT = 30 + # Default Reconnect max delay. + RECONNECT_MAX_DELAY = 60 + # Default reconnect attempts + RECONNECT_MAX_TRIES = 50 + + # ROOT_URI='wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream' + ROOT_URI='wss://smartapisocket.angelbroking.com/websocket' + + # Flag to set if its first connect + _is_first_connect = True + + # Minimum delay which should be set between retries. User can't set less than this + _minimum_reconnect_max_delay = 5 + # Maximum number or retries user can set + _maximum_reconnect_max_tries = 300 + + feed_token=None + client_code=None + def __init__(self, JWT_TOKEN, CLIENT_CODE,API_KEY,debug=False, root=None,reconnect=True,reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,connect_timeout=CONNECT_TIMEOUT): + + + self.root = root or self.ROOT_URI+"?jwttoken="+JWT_TOKEN+"&&clientcode="+CLIENT_CODE+"&&apikey="+API_KEY + self.jwt_token= JWT_TOKEN + self.client_code= CLIENT_CODE + self.api_key=API_KEY + + # Set max reconnect tries + if reconnect_max_tries > self._maximum_reconnect_max_tries: + log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format( + val=self._maximum_reconnect_max_tries)) + self.reconnect_max_tries = self._maximum_reconnect_max_tries + else: + self.reconnect_max_tries = reconnect_max_tries + + # Set max reconnect delay + if reconnect_max_delay < self._minimum_reconnect_max_delay: + log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format( + val=self._minimum_reconnect_max_delay)) + self.reconnect_max_delay = self._minimum_reconnect_max_delay + else: + self.reconnect_max_delay = reconnect_max_delay + + self.connect_timeout = connect_timeout + + # Debug enables logs + self.debug = debug + + # Placeholders for callbacks. + self.on_ticks = None + self.on_open = None + self.on_close = None + self.on_error = None + self.on_connect = None + self.on_message = None + self.on_reconnect = None + self.on_noreconnect = None + + + def _create_connection(self, url, **kwargs): + """Create a WebSocket client connection.""" + self.factory = SmartSocketClientFactory(url, **kwargs) + + # Alias for current websocket connection + self.ws = self.factory.ws + + self.factory.debug = self.debug + + # Register private callbacks + self.factory.on_open = self._on_open + self.factory.on_error = self._on_error + self.factory.on_close = self._on_close + self.factory.on_message = self._on_message + self.factory.on_connect = self._on_connect + self.factory.on_reconnect = self._on_reconnect + self.factory.on_noreconnect = self._on_noreconnect + + + self.factory.maxDelay = self.reconnect_max_delay + self.factory.maxRetries = self.reconnect_max_tries + + def connect(self, threaded=False, disable_ssl_verification=False, proxy=None): + #print("Connect") + self._create_connection(self.ROOT_URI) + + context_factory = None + #print(self.factory.isSecure,disable_ssl_verification) + if self.factory.isSecure and not disable_ssl_verification: + context_factory = ssl.ClientContextFactory() + #print("context_factory",context_factory) + connectWS(self.factory, contextFactory=context_factory, timeout=30) + + # Run in seperate thread of blocking + opts = {} + + # Run when reactor is not running + if not reactor.running: + if threaded: + #print("inside threaded") + # Signals are not allowed in non main thread by twisted so suppress it. + opts["installSignalHandlers"] = False + self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts) + self.websocket_thread.daemon = True + self.websocket_thread.start() + else: + reactor.run(**opts) + + + def is_connected(self): + #print("Check if WebSocket connection is established.") + if self.ws and self.ws.state == self.ws.STATE_OPEN: + return True + else: + return False + + def _close(self, code=None, reason=None): + #print("Close the WebSocket connection.") + if self.ws: + self.ws.sendClose(code, reason) + + def close(self, code=None, reason=None): + """Close the WebSocket connection.""" + self.stop_retry() + self._close(code, reason) + + def stop(self): + """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.""" + #print("stop") + + reactor.stop() + + def stop_retry(self): + """Stop auto retry when it is in progress.""" + if self.factory: + self.factory.stopTrying() + + def _on_reconnect(self, attempts_count): + if self.on_reconnect: + return self.on_reconnect(self, attempts_count) + + def _on_noreconnect(self): + if self.on_noreconnect: + return self.on_noreconnect(self) + + def websocket_connection(self): + # if self.client_code == None or self.feed_token == None: + # return "client_code or feed_token or task is missing" + + # request={"task":"cn","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + # self.ws.sendMessage( + # six.b(json.dumps(request)) + # ) + # print(request) + + threading.Thread(target=self.heartBeat,daemon=True).start() + + def send_request(self,actiontype,feedtype): + if actiontype in ("subscribe","unsubscribe "): + try: + request={"actiontype":actiontype,"feedtype":feedtype,"jwttoken":self.jwt_token,"clientcode":self.client_code,"apikey":self.api_key} + + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + return True + # threading.Thread(target=self.heartBeat,daemon=True).start() + except Exception as e: + self._close(reason="Error while request sending: {}".format(str(e))) + raise + else: + print("The actiontype entered is invalid, Please enter correct task(subscribe,unsubscribe) ") + + def _on_connect(self, ws, response): + #print("-----_on_connect-------") + self.ws = ws + if self.on_connect: + + print(self.on_connect) + self.on_connect(self, response) + #self.websocket_connection + + def _on_close(self, ws, code, reason): + """Call `on_close` callback when connection is closed.""" + log.debug("Connection closed: {} - {}".format(code, str(reason))) + + if self.on_close: + self.on_close(self, code, reason) + + def _on_error(self, ws, code, reason): + """Call `on_error` callback when connection throws an error.""" + log.debug("Connection error: {} - {}".format(code, str(reason))) + + if self.on_error: + self.on_error(self, code, reason) + + + + def _on_message(self, ws, payload, is_binary): + """Call `on_message` callback when text message is received.""" + if self.on_message: + self.on_message(self, payload, is_binary) + + # If the message is binary, parse it and send it to the callback. + if self.on_ticks and is_binary and len(payload) > 4: + self.on_ticks(self, self._parse_binary(payload)) + + # Parse text messages + if not is_binary: + self._parse_text_message(payload) + + def _on_open(self, ws): + if not self._is_first_connect: + self.connect() + + self._is_first_connect = False + + if self.on_open: + return self.on_open(self) + + + def heartBeat(self): + while True: + try: + request={ + "actiontype":"heartbeat", + "feedtype":feedtype, + "jwttoken":self.jwt_token, + "clientcode":self.client_code, + "apikey":self.api_key + } + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + + except: + print("HeartBeats Failed") + time.sleep(60) + + + def _parse_text_message(self, payload): + """Parse text message.""" + # Decode unicode data + if not six.PY2 and type(payload) == bytes: + payload = payload.decode("utf-8") + + data =base64.b64decode(payload) + + try: + data = bytes((zlib.decompress(data)).decode("utf-8"), 'utf-8') + data = json.loads(data.decode('utf8').replace("'", '"')) + data = json.loads(json.dumps(data, indent=4, sort_keys=True)) + except ValueError: + return + + self.on_ticks(self, data) + + def _parse_binary(self, bin): + """Parse binary data to a (list of) ticks structure.""" + packets = self._split_packets(bin) # split data to individual ticks packet + data = [] + + for packet in packets: + instrument_token = self._unpack_int(packet, 0, 4) + segment = instrument_token & 0xff # Retrive segment constant from instrument_token + + divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0 + + # All indices are not tradable + tradable = False if segment == self.EXCHANGE_MAP["indices"] else True + try: + last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48)) + except Exception: + last_trade_time = None + + try: + timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64)) + except Exception: + timestamp = None + + d["last_trade_time"] = last_trade_time + d["oi"] = self._unpack_int(packet, 48, 52) + d["oi_day_high"] = self._unpack_int(packet, 52, 56) + d["oi_day_low"] = self._unpack_int(packet, 56, 60) + d["timestamp"] = timestamp + + # Market depth entries. + depth = { + "buy": [], + "sell": [] + } + + # Compile the market depth lists. + for i, p in enumerate(range(64, len(packet), 12)): + depth["sell" if i >= 5 else "buy"].append({ + "quantity": self._unpack_int(packet, p, p + 4), + "price": self._unpack_int(packet, p + 4, p + 8) / divisor, + "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H") + }) + + d["depth"] = depth + + data.append(d) + + return data + + def _unpack_int(self, bin, start, end, byte_format="I"): + """Unpack binary data as unsgined interger.""" + return struct.unpack(">" + byte_format, bin[start:end])[0] + + def _split_packets(self, bin): + """Split the data to individual packets of ticks.""" + # Ignore heartbeat data. + if len(bin) < 2: + return [] + + number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H") + packets = [] + + j = 2 + for i in range(number_of_packets): + packet_length = self._unpack_int(bin, j, j + 2, byte_format="H") + packets.append(bin[j + 2: j + 2 + packet_length]) + j = j + 2 + packet_length + + return packets + \ No newline at end of file diff --git a/SmartApi/socket1.py b/SmartApi/socket1.py deleted file mode 100644 index e540fdf..0000000 --- a/SmartApi/socket1.py +++ /dev/null @@ -1,32 +0,0 @@ -# import websocket -# import ssl -# import base64 -# ws = websocket.WebSocket() -# from websocket import create_connection -# FEED_TOKEN='125645827' -# CLIENT_CODE='S212741' -# ws= create_connection("wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream",sslopt={"cert_reqs": ssl.CERT_NONE}) -# _req='{"task":"cn","channel":"","token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}'; -# ws.send(_req) -# strwatchlistscrips = "nse_cm|2885&nse_cm|1594&nse_cm|11536"; -# _req = '{"task":"cn","channel":"'+strwatchlistscrips+'","token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}'; -# ws.send(_req) -# print("Sent") -# print("Receiving...") -# result = ws.recv() -# print(result) -# #str = unicode(str, errors='ignore') -# print("Received '%s'" % result.decode(encoding="utf-8")) -# ws.close() - -import asyncio -import websockets - -async def message(): - async with websockets.connect("wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream") as socket: - await socket.send({"task":"cn","channel":"","token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}) - strwatchlistscrips = "nse_cm|2885&nse_cm|1594&nse_cm|11536" - await socket.send({"task":"cn","channel":strwatchlistscrips,"token":"' + FEED_TOKEN + '","user": "' + CLIENT_CODE + '","acctid":"' + CLIENT_CODE + '"}) - print(await socket.recv()) - -asyncio.get_event_loop().run_until_complete(message()) diff --git a/SmartApi/socketTP.py b/SmartApi/socketTP.py deleted file mode 100644 index 8149ab3..0000000 --- a/SmartApi/socketTP.py +++ /dev/null @@ -1,301 +0,0 @@ -import six -import sys -import time -import json -import struct -import logging -import threading -import base64 -from datetime import datetime -from twisted.internet import reactor, ssl -from twisted.python import log as twisted_log -from autobahn.twisted.websocket import WebSocketClientProtocol, \ - WebSocketClientFactory, connectWS - - -class SmartSocketClientProtocol(WebSocketClientProtocol): - def __init__(self, *args, **kwargs): - super(SmartSocketClientProtocol,self).__init__(*args,**kwargs) - - def onConnect(self, response): # noqa - """Called when WebSocket server connection was established""" - self.factory.ws = self - - if self.factory.on_connect: - self.factory.on_connect(self, response) - - def onOpen(self): - if self.factory.on_open: - self.factory.on_open(self) - - def onMessage(self, payload, is_binary): # noqa - print("""Called when text or binary message is received.""",payload,is_binary) - if self.factory.on_message: - self.factory.on_message(self, payload, is_binary) - - def onClose(self, was_clean, code, reason): # noqa - """Called when connection is closed.""" - if not was_clean: - if self.factory.on_error: - self.factory.on_error(self, code, reason) - - if self.factory.on_close: - self.factory.on_close(self, code, reason) - -class SmartSocketClientFactory(WebSocketClientFactory): - protocol = SmartSocketClientProtocol - - def __init__(self, *args, **kwargs): - """Initialize with default callback method values.""" - self.debug = False - self.ws = None - self.on_open = None - self.on_error = None - self.on_close = None - self.on_message = None - self.on_connect = None - - - super(SmartSocketClientFactory, self).__init__(*args, **kwargs) - - -class SmartSocket(object): - EXCHANGE_MAP = { - "nse": 1, - "nfo": 2, - "cds": 3, - "bse": 4, - "bfo": 5, - "bsecds": 6, - "mcx": 7, - "mcxsx": 8, - "indices": 9 - } - ROOT_URI='wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream' - feed_token=None - client_code=None - def __init__(self, FEED_TOKEN, CLIENT_CODE, debug=False, root=None): - self.root = root or self.ROOT_URI - self.feed_token= FEED_TOKEN - self.client_code= CLIENT_CODE - - - # Debug enables logs - self.debug = debug - - # Placeholders for callbacks. - self.on_ticks = None - self.on_open = None - self.on_close = None - self.on_error = None - self.on_connect = None - self.on_message = None - - def _create_connection(self, url, **kwargs): - """Create a WebSocket client connection.""" - self.factory = SmartSocketClientFactory(url, **kwargs) - - # Alias for current websocket connection - self.ws = self.factory.ws - - self.factory.debug = self.debug - - # Register private callbacks - self.factory.on_open = self._on_open - self.factory.on_error = self._on_error - self.factory.on_close = self._on_close - self.factory.on_message = self._on_message - self.factory.on_connect = self._on_connect - - - def connect(self, threaded=False, disable_ssl_verification=False, proxy=None): - print("Connect") - self._create_connection(self.ROOT_URI) - - context_factory = None - print(self.factory.isSecure,disable_ssl_verification) - if self.factory.isSecure and not disable_ssl_verification: - context_factory = ssl.ClientContextFactory() - print("context_factory",context_factory) - connectWS(self.factory, contextFactory=context_factory, timeout=30) - - # Run in seperate thread of blocking - opts = {} - - # Run when reactor is not running - if not reactor.running: - if threaded: - print("inside threaded") - # Signals are not allowed in non main thread by twisted so suppress it. - opts["installSignalHandlers"] = False - self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts) - self.websocket_thread.daemon = True - self.websocket_thread.start() - else: - reactor.run(**opts) - - def is_connected(self): - print("Check if WebSocket connection is established.") - if self.ws and self.ws.state == self.ws.STATE_OPEN: - return True - else: - return False - def _close(self, code=None, reason=None): - print("Close the WebSocket connection.") - if self.ws: - self.ws.sendClose(code, reason) - - def close(self, code=None, reason=None): - """Close the WebSocket connection.""" - self._close(code, reason) - - def stop(self): - """Stop the event loop. Should be used if main thread has to be closed in `on_close` method. - Reconnection mechanism cannot happen past this method - """ - print("stop") - reactor.stop() - - def send_request(self,token): - print('Request Send') - strwatchlistscrips = "nse_cm|2885&nse_cm|1594&nse_cm|11536" #or token - #token_scripts=token //dynamic call - try: - print("Inside") - request={"task":"cn","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} - self.ws.sendMessage( - six.b(json.dumps(request)) - ) - - request={"task":"cn","channel":strwatchlistscrips,"token":self.feed_token,"user":self.client_code,"acctid":self.client_code} - #request={"task":"cn","channel":token_scripts,"token":self.feed_token,"user":self.client_code,"acctid":self.client_code} //dynamic call - self.ws.sendMessage( - six.b(json.dumps(request)) - ) - - return True - except Exception as e: - self._close(reason="Error while request sending: {}".format(str(e))) - raise - - def _on_connect(self, ws, response): - self.ws = ws - if self.on_connect: - self.on_connect(self, response) - - def _on_close(self, ws, code, reason): - """Call `on_close` callback when connection is closed.""" - print("Connection closed: {} - {}".format(code, str(reason))) - - if self.on_close: - self.on_close(self, code, reason) - - def _on_error(self, ws, code, reason): - """Call `on_error` callback when connection throws an error.""" - print("Connection error: {} - {}".format(code, str(reason))) - - if self.on_error: - self.on_error(self, code, reason) - - def _on_message(self, ws, payload, is_binary): - """Call `on_message` callback when text message is received.""" - if self.on_message: - self.on_message(self, payload, is_binary) - - # If the message is binary, parse it and send it to the callback. - if self.on_ticks and is_binary and len(payload) > 4: - self.on_ticks(self, self._parse_binary(payload)) - - # Parse text messages - if not is_binary: - self._parse_text_message(payload) - - def _on_open(self, ws): - if self.on_open: - return self.on_open(self) - - def _parse_text_message(self, payload): - """Parse text message.""" - # Decode unicode data - if not six.PY2 and type(payload) == bytes: - payload = payload.decode("utf-8") - print("PAYLOAD",payload) - data =base64.b64decode(payload) - print("DATA",data.encode().decode()) - try: - data = json.loads(payload) - print("DATA",data) - except ValueError: - return - - def _parse_binary(self, bin): - print("""Parse binary data to a (list of) ticks structure.""") - packets = self._split_packets(bin) # split data to individual ticks packet - data = [] - - for packet in packets: - instrument_token = self._unpack_int(packet, 0, 4) - segment = instrument_token & 0xff # Retrive segment constant from instrument_token - - divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0 - - # All indices are not tradable - tradable = False if segment == self.EXCHANGE_MAP["indices"] else True - try: - last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48)) - except Exception: - last_trade_time = None - - try: - timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64)) - except Exception: - timestamp = None - - d["last_trade_time"] = last_trade_time - d["oi"] = self._unpack_int(packet, 48, 52) - d["oi_day_high"] = self._unpack_int(packet, 52, 56) - d["oi_day_low"] = self._unpack_int(packet, 56, 60) - d["timestamp"] = timestamp - - # Market depth entries. - depth = { - "buy": [], - "sell": [] - } - - # Compile the market depth lists. - for i, p in enumerate(range(64, len(packet), 12)): - depth["sell" if i >= 5 else "buy"].append({ - "quantity": self._unpack_int(packet, p, p + 4), - "price": self._unpack_int(packet, p + 4, p + 8) / divisor, - "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H") - }) - - d["depth"] = depth - - data.append(d) - - return data - - def _unpack_int(self, bin, start, end, byte_format="I"): - """Unpack binary data as unsgined interger.""" - return struct.unpack(">" + byte_format, bin[start:end])[0] - - def _split_packets(self, bin): - """Split the data to individual packets of ticks.""" - # Ignore heartbeat data. - if len(bin) < 2: - return [] - - number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H") - packets = [] - - j = 2 - for i in range(number_of_packets): - packet_length = self._unpack_int(bin, j, j + 2, byte_format="H") - packets.append(bin[j + 2: j + 2 + packet_length]) - j = j + 2 + packet_length - - return packets - - diff --git a/SmartApi/version.py b/SmartApi/version.py index 6549f56..40c5528 100644 --- a/SmartApi/version.py +++ b/SmartApi/version.py @@ -2,11 +2,11 @@ __description__ = "Angel Broking openApi integration" __url__ = "https://www.angelbroking.com/" __download_url__ = "https://github.com/angelbroking-github/smartapi-python" -__version__ = "1.0.1" +__version__ = "1.2.3" __author__ = "ab-smartapi" __token__ = "ab-smartapi" __author_email__ = "smartapi.sdk@gmail.com" -__license__ = "MIT" + # [pypi] # username = __token__ diff --git a/SmartApi/webSocket.py b/SmartApi/webSocket.py new file mode 100644 index 0000000..b6077f7 --- /dev/null +++ b/SmartApi/webSocket.py @@ -0,0 +1,466 @@ + +import six +import sys +import time +import json +import struct +import logging +import threading +import base64 +import zlib +from datetime import datetime +from twisted.internet import reactor, ssl +from twisted.python import log as twisted_log +from twisted.internet.protocol import ReconnectingClientFactory +from autobahn.twisted.websocket import WebSocketClientProtocol, \ + WebSocketClientFactory, connectWS + +log = logging.getLogger(__name__) + +class SmartSocketClientProtocol(WebSocketClientProtocol): + PING_INTERVAL = 2.5 + KEEPALIVE_INTERVAL = 5 + + _ping_message = "" + _next_ping = None + _next_pong_check = None + _last_pong_time = None + _last_ping_time = None + + def __init__(self, *args, **kwargs): + super(SmartSocketClientProtocol,self).__init__(*args,**kwargs) + + def onConnect(self, response): # noqa + """Called when WebSocket server connection was established""" + self.factory.ws = self + + if self.factory.on_connect: + self.factory.on_connect(self, response) + + def onOpen(self): + # send ping + self._loop_ping() + # init last pong check after X seconds + self._loop_pong_check() + + if self.factory.on_open: + self.factory.on_open(self) + + + + def onMessage(self, payload, is_binary): # noqa + """Called when text or binary message is received.""" + if self.factory.on_message: + self.factory.on_message(self, payload, is_binary) + + + def onClose(self, was_clean, code, reason): # noqa + """Called when connection is closed.""" + if not was_clean: + if self.factory.on_error: + self.factory.on_error(self, code, reason) + + if self.factory.on_close: + self.factory.on_close(self, code, reason) + + +class SmartSocketClientFactory(WebSocketClientFactory,ReconnectingClientFactory): + protocol = SmartSocketClientProtocol + + maxDelay = 5 + maxRetries = 10 + + _last_connection_time = None + + maxDelay = 5 + maxRetries = 10 + + _last_connection_time = None + + def __init__(self, *args, **kwargs): + """Initialize with default callback method values.""" + self.debug = False + self.ws = None + self.on_open = None + self.on_error = None + self.on_close = None + self.on_message = None + self.on_connect = None + self.on_reconnect = None + self.on_noreconnect = None + + + super(SmartSocketClientFactory, self).__init__(*args, **kwargs) + + def startedConnecting(self, connector): # noqa + """On connecting start or reconnection.""" + if not self._last_connection_time and self.debug: + log.debug("Start WebSocket connection.") + + self._last_connection_time = time.time() + + def clientConnectionFailed(self, connector, reason): # noqa + """On connection failure (When connect request fails)""" + if self.retries > 0: + log.error("Retrying connection. Retry attempt count: {}. Next retry in around: {} seconds".format(self.retries, int(round(self.delay)))) + + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def clientConnectionLost(self, connector, reason): # noqa + """On connection lost (When ongoing connection got disconnected).""" + if self.retries > 0: + # on reconnect callback + if self.on_reconnect: + self.on_reconnect(self.retries) + + # Retry the connection + self.retry(connector) + self.send_noreconnect() + + def send_noreconnect(self): + """Callback `no_reconnect` if max retries are exhausted.""" + if self.maxRetries is not None and (self.retries > self.maxRetries): + if self.debug: + log.debug("Maximum retries ({}) exhausted.".format(self.maxRetries)) + + if self.on_noreconnect: + self.on_noreconnect() + +class WebSocket(object): + EXCHANGE_MAP = { + "nse": 1, + "nfo": 2, + "cds": 3, + "bse": 4, + "bfo": 5, + "bsecds": 6, + "mcx": 7, + "mcxsx": 8, + "indices": 9 + } + # Default connection timeout + CONNECT_TIMEOUT = 30 + # Default Reconnect max delay. + RECONNECT_MAX_DELAY = 60 + # Default reconnect attempts + RECONNECT_MAX_TRIES = 50 + + ROOT_URI='wss://omnefeeds.angelbroking.com/NestHtml5Mobile/socket/stream' + + # Flag to set if its first connect + _is_first_connect = True + + # Minimum delay which should be set between retries. User can't set less than this + _minimum_reconnect_max_delay = 5 + # Maximum number or retries user can set + _maximum_reconnect_max_tries = 300 + + feed_token=None + client_code=None + def __init__(self, FEED_TOKEN, CLIENT_CODE,debug=False, root=None,reconnect=True,reconnect_max_tries=RECONNECT_MAX_TRIES, reconnect_max_delay=RECONNECT_MAX_DELAY,connect_timeout=CONNECT_TIMEOUT): + + + self.root = root or self.ROOT_URI + self.feed_token= FEED_TOKEN + self.client_code= CLIENT_CODE + self.task=task + + # Set max reconnect tries + if reconnect_max_tries > self._maximum_reconnect_max_tries: + log.warning("`reconnect_max_tries` can not be more than {val}. Setting to highest possible value - {val}.".format( + val=self._maximum_reconnect_max_tries)) + self.reconnect_max_tries = self._maximum_reconnect_max_tries + else: + self.reconnect_max_tries = reconnect_max_tries + + # Set max reconnect delay + if reconnect_max_delay < self._minimum_reconnect_max_delay: + log.warning("`reconnect_max_delay` can not be less than {val}. Setting to lowest possible value - {val}.".format( + val=self._minimum_reconnect_max_delay)) + self.reconnect_max_delay = self._minimum_reconnect_max_delay + else: + self.reconnect_max_delay = reconnect_max_delay + + self.connect_timeout = connect_timeout + + # Debug enables logs + self.debug = debug + + # Placeholders for callbacks. + self.on_ticks = None + self.on_open = None + self.on_close = None + self.on_error = None + self.on_connect = None + self.on_message = None + self.on_reconnect = None + self.on_noreconnect = None + + def _create_connection(self, url, **kwargs): + """Create a WebSocket client connection.""" + self.factory = SmartSocketClientFactory(url, **kwargs) + + # Alias for current websocket connection + self.ws = self.factory.ws + + self.factory.debug = self.debug + + # Register private callbacks + self.factory.on_open = self._on_open + self.factory.on_error = self._on_error + self.factory.on_close = self._on_close + self.factory.on_message = self._on_message + self.factory.on_connect = self._on_connect + self.factory.on_reconnect = self._on_reconnect + self.factory.on_noreconnect = self._on_noreconnect + + + self.factory.maxDelay = self.reconnect_max_delay + self.factory.maxRetries = self.reconnect_max_tries + + def connect(self, threaded=False, disable_ssl_verification=False, proxy=None): + #print("Connect") + self._create_connection(self.ROOT_URI) + + context_factory = None + #print(self.factory.isSecure,disable_ssl_verification) + if self.factory.isSecure and not disable_ssl_verification: + context_factory = ssl.ClientContextFactory() + #print("context_factory",context_factory) + connectWS(self.factory, contextFactory=context_factory, timeout=30) + + # Run in seperate thread of blocking + opts = {} + + # Run when reactor is not running + if not reactor.running: + if threaded: + #print("inside threaded") + # Signals are not allowed in non main thread by twisted so suppress it. + opts["installSignalHandlers"] = False + self.websocket_thread = threading.Thread(target=reactor.run, kwargs=opts) + self.websocket_thread.daemon = True + self.websocket_thread.start() + else: + reactor.run(**opts) + + + def is_connected(self): + #print("Check if WebSocket connection is established.") + if self.ws and self.ws.state == self.ws.STATE_OPEN: + return True + else: + return False + + def _close(self, code=None, reason=None): + #print("Close the WebSocket connection.") + if self.ws: + self.ws.sendClose(code, reason) + + def close(self, code=None, reason=None): + """Close the WebSocket connection.""" + self.stop_retry() + self._close(code, reason) + + def stop(self): + """Stop the event loop. Should be used if main thread has to be closed in `on_close` method.""" + #print("stop") + + reactor.stop() + + def stop_retry(self): + """Stop auto retry when it is in progress.""" + if self.factory: + self.factory.stopTrying() + + def _on_reconnect(self, attempts_count): + if self.on_reconnect: + return self.on_reconnect(self, attempts_count) + + def _on_noreconnect(self): + if self.on_noreconnect: + return self.on_noreconnect(self) + + def websocket_connection(self): + if self.client_code == None or self.feed_token == None: + return "client_code or feed_token or task is missing" + + request={"task":"cn","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + print(request) + + threading.Thread(target=self.heartBeat,daemon=True).start() + + def send_request(self,token,task): + if task in ("mw","sfi","dp"): + strwatchlistscrips = token #dynamic call + + try: + request={"task":task,"channel":strwatchlistscrips,"token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + return True + except Exception as e: + self._close(reason="Error while request sending: {}".format(str(e))) + raise + else: + print("The task entered is invalid, Please enter correct task(mw,sfi,dp) ") + + def _on_connect(self, ws, response): + #print("-----_on_connect-------") + self.ws = ws + if self.on_connect: + + print(self.on_connect) + self.on_connect(self, response) + #self.websocket_connection + + def _on_close(self, ws, code, reason): + """Call `on_close` callback when connection is closed.""" + log.debug("Connection closed: {} - {}".format(code, str(reason))) + + if self.on_close: + self.on_close(self, code, reason) + + def _on_error(self, ws, code, reason): + """Call `on_error` callback when connection throws an error.""" + log.debug("Connection error: {} - {}".format(code, str(reason))) + + if self.on_error: + self.on_error(self, code, reason) + + + + def _on_message(self, ws, payload, is_binary): + """Call `on_message` callback when text message is received.""" + if self.on_message: + self.on_message(self, payload, is_binary) + + # If the message is binary, parse it and send it to the callback. + if self.on_ticks and is_binary and len(payload) > 4: + self.on_ticks(self, self._parse_binary(payload)) + + # Parse text messages + if not is_binary: + self._parse_text_message(payload) + + def _on_open(self, ws): + if not self._is_first_connect: + self.connect() + + self._is_first_connect = False + + if self.on_open: + return self.on_open(self) + + + def heartBeat(self): + while True: + try: + request={"task":"hb","channel":"","token":self.feed_token,"user":self.client_code,"acctid":self.client_code} + self.ws.sendMessage( + six.b(json.dumps(request)) + ) + + except: + print("HeartBeats Failed") + time.sleep(60) + + + def _parse_text_message(self, payload): + """Parse text message.""" + # Decode unicode data + if not six.PY2 and type(payload) == bytes: + payload = payload.decode("utf-8") + + data =base64.b64decode(payload) + + try: + data = bytes((zlib.decompress(data)).decode("utf-8"), 'utf-8') + data = json.loads(data.decode('utf8').replace("'", '"')) + data = json.loads(json.dumps(data, indent=4, sort_keys=True)) + except ValueError: + return + + self.on_ticks(self, data) + + def _parse_binary(self, bin): + """Parse binary data to a (list of) ticks structure.""" + packets = self._split_packets(bin) # split data to individual ticks packet + data = [] + + for packet in packets: + instrument_token = self._unpack_int(packet, 0, 4) + segment = instrument_token & 0xff # Retrive segment constant from instrument_token + + divisor = 10000000.0 if segment == self.EXCHANGE_MAP["cds"] else 100.0 + + # All indices are not tradable + tradable = False if segment == self.EXCHANGE_MAP["indices"] else True + try: + last_trade_time = datetime.fromtimestamp(self._unpack_int(packet, 44, 48)) + except Exception: + last_trade_time = None + + try: + timestamp = datetime.fromtimestamp(self._unpack_int(packet, 60, 64)) + except Exception: + timestamp = None + + d["last_trade_time"] = last_trade_time + d["oi"] = self._unpack_int(packet, 48, 52) + d["oi_day_high"] = self._unpack_int(packet, 52, 56) + d["oi_day_low"] = self._unpack_int(packet, 56, 60) + d["timestamp"] = timestamp + + # Market depth entries. + depth = { + "buy": [], + "sell": [] + } + + # Compile the market depth lists. + for i, p in enumerate(range(64, len(packet), 12)): + depth["sell" if i >= 5 else "buy"].append({ + "quantity": self._unpack_int(packet, p, p + 4), + "price": self._unpack_int(packet, p + 4, p + 8) / divisor, + "orders": self._unpack_int(packet, p + 8, p + 10, byte_format="H") + }) + + d["depth"] = depth + + data.append(d) + + return data + + def _unpack_int(self, bin, start, end, byte_format="I"): + """Unpack binary data as unsgined interger.""" + return struct.unpack(">" + byte_format, bin[start:end])[0] + + def _split_packets(self, bin): + """Split the data to individual packets of ticks.""" + # Ignore heartbeat data. + if len(bin) < 2: + return [] + + number_of_packets = self._unpack_int(bin, 0, 2, byte_format="H") + packets = [] + + j = 2 + for i in range(number_of_packets): + packet_length = self._unpack_int(bin, j, j + 2, byte_format="H") + packets.append(bin[j + 2: j + 2 + packet_length]) + j = j + 2 + packet_length + + return packets + \ No newline at end of file diff --git a/example/sample.py b/example/sample.py index eb9f1fb..7764fd6 100644 --- a/example/sample.py +++ b/example/sample.py @@ -1,10 +1,10 @@ # package import statement -from smartapi.smartConnect import SmartConnect +from smartapi.smartConnect import SmartConnect #from smartapi import SmartConnect obj=SmartConnect() #login api call -data = obj.generateSession('D88311','Angel@444') +data = obj.generateSession('Your Client Id','Password') refreshToken= data['data']['refreshToken'] #fetch User Profile @@ -33,7 +33,7 @@ #logout try: - logout=obj.terminateSession('D88311') + logout=obj.terminateSession('Your Client Id') print("Logout Successfull") except Exception as e: print("Logout failed: {}".format(e.message)) \ No newline at end of file diff --git a/requirements_dev.txt b/requirements_dev.txt index c1693f2..84c6643 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -1,2 +1,2 @@ requests>=2.24.0 -twine>=1.13.0 \ No newline at end of file +twine>=1.13.0 diff --git a/setup.py b/setup.py index 75ec875..c737358 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ setup( name="smartapi-python", - version="1.0.1", + version="1.2.3", author="ab-smartapi", author_email="smartapi.sdk@gmail.com", description="Angel Broking openApi integration", @@ -28,7 +28,6 @@ "Intended Audience :: Financial and Insurance Industry", "Programming Language :: Python", "Natural Language :: English", - "License :: OSI Approved :: MIT License", "Programming Language :: Python :: 2.7", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6", diff --git a/test/test.py b/test/test.py index 1ce0ae0..2f6f8c1 100644 --- a/test/test.py +++ b/test/test.py @@ -1,86 +1,134 @@ -import sys -import os - -dir = os.getcwd() -print(dir) -#paths=dir.split("\") - -sys.path.append(dir + "\SmartApi") -print(sys.path) - -# import smartConnect - -# smartApi = smartConnect.SmartConnect() - -# login = smartApi.generateSession('D88311', 'Angel@444') -# print(login) -# refreshToken = login['data']['refreshToken'] -# smartApi.getProfile(refreshToken) -# smartApi.generateToken(refreshToken) -# orderparams = { -# "variety": "NORMAL", -# "tradingsymbol": "SBIN-EQ", -# "symboltoken": "3045", -# "transactiontype": "BUY", -# "exchange": "NSE", -# "ordertype": "LIMIT", -# "producttype": "INTRADAY", -# "duration": "DAY", -# "price": "19500", -# "squareoff": "0", -# "stoploss": "0", -# "quantity": "1" -# } -# orderid = smartApi.placeOrder(orderparams) - -# modifyparams = { -# "variety": "NORMAL", -# "orderid": orderid, -# "ordertype": "LIMIT", -# "producttype": "INTRADAY", -# "duration": "DAY", -# "price": "19500", -# "quantity": "1", -# "tradingsymbol":"SBIN-EQ", -# "symboltoken":"3045", -# "exchange":"NSE" -# } -# smartApi.modifyOrder(modifyparams) - -# smartApi.cancelOrder(orderid, "NORMAL") - -# smartApi.orderBook() -# smartApi.tradeBook() -# smartApi.rmsLimit() -# smartApi.position() -# smartApi.holding() -# exchange = "NSE" -# tradingsymbol = "SBIN-EQ" -# symboltoken = 3045 -# smartApi.ltpData("NSE", "SBIN-EQ", "3045") -# params={ -# "exchange": "NSE", -# "oldproducttype":"DELIVERY", -# "newproducttype": "MARGIN", -# "tradingsymbol": "SBIN-EQ", -# "transactiontype":"BUY", -# "quantity":1, -# "type":"DAY" - -# } - -# smartApi.convertPosition(params) -# smartApi.terminateSession('D88311') - -from socketTP import SmartSocket -FEED_TOKEN='2017967114' -CLIENT_CODE='S212741' +from smartapi import SmartConnect + +#---------for smartExceptions--------- +#import smartapi.smartExceptions +#or +#from smartapi import smartExceptions + +smartApi =SmartConnect(api_key="Your Api Key") + +login = smartApi.generateSession('Your Client Id', 'Your Password') + +refreshToken = login['data']['refreshToken'] + +feedToken = smartApi.getfeedToken() + +smartApi.getProfile(refreshToken) + +smartApi.generateToken(refreshToken) + +orderparams = { + "variety": "NORMAL", + "tradingsymbol": "SBIN-EQ", + "symboltoken": "3045", + "transactiontype": "BUY", + "exchange": "NSE", + "ordertype": "LIMIT", + "producttype": "INTRADAY", + "duration": "DAY", + "price": "19500", + "squareoff": "0", + "stoploss": "0", + "quantity": "1" +} +orderid = smartApi.placeOrder(orderparams) + +modifyparams = { + "variety": "NORMAL", + "orderid": orderid, + "ordertype": "LIMIT", + "producttype": "INTRADAY", + "duration": "DAY", + "price": "19500", + "quantity": "1", + "tradingsymbol":"SBIN-EQ", + "symboltoken":"3045", + "exchange":"NSE" +} +smartApi.modifyOrder(modifyparams) + +smartApi.cancelOrder(orderid, "NORMAL") + +smartApi.orderBook() + +smartApi.tradeBook() + +smartApi.rmsLimit() + +smartApi.position() + +smartApi.holding() + +exchange = "NSE" +tradingsymbol = "SBIN-EQ" +symboltoken = 3045 +smartApi.ltpData("NSE", "SBIN-EQ", "3045") + +params={ + "exchange": "NSE", + "oldproducttype":"DELIVERY", + "newproducttype": "MARGIN", + "tradingsymbol": "SBIN-EQ", + "transactiontype":"BUY", + "quantity":1, + "type":"DAY" + +} + +smartApi.convertPosition(params) +gttCreateParams={ + "tradingsymbol" : "SBIN-EQ", + "symboltoken" : "3045", + "exchange" : "NSE", + "producttype" : "MARGIN", + "transactiontype" : "BUY", + "price" : 100000, + "qty" : 10, + "disclosedqty": 10, + "triggerprice" : 200000, + "timeperiod" : 365 + } +rule_id=smartApi.gttCreateRule(gttCreateParams) + +gttModifyParams={ + "id": rule_id, + "symboltoken":"3045", + "exchange":"NSE", + "price":19500, + "quantity":10, + "triggerprice":200000, + "disclosedqty":10, + "timeperiod":365 + } +modified_id=smartApi.gttModifyRule(gttModifyParams) + +cancelParams={ + "id": rule_id, + "symboltoken":"3045", + "exchange":"NSE" + } + +cancelled_id=smartApi.gttCancelRule(cancelParams) + +smartApi.gttDetails(rule_id) + +smartApi.gttLists('List of status',,) + +smartApi.terminateSession('Your Client Id') + +## Websocket Programming + +from smartapi import WebSocket +FEED_TOKEN=feedToken +CLIENT_CODE="Your Client Id" token=None -ss = SmartSocket(FEED_TOKEN, CLIENT_CODE) +task=None +ss = WebSocket(FEED_TOKEN, CLIENT_CODE) def on_tick(ws, tick): print("Ticks: {}".format(tick)) + def on_connect(ws, response): - ws.send_request(token) + ws.send_request(token,task) def on_close(ws, code, reason): ws.stop() @@ -90,5 +138,4 @@ def on_close(ws, code, reason): ss.on_connect = on_connect ss.on_close = on_close -ss.connect() - +ss.connect( ) \ No newline at end of file