Commit 1e4a77d4 authored by Peter Cheng's avatar Peter Cheng

1.修正Error觸發後動作

2.修正工作佇列排程內容
parent 5e87fee0
package controllers package controllers
import ( import (
"fmt"
"log" "log"
"github.com/bitly/go-nsq" "github.com/bitly/go-nsq"
...@@ -18,7 +17,7 @@ func postMessage(channel string, message string) { ...@@ -18,7 +17,7 @@ func postMessage(channel string, message string) {
w, _ := nsq.NewProducer(cfg.Message.Post.Address, config) w, _ := nsq.NewProducer(cfg.Message.Post.Address, config)
err := w.Publish(channel, []byte(message)) err := w.Publish(channel, []byte(message))
if err != nil { if err != nil {
fmt.Println(err) log.Panicln(err)
} }
w.Stop() w.Stop()
} }
...@@ -27,6 +26,6 @@ func postMessage(channel string, message string) { ...@@ -27,6 +26,6 @@ func postMessage(channel string, message string) {
func getParams(c *gin.Context, params interface{}) { func getParams(c *gin.Context, params interface{}) {
err := c.BindJSON(params) err := c.BindJSON(params)
if err != nil { if err != nil {
log.Println(err) log.Panicln(err)
} }
} }
package env package env
import "time"
//Config 系統參數 //Config 系統參數
type Config struct { type Config struct {
Env string Env string
...@@ -17,6 +19,16 @@ type Config struct { ...@@ -17,6 +19,16 @@ type Config struct {
Housekeeper struct { Housekeeper struct {
Host string Host string
} }
Works struct {
Worker []*Worker
}
}
//Worker 工作佇列
type Worker struct {
Topic string
Channel string
Interval time.Duration
} }
var cfg = &Config{} var cfg = &Config{}
...@@ -29,6 +41,10 @@ func init() { ...@@ -29,6 +41,10 @@ func init() {
cfg.Message.Topic = "Mail" cfg.Message.Topic = "Mail"
cfg.Message.Channel = "SendMail" cfg.Message.Channel = "SendMail"
cfg.Housekeeper.Host = "http://127.0.0.1:8806" cfg.Housekeeper.Host = "http://127.0.0.1:8806"
workers := []*Worker{
&Worker{Topic: "Mail", Channel: "SendMail", Interval: time.Second * 2},
&Worker{Topic: "PunchClock", Channel: "UploadDailyPunchclockData", Interval: time.Second * 10}}
cfg.Works.Worker = workers
} }
//GetEnv 取得環境參數 //GetEnv 取得環境參數
......
...@@ -44,24 +44,22 @@ func messageService() { ...@@ -44,24 +44,22 @@ func messageService() {
//InitConsumer 初始化消費者 //InitConsumer 初始化消費者
func InitConsumer(topic string, channel string, host string) bool { func InitConsumer(topic string, channel string, host string) bool {
upStream := make(chan int, 1) upStream := make(chan int, 1)
go func() { for _, worker := range cfg.Works.Worker {
interval := time.Second * 10 go func(worker *env.Worker) {
mQuery(host, "PunchClock", "UploadDailyPunchclockData", interval) mQuery(host, worker.Topic, worker.Channel, worker.Interval)
}() }(worker)
go func() { }
interval := time.Second * 2
mQuery(host, "Mail", "SendMail", interval)
}()
<-upStream <-upStream
return true return true
} }
//mQuery 啟用單一消費者
func mQuery(host string, topic string, channel string, interval time.Duration) *nsq.Consumer { func mQuery(host string, topic string, channel string, interval time.Duration) *nsq.Consumer {
config := nsq.NewConfig() config := nsq.NewConfig()
config.LookupdPollInterval = interval config.LookupdPollInterval = interval
query, err := nsq.NewConsumer(topic, channel, config) query, err := nsq.NewConsumer(topic, channel, config)
if err != nil { if err != nil {
log.Panic(err) log.Panicln(err)
} }
query.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { query.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
chat := string(message.Body) chat := string(message.Body)
...@@ -71,16 +69,18 @@ func mQuery(host string, topic string, channel string, interval time.Duration) * ...@@ -71,16 +69,18 @@ func mQuery(host string, topic string, channel string, interval time.Duration) *
})) }))
err = query.ConnectToNSQLookupd(host) err = query.ConnectToNSQLookupd(host)
if err != nil { if err != nil {
panic(err) log.Panicln(err)
} }
return query return query
} }
//getQuete 取得工作內容
func getQuete(message string) []string { func getQuete(message string) []string {
quete := strings.Split(message, "</UseService>") quete := strings.Split(message, "</UseService>")
return quete return quete
} }
//runServices 運行系統服務
func runServices(quete []string) { func runServices(quete []string) {
path := quete[0] path := quete[0]
params := []byte(quete[1]) params := []byte(quete[1])
...@@ -88,6 +88,7 @@ func runServices(quete []string) { ...@@ -88,6 +88,7 @@ func runServices(quete []string) {
postURL(url, params) postURL(url, params)
} }
//postURL 將資料打到REST API
func postURL(url string, params []byte) { func postURL(url string, params []byte) {
request, err := http.NewRequest("POST", url, bytes.NewBuffer(params)) request, err := http.NewRequest("POST", url, bytes.NewBuffer(params))
request.Header.Set("X-Custom-Header", "counter") request.Header.Set("X-Custom-Header", "counter")
...@@ -95,7 +96,7 @@ func postURL(url string, params []byte) { ...@@ -95,7 +96,7 @@ func postURL(url string, params []byte) {
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(request) resp, err := client.Do(request)
if err != nil { if err != nil {
log.Panic(err) log.Panicln(err)
} }
defer resp.Body.Close() defer resp.Body.Close()
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment