废话不多说,先来几行代检验下你是否适合本文,如果你发现看不懂建议先去看看简单点的东西。 ok,下面假设这样一个场景,有一家新闻媒体会持续向官方网站输出最新消息,刚好他们的后端提供了一个api可以获取指定分类的最新消息以及该类别预计下次有新消息的时间。我们再假设一下,他们还提供了一个SDK来帮助我们封装好了api: 翻看了一下SDK之后,发现还有这样俩玩意: 现在希望有个这样一个channel,我们可以一直从中读到news,剩下的逻辑交给channel的另一头来搞定。同时我们还希望之歌channel中可以读到多个不同分类的news。下面先来定义一下消费者 现在把代码主体结构搭一下 接下来定义个 那么问题来了,既然主要干活的是loop,那么loop需要做什么呢?首先肯定需要调用Fetch方法来拿消息,然后将拿到的消息放进channel去,然后别忘了响应Close方法调用,即退出循环。来试着写下loop 这一波操作之后,sub结构体就变成了这样: 顺便把Close也可以写出来了: 大部分都完成了,然后需要把上面的坑填一下,首先给SDK来点假数据 SDK里还有个fetcher 还有一个merge 到这就算”完成”了,代码总是有bug的,来找找bug吧 来看看sub的closed跟err字段,它在loop及Close方法中都用到了,而且这俩方法应该是在不同的goroutine。这样就有了并发安全问题。 另外这俩 再一个比较难发现,就是有可能会导致假死的地方,仔细看看这句 我们知道channel都是阻塞的(杠精别拿缓冲区说事,想让这里保证不阻塞,你说缓冲区需要多大?),如果我在调用了Close之后这行代码碰巧执行了,那么实际上它会永远卡在这里。找到这仨问题之后,来借助select一次性搞定这仨bug select是个神奇的东西,它就像站在常年堵车的十字路口的交警,让可以通行的车辆先走(这个比喻起始不太恰当,堵车是互相堵,channel阻塞多半跟其他channel无关)。现在请select出场 相应的sub结构体与Close方法: 到这还有点问题,如果cache里没数据了怎么办,继续改 给for循环里加个局部变量ch,每次循环一开始这个ch就变成了nil,当cache有数据就紧接着给ch赋值。下面只放了有改动的代码 这里有个知识点,对值为nil的channel执行读写操作不会panic,只会block 到这里其实还有可优化的地方,比如这个cache现在理论上可以无限“膨胀”,现在来给它加个限制 没什么好啰嗦的,直接上代码 代码优化总是无止境的,这里也只是相对优化而已。代码中还有一些地方可以精雕细琢,本文主要关注并发这块东西,所以其他的欢迎大家在评论区嗨所欲嗨。
Go并发实践
go f() go f("abc", 123) ch := make(chan int) go func() { c <- 123}() fmt.Println(<-ch)
简单的例子
func FetchNews(kind string) (newsList []News, next time.Time, err error) type News struct { Kind string Title string Text string }
type Fetcher interface { Fetch() (newsList []News, next time.Time, err error) } func Fetch(kind string) Fetcher
type Subscription interface { // 返回一个可以读到news的channel Flow() <-chan News // 用来关闭channel Close() error } // 用来将Fetcher转换成消费者 func Subscribe(fetcher Fetcher) Subscription // 用来合并多个channel的消息 func Merge(subs ...Subscription) Subscription
merged := Merge( Subscribe(Fetch("体育")), Subscribe(Fetch("财经")), Subscribe(Fetch("房产")), ) // 假设只需要保持3秒,然后关闭 time.AfterFunc(time.Second*3, func() { fmt.Println("close msg:", merged.Close()) }) for news := range merged.Flow() { fmt.Println(news.Title) }
sub
结构体来实现一下Subscription
这个接口,同时用它来完成Subscribe
方法type sub struct { fetcher Fetcher flow chan News } // sub的主体逻辑,持续网channel里放数据 func (s *sub) loop() { } // 返回channel func (s *sub) Flow() <-chan News { return s.flow } // 用来关闭channel func (s *sub) Close() error { return nil } // 用来将Fetcher转换成消费者 func Subscribe(fetcher Fetcher) Subscription { s := &sub{ fetcher: fetcher, flow: make(chan News), } go s.loop() return s }
loop怎么搞
// sub的主体逻辑,持续网channel里放数据 func (s *sub) loop() { // 首先大部分时候肯定是死循环读消息了 for { // 既然是死循环,那么肯定要有个地方判断是不是该关闭了,给s加个closed字段 if s.closed { close(s.flow) return } // 用它的fetcher来拿数据 newsList, next, err := s.fetcher.Fetch() if err != nil { // 错误也要存下来,在Close的时候返回,那再给s加个err字段吧 s.err = err time.Sleep(time.Second * 3) continue } // 把拿到的数据都丢进channel for _, news := range newsList { s.flow <- news } // 既然提供了下次有消息的事件,那就在到点之前"睡"一会吧 if now := time.Now(); next.After(now) { time.Sleep(next.Sub(now)) } } }
type sub struct { fetcher Fetcher flow chan News closed bool // 是否已经关了 err error // 存放出现的错误 }
// 用来关闭channel func (s *sub) Close() error { s.closed = true return s.err }
好像少点什么
func FetchNews(kind string) (newsList []News, next time.Time, err error) { newsList = append(newsList, News{ Kind: kind, Title: time.Now().Format("2006-01-02"), Text: time.Now().Format("15:04:05"), }) next = time.Now().Add(time.Second * 5) return }
type Fetcher interface { Fetch() (newsList []News, next time.Time, err error) } type fet struct { kind string } func (f fet) Fetch() (newsList []News, next time.Time, err error) { return FetchNews(f.kind) } func Fetch(kind string) Fetcher { return fet{kind: kind} }
// 用来合并多个channel的消息 func Merge(subs ...Subscription) Subscription { res := &sub{ flow: make(chan News), } for _, s := range subs { go func(s Subscription) { res.flow <- <-s.Flow() }(s) } return res }
bug
if s.closed {} s.closed = true
time.Sleep
也不太妙,假如在他“睡”的时候调用了Close呢,岂不是要等它醒了才能关掉time.Sleep(time.Second * 3) time.Sleep(next.Sub(now))
s.flow <- news
改bug
// sub的主体逻辑,持续网channel里放数据 func (s *sub) loop() { // 将这三个遍历定义在外面复用 var err error var next time.Time var newsList []News // 做个缓冲区,可以让读写独立工作 var cache []News // 首先大部分时候肯定是死循环读消息了 for { // 首次执行的时候,delay为0 // 非首次执行时,计算下次请求数据是什么时候 var delay time.Duration if now := time.Now(); next.After(now) { delay = next.Sub(now) } // 这里可以将之前的`time.Sleep`做成一个通道,到时会从通道读出超时的时刻 chFetch := time.After(delay) select { // 循环控制改成了用channel控制,在Close不调用的情况下,这个写操作会一直阻塞,select就会执行其他case case s.closeCh <- err: // 进这个case说明closeCh可写了,也就是说它的读操作正在发生,也就是说Close被调用了 close(s.flow) return case <-chFetch: // 进这个case说明`time.After`时间到了,该执行下一次fetch了 newsList, next, err = s.fetcher.Fetch() if err != nil { // 原来的3秒后重试,可以调整为3秒后再触发下一次fetch,效果完全一样 next = time.Now().Add(time.Second * 3) break // 需要break掉当前case } // 将数据放进缓冲区,然后就不管后续逻辑了,读写分离 cache = append(cache, newsList...) case s.flow <- cache[0]: // 进这个case说明成功“消费”一个news,把它从cache中踢走,然后开始下一个循环 cache = cache[1:] } } }
type sub struct { fetcher Fetcher flow chan News closeCh chan error // 把error放进channel,跟关闭用同一个 } // 用来关闭channel func (s *sub) Close() error { // 从这里读出error并返回,其他时候是不会从中读数据的,而不读数据就会让写操作阻塞 return <-s.closeCh }
向值为nil的channel发数据
for { // 每次循环ch都是nil var ch chan News // 也定义个news变量,防止select里面空指针 var news News if len(cache) > 0 { // cache有数据时才给ch赋值 ch = s.flow news = cache[0] } select { // ch为空则不会进这个case,ch不为空才会执行逻辑 case ch <- news: // 进这个case说明成功“消费”一个news,把它从cache中踢走,然后开始下一个循环 cache = cache[1:] } }
限制缓冲区大小
// 来个变量定义缓冲区最大值 const cacheSize = 10 chFetch := time.After(delay) // 在这加个逻辑,如果缓冲区满了,就给这个ch置为nil,让下面阻塞,就不会有新数据了 if len(cache) >= cacheSize { chFetch = nil }
结语
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算