自动化测试
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

243 lines
6.4 KiB

package mqtt
import (
"encoding/json"
"fmt"
"sync"
"time"
// "usecases/usecases_server/utils/cyllib/serial"
"automatedtesting/usecases_server/utils/cyllib/log"
"automatedtesting/usecases_server/utils/cyllib/ssh"
"automatedtesting/usecases_server/utils/pubsub"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
// client is the package-wide MQTT client. MqttSetup assigns it, and Push,
// Subscribe and UnSubscribe use it.
var client MQTT.Client
// MQTT topic names used by this package.
var (
	// PushDebug is the topic MqttDebug publishes to.
	PushDebug = "application/agency/debug"
	PushInstr = "application/agency/instr"
	SubReplay = "application/agency/replay"
	SubUart   = "application/agency/uart"
	SubGw     = "application/agency/gw"
)
// var AddrCh = make(chan string, 1)
// Message is the JSON structure that Mqtt_GetReplay decodes MQTT payloads into.
type Message struct {
	// Identify is the peer kind: terminal / gateway.
	Identify string `json:"identify"`
	// Id is the terminal id / gateway id.
	Id   string `json:"id"`
	Ip   string `json:"Ip"`
	Port string `json:"Port"`
	// Name is the name of the concrete instruction.
	Name string `json:"name"`
	// Data is the payload.
	Data     interface{} `json:"data"`
	Addition bool        `json:"addition"`
}
// -------------------------------------------------------------cache
// StoreSub stores subscription state per address: gateways are grouped by ip,
// serial ports by ip:port.
var StoreSub = map[string]*bool{}
// MqttSshMsg is the value type stored in MqttSshMsgMap, one entry per name.
type MqttSshMsg struct {
	// Identify holds 1301/1302/addr for a gateway; for a serial port it holds
	// the COM port or is empty.
	Identify string
	// Runing marks whether the entry is subscribed.
	Runing bool
	// Path is the path of the saved file.
	Path string
	// SSHmsg is the SSH connection information.
	SSHmsg ssh.SSHMsg
	pubsub *pubsub.Publisher
	// isExist marks whether the entry exists.
	isExist bool
	// mux is locked by the package's setters while they write a field.
	mux sync.Mutex
}
// MqttSshMsgMap caches one MqttSshMsg per name. Entries are created lazily by
// the accessors below and are never removed in this file.
//
// The accessors below only touch the map while holding mqttSshMsgMapMu. Code
// that indexes the exported map directly bypasses that protection.
var MqttSshMsgMap = make(map[string]*MqttSshMsg)

// mqttSshMsgMapMu guards lookups and inserts on MqttSshMsgMap. The per-entry
// mux only covers the fields of a single entry, so it cannot protect the map.
var mqttSshMsgMapMu sync.Mutex

// mqttSshMsgEntry returns the cache entry for name, creating a zero-valued
// entry (with its own publisher) first when none exists.
func mqttSshMsgEntry(name string) *MqttSshMsg {
	mqttSshMsgMapMu.Lock()
	defer mqttSshMsgMapMu.Unlock()

	if msg, ok := MqttSshMsgMap[name]; ok {
		return msg
	}
	msg := &MqttSshMsg{
		pubsub: pubsub.NewPublisher(100*time.Millisecond, 1),
	}
	MqttSshMsgMap[name] = msg
	return msg
}

// initMqttSshMsgMap makes sure an entry for name exists. It returns nothing.
func initMqttSshMsgMap(name string) {
	mqttSshMsgEntry(name)
}

// isExistMqttSshMsgMap reports whether name has an entry whose isExist flag is
// set. Unlike the Get/Set accessors it never creates an entry.
func isExistMqttSshMsgMap(name string) bool {
	mqttSshMsgMapMu.Lock()
	msg, ok := MqttSshMsgMap[name]
	mqttSshMsgMapMu.Unlock()
	if !ok {
		return false
	}

	msg.mux.Lock()
	defer msg.mux.Unlock()
	return msg.isExist
}

// CreateMqttSshMsgMap stores sshmsg for name and marks the entry as existing.
// It does nothing when the entry is already marked as existing. The check and
// the update happen under one lock, so concurrent callers cannot both win.
func CreateMqttSshMsgMap(name string, sshmsg ssh.SSHMsg) {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	if !msg.isExist {
		msg.SSHmsg = sshmsg
		msg.isExist = true
	}
}

// GetMqttSshMsgMapRuning returns the Runing flag of name, creating the entry
// when it is missing (the flag is then false).
func GetMqttSshMsgMapRuning(name string) bool {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	return msg.Runing
}

// SetMqttSshMsgMapRuning sets the Runing flag of name, creating the entry when
// it is missing.
func SetMqttSshMsgMapRuning(name string, runing bool) {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	msg.Runing = runing
}

// GetMqttSshMsgMapPath returns the Path of name, creating the entry when it is
// missing (the path is then empty).
func GetMqttSshMsgMapPath(name string) string {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	return msg.Path
}

// SetMqttSshMsgMapPath sets the Path of name, creating the entry when it is
// missing.
func SetMqttSshMsgMapPath(name string, path string) {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	msg.Path = path
}

// GetMqttSshMsgMapSSHMsg returns a pointer to the SSHmsg field of name,
// creating the entry when it is missing. The pointer aliases the cached value,
// so reads and writes through it are not covered by the entry lock.
func GetMqttSshMsgMapSSHMsg(name string) *ssh.SSHMsg {
	return &mqttSshMsgEntry(name).SSHmsg
}

// SetMqttSshMsgMapSSHMsg replaces the SSHmsg of name, creating the entry when
// it is missing.
func SetMqttSshMsgMapSSHMsg(name string, sshmsg ssh.SSHMsg) {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	msg.SSHmsg = sshmsg
}

// GetMqttSshMsgMapIdentify returns the Identify of name, creating the entry
// when it is missing (the value is then empty).
func GetMqttSshMsgMapIdentify(name string) string {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	return msg.Identify
}

// SetMqttSshMsgMapIdentify sets the Identify of name, creating the entry when
// it is missing.
func SetMqttSshMsgMapIdentify(name string, identify string) {
	msg := mqttSshMsgEntry(name)

	msg.mux.Lock()
	defer msg.mux.Unlock()
	msg.Identify = identify
}

// GetMqttSshMsgMapPubsub returns the publisher of name, creating the entry when
// it is missing. The publisher is assigned once at entry creation and never
// reassigned in this file, so it is read without the entry lock.
func GetMqttSshMsgMapPubsub(name string) *pubsub.Publisher {
	return mqttSshMsgEntry(name).pubsub
}
// func SetMqttSshMsgUart(id string, data string) {
// if _, ok := ConnectMap[id]; !ok {
// ConnectMap[id] = initMqttCache()
// }
// ConnectMap[id].GwIndex = append(ConnectMap[id].GwIndex, data)
// }
//----------------------------------------------------------------mqtt
// MqttDebug publishes str on the PushDebug topic, prefixed with the current
// time formatted as "2006-01-02 15:04:05" followed by "--".
func MqttDebug(str string) {
	stamp := time.Now().Format("2006-01-02 15:04:05")
	Push(PushDebug, stamp+"--"+str)
}
// onConnectionLost is registered by MqttSetup as the connection-lost handler.
// It prints the error to stdout and forwards the same text through MqttDebug.
func onConnectionLost(c MQTT.Client, err error) {
	fmt.Printf("mqtt connection lost, %v\n", err)
	MqttDebug(fmt.Sprintf("mqtt connection lost,%s", err.Error()))
}
// Mqtt_Subscribe subscribes c to topic with the given qos and callback, and
// returns once a subscribe attempt completes without error.
//
// A failed attempt is printed, forwarded through MqttDebug, and retried after
// one second. There is no retry limit, so the call does not return until an
// attempt succeeds.
func Mqtt_Subscribe(c MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) {
	for {
		// Forward the caller's qos instead of a hard-coded 0.
		token := c.Subscribe(topic, qos, callback)
		if token.Wait() && token.Error() != nil {
			fmt.Println("subscribe topic,", topic, token.Error())
			MqttDebug("subscribe topic," + topic + token.Error().Error())
			time.Sleep(time.Second)
			continue
		}
		return
	}
}
// Mqtt_Unsubscribe removes the subscription for topic on c and waits for the
// request to complete. A failure is printed and forwarded through MqttDebug;
// it is not retried.
func Mqtt_Unsubscribe(c MQTT.Client, topic string) {
	// Operate on the client that was passed in, not on the package-level one.
	if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
		fmt.Println("unsubscribe topic,", topic, token.Error())
		MqttDebug("unsubscribe topic," + topic + token.Error().Error())
	}
}
// Push JSON-encodes message and publishes it on topic through the
// package-level client with QoS 0 and the retained flag off, then waits for
// the publish token to complete.
//
// A message that cannot be encoded is reported on stdout and not published.
// A failed publish is reported on stdout as well. Failures are printed rather
// than sent through MqttDebug because MqttDebug itself publishes via Push.
func Push(topic string, message interface{}) {
	data, err := json.Marshal(message)
	if err != nil {
		fmt.Println("push topic,", topic, "marshal error,", err)
		return
	}

	token := client.Publish(topic, 0, false, data)
	if token.Wait() && token.Error() != nil {
		fmt.Println("push topic,", topic, token.Error())
	}
}
// Subscribe registers callback for topic on the package-level client at QoS 0
// by delegating to Mqtt_Subscribe.
func Subscribe(topic string, callback MQTT.MessageHandler) {
	const qos byte = 0
	Mqtt_Subscribe(client, topic, qos, callback)
}
// UnSubscribe removes the subscription for topic by passing the package-level
// client to Mqtt_Unsubscribe.
func UnSubscribe(topic string) {
Mqtt_Unsubscribe(client, topic)
}
// func SubscribeBatch() {
// for _, v := range SubList {
// if !v.Flag {
// Subscribe(v.Topic, CallbackGW)
// v.Flag = true
// }
// }
// }
// CallbackReplay is an MQTT message handler with an empty body; it ignores
// every message it receives.
// NOTE(review): the commented-out Subscribe(SubReplay, CallbackReplay) in
// OnConnection suggests it was meant for the SubReplay topic — confirm.
func CallbackReplay(c MQTT.Client, m MQTT.Message) {
}
// func CallbackInstr(c MQTT.Client, m MQTT.Message){
// }
// CallbackGW is an MQTT message handler with an empty body; it ignores every
// message it receives.
// NOTE(review): presumably intended for gateway topics (the commented-out
// SubscribeBatch passes it to Subscribe) — confirm.
func CallbackGW(c MQTT.Client, m MQTT.Message) {
}
// CallbackUart is an MQTT message handler with an empty body; it ignores every
// message it receives.
// NOTE(review): presumably intended for the SubUart topic — confirm.
func CallbackUart(c MQTT.Client, m MQTT.Message) {
}
// OnConnection is installed by MqttSetup as the client's on-connect handler.
// Its body contains only commented-out calls, so it currently does nothing.
func OnConnection(c MQTT.Client) {
// Subscribe(SubReplay, CallbackReplay)
// SubscribeBatch()
}
// MqttSetup builds the package-level MQTT client and blocks until it connects.
//
// The client uses a clean session, OnConnection and onConnectionLost as its
// handlers, and a maximum reconnect interval of five seconds. A failed connect
// is logged and retried every two seconds with no retry limit.
//
// NOTE(review): the broker address, username and password are hard-coded in
// this function.
func MqttSetup() {
	options := MQTT.NewClientOptions().
		AddBroker("192.168.0.6:1883"). // commented-out alternative broker: 39.98.253.192:1885
		SetUsername("app").
		SetPassword("CrDO5FvEdaIXUMYw").
		SetCleanSession(true).
		SetOnConnectHandler(OnConnection).
		SetConnectionLostHandler(onConnectionLost).
		SetMaxReconnectInterval(5 * time.Second)

	client = MQTT.NewClient(options)

	for {
		token := client.Connect()
		if !token.Wait() || token.Error() == nil {
			break
		}
		log.Infof("mqtt connect error,%s", token.Error())
		time.Sleep(2 * time.Second)
	}

	log.Infof("mqtt connect success")
	fmt.Println("connect mqtt success")
}
// Mqtt_GetReplay decodes the JSON payload of msg into a Message.
//
// A decode failure is printed to stdout instead of being silently dropped.
// Whatever json.Unmarshal filled in before failing is still returned, so
// callers see the same value as before.
func Mqtt_GetReplay(msg MQTT.Message) Message {
	var replay Message
	if err := json.Unmarshal(msg.Payload(), &replay); err != nil {
		fmt.Println("mqtt replay unmarshal error,", err)
	}
	return replay
}