STM中的延时操作

May 1, 2013

今天弄一个查英语单词的小工具,给他一堆单词,从QQ词典得到每个单词的意思。我先把所有单词读进一个队列,然后用多个worker从队列里取单词,从QQ词典得到意思,再写到一个输出队列。同时有一个consumer从输出队列取单词意思再写到文件。

consumer必须要等全部worker都结束了才能退出,不然就会错过一些结果。所以我一个TVar变量workerCount记录worker的个数,每个worker结束的时候都会把这个变量减1,而consumer在输出队列为空的时候就检查workerCount,如果为0,那么consumer也可以结束了,因为这时已经没有worker,而且输出队列的结果也处理完了。

我的第一个实现大概是这样的

consumer :: TQueue Word -> TVar Int -> IO ()
consumer output workerCount  = do
    state <- atomically $ do

        mword <- tryReadTQueue output     -- 尝试从输出队列读结果 (1)
        case mword of
          Just w -> return (GetOutput w)  -- 输出队列不为空
          Nothing -> do                   -- 输出队列空         (2)
            count <- readTVar workerCount -- 检查woker个数      (3)
            if count == 0
               then return Finish    -- worker数为0,consumer可以结束
               else return NoOutput  -- worker数不为0,consumer不能结束 (4)

    -- 根据consumer状态循环或退出
    case state of
      NoOutput -> consumer output workerCount
      Finish   -> return ()
      GetOutput word -> do
        print word
        consumer output workerCount timeout action

上面(1)tryReadTQueue是不会阻塞的,如果用readTQueue,会在这里堵塞,后面的检查就没办法做了。因为IO慢,所以大部分时间输出队列都是空的,所以consumer就会不断进行1-2-3-4的循环。所以这里需要延时操作(sleep),来减少这样无用的循环。

但是在STM里是不能执行IO操作的,所以不能直接System.Timeout.timeoutthreadDelay。不过我们可以曲线救国,用STM的orElse来实现。

consumer :: TQueue Word -> TVar Int -> TMVar (Maybe Word) > IO ()
consumer output workerCount sleep = do
    state <- atomically $ do
        -- use timeout to avoid busy loop
        mword <- (Just <$> readTQueue output) `orElse` takeTMVar sleep
        case mword of
        ...

consumer里只要改读队列的部分,readTQueue阻塞的时候会执行retry, orElse就会候执行takeTMVar。接着就是在另一个线程里定时向sleep里写Nothing,让takeTMVar可以定时返回:

wakeup :: TMVar (Maybe a) -> IO b
wakeup sleep = forever $ do
    threadDelay (10^(6::Int))
    atomically $ putTMVar sleep Nothing

这方法虽然不好扩展,但是处理1000个单词的时候,user时间从57秒多减少到4秒。