サンプルの転送
ストリーミングスレッド
実装に入る前に、ストリーミングスレッドについて整理しておきます。
複数の入力ピンを持つフィルタを作る場合、入力ピンごとに異なるスレッドでサンプルを受信する可能性があります。なぜなら、入力ピンに接続されているアップストリームフィルタがストリーミングスレッドを作成する場合があるためです(図3)。
従って、入力ピンごとに一時的なバッファを用意し、可能な分だけそれらを結合し、ダウンストリームフィルタへ送信することになります(図4)。
一時バッファの作成
以上を踏まえて、実装に入っていきましょう。入力ピンから受信したオーディオデータを記憶しておく一時バッファを作成します。
一時バッファは必要な変数をまとめたWavBuffer
という構造体として定義し、それを出力ピンのメンバ変数として持ちます。バッファは出力ピンの接続が完了した時に呼ばれるAudioOutPin::CompleteConnect
で確保し、切断した時に呼ばれるAudioOutPin::Disconnect
で解放します。
// 一時バッファの構造体 struct WavBuffer { DWORD size; //バッファサイズ DWORD ptr; //追記する位置(書き込んだサイズに等しい) BYTE *pData;//確保したバッファへのポインタ }; // AudioOutPinのメンバ変数(入力ピンの数=2だけ用意) WavBuffer m_Buf[2]; // 接続時 HRESULT AudioOutPin::CompleteConnect(IPin *pReceivePin) { WAVEFORMATEXTENSIBLE *wfxe=(WAVEFORMATEXTENSIBLE *)m_mt.pbFormat; const DWORD size=wfxe->Format.nAvgBytesPerSec; for(unsigned i=0;i<_countof(m_Buf);i++) { m_Buf[i].size=size; m_Buf[i].ptr=0; m_Buf[i].pData=new BYTE[size]; } return __super::CompleteConnect(pReceivePin); } // 切断時 HRESULT AudioOutPin::Disconnect() { for(unsigned i=0;i<_countof(m_Buf);i++) { if(m_Buf[i].pData) { delete []m_Buf[i].pData; m_Buf[i].pData=NULL; } } return __super::Disconnect(); }
サンプルの受信
オーディオデータが格納されているサンプルを受信した時に呼ばれるAudioInPin::Receive
を実装しましょう。
まず、基底クラスのReceive
を呼び出しエラーチェックを行います。次に、ストリーム終了通知を受け取っていないか確認します。これはメンバ変数として宣言したm_EOS
で確認します。詳しくは「通知処理の実装」で説明します。最後にAudioOutPin::Pass
にサンプルを渡します。
STDMETHODIMP AudioInPin::Receive(IMediaSample *pSample) { HRESULT hr=__super::Receive(pSample); if(hr!=S_OK) { return hr; } if(m_EOS) { return S_FALSE; } AudioChMuxerFilter *filter=(AudioChMuxerFilter*)m_pFilter; AudioOutPin *out=filter->GetOutPin(); hr=out->Pass(m_Channel, pSample); return hr; }
サンプルの送信
オーディオデータを結合してサンプルとして送信する処理をAudioOutPin::Pass
に実装しましょう。「サンプルの受信」で説明したように、入力ピンがサンプルを受信する度にこのメソッドを呼びます。
まず、出力ピンがダウンストリームフィルタの入力ピンと接続されているか、一時バッファが確保されているかを確認し、入力ピンのメディアタイプを取得します。取得したメディアタイプからWAVEFORMATEXTENSIBLE
を取得しておきます。これは結合処理時に参照します。
HRESULT AudioOutPin::Pass(int channel, IMediaSample *pInputSample) { HRESULT hr=NOERROR; IPin* connected=GetConnected(); if(connected==NULL || m_Buf[channel].pData==NULL) { return VFW_E_RUNTIME_ERROR; } // 入力ピンのメディアタイプを取得 CMediaType mt; AudioChMuxerFilter *pFilter=(AudioChMuxerFilter*)m_pFilter; AudioInPin *pPin=pFilter->GetInPin(0); pPin->ConnectionMediaType(&mt); WAVEFORMATEXTENSIBLE *wfxe=(WAVEFORMATEXTENSIBLE *)mt.Format(); // (...続く)
「入力ピンクラスの実装」で説明したように、2つの入力ピンのメディアタイプが一致することを、AudioInPin::CheckMediaType
で確認しています。従って、送信するサンプルのメディアタイプを決定するには、ここで示したソースコードのように1つ入力ピンから取得すれば十分です。
次に、一時バッファの空き領域を調べます。もし、受信したサンプルのオーディオデータが一時バッファに入りきらないサイズだった場合、もう一方の入力ピンのストリーミングスレッドによって一時バッファが消費されるのを待機します。
// 一時バッファに、追記できる空き領域ができるまで待つ DWORD copy_size=pInputSample->GetActualDataLength(); m_RecvByte[channel] += copy_size; NOTE3("%S channel=%d %I64d bytes received" ,__FUNCTION__, channel, m_RecvByte[channel]); while(true) { if (IsStopped()) { NOTE2("%S channel=%d stop" ,__FUNCTION__, channel); break; } Sleep(0); CAutoLock str_lock(&m_StreamingLock); if (m_Buf[channel].ptr+copy_size <= m_Buf[channel].size) { break; } }
今回のような実装の場合、もう一方のピンは異なるスレッドで動いているということが前提条件になります。同一スレッドでも動作させたい場合は、バッファを拡張して戻るなど、別の仕組みが必要になるでしょう。
また、待機中にフィルタが停止状態になった時は、速やかに待機を中止する必要があります。これはCBasePin::IsStopped
メソッドで確認できます。
一時バッファに空き領域ができたら、オーディオデータを書き込みます。IMediaSample::GetPointer
でオーディオデータへのポインタ、IMediaSample::GetActualSize
でサイズをそれぞれ取得できるので、それをもとに一時バッファへ追記します。これ以降、もう一方のストリーミングスレッドと同時実行しないようにクリティカルセクションで排他制御しなければなりません。
//クリティカルセクション CAutoLock str_lock(&m_StreamingLock); // 一時バッファへ書き込む LPBYTE copy_src; pInputSample->GetPointer(©_src); memcpy(&m_Buf[channel].pData[m_Buf[channel].ptr] ,copy_src, copy_size); m_Buf[channel].ptr+=copy_size; copy_size=min(m_Buf[0].ptr, m_Buf[1].ptr); if(copy_size==0) { // 結合する対象が無かった return S_OK; }
送信するサンプルを作成するためにCBaseOutputPin::GetDeliveryBuffer
を呼び出します。バッファが確保された状態でサンプルが取得できます。
CComPtr<IMediaSample> out_sample; hr=GetDeliveryBuffer(&out_sample, NULL, NULL, 0);
最後に、一時バッファを結合しサンプルを送信します。L/Rそれぞれの一時バッファから読みだし、L、Rの順で送信サンプルのバッファに書き込みます。読み出した分だけ一時バッファを詰めて、空き領域を確保します。COutputQueue::Receive
でサンプルをダウンストリームフィルタへ送信します。
LPBYTE copy_dest; hr=out_sample->GetPointer(©_dest); DWORD out_sample_length=out_sample->GetSize(); copy_size=min(copy_size, out_sample_length); // (...結合処理のコードは省略) // 読み出した分だけバッファを詰める for(unsigned i=0;i<_countof(m_Buf);i++) { memmove(m_Buf[i].pData, &m_Buf[i].pData[copy_size], m_Buf[i].ptr-copy_size); m_Buf[i].ptr-=copy_size; } hr=Deliver(out_sample);