自动化测试
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

361 lines
8.4 KiB

package main
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"time"
uart "automatedtesting/usecases_proxy/serial"
MQTT "github.com/eclipse/paho.mqtt.golang"
"go.bug.st/serial"
)
// Package-level state shared by the MQTT callbacks and the device goroutines.
var (
	// client is the MQTT connection; it is assigned in MqttSetup.
	client MQTT.Client

	// Subtopic is the topic subscribed to for incoming instructions.
	Subtopic = "application/agency/instr"

	// Publish topic prefixes. MqttSetup appends "/<IP>" to PushUart and
	// PushGateway once the local address is known.
	PushUart    = "application/agency/uart"
	PushGateway = "application/agency/gw"
	PushDebug   = "application/agency/debug"
	PushReplay  = "application/agency/replay"

	// IP holds a placeholder until MqttSetup replaces it with the local address.
	IP = "0.0.0.0"

	// stdout is the stdout pipe of the process started by GatewayDevice.
	stdout io.ReadCloser

	// Identify is the role of this process; MqttSetup overwrites it with Device.
	Identify = "Uart"

	// Runing is the flag reported in "IsRuningProgram" replies.
	Runing = false

	// TerRuning maps a serial port name to whether its reader is active.
	TerRuning = make(map[string]bool, 1)

	// Uarts maps a serial port name to its port handle.
	Uarts = make(map[string]uart.SerialPort, 1)

	// mux is locked around writes to TerRuning and Uarts and around serial sends.
	mux sync.Mutex
)
// var xUart uart.SerialPort
// Message is the JSON envelope used both for instructions received on the
// subscribed topic and for the replies published back.
type Message struct {
	// Identify is "Uart" or "Gateway"; it can also be used as a marker.
	Identify string `json:"identify"`
	// Id is echoed back in replies and forms part of the reply topic.
	Id string `json:"id"`
	// Ip is compared against the local IP to decide whether to handle a message.
	Ip string `json:"Ip"`
	// Port is the serial port the instruction refers to.
	Port string `json:"Port"`
	// Name is the name of the specific instruction.
	Name string `json:"name"`
	// Data is the instruction or reply payload.
	Data interface{} `json:"data"`
	// Addition is passed to SendUart as its ishex argument for "Send".
	Addition bool `json:"addition"`
}
// SendDataUart is a buffered channel (capacity 32) for outgoing serial data.
// NOTE(review): the only reference to it in this part of the file is a
// commented-out send in SubHandler — confirm whether it is still needed.
var SendDataUart = make(chan interface{}, 32)
// onConnectionLost is installed as the MQTT connection-lost handler. It
// prints the error locally and forwards it through MqttDebug.
func onConnectionLost(c MQTT.Client, err error) {
	const prefix = "mqtt connection lost,"
	fmt.Println(prefix, err)
	MqttDebug(prefix + err.Error())
}
// uartmsg carries the serial settings decoded from the Data field of a
// "Start" instruction.
type uartmsg struct {
	// Port is the name of the serial port to open.
	Port string `json:"port"`
	// Baud maps to serial.Mode.BaudRate.
	Baud int `json:"baud"`
	// Check maps to serial.Mode.Parity.
	Check serial.Parity `json:"check"`
	// Data maps to serial.Mode.DataBits.
	Data int `json:"data"`
	// Stop maps to serial.Mode.StopBits.
	Stop serial.StopBits `json:"stop"`
}
// type SendS struct {
// Com string
// Data interface{}
// }
func SendUart(com string, data interface{}, ishex bool) {
u := Uarts[com]
mux.Lock()
str := fmt.Sprint(data)
PushVal(PushUart+"/"+com, str)
if !ishex {
u.WriteString(str)
} else {
s := StrToHex(str)
u.Write(s)
}
mux.Unlock()
}
// GetPorts publishes the list of serial ports, together with the result of
// uart.CheckCom for each of them, to the reply topic PushReplay+"/"+id.
func GetPorts(id string) {
	ports := uart.GetComList()

	status := make(map[string]bool, 1)
	for _, name := range ports {
		status[name] = uart.CheckCom(name)
	}

	Push(PushReplay+"/"+id, Message{
		Identify: "Uart",
		Name:     "GetPorts",
		Data: map[string]interface{}{
			"comlist": ports,
			"chks":    status,
		},
		Ip: IP,
	})
}
/*
{
"identify":"Gateway",
"ip": "192.168.0.240",
"port":"0",
"name":"IsRuning",
"data":""
}
*/
// SubHandler is the MQTT callback for the instruction topic. It decodes the
// payload into a Message and, when the message addresses this host (Ip) and
// this role (Identify), dispatches on message.Name:
//
//   - Gateway role: "IsRuningProgram".
//   - Uart role: "IsRuningProgram", "IsRuning", "Send", "Start", "Stop",
//     "GetPorts".
//
// A separator line with the current time is printed after every message,
// whether or not it was handled. The client parameter is not used here;
// replies go out through Push.
//
// A payload that fails to decode is logged and dropped. TerRuning is read
// only while holding mux, because the serial reader goroutines write it
// concurrently.
func SubHandler(client MQTT.Client, msg MQTT.Message) {
	// A closure is deferred (rather than the Println call itself) so the
	// timestamp is taken when handling finishes.
	defer func() {
		fmt.Println("--------------------", time.Now().Format("2006-01-02 15:04:05"))
	}()

	var message Message
	if err := json.Unmarshal(msg.Payload(), &message); err != nil {
		fmt.Println("invalid instruction payload,", err)
		return
	}
	if message.Ip != IP || message.Identify != Identify {
		return
	}

	// reply publishes a response that echoes the request id.
	reply := func(topic, name string, data interface{}) {
		Push(topic, Message{
			Identify: Identify,
			Id:       message.Id,
			Ip:       IP,
			Name:     name,
			Data:     data,
		})
	}

	switch Identify {
	case "Gateway":
		if message.Name == "IsRuningProgram" {
			reply(PushReplay+"/"+message.Id, "IsRuningProgram", Runing)
		}

	case "Uart":
		switch message.Name {
		case "IsRuningProgram":
			reply(PushReplay+"/"+message.Id, "IsRuningProgram", Runing)
			fmt.Println(TernaryVal(Runing, "程序已经运行", "程序未运行"))

		case "IsRuning":
			mux.Lock()
			running := TerRuning[message.Port]
			mux.Unlock()
			reply(PushReplay+"/"+message.Id+"/"+message.Port, "IsRuning", running)
			fmt.Println(TernaryVal(running, "串口是开启状态", "串口是关闭状态"))

		case "Send":
			// SendUart takes mux itself, so it must be called unlocked.
			SendUart(message.Port, message.Data, message.Addition)

		case "Start":
			UartDevice(message)

		case "Stop":
			fmt.Println("关闭串口 ", message.Port)
			mux.Lock()
			TerRuning[message.Port] = false
			if p, ok := Uarts[message.Port]; ok {
				p.Close()
				// The map stores values, so the closed handle is written back.
				Uarts[message.Port] = p
			}
			running := TerRuning[message.Port]
			mux.Unlock()
			fmt.Println("关闭串口 ", running)

		case "GetPorts":
			GetPorts(message.Id)
		}
	}
}
// Mqtt_Subscribe subscribes callback to topic on c with the given qos. A
// failed attempt is printed and retried after one second; the function
// returns once an attempt completes without error.
func Mqtt_Subscribe(c MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) {
	for {
		token := c.Subscribe(topic, qos, callback)
		if !token.Wait() {
			return
		}
		err := token.Error()
		if err == nil {
			return
		}
		fmt.Println("subscribe topic,", topic, err)
		time.Sleep(time.Second)
	}
}
// Mqtt_Unsubscribe removes the subscription for topic on c and prints the
// error if the request fails.
//
// The request is issued on the client passed in as c; previously the
// parameter was ignored and the package-level client was always used.
func Mqtt_Unsubscribe(c MQTT.Client, topic string) {
	token := c.Unsubscribe(topic)
	if token.Wait() && token.Error() != nil {
		fmt.Println("unsubscribe topic,", topic, token.Error())
	}
}
// PushVal publishes message unchanged to topic on the package-level client
// (QoS 0, not retained) and waits for the publish token to complete.
func PushVal(topic string, message interface{}) {
	const (
		qos      byte = 0
		retained      = false
	)
	client.Publish(topic, qos, retained, message).Wait()
}
// Push JSON-encodes message and publishes it to topic on the package-level
// client (QoS 0, not retained), waiting for the publish token to complete.
//
// If message cannot be encoded the error is printed and nothing is
// published; previously the error was discarded and an empty payload was
// sent. The error goes to stdout rather than MqttDebug because MqttDebug
// itself publishes through Push.
func Push(topic string, message interface{}) {
	data, err := json.Marshal(message)
	if err != nil {
		fmt.Println("push", topic, "marshal error,", err)
		return
	}
	token := client.Publish(topic, 0, false, data)
	token.Wait()
}
// Subscribe registers callback for topic on the package-level client at
// QoS 0 by delegating to Mqtt_Subscribe.
func Subscribe(topic string, callback MQTT.MessageHandler) {
	const qos byte = 0
	Mqtt_Subscribe(client, topic, qos, callback)
}
// UnSubscribe removes the subscription for topic on the package-level client
// by delegating to Mqtt_Unsubscribe.
func UnSubscribe(topic string) {
Mqtt_Unsubscribe(client, topic)
}
// OnConnection is installed as the MQTT on-connect handler. It subscribes
// SubHandler to Subtopic at QoS 0 in a separate goroutine, so the handler
// itself returns immediately.
func OnConnection(c MQTT.Client) {
	const qos byte = 0
	go Mqtt_Subscribe(c, Subtopic, qos, SubHandler)
}
// MqttSetup creates the package-level MQTT client and connects it, retrying
// every two seconds until the connection succeeds. After connecting it:
//
//   - stores the local IPv4 address in IP and appends it to the PushGateway
//     and PushUart topic prefixes;
//   - sets Identify to Device;
//   - marks the program as running for the "Uart" role, or starts
//     GatewayDevice in a goroutine for the "Gateway" role.
//
// A failure to determine the local address is now printed instead of being
// silently discarded; IP is still assigned whatever the lookup returned.
func MqttSetup() {
	opts := MQTT.NewClientOptions()
	opts.AddBroker("192.168.0.6:1883")
	// Alternative broker (disabled): 39.98.253.192:1885
	// NOTE(review): the broker address and credentials are hard-coded in
	// source; consider loading them from configuration or the environment.
	opts.SetUsername("app")
	opts.SetPassword("CrDO5FvEdaIXUMYw")
	opts.SetCleanSession(true)
	opts.SetOnConnectHandler(OnConnection)
	opts.SetConnectionLostHandler(onConnectionLost)
	opts.SetMaxReconnectInterval(5 * time.Second)
	client = MQTT.NewClient(opts)

	for {
		token := client.Connect()
		if !token.Wait() || token.Error() == nil {
			break
		}
		fmt.Println("connect mqtt error,", token.Error())
		time.Sleep(2 * time.Second)
	}
	fmt.Println("connect mqtt success")

	var err error
	IP, err = getLocalIPv4Address()
	if err != nil {
		fmt.Println("get local ip error,", err)
	}
	PushGateway = PushGateway + "/" + IP
	PushUart = PushUart + "/" + IP

	Identify = Device
	switch Device {
	case "Uart":
		Runing = true
	case "Gateway":
		go GatewayDevice()
	}
}
// GatewayDevice starts /gateway/start.sh and forwards its standard output to
// the PushGateway topic, one line per publish, from a background goroutine.
//
// Runing is set to true while the forwarding loop is active. When reading
// fails (including end of output once the script exits) any partial last
// line is still published, the error is reported through MqttDebug, Runing
// is cleared and the process is reaped with Wait. Previously the loop kept
// reporting the same read error once a second forever and never reached
// Wait. A failure to start the script is now reported as well.
func GatewayDevice() {
	cmd := exec.Command(`/gateway/start.sh`)

	// Attach the pipe before the process is started.
	var err error
	stdout, err = cmd.StdoutPipe()
	if err != nil {
		MqttDebug(err.Error())
		return
	}
	if err = cmd.Start(); err != nil {
		MqttDebug("Start: " + err.Error())
		return
	}

	reader := bufio.NewReader(stdout)
	go func() {
		Runing = true
		for Runing {
			line, err := reader.ReadString('\n')
			// ReadString returns the data read so far together with the
			// error, so publish it before acting on the error.
			if line != "" {
				PushVal(PushGateway, line)
			}
			if err != nil {
				MqttDebug("ReadString: " + err.Error())
				Runing = false
			}
		}
		if err := cmd.Wait(); err != nil {
			MqttDebug("Wait: " + err.Error())
		}
	}()
}
// var si int = 1
// isUartRunning reports, while holding mux, whether the reader for port is
// marked as running in TerRuning.
func isUartRunning(port string) bool {
	mux.Lock()
	defer mux.Unlock()
	return TerRuning[port]
}

// UartDevice handles a "Start" instruction: it opens the serial port
// described by msg.Data, records it in Uarts, publishes a "Start" reply on
// PushReplay+"/"+msg.Id+"/"+msg.Port and then forwards everything read from
// the port to PushUart+"/"+<port> from a background goroutine.
//
// Incoming bytes are accumulated and published when a read ends with a
// newline, when more than 2048 bytes are pending, or when the reader is
// stopping. The reader stops when TerRuning for the port becomes false (set
// by a "Stop" instruction or by a read error); bytes still pending at that
// point are published rather than dropped.
//
// Changes from the previous version:
//   - The requested baud rate, data bits, parity and stop bits are used when
//     a positive baud rate is supplied. Before, the settings were built and
//     then unconditionally replaced by uart.ModeDef. uart.ModeDef is still
//     used when the settings cannot be decoded or carry no baud rate.
//   - JSON errors are reported instead of ignored.
//   - Uarts and TerRuning are only read while holding mux.
//   - A port that failed to open is no longer stored in Uarts.
//
// NOTE(review): the port name comes from msg.Data, while the reply topic and
// the other instructions use msg.Port — confirm senders keep them equal.
func UartDevice(msg Message) {
	// msg.Data was decoded as a generic JSON value; round-trip it through
	// JSON to obtain the typed settings.
	raw, err := json.Marshal(msg.Data)
	if err != nil {
		fmt.Println("打开串口失败", err)
		return
	}
	var cfg uartmsg
	decodeErr := json.Unmarshal(raw, &cfg)
	if decodeErr != nil {
		// Keep going with whatever fields decoded and the default mode.
		fmt.Println("uart settings decode error, using default mode:", decodeErr)
	}
	port := cfg.Port

	mux.Lock()
	current, known := Uarts[port]
	mux.Unlock()
	if known && current.GetFlag() {
		fmt.Println("串口已经被打开", port)
		return
	}

	mode := &serial.Mode{
		BaudRate: cfg.Baud,
		DataBits: cfg.Data,
		Parity:   cfg.Check,
		StopBits: cfg.Stop,
	}
	if decodeErr != nil || cfg.Baud <= 0 {
		mode = uart.ModeDef
	}

	opened, err := uart.OpenSerial(port, mode)
	if err != nil {
		fmt.Println("打开串口失败", err)
		return
	}
	mux.Lock()
	Uarts[port] = opened
	mux.Unlock()
	fmt.Println("open uart success", port, opened.GetFlag())

	if !opened.GetFlag() {
		fmt.Println("open uart error", isUartRunning(port))
		fmt.Println("The serial port fails to be opened or is occupied")
		return
	}

	go func() {
		fmt.Println("Start ", port)
		mux.Lock()
		TerRuning[port] = true
		started := TerRuning[msg.Port]
		mux.Unlock()

		// Report the start result to the requester.
		Push(PushReplay+"/"+msg.Id+"/"+msg.Port, Message{
			Identify: Identify,
			Id:       msg.Id,
			Ip:       IP,
			Name:     "Start",
			Data:     started,
		})
		fmt.Println("Push Start")

		// Read loop. buf is reused across reads; each chunk is copied into
		// pending by append before the next read overwrites it.
		buf := make([]byte, 1024)
		var pending []byte
		topic := PushUart + "/" + port
		for isUartRunning(port) {
			n, err := opened.Read(buf)
			if err != nil {
				mux.Lock()
				TerRuning[port] = false
				mux.Unlock()
			}
			if n <= 0 {
				continue
			}
			chunk := buf[:n]
			pending = append(pending, chunk...)
			if strings.HasSuffix(string(chunk), "\n") || len(pending) > 2048 || !isUartRunning(port) {
				PushVal(topic, pending)
				// Start a fresh slice instead of truncating, so the published
				// payload is never overwritten by later reads.
				pending = nil
			}
		}
		if len(pending) > 0 {
			PushVal(topic, pending)
		}
	}()
}
// MqttDebug prints str locally and publishes it to the PushDebug topic,
// prefixed with the current time and "--".
func MqttDebug(str string) {
	fmt.Println(str)
	stamp := time.Now().Format("2006-01-02 15:04:05")
	Push(PushDebug, stamp+"--"+str)
}