diff --git a/cmd/mock/main.go b/cmd/mock/mock.go similarity index 76% rename from cmd/mock/main.go rename to cmd/mock/mock.go index aada2a0..1546dac 100644 --- a/cmd/mock/main.go +++ b/cmd/mock/mock.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "log" "math/rand" "net/http" "net/url" @@ -13,7 +14,7 @@ import ( const ( cameraURL = "http://localhost:9090/bar" duration = 5 - desiredStdDev = 0.02 + desiredStdDev = 0.001 ) var data = []float64{9.1, +20.00, 422.97, 124.8209, 984, 89.01, -1.8, .622} @@ -43,21 +44,22 @@ func main() { go device.SerialOut(in, "COM2") rand.Seed(time.Now().UnixNano()) r := rand.Intn(len(data)) - fmt.Println("rand", r) timer := time.NewTimer(time.Second * duration) + timer1 := time.NewTimer(time.Millisecond * time.Duration(duration*1000-500)) raw := data[r] - + total := 0 for { - sample := rand.NormFloat64()*desiredStdDev + raw - mck.Send(sample, in) select { case <-timer.C: - mck.Send(raw/2, in) - time.Sleep(time.Second) - mck.Send(raw/4, in) + mck.Send(data[r]/4, in) + log.Println("total sent", total) return + case <-timer1.C: + //mock truck leaving + raw = data[r] / 2 default: - continue + mck.Send(rand.NormFloat64()*desiredStdDev+raw, in) + total++ } } }() diff --git a/cmd/service/config.toml b/cmd/service/config.toml index 6c5747a..4c7a709 100644 --- a/cmd/service/config.toml +++ b/cmd/service/config.toml @@ -1,6 +1,6 @@ -ScaleSN = "1" #地磅序列号 -Location = "沙场1" #位置信息 +ScaleSN = "地磅1" #地磅序列号 +SiteSN = "沙场1" #位置信息 PortName = "COM1" #串口名称 Baud = 9600 #波特率 TF = 0 #通讯方式 diff --git a/cmd/service/main.go b/cmd/service/main.go index 6bfb02c..4500ffd 100644 --- a/cmd/service/main.go +++ b/cmd/service/main.go @@ -1,9 +1,12 @@ package main import ( + "context" + "fmt" "github.com/BurntSushi/toml" "log" "net/http" + _ "net/http/pprof" "serialdemo/service" "time" ) @@ -25,14 +28,15 @@ func barOpen(w http.ResponseWriter, r *http.Request) { log.Println("truck", v, "checkout", checkout) weightChan := make(chan string) errChan := make(chan error) - stopChan := make(chan struct{}) var msg interface{} + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(conf.Timeout)) + defer cancel() go func() { - read := service.ScaleReader{ + sr := service.ScaleReader{ Config: &conf, } - if err := read.Listen(weightChan, stopChan); err != nil { - log.Println("ERROR", err) + if err := sr.Listen(ctx, weightChan); err != nil { errChan <- err } }() @@ -42,24 +46,34 @@ func barOpen(w http.ResponseWriter, r *http.Request) { Weight: weit, Vehicle: v, ScaleSN: conf.ScaleSN, - Location: conf.Location, + SiteSN: conf.SiteSN, Checkout: checkout, TimeStamp: time.Now().Unix(), } case err := <-errChan: msg = err.Error() log.Println("ERROR", err) - case <-time.After(time.Second * time.Duration(conf.Timeout)): - stopChan <- struct{}{} + case <-ctx.Done(): msg = "ScaleReader timeout" log.Println("ERROR", msg) } if err := service.Post(msg, conf.BackendURL); err != nil { log.Println("ERROR", err) } + //stopChan <- struct{}{} + <-ctx.Done() + msg = "process timeout" + log.Println("ERROR", msg) } func main() { + go func() { + + ip := "0.0.0.0:6060" + if err := http.ListenAndServe(ip, nil); err != nil { + fmt.Printf("start pprof failed on %s\n", ip) + } + }() mux := http.NewServeMux() mux.HandleFunc("/bar", barOpen) url := "0.0.0.0:9090" diff --git a/core/weigh.go b/core/weigh.go new file mode 100644 index 0000000..0002bfb --- /dev/null +++ b/core/weigh.go @@ -0,0 +1,135 @@ +package service + +import ( + "bufio" + "context" + "fmt" + "github.com/tarm/serial" + "log" + "serialdemo/protocal" + "time" +) + +type Config struct { + ScaleSN string + SiteSN string + PortName string + Baud int + TF int + Duration int + Timeout int + Deviation int + BackendURL string +} + +type WeightInfo struct { + Error string + WeightInfoToSign + R []byte + S []byte +} + +type WeightInfoToSign struct { + Weight string + Vehicle string + ScaleSN string + SiteSN string + Checkout bool + TimeStamp int64 +} + +func Weigh(ctx context.Context, conf Config, wCh <-chan protocal.Weight, result chan<- string) error { + + defer func() { + log.Println("s.Close()") + }() + + var weight protocal.Weight + weight = <-wCh + fmt.Println("first weight", weight) + + maxEver, max, min := weight.Value, weight.Value, weight.Value + timer := time.NewTimer(time.Second * time.Duration(conf.Duration)) + var final float64 = 0 + for { + select { + case <-timer.C: + //remember the maximum value during last stable time window + final = max + fmt.Println("set final", final) + case <-ctx.Done(): + log.Println("Listen timeout, will stop. max value ever: ", maxEver) + return nil + case weight = <-wCh: + if weight.Value > max { + max = weight.Value + //fmt.Println("set max=", max) + } else if weight.Value < min { + min = weight.Value + //fmt.Println("set min=", min) + } + //it seems the truck is leaving when weight drops to 1/3 of the max + if final > 0 && weight.Value < final/3 { + theWeight := protocal.Weight{ + Value: final, + Sign: weight.Sign, + Digits: weight.Digits, + } + log.Println("truck is leaving...") + result <- theWeight.String() + log.Println("weigh success:", theWeight.String()) + return nil + } + //fmt.Println("max", max, "min", min) + if final == 0 && max-min > float64(conf.Deviation)/1000 { + d := time.Second * time.Duration(conf.Duration) + timer.Reset(d) + if maxEver < max { + maxEver = max + } + max = weight.Value + min = weight.Value + log.Println("Reset: max", max) + } + default: + + } + } +} + +func ListenWeight(conf Config, wCh chan protocal.Weight, quitCh chan struct{}) error { + c := &serial.Config{Name: conf.PortName, Baud: conf.Baud} + s, err := serial.OpenPort(c) + if err != nil { + return fmt.Errorf("OpenPort failed: %v", err) + } + defer func() { + _ = s.Close() + log.Println("s.Close()") + }() + log.Println("connected to:", c.Name, "TF=", conf.TF) + + reader := bufio.NewReader(s) + cdc := protocal.NewCodec(conf.TF) + for { + select { + case <-quitCh: + //reader.Reset(s) + //log.Println("reader.Reset()") + return nil + default: + raw, err := reader.ReadBytes(cdc.GetDelimit()) + if err != nil { + log.Println("ReadBytes error", err) + //continue + } + decoded, err := cdc.Decode(raw) + if err != nil { + log.Println("Decode error", err) + //continue + } + + wCh <- decoded + } + } +} diff --git a/mock/device/device.go b/mock/device/device.go index ba6ae7b..dc208ea 100644 --- a/mock/device/device.go +++ b/mock/device/device.go @@ -2,6 +2,7 @@ package device import ( "github.com/tarm/serial" + "time" ) const ( @@ -47,5 +48,7 @@ func SerialOut(inputs chan []byte, portName string) { if _, err = s.Write(item); err != nil { panic(err) } + //paud=9600 + time.Sleep(time.Duration(10) * time.Millisecond) } } diff --git a/mock/device/tf0mock.go b/mock/device/tf0mock.go index d3987ee..d49f999 100644 --- a/mock/device/tf0mock.go +++ b/mock/device/tf0mock.go @@ -2,7 +2,6 @@ package device import ( "fmt" - "log" "math" "strconv" "strings" @@ -22,7 +21,8 @@ func (t Tf0Mock) Send(data float64, in chan []byte) { return } in <- item - log.Printf("%s\t -> %x\n", formatted, item) + //log.Printf("%s\t -> %x\n", formatted, item) + fmt.Print(formatted + " ") } func (t Tf0Mock) encode(input string) ([]byte, error) { diff --git a/service/weigh.go b/service/weigh.go deleted file mode 100644 index c0093df..0000000 --- a/service/weigh.go +++ /dev/null @@ -1,125 +0,0 @@ -package service - -import ( - "bufio" - "fmt" - "github.com/tarm/serial" - "log" - "serialdemo/protocal" - "time" -) - -var ( - privateKey, _ = loadPrivateKey("private.key") -) - -type Config struct { - ScaleSN string - Location string - PortName string - Baud int - TF int - Duration int - Timeout int - Deviation int - BackendURL string -} - -type ScaleReader struct { - *Config -} - -type WeightInfo struct { - Error string - WeightInfoToSign - R []byte - S []byte -} - -type WeightInfoToSign struct { - Weight string - Vehicle string - ScaleSN string - Location string - Checkout bool - TimeStamp int64 -} - -func (w *ScaleReader) Listen(wt chan string, stop chan struct{}) error { - c := &serial.Config{Name: w.PortName, Baud: w.Baud} - s, err := serial.OpenPort(c) - if err != nil { - return fmt.Errorf("OpenPort failed: %v", err) - } - defer func() { - if s != nil { - if err = s.Close(); err != nil { - fmt.Println("serial port close failed: ", err) - } - fmt.Println("serial port closed") - } - fmt.Println("defer", "s == nil") - }() - log.Println("connected to:", c.Name, "TF=", w.TF) - - reader := bufio.NewReader(s) - var max, min, final float64 - timer := time.NewTimer(time.Second * time.Duration(w.Duration)) - - readWeight := func() protocal.Weight { - cdc := protocal.NewCodec(w.TF) - raw, err := reader.ReadBytes(cdc.GetDelimit()) - if err != nil { - log.Println("ReadBytes error", err) - } - w, err := cdc.Decode(raw) - if err != nil { - log.Println("Decode error", err) - } - return w - } - weight := readWeight() - max = weight.Value - min = weight.Value - final = 0 - - for { - - //log.Printf("read: %x=>%s", raw, weight.String()) - select { - case <-timer.C: - //after a few seconds of stable time, remember the maximum value ever - final = max - fmt.Println("set final", final) - case <-stop: - log.Println("Listen stopped") - return nil - default: - weight := readWeight() - if weight.Value > max { - max = weight.Value - } else if weight.Value < min { - min = weight.Value - } - //it seems the truck is leaving when weight drops to 1/3 of the max - if final > 0 && weight.Value < final/3 { - theWeight := protocal.Weight{ - Value: final, - Sign: weight.Sign, - Digits: weight.Digits, - } - wt <- theWeight.String() - fmt.Println("weigh success:", theWeight.String()) - return nil - } - //fmt.Println("max", max, "min", min) - if final < 0 && max-min > float64(w.Deviation)/1000 { - d := time.Second * time.Duration(w.Duration) - timer.Reset(d) - max = weight.Value - min = weight.Value - fmt.Println("Reset: max", max) - } - } - } -} diff --git a/service/crypto.go b/utils/crypto.go similarity index 100% rename from service/crypto.go rename to utils/crypto.go diff --git a/service/http.go b/utils/http.go similarity index 100% rename from service/http.go rename to utils/http.go