"""
AMS module to form an ADS packet for a Beckhoff PLC.
"""
import struct
import array
import re
# ADS command name -> numeric command id. The command handler classes store
# one of these ids in their ``cmd`` attribute, which amsPacket writes into the
# command field of the header.
cmd_list = {
    'Read Device Info': 1,
    'Read': 2,
    'Write': 3,
    'Read State': 4,
    'Write Control': 5,
    'Add Device Notification': 6,  # key used to contain a stray space
    'Delete Device Notification': 7,
    'Device Notification': 8,
    'Read Write': 9,
}
# Names of the ADS states.
# NOTE(review): not referenced elsewhere in the visible code; presumably the
# list index is the numeric ADS state reported by the device -- confirm.
ads_states = [
    'Idle',
    'Reset',
    'Init',
    'Start',
    'Run',
    'Stop',
    'Save CFG',
    'Load CFG',
    'Power failure',
    'Power good',
    'Error',
    'Shutdown',
    'Suspend',
    'Resume',
    'Config',
    'Reconfig',
]
class amsError(Exception):
    """
    Exception raised by this module.

    value : object describing the error (usually a message string); it is
            kept in the attribute ``value`` and str() of the exception
            gives the repr of it.
    """

    def __init__(self, value=''):
        self.value = value

    def __str__(self):
        return '{0!r}'.format(self.value)
class amsID(object):
    """
    AMS id given in dotted notation, e.g. '10.11.12.13.1.2'.

    Attributes set by the constructor:
    amsID     : the id string as passed in
    amsIDlist : the six fields of the id as a list of int
    amsIDbin  : the six fields as an array.array('c') with one byte per field
    """
    # Six decimal fields separated by dots. The trailing \Z makes match()
    # reject anything after the sixth field (e.g. a seventh field), which an
    # unanchored pattern would silently accept.
    reexpr = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)\.(\d+)\.(\d+)\Z')

    def __init__(self, id):
        """
        Construct a list of bytes values from a string id

        id : string with six decimal fields (each 0..255) separated by dots

        Raises amsError if the string does not have this form.
        """
        # Test format
        matches = amsID.reexpr.match(id)
        if not matches:
            raise amsError('Malformed amsID')
        fields = [int(x) for x in matches.groups()]
        # Every field has to fit into a single byte; report this as a
        # malformed id instead of failing later inside struct.
        if any(field > 255 for field in fields):
            raise amsError('Malformed amsID')
        self.amsID = id
        self.amsIDlist = fields
        self.amsIDbin = self._makeBinary()

    def _makeBinary(self):
        """
        Return the six fields of the id as a character array of 6 bytes.
        """
        return array.array('c', struct.pack('!6B', *self.amsIDlist))
class amsPacket:
    """
    Class to format a packet for the AMS protocol

    dID    : AMS addr for the dest (dotted string, parsed by amsID)
    dPort  : AMS port for the dest
    sID    : AMS addr for the source (dotted string, parsed by amsID)
    sPort  : AMS port for the source
    cmd    : command handler object (e.g. derived from ADSCmdHandler). It
             has to provide the attribute ``cmd`` (command id) and the
             method ``process``; the optional attribute ``cmddata`` holds
             the command specific request data.
    idcode : invoke id written into the header

    Raises amsError if one of the ids is malformed or a port is not
    greater than 0.

    Layout of the buffer (integers are little endian):
         0.. 1  zero
         2.. 5  length of AMS header + data (32 + length of cmddata)
         6..11  AMS id of the destination
        12..13  port of the destination
        14..19  AMS id of the source
        20..21  port of the source
        22..23  command id
        24..25  state flags (0x04)
        26..29  length of the command data
        30..33  error code (0)
        34..37  invoke id
        38..    command data

    The method getPacket is delivering the data for the socket.send function.
    The method processResponse hands the data received by socket.recv() to
    the command handler and returns its result.
    """
    amsHeaderSize = 6+32

    def __init__(self, dID, dPort, sID, sPort, cmd, idcode = 0):
        self.dID = amsID(dID)
        self.sID = amsID(sID)
        if not (dPort>0 and sPort>0):
            raise amsError('Portnumber must be integer and greater than 0')
        self.sPort = sPort
        self.dPort = dPort
        self.cmd = cmd
        self.idcode = idcode
        # Commands without request data have no (usable) cmddata attribute;
        # they are sent as a bare header. The length is determined exactly
        # once so that buffer size and both length fields always agree.
        try:
            datalen = len(cmd.cmddata)
        except (TypeError, AttributeError):
            datalen = 0
        # Reserve buffer
        self.buffer = array.array('c', (self.amsHeaderSize+datalen)*'\0')
        if datalen:
            self._insertBytes(cmd.cmddata, self.amsHeaderSize)
        # build packet
        struct.pack_into('<H', self.buffer, 0, 0) # 2 bytes with 0
        struct.pack_into('<L', self.buffer, 2, 32+datalen) # length of AMS header + data
        self._insertBytes(self.dID.amsIDbin, 6+0) # insert amsid for destination
        struct.pack_into('<H', self.buffer, 6+6, self.dPort) # insert port for destination
        self._insertBytes(self.sID.amsIDbin, 6+8) # insert amsid for source
        # The source port belongs here; this field used to be filled with
        # the destination port.
        struct.pack_into('<H', self.buffer, 6+14, self.sPort) # insert port for source
        struct.pack_into('<H', self.buffer, 6+16, self.cmd.cmd) # insert cmd
        struct.pack_into('<H', self.buffer, 6+18, 0x04) # state = send cmd and request
        struct.pack_into('<L', self.buffer, 6+20, datalen) # length of the data
        struct.pack_into('<L', self.buffer, 6+24, 0) # Error = 0
        struct.pack_into('<L', self.buffer, 6+28, self.idcode) # Invoke ID

    def _insertBytes(self, bytes, offset):
        """
        Insert bytes into the buffer by overwriting old values.

        bytes  : sequence of items to copy
        offset : index in the buffer of the first item written
        """
        for idx, item in enumerate(bytes):
            self.buffer[idx+offset] = item

    def _printPacket(self):
        """
        Print the buffer as hex dump with 8 bytes per line, each line
        prefixed with the decimal index of its first byte.
        """
        cells = []
        for i, b in enumerate(self.buffer):
            if not i%8:
                cells.append('\n{0:02d}:'.format(i))
            cells.append('{0:02X}'.format(ord(b)))
        print(' '.join(cells))

    def processResponse(self, data):
        """
        Process the response of the PLC

        data : the received packet; it is passed on to the process method
               of the command handler, whose result is returned.
        """
        return self.cmd.process(data)

    def changeCMDData(self, data):
        """
        For a write command, the data must be modified after initializing.

        data is passed to the changeData method of the command handler;
        afterwards the command data is copied into the buffer again. The
        buffer is not resized.
        """
        self.cmd.changeData(data)
        self._insertBytes(self.cmd.cmddata, self.amsHeaderSize)

    def getPacket(self):
        """
        Get packet data as string
        """
        return self.buffer.tostring()
class ADSCmdHandler(object):
    """
    Base class of the ADS command handlers.

    Attributes:
    cmd      : command id, 0 in the base class
    respdata : set by process(); the received data without the leading
               amsPacket.amsHeaderSize bytes, as array.array('c')
    """

    def __init__(self):
        self.cmd = 0

    def process(self, data):
        """
        Strip the header from the received data and keep the remaining
        command data in self.respdata. Nothing is returned.
        """
        payload = data[amsPacket.amsHeaderSize:]
        self.respdata = array.array('c', payload)

    def _printData(self, data):
        """
        Print data as hex dump with 8 bytes per line, each line prefixed
        with the decimal index of its first byte.
        """
        cells = []
        for pos, char in enumerate(data):
            if pos % 8 == 0:
                cells.append('\n{0:02d}:'.format(pos))
            cells.append('{0:02X}'.format(ord(char)))
        # A single print of the joined cells gives the same text as printing
        # the cells one by one with a trailing comma.
        print(' '.join(cells))
class ADSCmdReadDeviceInfo(ADSCmdHandler):
    """
    Get the response of Read Device Info and decompose the data
    """

    def __init__(self):
        ADSCmdHandler.__init__(self)
        self.cmd = cmd_list['Read Device Info']

    def process(self, data):
        """
        Decode the response of Read Device Info.

        data : the received packet including the header

        The decoded fields are stored as attributes (result, major_version,
        minor_version, version_build, name) and returned as dict with the
        keys 'ADSstatus', 'major_version', 'minor_version', 'version_build'
        and 'name'.
        """
        ADSCmdHandler.process(self, data)
        # 4 bytes status, 1 byte major, 1 byte minor, 2 bytes build
        fields = struct.unpack_from('<LBBH', self.respdata)
        self.result, self.major_version, self.minor_version, self.version_build = fields
        # The name is everything behind these 8 bytes with the zero bytes
        # dropped.
        self.name = ''.join([char for char in self.respdata[8:] if char != '\0'])
        return {
            'ADSstatus': self.result,
            'major_version': self.major_version,
            'minor_version': self.minor_version,
            'version_build': self.version_build,
            'name': self.name,
        }
class ADSCmdRead(ADSCmdHandler):
    """
    Read data from a PLC with group and offset
    """

    def __init__(self, group, offset, varlist):
        """
        group   : index group to read from
        offset  : index offset to read from
        varlist : list of tuples, each with 2 values, the name
                  of the variable and the format code from unpack.
        """
        ADSCmdHandler.__init__(self)
        self.cmd = cmd_list['Read']
        self.varlist = varlist
        # Little endian format string covering all variables in order
        self.types = '<' + ''.join([fmt for _name, fmt in self.varlist])
        # Number of bytes to read = packed size of the variables.
        self.length = struct.calcsize(self.types)
        # An ADS Read request consists only of index group, index offset and
        # the number of bytes to read (3 x 4 bytes). It carries no payload,
        # and the requested length must not include these 12 bytes.
        self.cmddata = array.array('c', 12*'\0')
        struct.pack_into('<L', self.cmddata, 0, group)
        struct.pack_into('<L', self.cmddata, 4, offset)
        struct.pack_into('<L', self.cmddata, 8, self.length)

    def process(self, data):
        """
        Decode the response of Read.

        data : the received packet including the header

        Returns a dict with the key 'ADSstatus' and, if the status is 0,
        one entry per variable of varlist (name -> value). The status and
        the length reported by the response are stored in self.ADSstatus
        and self.length.
        """
        ADSCmdHandler.process(self, data)
        self.ADSstatus, self.length = struct.unpack_from('<LL', self.respdata)
        ret = {'ADSstatus':self.ADSstatus}
        if self.ADSstatus == 0:
            # The values follow the 8 bytes of status and length.
            values = struct.unpack_from(self.types, self.respdata, 8)
            for (var, _fmt), val in zip(self.varlist, values):
                ret[var] = val
        return ret
class ADSCmdWrite(ADSCmdHandler):
    """
    Write data to a PLC with group and offset
    """

    def __init__(self, group, offset, varlist):
        """
        group   : index group to write to
        offset  : index offset to write to
        varlist : list of tuples, each with 2 values, the name of the
                  variable and its struct format code

        The command data consists of group, offset and data length (4 bytes
        each) followed by the zero initialised data area.
        """
        ADSCmdHandler.__init__(self)
        self.cmd = cmd_list['Write']
        self.types = '<'
        # name -> (format code, position inside the data area)
        self.variables = {}
        for name, fmt in varlist:
            # The size of the formats collected so far is the position of
            # this variable.
            position = struct.calcsize(self.types)
            self.variables[name] = (fmt, position)
            self.types += fmt
        self.length = struct.calcsize(self.types)
        self.cmddata = array.array('c', (self.length+12)*'\0')
        struct.pack_into('<LLL', self.cmddata, 0, group, offset, self.length)

    def changeData(self, data):
        """
        data is a list of tuples, each with 2 values, the name
        of the variable and the value
        """
        for name, value in data:
            fmt, position = self.variables[name]
            # The data area starts behind the 12 bytes of group/offset/length
            struct.pack_into('<'+fmt, self.cmddata, position+12, value)

    def process(self, data):
        """
        Decode the response of Write; returns a dict with the key
        'ADSstatus'.
        """
        ADSCmdHandler.process(self, data)
        self.ADSstatus = struct.unpack_from('<L', self.respdata)[0]
        return {'ADSstatus': self.ADSstatus}
class ADSCmdReadState(ADSCmdHandler):
    """
    Get the response of ReadState and decompose the data
    """

    def __init__(self):
        ADSCmdHandler.__init__(self)
        self.cmd = cmd_list['Read State']

    def process(self, data):
        """
        Decode the response of Read State.

        data : the received packet including the header

        Returns a dict with the keys 'ADSstatus' (4 bytes), 'ADSstate' and
        'DeviceState' (2 bytes each).
        """
        ADSCmdHandler.process(self, data)
        status, ads_state, device_state = struct.unpack_from('<LHH', self.respdata)
        return {
            'ADSstatus': status,
            'ADSstate': ads_state,
            'DeviceState': device_state,
        }
if __name__ == '__main__':
    # Test a well formed id
    aid = amsID('1.2.3.4.1.2')
    # Test a malformed id, it has to be rejected with amsError
    try:
        aid = amsID('1.1.1.1.1x2')
    except amsError as e:
        print('Malformed amsID detected {0}'.format(e))
    # Build a Read Device Info request with invoke id 1 and dump its bytes.
    # amsPacket needs a command handler object as cmd (it reads cmd.cmd),
    # not the bare command id, and takes no further argument between the
    # command and the invoke id.
    packet = amsPacket('1.1.1.1.1.1', 801, '10.11.12.13.1.2', 801,
                       ADSCmdReadDeviceInfo(), 1)
    print(packet.buffer.tolist())