読者です 読者をやめる 読者になる 読者になる

Rustで並行処理に便利なバッファーを実装してみた(前編)。

私はffmpegとかalsaとかをいじるのが割と好きで(映像とか音源を編集していると自然とそうなる)、両方ともC言語で実装されているためC言語でまずはユーティリティを作ってみることになるのですが、正直、プログラム言語としてC言語は色々なものを自分で一から実装しなければいけないことが多いので、エンジニアでもなくプログラミングもできない私には少しつらいところがあり、C++を使うことになるのですが、C++はC++で依存するライブラリの管理が大変だったり、モダンな実装ができるけれどレガシーな実装もあってわけがわからなくなることが多々あります。じゃあGolangとかでローリングアップデートでライブラリを最新に保ってブイブイいわせりゃええやんという向きもありますが、どうしてもメモリの管理はある程度自分で把握したい、そんなときRustを使うのがオススメです。

Rustってどんなプログラミング言語なの?という話ですが、それはドキュメント見てください。
www.rust-lang.org

映像や音声を扱うときに、IOデバイスに書き出すデータを一時的に保管するバッファーは必ず使うのですが、IOデバイスを操作するときプログラムを並行に処理できるよう実装したくなることがあります。Rustは並行処理をシンプルに記述できるよう様々な仕組みがあって、この記事ではRustを用いた並行処理に便利に使えるバッファーを実装してみたので紹介します。

まずは構造体の定義から。何種類かバッファーを実装するので、マクロで記述してます。

 47 macro_rules! define_buffer {
 48
 49     ($name:ident, io::$iot:ident) => {
 50
 51         #[allow(dead_code)]
 52         pub struct $name<T: io::$iot> {
 53             inner : AtomicPtr<u8>,
 54             size  : usize,
 55             align : usize,
 56             _type : PhantomData<T>
 57         }
 58     }
 59 }

rustのマクロについては上述ドキュメント参照、PhantomDataについては無視してください。並行処理を意識しているためポインターにはAtomicPtrを使用しています。
doc.rust-lang.org

ドキュメントによるとAtomicPtrはThread-safeなraw pointerとのこと。特徴としてAtomicPtrにはSend traitが実装されていることが挙げられます。traitとはJavaでいうinterfaceみたいなもので、Send traitはmarker traitとして使用されていて、marker traitはJavaでいうアノテーションに該当します。Javaでもアノテーションの記述にinterfaceが使われていることから、データに付帯情報を持たせることにtraitが使われていると考えてください。このSend traitが付いていることがRustの並行処理におけるメモリの管理に影響をもたらします。

次にバッファー内部におけるメモリの割り当てについて。

 61 macro_rules! alloc_atomic_buf {
 62     ($size:expr, $align:expr) => {
 63         {
 64             let ptr = unsafe {
 65                  heap::allocate($size, $align)
 66             };
 67             AtomicPtr::new(ptr)
 68         }
 69     }
 70 }
 71
 72 macro_rules! dealloc_atomic_buf {
 73     ($slf:ident) => {
 74         unsafe {
 75             heap::deallocate($slf.inner.load(Ordering::Relaxed),
 76                 $slf.size(),
 77                 $slf.align());
 78         }
 79     }
 80 }

これも面倒なのでマクロで記述していますが、通常の設定ではlibcのmalloc/freeが呼び出されるはずです。上記のマクロを使って、コンストラクタを実装します。

122 impl<T: io::Read> ReadBuffer<T> {
123
124     pub fn new(size: usize) -> Self {
125         ReadBuffer {
126             inner: alloc_atomic_buf!(size, READBUF_ALIGN),
127             size: size,
128             align: READBUF_ALIGN,
129             _type: PhantomData
130         }
131     }
132
    ...

137 }

次にデストラクタ。

 82 macro_rules! impl_buffer {
 83
    ...
102
103         impl<T: io::$iot> Drop for $name<T> {
104             fn drop(&mut self) {
105                 dealloc_atomic_buf!(self);
106             }
107         }
108     }
109 }

RustではデストラクタをDrop traitを用いて実装するのですが、先ほど述べたSend traitと組み合わせて使うと強烈に威力を発揮します。

例えば、

  1 struct A;
  2
  3 impl A {
  4     fn new() -> Self {
  5         A {}
  6     }
  7 }
  8
  9 impl Drop for A {
 10     fn drop(&mut self) {
 11         println!("destroyed.");
 12     }
 13 }
 14
 15 unsafe impl Send for A {}
 16
 17 fn main() {
 18     //let (tx, rx) = std::sync::mpsc::channel();
 19
 20     let handle = std::thread::spawn(move || {
 21         let a = A::new();
 22         //tx.send(a).unwrap();
 23         println!("sent");
 24     });
 25
 26     //let a = rx.recv().unwrap();
 27     println!("received");
 28
 29     handle.join().unwrap();
 30     println!("main thread is done.");
 31 }

というコードを実行したときに、出力は以下のようになります。

$ cargo run
received
sent
destroyed.
main thread is done.

明らかに、まずサブスレッドの終了時にAのインスタンスが破壊されて、その後メインスレッドが終了しています。
では次に、少し上述のコードを変えてみましょう。

  1 struct A;
  2
  3 impl A {
  4     fn new() -> Self {
  5         A {}
  6     }
  7 }
  8
  9 impl Drop for A {
 10     fn drop(&mut self) {
 11         println!("destroyed.");
 12     }
 13 }
 14
 15 unsafe impl Send for A {}
 16
 17 fn main() {
 18     let (tx, rx) = std::sync::mpsc::channel();
 19
 20     let handle = std::thread::spawn(move || {
 21         let a = A::new();
 22         tx.send(a).unwrap();
 23         println!("sent");
 24     });
 25
 26     let a = rx.recv().unwrap();
 27     println!("received");
 28
 29     handle.join().unwrap();
 30     println!("main thread is done.");
 31 }

メッセージパッシングでサブスレッドからメインスレッドにAのインスタンスを渡すようにしました。これを実行すると以下のように出力します。

$ cargo run
sent
received
main thread is done.
destroyed.

このときサブスレッド終了時にAのインスタンスが破壊されずに、メインスレッドが終了した後に破壊されているようです。

Rust言語にはデータの所有権という概念があって、あるデータがどのスコープに所属するかをコンパイラーが管理する仕組みを指します。詳しくはドキュメントのOwnershipの項目を読んでください。
※Ownershipは別にRust特有の概念ではないです、誤解なきようお願いします。

libatomic(Cで記述)なんかではインラインアセンブラでアトミック演算が実装されていることがあるのですが、いまのコンパイラーは賢いので、どのデータをどこまで生かすかというメモリ管理をコンパイラーが判断してくれます。

上記の実行結果を、所有権にフォーカスしてざっくり整理すると、
1。Aのインスタンス(以降aと呼ぶ)がサブスレッドで生成される(所有権はサブスレッド)
2。aの所有権がデータの受け渡しによりメインスレッドに移る
3。サブスレッド終了(aは破壊されない)
4。メイスレッド終了時にaが破壊される。

Rustではスレッド間のデータの受け渡しができるのは、Send traitが実装されている型のみ(実装されていないとコンパイルすることができない)であるようです。
つまり、コンパイラーがrace conditionになるのを未然に防いでくれるというわけです。

余談ですが、'RustはLifetimeがよくわからない'と言われることがたまにあるのですがそんな時私はEffective C++を読むことを勧めています。Effective C++を読んで、Effective Modern C++を読めば絶対に理解できると思います。なぜC++の話を?とキョトンとされますが、C++にしてもそうですがRustが必要な人にはそれなりの理由があって、そこに該当しないなら別に必要ないかもしれないし、もしそうなら学習にかけるコストが無駄になってしまうので、他のことに時間をつかったほうがいいのかもしれません。

(後編に続く)