Rustで並行処理に便利なバッファーを実装してみた(後編)。

先日書いた以下の記事の続きです。
Rustで並行処理に便利なバッファーを実装してみた(前編)。 - Ri for (Real Estate|Residence) Influencers

Drop traitとSend traitの組み合わせで、並行処理におけるバッファーのメモリの管理がとても簡単にできるというところまで紹介しましたので、次はバッファーにおけるメソッドをいくつか実装してみようと思います。まずはloadメソッドを紹介します。このメソッドはバッファーの中身のリファレンスを返します。実装は以下のとおりです。

 99 macro_rules! impl_buffer {
100
101     ($name:ident, io::$iot:ident) => {
102
103         #[allow(dead_code)]
104         impl<T: io::$iot> $name<T> {
105
    ....
113
114             pub unsafe fn load(&mut self) -> &mut [u8] {
115                 let ptr = self.inner.load(Ordering::Relaxed);
116                 slice::from_raw_parts_mut(ptr, self.size())
117             }
118         }

実はこの実装を見て、loadメソッドで返すreferenceが何故mutableになっているかが自分でもよく思い出せないのですが(直すの面倒なので今のところ放置..)、ポインターではなくリファレンスを返していることには理由があります。それに言及するにはRust言語の特徴のひとつである、'Lifetime'について述べる必要がありそうです。

Lifetimeを理解するにはまず、(Raw)ポインターとリファレンスがどうちがうかを理解する必要があります。メモリデータの所有権にかかわる話なのですが、説明するのが少し億劫なので以下リンクを参照してください。

https://doc.rust-lang.org/beta/book/references-and-borrowing.html

まあLifetimeをひとことで言うと、あるメモリのリファレンスがどのブロックの中で使えるかを管理するための機能です。例えば、上述のload関数は暗黙的にlifetimeが付加されて、呼び出されたブロックの中でしか返り値となるリファレンスを使うことができません。このリファレンスが(Raw)ポインターだったと仮定してみましょう。(Raw)ポインターとはすなわちc言語でのポインターを指すので、OwnershipとかBorrowingとか関係ないので、呼び出したブロックから外れてもデータを参照することができてしまうことがあります。うっかり変なグローバル変数ポインターを格納してしまった場合、そのメモリが開放されてしまった後もそのデータにアクセスしてしまうことになり、もしその変数が外部から操作できるものであったとしたらプログラムにおける重大な脆弱性となる危険が生まれてしまいます。私はあまり自分のコーディング力を信用していないので、そのチェックをコンパイラーに投げてしまえるのは実はとてもありがたいと思っています。

次にInput用とOutput用のバッファを実装したので、read関数とwrite関数をそれぞれ実装しています。まず以下にてread関数について紹介します。

141 impl<T: io::Read> ReadBuffer<T> {
142
    ...
151
152     pub fn read(&mut self, input: &mut T) -> io::Result<usize> {
153         let buf = unsafe { self.load() };
154         input.read(buf)
155     }
156 }

Read traitについては以下を参照してください。
std::io::Read - Rust

色々な用途に使えるように、Read traitのメソッドを実装したものであればソースから読み込むことができるようにジェネリッククラスにしています。想定されるストリームとしてはファイルやソケット、あとはAlsaなんかだとCapture用途にセットされたPCMデバイスとかのインプットでしょうか。

次にOutput用途のwrite関数です。

160 impl<T: io::Write> WriteBuffer<T> {
161
    ....
180
181     fn write(&mut self, output: &mut T) -> io::Result<usize> {
182         let buf = unsafe { self.load() };
183         output.write(buf)
184     }
185 }

Write traitについてもRead traitと同様です。詳細は以下のリンクを参照してください。
std::io::Write - Rust

ちなみに両方の関数で返り値の型として定義されているResult型は、HaskellでいうEitherモナドみたいな機能を持っています。処理の失敗時のハンドリングができる便利な機能です。まあ、一般的な関数型プログラミングのような記述ができる言語は大概このような機能をもっているので、説明は割愛します。

最後に簡単なテストを書いたので載せておきます。

232 #[cfg(test)]
233 mod test {
234
235     use super::*;
236     use std::fs::File;
237     use std::io::Read;
238     use std::process::Command;
239     use std::str::from_utf8;
240     use std::thread;
241     use std::time::Duration;
242
243     const RIFF : &'static str = "RIFF";
244     const FILEPATH : &'static str = "/usr/share/sounds/k3b_error1.wav";
245     const BUFSIZE : usize = 4;
246     const BUFALIGN : usize = 1;
247     const OUTPUT_FILE : &'static str = "out";
248
249     #[test]
250     fn buffer_test() {
251
252         let mut f = File::open(FILEPATH).unwrap();
253         let mut rbuf = ReadBuffer::<File>::new(BUFSIZE);
254
255         rbuf.read(&mut f).unwrap();
256
257         let slice = unsafe {
258             rbuf.load()
259         };
260
261         let riff = from_utf8(slice)
262             .unwrap();
263
264         assert_eq!(RIFF, riff);
265
266         let mut wbuf = WriteBuffer::<File>::new(slice,
267             BUFALIGN);
268         assert_eq!(4usize, wbuf.size());
269
270         let mut out = File::create(OUTPUT_FILE).unwrap();
271         wbuf.write(&mut out).unwrap();
272
273         let mut ifstream = File::open(OUTPUT_FILE).unwrap();
274         let mut st = String::new();
275         ifstream.read_to_string(&mut st).unwrap();
276         assert_eq!(RIFF, st.as_str());
277
278         Command::new("rm").arg(OUTPUT_FILE)
279             .output()
280             .unwrap();
281     }
    ...
312 }

とりあえずWAVファイルのRIFFヘッダだけ読み込んで読み書きがちゃんとできているかをチェックしています。Rustのユニットテストの書き方は以下を参照してください。
Testing

あと、ここで実装したバッファを使ったNonBlockingIOをいくつか実装したので、気が向いたらブログに書きます。

Rustで並行処理に便利なバッファーを実装してみた(前編)。

私はffmpegとかalsaとかをいじるのが割と好きで(映像とか音源を編集していると自然とそうなる)、両方ともC言語で実装されているためC言語でまずはユーティリティを作ってみることになるのですが、正直、プログラム言語としてC言語は色々なものを自分で一から実装しなければいけないことが多いので、エンジニアでもなくプログラミングもできない私には少しつらいところがあり、C++を使うことになるのですが、C++はC++で依存するライブラリの管理が大変だったり、モダンな実装ができるけれどレガシーな実装もあってわけがわからなくなることが多々あります。じゃあGolangとかでローリングアップデートでライブラリを最新に保ってブイブイいわせりゃええやんという向きもありますが、どうしてもメモリの管理はある程度自分で把握したい、そんなときRustを使うのがオススメです。

Rustってどんなプログラミング言語なの?という話ですが、それはドキュメント見てください。
www.rust-lang.org

映像や音声を扱うときに、IOデバイスに書き出すデータを一時的に保管するバッファーは必ず使うのですが、IOデバイスを操作するときプログラムを並行に処理できるよう実装したくなることがあります。Rustは並行処理をシンプルに記述できるよう様々な仕組みがあって、この記事ではRustを用いた並行処理に便利に使えるバッファーを実装してみたので紹介します。

まずは構造体の定義から。何種類かバッファーを実装するので、マクロで記述してます。

 47 macro_rules! define_buffer {
 48
 49     ($name:ident, io::$iot:ident) => {
 50
 51         #[allow(dead_code)]
 52         pub struct $name<T: io::$iot> {
 53             inner : AtomicPtr<u8>,
 54             size  : usize,
 55             align : usize,
 56             _type : PhantomData<T>
 57         }
 58     }
 59 }

rustのマクロについては上述ドキュメント参照、PhantomDataについては無視してください。並行処理を意識しているためポインターにはAtomicPtrを使用しています。
doc.rust-lang.org

ドキュメントによるとAtomicPtrはThread-safeなraw pointerとのこと。特徴としてAtomicPtrにはSend traitが実装されていることが挙げられます。traitとはJavaでいうinterfaceみたいなもので、Send traitはmarker traitとして使用されていて、marker traitはJavaでいうアノテーションに該当します。Javaでもアノテーションの記述にinterfaceが使われていることから、データに付帯情報を持たせることにtraitが使われていると考えてください。このSend traitが付いていることがRustの並行処理におけるメモリの管理に影響をもたらします。

次にバッファー内部におけるメモリの割り当てについて。

 61 macro_rules! alloc_atomic_buf {
 62     ($size:expr, $align:expr) => {
 63         {
 64             let ptr = unsafe {
 65                  heap::allocate($size, $align)
 66             };
 67             AtomicPtr::new(ptr)
 68         }
 69     }
 70 }
 71
 72 macro_rules! dealloc_atomic_buf {
 73     ($slf:ident) => {
 74         unsafe {
 75             heap::deallocate($slf.inner.load(Ordering::Relaxed),
 76                 $slf.size(),
 77                 $slf.align());
 78         }
 79     }
 80 }

これも面倒なのでマクロで記述していますが、通常の設定ではlibcのmalloc/freeが呼び出されるはずです。上記のマクロを使って、コンストラクタを実装します。

122 impl<T: io::Read> ReadBuffer<T> {
123
124     pub fn new(size: usize) -> Self {
125         ReadBuffer {
126             inner: alloc_atomic_buf!(size, READBUF_ALIGN),
127             size: size,
128             align: READBUF_ALIGN,
129             _type: PhantomData
130         }
131     }
132
    ...

137 }

次にデストラクタ。

 82 macro_rules! impl_buffer {
 83
    ...
102
103         impl<T: io::$iot> Drop for $name<T> {
104             fn drop(&mut self) {
105                 dealloc_atomic_buf!(self);
106             }
107         }
108     }
109 }

RustではデストラクタをDrop traitを用いて実装するのですが、先ほど述べたSend traitと組み合わせて使うと強烈に威力を発揮します。

例えば、

  1 struct A;
  2
  3 impl A {
  4     fn new() -> Self {
  5         A {}
  6     }
  7 }
  8
  9 impl Drop for A {
 10     fn drop(&mut self) {
 11         println!("destroyed.");
 12     }
 13 }
 14
 15 unsafe impl Send for A {}
 16
 17 fn main() {
 18     //let (tx, rx) = std::sync::mpsc::channel();
 19
 20     let handle = std::thread::spawn(move || {
 21         let a = A::new();
 22         //tx.send(a).unwrap();
 23         println!("sent");
 24     });
 25
 26     //let a = rx.recv().unwrap();
 27     println!("received");
 28
 29     handle.join().unwrap();
 30     println!("main thread is done.");
 31 }

というコードを実行したときに、出力は以下のようになります。

$ cargo run
received
sent
destroyed.
main thread is done.

明らかに、まずサブスレッドの終了時にAのインスタンスが破壊されて、その後メインスレッドが終了しています。
では次に、少し上述のコードを変えてみましょう。

  1 struct A;
  2
  3 impl A {
  4     fn new() -> Self {
  5         A {}
  6     }
  7 }
  8
  9 impl Drop for A {
 10     fn drop(&mut self) {
 11         println!("destroyed.");
 12     }
 13 }
 14
 15 unsafe impl Send for A {}
 16
 17 fn main() {
 18     let (tx, rx) = std::sync::mpsc::channel();
 19
 20     let handle = std::thread::spawn(move || {
 21         let a = A::new();
 22         tx.send(a).unwrap();
 23         println!("sent");
 24     });
 25
 26     let a = rx.recv().unwrap();
 27     println!("received");
 28
 29     handle.join().unwrap();
 30     println!("main thread is done.");
 31 }

メッセージパッシングでサブスレッドからメインスレッドにAのインスタンスを渡すようにしました。これを実行すると以下のように出力します。

$ cargo run
sent
received
main thread is done.
destroyed.

このときサブスレッド終了時にAのインスタンスが破壊されずに、メインスレッドが終了した後に破壊されているようです。

Rust言語にはデータの所有権という概念があって、あるデータがどのスコープに所属するかをコンパイラーが管理する仕組みを指します。詳しくはドキュメントのOwnershipの項目を読んでください。
※Ownershipは別にRust特有の概念ではないです、誤解なきようお願いします。

libatomic(Cで記述)なんかではインラインアセンブラでアトミック演算が実装されていることがあるのですが、いまのコンパイラーは賢いので、どのデータをどこまで生かすかというメモリ管理をコンパイラーが判断してくれます。

上記の実行結果を、所有権にフォーカスしてざっくり整理すると、
1。Aのインスタンス(以降aと呼ぶ)がサブスレッドで生成される(所有権はサブスレッド)
2。aの所有権がデータの受け渡しによりメインスレッドに移る
3。サブスレッド終了(aは破壊されない)
4。メイスレッド終了時にaが破壊される。

Rustではスレッド間のデータの受け渡しができるのは、Send traitが実装されている型のみ(実装されていないとコンパイルすることができない)であるようです。
つまり、コンパイラーがrace conditionになるのを未然に防いでくれるというわけです。

余談ですが、'RustはLifetimeがよくわからない'と言われることがたまにあるのですがそんな時私はEffective C++を読むことを勧めています。Effective C++を読んで、Effective Modern C++を読めば絶対に理解できると思います。なぜC++の話を?とキョトンとされますが、C++にしてもそうですがRustが必要な人にはそれなりの理由があって、そこに該当しないなら別に必要ないかもしれないし、もしそうなら学習にかけるコストが無駄になってしまうので、他のことに時間をつかったほうがいいのかもしれません。

(後編に続く)

Spotifyの無料プランのレコメンドについて。

私は音楽のいわゆるライトユーザーで、ものすごくたくさん音源を買い込んだり、自分で演奏したりアレンジしたり、みたいなことはやらないんですが、昨今のストリーミングサービスの盛り上がりにより作業中のBGMには事欠かない。最近ようやくSpotifyが上陸してApple Musicからの有料プランの移行を計画しています。海外からなら使えたので何度も使用したことがあり、他のサービスよりも音楽への愛を感じるからです。具体的には、音楽を聴いてて何が楽しいかって自分の知らないかっこいいサウンドがたくさんあって、その未知との出会いが楽しいんだけど、昔はレコード屋でジャケ買いしたり、クラブで聞いたかっこいい音楽を探したりしたものだが、今はSpotifyみたいなストリーミングサービスが自分のライブラリから学習してガンガン新しい曲やアーティストを紹介してくれます。Apple Musicはそれが弱い。AWAとかLine Musicは残念だけど曲数少なすぎて話にならなかったです。

で、少し無料版でアカウントを育ててから、有料版に切り替えようと計画していたのですが、ひとつ問題がありました。洋楽はデータが豊富なせいかレコメンドの精度が高いんですが、邦楽がもうどうしようもない。Spotifyスマホの無料版は曲を指定して聴くことができなくて、曲をお気に入り登録してそれをもとにレコメンドされた曲が流れる仕組み。電気グルーヴのMAD EBISを登録してまあ近い曲が流れるだろうと思っていたのが大間違いで、なんか変なアイドルの歌が延々と流れ出して早送りもSKIPもできずに最悪な気分で仕事をするはめになりました。(no way, MAD EBISからアイドルの歌は落差が結構でかいのです。笑)

SpotifyAPIとか使ったBotとかAppつくったら楽しそうなんで、作ったら公開します。

”ゼロからつくるDeep Learning"を読むことにした。

近い将来、機械学習を使ったレコメンドが必要になりそうでいま流行りのDeep Learningを勉強することにしました。ソースを読んでもあまり頭に入ってこないたちなのでTensorFlowやChainerなどのフレームワークを使う前に、急がば回れということでオライリー社の「ゼロからつくるDeep Learning」を購入し、昨日届きました。

O'Reilly Japan - ゼロから作るDeep Learning


できるだけ並列処理を意識したいのとHaskellの学習を兼ねて、サンプルコードをHaskellで書き直すことにしました。

環境は以下。

$ uname -a
Linux march 4.7.6-1-ARCH #1 SMP PREEMPT Fri Sep 30 19:28:42 CEST 2016 x86_64 GNU/Linux

$ gcc --version
gcc (GCC) 6.2.1 20160830

$ ghc --version
The Glorious Glasgow Haskell Compilation System, version 8.0.1


次に線形代数とかのライブラリがないと間違いなくきついのでNumpyの代替となるHaskellライブラリとして、hmatrixをまず使うことにしました。そこで以下にてインストール。

$ yaourt -S blas lapack
$ cabal install bmatrix

詳しいインストールの方法は以下を参照してください。(ウィンドウズとか)
hmatrix/INSTALL.md at master · albertoruiz/hmatrix · GitHub


hmatrixの使い方は以下を参照してください。
Numeric.LinearAlgebra


ではまず、pythonのサンプルコードより。

import numpy as np


def AND(x1, x2):
    x = np.array([x1, x2])
    w = np.array([0.5, 0.5])
    b = -0.7
    tmp = np.sum(w*x) + b
    if tmp <= 0:
        return 0
    else:
        return 1

def OR(x1, x2):
    x = np.array([x1, x2])
    w = np.array([0.5, 0.5])
    b = -0.2
    tmp = np.sum(w*x) + b
    if tmp <= 0:
        return 0
    else:
        return 1

def NAND(x1, x2):
    x = np.array([x1, x2])
    w = np.array([-0.5, -0.5])
    b = 0.7
    tmp = np.sum(w*x) + b
    if tmp <= 0:
        return 0
    else:
        return 1

deep-learning-from-scratch/ch02 at master · oreilly-japan/deep-learning-from-scratch · GitHub

から抜粋。

論理回路パーセプトロンアルゴリズムで書いたシンプルなコードです。
ただ、重みとバイアス以外すべて同じコードというところには何か悪意のようなものを感じたのは多分気のせいだと思うことにする。

パーセプトロンとは:

視覚と脳の機能をモデル化したものであり、パターン認識を行う。シンプルなネットワークでありながら学習能力を持つ。1960年代に爆発的なニューラルネットブームを巻き起こしたが、1969年に人工知能学者マービン・ミンスキーらによって線形分離可能なものしか学習できないことが指摘されたことによって下火となった。他の研究者によってさまざまな変種が考案されており、ニューロン階層を多層化し入出力が二値から実数になったボルツマンマシン(1985年)やバックプロパゲーション(1986年)などによって再び注目を集めた。2009年現在でも広く使われている機械学習アルゴリズムの基礎となっている。
(Wikipediaより抜粋)

だそうです。

上のpythonのコードからすこし悪意を取り除いてHaskellで書き直してみました。

import Numeric.LinearAlgebra

op::(Double, Double) -> Double -> Double -> Double -> Int 
op ws b x1 x2 = case (sumElements (x * w) + b) > 0 of
    True -> 1
    _    -> 0
 where x = vector [x1, x2]
       w = vector [fst ws, snd ws]

pAnd  x1 x2 = op (0.5, 0.5) (-0.7) x1 x2
pNand x1 x2 = op (-0.5, -0.5) 0.7 x1 x2
pOr   x1 x2 = op (0.5, 0.5) (-0.2) x1 x2

pXor  x1 x2 = pAnd (fromIntegral s1) (fromIntegral s2)
 where s1 = pNand x1 x2
       s2 = pXor x1 x2  


特に説明のしようがない簡素なコードですが、はやくちゃんとしたHaskellのコードをかけるように頑張っていきたいと思います。
ちょっとまずはひと記事ということでササッと書いたブログですが、もう少しまともなことを書けるようすこし考えます。