Rustで並行処理に便利なバッファーを実装してみた(後編)。
先日書いた以下の記事の続きです。
Rustで並行処理に便利なバッファーを実装してみた(前編)。 - Ri for (Real Estate|Residence) Influencers
Drop traitとSend traitの組み合わせで、並行処理におけるバッファーのメモリの管理がとても簡単にできるというところまで紹介しましたので、次はバッファーにおけるメソッドをいくつか実装してみようと思います。まずはloadメソッドを紹介します。このメソッドはバッファーの中身のリファレンスを返します。実装は以下のとおりです。
99 macro_rules! impl_buffer { 100 101 ($name:ident, io::$iot:ident) => { 102 103 #[allow(dead_code)] 104 impl<T: io::$iot> $name<T> { 105 .... 113 114 pub unsafe fn load(&mut self) -> &mut [u8] { 115 let ptr = self.inner.load(Ordering::Relaxed); 116 slice::from_raw_parts_mut(ptr, self.size()) 117 } 118 }
実はこの実装を見て、loadメソッドで返すreferenceが何故mutableになっているかが自分でもよく思い出せないのですが(直すの面倒なので今のところ放置..)、ポインターではなくリファレンスを返していることには理由があります。それに言及するにはRust言語の特徴のひとつである、'Lifetime'について述べる必要がありそうです。
Lifetimeを理解するにはまず、(Raw)ポインターとリファレンスがどうちがうかを理解する必要があります。メモリデータの所有権にかかわる話なのですが、説明するのが少し億劫なので以下リンクを参照してください。
https://doc.rust-lang.org/beta/book/references-and-borrowing.html
まあLifetimeをひとことで言うと、あるメモリのリファレンスがどのブロックの中で使えるかを管理するための機能です。例えば、上述のload関数は暗黙的にlifetimeが付加されて、呼び出されたブロックの中でしか返り値となるリファレンスを使うことができません。このリファレンスが(Raw)ポインターだったと仮定してみましょう。(Raw)ポインターとはすなわちc言語でのポインターを指すので、OwnershipとかBorrowingとか関係ないので、呼び出したブロックから外れてもデータを参照することができてしまうことがあります。うっかり変なグローバル変数にポインターを格納してしまった場合、そのメモリが開放されてしまった後もそのデータにアクセスしてしまうことになり、もしその変数が外部から操作できるものであったとしたらプログラムにおける重大な脆弱性となる危険が生まれてしまいます。私はあまり自分のコーディング力を信用していないので、そのチェックをコンパイラーに投げてしまえるのは実はとてもありがたいと思っています。
次にInput用とOutput用のバッファを実装したので、read関数とwrite関数をそれぞれ実装しています。まず以下にてread関数について紹介します。
141 impl<T: io::Read> ReadBuffer<T> { 142 ... 151 152 pub fn read(&mut self, input: &mut T) -> io::Result<usize> { 153 let buf = unsafe { self.load() }; 154 input.read(buf) 155 } 156 }
Read traitについては以下を参照してください。
std::io::Read - Rust
色々な用途に使えるように、Read traitのメソッドを実装したものであればソースから読み込むことができるようにジェネリッククラスにしています。想定されるストリームとしてはファイルやソケット、あとはAlsaなんかだとCapture用途にセットされたPCMデバイスとかのインプットでしょうか。
次にOutput用途のwrite関数です。
160 impl<T: io::Write> WriteBuffer<T> { 161 .... 180 181 fn write(&mut self, output: &mut T) -> io::Result<usize> { 182 let buf = unsafe { self.load() }; 183 output.write(buf) 184 } 185 }
Write traitについてもRead traitと同様です。詳細は以下のリンクを参照してください。
std::io::Write - Rust
ちなみに両方の関数で返り値の型として定義されているResult型は、HaskellでいうEitherモナドみたいな機能を持っています。処理の失敗時のハンドリングができる便利な機能です。まあ、一般的な関数型プログラミングのような記述ができる言語は大概このような機能をもっているので、説明は割愛します。
最後に簡単なテストを書いたので載せておきます。
232 #[cfg(test)] 233 mod test { 234 235 use super::*; 236 use std::fs::File; 237 use std::io::Read; 238 use std::process::Command; 239 use std::str::from_utf8; 240 use std::thread; 241 use std::time::Duration; 242 243 const RIFF : &'static str = "RIFF"; 244 const FILEPATH : &'static str = "/usr/share/sounds/k3b_error1.wav"; 245 const BUFSIZE : usize = 4; 246 const BUFALIGN : usize = 1; 247 const OUTPUT_FILE : &'static str = "out"; 248 249 #[test] 250 fn buffer_test() { 251 252 let mut f = File::open(FILEPATH).unwrap(); 253 let mut rbuf = ReadBuffer::<File>::new(BUFSIZE); 254 255 rbuf.read(&mut f).unwrap(); 256 257 let slice = unsafe { 258 rbuf.load() 259 }; 260 261 let riff = from_utf8(slice) 262 .unwrap(); 263 264 assert_eq!(RIFF, riff); 265 266 let mut wbuf = WriteBuffer::<File>::new(slice, 267 BUFALIGN); 268 assert_eq!(4usize, wbuf.size()); 269 270 let mut out = File::create(OUTPUT_FILE).unwrap(); 271 wbuf.write(&mut out).unwrap(); 272 273 let mut ifstream = File::open(OUTPUT_FILE).unwrap(); 274 let mut st = String::new(); 275 ifstream.read_to_string(&mut st).unwrap(); 276 assert_eq!(RIFF, st.as_str()); 277 278 Command::new("rm").arg(OUTPUT_FILE) 279 .output() 280 .unwrap(); 281 } ... 312 }
とりあえずWAVファイルのRIFFヘッダだけ読み込んで読み書きがちゃんとできているかをチェックしています。Rustのユニットテストの書き方は以下を参照してください。
Testing
あと、ここで実装したバッファを使ったNonBlockingIOをいくつか実装したので、気が向いたらブログに書きます。
Rustで並行処理に便利なバッファーを実装してみた(前編)。
私はffmpegとかalsaとかをいじるのが割と好きで(映像とか音源を編集していると自然とそうなる)、両方ともC言語で実装されているためC言語でまずはユーティリティを作ってみることになるのですが、正直、プログラム言語としてC言語は色々なものを自分で一から実装しなければいけないことが多いので、エンジニアでもなくプログラミングもできない私には少しつらいところがあり、C++を使うことになるのですが、C++はC++で依存するライブラリの管理が大変だったり、モダンな実装ができるけれどレガシーな実装もあってわけがわからなくなることが多々あります。じゃあGolangとかでローリングアップデートでライブラリを最新に保ってブイブイいわせりゃええやんという向きもありますが、どうしてもメモリの管理はある程度自分で把握したい、そんなときRustを使うのがオススメです。
Rustってどんなプログラミング言語なの?という話ですが、それはドキュメント見てください。
www.rust-lang.org
映像や音声を扱うときに、IOデバイスに書き出すデータを一時的に保管するバッファーは必ず使うのですが、IOデバイスを操作するときプログラムを並行に処理できるよう実装したくなることがあります。Rustは並行処理をシンプルに記述できるよう様々な仕組みがあって、この記事ではRustを用いた並行処理に便利に使えるバッファーを実装してみたので紹介します。
まずは構造体の定義から。何種類かバッファーを実装するので、マクロで記述してます。
47 macro_rules! define_buffer { 48 49 ($name:ident, io::$iot:ident) => { 50 51 #[allow(dead_code)] 52 pub struct $name<T: io::$iot> { 53 inner : AtomicPtr<u8>, 54 size : usize, 55 align : usize, 56 _type : PhantomData<T> 57 } 58 } 59 }
rustのマクロについては上述ドキュメント参照、PhantomDataについては無視してください。並行処理を意識しているためポインターにはAtomicPtrを使用しています。
doc.rust-lang.org
ドキュメントによるとAtomicPtrはThread-safeなraw pointerとのこと。特徴としてAtomicPtrにはSend traitが実装されていることが挙げられます。traitとはJavaでいうinterfaceみたいなもので、Send traitはmarker traitとして使用されていて、marker traitはJavaでいうアノテーションに該当します。Javaでもアノテーションの記述にinterfaceが使われていることから、データに付帯情報を持たせることにtraitが使われていると考えてください。このSend traitが付いていることがRustの並行処理におけるメモリの管理に影響をもたらします。
次にバッファー内部におけるメモリの割り当てについて。
61 macro_rules! alloc_atomic_buf { 62 ($size:expr, $align:expr) => { 63 { 64 let ptr = unsafe { 65 heap::allocate($size, $align) 66 }; 67 AtomicPtr::new(ptr) 68 } 69 } 70 } 71 72 macro_rules! dealloc_atomic_buf { 73 ($slf:ident) => { 74 unsafe { 75 heap::deallocate($slf.inner.load(Ordering::Relaxed), 76 $slf.size(), 77 $slf.align()); 78 } 79 } 80 }
これも面倒なのでマクロで記述していますが、通常の設定ではlibcのmalloc/freeが呼び出されるはずです。上記のマクロを使って、コンストラクタを実装します。
122 impl<T: io::Read> ReadBuffer<T> { 123 124 pub fn new(size: usize) -> Self { 125 ReadBuffer { 126 inner: alloc_atomic_buf!(size, READBUF_ALIGN), 127 size: size, 128 align: READBUF_ALIGN, 129 _type: PhantomData 130 } 131 } 132 ... 137 }
次にデストラクタ。
82 macro_rules! impl_buffer { 83 ... 102 103 impl<T: io::$iot> Drop for $name<T> { 104 fn drop(&mut self) { 105 dealloc_atomic_buf!(self); 106 } 107 } 108 } 109 }
RustではデストラクタをDrop traitを用いて実装するのですが、先ほど述べたSend traitと組み合わせて使うと強烈に威力を発揮します。
例えば、
1 struct A; 2 3 impl A { 4 fn new() -> Self { 5 A {} 6 } 7 } 8 9 impl Drop for A { 10 fn drop(&mut self) { 11 println!("destroyed."); 12 } 13 } 14 15 unsafe impl Send for A {} 16 17 fn main() { 18 //let (tx, rx) = std::sync::mpsc::channel(); 19 20 let handle = std::thread::spawn(move || { 21 let a = A::new(); 22 //tx.send(a).unwrap(); 23 println!("sent"); 24 }); 25 26 //let a = rx.recv().unwrap(); 27 println!("received"); 28 29 handle.join().unwrap(); 30 println!("main thread is done."); 31 }
というコードを実行したときに、出力は以下のようになります。
$ cargo run received sent destroyed. main thread is done.
明らかに、まずサブスレッドの終了時にAのインスタンスが破壊されて、その後メインスレッドが終了しています。
では次に、少し上述のコードを変えてみましょう。
1 struct A; 2 3 impl A { 4 fn new() -> Self { 5 A {} 6 } 7 } 8 9 impl Drop for A { 10 fn drop(&mut self) { 11 println!("destroyed."); 12 } 13 } 14 15 unsafe impl Send for A {} 16 17 fn main() { 18 let (tx, rx) = std::sync::mpsc::channel(); 19 20 let handle = std::thread::spawn(move || { 21 let a = A::new(); 22 tx.send(a).unwrap(); 23 println!("sent"); 24 }); 25 26 let a = rx.recv().unwrap(); 27 println!("received"); 28 29 handle.join().unwrap(); 30 println!("main thread is done."); 31 }
メッセージパッシングでサブスレッドからメインスレッドにAのインスタンスを渡すようにしました。これを実行すると以下のように出力します。
$ cargo run sent received main thread is done. destroyed.
このときサブスレッド終了時にAのインスタンスが破壊されずに、メインスレッドが終了した後に破壊されているようです。
Rust言語にはデータの所有権という概念があって、あるデータがどのスコープに所属するかをコンパイラーが管理する仕組みを指します。詳しくはドキュメントのOwnershipの項目を読んでください。
※Ownershipは別にRust特有の概念ではないです、誤解なきようお願いします。
libatomic(Cで記述)なんかではインラインアセンブラでアトミック演算が実装されていることがあるのですが、いまのコンパイラーは賢いので、どのデータをどこまで生かすかというメモリ管理をコンパイラーが判断してくれます。
上記の実行結果を、所有権にフォーカスしてざっくり整理すると、
1。Aのインスタンス(以降aと呼ぶ)がサブスレッドで生成される(所有権はサブスレッド)
2。aの所有権がデータの受け渡しによりメインスレッドに移る
3。サブスレッド終了(aは破壊されない)
4。メイスレッド終了時にaが破壊される。
Rustではスレッド間のデータの受け渡しができるのは、Send traitが実装されている型のみ(実装されていないとコンパイルすることができない)であるようです。
つまり、コンパイラーがrace conditionになるのを未然に防いでくれるというわけです。
余談ですが、'RustはLifetimeがよくわからない'と言われることがたまにあるのですがそんな時私はEffective C++を読むことを勧めています。Effective C++を読んで、Effective Modern C++を読めば絶対に理解できると思います。なぜC++の話を?とキョトンとされますが、C++にしてもそうですがRustが必要な人にはそれなりの理由があって、そこに該当しないなら別に必要ないかもしれないし、もしそうなら学習にかけるコストが無駄になってしまうので、他のことに時間をつかったほうがいいのかもしれません。
(後編に続く)
Spotifyの無料プランのレコメンドについて。
私は音楽のいわゆるライトユーザーで、ものすごくたくさん音源を買い込んだり、自分で演奏したりアレンジしたり、みたいなことはやらないんですが、昨今のストリーミングサービスの盛り上がりにより作業中のBGMには事欠かない。最近ようやくSpotifyが上陸してApple Musicからの有料プランの移行を計画しています。海外からなら使えたので何度も使用したことがあり、他のサービスよりも音楽への愛を感じるからです。具体的には、音楽を聴いてて何が楽しいかって自分の知らないかっこいいサウンドがたくさんあって、その未知との出会いが楽しいんだけど、昔はレコード屋でジャケ買いしたり、クラブで聞いたかっこいい音楽を探したりしたものだが、今はSpotifyみたいなストリーミングサービスが自分のライブラリから学習してガンガン新しい曲やアーティストを紹介してくれます。Apple Musicはそれが弱い。AWAとかLine Musicは残念だけど曲数少なすぎて話にならなかったです。
で、少し無料版でアカウントを育ててから、有料版に切り替えようと計画していたのですが、ひとつ問題がありました。洋楽はデータが豊富なせいかレコメンドの精度が高いんですが、邦楽がもうどうしようもない。Spotifyのスマホの無料版は曲を指定して聴くことができなくて、曲をお気に入り登録してそれをもとにレコメンドされた曲が流れる仕組み。電気グルーヴのMAD EBISを登録してまあ近い曲が流れるだろうと思っていたのが大間違いで、なんか変なアイドルの歌が延々と流れ出して早送りもSKIPもできずに最悪な気分で仕事をするはめになりました。(no way, MAD EBISからアイドルの歌は落差が結構でかいのです。笑)
”ゼロからつくるDeep Learning"を読むことにした。
近い将来、機械学習を使ったレコメンドが必要になりそうでいま流行りのDeep Learningを勉強することにしました。ソースを読んでもあまり頭に入ってこないたちなのでTensorFlowやChainerなどのフレームワークを使う前に、急がば回れということでオライリー社の「ゼロからつくるDeep Learning」を購入し、昨日届きました。
O'Reilly Japan - ゼロから作るDeep Learning
できるだけ並列処理を意識したいのとHaskellの学習を兼ねて、サンプルコードをHaskellで書き直すことにしました。
環境は以下。
$ uname -a Linux march 4.7.6-1-ARCH #1 SMP PREEMPT Fri Sep 30 19:28:42 CEST 2016 x86_64 GNU/Linux $ gcc --version gcc (GCC) 6.2.1 20160830 $ ghc --version The Glorious Glasgow Haskell Compilation System, version 8.0.1
次に線形代数とかのライブラリがないと間違いなくきついのでNumpyの代替となるHaskellライブラリとして、hmatrixをまず使うことにしました。そこで以下にてインストール。
$ yaourt -S blas lapack $ cabal install bmatrix
詳しいインストールの方法は以下を参照してください。(ウィンドウズとか)
hmatrix/INSTALL.md at master · albertoruiz/hmatrix · GitHub
hmatrixの使い方は以下を参照してください。
Numeric.LinearAlgebra
ではまず、pythonのサンプルコードより。
import numpy as np def AND(x1, x2): x = np.array([x1, x2]) w = np.array([0.5, 0.5]) b = -0.7 tmp = np.sum(w*x) + b if tmp <= 0: return 0 else: return 1 def OR(x1, x2): x = np.array([x1, x2]) w = np.array([0.5, 0.5]) b = -0.2 tmp = np.sum(w*x) + b if tmp <= 0: return 0 else: return 1 def NAND(x1, x2): x = np.array([x1, x2]) w = np.array([-0.5, -0.5]) b = 0.7 tmp = np.sum(w*x) + b if tmp <= 0: return 0 else: return 1
deep-learning-from-scratch/ch02 at master · oreilly-japan/deep-learning-from-scratch · GitHub
から抜粋。
論理回路をパーセプトロンアルゴリズムで書いたシンプルなコードです。
ただ、重みとバイアス以外すべて同じコードというところには何か悪意のようなものを感じたのは多分気のせいだと思うことにする。
パーセプトロンとは:
視覚と脳の機能をモデル化したものであり、パターン認識を行う。シンプルなネットワークでありながら学習能力を持つ。1960年代に爆発的なニューラルネットブームを巻き起こしたが、1969年に人工知能学者マービン・ミンスキーらによって線形分離可能なものしか学習できないことが指摘されたことによって下火となった。他の研究者によってさまざまな変種が考案されており、ニューロン階層を多層化し入出力が二値から実数になったボルツマンマシン(1985年)やバックプロパゲーション(1986年)などによって再び注目を集めた。2009年現在でも広く使われている機械学習アルゴリズムの基礎となっている。
(Wikipediaより抜粋)
だそうです。
上のpythonのコードからすこし悪意を取り除いてHaskellで書き直してみました。
import Numeric.LinearAlgebra op::(Double, Double) -> Double -> Double -> Double -> Int op ws b x1 x2 = case (sumElements (x * w) + b) > 0 of True -> 1 _ -> 0 where x = vector [x1, x2] w = vector [fst ws, snd ws] pAnd x1 x2 = op (0.5, 0.5) (-0.7) x1 x2 pNand x1 x2 = op (-0.5, -0.5) 0.7 x1 x2 pOr x1 x2 = op (0.5, 0.5) (-0.2) x1 x2 pXor x1 x2 = pAnd (fromIntegral s1) (fromIntegral s2) where s1 = pNand x1 x2 s2 = pXor x1 x2
特に説明のしようがない簡素なコードですが、はやくちゃんとしたHaskellのコードをかけるように頑張っていきたいと思います。
ちょっとまずはひと記事ということでササッと書いたブログですが、もう少しまともなことを書けるようすこし考えます。