作者:电子工程世界(EEWorld)ningh
储备了不少墨水屏,一直没派上用场,继【用三色墨水屏显示哪吒】之后,又摆弄了一下 7.5 寸 7 色墨水屏(悬空放了两个小时,弯了-。-),换了个 3.7 寸双色继续折腾。
由于之前大致摸清楚了图像抖动算法,突发奇想,只需要将网页转换为图片,不就可以在墨水屏上显示任何内容了?毕竟网页界面制作起来工作量可小多了,而且有丰富的库可以使用,说干就干!
首先,确定技术方案:服务端:Python + Playwright 负责提供接口渲染网页;客户端:MicroPython 连接 WiFi,定期请求网页更新屏幕。
服务端源码:
import asyncio
import logging
import time
import os
import uuid
from io import BytesIO
from urllib.parse import urljoin
from secrets import compare_digest
from aiohttp import web
from dotenv import load_dotenv
from playwright.async_api import async_playwright
from PIL import Image
logger = logging.getLogger(__name__)
load_dotenv()
HOST = os.environ.get('HTTP_HOST')
PORT = int(os.environ.get('HTTP_PORT'))
RENDER_TIMEOUT = int(os.environ.get('RENDER_TIMEOUT', 30))
jrhz.infoTEMPLATE_PATH = os.path.join(os.path.dirname(__file__), os.environ.get('TEMPLATE_PATH', 'templates/hello.html'))
UPDATE_INTERVAL = int(os.environ.get('UPDATE_INTERVAL', 60 * 15))
IMAGE_URL = str(uuid.uuid4())
def _get_mac():
mac = uuid.getnode()
return ':'.join(("%012X" % mac)[i:i + 2] for i in range(0, 12, 2))
_device = {
'model': 'matters-370-opensource',
'firmware': '0.0.1',
'key': os.environ.get('API_KEY'),
# ignored!
'mac': _get_mac(),
'battery': 0.0,
'rssi': 0,
'ota': '',
# server only!
'width': 416,
'height': 240,
}
_image = None
_last_update = 0
_task = None
_task_lock = asyncio.Lock()
async def render(wait_for=UPDATE_INTERVAL):
global _image, _last_update
if wait_for:
logger.info(f'Waiting for {wait_for} seconds...')
await asyncio.sleep(wait_for)
logger.info('Rendering...')
async with async_playwright() as p:
try:
browser = await p.chromium.launch()
context = await browser.new_context()
page = await context.new_page()
await page.set_viewport_size({'width': _device['width'], 'height': _device['height']})
# TODO: Jinja2 template?
html = open(TEMPLATE_PATH).read()
await page.set_content(html, timeout=RENDER_TIMEOUT * 1000, wait_until='networkidle')
png_bytes = await page.screenshot(type='png')
img = Image.open(BytesIO(png_bytes))
img = img.convert('1', dither=Image.Dither.FLOYDSTEINBERG)
img_bytes = BytesIO()
img.save(img_bytes, format='BMP')
_image = img_bytes.getvalue()
_last_update = int(time.time())
except Exception as e:
logger.error('Render failed:', exc_info=e)
return
logger.info('Render √')
@web.middleware
async def auth_middleware(req, handler):
if req.method == 'POST':
if not compare_digest(req.headers.get('Access-Token', ''), _device['key']):
raise web.HTTPForbidden(reason='Invalid API key')
return await handler(req)
async def ping(req: web.Request) -> web.Response:
global _task
if _image:
image_url = urljoin(f'{req.scheme}://{req.host}{req.path}', IMAGE_URL)
else:
image_url = ''
res = {
'status': 200,
'message': 'OK',
'display': image_url,
'last_update': _last_update,
# unused!
'key': '',
'ota': '',
}
res['next_update'] = UPDATE_INTERVAL if _image else RENDER_TIMEOUT
async with _task_lock:
if not _task or _task.done():
_task = asyncio.create_task(render(max(res['next_update'] - RENDER_TIMEOUT, 0)))
return web.json_response(res)
def image(req: web.Request) -> web.Response:
if not _image:
raise web.HTTPNotFound()
return web.Response(body=_image, content_type='image/bmp')
def create_app() -> web.Application:
app = web.Application(middlewares=[auth_middleware])
app.add_routes([
web.post('/ping', ping),
web.get(f'/{IMAGE_URL}', image),
])
return app
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
if not _device['key']:
_device['key'] = uuid.uuid4().hex
logger.info('API key generated: %s', _device['key'])
logger.info('Starting server...')
app = create_app()
web.run_app(app, host=HOST, port=PORT)
为了方便测试,还用 Python + tkinter 实现了一个简单的模拟器,效果如下:

模拟器源码:
import threading
import time
import tkinter as tk
import uuid
from datetime import datetime, timedelta
from io import BytesIO
from tkinter import ttk
import requests
from PIL import ImageTk, Image
API_HOST = 'http://127.0.0.1:1988'
API_KEY = ''
def _get_mac():
mac = uuid.getnode()
return ':'.join(('%012X' % mac)[i:i + 2] for i in range(0, 12, 2))
_device = {
'model': 'matters-370-opensource',
'firmware': '0.0.1',
'mac': _get_mac(),
'battery': 0.0,
'rssi': 0,
'ota': '',
'width': 416,
'height': 240,
'key': API_KEY,
'last_update': 0,
}
_req = {
'model': _device['model'],
'firmware': _device['firmware'],
'mac': _device['mac'],
'battery': 0.0,
'rssi': 0,
}
class Simulator:
def __init__(self, master):
self.master = master
master.title(f'Simulator: {_device["model"]}')
master.resizable(0, 0)
self.frame = ttk.Frame(master, width=_device['width'], height=_device['height'])
self.frame.pack_propagate(0)
self.frame.pack()
self.img_label = ttk.Label(self.frame)
self.img_label.pack(expand=1)
self.status = ttk.Label(master, text='Initializing...',
relief=tk.SUNKEN, anchor=tk.W)
self.status.pack(side=tk.BOTTOM, fill=tk.X)
threading.Thread(target=self.upate, daemon=True).start()
def upate(self):
global _device
while True:
self.update_status('Connecting...')
try:
resp = requests.post(url=f'{API_HOST}/ping', json=_req, timeout=10, headers={
'Access-Token': _device['key'],
})
resp.raise_for_status()
data = resp.json()
self.update_status(data['message'])
if data['key']:
_device['key'] = data['key']
if data['last_update'] != _device['last_update']:
resp = requests.get(data['display'], timeout=10)
resp.raise_for_status()
img_io = BytesIO(resp.content)
pil_img = Image.open(img_io).resize((_device['width'], _device['height']))
self.tk_img = ImageTk.PhotoImage(pil_img)
self.master.after(0, self.show_image)
_device['last_update'] = data['last_update']
next_update = datetime.now() + (timedelta(seconds=data['next_update']))
self.update_status(f'Next update: {next_update}')
time.sleep(data['next_update'])
except Exception as e:
self.master.after(0, lambda: self.show_error(str(e)))
time.sleep(30)
def show_image(self):
self.img_label.configure(image=self.tk_img)
def update_status(self, message):
self.status.config(text=message)
self.status.update_idletasks()
def show_error(self, message):
self.update_status(f'Error: {message}')
if __name__ == '__main__':
root = tk.Tk()
root.geometry(f'{_device["width"]}x{_device["height"]+20}')
app = Simulator(root)
root.mainloop()
最后,把模拟器逻辑,移植到 MicroPython,最终效果:

先到这,过几天整理发布到 GitHub 上。




