2024-06-10IoT

山体滑坡监测与预警系统:实时数据传输与智能预警

10 min

山体滑坡监测与预警系统

基于 Python + FastAPI 构建的嵌入式后端服务,实现山体滑坡监测数据的实时传输与智能预警,支持多终端协同与高可靠性。

项目背景

山体滑坡是自然灾害中的高危场景,需要实时监测与快速预警。本系统通过嵌入式设备采集现场数据,经由后端服务实时传输至监控端,结合智能预警算法实现灾害预防。

技术架构

核心技术栈

  • Python + FastAPI:高性能异步 Web 框架
  • WebSocket:实时双向通信
  • SQLite/MySQL:数据存储与查询
  • Docker:容器化部署

核心功能实现

1. 实时数据传输

使用 FastAPI + WebSocket 实现嵌入式设备与监控端的实时数据传输:

from fastapi import FastAPI, WebSocket
from typing import List

app = FastAPI()
active_connections: List[WebSocket] = []

@app.websocket("/ws/monitor")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    active_connections.append(websocket)
    
    try:
        while True:
            data = await websocket.receive_json()
            
            # 广播数据至所有监控端
            for connection in active_connections:
                await connection.send_json(data)
    except:
        active_connections.remove(websocket)

2. 传感器数据处理

设计 RESTful 接口接收传感器数据,支持传输速率优化与数据校验:

from pydantic import BaseModel
from datetime import datetime

class SensorData(BaseModel):
    device_id: str
    temperature: float
    humidity: float
    tilt_angle: float
    timestamp: datetime

@app.post("/api/sensor/data")
async def receive_sensor_data(data: SensorData):
    # 数据校验与存储
    if validate_data(data):
        await store_to_database(data)
        
        # 触发预警检查
        if check_alert_condition(data):
            await trigger_alert(data)
        
        return {"status": "success"}
    
    return {"status": "error", "message": "Invalid data"}

3. 智能预警算法

基于历史数据与实时监测,实现多级预警机制:

def check_alert_condition(data: SensorData) -> bool:
    # 倾斜角度阈值检查
    if data.tilt_angle > 15:
        return True
    
    # 历史数据趋势分析
    historical_data = fetch_historical_data(data.device_id, hours=24)
    trend = calculate_trend(historical_data)
    
    if trend > 0.5:  # 快速变化趋势
        return True
    
    return False

async def trigger_alert(data: SensorData):
    alert = {
        "level": "high",
        "device_id": data.device_id,
        "message": f"Tilt angle exceeded: {data.tilt_angle}°",
        "timestamp": datetime.now()
    }
    
    # 推送至所有监控端
    for connection in active_connections:
        await connection.send_json(alert)

4. 数据存储与查询

使用 SQLite 实现轻量级数据存储,支持云端部署时切换至 MySQL:

import sqlite3
from contextlib import asynccontextmanager

@asynccontextmanager
async def get_db():
    conn = sqlite3.connect("monitoring.db")
    try:
        yield conn
    finally:
        conn.close()

async def store_to_database(data: SensorData):
    async with get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO sensor_data 
            (device_id, temperature, humidity, tilt_angle, timestamp)
            VALUES (?, ?, ?, ?, ?)
        """, (data.device_id, data.temperature, data.humidity, 
              data.tilt_angle, data.timestamp))
        conn.commit()

性能优化

1. 传输速率优化

通过数据压缩与批量传输,降低网络带宽占用 60%:

import gzip
import json

def compress_data(data: dict) -> bytes:
    json_str = json.dumps(data)
    return gzip.compress(json_str.encode())

@app.post("/api/sensor/batch")
async def receive_batch_data(compressed_data: bytes):
    decompressed = gzip.decompress(compressed_data)
    data_list = json.loads(decompressed)
    
    for data in data_list:
        await store_to_database(SensorData(**data))
    
    return {"status": "success", "count": len(data_list)}

2. 云端部署优化

使用 Docker 容器化部署,支持快速扩展与环境隔离:

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

系统稳定性保障

1. 断线重连机制

实现 WebSocket 自动重连,确保数据传输可靠性:

import asyncio

async def websocket_client_with_retry(url: str):
    while True:
        try:
            async with websockets.connect(url) as ws:
                await handle_connection(ws)
        except Exception as e:
            print(f"Connection lost: {e}, retrying in 5s...")
            await asyncio.sleep(5)

2. 数据备份与恢复

定期备份监测数据,支持灾难恢复:

import shutil
from datetime import datetime

def backup_database():
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_path = f"backups/monitoring_{timestamp}.db"
    shutil.copy("monitoring.db", backup_path)
    print(f"Backup created: {backup_path}")

技术挑战与解决方案

挑战 1:网络不稳定环境

解决方案:断线重连 + 本地缓存,确保数据不丢失

挑战 2:实时性要求高

解决方案:WebSocket 双向通信 + 异步处理,毫秒级响应

挑战 3:多终端协同

解决方案:广播机制 + 连接池管理,支持多监控端同时接入

项目成果

  • 实现山体滑坡监测数据的实时传输与智能预警
  • 传输速率优化 60%,支持多终端协同监控
  • 系统稳定性高,支持 7x24 小时不间断运行

技术启示

  1. WebSocket 是实时通信的最佳选择:低延迟、双向通信
  2. 嵌入式场景需考虑网络不稳定:断线重连与本地缓存必不可少
  3. 容器化部署提升可维护性:Docker 简化环境配置与扩展