Ruby - 多线程


传统程序具有单个执行线程,组成程序的语句或指令按顺序执行,直到程序终止。

多线程程序有多个执行线程。例如,在每个线程内,语句按顺序执行,但线程本身可以在多核 CPU 上并行执行。通常在单CPU机器上,多个线程实际上并不是并行执行的,而是通过线程的交错执行来模拟并行性。

Ruby 使用Thread类可以轻松编写多线程程序。Ruby 线程是在代码中实现并发的轻量级且高效的方法。

创建 Ruby 线程

要启动一个新线程,只需将一个块与对Thread.new 的调用关联起来。将创建一个新线程来执行块中的代码,原始线程将立即从Thread.new返回并继续执行下一条语句 -

# Thread #1 is running here
Thread.new {
   # Thread #2 runs this code
}
# Thread #1 runs this code

例子

下面是一个示例,它展示了如何使用多线程 Ruby 程序。

#!/usr/bin/ruby

def func1
   i = 0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i = i+1
   end
end

def func2
   j = 0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j = j+1
   end
end

puts "Started At #{Time.now}"
t1 = Thread.new{func1()}
t2 = Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

这将产生以下结果 -

Started At Wed May 14 08:21:54 -0700 2008
func1 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:54 -0700 2008
func2 at: Wed May 14 08:21:55 -0700 2008
func1 at: Wed May 14 08:21:56 -0700 2008
func2 at: Wed May 14 08:21:56 -0700 2008
func1 at: Wed May 14 08:21:58 -0700 2008
End at Wed May 14 08:22:00 -0700 2008

线程生命周期

使用Thread.new创建新线程。您还可以使用同义词Thread.startThread.fork

创建线程后无需启动线程,当 CPU 资源可用时它会自动开始运行。

Thread 类定义了许多在线程运行时查询和操作线程的方法。线程运行与Thread.new调用关联的块中的代码,然后停止运行。

该块中最后一个表达式的值是线程的值,可以通过调用Thread对象的value方法获得。如果线程已运行完成,则该值立即返回线程的值。否则,value方法会阻塞并且在线程完成之前不会返回。

类方法Thread.current返回表示当前线程的 Thread 对象。这允许线程自行操作。类方法Thread.main返回表示主线程的 Thread 对象。这是 Ruby 程序启动时开始的初始执行线程。

您可以通过调用特定线程的Thread.join方法来等待该线程完成。调用线程将阻塞,直到给定线程完成。

线程和异常

如果在主线程中引发异常并且未在任何地方进行处理,则 Ruby 解释器将打印一条消息并退出。在线程中,除了主线程之外,未处理的异常会导致线程停止运行。

如果线程t由于未处理的异常而退出,并且另一个线程s调用t.join 或 t.value,则t中发生的异常将在线程s中引发。

如果Thread.abort_on_exceptionfalse(默认情况),则未处理的异常只会终止当前线程,其余线程继续运行。

如果您希望任何线程中的任何未处理的异常导致解释器退出,请将类方法Thread.abort_on_exception设置为true

t = Thread.new { ... }
t.abort_on_exception = true

线程变量

线程通常可以访问创建线程时范围内的任何变量。线程块的本地变量对于线程来说是本地的,并且不被共享。

Thread 类具有一个特殊的功能,允许通过名称创建和访问线程局部变量。您只需将线程对象视为哈希,使用 []= 写入元素并使用 [] 读回它们。

在此示例中,每个线程使用键mycount将变量 count 的当前值记录在线程局部变量中。

#!/usr/bin/ruby

count = 0
arr = []

10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end

arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

这会产生以下结果 -

8, 0, 3, 7, 2, 1, 6, 5, 4, 9, count = 10

主线程等待子线程完成,然后打印出每个子线程捕获的数值。

线程优先级

影响线程调度的第一个因素是线程优先级:高优先级线程先于低优先级线程调度。更准确地说,只有在没有更高优先级的线程等待运行时,线程才会获得 CPU 时间。

您可以使用priority =priority设置和查询Ruby Thread 对象的优先级。新创建的线程以与创建它的线程相同的优先级启动。主线程从优先级 0 开始。

在线程开始运行之前,无法设置其优先级。然而,线程可以将提高或降低其自身的优先级作为其采取的第一个操作。

线程排除

如果两个线程共享对相同数据的访问,并且至少有一个线程修改了该数据,则必须特别小心,以确保没有线程能够看到处于不一致状态的数据。这称为线程排除

Mutex是一个实现简单信号量锁的类,用于对某些共享资源进行互斥访问。也就是说,在给定时间只有一个线程可以持有锁。其他线程可以选择排队等待锁变得可用,或者可以简单地选择立即获取指示锁不可用的错误。

通过将对共享数据的所有访问置于互斥体的控制下,我们确保了一致性和Atomics操作。让我们尝试举个例子,第一个没有 mutax,第二个有 mutax -

没有 Mutax 的示例

#!/usr/bin/ruby
require 'thread'

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果 -

count1 :  1583766
count2 :  1583766
difference : 0
#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
   end
end
spy = Thread.new do
   loop do
      mutex.synchronize do
         difference += (count1 - count2).abs
      end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

这将产生以下结果 -

count1 :  696591
count2 :  696591
difference : 0

处理死锁

当我们开始使用互斥对象进行线程排除时,我们必须小心避免死锁。死锁是当所有线程都等待获取另一个线程持有的资源时发生的情况。因为所有线程都被阻塞,所以它们无法释放它们所持有的锁。由于它们无法释放锁,因此其他线程无法获取这些锁。

这就是条件变量发挥作用的地方。条件变量只是与资源关联的信号量,并在特定互斥的保护范围内使用。当您需要不可用的资源时,您需要等待条件变量。该操作释放相应互斥锁上的锁。当其他线程发出资源可用的信号时,原始线程结束等待,同时重新获得关键区域的锁定。

例子

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new

cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}

puts "(Later, back at the ranch...)"

b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

这将产生以下结果 -

A: I have critical section, but will wait for cv
(Later, back at the ranch...)
B: Now I am critical, but am done with cv
B: I am still critical, finishing up
A: I have critical section again! I rule!

线程状态

有五种可能的返回值,对应五种可能的状态,如下表所示。status方法返回线程的状态。

线程状态 返回值
可运行 跑步
睡眠 睡眠
正在中止 中止
正常终止 错误的
异常终止

线程类方法

Thread类提供了以下方法,它们适用于程序中所有可用的线程。这些方法将使用Thread类名来调用,如下所示 -

Thread.abort_on_exception = true

线程实例方法

这些方法适用于线程的实例。这些方法将作为使用线程实例来调用,如下所示 -

#!/usr/bin/ruby

thr = Thread.new do   # Calling a class method new
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # Calling an instance method join