#pylint: disable=no-member
import snap7
import snap7.client
from snap7.snap7types import *
from snap7.util import *
import sqlite3
from sqlite3 import Error
from datetime import datetime as DateTime
class DBObject:
    """Plain attribute container.

    DBRead creates one instance per read and attaches one attribute per
    configured data-block item to it.
    """
# Byte count added per data type when get_db_size computes the read length.
offsets = dict(Bool=2, Int=2, Real=4, DWord=4, String=256)
# Data-block layout, one item per line: name, data type and "byte.bit"
# offset, separated by tabs. The leading and trailing newlines are kept so
# the value is unchanged; empty lines are filtered out when it is parsed.
db = (
    "\n"
    "Count\tDWord\t0.0\n"
    "Temperature\tReal\t4.0\n"
    "DistanceOverAll\tReal\t8.0\n"
    "CyclesOverAll\tDWord\t12.0\n"
    "MotorCurrent\tReal\t16.0\n"
    "DB_String\tString\t20.0\n"
)
def DBRead(plc, db_num, length, dbitems):
    """Read a data block from the PLC and decode the configured items.

    :param plc: client object providing ``read_area``
    :param db_num: number of the data block to read
    :param length: number of bytes to read, starting at byte 0
    :param dbitems: iterable of dicts with the keys 'name', 'datatype' and
        'bytebit' (a "byte.bit" string such as "4.0")
    :return: DBObject with one attribute per item, named after the item;
        items with an unsupported datatype are set to None
    """
    data = plc.read_area(areas['DB'], db_num, 0, length)
    obj = DBObject()
    for item in dbitems:
        # "byte.bit": the byte part is always needed, the bit part only for Bool.
        position = item['bytebit'].split('.')
        offset = int(position[0])
        datatype = item['datatype']
        if datatype == 'Real':
            value = get_real(data, offset)
        elif datatype == 'Bool':
            value = get_bool(data, offset, int(position[1]))
        elif datatype == 'DWord':
            value = get_dword(data, offset)
        elif datatype == 'Int':
            # 'Int' is listed in the offsets table, so decode it as well
            # instead of silently yielding None.
            value = get_int(data, offset)
        elif datatype == 'String':
            value = get_string(data, offset)
        else:
            # Unknown types stay best-effort: the attribute exists but is None.
            value = None
        setattr(obj, item['name'], value)
    return obj
# Function changed 18.06.2024; the old function did not work correctly.
def get_db_size(array, bytekey, datatypekey, sizes=None):
    """Return the number of bytes that must be read to cover every item.

    The read always starts at byte 0, so the required length is the largest
    "start byte + type size" over all items. Using the start byte (instead of
    just summing the type sizes) keeps the result correct when the layout has
    gaps, lists only some of the items, or packs several Bools into one byte.

    :param array: iterable of item dicts
    :param bytekey: key of the "byte.bit" offset string in each item
    :param datatypekey: key of the data type name in each item
    :param sizes: optional mapping of data type name to size in bytes;
        defaults to the module-level ``offsets`` table
    :return: required length in bytes, 0 for an empty item list
    :raises ValueError: if an item uses a data type missing from the size table
    """
    size_by_type = offsets if sizes is None else sizes
    last_byte = 0
    for item in array:
        datatype = item[datatypekey]
        if datatype not in size_by_type:
            raise ValueError(
                "unknown datatype {!r} in data-block layout".format(datatype)
            )
        start_byte = int(item[bytekey].split('.')[0])
        last_byte = max(last_byte, start_byte + int(size_by_type[datatype]))
    return last_byte
###########################################################
# sql database
###########################################################
def create_connection(db_file):
    """Open a connection to the SQLite database stored in ``db_file``.

    The connection is opened with ``check_same_thread=False``.

    :param db_file: database file
    :return: Connection object, or None if sqlite3 reported an error
        (the error is printed)
    """
    try:
        return sqlite3.connect(db_file, check_same_thread=False)
    except Error as e:
        print('sqlite3 error: ', e)
        return None
def create_table(conn, create_table_sql):
    """Execute a CREATE TABLE statement on an open connection.

    sqlite3 errors are printed rather than raised, so a failed statement
    does not abort the caller.

    :param conn: Connection object
    :param create_table_sql: a CREATE TABLE statement
    :return: None
    """
    try:
        cursor = conn.cursor()
        cursor.execute(create_table_sql)
    except Error as e:
        print('table create error: ', e)
# File name of the SQLite database the readings are logged to.
database = "pythonsqlite_Vertikalachse.db"
def create_database(database):
    """Create the ``projects`` table in the given database file if missing.

    Opens a connection, runs the CREATE TABLE IF NOT EXISTS statement and
    closes the connection again. If the connection cannot be opened, an
    error message is printed instead.

    :param database: database file
    :return: None
    """
    sql_create_projects_table = (
        " CREATE TABLE IF NOT EXISTS projects (\n"
        "id integer PRIMARY KEY,\n"
        "name text NOT NULL,\n"
        "time text,\n"
        "Count integer,\n"
        "Temperature real,\n"
        "DistanceOverAll real,\n"
        "CyclesOverAll integer,\n"
        "MotorCurrent real,\n"
        "DB_String text\n"
        "); "
    )
    conn = create_connection(database)
    if conn is None:
        print("Error! cannot create the database connection.")
        return
    try:
        create_table(conn, sql_create_projects_table)
    finally:
        # Release the file handle even if table creation fails unexpectedly.
        conn.close()
def create_project(conn, project):
    """Insert one row into the projects table and commit it.

    :param conn: open Connection object
    :param project: sequence of eight values in the order
        (name, time, Count, Temperature, DistanceOverAll, CyclesOverAll,
        MotorCurrent, DB_String)
    :return: row id of the inserted row
    """
    # The column list follows the order of the table definition, which is
    # also the order in which the main routine builds its tuple. Listing
    # MotorCurrent before CyclesOverAll here would store the two values in
    # each other's columns.
    sql = (
        "INSERT INTO projects("
        "name,time,Count,Temperature,DistanceOverAll,"
        "CyclesOverAll,MotorCurrent,DB_String) "
        "VALUES(?,?,?,?,?,?,?,?)"
    )
    cur = conn.cursor()
    cur.execute(sql, project)
    conn.commit()
    return cur.lastrowid
###########################################################
# main routine
###########################################################
def _parse_db_items(layout, delimiter='\t'):
    """Turn the data-block layout text into a list of item dicts.

    Every non-empty line contributes a dict with the keys 'name', 'datatype'
    and 'bytebit', taken from its first three delimiter-separated fields.
    """
    items = []
    for line in layout.split('\n'):
        if line == '':
            continue
        fields = line.split(delimiter)
        items.append({
            "name": fields[0],
            "datatype": fields[1],
            "bytebit": fields[2],
        })
    return items


def _timestamp():
    """Return the current time formatted as 'YYYY-MM-DD HH:MM:SS'."""
    return format(DateTime.now(), '%Y-%m-%d %H:%M:%S')


def _poll_and_log(plc, conn, items, db_length):
    """Read data block 30 in an endless loop and log every Count increase.

    Each time the Count value read from the PLC is greater than the last
    logged one, the values are printed and stored via create_project.
    The loop only ends when an exception (e.g. KeyboardInterrupt) is raised.
    """
    old_count = 0
    i = 1
    while True:
        meh = DBRead(plc, 30, db_length, items)
        # NOTE(review): the source had lost its indentation; everything below
        # is assumed to belong to the "Count increased" branch - confirm.
        if meh.Count > old_count:
            print('Abfrage Nr\t:', i)
            print('Count\t\t:', meh.Count)
            print('Temperature\t:', "{:.1f}".format(meh.Temperature))
            print('DistanceOverAll\t:', "{:.1f}".format(meh.DistanceOverAll))
            print('CyclesOverAll\t:', meh.CyclesOverAll)
            print('MotorCurrent\t:', "{:.3f}".format(meh.MotorCurrent))
            print('DB_String\t:\t', meh.DB_String)
            old_count = meh.Count
            # NOTE(review): i is incremented before it is stored, so the stored
            # name is one higher than the printed number - kept as in the original.
            i = i + 1
            # NOTE(review): the value order must match the column list used
            # by create_project.
            project = (
                i,
                _timestamp(),
                meh.Count,
                "{:10.1f}".format(meh.Temperature),
                "{:10.1f}".format(meh.DistanceOverAll),
                meh.CyclesOverAll,
                "{:2.3f}".format(meh.MotorCurrent),
                meh.DB_String,
            )
            create_project(conn, project)


def main():
    """Connect to the PLC, prepare the database and log values until stopped."""
    plc = snap7.client.Client()
    ip_address = input("Input IP-Adress and press enter to start operations...")
    if ip_address == '':
        ip_address = "10.204.5.11"
    print('connect to ' + ip_address)
    plc.connect(ip_address, 0, 0)
    try:
        items = _parse_db_items(db)
        # get length of datablock
        db_length = get_db_size(items, 'bytebit', 'datatype')
        create_database(database)
        conn = create_connection(database)
        if conn is not None:
            print('conn opened: ', _timestamp())
            try:
                _poll_and_log(plc, conn, items, db_length)
            except KeyboardInterrupt:
                # Ctrl+C is the only way to leave the polling loop; treat it
                # as a normal stop so the cleanup below still runs.
                pass
            finally:
                conn.close()
                print('conn closed: ', _timestamp())
    finally:
        # Always release the PLC connection, also after an error.
        plc.disconnect()


if __name__ == "__main__":
    main()