"""
Test Code for ESP32-S3-ETH-8DI-8RO-C
https://www.waveshare.com/wiki/ESP32-S3-ETH-8DI-8RO-C#Inteface_Description
This works:
- WLAN
- LAN
- BT LE
- WS2812B LED
- Inputs
- Outputs
- Buzzer
- CAN
Not tested yet:
- RTC
"""
import asyncio
import time
import os
import vfs
from aioespnow import AIOESPNow
from machine import Pin, PWM, Signal, I2C, SPI, SDCard, Encoder
from network import LAN, PHY_W5500
from neopixel import NeoPixel
# https://github.com/lewisxhe/PCF8563_PythonLibrary/blob/master/pcf8563.py
from pcf8563 import PCF8563 as PCF
from bt import ble_service
from umodbus.tcp import ModbusTCP
# https://github.com/straga/micropython-esp32-twai
import CAN
# The relay outputs are driven through a TCA9554PWR I2C GPIO expander.
TCA_ADDR = 0x20  # 7-bit I2C address of the expander (decimal 32)
TCA_CONFIG = 0x03  # configuration register: a 0 bit makes the pin an output
TCA_OUTPUT = 0x01  # output port register: one bit per relay
# Pre-allocated one-byte CAN payloads sent by the encoder pin IRQ handlers
# (pin high -> `on`, pin low -> `off`).
on = [1]
off = [0]
def irq_a1(pin):
    """Pin IRQ handler: report the level of encoder 1 channel A on CAN id 0x10.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x10)
    else:
        can.send(off, 0x10)
def irq_b1(pin):
    """Pin IRQ handler: report the level of encoder 1 channel B on CAN id 0x11.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x11)
    else:
        can.send(off, 0x11)
def irq_a2(pin):
    """Pin IRQ handler: report the level of encoder 2 channel A on CAN id 0x12.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x12)
    else:
        can.send(off, 0x12)
def irq_b2(pin):
    """Pin IRQ handler: report the level of encoder 2 channel B on CAN id 0x13.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x13)
    else:
        can.send(off, 0x13)
def irq_a3(pin):
    """Pin IRQ handler: report the level of encoder 3 channel A on CAN id 0x14.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x14)
    else:
        can.send(off, 0x14)
def irq_b3(pin):
    """Pin IRQ handler: report the level of encoder 3 channel B on CAN id 0x15.

    Sends the module-level payload ``on`` when *pin* reads high, ``off``
    otherwise.
    """
    if pin():
        can.send(on, 0x15)
    else:
        can.send(off, 0x15)
def setup_encoder() -> tuple[Encoder, Encoder, Encoder]:
    """Create the three quadrature encoders and hook up their pin IRQs.

    GPIO 4/5, 6/7 and 8/9 (pull-ups enabled) are the A/B channels of
    encoder units 0, 1 and 2. Every channel additionally gets an IRQ on
    both edges that forwards the pin level over CAN.

    Returns:
        The encoders for the X, Y and Z axis, in that order.
    """
    both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
    # (GPIO number, IRQ handler), listed as A/B pairs for X, Y, Z.
    wiring = (
        (4, irq_a1), (5, irq_b1),
        (6, irq_a2), (7, irq_b2),
        (8, irq_a3), (9, irq_b3),
    )
    pins = [Pin(gpio, pull=Pin.PULL_UP) for gpio, _ in wiring]
    encoders = tuple(
        Encoder(unit, pins[2 * unit], pins[2 * unit + 1], phases=4)
        for unit in range(3)
    )
    # IRQs are attached only after all encoders exist.
    for pin, (_, handler) in zip(pins, wiring):
        pin.irq(trigger=both_edges, handler=handler)
    return encoders
def led():
    """Flash the WS2812B LED next to the USB-C port red, green, blue, then off.

    Blocks for 150 ms after each colour (about 600 ms in total).
    """
    pixel = NeoPixel(Pin(38), 1)
    sequence = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 0))
    for rgb in sequence:
        pixel[0] = rgb
        pixel.write()
        time.sleep_ms(150)
def init_i2c():
    """Return the 400 kHz I2C bus on SCL=GPIO 41 / SDA=GPIO 42.

    Devices on this bus: relay expander at address 32, RTC at address 81.
    """
    clock = Pin(41)
    data = Pin(42)
    return I2C(scl=clock, sda=data, freq=400_000)
def init_outputs():
    """Switch all relay outputs off and configure the expander pins as outputs.

    The output register is cleared first and the configuration register
    second, so no output bit is set at the moment the pins become outputs.
    """
    for register in (TCA_OUTPUT, TCA_CONFIG):
        i2c.writeto_mem(TCA_ADDR, register, b'\x00')
def init_inputs():
    """Return the eight digital inputs as inverted ``Signal`` objects.

    The inputs are wired directly to ESP32 GPIO 4..11 with pull-ups enabled;
    ``invert=True`` makes a low pin level read as 1.
    """
    signals = []
    for gpio in range(4, 12):
        pin = Pin(gpio, mode=Pin.IN, pull=Pin.PULL_UP)
        signals.append(Signal(pin, invert=True))
    return signals
def read_port(port) -> bool:
    """Return the logical state of digital input *port* (1 - 8).

    The value comes from the module-level ``inputs`` list, so inversion is
    already applied.
    """
    signal = inputs[port - 1]
    return signal.value()
def set_output(port: int, value: bool):
    """Switch relay output *port* (1 - 8) on or off.

    Performs a read-modify-write of the I2C expander's output register so the
    other seven outputs keep their state.

    Args:
        port: Output number, 1-based.
        value: True sets the output bit, False clears it.
    """
    state = bytearray(1)
    i2c.readfrom_mem_into(TCA_ADDR, TCA_OUTPUT, state)
    mask = 1 << (port - 1)
    state[0] = (state[0] | mask) if value else (state[0] & ~mask)
    i2c.writeto_mem(TCA_ADDR, TCA_OUTPUT, state)
def init_lan():
    """Bring up the W5500 Ethernet interface and return the active LAN object.

    The W5500 sits on SPI bus 2 (SCK=15, MOSI=13, MISO=14, 20 MHz) with
    interrupt on GPIO 12, chip select on GPIO 16 and reset on GPIO 39.
    """
    bus = SPI(2, sck=Pin(15), mosi=Pin(13), miso=Pin(14), baudrate=20_000_000)
    irq_pin = Pin(12, mode=Pin.IN)
    chip_select = Pin(16, mode=Pin.OUT, value=1)  # idle high = deselected
    reset_pin = Pin(39, mode=Pin.OUT)
    interface = LAN(
        spi=bus,
        cs=chip_select,
        int=irq_pin,
        phy_type=PHY_W5500,
        phy_addr=0,
        reset=reset_pin,
    )
    interface.active(True)
    return interface
def init_sdcard():
    """Return the SD card block device (slot 0, 1-bit bus).

    Pins: CMD=GPIO 47, D0=GPIO 45, CLK=GPIO 48.
    """
    cmd_pin = Pin(47)
    data_pins = (Pin(45),)  # single data line because width=1
    clock_pin = Pin(48)
    return SDCard(slot=0, width=1, cmd=cmd_pin, data=data_pins, sck=clock_pin)
def mount_sd(sd):
    """Mount the block device *sd* at ``/sd``."""
    mount_point = "/sd"
    os.mount(sd, mount_point)
def test1():
    """Manual test 1: show red, green and blue for one second each, then off.

    The LED object is created here because the module has no global ``np``;
    referencing one raised ``NameError``.
    """
    # Same single WS2812B pixel that led() drives (GPIO 38).
    np = NeoPixel(Pin(38), 1)
    for color in [(255, 0, 0), (0, 255, 0), (0, 0, 255)]:
        np[0] = color
        np.write()
        time.sleep(1)
    # Leave the LED dark when the test is done.
    np[0] = (0, 0, 0)
    np.write()
def test2():
    """Manual test 2: run a single active relay across outputs 1 to 8.

    Each output stays on for 0.2 s; the previous one is switched off just
    before the next is switched on, and output 8 is switched off at the end.
    """
    previous = None
    for port in range(1, 9):
        if previous is not None:
            set_output(previous, False)
        set_output(port, True)
        time.sleep(0.2)
        previous = port
    set_output(previous, False)
async def recv_can():
    """Poll the CAN bus every 100 ms and act on command frames.

    Handled frames (only when the payload is exactly one byte):

    - id 0x50: bit 0/1/2 of the payload resets the X/Y/Z encoder to 0.
    - id 0x51: payload bit n drives relay output n + 1.

    At most one frame is handled per poll. A received message looks like
    ``(80, False, False, b'\\x03')``: id first, payload last.
    """
    resets = (
        (0x01, "Reset X Encoder", enc_x),
        (0x02, "Reset Y Encoder", enc_y),
        (0x04, "Reset Z Encoder", enc_z),
    )
    while True:
        if can.any():
            msg = can.recv()
            print("CAN:", msg)
            can_id = msg[0]
            if can_id == 0x50 and len(msg[-1]) == 1:
                flags = msg[-1][0]
                for mask, text, encoder in resets:
                    if flags & mask:
                        print(text)
                        encoder.value(0)
            elif can_id == 0x51 and len(msg[-1]) == 1:
                bits = msg[-1][0]
                for index in range(8):
                    set_output(index + 1, bool((bits >> index) & 1))
        await asyncio.sleep(0.1)
async def send_can():
    """Send the test frame ``[1, 2, 3]`` with id 0x82 once per second.

    The frame is only queued while the transmit queue is empty, and a failed
    send is ignored so the task keeps running.
    """
    while True:
        queue_is_empty = can.info()["msgs_to_tx"] == 0
        if queue_is_empty:
            try:
                can.send([1, 2, 3], 0x82, timeout=1)
            except OSError:
                # Best effort: try again on the next cycle.
                pass
        await asyncio.sleep(1)
def modbus_coils_set(address, val, reg_type):
    """Modbus on-set callback: mirror written coil values onto the relays.

    Args:
        address: Address of the first written coil; used directly as the
            relay output number.
        val: Iterable of coil values, one per consecutive address.
        reg_type: Register type passed by the Modbus stack (unused).
    """
    addr = address
    for value in val:
        print(f"Set coil {addr} to {value}")
        set_output(addr, value)
        addr += 1
def modbus_coils_get(address, val, reg_type):
    """Modbus on-get callback: log the arguments of the coil read."""
    details = (address, val, reg_type)
    print(*details)
async def espnow_client(espnow: AIOESPNow):
    """Switch relay output 1 from ESP-NOW messages.

    A message equal to ``b"ON"`` switches the output on, ``b"OFF"`` switches
    it off; anything else is ignored. The sender is not checked.
    """
    print("Waiting for ESPNow messages")
    async for _sender, msg in espnow:
        if msg == b"ON" or msg == b"OFF":
            set_output(1, msg == b"ON")
async def read_enc():
    """Poll the three encoders every 100 ms and publish position changes.

    When any count changed since the last poll, the positions are printed in
    millimetres (Z is negated for display) and the raw counts are sent on CAN
    id 0x20 as three little-endian 16-bit values in X, Y, Z order. The frame
    is skipped while the transmit queue is not empty, and send errors are
    ignored.
    """
    last = None
    # Scale factors applied to the raw counts to get millimetres, per axis.
    x_factor = 0.2989010989010989
    y_factor = 0.2974560673675727
    z_factor = 0.29498525073746323
    while True:
        current = (enc_x.value(), enc_y.value(), enc_z.value())
        if current != last:
            last = current
            print(f"X: {current[0] * x_factor:5.1f} mm | Y: {current[1] * y_factor:5.1f} mm | Z: {-current[2] * z_factor:5.1f} mm")
            if can.info()["msgs_to_tx"] == 0:
                # Low byte then high byte of each axis. Building this in a
                # loop guarantees both bytes come from the same axis; the Y
                # high byte used to be taken from the Z count.
                payload = []
                for count in current:
                    payload.append(count & 0xFF)
                    payload.append((count >> 8) & 0xFF)
                try:
                    can.send(payload, 0x20, timeout=1)
                except OSError:
                    # Best effort: the next change is sent again.
                    pass
        await asyncio.sleep_ms(100)
async def modbus_worker():
    """Run the Modbus TCP server that exposes the relays as coils 1 - 8.

    Registers one coil per relay with the set/get callbacks, then services
    the server every 10 ms forever.
    """
    server = ModbusTCP()
    # NOTE(review): the module-level register_definitions dict is never
    # passed here - confirm setup_registers() is meant to be called bare.
    server.setup_registers()
    server.bind("")
    callbacks = dict(on_set_cb=modbus_coils_set, on_get_cb=modbus_coils_get)
    for coil in range(1, 9):
        server.add_coil(coil, False, **callbacks)
    while True:
        server.process()
        await asyncio.sleep_ms(10)
async def main():
    """Start all background tasks, then blink the status LED every 5 s."""
    # (label used in the start-up message, factory creating the coroutine)
    jobs = (
        ("CAN receive", recv_can),
        ("CAN send", send_can),
        ("encoder read", read_enc),
        ("BLE", lambda: ble_service(set_output)),
        ("ESPNow", lambda: espnow_client(espnow)),
        ("Modbus", modbus_worker),
    )
    for label, factory in jobs:
        print(f"Starting {label} task")
        asyncio.create_task(factory())
    while True:
        await asyncio.sleep(5)
        led()
# --- Hardware bring-up: runs at import time, and the order matters ---
# can.send() blocks once two messages are pending (msgs_to_tx == 2); this is
# why the tasks check can.info()["msgs_to_tx"] before sending.
can = CAN(0, tx=17, rx=18, mode=CAN.NORMAL, bitrate=2_000_000, extframe=0)
# TODO: test whether CAN.NACK mode helps with the blocking send:
# can = CAN(0, tx=17, rx=18, mode=CAN.NACK, bitrate=2_000_000, extframe=0)
# `can` has to exist before the encoder pin IRQs are armed, because the IRQ
# handlers send on it.
enc_x, enc_y, enc_z = setup_encoder()
# `i2c` is used as a global by init_outputs() and set_output().
i2c = init_i2c()
# NOTE(review): init_inputs() creates Pin objects for GPIO 4-11, which
# includes GPIO 4-9 already configured by setup_encoder() - confirm this does
# not disturb the encoder IRQs.
inputs = init_inputs()
sd_card = init_sdcard()
mount_sd(sd_card)
init_outputs()
# RTC on the shared I2C bus (listed as "not tested yet" in the module docstring).
rtc = PCF(i2c)
lan = init_lan()
# Buzzer PWM on GPIO 46 at 880 Hz, created with duty=0.
buzzer = PWM(Pin(46), freq=880, duty=0)
espnow = AIOESPNow()
espnow.active(True)
# ff:ff:ff:ff:ff:ff is the ESP-NOW broadcast address.
espnow.add_peer(bytes([255]*6))
# Modbus register map, grouped by register type. Each entry gives the start
# register, the number of registers and the initial value.
register_definitions = {
    "COILS": {
        "COIL": {"register": 0, "len": 8, "val": 0},
    },
    "HREGS": {
        "EXAMPLE_HREG": {"register": 93, "len": 1, "val": 19},
    },
    "ISTS": {
        "EXAMPLE_ISTS": {"register": 67, "len": 1, "val": 0},
    },
    "IREGS": {
        "DIN": {"register": 0, "len": 1, "val": 0},
    },
}
# Entry point: start the asyncio event loop. main() never returns, so nothing
# placed after this call runs.
print("Running mainloop")
asyncio.run(main())