今天弄一个查英语单词的小工具,给他一堆单词,从QQ词典得到每个单词的意思。我先把所有单词读进一个队列,然后用多个worker
从队列里取单词,从QQ词典得到意思,再写到一个输出队列。同时有一个consumer
从输出队列取单词意思再写到文件。
consumer
必须要等全部worker
都结束了才能退出,不然就会错过一些结果。所以我一个TVar变量workerCount
记录worker的个数,每个worker
结束的时候都会把这个变量减1,而consumer
在输出队列为空的时候就检查workerCount
,如果为0,那么consumer
也可以结束了,因为这时已经没有worker
,而且输出队列的结果也处理完了。
我的第一个实现大概是这样的
consumer :: TQueue Word -> TVar Int -> IO ()
consumer output workerCount = do
state <- atomically $ do
mword <- tryReadTQueue output -- 尝试从输出队列读结果 (1)
case mword of
Just w -> return (GetOutput w) -- 输出队列不为空
Nothing -> do -- 输出队列空 (2)
count <- readTVar workerCount -- 检查woker个数 (3)
if count == 0
then return Finish -- worker数为0,consumer可以结束
else return NoOutput -- worker数不为0,consumer不能结束 (4)
-- 根据consumer状态循环或退出
case state of
NoOutput -> consumer output workerCount
Finish -> return ()
GetOutput word -> do
print word
consumer output workerCount timeout action
上面(1)
的tryReadTQueue
是不会阻塞的,如果用readTQueue
,会在这里堵塞,后面的检查就没办法做了。因为IO慢,所以大部分时间输出队列都是空的,所以consumer
就会不断进行1-2-3-4
的循环。所以这里需要延时操作(sleep
),来减少这样无用的循环。
但是在STM里是不能执行IO操作的,所以不能直接System.Timeout.timeout
和threadDelay
。不过我们可以曲线救国,用STM的orElse
来实现。
consumer :: TQueue Word -> TVar Int -> TMVar (Maybe Word) > IO ()
consumer output workerCount sleep = do
state <- atomically $ do
-- use timeout to avoid busy loop
mword <- (Just <$> readTQueue output) `orElse` takeTMVar sleep
case mword of
...
consumer里只要改读队列的部分,readTQueue
阻塞的时候会执行retry
, orElse
就会候执行takeTMVar
。接着就是在另一个线程里定时向sleep
里写Nothing,让takeTMVar
可以定时返回:
wakeup :: TMVar (Maybe a) -> IO b
wakeup sleep = forever $ do
threadDelay (10^(6::Int))
atomically $ putTMVar sleep Nothing
这方法虽然不好扩展,但是处理1000个单词的时候,user时间从57秒多减少到4秒。