You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
243 lines
6.4 KiB
243 lines
6.4 KiB
package mqtt
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
// "usecases/usecases_server/utils/cyllib/serial"
|
|
"automatedtesting/usecases_server/utils/cyllib/log"
|
|
"automatedtesting/usecases_server/utils/cyllib/ssh"
|
|
"automatedtesting/usecases_server/utils/pubsub"
|
|
|
|
MQTT "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// client is the package-wide MQTT connection. MqttSetup assigns it; Push and
// the Subscribe / UnSubscribe wrappers use it.
var client MQTT.Client
|
|
|
|
// MQTT topic names used by this package.
var (
	// PushDebug is the topic MqttDebug publishes to.
	PushDebug = "application/agency/debug"
	// PushInstr is presumably the topic for outgoing instructions — confirm
	// against callers.
	PushInstr = "application/agency/instr"

	// The Sub* topics are presumably the ones this service subscribes to —
	// confirm against callers.
	SubReplay = "application/agency/replay"
	SubUart   = "application/agency/uart"
	SubGw     = "application/agency/gw"
)
|
|
|
|
// var AddrCh = make(chan string, 1)
|
|
// Message is the JSON envelope that Mqtt_GetReplay decodes MQTT payloads into.
type Message struct {
	// Identify is the peer kind: terminal / gateway.
	Identify string `json:"identify"`
	// Id is the terminal id / gateway id.
	Id string `json:"id"`
	// Ip and Port use capitalised JSON keys, unlike the other fields.
	Ip   string `json:"Ip"`
	Port string `json:"Port"`
	// Name is the name of the specific instruction.
	Name string `json:"name"`
	// Data is the payload carried by the message.
	Data interface{} `json:"data"`
	// Addition is an extra boolean flag; its meaning is not visible here.
	Addition bool `json:"addition"`
}
|
|
|
|
// -------------------------------------------------------------cache
|
|
|
|
// StoreSub stores subscription state per key: gateways are grouped by IP,
// serial ports by "ip:port".
var StoreSub = map[string]*bool{}
|
|
|
|
type MqttSshMsg struct {
|
|
Identify string // 这个参数如果是网关则存储1301/1302/addr,如果是串口则存储com口或者为空
|
|
Runing bool //标识是否订阅
|
|
Path string //保存文件路径
|
|
SSHmsg ssh.SSHMsg //ssh 连接信息
|
|
|
|
pubsub *pubsub.Publisher
|
|
|
|
isExist bool //是否存在
|
|
|
|
mux sync.Mutex
|
|
}
|
|
// MqttSshMsgMap maps a name key to its MqttSshMsg entry. Entries are created
// on demand by initMqttSshMsgMap, which every Get*/Set* accessor calls first.
var MqttSshMsgMap = make(map[string]*MqttSshMsg)
|
|
|
|
// 返回是否存在字段
|
|
func initMqttSshMsgMap(name string) {
|
|
if _, ok := MqttSshMsgMap[name]; !ok {
|
|
var msg = MqttSshMsg{
|
|
// Identify: name,
|
|
Runing: false,
|
|
Path: "",
|
|
SSHmsg: ssh.SSHMsg{},
|
|
isExist: false,
|
|
pubsub: pubsub.NewPublisher(100*time.Millisecond, 1),
|
|
}
|
|
msg.mux.Lock()
|
|
MqttSshMsgMap[name] = &msg
|
|
msg.mux.Unlock()
|
|
}
|
|
}
|
|
func isExistMqttSshMsgMap(name string) bool {
|
|
if _, ok := MqttSshMsgMap[name]; !ok {
|
|
return false
|
|
}
|
|
return MqttSshMsgMap[name].isExist
|
|
}
|
|
func CreateMqttSshMsgMap(name string, sshmsg ssh.SSHMsg) {
|
|
if !isExistMqttSshMsgMap(name) {
|
|
SetMqttSshMsgMapSSHMsg(name, sshmsg)
|
|
MqttSshMsgMap[name].isExist = true
|
|
}
|
|
}
|
|
|
|
// 第二个返回参数bool标识map中字段是否存在
|
|
func GetMqttSshMsgMapRuning(name string) bool {
|
|
initMqttSshMsgMap(name)
|
|
return MqttSshMsgMap[name].Runing
|
|
}
|
|
func SetMqttSshMsgMapRuning(name string, runing bool) {
|
|
initMqttSshMsgMap(name)
|
|
MqttSshMsgMap[name].mux.Lock()
|
|
MqttSshMsgMap[name].Runing = runing
|
|
MqttSshMsgMap[name].mux.Unlock()
|
|
}
|
|
func GetMqttSshMsgMapPath(name string) string {
|
|
initMqttSshMsgMap(name)
|
|
return MqttSshMsgMap[name].Path
|
|
}
|
|
func SetMqttSshMsgMapPath(name string, path string) {
|
|
initMqttSshMsgMap(name)
|
|
MqttSshMsgMap[name].mux.Lock()
|
|
MqttSshMsgMap[name].Path = path
|
|
MqttSshMsgMap[name].mux.Unlock()
|
|
}
|
|
func GetMqttSshMsgMapSSHMsg(name string) *ssh.SSHMsg {
|
|
initMqttSshMsgMap(name)
|
|
return &MqttSshMsgMap[name].SSHmsg
|
|
}
|
|
func SetMqttSshMsgMapSSHMsg(name string, sshmsg ssh.SSHMsg) {
|
|
initMqttSshMsgMap(name)
|
|
MqttSshMsgMap[name].mux.Lock()
|
|
MqttSshMsgMap[name].SSHmsg = sshmsg
|
|
MqttSshMsgMap[name].mux.Unlock()
|
|
}
|
|
func GetMqttSshMsgMapIdentify(name string) string {
|
|
initMqttSshMsgMap(name)
|
|
return MqttSshMsgMap[name].Identify
|
|
}
|
|
func SetMqttSshMsgMapIdentify(name string, identify string) {
|
|
initMqttSshMsgMap(name)
|
|
MqttSshMsgMap[name].mux.Lock()
|
|
MqttSshMsgMap[name].Identify = identify
|
|
MqttSshMsgMap[name].mux.Unlock()
|
|
}
|
|
func GetMqttSshMsgMapPubsub(name string) *pubsub.Publisher {
|
|
initMqttSshMsgMap(name)
|
|
return MqttSshMsgMap[name].pubsub
|
|
}
|
|
|
|
// func SetMqttSshMsgUart(id string, data string) {
|
|
// if _, ok := ConnectMap[id]; !ok {
|
|
// ConnectMap[id] = initMqttCache()
|
|
// }
|
|
// ConnectMap[id].GwIndex = append(ConnectMap[id].GwIndex, data)
|
|
// }
|
|
|
|
//----------------------------------------------------------------mqtt
|
|
|
|
func MqttDebug(str string) {
|
|
Push(PushDebug, fmt.Sprint(time.Now().Format("2006-01-02 15:04:05"), "--", str))
|
|
}
|
|
func onConnectionLost(c MQTT.Client, err error) {
|
|
fmt.Println("mqtt connection lost,", err)
|
|
MqttDebug("mqtt connection lost," + err.Error())
|
|
}
|
|
func Mqtt_Subscribe(c MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) {
|
|
for {
|
|
if token := c.Subscribe(topic, 0, callback); token.Wait() && token.Error() != nil {
|
|
fmt.Println("subscribe topic,", topic, token.Error())
|
|
MqttDebug("subscribe topic," + topic + token.Error().Error())
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
return
|
|
}
|
|
}
|
|
|
|
func Mqtt_Unsubscribe(c MQTT.Client, topic string) {
|
|
if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
|
|
fmt.Println("unsubscribe topic,", topic, token.Error())
|
|
MqttDebug("unsubscribe topic," + topic + token.Error().Error())
|
|
return
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @Description: 发布订阅
|
|
* @param clientID
|
|
* @param addr
|
|
* @param topic
|
|
* @param payload
|
|
*/
|
|
func Push(topic string, message interface{}) {
|
|
data, _ := json.Marshal(message)
|
|
|
|
token := client.Publish(topic, 0, false, data)
|
|
token.Wait()
|
|
// fmt.Println(topic, "Data Size is", data)
|
|
}
|
|
func Subscribe(topic string, callback MQTT.MessageHandler) {
|
|
Mqtt_Subscribe(client, topic, 0, callback)
|
|
}
|
|
func UnSubscribe(topic string) {
|
|
Mqtt_Unsubscribe(client, topic)
|
|
}
|
|
|
|
// func SubscribeBatch() {
|
|
|
|
// for _, v := range SubList {
|
|
// if !v.Flag {
|
|
// Subscribe(v.Topic, CallbackGW)
|
|
// v.Flag = true
|
|
// }
|
|
// }
|
|
// }
|
|
func CallbackReplay(c MQTT.Client, m MQTT.Message) {
|
|
|
|
}
|
|
|
|
// func CallbackInstr(c MQTT.Client, m MQTT.Message){
|
|
|
|
// }
|
|
func CallbackGW(c MQTT.Client, m MQTT.Message) {
|
|
|
|
}
|
|
func CallbackUart(c MQTT.Client, m MQTT.Message) {
|
|
|
|
}
|
|
|
|
func OnConnection(c MQTT.Client) {
|
|
// Subscribe(SubReplay, CallbackReplay)
|
|
|
|
// SubscribeBatch()
|
|
}
|
|
func MqttSetup() {
|
|
opts := MQTT.NewClientOptions()
|
|
opts.AddBroker("192.168.0.6:1883")
|
|
// opts.AddBroker("39.98.253.192:1885")
|
|
opts.SetUsername("app")
|
|
opts.SetPassword("CrDO5FvEdaIXUMYw")
|
|
opts.SetCleanSession(true)
|
|
opts.SetOnConnectHandler(OnConnection)
|
|
opts.SetConnectionLostHandler(onConnectionLost)
|
|
opts.SetMaxReconnectInterval(5 * time.Second)
|
|
client = MQTT.NewClient(opts)
|
|
for {
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
log.Infof("mqtt connect error,%s", token.Error())
|
|
time.Sleep(2 * time.Second)
|
|
} else {
|
|
log.Infof("mqtt connect success")
|
|
fmt.Println("connect mqtt success")
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func Mqtt_GetReplay(msg MQTT.Message) Message {
|
|
var replay Message
|
|
json.Unmarshal(msg.Payload(), &replay)
|
|
return replay
|
|
}
|
|
|