// Package main implements an MQTT-driven proxy that exposes local serial
// ports (and, optionally, a gateway process' stdout) over MQTT topics.
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"os/exec"
	"strings"
	"sync"
	"time"

	uart "automatedtesting/usecases_proxy/serial"

	MQTT "github.com/eclipse/paho.mqtt.golang"
	"go.bug.st/serial"
)

var (
	// client is the shared MQTT client created by MqttSetup.
	client MQTT.Client

	// Subtopic is the topic on which instructions are received.
	Subtopic = "application/agency/instr"
	// PushUart is the base topic for serial data; MqttSetup appends the local IP.
	PushUart = "application/agency/uart"
	// PushGateway is the base topic for gateway output; MqttSetup appends the local IP.
	PushGateway = "application/agency/gw"
	// PushDebug is the topic that receives MqttDebug messages.
	PushDebug = "application/agency/debug"
	// PushReplay is the base topic for replies to instructions.
	PushReplay = "application/agency/replay"

	// IP is the address instructions must carry to be handled by this process.
	IP = "0.0.0.0"

	// stdout is the stdout pipe of the gateway process started by GatewayDevice.
	stdout io.ReadCloser

	// Identify is the role of this process ("Uart" or "Gateway").
	Identify = "Uart"
	// Runing reports whether the program (Uart role) or the gateway process
	// (Gateway role) is running.
	// NOTE(review): read and written from several goroutines without
	// synchronization; kept as a plain bool so its type stays unchanged.
	Runing = false

	// TerRuning maps a port name to whether its reader loop should keep running.
	// Guarded by mux.
	TerRuning = make(map[string]bool, 1)
	// Uarts maps a port name to the port opened for it. Guarded by mux.
	Uarts = make(map[string]uart.SerialPort, 1)

	// mux guards TerRuning and Uarts and serializes writes to serial ports.
	mux sync.Mutex
)

// Message is the JSON envelope used for both instructions and replies.
type Message struct {
	Identify string      `json:"identify"` // "Uart" / "Gateway"; also usable as a marker
	Id       string      `json:"id"`
	Ip       string      `json:"Ip"`
	Port     string      `json:"Port"`
	Name     string      `json:"name"` // concrete instruction name
	Data     interface{} `json:"data"` // payload
	Addition bool        `json:"addition"`
}

// SendDataUart is a buffered queue of values destined for a serial port.
var SendDataUart = make(chan interface{}, 32)

// onConnectionLost logs a lost broker connection locally and through MqttDebug.
func onConnectionLost(c MQTT.Client, err error) {
	fmt.Println("mqtt connection lost,", err)
	MqttDebug("mqtt connection lost," + err.Error())
}

// uartmsg is the payload of a "Start" instruction: the port to open and the
// requested line settings.
type uartmsg struct {
	Port  string          `json:"port"`
	Baud  int             `json:"baud"`
	Check serial.Parity   `json:"check"`
	Data  int             `json:"data"`
	Stop  serial.StopBits `json:"stop"`
}

// isPortRunning reports, under mux, whether the reader loop of port is enabled.
func isPortRunning(port string) bool {
	mux.Lock()
	defer mux.Unlock()
	return TerRuning[port]
}

// setPortRunning records, under mux, whether the reader loop of port is enabled.
func setPortRunning(port string, running bool) {
	mux.Lock()
	defer mux.Unlock()
	TerRuning[port] = running
}

// SendUart writes data to the serial port com and echoes it on the port's
// MQTT topic. data is rendered with fmt.Sprint; when ishex is true the text
// is first converted with StrToHex. Nothing is sent or echoed when com has
// not been opened or is no longer open.
func SendUart(com string, data interface{}, ishex bool) {
	mux.Lock()
	defer mux.Unlock()

	// The lookup happens under the lock because Start/Stop mutate Uarts
	// concurrently.
	u, ok := Uarts[com]
	if !ok || !u.GetFlag() {
		fmt.Println("send uart: port is not open,", com)
		return
	}

	str := fmt.Sprint(data)
	PushVal(PushUart+"/"+com, str)
	if ishex {
		u.Write(StrToHex(str))
		return
	}
	u.WriteString(str)
}

// GetPorts publishes the list of serial ports and the per-port result of
// uart.CheckCom on the reply topic of request id.
func GetPorts(id string) {
	comlist := uart.GetComList()
	chks := make(map[string]bool, len(comlist))
	for _, name := range comlist {
		chks[name] = uart.CheckCom(name)
	}

	Push(PushReplay+"/"+id, Message{
		Identify: "Uart",
		Name:     "GetPorts",
		Data: map[string]interface{}{
			"comlist": comlist,
			"chks":    chks,
		},
		Ip: IP,
	})
}

// replyRunningProgram answers an "IsRuningProgram" instruction with Runing.
func replyRunningProgram(message Message) {
	Push(PushReplay+"/"+message.Id, Message{
		Identify: Identify,
		Id:       message.Id,
		Ip:       IP,
		Name:     "IsRuningProgram",
		Data:     Runing,
	})
}

// stopUart disables the reader loop of port and closes the port if it is known.
func stopUart(port string) {
	fmt.Println("关闭串口 ", port)

	mux.Lock()
	TerRuning[port] = false
	if p, ok := Uarts[port]; ok {
		p.Close()
		// Store the port back so the map reflects whatever Close changed.
		Uarts[port] = p
	}
	running := TerRuning[port]
	mux.Unlock()

	fmt.Println("关闭串口 ", running)
}

// handleUartInstruction dispatches one instruction addressed to the Uart role.
func handleUartInstruction(message Message) {
	switch message.Name {
	case "IsRuningProgram":
		replyRunningProgram(message)
		fmt.Println(TernaryVal(Runing, "程序已经运行", "程序未运行"))
	case "IsRuning":
		running := isPortRunning(message.Port)
		Push(PushReplay+"/"+message.Id+"/"+message.Port, Message{
			Identify: Identify,
			Id:       message.Id,
			Ip:       IP,
			Name:     "IsRuning",
			Data:     running,
		})
		fmt.Println(TernaryVal(running, "串口是开启状态", "串口是关闭状态"))
	case "Send":
		SendUart(message.Port, message.Data, message.Addition)
	case "Start":
		UartDevice(message)
	case "Stop":
		stopUart(message.Port)
	case "GetPorts":
		GetPorts(message.Id)
	}
}

// SubHandler handles one instruction received on Subtopic. Instructions are
// acted on only when their Ip and Identify match this process. A separator
// line with the current time is printed after every message.
//
// Example instruction:
//
//	{
//	  "identify": "Gateway",
//	  "ip": "192.168.0.240",
//	  "port": "0",
//	  "name": "IsRuning",
//	  "data": ""
//	}
//
// NOTE(review): this handler publishes and waits on tokens; the paho
// documentation asks message handlers not to block — confirm this is
// acceptable for the client options in use.
func SubHandler(client MQTT.Client, msg MQTT.Message) {
	defer func() {
		fmt.Println("--------------------", time.Now().Format("2006-01-02 15:04:05"))
	}()

	var message Message
	if err := json.Unmarshal(msg.Payload(), &message); err != nil {
		fmt.Println("decode instruction,", err)
		// A type mismatch still leaves the remaining fields decoded, so the
		// instruction is processed on a best-effort basis; any other error
		// means nothing was decoded.
		if _, partial := err.(*json.UnmarshalTypeError); !partial {
			return
		}
	}

	if message.Ip != IP || message.Identify != Identify {
		return
	}

	switch Identify {
	case "Gateway":
		if message.Name == "IsRuningProgram" {
			replyRunningProgram(message)
		}
	case "Uart":
		handleUartInstruction(message)
	}
}

// Mqtt_Subscribe subscribes c to topic, retrying once per second until the
// subscription succeeds.
func Mqtt_Subscribe(c MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) {
	for {
		token := c.Subscribe(topic, qos, callback)
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Println("subscribe topic,", topic, err)
			time.Sleep(time.Second)
			continue
		}
		return
	}
}

// Mqtt_Unsubscribe removes the subscription of c to topic and logs a failure.
func Mqtt_Unsubscribe(c MQTT.Client, topic string) {
	// Use the client that was passed in rather than the package-level one.
	token := c.Unsubscribe(topic)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Println("unsubscribe topic,", topic, err)
	}
}

// PushVal publishes message as-is on topic with QoS 0 and waits for the
// publish token. A publish error is logged locally.
func PushVal(topic string, message interface{}) {
	token := client.Publish(topic, 0, false, message)
	token.Wait()
	if err := token.Error(); err != nil {
		fmt.Println("publish topic,", topic, err)
	}
}

// Push publishes message, encoded as JSON, on topic with QoS 0. Encoding and
// publish errors are logged locally; nothing is published when encoding fails.
func Push(topic string, message interface{}) {
	data, err := json.Marshal(message)
	if err != nil {
		fmt.Println("encode message for topic,", topic, err)
		return
	}
	PushVal(topic, data)
}

// Subscribe subscribes the shared client to topic with QoS 0.
func Subscribe(topic string, callback MQTT.MessageHandler) {
	Mqtt_Subscribe(client, topic, 0, callback)
}

// UnSubscribe removes the shared client's subscription to topic.
func UnSubscribe(topic string) {
	Mqtt_Unsubscribe(client, topic)
}

// OnConnection subscribes to the instruction topic each time the client
// connects. The subscription runs in its own goroutine because
// Mqtt_Subscribe retries until it succeeds.
func OnConnection(c MQTT.Client) {
	go Mqtt_Subscribe(c, Subtopic, 0, SubHandler)
}

// MqttSetup creates the shared client, connects to the broker (retrying every
// two seconds), then derives the per-host topics and starts the role selected
// by Device.
func MqttSetup() {
	opts := MQTT.NewClientOptions()
	// NOTE(review): broker address and credentials are hard-coded; consider
	// moving them to configuration.
	opts.AddBroker("192.168.0.6:1883")
	opts.SetUsername("app")
	opts.SetPassword("CrDO5FvEdaIXUMYw")
	opts.SetCleanSession(true)
	opts.SetOnConnectHandler(OnConnection)
	opts.SetConnectionLostHandler(onConnectionLost)
	opts.SetMaxReconnectInterval(5 * time.Second)
	client = MQTT.NewClient(opts)

	for {
		token := client.Connect()
		token.Wait()
		if err := token.Error(); err != nil {
			fmt.Println("connect mqtt error,", err)
			time.Sleep(2 * time.Second)
			continue
		}
		break
	}
	fmt.Println("connect mqtt success")

	// NOTE(review): the second result of getLocalIPv4Address is discarded, as
	// before; confirm what IP should be when the lookup fails.
	IP, _ = getLocalIPv4Address()
	PushGateway = PushGateway + "/" + IP
	PushUart = PushUart + "/" + IP
	Identify = Device

	switch Device {
	case "Uart":
		Runing = true
	case "Gateway":
		go GatewayDevice()
	}
}

// GatewayDevice starts /gateway/start.sh and publishes each line of its
// standard output on PushGateway. Runing is true while output is being
// forwarded and is reset once the output ends.
func GatewayDevice() {
	cmd := exec.Command(`/gateway/start.sh`)

	// Connect the pipe before starting the process.
	var err error
	stdout, err = cmd.StdoutPipe()
	if err != nil {
		MqttDebug(err.Error())
		return
	}
	if err = cmd.Start(); err != nil {
		MqttDebug("Start: " + err.Error())
		return
	}

	reader := bufio.NewReader(stdout)
	Runing = true
	go func() {
		for Runing {
			data, err := reader.ReadString('\n')
			// ReadString may return a final partial line together with the error.
			if data != "" {
				PushVal(PushGateway, data)
			}
			if err != nil {
				// The pipe is finished (EOF or failure); retrying would only
				// repeat the same error forever.
				MqttDebug("ReadString: " + err.Error())
				break
			}
		}
		Runing = false
		// Wait must only be called after all reads from the pipe are done.
		if err := cmd.Wait(); err != nil {
			MqttDebug("Wait: " + err.Error())
		}
	}()
}

// UartDevice handles a "Start" instruction: it opens the serial port named in
// msg.Data (unless it is already open), replies on the instruction's reply
// topic and starts a goroutine that forwards everything read from the port to
// the port's MQTT topic until the port is stopped or a read fails.
func UartDevice(msg Message) {
	var mesg uartmsg
	raw, err := json.Marshal(msg.Data)
	if err != nil {
		fmt.Println("打开串口失败", err)
		return
	}
	if err := json.Unmarshal(raw, &mesg); err != nil {
		fmt.Println("decode uart settings,", err)
		// A type mismatch in one setting still leaves the port name decoded.
		if _, partial := err.(*json.UnmarshalTypeError); !partial {
			return
		}
	}
	port := mesg.Port

	// NOTE(review): the requested baud/data/parity/stop settings in mesg are
	// not applied; the port is always opened with uart.ModeDef, as before.
	// Confirm whether the requested settings should be honoured.
	model := uart.ModeDef

	// Check, open and register under one lock so concurrent Start/Stop/Send
	// never observe a half-updated map.
	mux.Lock()
	if cur, ok := Uarts[port]; ok && cur.GetFlag() {
		mux.Unlock()
		fmt.Println("串口已经被打开", port)
		return
	}
	p, err := uart.OpenSerial(port, model)
	if err != nil {
		mux.Unlock()
		fmt.Println("打开串口失败", err)
		return
	}
	Uarts[port] = p
	mux.Unlock()
	fmt.Println("open uart success", port, p.GetFlag())

	if !p.GetFlag() {
		fmt.Println("open uart error", isPortRunning(port))
		fmt.Println("The serial port fails to be opened or is occupied")
		return
	}

	// Mark the port as running before the reader starts so a later Stop is
	// never overwritten by a late start-up.
	setPortRunning(port, true)

	go func() {
		fmt.Println("Start ", port)

		// Report that the port was started; Data is the state of the port
		// that was actually opened.
		Push(PushReplay+"/"+msg.Id+"/"+msg.Port, Message{
			Identify: Identify,
			Id:       msg.Id,
			Ip:       IP,
			Name:     "Start",
			Data:     isPortRunning(port),
		})
		fmt.Println("Push Start")

		// Read until stopped; data is published when a chunk ends with a
		// newline or more than 2048 bytes are pending.
		topic := PushUart + "/" + port
		chunk := make([]byte, 1024)
		var pending []byte
		for isPortRunning(port) {
			n, err := p.Read(chunk)
			if err != nil {
				setPortRunning(port, false)
			}
			if n <= 0 {
				continue
			}
			pending = append(pending, chunk[:n]...)
			if strings.HasSuffix(string(chunk[:n]), "\n") || len(pending) > 2048 {
				PushVal(topic, pending)
				// Start a fresh slice; the published one is not reused.
				pending = nil
			}
		}
		// Flush whatever was buffered when the loop ended.
		if len(pending) > 0 {
			PushVal(topic, pending)
		}
	}()
}

// MqttDebug prints str and publishes it, prefixed with the current time, on
// PushDebug.
func MqttDebug(str string) {
	fmt.Println(str)
	Push(PushDebug, time.Now().Format("2006-01-02 15:04:05")+"--"+str)
}